Skip to content

Commit d2b974d

Browse files
authored
Adding handling of SMIGRATED push notifications (#3857)
* Adding SMIGRATED handling * Applying Copilot's comments * Applying review comments
1 parent da185bc commit d2b974d

File tree

10 files changed

+1088
-156
lines changed

10 files changed

+1088
-156
lines changed

redis/_parsers/base.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
OSSNodeMigratedNotification,
1515
OSSNodeMigratingNotification,
1616
)
17+
from redis.utils import safe_str
1718

1819
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
1920
from asyncio import timeout as async_timeout
@@ -194,8 +195,9 @@ def parse_oss_maintenance_completed_msg(response):
194195
# Expected message format is:
195196
# SMIGRATED <seq_number> <host:port> <slot, range1-range2,...>
196197
id = response[1]
197-
node_address = response[2]
198+
node_address = safe_str(response[2])
198199
slots = response[3]
200+
199201
return OSSNodeMigratedNotification(id, node_address, slots)
200202

201203
@staticmethod
@@ -225,9 +227,7 @@ def parse_moving_msg(response):
225227
if response[3] is None:
226228
host, port = None, None
227229
else:
228-
value = response[3]
229-
if isinstance(value, bytes):
230-
value = value.decode()
230+
value = safe_str(response[3])
231231
host, port = value.split(":")
232232
port = int(port) if port is not None else None
233233

redis/client.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,12 @@
5858
from redis.lock import Lock
5959
from redis.maint_notifications import (
6060
MaintNotificationsConfig,
61+
OSSMaintNotificationsHandler,
6162
)
6263
from redis.retry import Retry
6364
from redis.utils import (
6465
_set_info_logger,
66+
check_protocol_version,
6567
deprecated_args,
6668
get_lib_version,
6769
safe_str,
@@ -250,6 +252,9 @@ def __init__(
250252
cache_config: Optional[CacheConfig] = None,
251253
event_dispatcher: Optional[EventDispatcher] = None,
252254
maint_notifications_config: Optional[MaintNotificationsConfig] = None,
255+
oss_cluster_maint_notifications_handler: Optional[
256+
OSSMaintNotificationsHandler
257+
] = None,
253258
) -> None:
254259
"""
255260
Initialize a new Redis client.
@@ -288,6 +293,11 @@ def __init__(
288293
will be enabled by default (logic is included in the connection pool
289294
initialization).
290295
Argument is ignored when connection_pool is provided.
296+
oss_cluster_maint_notifications_handler:
297+
handler for OSS cluster notifications - see
298+
`redis.maint_notifications.OSSMaintNotificationsHandler` for details.
299+
Only supported with RESP3
300+
Argument is ignored when connection_pool is provided.
291301
"""
292302
if event_dispatcher is None:
293303
self._event_dispatcher = EventDispatcher()
@@ -357,7 +367,7 @@ def __init__(
357367
"ssl_ciphers": ssl_ciphers,
358368
}
359369
)
360-
if (cache_config or cache) and protocol in [3, "3"]:
370+
if (cache_config or cache) and check_protocol_version(protocol, 3):
361371
kwargs.update(
362372
{
363373
"cache": cache,
@@ -380,6 +390,12 @@ def __init__(
380390
"maint_notifications_config": maint_notifications_config,
381391
}
382392
)
393+
if oss_cluster_maint_notifications_handler:
394+
kwargs.update(
395+
{
396+
"oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
397+
}
398+
)
383399
connection_pool = ConnectionPool(**kwargs)
384400
self._event_dispatcher.dispatch(
385401
AfterPooledConnectionsInstantiationEvent(

redis/cluster.py

Lines changed: 125 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,13 @@
5252
WatchError,
5353
)
5454
from redis.lock import Lock
55-
from redis.maint_notifications import MaintNotificationsConfig
55+
from redis.maint_notifications import (
56+
MaintNotificationsConfig,
57+
OSSMaintNotificationsHandler,
58+
)
5659
from redis.retry import Retry
5760
from redis.utils import (
61+
check_protocol_version,
5862
deprecated_args,
5963
dict_merge,
6064
list_keys_to_dict,
@@ -214,6 +218,67 @@ def cleanup_kwargs(**kwargs):
214218
return connection_kwargs
215219

216220

221+
class MaintNotificationsAbstractRedisCluster:
222+
"""
223+
Abstract class for handling maintenance notifications logic.
224+
This class is expected to be used as base class together with RedisCluster.
225+
226+
This class is intended to be used with multiple inheritance!
227+
228+
All logic related to maintenance notifications is encapsulated in this class.
229+
"""
230+
231+
def __init__(
232+
self,
233+
maint_notifications_config: Optional[MaintNotificationsConfig],
234+
**kwargs,
235+
):
236+
# Initialize maintenance notifications
237+
is_protocol_supported = check_protocol_version(kwargs.get("protocol"), 3)
238+
239+
if (
240+
maint_notifications_config
241+
and maint_notifications_config.enabled
242+
and not is_protocol_supported
243+
):
244+
raise RedisError(
245+
"Maintenance notifications handlers on connection are only supported with RESP version 3"
246+
)
247+
if maint_notifications_config is None and is_protocol_supported:
248+
maint_notifications_config = MaintNotificationsConfig()
249+
250+
self.maint_notifications_config = maint_notifications_config
251+
252+
if self.maint_notifications_config and self.maint_notifications_config.enabled:
253+
self._oss_cluster_maint_notifications_handler = (
254+
OSSMaintNotificationsHandler(self, self.maint_notifications_config)
255+
)
256+
# Update connection kwargs for all future nodes connections
257+
self._update_connection_kwargs_for_maint_notifications(
258+
self._oss_cluster_maint_notifications_handler
259+
)
260+
# Update existing nodes connections - they are created as part of the RedisCluster constructor
261+
for node in self.get_nodes():
262+
node.redis_connection.connection_pool.update_maint_notifications_config(
263+
self.maint_notifications_config,
264+
oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler,
265+
)
266+
else:
267+
self._oss_cluster_maint_notifications_handler = None
268+
269+
def _update_connection_kwargs_for_maint_notifications(
270+
self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
271+
):
272+
"""
273+
Update the connection kwargs for all future connections.
274+
"""
275+
self.nodes_manager.connection_kwargs.update(
276+
{
277+
"oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
278+
}
279+
)
280+
281+
217282
class AbstractRedisCluster:
218283
RedisClusterRequestTTL = 16
219284

@@ -461,7 +526,9 @@ def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
461526
self.nodes_manager.default_node = random.choice(replicas)
462527

463528

464-
class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
529+
class RedisCluster(
530+
AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands
531+
):
465532
@classmethod
466533
def from_url(cls, url, **kwargs):
467534
"""
@@ -612,8 +679,7 @@ def __init__(
612679
`redis.maint_notifications.MaintNotificationsConfig` for details.
613680
Only supported with RESP3.
614681
If not provided and protocol is RESP3, the maintenance notifications
615-
will be enabled by default (logic is included in the NodesManager
616-
initialization).
682+
will be enabled by default.
617683
:**kwargs:
618684
Extra arguments that will be sent into Redis instance when created
619685
(See Official redis-py doc for supported kwargs - the only limitation
@@ -695,9 +761,16 @@ def __init__(
695761
kwargs.get("decode_responses", False),
696762
)
697763
protocol = kwargs.get("protocol", None)
698-
if (cache_config or cache) and protocol not in [3, "3"]:
764+
if (cache_config or cache) and not check_protocol_version(protocol, 3):
699765
raise RedisError("Client caching is only supported with RESP version 3")
700766

767+
if maint_notifications_config and not check_protocol_version(protocol, 3):
768+
raise RedisError(
769+
"Maintenance notifications are only supported with RESP version 3"
770+
)
771+
if check_protocol_version(protocol, 3) and maint_notifications_config is None:
772+
maint_notifications_config = MaintNotificationsConfig()
773+
701774
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
702775
self.node_flags = self.__class__.NODE_FLAGS.copy()
703776
self.read_from_replicas = read_from_replicas
@@ -709,6 +782,7 @@ def __init__(
709782
else:
710783
self._event_dispatcher = event_dispatcher
711784
self.startup_nodes = startup_nodes
785+
712786
self.nodes_manager = NodesManager(
713787
startup_nodes=startup_nodes,
714788
from_url=from_url,
@@ -763,6 +837,10 @@ def __init__(
763837
self._aggregate_nodes = None
764838
self._lock = threading.RLock()
765839

840+
MaintNotificationsAbstractRedisCluster.__init__(
841+
self, maint_notifications_config, **kwargs
842+
)
843+
766844
def __enter__(self):
767845
return self
768846

@@ -1632,9 +1710,7 @@ def __init__(
16321710
cache_config: Optional[CacheConfig] = None,
16331711
cache_factory: Optional[CacheFactoryInterface] = None,
16341712
event_dispatcher: Optional[EventDispatcher] = None,
1635-
maint_notifications_config: Optional[
1636-
MaintNotificationsConfig
1637-
] = MaintNotificationsConfig(),
1713+
maint_notifications_config: Optional[MaintNotificationsConfig] = None,
16381714
**kwargs,
16391715
):
16401716
self.nodes_cache: Dict[str, Redis] = {}
@@ -1879,11 +1955,29 @@ def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
18791955

18801956
return target_node
18811957

1882-
def initialize(self):
1958+
def initialize(
1959+
self,
1960+
additional_startup_nodes_info: List[Tuple[str, int]] = [],
1961+
disconnect_startup_nodes_pools: bool = True,
1962+
):
18831963
"""
18841964
Initializes the nodes cache, slots cache and redis connections.
18851965
:startup_nodes:
18861966
Responsible for discovering other nodes in the cluster
1967+
:disconnect_startup_nodes_pools:
1968+
Whether to disconnect the connection pool of the startup nodes
1969+
after the initialization is complete. This is useful when the
1970+
startup nodes are not part of the cluster and we want to avoid
1971+
keeping the connection open.
1972+
:additional_startup_nodes_info:
1973+
Additional nodes to add temporarily to the startup nodes.
1974+
The additional nodes will be used just in the process of extraction of the slots
1975+
and nodes information from the cluster.
1976+
This is useful when we want to add new nodes to the cluster
1977+
and initialize the client
1978+
with them.
1979+
The format of the list is a list of tuples, where each tuple contains
1980+
the host and port of the node.
18871981
"""
18881982
self.reset()
18891983
tmp_nodes_cache = {}
@@ -1893,9 +1987,25 @@ def initialize(self):
18931987
fully_covered = False
18941988
kwargs = self.connection_kwargs
18951989
exception = None
1990+
1991+
# Create cache if it's not provided and cache config is set
1992+
# should be done before initializing the first connection
1993+
# so that it will be applied to all connections
1994+
if self._cache is None and self._cache_config is not None:
1995+
if self._cache_factory is None:
1996+
self._cache = CacheFactory(self._cache_config).get_cache()
1997+
else:
1998+
self._cache = self._cache_factory.get_cache()
1999+
2000+
additional_startup_nodes = [
2001+
ClusterNode(host, port) for host, port in additional_startup_nodes_info
2002+
]
18962003
# Convert to tuple to prevent RuntimeError if self.startup_nodes
18972004
# is modified during iteration
1898-
for startup_node in tuple(self.startup_nodes.values()):
2005+
for startup_node in (
2006+
*self.startup_nodes.values(),
2007+
*additional_startup_nodes,
2008+
):
18992009
try:
19002010
if startup_node.redis_connection:
19012011
r = startup_node.redis_connection
@@ -1911,7 +2021,11 @@ def initialize(self):
19112021
# Make sure cluster mode is enabled on this node
19122022
try:
19132023
cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1914-
r.connection_pool.disconnect()
2024+
if disconnect_startup_nodes_pools:
2025+
# Disconnect the connection pool to avoid keeping the connection open
2026+
# For some cases we might not want to disconnect current pool and
2027+
# lose in flight commands responses
2028+
r.connection_pool.disconnect()
19152029
except ResponseError:
19162030
raise RedisClusterException(
19172031
"Cluster mode is not enabled on this node"
@@ -1992,12 +2106,6 @@ def initialize(self):
19922106
f"one reachable node: {str(exception)}"
19932107
) from exception
19942108

1995-
if self._cache is None and self._cache_config is not None:
1996-
if self._cache_factory is None:
1997-
self._cache = CacheFactory(self._cache_config).get_cache()
1998-
else:
1999-
self._cache = self._cache_factory.get_cache()
2000-
20012109
# Create Redis connections to all nodes
20022110
self.create_redis_connections(list(tmp_nodes_cache.values()))
20032111

0 commit comments

Comments
 (0)