Skip to content

Enable multi-command parsing for replicated clients#3597

Open
enjoy-binbin wants to merge 2 commits intovalkey-io:unstablefrom
enjoy-binbin:replicaed_pipeline
Open

Enable multi-command parsing for replicated clients#3597
enjoy-binbin wants to merge 2 commits intovalkey-io:unstablefrom
enjoy-binbin:replicaed_pipeline

Conversation

@enjoy-binbin
Copy link
Copy Markdown
Member

Multi-command parsing in parseMultibulkBuffer (introduced #2092) was
disabled for replicated clients because the per-command replication
offset relied on c->qb_pos as the right boundary of the just-applied
command. Replication stream is actually a big pipeline, so if it can
be supported, the processing speed of the replica can be improved.

Decouple parsing position from application position by recording, for
every parsed command, the qb_pos snapshot taken right after that
command finished parsing:

  • parsedCommand.qb_end_pos: snapshot stored in the queue entry, set
    by parseMultibulkBuffer when a command is pushed into cmd_queue.
  • client.qb_applied: snapshot of the command currently being
    processed, set by the parsers (for c->argv) and by
    consumeCommandQueue (when popping a queued command).

commandProcessed() now uses c->qb_applied instead of c->qb_pos to
advance reploff by exactly the bytes of the just-applied command.
beforeNextClient shifts qb_end_pos of pending queue entries when the
replicated client's querybuf is trimmed, keeping subsequent reploff
updates consistent.

Both fields are populated unconditionally so that a client transitioning
to replicated mid-command (e.g. SYNCSLOTS ESTABLISH installs slot_migration_job
inside its own handler) still has a valid value when commandProcessed() runs.

Multi-command parsing in parseMultibulkBuffer (introduced valkey-io#2092) was
disabled for replicated clients because the per-command replication
offset relied on c->qb_pos as the right boundary of the just-applied
command. Replication stream is actually a big pipeline, so if it can
be supported, the processing speed of the replica can be improved.

Decouple parsing position from application position by recording, for
every parsed command, the qb_pos snapshot taken right after that
command finished parsing:
  - parsedCommand.qb_end_pos: snapshot stored in the queue entry, set
    by parseMultibulkBuffer when a command is pushed into cmd_queue.
  - client.qb_applied: snapshot of the command currently being
    processed, set by the parsers (for c->argv) and by
    consumeCommandQueue (when popping a queued command).

commandProcessed() now uses c->qb_applied instead of c->qb_pos to
advance reploff by exactly the bytes of the just-applied command.
beforeNextClient shifts qb_end_pos of pending queue entries when the
replicated client's querybuf is trimmed, keeping subsequent reploff
updates consistent.

Both fields are populated unconditionally so that a client transitioning
to replicated mid-command (e.g. SYNCSLOTS ESTABLISH installs slot_migration_job
inside its own handler) still has a valid value when commandProcessed() runs.

Signed-off-by: Binbin <binloveplay1314@qq.com>
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 30, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 76.63%. Comparing base (5b7ac66) to head (a6364de).
⚠️ Report is 8 commits behind head on unstable.

Additional details and impacted files
@@             Coverage Diff              @@
##           unstable    #3597      +/-   ##
============================================
- Coverage     76.66%   76.63%   -0.03%     
============================================
  Files           160      160              
  Lines         80472    80483      +11     
============================================
- Hits          61690    61678      -12     
- Misses        18782    18805      +23     
Files with missing lines Coverage Δ
src/networking.c 92.27% <100.00%> (+0.11%) ⬆️
src/server.h 100.00% <ø> (ø)

... and 23 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Comment thread src/server.h
Comment thread src/networking.c Outdated
Comment thread src/networking.c
c->parsed_cmd = p->cmd;
c->slot = p->slot;
/* Restore qb_pos snapshot so commandProcessed() can update reploff precisely. */
c->qb_applied = p->qb_end_pos;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we guard the read on isReplicatedClient do we need to guard the write?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't guard the write, we need to maintain the write everytime since we have a CLUSTER SYNCSLOTS ESTABLISH command, this command will turn the client into a replicated client in the mid run.

Signed-off-by: Binbin <binloveplay1314@qq.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants