Skip to content

Commit dd2827a

Browse files
Add support for asynchronous release to replicaKeysWithExpire on writable replica (#2849)
## Problem When executing `FLUSHALL ASYNC` on a **writable replica** that has a large number of expired keys directly written to it, the main thread gets blocked for an extended period while synchronously releasing the `replicaKeysWithExpire` dictionary. ## Root Cause `FLUSHALL ASYNC` is designed for asynchronous lazy freeing of core data structures, but the release of `replicaKeysWithExpire` (a dictionary tracking expired keys on replicas) still happens synchronously in the main thread. This synchronous operation becomes a bottleneck when dealing with massive key volumes, as it cannot be offloaded to the lazyfree background thread. This PR addresses the issue by moving the release of `replicaKeysWithExpire` to the lazyfree background thread, aligning it with the asynchronous design of `FLUSHALL ASYNC` and eliminating main thread blocking. ## User scenarios In some operations, people often need to do primary-replica switches. One goal is to avoid noticeable impact on the business—like key loss or reduced availability (e.g., write failures). Here is the process: First, temporarily switch traffic to writable replicas. Then we wait for the primary pending replication data to be fully synced (so primry and replicas are in sync), before finishing the switch. We don't usually need to do the flush in this case, but it's an optimization that can be done. Signed-off-by: Scut-Corgis <[email protected]> Signed-off-by: jiegang0219 <[email protected]> Co-authored-by: Binbin <[email protected]>
1 parent 8ea7f13 commit dd2827a

File tree

4 files changed

+31
-5
lines changed

4 files changed

+31
-5
lines changed

src/db.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,7 @@ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)) {
682682
/* Empty the database structure. */
683683
removed = emptyDbStructure(server.db, dbnum, async, callback);
684684

685-
if (dbnum == -1) flushReplicaKeysWithExpireList();
685+
if (dbnum == -1) flushReplicaKeysWithExpireList(async);
686686

687687
if (with_functions) {
688688
serverAssert(dbnum == -1);
@@ -1796,7 +1796,7 @@ void swapMainDbWithTempDb(serverDb **tempDb) {
17961796
}
17971797

17981798
trackingInvalidateKeysOnFlush(1);
1799-
flushReplicaKeysWithExpireList();
1799+
flushReplicaKeysWithExpireList(1);
18001800
}
18011801

18021802
/* SWAPDB db1 db2 */

src/expire.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -644,9 +644,13 @@ size_t getReplicaKeyWithExpireCount(void) {
644644
* but it is not worth it since anyway race conditions using the same set
645645
* of key names in a writable replica and in its primary will lead to
646646
* inconsistencies. This is just a best-effort thing we do. */
647-
void flushReplicaKeysWithExpireList(void) {
647+
void flushReplicaKeysWithExpireList(int async) {
648648
if (replicaKeysWithExpire) {
649-
dictRelease(replicaKeysWithExpire);
649+
if (async) {
650+
freeReplicaKeysWithExpireAsync(replicaKeysWithExpire);
651+
} else {
652+
dictRelease(replicaKeysWithExpire);
653+
}
650654
replicaKeysWithExpire = NULL;
651655
}
652656
}

src/expire.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ enum activeExpiryType {
5151
typedef struct client client;
5252
typedef struct serverObject robj;
5353
typedef struct serverDb serverDb;
54+
typedef struct dict dict;
5455

5556
/* return the relevant expiration policy based on the current server state and the provided flags.
5657
* FLAGS can indicate either:
@@ -64,8 +65,9 @@ int convertExpireArgumentToUnixTime(client *c, robj *arg, long long basetime, in
6465
void activeExpireCycle(int type);
6566
void expireReplicaKeys(void);
6667
void rememberReplicaKeyWithExpire(serverDb *db, robj *key);
67-
void flushReplicaKeysWithExpireList(void);
68+
void flushReplicaKeysWithExpireList(int async);
6869
size_t getReplicaKeyWithExpireCount(void);
6970
bool timestampIsExpired(mstime_t when);
71+
void freeReplicaKeysWithExpireAsync(dict *replica_keys_with_expire);
7072

7173
#endif

src/lazyfree.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ void lazyFreeReplicationBacklogRefMem(void *args[]) {
8585
atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed);
8686
}
8787

88+
/* Release the replicaKeysWithExpire dict. */
89+
void lazyFreeReplicaKeysWithExpire(void *args[]) {
90+
dict *replica_keys_with_expire = args[0];
91+
size_t len = dictSize(replica_keys_with_expire);
92+
dictRelease(replica_keys_with_expire);
93+
atomic_fetch_sub_explicit(&lazyfree_objects, len, memory_order_relaxed);
94+
atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed);
95+
}
96+
8897
/* Return the number of currently pending objects to free. */
8998
size_t lazyfreeGetPendingObjectsCount(void) {
9099
size_t aux = atomic_load_explicit(&lazyfree_objects, memory_order_relaxed);
@@ -260,3 +269,14 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
260269
raxFree(index);
261270
}
262271
}
272+
273+
274+
/* Free replicaKeysWithExpire dict, if the dict is huge enough, free it in async way. */
275+
void freeReplicaKeysWithExpireAsync(dict *replica_keys_with_expire) {
276+
if (dictSize(replica_keys_with_expire) > LAZYFREE_THRESHOLD) {
277+
atomic_fetch_add_explicit(&lazyfree_objects, dictSize(replica_keys_with_expire), memory_order_relaxed);
278+
bioCreateLazyFreeJob(lazyFreeReplicaKeysWithExpire, 1, replica_keys_with_expire);
279+
} else {
280+
dictRelease(replica_keys_with_expire);
281+
}
282+
}

0 commit comments

Comments
 (0)