PoC: Raft-based cluster bus #3543
Refactors the cluster code to separate the legacy gossip-based cluster
bus from the core cluster logic, making it possible to replace the bus
with an alternative consensus protocol.
### Before
Before this refactoring:
- **cluster.c** — clusterCommand, clusterInit, CLUSTER INFO, CLUSTER
SLOTS/SHARDS, and some high-level dispatching.
- **cluster_legacy.c** — Everything else: gossip message
building/processing, failure detection, epoch management, manual
failover, replica migration, plus all the
common infrastructure that could be protocol-agnostic: node
creation/cleanup, link lifecycle, connection accept/read/write handlers,
clusterConnectNodes, listener
setup, nodes.conf persistence, slot assignment, and clusterCron.
The common infrastructure was embedded in cluster_legacy.c alongside
gossip-specific code.
**cluster_migrateslots.c** (atomic slot migration) and
**cluster_slot_stats.c** (stats) are mostly unaffected by this
refactoring.
### After
A layered architecture with `cluster.{h,c}` as the internal API and
common handling of admin commands, a struct of callbacks (vtable) to
delegate to the cluster implementation, and various reusable components
in separate files.
```
┌──────────────────────────────────────────────────────────────────┐
│ server.c / commands │
│ (calls clusterInit, clusterCron, etc.) │
└──────────────────────────────┬───────────────────────────────────┘
│
┌──────────────────────────────▼───────────────────────────────────┐
│ cluster.c │
│ Command dispatch, vtable dispatchers, common init, │
│ CLUSTER SHARDS/SLOTS, state evaluation, RDB aux fields │
├─────────────┬───────────────┬───────────────┬────────────────────┤
│cluster_nodes│ cluster_state │ cluster_link │cluster_migrateslots│
│ .c / .h │ .c / .h │ .c / .h │ .c / .h │
│ │ │ │ │
│ nodes.conf │ clusterNode, │ Link lifecycle│ Atomic slot │
│ load/save, │ clusterState, │ accept/connect│ migration │
│ CLUSTER │ slot assign, │ read/write, │ (SYNCSLOTS) │
│ NODES text, │ node creation,│ send blocks, │ │
│ aux fields │ iterators, │ buffer mgmt │ │
│ │ dict types │ │ │
└──────┬──────┴───────────────┴───────┬───────┴────────────────────┘
│ │
│ ┌─────────────────────────▼──────────┐
│ │ cluster_bus.h │
│ │ clusterBusType vtable interface │
│ │ (validateMessageHeader, │
│ │ processMessage, postConnect, │
│ │ slotChange, forgetNode, ...) │
│ └─────────────┬──────────────────────┘
│ │
┌──────▼──────────────────▼───────────────────────────────────────┐
│ cluster_legacy.c │
│ │
│ Gossip protocol implementation (clusterLegacyBus): │
│ - Message building & processing (PING, PONG, MEET, FAIL, ..) │
│ - Failure detection (PFAIL/FAIL voting) │
│ - Epoch management (configEpoch, currentEpoch) │
│ - Replica migration, manual failover state machine │
│ - clusterLegacyState, clusterNodeLegacyData │
│ - BUMPEPOCH, SET-CONFIG-EPOCH (via protocolSubcommand) │
└─────────────────────────────────────────────────────────────────┘
```
### What moved where
cluster.c (protocol-agnostic command handling):
- CLUSTER SETSLOT, ADDSLOTS, DELSLOTS, ADDSLOTSRANGE, DELSLOTSRANGE
- CLUSTER REPLICATE, FAILOVER, FORGET, MEET, RESET
- CLUSTER BUMPEPOCH, SET-CONFIG-EPOCH, SAVECONFIG, FLUSHSLOT
- CLUSTER LINKS (dispatches to cluster_link.c)
- CLUSTER SYNCSLOTS, MIGRATESLOTS, GETSLOTMIGRATIONS,
CANCELSLOTMIGRATIONS
- Slot routing (getNodeByQuery, clusterRedirectClient)
- Key migration (MIGRATE, DUMP, RESTORE)
- clusterSetPrimary (role change logic)
cluster_state.c (shared node/slot state):
- Node registry (clusterLookupNode, clusterAddNode, clusterDelNode)
- Node accessors (clusterNodeIsPrimary, humanNodename, etc.)
- Slot state (setMigratingSlotDest, setImportingSlotSource,
clusterCloseAllSlots)
- Shard management (clusterAddNodeToShard, clusterRemoveNodeFromShard)
cluster_link.c (transport layer):
- createClusterLink, freeClusterLink
- clusterWriteHandler, clusterLinkConnectHandler
- handleLinkIOError
- CLUSTER LINKS command implementation
- DEBUG CLUSTERLINK command implementation
### Data structure separation
clusterState and clusterNode are split into common fields (accessible by
all layers) and protocol-specific fields (only accessed by the legacy
bus):
```
clusterState clusterNode
┌─────────────────────────┐ ┌─────────────────────────┐
│ Common (cluster_state.h)│ │ Common (cluster_state.h)│
│ - myself │ │ - name, shard_id │
│ - nodes dict │ │ - flags, ip, ports │
│ - slots[] │ │ - replicaof, replicas │
│ - migrating/importing │ │ - slots bitmap │
│ - slot_migration_jobs │ │ - numslots │
│ - state (ok/fail) │ │ │
│ │ │ protocol_data ──┐ │
│ protocol_data ──┐ │ └─────────────────┼───────┘
└─────────────────┼───────┘ │
│ ┌─────────────────▼───────┐
┌─────────────────▼───────┐ │ Legacy (cluster_legacy) │
│ Legacy (cluster_legacy) │ │ - configEpoch │
│ - currentEpoch │ │ - ping_sent/pong_recv │
│ - failover_auth_* │ │ - link (inbound/outbnd) │
│ - mf_end, mf_replica │ │ - fail_reports │
│ - todo_before_sleep │ │ - orphaned_time │
│ - stats_bus_messages_* │ └─────────────────────────┘
└─────────────────────────┘
```
### clusterBusType vtable
The clusterBusType struct defines the interface between the
protocol-agnostic layer and the bus implementation:
```c
struct clusterBusType {
/* Lifecycle */
void (*init)(void);
void (*cron)(void);
void (*beforeSleep)(void);
/* Slot ownership changes — synchronous in legacy, can be async
* in a consensus-based implementation */
void (*slotChange)(slotRange *ranges, int numranges,
clusterNode *target, void *ctx,
void (*callback)(void *ctx, int success));
/* Failover state management */
void (*resetManualFailoverState)(void);
void (*resetAutomaticFailoverState)(void);
/* Node management */
void (*setReplicaOf)(clusterNode *primary);
void (*failover)(client *c, int force, int takeover);
void (*forgetNode)(const char *node_id, size_t id_len);
void (*meet)(client *c);
/* ... config updates, stats, message propagation ... */
};
```
The legacy implementation (clusterLegacyBus) applies slot changes
immediately and calls the callback inline. A consensus-based
implementation can commit to a log first and call the callback
asynchronously after the commit.
Closes valkey-io#457.
---------
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
clusterDelNode was defined in cluster_legacy.c but declared in the shared cluster_state.h header, making it callable from any protocol implementation despite containing gossip-specific logic (failure report removal via rax iteration). Extract the gossip-specific cleanup into a cleanupNode callback on the bus vtable. Move the common parts (slot cleanup, shard removal, freeClusterNode) and clusterRenameNode to cluster_state.c where they belong as protocol-independent operations.

Co-authored-by: Jacob Murphy <jkmurphy@google.com>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Add a new blocking type for async operations (e.g. Raft commit). When a client is blocked with BLOCKED_ASYNC, the operation handle is owned by the caller. On disconnect or timeout, no special cleanup is needed — the handle will be consumed later and the client lookup will return NULL.

Add blockedAsyncCreate/blockedAsyncConsume API in blocked.c. An async handle (opaque blockedAsyncHandle struct) wraps a client ID so that a completion callback can safely look up the client even if it disconnected while waiting.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Implement a Raft-based cluster bus protocol selectable via the
cluster-protocol config option (gossip|raft, default gossip).
Wire format: 8-byte binary header ("RAFT" + uint32 BE totlen)
followed by a space-separated text payload. First token is the
message type.
Implemented features:
- CLUSTER MEET: creates handshake nodes, establishes links
- HELLO message: carries sender node ID and address string in
nodes.conf format (ip:port@cport[,hostname][,aux=val]*)
- Leader election via RequestVote with randomized timeouts
- Heartbeats and log replication via AppendEntries
- NODE_JOIN and SLOT_CHANGE entry types
- PROPOSE message for follower-to-leader forwarding
- Pending proposal callbacks matched on commit (FIFO)
- CLUSTER INFO reports raft role, term, commit index, leader
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Delay the HELLO reply until the NODE_JOIN entry is committed through the Raft log. The reply uses three verbs:
- HELLO: initial contact (sent by MEET initiator)
- WELCOME: reply after NODE_JOIN is committed (you're accepted)
- HI: immediate reply from a singleton to a non-singleton

This ensures that when CLUSTER MEET returns OK, the new node is already a voting member of the cluster.

Also fixes:
- Use cluster->size (voting members) instead of dictSize for cluster size in greetings
- Fill empty sender IP from connection peer address
- Always increment cluster->size on NODE_JOIN apply even if node already exists in the table
- Single-node leader commits entries immediately
- Only send HELLO for handshake nodes in postConnect

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Update ADDSLOTS, DELSLOTS, ADDSLOTSRANGE, DELSLOTSRANGE, FLUSHSLOTS, and SETSLOT NODE to block the client using blockClientAsync before calling the slotChange vtable. The completion callbacks now use consumeBlockedClientAsyncHandle and unblockClientAsync.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Move state updates out of cron into event-driven paths:
- Extract clusterRaftCheckSlotCoverage() and call it from beforeSleep only when a SLOT_CHANGE entry is applied (todo_update_slot_coverage).
- Invalidate cached CLUSTER SLOTS response only when slots or node membership changes (todo_invalidate_slots_cache).
- Move single-node leader promotion from cron to initLast.

Split the monolithic clusterRaftProcessHello into three handlers:
- clusterRaftProcessHello: initial contact, step-down logic, set hello_received for reply in postConnect.
- clusterRaftProcessHi: singleton's immediate reply, sets hello_received so the receiver proposes NODE_JOIN.
- clusterRaftProcessWelcome: NODE_JOIN committed, fires pending MEET callback.

The HI handler fix enables the 3-node MEET pattern where a follower adds a singleton: the follower receives HI and proposes NODE_JOIN to the leader on behalf of the new node.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
getClusterConnectionsCount is called from acceptCommonHandler and INFO output even when cluster is not enabled. It doesn't need protocol-specific logic, so inline it instead of going through the vtable which may be NULL before clusterInit.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
- Only step down on HELLO from unknown node (reconnect HELLO is not a MEET, don't step down)
- Set rs->leader on step-down in both HELLO and HI paths so the follower knows where to send PROPOSE
- Connect to leader after step-down (todo_connect_nodes)
- Use inbound_link fallback for PROPOSE when outbound link is not yet established
- Retry broadcast AE in next beforeSleep if some nodes had no link

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Implement CLUSTER REPLICATE for raft by proposing SET_REPLICA_OF entries through the raft log. The entry format is:

    SET_REPLICA_OF <replica-id> <primary-id-or-dash>

On apply, the replica is assigned to the primary (or promoted if dash). For remote replicas, set repl_offset=1 so CLUSTER SHARDS reports 'online' instead of 'loading' (in gossip this is updated via ping/pong).

The cluster-nodes-slots.tcl test suite now passes with --config cluster-protocol raft.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Rename AE_OK to AE_ACK since the response carries both success and reject outcomes.

Add failure detection: the leader tracks last_ack_time per peer and proposes NODE_FAIL when a node exceeds cluster-node-timeout without responding. On apply, the FAIL flag is set and slot coverage is rechecked so cluster_state transitions to 'fail'.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Document the HELLO/HI/WELCOME protocol, PROPOSE forwarding, all entry types with syntax and descriptions, all message types, failure detection, blocking async commands, CLUSTER SLOTS/SHARDS compatibility considerations, and step-down rules.

Rename wire messages: VOTE -> VOTE_REQ, VOTE_OK -> VOTE.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Propose NODE_FORGET entries through the raft log. On apply, clear the node's slots, remove it from its shard, and free it. Uses freeClusterNode directly instead of clusterDelNode which accesses legacy gossip-specific data (failure reports).

Also rename NODE_LEAVE to NODE_FORGET to match the client-facing CLUSTER FORGET command.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Propose FAILOVER entries with format '<replica-id> <primary-id>'. On apply: transfer slots from old primary to replica, promote the replica to primary, demote the old primary to replica. If myself is involved, start/stop replication accordingly.

Coordinated failover (CLUSTER FAILOVER without FORCE): the replica sends a FAILOVER_PREPARE message directly to the primary over the cluster link (not a raft log entry). The primary pauses writes. The replica monitors its replication offset in cron and proposes the FAILOVER entry once caught up, or times out.

FORCE and TAKEOVER variants skip the coordination and propose immediately.

Automatic failover (leader-initiated when a primary with slots fails) is not yet implemented.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
When the leader detects NODE_FAIL for a primary with replicas, it sends a REPL_OFFSETS message to each replica containing all sibling replicas' replication offsets. Each replica computes its rank (count of siblings with higher offset) and schedules a FAILOVER proposal with delay rank * 1s. The best replica proposes first; the apply logic makes subsequent proposals a no-op.

REPL_OFFSETS is a general-purpose message for propagating replication offsets. On receipt, node->repl_offset is updated for all mentioned nodes, which improves CLUSTER SLOTS/SHARDS health reporting. The message can be extended in the future for periodic offset broadcasts.

AE_ACK now carries the sender's replication offset so the leader has accurate data for REPL_OFFSETS.

Also fixes: don't require CLUSTER_NODE_PRIMARY flag on nodes that were never explicitly flagged (raft nodes join with flags=0).

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Update node->repl_offset from AE_ACK on the leader, so CLUSTER SLOTS and CLUSTER SHARDS show accurate health and offset for remote nodes on the leader. Keep myself->repl_offset up to date in beforeSleep for replicas.

The repl_offset=1 workaround in SET_REPLICA_OF remains for non-leader nodes until REPL_OFFSETS broadcast to all nodes is implemented.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
All messages are sent on outbound links. PROPOSE no longer falls back to the leader's inbound link. A follower always has an outbound link to the leader (established when joining the cluster), so the fallback was unnecessary. Log a warning if the link is missing.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Propose NODE_META entries when onMyselfUpdated fires (IP, port, or hostname changes). Format: '<node-id> <address-string>'. On apply, update the node's address using clusterNodeParseAddressString. All entry types are now implemented.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Add --cluster-v2 flag to runtest which sets cluster-protocol=raft globally and enables the cluster-v2:skip tag for skipping tests that are specific to the legacy gossip protocol.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
The test 'Verify the nodes configured with prefer hostname only show hostname for new nodes' uses DEBUG DROP-CLUSTER-PACKET-FILTER which is gossip-specific. Tag it with cluster-v2:skip.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
In raft mode, cluster commands that modify state (MEET, ADDSLOTS, DELSLOTS, etc.) use blockClientAsync which is not compatible with MULTI/EXEC. Reject them early in clusterCommand.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Send PUBLISH and MODULE messages directly to peer nodes (not via raft log). For global pub/sub, send to all nodes. For sharded pub/sub, send only to nodes in the same shard. Module messages are sent to a specific target node or broadcast to all.

The wire format uses a newline to separate the header from the binary payload, ensuring binary-safe handling of data that may contain spaces, newlines, or null bytes.

Add cluster_stats_messages_module_sent/received counters to CLUSTER INFO output.

Also fix: delete keys when a slot is transferred away from myself in the SLOT_CHANGE apply function.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Split clusterRaftFlushPendingProposals into two functions:
- clusterRaftFailPendingProposals: fail with error (hard failure)
- clusterRaftDeferPendingProposals: keep for retry (leader change)

On leader change, proposals are deferred instead of failed. In the next cron tick, if we became leader, append them to our own log. If we're a follower with a link to the new leader, forward them. Also defer when PROPOSE has no outbound link to the leader.

This handles transient leader transitions gracefully — the client's CLUSTER FAILOVER command succeeds after the new leader is established instead of failing with 'leader changed'.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
When a node clears an optional field (e.g. cluster-announce-client-port set to 0), the field is omitted from the address string. Without resetting before parsing, the old value persists on other nodes. Reset announce_client_tcp_port, announce_client_tls_port, hostname, and human_nodename before calling clusterNodeParseAddressString so that absent fields are properly cleared.

Also skip gossip-specific human-announced-nodename tests in cluster-v2.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Split proposal handling into defer (leader change) and timeout (client waiting). On leader change, proposals are deferred and retried with the new leader or appended locally if we become leader.

Add NODE_INFO divergence detection: every 10 seconds, compare the current address string with the last committed one. If they differ (e.g. a CONFIG SET succeeded but the proposal timed out during a partition), re-propose NODE_INFO.

Also remove unused clusterRaftFailPendingProposals.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Add flags as a third argument to NODE_INFO entries. The format is now:

    NODE_INFO <node-id> <address-string> <flags>

Where flags is 'nofailover' or 'noflags'. The address string remains compatible with nodes.conf. On apply, NOFAILOVER is set or cleared based on the flags argument. The divergence check also includes flags in the comparison.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Move clusterAutoFailoverOnShutdown from cluster_legacy.c to cluster.c since it's protocol-independent (uses replication stream, not gossip). Remove the auto_failover parameter from the handleServerShutdown vtable callback.

Fix crash when CLUSTER FAILOVER FORCE arrives via replication stream: don't pass a callback when the client is the primary connection.

This enables the auto-failover-on-shutdown test suite for raft.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
> | Forwards a proposal from a follower to the leader. The entry uses the same format as log entries (type followed by data). |
>
> | FAILOVER_PREPARE |
As we talked about yesterday - I think doing manual failover orchestration via REPLCONF makes sense. We can continue to align these functionalities to the replication subsystem and then standalone can use them too
Yes, so the replica sends a new REPLCONF FAILOVER-PAUSE or similar (corresponding to cluster message MFSTART).
It seems straightforward. Just a compatibility issue: an old-version primary doesn't expect that. The primary knows the replica's capas and version via REPLCONF CAPA and REPLCONF VERSION, but the replica doesn't know the primary's capas and version. How does the replica know if the primary supports REPLCONF FAILOVER-PAUSE? (An idea: during the handshake, the replica sends a new REPLCONF WHO-ARE-YOU? (TBD) to ask for the primary's capas and/or version. An old-version primary will reply -ERR Unrecognized REPLCONF option.)
Practically, how do we move forward with this? I'd like to avoid scope creep in this Raft PR. Either
- you/we/someone implements this mechanism separately with legacy cluster and/or standalone, or
- we merge this Raft PR to cluster-v2 first and do it in cluster-v2 afterwards.
The first option seems nicest to me. I don't know what more is required for standalone though. The primary needs to configure itself as a replica and vice versa. We may need another mechanism for that. In standalone, FAILOVER is initiated on the primary, which pauses itself and waits for the replica, then sends PSYNC FAILOVER to the replica, i.e. it's initiated by the primary and the replica doesn't need to tell the primary to pause. In cluster mode, the actual failover is a consensus thing (failover auth voting in gossip, committed entry in raft).
Okay - yeah I'm okay pulling the change out of Cluster V2. In the meantime we can continue with what it is in here, and when that goes into unstable we can reconcile it?
I would like to get the cluster-v2 branch bootstrapped with a real implementation sooner rather than later, so we can start doing all the necessary testing and incremental work
Yeah, we can also do it in the cluster-v2 but in a separate PR. First I would like to get everything to pass. I'm focusing on failing test cases. I have to debug some things that just don't work.
> | The primary pauses writes so the replica can catch up. Not a raft log entry — just a direct message over the cluster link. |
>
> | REPL_OFFSETS <node-id> <offset> [<node-id> <offset> ...] |
Perhaps we can just have a general-purpose message for all gossip-style metadata? If it gets big enough, we can get smarter with fingerprinting the metadata and only requesting the content when there is a diff. But for now we can bundle this all into a periodic update
Potential other eventually consistent metadata:
- Loaded Functions
- ACLs
- Node information/availability-zone/IP address/etc (may be fine in the Raft log since it is low volume?)
Don't we want to push functions and ACLs through the raft log to make it truly consistent? I don't see why the volume would be a problem for these.
I think we should be aware of drift already from the start. If two nodes constantly propose different ACLs and update each other and everybody else, it'd be an endless loop of updates. We should prevent that.
With that in place, I don't see why the volume would matter much. We need to think through how we store these things in a backward/forward compatible way though. Currently we store ACLs in an ACL file and functions in RDB. With raft, we'd store them in the log and cluster state (e.g. nodes.conf or something else; snapshot + append-only file).
Regarding a general-purpose entry type for any kind of metadata versus specific ones, we'll have the compatibility problem regardless:
- with a `SET_GENERAL_PURPOSE_METADATA key="acl" value="foo -@all +hset xxx..."` (intended to replace the whole ACL config), an old node can store the key/value pair but doesn't know what to do with it.
- with a dedicated `SET_ACL foo -@all +hset xxxd...`, an old node doesn't understand the entry at all, so it can't apply it, but it still gets stored in the raft log as it's currently implemented. The leader accepts proposals and adds them to the log without any validation, so even if the leader is an old node, this can work the same.
However, in both cases, it's pretty bad if some nodes don't understand all the entries. It can lead to inconsistencies. To prevent that, each node should announce its version and/or capabilities, so that other nodes can propose only things that the other nodes can understand.
The broader topic of being able to upgrade the protocol is very important.
I'm okay with putting ACL/functions through the Raft log. The general-purpose gossip concept was less for compatibility and more for efficiency, since as we accumulate gossip content, we can just use a fingerprint of the gossip content rather than the full payload. But anyways I am getting ahead of the requirements. If offsets is the only case - then I think we can keep it as a dedicated message right now.
Oh, fingerprint gossip? Like we can use GOSSIPSHA as the command name to mirror EVALSHA? :)
We should only send changes, not push the same content over and over, right? If we do that, then I don't see a need for a fingerprinted version.
Anyway, we don't need to commit to backward compatibility just yet. We can change anything later. We will need to keep cluster V2 experimental for some time probably.
> | Update node metadata (IP, port, hostname). Not yet implemented. |
>
> | NODE_FAIL <node-id> |
> | Mark a node as failed. Proposed by the leader when a peer exceeds |
I like the idea of moving fault detection to the replication subsystem since we are always sending data and ping/ponging there. We just need the Raft system to be the tie-breaker.
My vision for fault detection is:
- Replication stream (and REPLCONF heartbeats) refresh the health timer on both primary/replica
- When either side sees the other as unavailable, it goes to the Raft group and attempts to mark the other one as unhealthy. The primary uses a shorter timeout than the replica to always favor not failing over
- An unhealthy node is always unsuitable for primaryship, so we can join the NODE_FAIL message with FAILOVER. I think probably calling this UPDATE_SHARD makes sense.
- We can add a shard-level epoch as a fencing token, and now you have a clean pre-validation that we can deny dueling proposals based on first-write-wins:
`UPDATE_SHARD <shard-id> [PREV-EPOCH <epoch>] PRIMARY <node-id> HEALTHY-REPLICAS <count> <node-ids...> UNHEALTHY-REPLICAS <count> <node-ids...>`
This would also allow us to join with SET_REPLICA_OF
What do you think? PREV-EPOCH is optional, so for operations with no fencing requirements (like adding a new node ID as a replica), we can exclude it. Although we can start with specifying it each time.
Kafka does this with: PartitionRecord(topic, partition, leader, isr, leader_epoch, partition_epoch). isr is the equivalent of "healthy replicas" for us. leader_epoch bumps on failover, partition_epoch bumps on new replicas or replicas going out of sync. I don't think we need more than one epoch, but we can look into the KRaft design to understand why they needed two
> My vision for fault detection is:
> - Replication stream (and REPLCONF heartbeats) refresh the health timer on both primary/replica
Yes, this is a good idea. I'm aligned on the high level.
Here's what we already have regarding replication heartbeats:
- The replica sends REPLCONF ACK every second (once per replicationCron cycle). Hard-coded to 1 second AFAICT.
- The primary sends PING on the replication stream every 10 seconds if there's no data to replicate. Configurable with `repl-ping-replica-period`.
- The primary sends a newline `\n` to replicas waiting for the full sync (every second if I read correctly).
- If the replica didn't get any data nor PING from the primary during 60 seconds, it disconnects the replication connection. Later, it tries to reconnect and PSYNC. Configurable with `repl-timeout`.
Should we override these with numbers relative to the configured cluster-node-timeout?
> - When either side sees the other as unavailable, it goes to the Raft group and attempts to mark the other one as unhealthy. The primary uses a shorter timeout than the replica to always favor not failing over
> - An unhealthy node is always unsuitable for primaryship, so we can join the NODE_FAIL message with FAILOVER. I think probably calling this UPDATE_SHARD makes sense.
In this Raft PoC, any node can propose anything, and the leader-detected FAIL can co-exist with primary and replica detecting each other as failing.
If we move the shard failure detection and failover decision entirely to the shard, how do the replicas pick the best ranked replica? Should the replicas exchange replication offset by asking each other? In legacy, this happens after marking the primary as FAIL.
I separated FAIL-flagging from FAILOVER just to leave the decision up to each replica if it wants to initiate a FAILOVER for itself. It seemed like a conservative approach. A node may not want to do a failover for some reason, though we have the NOFAILOVER node flag in the global raft state. It's bad if the best ranked replica doesn't want to initiate failover for some unknown reason though, because it will just delay the failover. Maybe the leader should just pick the best replica (if it has their replication offsets, which it has in this PoC) and force it to take over by just adding FAILOVER to the log.
We should think through the different network failure modes. If the leader has contact with primary and replica, but they don't have contact with each other, failover shouldn't happen. If the primary marks the replica as FAIL first, as you say, this may be enough.
```
       Raft leader (Nobody failed IMO)
          /      \
         /        \
        /          \
       /            \
  Primary -- X -- Replica
 (My replica     (My primary
   failed?)        failed?)
```
You have an idea that some nodes are not members of the raft group at all, not even as learners? Every node needs to get the topology updates so it can respond accurately to CLUSTER SLOTS and similar client commands. Then, don't we need them all to be at least learners? In that case, they'll get the raft log and heartbeats, but they don't count in the quorum for committing entries and they can't vote. When they ack heartbeats, we get leader-based failure detection for free.
> - We can add a shard-level epoch as a fencing token, and now you have a clean pre-validation that we can deny dueling proposals based on first-write-wins: `UPDATE_SHARD <shard-id> [PREV-EPOCH <epoch>] PRIMARY <node-id> HEALTHY-REPLICAS <count> <node-ids...> UNHEALTHY-REPLICAS <count> <node-ids...>`
Shard-level epoch can be useful for preventing failover ping-pong when multiple FAILOVERs are waiting in the Raft log. The first failover will succeed and the next one will have a mismatching shard-epoch and be ignored. Currently in this PoC, the only mechanism used is that the old primary still needs to be primary for the failover to happen. I see now that the design doc isn't updated, but the entry syntax is `FAILOVER <replica-id> <primary-id>`.
I don't know if we can get failover ping-pong behavior with this syntax. Maybe we can. A shard-level epoch sounds like a good idea.
> This would also allow us to join with SET_REPLICA_OF
I think it's clearer if we keep them different. They have different purposes and semantics:
- `SET_REPLICA_OF` is for moving nodes between shards. It corresponds to `CLUSTER REPLICAOF <node-id>` and `CLUSTER REPLICAOF NO ONE`. It changes the node's shard-id and doesn't affect other nodes.
- `FAILOVER` affects two nodes in a shard. One replica and one primary swap roles atomically.
> What do you think? PREV-EPOCH is optional, so for operations with no fencing requirements (like adding a new node ID as a replica), we can exclude it. Although we can start with specifying it each time.
Yes, shard-epoch is a good idea. Probably for FAILOVER and SET_REPLICA_OF. Maybe also SLOT_CHANGE (slot migrations and addslots/delslots). It affects two shards, so should we include both shards' epochs and require both to match?
Kafka does this with: PartitionRecord(topic, partition, leader, isr, leader_epoch, partition_epoch). isr is the equivalent of "healthy replicas" for us. leader_epoch bumps on failover; partition_epoch bumps when replicas are added or go out of sync. I don't think we need more than one epoch, but we can look into the KRaft design to understand why they needed two.
Is one of the epochs the global Raft leader's term? I don't know, but one shard-epoch to avoid conflicts for concurrent changes sounds like a good idea.
Update: I don't know if failure detection can co-exist. If a replica marks its primary as FAIL and the leader marks it as not failing (based on AppendEntriesAck), they will compete...
Should we override these with numbers relative to the configured cluster-node-timeout?
In the final implementation - yes. Probably MIN(1 second, node-timeout/2)? But we need to build out the rest of the health check system first. So for this PR, we can leave it at 1 second.
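A sketch of the suggested derivation, assuming the `MIN(1 second, node-timeout/2)` rule floated above (the function name is hypothetical; the PoC currently hardcodes 1 second):

```c
#include <stdint.h>

/* Derive the raft heartbeat interval from cluster-node-timeout:
 * half the timeout, capped at 1 second. With very small timeouts the
 * heartbeat speeds up so followers see at least two heartbeats per
 * timeout window. */
static int64_t raftHeartbeatIntervalMs(int64_t node_timeout_ms) {
    int64_t half = node_timeout_ms / 2;
    return half < 1000 ? half : 1000;
}
```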
You have an idea that some nodes are not members of the raft group at all, not even as learners?
Oh yeah, they would still need to replicate. But replicating from the Raft group is not the same as being a member. It could even be chaining-style replication.
Anyways, for now we should keep them all in the same group. Adding a "learner/observer" role later should be possible.
When they ack heartbeats, we get leader-based failure detection for free.
This forces the leader to be fully connected to all learners, which isn't a strict requirement. We could use chaining replication, and they don't necessarily need to ACK the replication stream.
When I was doing the PoC, I had them get a stream of AppendEntries, and if they get an AppendEntries that is too far ahead, they just request a snapshot. But there is no match_index tracking.
I don't know if we can get failover ping-pong behavior with this syntax. Maybe we can. A shard-level epoch sounds like a good idea.
Yeah the main issue here is:
- serializing slot migrations with failovers AND
- stale failover proposals
For more on the second point: without epochs, you could have:
- <Node A primary, Node B replica>
- Node B -> leader: `FAILOVER node-a node-b`
- Leader: Process + commit first `FAILOVER node-a node-b`
- <Node B is partitioned, doesn't hear about the failover>
- <Node A comes back up, wants to be primary again>
- Node A -> leader: `FAILOVER node-b node-a`
- Leader: Process + commit `FAILOVER node-b node-a`
- <Node B times out since it is partitioned, retries the failover>
- Node B -> leader: `FAILOVER node-a node-b`
- <Node B, partition resolves>
- <Now Leader gets the request for the second `FAILOVER node-a node-b`>
- Leader: Process + commit second `FAILOVER node-a node-b`
- <Node B, back to primary again erroneously>
It's a bit of an edge case, but good to protect against
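A sketch of how a shard-level epoch would close this edge case (illustrative names, not the PR's code): the apply-time check combines the existing "old primary must still be primary" rule with an epoch fence, so B's delayed retry is ignored even though A is primary again when it arrives.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical minimal shard state: who is primary, and the fencing epoch. */
typedef struct {
    char primary[8];
    uint64_t epoch;
} shard;

/* FAILOVER <old-primary> <new-primary>, fenced on the epoch the proposer saw.
 * Without the epoch check, the stale retry in the scenario above would pass
 * the primary check and flip roles again. */
static int applyFailover(shard *s, const char *old_primary, const char *new_primary,
                         uint64_t prev_epoch) {
    if (s->epoch != prev_epoch) return 0;           /* stale proposal: no-op */
    if (strcmp(s->primary, old_primary) != 0) return 0;
    strcpy(s->primary, new_primary);
    s->epoch++;
    return 1;
}
```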
Update: I don't know if failure detection can co-exist. If a replica marks its primary as FAIL and the leader marks it as not failing (based on AppendEntriesAck), they will compete...
Yeah I think we should converge on one source of truth. Anyways I think we should add failure detection (as I described) incrementally. We need to start somewhere
We need to start somewhere
So we start with leader-based failure detection as in this PR and replace it later? It already seems to work so I'd like to get the remaining small missing pieces working (CLUSTER RESET is one of them). We should open issues or something for these follow-ups.
> NODE_FORGET <node-id>
> Remove a node from the cluster (CLUSTER FORGET). Not yet implemented.
>
> SLOT_CHANGE <node-id-or-dash> <range> [<range> ...]
- See later comment, but I think a per-shard fencing token would help. Then SLOT_CHANGE needs to precondition on the fencing token being unchanged. So adding a PREV-SOURCE-EPOCH parameter makes sense
- Not sure, but we may want to also add PREV-TARGET-EPOCH since a failover on the target during migration may be data loss in async replication
- We should bump the per-shard epoch on both the target and source when we do the final migration
With those changes, we will effectively serialize all failovers and migrations to/from a shard, which simplifies our reasoning about safety.
So I think:
SLOT_CHANGE [PREV-SOURCE-EPOCH <epoch> SOURCE <shard-id>] PREV-TARGET-EPOCH <epoch> TARGET <shard-id> RANGE <range count> <ranges...>
No SOURCE would indicate a slot claim, which would be rejected if the slot is claimed (or in the process of being claimed) by anyone else at that time. We can keep the dash syntax for un-claiming a slot, but it should still fence on source epoch.
WDYT?
Yeah, source epoch sounds good. I replied to the comment above before reading this one. 😄
Shall I add shard-epoch now or shall we get this merged and do it afterwards?
Let's do incremental. It is already massive. But good to align!
OK. We can move it to an issue. Agent's suggestion about this: SLOT_CHANGE should check epoch but not bump it. SET_REPLICA_OF should bump both epochs (source and target shard). Otherwise, we can lose a whole shard if it races with ASM.
Shard-epoch

- SET_REPLICA_OF: A replica moves from shard X to shard Y (or gets promoted to its own shard). Both shards change membership. So yes, it should bump both the source and target shard epochs. A stale SET_REPLICA_OF that references an old epoch of either shard should be rejected.
- SLOT_CHANGE: Slots move from shard A to shard B. This changes both shards. It should check the epochs of both source and target shards. But should it also bump them?
If SLOT_CHANGE bumps the epoch, then two concurrent SLOT_CHANGE entries for
different slots in the same shard would conflict — the second one would see a
bumped epoch and become a no-op, even though it's for a different slot. That's
too aggressive.
So SLOT_CHANGE should check but not bump. Only FAILOVER and SET_REPLICA_OF bump,
because they change the shard's internal structure (who is primary, who is
replica). SLOT_CHANGE just moves slots between shards — it doesn't change the
shard's internal topology.
This means: FAILOVER in shard A invalidates pending SLOT_CHANGE entries
involving shard A (correct — the migration should be rolled back). But two
SLOT_CHANGE entries for different slots in the same shard can both apply
(correct — they're independent).
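The check-vs-bump rules above can be sketched as follows (illustrative names; a real implementation would also move the slots and update membership):

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical per-shard fencing token. */
typedef struct { uint64_t epoch; } shard;

/* SET_REPLICA_OF: fence on both shards' epochs, bump both on success,
 * because the shard's internal structure changes. */
static int applySetReplicaOf(shard *src, shard *dst, uint64_t prev_src, uint64_t prev_dst) {
    if (src->epoch != prev_src || dst->epoch != prev_dst) return 0;
    src->epoch++;
    dst->epoch++;
    return 1;
}

/* SLOT_CHANGE: fence on both shards' epochs, but do NOT bump, so
 * independent slot moves between the same shards don't invalidate
 * each other. */
static int applySlotChange(shard *src, shard *dst, uint64_t prev_src, uint64_t prev_dst) {
    if (src->epoch != prev_src || dst->epoch != prev_dst) return 0;
    return 1;
}
```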
> their data formats:
>
> NODE_JOIN <node-id> <address>
As I mentioned offline - I think NODE_ADD may come with additional metadata changes. Not sure how we would want to handle that? Maybe we can just add the option to claim slot ranges as part of NODE_ADD. Or we can have it reconcile post-join, with no guarantees of success?
I'm leaning towards the latter, since doing ADDSLOTS -> MEET may result in collisions today anyway, in which case we pick an arbitrary winner. Doing the same in Raft and picking an arbitrary winner is okay IMO
I don't know. I included the full aux metadata here, as you can see. The <address> parameter is the identification in nodes.conf format: `ip:port@cport,[hostname][,aux=value]*`, so that metadata does get updated.
I'm leaning towards the latter, since doing ADDSLOTS -> MEET may result in collisions today anyway, in which case we pick an arbitrary winner. Doing the same in Raft and picking an arbitrary winner is okay IMO
Yeah, as long as it's consistent.
Are you thinking about how to join two non-empty clusters?
Yeah I'm thinking for joining two non-empty clusters. I guess we would just cache the old cluster content, and try to apply it CRDT-style on the newly joined cluster?
This will be one of the bigger pieces of work we have to do incrementally. Either way, it seems fine for now
Yeah, I think it's possible to make joining two clusters work. CRDT-style sounds good, maybe using joint consensus or otherwise moving one node at a time using multiple log entries. Then, we can also make MEET and ADDSLOTS non-blocking again to behave more like in legacy.
> `5461`. A dash as node-id means "no owner" (delete slots) or "no
> primary" (promote to primary).
>
> ### Why typed entries instead of a key-value store?
We can call out the other projects (Kafka, Cockroach, TiKV). The language I saw was "domain-specific operations". Etcd is the only one that does the KV style, and it makes sense since that is a more general-purpose design.
Please suggest some text. :)
> two primaries.
>
> With typed entries:
> - Both FAILOVER entries get committed in order.
I think domain-specific operations can still do fencing; we don't need to commit twice. It will be much simpler if we have the leader pre-validate and, once replicated, always apply.
Much simpler?
I don't know. Leader validation adds complexity: it needs to send a reply to the proposal, which the proposer needs to handle. In which way does it get simpler?
Doing it at apply-time (when it's committed) using shard-epoch seems simpler. An entry with a mismatching shard-epoch will be ignored as a no-op.
The tradeoff:
- No validation (current): simple leader, conflicts resolved at apply time. Works well when conflicts are rare.
- Leader validation: rejects stale proposals early. More complex leader, but cleaner log. Useful if conflicts become common.
- Apply-time guard (shard-epoch): entries enter the log but are no-ops on apply. Simple, but wastes log space.
I think in the fullness of time they will both be useful:
- Byzantine/buggy leaders can fail pre-validation - good to still have apply-time checks
- Pre-validation helps keep the log clean and saves work
- Pre-validation lets us fail fast if we know the proposal will fail anyway, which is a useful signal to the proposer

But - for starters - we only need apply-time validation. So I'm okay with it
Yeah, after chatting yesterday, I think it will be enough that the leader sends a REJECT message back to the proposer when it rejects a proposal. If it doesn't reject it, the ACK is the entry appearing in the log. We can add it later. Old nodes that don't support it will just time out some proposals (commands like manual failover will hang until timing out, 3 * node_timeout in current code IIRC).
Include the shard-id in the SET_REPLICA_OF log entry so all nodes assign the same shard-id deterministically from the log. For promotion (REPLICATE NO ONE): a new random shard-id is generated at proposal time and included in the entry. For assignment: the primary's current shard-id is included and used as a guard on apply — if the primary has moved to a different shard, the entry is a no-op. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Move automatic failover scheduling from NODE_FAIL apply to beforeSleep via a todo_schedule_failover flag. NODE_RECOVER clears the flag if the primary recovers before beforeSleep runs. This prevents stale failover proposals when a node catches up on old log entries — a NODE_FAIL followed by NODE_RECOVER in the same batch will cancel the failover instead of acting on outdated state. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
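A minimal sketch of the flag pattern this commit describes, with hypothetical names standing in for the real handlers: NODE_FAIL apply only sets a flag, NODE_RECOVER in the same batch clears it, and beforeSleep acts once after the whole batch has been applied.

```c
#include <assert.h>

/* Illustrative globals; the PR keeps equivalent state in the raft struct. */
static int todo_schedule_failover = 0;
static int failovers_proposed = 0;

/* Log-apply handlers: they only record intent, never act immediately. */
static void applyNodeFail(void)    { todo_schedule_failover = 1; }
static void applyNodeRecover(void) { todo_schedule_failover = 0; }

/* Runs once per event-loop iteration, after all applies in the batch. */
static void raftBeforeSleep(void) {
    if (todo_schedule_failover) {
        todo_schedule_failover = 0;
        failovers_proposed++; /* would propose a FAILOVER entry here */
    }
}
```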
clusterAutoFailoverOnShutdown sends CLUSTER FAILOVER FORCE via the replication stream. The receiving replica processes it with a NULL callback (no client to reply to). The legacy failover function called the callback unconditionally, causing a NULL function pointer crash. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Implement both soft and hard reset. Both clear all slots, forget all nodes, reset raft state (term, log, role), and assign a new shard-id. Hard reset also changes the node ID. Soft reset keeps the node ID, which is safe when used after CLUSTER FORGET (e.g. valkey-cli --cluster del-node). Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Test suite that simulates cluster nodes from Tcl by speaking the raft wire protocol directly on the cluster bus. Includes helpers for connecting, sending, receiving raft messages and replying to AE. Tests: HELLO/HI handshake, REPL_OFFSETS delivery via AE (as follower), and REPL_OFFSETS broadcast from leader after offset transition. Fix REPL_OFFSETS message format: use sdscatlen for node names instead of sdscatfmt with printf-style %.40s (which sdscatfmt doesn't support). Skip broadcasting REPL_OFFSETS for nodes not yet in the log (MEET flag) or with offset 0. Only send to followers that have acked at least one entry. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Move the reconnect logic (buffer limit check, handshake timeout cleanup, throttled reconnection with inbound_link bypass and budget) from cluster_legacy.c into the shared clusterConnectNodes() in cluster_link.c. Both gossip and raft now use the same reconnect path. Gossip retains its own per-node maintenance pass for PFAIL counting and MEET-for-missing-inbound (clusterNodeCronGossipMaintenance). Move NODE_CONNECTION_RETRIES_PER_TIMEOUT to cluster_link.h. This fixes the links.tcl test for raft: after a link is freed due to buffer overflow, the node reconnects promptly even when node_timeout is set very high (the inbound_link bypass skips throttling). Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
The migration state on the importing node may not be updated immediately after the slot change commits in raft. Use wait_for_condition instead of a bare assert. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
- Extract clusterRaftBroadcastNodeOffset helper to avoid duplication. - Remove redundant RAFT_DATA(node)->peer.repl_offset field; use the shared node->repl_offset exclusively. - Include leader's own offset in periodic REPL_OFFSETS broadcast. - Broadcast leader's own offset on 0->non-zero transition (e.g. after demotion and PSYNC completion). - Add 'Deleting keys in dirty slot' log message in SLOT_CHANGE apply. - Report total_cluster_links_buffer_limit_exceeded in raft cluster info. - Skip gossip-specific tests: config epoch (base.tcl), gossip packet filter (slot-ownership.tcl), valkey-cli --cluster create (cli.tcl). - Fix cluster-raft-proto.tcl to handle AE heartbeats in REPL_OFFSETS wait loop. - Fix clang-format alignment in cluster.c. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Instead of calling clusterSetPrimary/replicationUnsetPrimary during log apply (which can trigger repeated reconnects when multiple entries are applied in a batch), set a todo_update_replication flag and process it in beforeSleep. This ensures only one replication state change happens after all log entries are applied. Also fix clusterSetPrimary arguments: use closeSlots=0 and full_sync_required=0 for same-shard operations (FAILOVER sibling reassignment and primary demotion), matching gossip behavior. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Extract clusterRaftBuildAllOffsetsMsg helper to build the REPL_OFFSETS message once and send to multiple peers without rebuilding. Send REPL_OFFSETS to a peer immediately after HI handshake completes (when the leader receives HI on an outbound link). This ensures peers get accurate offset data right after connecting, without waiting for the next periodic broadcast. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Fix the backdating optimization on leader election: save the old leader's name before overwriting rs->leader, so the comparison actually matches the old leader's peer entry. Add majority-overdue check in failure detection: if more than half of all peers have stale last_ack_time (exceeding node_timeout), the problem is likely on the leader's side (it was paused or partitioned). Reset all ack times instead of proposing NODE_FAIL for everyone. This prevents spurious failovers after a leader pause/resume with split votes delaying the election. Send a PING on the replication stream immediately when the first replica comes online and master_repl_offset is 0. This ensures replicas get a non-zero offset right away instead of waiting for repl-ping-replica-period (10s). Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
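The majority-overdue heuristic can be sketched like this (illustrative names; the real code tracks `last_ack_time` per peer in the raft state): if more than half of the peers look timed out at once, the leader suspects itself and resets ack times instead of proposing NODE_FAIL for everyone.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical minimal peer record. */
typedef struct { int64_t last_ack_time; } peer;

/* Returns 1 if ack times should be reset (a leader-side pause or partition
 * is suspected), 0 if individual NODE_FAIL proposals are appropriate. */
static int majorityOverdue(peer *peers, int npeers, int64_t now, int64_t node_timeout) {
    int overdue = 0;
    for (int i = 0; i < npeers; i++)
        if (now - peers[i].last_ack_time > node_timeout) overdue++;
    return overdue * 2 > npeers; /* strictly more than half */
}
```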
Replace mstime() with monotonicMs() (getMonotonicUs()/1000) throughout cluster_raft.c. This makes failure detection, election timeouts, and heartbeat intervals immune to system clock adjustments (NTP jumps). Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Move the cluster-v2 (raft protocol) test run from a step in test-ubuntu-latest to its own job (test-cluster-raft). This allows it to run in parallel and makes failures easier to identify. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Free pending_proposals, pending_meets, raft log entries, and the raft state struct itself in handleServerShutdown. This prevents memory leaks detected by the macOS leaks tool during test cleanup. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Replica migration (automatic movement of replicas to orphaned primaries) is not yet implemented in raft. Skip the tests and document it as future work in the design doc. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Only write the raftLeader field if a leader is actually known (leader[0] != 0). Previously, an all-zeros leader produced a truncated line that failed parsing on restart. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Skip the pubsub byte stats test for raft (byte tracking not yet implemented for the raft cluster bus). Fix memory leak in CLUSTER RESET: free raft log entries and pending proposals/meets before resetting the state. Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Both tests shut down and restart nodes, which requires raft persistence (not yet implemented). Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
This PR implements a Raft-based cluster bus protocol as an alternative to the gossip protocol. All cluster metadata operations go through Raft consensus, providing strong consistency for cluster state changes.
Based on the `cluster-v2` branch (protocol separation). The admin commands CLUSTER MEET, etc. are preserved, to make it as easy as possible for users to keep their existing control plane logic when migrating. The existing test framework for cluster tests can also be reused (except where tests check gossip-specific cluster bus packet details).
Design
See design-docs/cluster-raft.md for the full design document including sequence diagrams and message formats.
This PoC uses a simple text-based protocol for the cluster bus. The focus is on checking how much of the existing testing infrastructure and test cases can be made to work.
Raft log entry types
`last_ack_time` in cron

Wire protocol messages
BLOCKED_ASYNC mechanism
Cluster mutation commands block the client asynchronously until the Raft entry is committed. This is implemented via
`blockClientAsync()` / `consumeBlockedClientAsyncHandle()` in `src/blocked.c`. Cluster mutation commands are rejected inside MULTI/EXEC for raft.

Test results (--cluster-v2)
Example:
The `auto-failover-on-shutdown.tcl` tests are flaky and need leader transfer on shutdown to be reliable. That's future work.

All other tests that don't work with Raft yet are marked with the tag `cluster-v2:skip` with a comment about why each test is skipped.

TODO

Completed

- `--cluster-v2` flag and `cluster-v2:skip` tag

Remaining

- `valkey-cli --cluster` tooling compatibility (uses MULTI internally)
- `total_cluster_links_buffer_limit_exceeded` stat for raft links