Skip to content

Commit f9d9af6

Browse files
committed
Add slot migration client flags and module context flags
Signed-off-by: Jacob Murphy <[email protected]>
1 parent 8d562d2 commit f9d9af6

File tree

4 files changed

+76
-2
lines changed

4 files changed

+76
-2
lines changed

src/module.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4004,6 +4004,12 @@ int VM_GetSelectedDb(ValkeyModuleCtx *ctx) {
40044004
* context is using RESP3.
40054005
*
40064006
* * VALKEYMODULE_CTX_FLAGS_SERVER_STARTUP: The instance is starting
4007+
*
4008+
* * VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT: Indicate the that client attached to this
4009+
* context is the slot import client.
4010+
*
4011+
* * VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT: Indicate the that client attached to this
4012+
* context is the slot export client.
40074013
*/
40084014
int VM_GetContextFlags(ValkeyModuleCtx *ctx) {
40094015
int flags = 0;
@@ -4017,6 +4023,11 @@ int VM_GetContextFlags(ValkeyModuleCtx *ctx) {
40174023
if (ctx->client->resp == 3) {
40184024
flags |= VALKEYMODULE_CTX_FLAGS_RESP3;
40194025
}
4026+
if (ctx->client->slot_migration_job && isImportSlotMigrationJob(ctx->client->slot_migration_job)) {
4027+
flags |= VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT;
4028+
} else if (ctx->client->slot_migration_job) {
4029+
flags |= VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT;
4030+
}
40204031
}
40214032

40224033
/* For DIRTY flags, we need the blocked client if used */

src/networking.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4212,6 +4212,8 @@ sds catClientInfoString(sds s, client *client, int hide_user_data) {
42124212
if (client->flag.no_evict) *p++ = 'e';
42134213
if (client->flag.no_touch) *p++ = 'T';
42144214
if (client->flag.import_source) *p++ = 'I';
4215+
if (client->slot_migration_job && isImportSlotMigrationJob(client->slot_migration_job)) *p++ = 'i';
4216+
if (client->slot_migration_job && !isImportSlotMigrationJob(client->slot_migration_job)) *p++ = 'E';
42154217
if (p == flags) *p++ = 'N';
42164218
*p++ = '\0';
42174219

@@ -4696,6 +4698,8 @@ static int validateClientFlagFilter(sds flag_filter) {
46964698
case 'e':
46974699
case 'T':
46984700
case 'I':
4701+
case 'i':
4702+
case 'E':
46994703
case 'N':
47004704
/* Valid flag, do nothing. */
47014705
break;
@@ -4850,6 +4854,12 @@ static int clientMatchesFlagFilter(client *c, sds flag_filter) {
48504854
case 'I': /* Import source flag */
48514855
if (!c->flag.import_source) return 0;
48524856
break;
4857+
case 'i': /* Slot migration import flag */
4858+
if (!c->slot_migration_job || !isImportSlotMigrationJob(c->slot_migration_job)) return 0;
4859+
break;
4860+
case 'E': /* Slot migration export flag */
4861+
if (!c->slot_migration_job || isImportSlotMigrationJob(c->slot_migration_job)) return 0;
4862+
break;
48534863
case 'N': /* Check for no flags */
48544864
if (c->flag.replica || c->flag.primary || c->flag.pubsub ||
48554865
c->flag.multi || c->flag.blocked || c->flag.tracking ||
@@ -4858,7 +4868,7 @@ static int clientMatchesFlagFilter(client *c, sds flag_filter) {
48584868
c->flag.unblocked || c->flag.close_asap ||
48594869
c->flag.unix_socket || c->flag.readonly ||
48604870
c->flag.no_evict || c->flag.no_touch ||
4861-
c->flag.import_source) {
4871+
c->flag.import_source || c->slot_migration_job) {
48624872
return 0;
48634873
}
48644874
break;

src/valkeymodule.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,15 @@ typedef struct ValkeyModuleStreamID {
221221
#define VALKEYMODULE_CTX_FLAGS_ASYNC_LOADING (1 << 23)
222222
/* Valkey is starting. */
223223
#define VALKEYMODULE_CTX_FLAGS_SERVER_STARTUP (1 << 24)
224+
/* The current client is the slot import client */
225+
#define VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT (1 << 25)
226+
/* The current client is the slot export client */
227+
#define VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT (1 << 26)
224228

225229
/* Next context flag, must be updated when adding new flags above!
226230
This flag should not be used directly by the module.
227231
* Use ValkeyModule_GetContextFlagsAll instead. */
228-
#define _VALKEYMODULE_CTX_FLAGS_NEXT (1 << 25)
232+
#define _VALKEYMODULE_CTX_FLAGS_NEXT (1 << 27)
229233

230234
/* Keyspace changes notification classes. Every class is associated with a
231235
* character for configuration purposes.

tests/unit/cluster/cluster-migrateslots.tcl

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,55 @@ start_cluster 3 3 {tags {logreqres:skip external:skip cluster} overrides {cluste
520520
}
521521
}
522522

523+
proc verify_client_flag {idx flag expected_count} {
524+
set clients [split [string trim [R $idx client list]] "\r\n"]
525+
set found 0
526+
foreach client $clients {
527+
if {[regexp "id=.*flags=\[a-zA-Z\]*$flag\[a-zA-Z\]* .*" $client]} {
528+
incr found
529+
}
530+
}
531+
if {$found ne $expected_count} {
532+
fail "Expected $flag to appear in client list $expected_count times, got $found: $clients"
533+
}
534+
}
535+
536+
test "Slot migration seen in client flags" {
537+
assert_does_not_resync {
538+
set_debug_prevent_pause 1
539+
540+
assert_match "OK" [R 2 CLUSTER MIGRATESLOTS SLOTSRANGE 16383 16383 NODE $node0_id]
541+
set jobname [get_job_name 2 16383]
542+
543+
# Check the flags
544+
verify_client_flag 0 "E" 0
545+
verify_client_flag 2 "E" 1
546+
verify_client_flag 0 "i" 1
547+
verify_client_flag 2 "i" 0
548+
assert_match "id=*" [R 2 CLIENT LIST FLAGS E]
549+
assert_equal "" [R 2 CLIENT LIST FLAGS i]
550+
assert_equal "" [R 0 CLIENT LIST FLAGS E]
551+
assert_match "id=*" [R 0 CLIENT LIST FLAGS i]
552+
553+
set_debug_prevent_pause 0
554+
wait_for_migration 0 16383
555+
556+
# Check the flags again
557+
verify_client_flag 0 "E" 0
558+
verify_client_flag 2 "E" 0
559+
verify_client_flag 0 "i" 0
560+
verify_client_flag 2 "i" 0
561+
assert_equal "" [R 2 CLIENT LIST FLAGS E]
562+
assert_equal "" [R 2 CLIENT LIST FLAGS i]
563+
assert_equal "" [R 0 CLIENT LIST FLAGS E]
564+
assert_equal "" [R 0 CLIENT LIST FLAGS i]
565+
566+
# Cleanup for the next test
567+
assert_match "OK" [R 0 CLUSTER MIGRATESLOTS SLOTSRANGE 16383 16383 NODE $node2_id]
568+
wait_for_migration 2 16383
569+
}
570+
}
571+
523572
# Catch-all test for covering commands sent during incremental replication
524573
test "Single source import - Incremental Command Coverage" {
525574
assert_does_not_resync {

0 commit comments

Comments
 (0)