Skip to content

Commit d5bb986

Browse files
authored
Add slot migration client flags and module context flags (#2639)
New client flags in reported by CLIENT INFO and CLIENT LIST: * `i` for atomic slot migration importing client * `E` for atomic slot migration exporting client New flags in return value of `ValkeyModule_GetContextFlags`: * `VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT`: Indicate the that client attached to this context is the slot import client. * `VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT`: Indicate the that client attached to this context is the slot export client. Users could use this to monitor the underlying client info of the slot migration, and more clearly understand why they see extra clients during the migration. Modules can use these to detect keyspace notifications on import clients. I am also adding export flags for symmetry, although there should not be keyspace notifications. But they would potentially be visible in command filters or in server events triggered by that client. --------- Signed-off-by: Jacob Murphy <[email protected]>
1 parent 3073979 commit d5bb986

File tree

4 files changed

+76
-2
lines changed

4 files changed

+76
-2
lines changed

src/module.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4004,6 +4004,12 @@ int VM_GetSelectedDb(ValkeyModuleCtx *ctx) {
40044004
* context is using RESP3.
40054005
*
40064006
* * VALKEYMODULE_CTX_FLAGS_SERVER_STARTUP: The instance is starting
4007+
*
4008+
* * VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT: Indicate the that client attached to this
4009+
* context is the slot import client.
4010+
*
4011+
* * VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT: Indicate the that client attached to this
4012+
* context is the slot export client.
40074013
*/
40084014
int VM_GetContextFlags(ValkeyModuleCtx *ctx) {
40094015
int flags = 0;
@@ -4017,6 +4023,11 @@ int VM_GetContextFlags(ValkeyModuleCtx *ctx) {
40174023
if (ctx->client->resp == 3) {
40184024
flags |= VALKEYMODULE_CTX_FLAGS_RESP3;
40194025
}
4026+
if (ctx->client->slot_migration_job && isImportSlotMigrationJob(ctx->client->slot_migration_job)) {
4027+
flags |= VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT;
4028+
} else if (ctx->client->slot_migration_job) {
4029+
flags |= VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT;
4030+
}
40204031
}
40214032

40224033
/* For DIRTY flags, we need the blocked client if used */

src/networking.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4212,6 +4212,8 @@ sds catClientInfoString(sds s, client *client, int hide_user_data) {
42124212
if (client->flag.no_evict) *p++ = 'e';
42134213
if (client->flag.no_touch) *p++ = 'T';
42144214
if (client->flag.import_source) *p++ = 'I';
4215+
if (client->slot_migration_job && isImportSlotMigrationJob(client->slot_migration_job)) *p++ = 'i';
4216+
if (client->slot_migration_job && !isImportSlotMigrationJob(client->slot_migration_job)) *p++ = 'E';
42154217
if (p == flags) *p++ = 'N';
42164218
*p++ = '\0';
42174219

@@ -4696,6 +4698,8 @@ static int validateClientFlagFilter(sds flag_filter) {
46964698
case 'e':
46974699
case 'T':
46984700
case 'I':
4701+
case 'i':
4702+
case 'E':
46994703
case 'N':
47004704
/* Valid flag, do nothing. */
47014705
break;
@@ -4850,6 +4854,12 @@ static int clientMatchesFlagFilter(client *c, sds flag_filter) {
48504854
case 'I': /* Import source flag */
48514855
if (!c->flag.import_source) return 0;
48524856
break;
4857+
case 'i': /* Slot migration import flag */
4858+
if (!c->slot_migration_job || !isImportSlotMigrationJob(c->slot_migration_job)) return 0;
4859+
break;
4860+
case 'E': /* Slot migration export flag */
4861+
if (!c->slot_migration_job || isImportSlotMigrationJob(c->slot_migration_job)) return 0;
4862+
break;
48534863
case 'N': /* Check for no flags */
48544864
if (c->flag.replica || c->flag.primary || c->flag.pubsub ||
48554865
c->flag.multi || c->flag.blocked || c->flag.tracking ||
@@ -4858,7 +4868,7 @@ static int clientMatchesFlagFilter(client *c, sds flag_filter) {
48584868
c->flag.unblocked || c->flag.close_asap ||
48594869
c->flag.unix_socket || c->flag.readonly ||
48604870
c->flag.no_evict || c->flag.no_touch ||
4861-
c->flag.import_source) {
4871+
c->flag.import_source || c->slot_migration_job) {
48624872
return 0;
48634873
}
48644874
break;

src/valkeymodule.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,15 @@ typedef struct ValkeyModuleStreamID {
221221
#define VALKEYMODULE_CTX_FLAGS_ASYNC_LOADING (1 << 23)
222222
/* Valkey is starting. */
223223
#define VALKEYMODULE_CTX_FLAGS_SERVER_STARTUP (1 << 24)
224+
/* The current client is the slot import client */
225+
#define VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT (1 << 25)
226+
/* The current client is the slot export client */
227+
#define VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT (1 << 26)
224228

225229
/* Next context flag, must be updated when adding new flags above!
226230
This flag should not be used directly by the module.
227231
* Use ValkeyModule_GetContextFlagsAll instead. */
228-
#define _VALKEYMODULE_CTX_FLAGS_NEXT (1 << 25)
232+
#define _VALKEYMODULE_CTX_FLAGS_NEXT (1 << 27)
229233

230234
/* Keyspace changes notification classes. Every class is associated with a
231235
* character for configuration purposes.

tests/unit/cluster/cluster-migrateslots.tcl

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,55 @@ start_cluster 3 3 {tags {logreqres:skip external:skip cluster} overrides {cluste
521521
}
522522
}
523523

524+
proc verify_client_flag {idx flag expected_count} {
525+
set clients [split [string trim [R $idx client list]] "\r\n"]
526+
set found 0
527+
foreach client $clients {
528+
if {[regexp "flags=\[a-zA-Z\]*$flag" $client]} {
529+
incr found
530+
}
531+
}
532+
if {$found ne $expected_count} {
533+
fail "Expected $flag to appear in client list $expected_count times, got $found: $clients"
534+
}
535+
}
536+
537+
test "Slot migration seen in client flags" {
538+
assert_does_not_resync {
539+
set_debug_prevent_pause 1
540+
541+
assert_match "OK" [R 2 CLUSTER MIGRATESLOTS SLOTSRANGE 16383 16383 NODE $node0_id]
542+
set jobname [get_job_name 2 16383]
543+
544+
# Check the flags
545+
verify_client_flag 0 "E" 0
546+
verify_client_flag 2 "E" 1
547+
verify_client_flag 0 "i" 1
548+
verify_client_flag 2 "i" 0
549+
assert_match "id=*" [R 2 CLIENT LIST FLAGS E]
550+
assert_equal "" [R 2 CLIENT LIST FLAGS i]
551+
assert_equal "" [R 0 CLIENT LIST FLAGS E]
552+
assert_match "id=*" [R 0 CLIENT LIST FLAGS i]
553+
554+
set_debug_prevent_pause 0
555+
wait_for_migration 0 16383
556+
557+
# Check the flags again
558+
verify_client_flag 0 "E" 0
559+
verify_client_flag 2 "E" 0
560+
verify_client_flag 0 "i" 0
561+
verify_client_flag 2 "i" 0
562+
assert_equal "" [R 2 CLIENT LIST FLAGS E]
563+
assert_equal "" [R 2 CLIENT LIST FLAGS i]
564+
assert_equal "" [R 0 CLIENT LIST FLAGS E]
565+
assert_equal "" [R 0 CLIENT LIST FLAGS i]
566+
567+
# Cleanup for the next test
568+
assert_match "OK" [R 0 CLUSTER MIGRATESLOTS SLOTSRANGE 16383 16383 NODE $node2_id]
569+
wait_for_migration 2 16383
570+
}
571+
}
572+
524573
test "Import with hz set to 1" {
525574
assert_does_not_resync {
526575
set old_hz [lindex [R 0 CONFIG GET hz] 1]

0 commit comments

Comments
 (0)