Skip to content

Commit 68fe530

Browse files
authored
Added MultiDbClient support with OSS Cluster API (#3734)
* Added Database, Healthcheck, CircuitBreaker, FailureDetector * Added DatabaseSelector, exceptions, refactored existing entities * Added MultiDbConfig * Added DatabaseConfig * Added DatabaseConfig test coverage * Renamed DatabaseSelector into FailoverStrategy * Added CommandExecutor * Updated healthcheck to close circuit on success * Added thread-safeness * Added missing thread-safeness * Added missing thread-safenes for dispatcher * Refactored client to keep databases in WeightedList * Added database CRUD operations * Added on-fly configuration * Added background health checks * Added background healthcheck + half-open event * Refactored background scheduling * Added support for Active-Active pipeline * Refactored healthchecks * Added Pipeline testing * Added support for transactions * Removed code repetitions, fixed weight assignment, added loops enhancement, fixed data structure * Added missing doc blocks * Added support for Pub/Sub in MultiDBClient * Refactored configuration * Refactored failure detector * Refactored retry logic * Added scenario tests * Added pybreaker optional dependency * Added pybreaker to dev dependencies * Rename tests directory * Added scenario tests for Pipeline and Transaction * Added handling of ConnectionRefusedError, added timeouts so cluster could recover * Increased timeouts * Refactored integration tests * Added scenario tests for Pub/Sub * Updated healthcheck retry * Increased timeout to avoid unprepared state before tests * Added backoff retry and changed timeouts * Added retry for healthchecks to avoid fluctuations * Changed retry configuration for healthchecks * Fixed property name * Added check for thread results * Added MultiDbClient support with OSS Cluster API * Removed database statuses * Increased test timeouts * Increased retry timeout * Increased timeout retries * Updated base threshold for retries * Fixed flacky tests * Added missing docblocks
1 parent d6cdaeb commit 68fe530

File tree

12 files changed

+157
-183
lines changed

12 files changed

+157
-183
lines changed

redis/client.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1217,7 +1217,8 @@ def run_in_thread(
12171217
sleep_time: float = 0.0,
12181218
daemon: bool = False,
12191219
exception_handler: Optional[Callable] = None,
1220-
pubsub = None
1220+
pubsub = None,
1221+
sharded_pubsub: bool = False,
12211222
) -> "PubSubWorkerThread":
12221223
for channel, handler in self.channels.items():
12231224
if handler is None:
@@ -1233,7 +1234,7 @@ def run_in_thread(
12331234

12341235
pubsub = self if pubsub is None else pubsub
12351236
thread = PubSubWorkerThread(
1236-
pubsub, sleep_time, daemon=daemon, exception_handler=exception_handler
1237+
pubsub, sleep_time, daemon=daemon, exception_handler=exception_handler, sharded_pubsub=sharded_pubsub
12371238
)
12381239
thread.start()
12391240
return thread
@@ -1248,12 +1249,14 @@ def __init__(
12481249
exception_handler: Union[
12491250
Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
12501251
] = None,
1252+
sharded_pubsub: bool = False,
12511253
):
12521254
super().__init__()
12531255
self.daemon = daemon
12541256
self.pubsub = pubsub
12551257
self.sleep_time = sleep_time
12561258
self.exception_handler = exception_handler
1259+
self.sharded_pubsub = sharded_pubsub
12571260
self._running = threading.Event()
12581261

12591262
def run(self) -> None:
@@ -1264,7 +1267,10 @@ def run(self) -> None:
12641267
sleep_time = self.sleep_time
12651268
while self._running.is_set():
12661269
try:
1267-
pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
1270+
if not self.sharded_pubsub:
1271+
pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
1272+
else:
1273+
pubsub.get_sharded_message(ignore_subscribe_messages=True, timeout=sleep_time)
12681274
except BaseException as e:
12691275
if self.exception_handler is None:
12701276
raise

redis/cluster.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3154,7 +3154,8 @@ def _reinitialize_on_error(self, error):
31543154
self._nodes_manager.initialize()
31553155
self.reinitialize_counter = 0
31563156
else:
3157-
self._nodes_manager.update_moved_exception(error)
3157+
if type(error) == MovedError:
3158+
self._nodes_manager.update_moved_exception(error)
31583159

31593160
self._executing = False
31603161

redis/multidb/client.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from redis.multidb.command_executor import DefaultCommandExecutor
1010
from redis.multidb.config import MultiDbConfig, DEFAULT_GRACE_PERIOD
1111
from redis.multidb.circuit import State as CBState, CircuitBreaker
12-
from redis.multidb.database import State as DBState, Database, AbstractDatabase, Databases
12+
from redis.multidb.database import Database, AbstractDatabase, Databases
1313
from redis.multidb.exception import NoValidDatabaseException
1414
from redis.multidb.failure_detector import FailureDetector
1515
from redis.multidb.healthcheck import HealthCheck
@@ -78,13 +78,8 @@ def raise_exception_on_failed_hc(error):
7878

7979
# Set states according to a weights and circuit state
8080
if database.circuit.state == CBState.CLOSED and not is_active_db_found:
81-
database.state = DBState.ACTIVE
8281
self.command_executor.active_database = database
8382
is_active_db_found = True
84-
elif database.circuit.state == CBState.CLOSED and is_active_db_found:
85-
database.state = DBState.PASSIVE
86-
else:
87-
database.state = DBState.DISCONNECTED
8883

8984
if not is_active_db_found:
9085
raise NoValidDatabaseException('Initial connection failed - no active database found')
@@ -115,8 +110,6 @@ def set_active_database(self, database: AbstractDatabase) -> None:
115110

116111
if database.circuit.state == CBState.CLOSED:
117112
highest_weighted_db, _ = self._databases.get_top_n(1)[0]
118-
highest_weighted_db.state = DBState.PASSIVE
119-
database.state = DBState.ACTIVE
120113
self.command_executor.active_database = database
121114
return
122115

@@ -138,9 +131,7 @@ def add_database(self, database: AbstractDatabase):
138131

139132
def _change_active_database(self, new_database: AbstractDatabase, highest_weight_database: AbstractDatabase):
140133
if new_database.weight > highest_weight_database.weight and new_database.circuit.state == CBState.CLOSED:
141-
new_database.state = DBState.ACTIVE
142134
self.command_executor.active_database = new_database
143-
highest_weight_database.state = DBState.PASSIVE
144135

145136
def remove_database(self, database: Database):
146137
"""
@@ -150,7 +141,6 @@ def remove_database(self, database: Database):
150141
highest_weighted_db, highest_weight = self._databases.get_top_n(1)[0]
151142

152143
if highest_weight <= weight and highest_weighted_db.circuit.state == CBState.CLOSED:
153-
highest_weighted_db.state = DBState.ACTIVE
154144
self.command_executor.active_database = highest_weighted_db
155145

156146
def update_database_weight(self, database: AbstractDatabase, weight: float):
@@ -240,7 +230,7 @@ def _check_db_health(self, database: AbstractDatabase, on_error: Callable[[Excep
240230
database.circuit.state = CBState.OPEN
241231
elif is_healthy and database.circuit.state != CBState.CLOSED:
242232
database.circuit.state = CBState.CLOSED
243-
except (ConnectionError, TimeoutError, socket.timeout, ConnectionRefusedError) as e:
233+
except (ConnectionError, TimeoutError, socket.timeout, ConnectionRefusedError, ValueError) as e:
244234
if database.circuit.state != CBState.OPEN:
245235
database.circuit.state = CBState.OPEN
246236
is_healthy = False
@@ -334,7 +324,7 @@ def execute(self) -> List[Any]:
334324

335325
class PubSub:
336326
"""
337-
PubSub object for multi-database client.
327+
PubSub object for multi database client.
338328
"""
339329
def __init__(self, client: MultiDBClient, **kwargs):
340330
"""Initialize the PubSub object for a multi-database client.
@@ -438,18 +428,33 @@ def get_message(
438428
ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
439429
)
440430

441-
get_sharded_message = get_message
431+
def get_sharded_message(
432+
self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
433+
):
434+
"""
435+
Get the next message if one is available in a sharded channel, otherwise None.
436+
437+
If timeout is specified, the system will wait for `timeout` seconds
438+
before returning. Timeout should be specified as a floating point
439+
number, or None, to wait indefinitely.
440+
"""
441+
return self._client.command_executor.execute_pubsub_method(
442+
'get_sharded_message',
443+
ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
444+
)
442445

443446
def run_in_thread(
444447
self,
445448
sleep_time: float = 0.0,
446449
daemon: bool = False,
447450
exception_handler: Optional[Callable] = None,
451+
sharded_pubsub: bool = False,
448452
) -> "PubSubWorkerThread":
449453
return self._client.command_executor.execute_pubsub_run_in_thread(
450454
sleep_time=sleep_time,
451455
daemon=daemon,
452456
exception_handler=exception_handler,
453-
pubsub=self
457+
pubsub=self,
458+
sharded_pubsub=sharded_pubsub,
454459
)
455460

redis/multidb/command_executor.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,15 @@ def execute_pubsub_run_in_thread(
235235
sleep_time: float = 0.0,
236236
daemon: bool = False,
237237
exception_handler: Optional[Callable] = None,
238+
sharded_pubsub: bool = False,
238239
) -> "PubSubWorkerThread":
239240
def callback():
240241
return self._active_pubsub.run_in_thread(
241-
sleep_time, daemon=daemon, exception_handler=exception_handler, pubsub=pubsub
242+
sleep_time,
243+
daemon=daemon,
244+
exception_handler=exception_handler,
245+
pubsub=pubsub,
246+
sharded_pubsub=sharded_pubsub
242247
)
243248

244249
return self._execute_with_failure_detection(callback)

redis/multidb/database.py

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,6 @@
88
from redis.multidb.circuit import CircuitBreaker
99
from redis.typing import Number
1010

11-
12-
class State(Enum):
13-
ACTIVE = 0
14-
PASSIVE = 1
15-
DISCONNECTED = 2
16-
1711
class AbstractDatabase(ABC):
1812
@property
1913
@abstractmethod
@@ -39,18 +33,6 @@ def weight(self, weight: float):
3933
"""Set the weight of this database in compare to others."""
4034
pass
4135

42-
@property
43-
@abstractmethod
44-
def state(self) -> State:
45-
"""The state of the current database."""
46-
pass
47-
48-
@state.setter
49-
@abstractmethod
50-
def state(self, state: State):
51-
"""Set the state of the current database."""
52-
pass
53-
5436
@property
5537
@abstractmethod
5638
def circuit(self) -> CircuitBreaker:
@@ -70,8 +52,7 @@ def __init__(
7052
self,
7153
client: Union[redis.Redis, RedisCluster],
7254
circuit: CircuitBreaker,
73-
weight: float,
74-
state: State = State.DISCONNECTED,
55+
weight: float
7556
):
7657
"""
7758
Initialize a new Database instance.
@@ -86,7 +67,6 @@ def __init__(
8667
self._cb = circuit
8768
self._cb.database = self
8869
self._weight = weight
89-
self._state = state
9070

9171
@property
9272
def client(self) -> Union[redis.Redis, RedisCluster]:
@@ -104,14 +84,6 @@ def weight(self) -> float:
10484
def weight(self, weight: float):
10585
self._weight = weight
10686

107-
@property
108-
def state(self) -> State:
109-
return self._state
110-
111-
@state.setter
112-
def state(self, state: State):
113-
self._state = state
114-
11587
@property
11688
def circuit(self) -> CircuitBreaker:
11789
return self._cb

redis/multidb/event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def kwargs(self):
3939

4040
class ResubscribeOnActiveDatabaseChanged(EventListenerInterface):
4141
"""
42-
Re-subscribe the currently active pub / sub to a new active database.
42+
Re-subscribe currently active pub/sub to a new active database.
4343
"""
4444
def listen(self, event: ActiveDatabaseChanged):
4545
old_pubsub = event.command_executor.active_pubsub

redis/multidb/healthcheck.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
from abc import abstractmethod, ABC
2+
3+
import redis
4+
from redis import Redis
25
from redis.retry import Retry
36

47

@@ -51,8 +54,20 @@ def check_health(self, database) -> bool:
5154

5255
def _returns_echoed_message(self, database) -> bool:
5356
expected_message = ["healthcheck", b"healthcheck"]
54-
actual_message = database.client.execute_command('ECHO', "healthcheck")
55-
return actual_message in expected_message
57+
58+
if isinstance(database.client, Redis):
59+
actual_message = database.client.execute_command("ECHO" ,"healthcheck")
60+
return actual_message in expected_message
61+
else:
62+
# For a cluster checks if all nodes are healthy.
63+
all_nodes = database.client.get_nodes()
64+
for node in all_nodes:
65+
actual_message = node.redis_connection.execute_command("ECHO" ,"healthcheck")
66+
67+
if actual_message not in expected_message:
68+
return False
69+
70+
return True
5671

5772
def _dummy_fail(self):
5873
pass

tests/test_multidb/conftest.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from redis.multidb.circuit import CircuitBreaker, State as CBState
88
from redis.multidb.config import MultiDbConfig, DatabaseConfig, DEFAULT_HEALTH_CHECK_INTERVAL, \
99
DEFAULT_AUTO_FALLBACK_INTERVAL
10-
from redis.multidb.database import Database, State, Databases
10+
from redis.multidb.database import Database, Databases
1111
from redis.multidb.failover import FailoverStrategy
1212
from redis.multidb.failure_detector import FailureDetector
1313
from redis.multidb.healthcheck import HealthCheck
@@ -38,7 +38,6 @@ def mock_hc() -> HealthCheck:
3838
def mock_db(request) -> Database:
3939
db = Mock(spec=Database)
4040
db.weight = request.param.get("weight", 1.0)
41-
db.state = request.param.get("state", State.ACTIVE)
4241
db.client = Mock(spec=Redis)
4342

4443
cb = request.param.get("circuit", {})
@@ -53,7 +52,6 @@ def mock_db(request) -> Database:
5352
def mock_db1(request) -> Database:
5453
db = Mock(spec=Database)
5554
db.weight = request.param.get("weight", 1.0)
56-
db.state = request.param.get("state", State.ACTIVE)
5755
db.client = Mock(spec=Redis)
5856

5957
cb = request.param.get("circuit", {})
@@ -68,7 +66,6 @@ def mock_db1(request) -> Database:
6866
def mock_db2(request) -> Database:
6967
db = Mock(spec=Database)
7068
db.weight = request.param.get("weight", 1.0)
71-
db.state = request.param.get("state", State.ACTIVE)
7269
db.client = Mock(spec=Redis)
7370

7471
cb = request.param.get("circuit", {})

0 commit comments

Comments
 (0)