Skip to content

Commit 040bb9a

Browse files
committed
LoadBalancer keyed on slot instead of primary node; LoadBalancer not reset when NodesManager resets
1 parent 0d0cfe6 commit 040bb9a

File tree

3 files changed

+31
-40
lines changed

3 files changed

+31
-40
lines changed

redis/cluster.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,39 +1403,39 @@ class LoadBalancer:
14031403
"""
14041404

14051405
def __init__(self, start_index: int = 0) -> None:
1406-
self.primary_to_idx = {}
1406+
self.slot_to_idx = {}
14071407
self.start_index = start_index
14081408

14091409
def get_server_index(
14101410
self,
1411-
primary: str,
1411+
slot: int,
14121412
list_size: int,
14131413
load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
14141414
) -> int:
14151415
if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
14161416
return self._get_random_replica_index(list_size)
14171417
else:
14181418
return self._get_round_robin_index(
1419-
primary,
1419+
slot,
14201420
list_size,
14211421
load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
14221422
)
14231423

14241424
def reset(self) -> None:
1425-
self.primary_to_idx.clear()
1425+
self.slot_to_idx.clear()
14261426

14271427
def _get_random_replica_index(self, list_size: int) -> int:
14281428
return random.randint(1, list_size - 1)
14291429

14301430
def _get_round_robin_index(
1431-
self, primary: str, list_size: int, replicas_only: bool
1431+
self, slot: int, list_size: int, replicas_only: bool
14321432
) -> int:
1433-
server_index = self.primary_to_idx.setdefault(primary, self.start_index)
1433+
server_index = self.slot_to_idx.setdefault(slot, self.start_index)
14341434
if replicas_only and server_index == 0:
14351435
# skip the primary node index
14361436
server_index = 1
14371437
# Update the index for the next round
1438-
self.primary_to_idx[primary] = (server_index + 1) % list_size
1438+
self.slot_to_idx[slot] = (server_index + 1) % list_size
14391439
return server_index
14401440

14411441

@@ -1575,9 +1575,8 @@ def get_node_from_slot(
15751575

15761576
if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
15771577
# get the server index using the strategy defined in load_balancing_strategy
1578-
primary_name = self.slots_cache[slot][0].name
15791578
node_idx = self.read_load_balancer.get_server_index(
1580-
primary_name, len(self.slots_cache[slot]), load_balancing_strategy
1579+
slot, len(self.slots_cache[slot]), load_balancing_strategy
15811580
)
15821581
elif (
15831582
server_type is None
@@ -1835,11 +1834,7 @@ def close(self) -> None:
18351834
node.redis_connection.close()
18361835

18371836
def reset(self):
1838-
try:
1839-
self.read_load_balancer.reset()
1840-
except TypeError:
1841-
# The read_load_balancer is None, do nothing
1842-
pass
1837+
pass
18431838

18441839
def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
18451840
"""

tests/test_asyncio/test_cluster.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2444,33 +2444,31 @@ async def test_load_balancer(self, r: RedisCluster) -> None:
24442444
slot_1: [node_1, node_2, node_3],
24452445
slot_2: [node_4, node_5],
24462446
}
2447-
primary1_name = n_manager.slots_cache[slot_1][0].name
2448-
primary2_name = n_manager.slots_cache[slot_2][0].name
24492447
list1_size = len(n_manager.slots_cache[slot_1])
24502448
list2_size = len(n_manager.slots_cache[slot_2])
24512449

24522450
# default load balancer strategy: LoadBalancerStrategy.ROUND_ROBIN
24532451
# slot 1
2454-
assert lb.get_server_index(primary1_name, list1_size) == 0
2455-
assert lb.get_server_index(primary1_name, list1_size) == 1
2456-
assert lb.get_server_index(primary1_name, list1_size) == 2
2457-
assert lb.get_server_index(primary1_name, list1_size) == 0
2452+
assert lb.get_server_index(slot_1, list1_size) == 0
2453+
assert lb.get_server_index(slot_1, list1_size) == 1
2454+
assert lb.get_server_index(slot_1, list1_size) == 2
2455+
assert lb.get_server_index(slot_1, list1_size) == 0
24582456

24592457
# slot 2
2460-
assert lb.get_server_index(primary2_name, list2_size) == 0
2461-
assert lb.get_server_index(primary2_name, list2_size) == 1
2462-
assert lb.get_server_index(primary2_name, list2_size) == 0
2458+
assert lb.get_server_index(slot_2, list2_size) == 0
2459+
assert lb.get_server_index(slot_2, list2_size) == 1
2460+
assert lb.get_server_index(slot_2, list2_size) == 0
24632461

24642462
lb.reset()
2465-
assert lb.get_server_index(primary1_name, list1_size) == 0
2466-
assert lb.get_server_index(primary2_name, list2_size) == 0
2463+
assert lb.get_server_index(slot_1, list1_size) == 0
2464+
assert lb.get_server_index(slot_2, list2_size) == 0
24672465

24682466
# reset the indexes before load balancing strategy test
24692467
lb.reset()
24702468
# load balancer strategy: LoadBalancerStrategy.ROUND_ROBIN_REPLICAS
24712469
for i in [1, 2, 1]:
24722470
srv_index = lb.get_server_index(
2473-
primary1_name,
2471+
slot_1,
24742472
list1_size,
24752473
load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
24762474
)
@@ -2481,7 +2479,7 @@ async def test_load_balancer(self, r: RedisCluster) -> None:
24812479
# load balancer strategy: LoadBalancerStrategy.RANDOM_REPLICA
24822480
for i in range(5):
24832481
srv_index = lb.get_server_index(
2484-
primary1_name,
2482+
slot_1,
24852483
list1_size,
24862484
load_balancing_strategy=LoadBalancingStrategy.RANDOM_REPLICA,
24872485
)

tests/test_cluster.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2549,32 +2549,30 @@ def test_load_balancer(self, r):
25492549
slot_1: [node_1, node_2, node_3],
25502550
slot_2: [node_4, node_5],
25512551
}
2552-
primary1_name = n_manager.slots_cache[slot_1][0].name
2553-
primary2_name = n_manager.slots_cache[slot_2][0].name
25542552
list1_size = len(n_manager.slots_cache[slot_1])
25552553
list2_size = len(n_manager.slots_cache[slot_2])
25562554

25572555
# default load balancer strategy: LoadBalancerStrategy.ROUND_ROBIN
25582556
# slot 1
2559-
assert lb.get_server_index(primary1_name, list1_size) == 0
2560-
assert lb.get_server_index(primary1_name, list1_size) == 1
2561-
assert lb.get_server_index(primary1_name, list1_size) == 2
2562-
assert lb.get_server_index(primary1_name, list1_size) == 0
2557+
assert lb.get_server_index(slot_1, list1_size) == 0
2558+
assert lb.get_server_index(slot_1, list1_size) == 1
2559+
assert lb.get_server_index(slot_1, list1_size) == 2
2560+
assert lb.get_server_index(slot_1, list1_size) == 0
25632561
# slot 2
2564-
assert lb.get_server_index(primary2_name, list2_size) == 0
2565-
assert lb.get_server_index(primary2_name, list2_size) == 1
2566-
assert lb.get_server_index(primary2_name, list2_size) == 0
2562+
assert lb.get_server_index(slot_2, list2_size) == 0
2563+
assert lb.get_server_index(slot_2, list2_size) == 1
2564+
assert lb.get_server_index(slot_2, list2_size) == 0
25672565

25682566
lb.reset()
2569-
assert lb.get_server_index(primary1_name, list1_size) == 0
2570-
assert lb.get_server_index(primary2_name, list2_size) == 0
2567+
assert lb.get_server_index(slot_1, list1_size) == 0
2568+
assert lb.get_server_index(slot_2, list2_size) == 0
25712569

25722570
# reset the indexes before load balancing strategy test
25732571
lb.reset()
25742572
# load balancer strategy: LoadBalancerStrategy.ROUND_ROBIN_REPLICAS
25752573
for i in [1, 2, 1]:
25762574
srv_index = lb.get_server_index(
2577-
primary1_name,
2575+
slot_1,
25782576
list1_size,
25792577
load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
25802578
)
@@ -2585,7 +2583,7 @@ def test_load_balancer(self, r):
25852583
# load balancer strategy: LoadBalancerStrategy.RANDOM_REPLICA
25862584
for i in range(5):
25872585
srv_index = lb.get_server_index(
2588-
primary1_name,
2586+
slot_1,
25892587
list1_size,
25902588
load_balancing_strategy=LoadBalancingStrategy.RANDOM_REPLICA,
25912589
)

0 commit comments

Comments
 (0)