Conversation
Add EzRaft as a simplified wrapper around openraft that dramatically reduces the complexity of building distributed consensus applications. Users only need to implement two traits with three methods total, compared to 21+ methods in raw openraft.
Changes:
- Add `EzEntry` type with `(u64, u64)` log ID that implements `RaftEntry`
- Add `EzStorage` trait for persistence (2 methods: `load_state`, `save_state`)
- Add `EzStateMachine` trait for business logic (1 method: `apply`)
- Add `EzRaft` with built-in HTTP networking and configuration
- Add complete KV store example demonstrating the framework
- Organize code into types, traits, config, storage, network, and server modules
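As a rough sketch of the surface described above (trait and type names come from this description, but the exact signatures are assumptions, not the crate's actual definitions):

```rust
// Hypothetical sketch of the initial user-facing traits; real signatures may differ.
use async_trait::async_trait;
use ezraft::{EzFullState, EzStateUpdate, EzTypes}; // assumed re-exports at this point

#[async_trait]
pub trait EzStorage<T: EzTypes>: Send + Sync + 'static {
    /// Load previously persisted state on startup (None on the first run).
    async fn load_state(&mut self) -> std::io::Result<Option<EzFullState<T>>>;
    /// Persist a state update (metadata, log entry, or snapshot).
    async fn save_state(&mut self, update: EzStateUpdate<T>) -> std::io::Result<()>;
}

#[async_trait]
pub trait EzStateMachine<T: EzTypes>: Send + Sync + 'static {
    /// Apply a committed request to the application state machine.
    async fn apply(&mut self, request: T::Request) -> T::Response;
}
```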
Add `load_log_range(start, end)` method to avoid loading all logs into memory when only a subset is needed. This improves efficiency during replication when reading specific log entries.
Changes:
- Add `load_log_range()` to `EzStorage` trait for range-based log reading
- Update `try_get_log_entries()` to use `load_log_range()` instead of `load_state()`
- Use `LogIndexOptionExt::next_index()` for cleaner range bounds calculation
- Implement `load_log_range()` in kvstore example
Replace `EzFullState` struct with a simple tuple `(EzMeta<T>, Option<EzSnapshot<T>>)`.
The struct was just a thin wrapper after removing the `logs` field, so a tuple
is cleaner and more direct.
Changes:
- Remove `EzFullState` struct
- Add `EzSnapshot<T>` type alias for `(EzSnapshotMeta<T>, Vec<u8>)`
- Change `load_state()` to return `Option<(EzMeta<T>, Option<EzSnapshot<T>>)>`
- Update all call sites and documentation
Upgrade tip:
Update `load_state()` implementations:
- Before: `Ok(Some(EzFullState { meta, snapshot }))`
- After: `Ok(Some((meta, snapshot)))`
Update imports:
- Remove: `use ezraft::EzFullState;`
- Add: `use ezraft::EzSnapshot;` (if needed)
Clean up the kvstore example by using derive_more for Display, renaming types for clarity, and serializing EzEntry directly. Add helper method to StorageAdapter to reduce boilerplate in metadata persistence.
Changes:
- Use `derive_more::Display` for `Request` enum
- Rename `KvStoreTypes` to `KvTypes`, `KvStore` to `KvStateMachine`
- Serialize `EzEntry` directly instead of intermediate `StoredEntry`
- Add `save_meta()` helper to `StorageAdapter` for metadata persistence
- Add TODO for `applied_state()` returning incorrect value
Simplify log ID conversions from `LogId` to `EzLogId` using the new `RaftLogId::to_type()` method instead of manual tuple construction.
Changes:
- Replace `(**id.committed_leader_id(), id.index)` with `id.to_type()`
- Simplify conversions in `truncate_after()`, `purge()`, `install_snapshot()`
- Simplify conversions in `EzEntry::new()` and `set_log_id()`
Improve error handling in `load_state()` to properly distinguish between first-run (NotFound) and actual errors. Also simplify `log_id_parts()`.
Changes:
- Distinguish `NotFound` from other errors in meta/snapshot loading
- Propagate non-NotFound errors instead of silently ignoring them
- Simplify `log_id_parts()` to use `RaftLogId::log_id_parts()` directly
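For illustration, the NotFound-versus-error distinction typically looks like this in a file-backed implementation (a generic sketch; the helper name and the use of `tokio::fs` are assumptions, not the crate's code):

```rust
use std::io;

// Hypothetical helper: return None on first run, propagate real I/O errors.
async fn load_meta_bytes(path: &std::path::Path) -> io::Result<Option<Vec<u8>>> {
    match tokio::fs::read(path).await {
        Ok(bytes) => Ok(Some(bytes)),                               // existing state
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),  // first run: no state yet
        Err(e) => Err(e),                                           // real error: propagate
    }
}
```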
Add StateMachineState wrapper to properly track Raft metadata alongside the user's state machine. This fixes applied_state to return correct values and ensures membership changes are tracked.
Changes:
- Add `StateMachineState` wrapper for tracking `last_applied` and membership
- Change `load_state` return type to always return `EzMeta`, default if first run
- Make `EzSnapshotMeta` a type alias to `SnapshotMeta`
- Add `build_snapshot` and `install_snapshot` to `EzStateMachine` trait
- Send response for all entry types: Normal, Membership, Blank
- Add `Default` bound to `EzTypes::Response`
Simplify snapshot handling by making EzSnapshot a direct alias to OpenRaft's Snapshot type. This provides full compatibility and reduces type conversions.
Changes:
- Make `EzSnapshot` a type alias to `Snapshot<OpenRaftTypes<T>>`
- Update `EzStateUpdate::WriteSnapshot` to take `EzSnapshot` directly
- Simplify `get_current_snapshot` to return `EzSnapshot` directly
- Update kvstore example to work with new `Snapshot` type
Describe EzRaft's dual purpose: lowering the barrier for beginners who want quick prototypes, and serving as an API design laboratory for exploring intuitive interface patterns that may inform future OpenRaft improvements.
Add Status section to clarify this is an experimental API design laboratory. Reorder goals to prioritize API exploration over prototyping. Note that APIs may change and production use is not the primary audience.
Describe the planned stable phase: well-considered abstractions that expose necessary details while hiding complexity.
Update trait signatures and examples to reflect recent changes:
- `load_state` returns `(EzMeta, Option<EzSnapshot>)`
- `EzStorage` has 3 methods: `load_state`, `save_state`, `load_log_range`
- `EzStateMachine` has 3 methods: `apply`, `build_snapshot`, `install_snapshot`
- `Response` requires `Default` derive
- Update method counts in overview and comparison table
Remove cluster_name, election_timeout_min_ms, election_timeout_max_ms. Use Duration type for heartbeat_interval instead of milliseconds. Calculate election timeout automatically as 3-6x heartbeat interval.
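A self-contained sketch of the resulting configuration shape and the derived election timeout (struct and field names follow this description; the real `EzConfig` may differ):

```rust
use std::time::Duration;

// Illustrative only: names follow the commit description above.
pub struct EzConfig {
    /// Leader heartbeat interval; the election timeout is derived from it.
    pub heartbeat_interval: Duration,
}

// Election timeout is calculated automatically as 3x-6x the heartbeat interval.
fn election_timeout_range(cfg: &EzConfig) -> (Duration, Duration) {
    (cfg.heartbeat_interval * 3, cfg.heartbeat_interval * 6)
}

fn example() {
    let cfg = EzConfig { heartbeat_interval: Duration::from_millis(500) };
    let (min, max) = election_timeout_range(&cfg);
    assert_eq!(min, Duration::from_millis(1500));
    assert_eq!(max, Duration::from_millis(3000));
}
```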
Rename for clarity and consistency:
- `MyAppTypes` → `AppTypes`
- `FileStorage` → `AppStorage`
- `MyStore` → `AppStateMachine`
- `store` → `state_machine`
Improve method naming to better reflect their purpose:
- `restore` clearly indicates recovering state on startup
- `persist` is a single verb covering all write operations
- `read_logs` describes the action, not the mechanism
Changes:
- Rename `load_state()` to `restore()`
- Rename `save_state()` to `persist()`
- Rename `load_log_range()` to `read_logs()`
- Update doc examples with consistent variable names
Clean up unused type aliases that were never referenced in the codebase. Only `EzVote` is kept as it's used in the `EzMeta` struct.
Changes:
- Remove `EzOpenRaft`, `EzLogIdOf`, `EzEntryOf`, `EzMembershipOf`, `EzSnapshotDataOf`
- Remove unused `LogId` and `Membership` imports
Simplify kvstore example and fix all clippy warnings in the crate.
Changes:
- Use `Default` derive for `KvStateMachine` instead of manual `new()`
- Remove unnecessary `Serialize`/`Deserialize` derives from `KvTypes`
- Remove unused type parameters `S`, `M` from server handler functions
- Use `io::Error::other()` instead of `io::Error::new(ErrorKind::Other, ...)`
- Fix non-canonical `PartialOrd` implementation
Simplify the persistence operation enum name to match the method name. Also simplify the variant names: `WriteMeta` → `Meta`, `WriteLog` → `Log`, `WriteSnapshot` → `Snapshot`.
Changes:
- Rename `EzStateUpdate` to `Persist`
- Rename variants to `Meta`, `Log`, `Snapshot`
- Update all usages in code, docs, and examples
More descriptive variant name that matches the type it contains.
Changes:
- Rename `Persist::Log` to `Persist::LogEntry`
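Putting the last two commits together, the persistence enum presumably has roughly this shape (an illustrative sketch; the payload types are inferred from the surrounding commits, not copied from the crate):

```rust
// Illustrative sketch only; the actual definition lives in ezraft/src/types.rs.
pub enum Persist<T: EzTypes> {
    /// Persist Raft metadata (vote, last log id, membership, ...).
    Meta(EzMeta<T>),
    /// Persist a single appended log entry.
    LogEntry(EzEntry<T>),
    /// Persist a snapshot of the state machine.
    Snapshot(EzSnapshot<T>),
}
```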
Put `T: EzTypes` first for consistency with other structs.
Changes:
- Change `StorageState<S, T>` to `StorageState<T, S>`
Fix warnings after simplifying EzRaft structure.
Changes:
- Remove unused imports (`StateMachineState`, `StorageState`, `Mutex`)
- Add doc comment for `storage` field
- Add `storage()` accessor method
Consistent with the rest of the codebase.
Changes:
- Change `EzNetworkFactory<C>` to `EzNetworkFactory<T: EzTypes>`
- Change `Network<C>` to `Network<T: EzTypes>`
- Use `C<T>` type alias for `OpenRaftTypes<T>`
Remove unnecessary type parameters from `EzNetworkFactory` and `Network` structs. These structs only store runtime data (addr, client, target) that does not depend on the Raft type configuration. Type parameters are now only used at the impl level where needed for trait implementations.
Changes:
- Remove `T: EzTypes` type parameter from `EzNetworkFactory` and `Network`
- Remove `PhantomData<T>` fields from both structs
- Add `Cfg` type parameter to `Network::request()` method
- Use `#[derive(Default)]` for `EzNetworkFactory`
New nodes can automatically join an existing cluster by specifying a seed
node address. The leader generates a unique node ID from the log index,
eliminating manual node ID assignment.
Changes:
- Add `node_id` field to `EzMeta` for persistence
- Add `/api/join` endpoint for cluster join requests
- Change `EzRaft::new()` to accept `seed_addr: Option<String>` instead of `node_id`
- Auto-initialize first node (node_id=0) when no seed provided
- Add `save_meta()` and `node_id()` methods to `StorageAdapter`
- Update kvstore example to use `--seed` instead of `--node-id`
Upgrade tip:
Update `EzRaft::new()` calls from:
`EzRaft::new(node_id, addr, sm, storage, config)`
to:
`EzRaft::new(addr, sm, storage, config, seed_addr)`
For the first node, use `None` as seed_addr.
For joining nodes, use `Some("leader_addr")` as seed_addr.
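An illustrative sketch of the new call shape (the wrapper function, the error type, and the `AppStateMachine`/`AppStorage` names are assumptions; see the kvstore example for the real code):

```rust
// Hypothetical helper; the argument order follows the upgrade tip above.
async fn start_node(
    addr: String,
    state_machine: AppStateMachine,
    storage: AppStorage,
    seed_addr: Option<String>, // None => first node (auto-init as node 0); Some(addr) => join via seed
) -> Result<(), Box<dyn std::error::Error>> {
    let raft = EzRaft::new(addr, state_machine, storage, EzConfig::default(), seed_addr).await?;
    raft.serve().await?;
    Ok(())
}
```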
Refactor the HTTP server to hold `EzRaft` instead of the raw OpenRaft instance, enabling handlers to use higher-level EzRaft methods directly. Remove redundant `/api/init` and `/api/add_learner` endpoints since cluster initialization is now automatic and learners are added via the `/api/join` flow.
Changes:
- Make `EzRaft` cloneable with manual `Clone` impl (no bounds on S, M)
- Server holds `EzRaft<T, S, M>` instead of `Arc<openraft::Raft<C<T>>>`
- Remove `/api/init` endpoint (auto-init when `node_id == 0`)
- Remove `/api/add_learner` endpoint (handled by `/api/join`)
- Fix auto-init condition: `node_id == 0` instead of `seed_addr.is_none()`
- Use `BTreeMap::from_iter()` and `.ok()` for cleaner code
Use OpenRaft `ChangeMembers` type directly as the API payload instead of a custom wrapper struct. This follows the principle of using transparent types to avoid unnecessary complexity.
Changes:
- Change `change_membership()` to accept `ChangeMembers` directly
- Remove `ChangeMembershipRequest` wrapper struct
Consolidate HTTP handlers into an EzServer struct to reduce type parameter repetition. Each handler is now a method on EzServer, with type bounds defined once on the impl block.
Changes:
- Add `EzServer<T, S, M>` struct to wrap `EzRaft`
- Convert standalone handler functions to `EzServer` methods
- Rename handler parameter from `data` to `ez` for clarity
- Remove unused `PhantomData` from `StorageAdapter`
Simplify the type hierarchy by using dynamic dispatch for the state machine trait. This reduces type parameters from 3 to 2 across the entire API, making EzRaft easier to use while incurring only one heap allocation.
Changes:
- Change `StateMachineState<T, M>` to `StateMachineState<T>` with boxed trait object
- Change `StorageAdapter<T, S, M>` to `StorageAdapter<T, S>`
- Change `EzRaft<T, S, M>` to `EzRaft<T, S>`
- Change `EzServer<T, S, M>` to `EzServer<T, S>`
- Update `new()` methods to accept `impl EzStateMachine<T> + 'static`
Upgrade tip:
Update type annotations from 3 parameters to 2:
- `EzRaft::<Types, _, _>::new(...)` → `EzRaft::<Types, _>::new(...)`
- `EzServer<T, S, M>` → `EzServer<T, S>`
- `StorageAdapter<T, S, M>` → `StorageAdapter<T, S>`
The `EzTypes` marker type now requires `Serialize + Deserialize` derives.
Further simplify the type hierarchy by using dynamic dispatch for the storage trait. Combined with the previous commit, this reduces EzRaft from 3 type parameters down to just 1, making the API maximally simple.
Changes:
- Change `StorageState<T, S>` to `StorageState<T>` with boxed trait object
- Change `StorageAdapter<T, S>` to `StorageAdapter<T>`
- Change `EzRaft<T, S>` to `EzRaft<T>`
- Change `EzServer<T, S>` to `EzServer<T>`
- Update `new()` methods to accept `impl EzStorage<T> + 'static`
Upgrade tip:
Update type annotations from 2 parameters to 1:
- `EzRaft::<Types, _>::new(...)` → `EzRaft::<Types>::new(...)`
- `EzServer<T, S>` → `EzServer<T>`
- `StorageAdapter<T, S>` → `StorageAdapter<T>`
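The underlying pattern in these two commits is type erasure with a boxed trait object: a generic parameter on the struct becomes a `Box<dyn Trait>` field. A self-contained, generic sketch (trait and struct names here are illustrative, not the crate's):

```rust
// Illustrative pattern only, not ezraft's actual definitions.
trait Storage: Send {
    fn persist(&mut self, bytes: &[u8]);
}

// Before: the storage type leaks into every signature that mentions the wrapper.
#[allow(dead_code)]
struct WrapperGeneric<S: Storage> {
    storage: S,
}

// After: dynamic dispatch erases the parameter; one heap allocation,
// one fewer type parameter for callers to spell out.
struct WrapperDyn {
    storage: Box<dyn Storage>,
}

impl WrapperDyn {
    fn new(storage: impl Storage + 'static) -> Self {
        Self { storage: Box::new(storage) }
    }
}
```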
Rename methods and types for clarity and consistency with standard terminology.
Changes:
- Rename `EzStorage::restore()` to `load()` (pairs with `persist()`)
- Rename `StorageState<T>` to `StorageWithCache<T>`
- Rename field `storage_state` to `storage` in `StorageAdapter`
📝 Walkthrough

The pull request introduces a new `ezraft` crate.
Sequence Diagram

```mermaid
sequenceDiagram
participant Client
participant EzRaft
participant StorageAdapter
participant UserStateMachine
participant UserStorage
Client->>EzRaft: write(request)
EzRaft->>EzRaft: propose to OpenRaft
EzRaft->>StorageAdapter: append log entry
StorageAdapter->>UserStorage: persist LogEntry
UserStorage-->>StorageAdapter: ok
StorageAdapter->>StorageAdapter: apply entry to SM
StorageAdapter->>UserStateMachine: apply(request)
UserStateMachine-->>StorageAdapter: response
StorageAdapter-->>EzRaft: log applied
EzRaft-->>Client: T::Response
```

Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (1 warning, 1 inconclusive)
Greptile Overview

Greptile Summary: Introduces `EzRaft`, a simplified wrapper around openraft.

Confidence Score: 2/5
Sequence Diagram

```mermaid
sequenceDiagram
participant User as User Application
participant EzRaft as EzRaft
participant Storage as StorageAdapter
participant UserStorage as User Storage Impl
participant UserSM as User StateMachine
participant OpenRaft as OpenRaft Core
participant Network as HTTP Network
Note over User,Network: Node Initialization & Cluster Join
User->>EzRaft: new(addr, state_machine, storage, config, seed)
EzRaft->>Storage: new(user_storage, user_sm)
Storage->>UserStorage: load()
UserStorage-->>Storage: (EzMeta, Option<Snapshot>)
alt Has seed node (joining cluster)
EzRaft->>Network: HTTP POST /api/join to seed
Network-->>EzRaft: node_id assigned
EzRaft->>Storage: save_meta(node_id)
Storage->>UserStorage: persist(Meta)
else First node (no seed)
EzRaft->>EzRaft: node_id = 0
EzRaft->>Storage: save_meta(node_id = 0)
EzRaft->>OpenRaft: initialize(single node membership)
end
EzRaft->>OpenRaft: Raft::new(node_id, config, network, storage)
EzRaft-->>User: EzRaft instance
Note over User,Network: Client Write Operation
User->>EzRaft: write(request)
EzRaft->>OpenRaft: client_write(request)
OpenRaft->>Storage: append(log_entries)
loop For each entry
Storage->>UserStorage: persist(LogEntry)
end
Storage->>UserStorage: persist(Meta with updated last_log_id)
OpenRaft->>Network: replicate to followers
Network->>Network: HTTP POST /raft/append to peers
Note over OpenRaft: Wait for quorum
OpenRaft->>Storage: apply(entries)
Storage->>UserSM: apply(request)
UserSM-->>Storage: response
Storage-->>OpenRaft: response
OpenRaft-->>EzRaft: response
EzRaft-->>User: response
Note over User,Network: Snapshot Creation
OpenRaft->>Storage: build_snapshot()
Storage->>UserSM: build_snapshot()
UserSM-->>Storage: Vec<u8>
Storage->>UserStorage: persist(Snapshot)
UserStorage-->>Storage: Ok
Storage-->>OpenRaft: Snapshot
```
ezraft/src/storage.rs, lines 172–177:

```rust
async fn truncate_after(&mut self, last_log_id: Option<LogId<OpenRaftTypes<T>>>) -> Result<(), std::io::Error> {
    self.save_meta(|m| {
        m.last_log_id = last_log_id.map(|id| id.to_type());
    })
    .await
}
```
truncate_after only updates metadata but doesn't inform the user storage to delete the truncated logs. This could lead to inconsistencies where read_logs returns entries that should have been truncated.
Consider adding a new method to the EzStorage trait (e.g., delete_logs_after) or documenting that users should handle stale logs during reads.
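A minimal sketch of the first option (a hypothetical `delete_logs_after` hook; the trait name and signature here are illustrative, not part of `EzStorage` today):

```rust
// Hypothetical extension; `truncate_after` would call this after updating metadata.
#[async_trait::async_trait]
pub trait DeleteTruncatedLogs {
    /// Delete all persisted log entries with index greater than `after_index`,
    /// so `read_logs` can no longer return entries that were logically truncated.
    async fn delete_logs_after(&mut self, after_index: u64) -> std::io::Result<()>;
}
```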
ezraft/src/storage.rs, lines 148–161:

```rust
async fn append<I>(&mut self, entries: I, callback: IOFlushed<OpenRaftTypes<T>>) -> Result<(), std::io::Error>
where
    I: IntoIterator<Item = <OpenRaftTypes<T> as RaftTypeConfig>::Entry> + Send,
    I::IntoIter: Send,
{
    let mut last_log_id = None;

    // Save all log entries
    for entry in entries {
        last_log_id = Some(entry.log_id);
        let update = Persist::LogEntry(entry);
        let mut state = self.storage.lock().await;
        state.storage.persist(update).await?;
    }
```
Acquiring and releasing the mutex for each entry in the loop creates a potential race condition where other threads could interleave operations between entries.
Suggested change (hold the lock across the whole batch):

```rust
async fn append<I>(&mut self, entries: I, callback: IOFlushed<OpenRaftTypes<T>>) -> Result<(), std::io::Error>
where
    I: IntoIterator<Item = <OpenRaftTypes<T> as RaftTypeConfig>::Entry> + Send,
    I::IntoIter: Send,
{
    let mut last_log_id = None;
    // Hold lock for entire operation to ensure atomicity
    let mut state = self.storage.lock().await;
    // Save all log entries
    for entry in entries {
        last_log_id = Some(entry.log_id);
        let update = Persist::LogEntry(entry);
        state.storage.persist(update).await?;
    }
```
ezraft/src/storage.rs, lines 179–181:

```rust
async fn purge(&mut self, log_id: LogId<OpenRaftTypes<T>>) -> Result<(), std::io::Error> {
    self.save_meta(|m| m.last_purged = Some(log_id.to_type())).await
}
```
purge only updates metadata but doesn't delete purged log files from user storage. Over time, this will accumulate deleted logs on disk indefinitely.
Consider adding a method to EzStorage trait to physically delete logs (e.g., delete_logs_before).
ezraft/examples/kvstore.rs, lines 182–195:

```rust
async fn read_logs(&mut self, start: u64, end: u64) -> io::Result<Vec<EzEntry<Types>>> {
    let mut logs = Vec::new();

    for index in start..end {
        let data = match fs::read(&self.log_path(index)).await {
            Ok(d) => d,
            Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
            Err(e) => return Err(e),
        };
        logs.push(serde_json::from_slice(&data)?);
    }

    Ok(logs)
}
```
silently skips missing log entries with continue, which could hide corruption. If last_log_id is set to index 10 but only logs 1-5 exist, this returns a partial result instead of erroring.
Consider returning an error when expected logs are missing to catch data inconsistencies early.
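A minimal sketch of that stricter behavior, replacing the `continue` arm in the snippet above (the error message is illustrative):

```rust
// Sketch: treat a missing expected log entry as corruption instead of skipping it.
let data = match fs::read(&self.log_path(index)).await {
    Ok(d) => d,
    Err(e) if e.kind() == io::ErrorKind::NotFound => {
        return Err(io::Error::other(format!("log entry {index} missing from storage")));
    }
    Err(e) => return Err(e),
};
```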
Actionable comments posted: 7
🤖 Fix all issues with AI agents
In @ezraft/README.md:
- Around line 28-106: The Quick Start snippet is missing essential imports
causing compile errors: add std::path::PathBuf and std::io for AppStorage and
I/O return types, import async_trait::async_trait for the EzStorage and
EzStateMachine impls, and use a concrete Result return type (e.g.,
anyhow::Result or std::result::Result<(), Box<dyn std::error::Error>>) for main.
Update the top-of-file imports so AppStorage (base_dir: PathBuf), the
async_trait annotations on impls for EzStorage<AppTypes> and
EzStateMachine<AppTypes>, and the async main() signature all compile without
errors.
In @ezraft/src/lib.rs:
- Line 76: The public re-export "pub use openraft::RaftTypeConfig;" exposes
internal OpenRaft types unnecessarily; remove the public leak by either deleting
the re-export or changing it to a crate-only export (e.g., "pub(crate) use
openraft::RaftTypeConfig;") so only internal modules like storage.rs and
network.rs can access it; update any internal imports if you change visibility
and run a build to ensure no external code relied on the public symbol.
In @ezraft/src/network.rs:
- Around line 50-59: The code currently calls
Client::builder().no_proxy().build().unwrap() inside
EzNetworkFactory::new_client which can panic and also creates a new
reqwest::Client per node; instead, move the Client construction out of
new_client into EzNetworkFactory as a shared field (e.g., store a
reqwest::Client in the EzNetworkFactory struct), construct the client once when
the factory is created and surface any build error there (return a Result or
handle initialization error outside this method) so no unwrap is used, and
change new_client to clone/share that stored client (remove the per-target
Client::builder() call and the unwrap) when constructing Network in new_client.
In @ezraft/src/raft.rs:
- Around line 256-305: The request_join function can loop indefinitely following
leader redirects; add a retry limit by introducing a max_retries constant or
parameter and a retry counter inside request_join, incrementing it each loop
iteration and returning an io::Error (e.g., "max join retries exceeded") when
the counter exceeds max_retries; keep using the existing client, target_addr,
JoinRequest, and JoinResponse logic but bail out with a clear error after the
configured number of redirects to prevent infinite loops.
🧹 Nitpick comments (6)
ezraft/Cargo.toml (1)
12-12: Consider using specific tokio features instead of "full". The `tokio` dependency currently uses the "full" feature set, which includes many features that may not be needed. For an experimental/prototype crate, this is acceptable for convenience. However, if EzRaft moves toward production use, consider specifying only the required features (e.g., "rt-multi-thread", "macros", "sync", "time", "io-util", "net") to reduce compile times and binary size.

ezraft/src/lib.rs (1)

10-63: Consider making the documentation example compile and test. The example is currently marked with `ignore`, preventing it from being tested by `cargo test --doc`. This means the example could drift out of sync with actual API changes. Consider either:
- Making the example complete and removing the `ignore` flag so it's tested, or
- Referencing the complete `examples/kvstore.rs` file instead with `#![doc = include_str!("../examples/kvstore.rs")]` or similar pattern

ezraft/src/network.rs (1)

68-95: Consider adding request timeout for Raft RPC reliability. The HTTP requests lack explicit timeouts. For Raft protocol correctness, requests should time out to allow leader election and failure detection to proceed. OpenRaft's `RPCOption` likely provides timeout hints that could be used.

Suggested improvement:

```diff
 async fn request<Req, Resp, Err, Cfg>(
     &mut self,
     uri: impl Display,
     req: Req,
+    timeout: Option<std::time::Duration>,
 ) -> Result<Result<Resp, Err>, RPCError<Cfg>>
 where
     Cfg: openraft::RaftTypeConfig,
     Req: Serialize + 'static,
     Resp: Serialize + DeserializeOwned,
     Err: std::error::Error + Serialize + DeserializeOwned,
 {
     let url = format!("http://{}/{}", self.addr, uri);
-    let resp = self.client.post(url.clone()).json(&req).send().await.map_err(|e| {
+    let mut request_builder = self.client.post(url.clone()).json(&req);
+    if let Some(t) = timeout {
+        request_builder = request_builder.timeout(t);
+    }
+    let resp = request_builder.send().await.map_err(|e| {
+        if e.is_timeout() {
+            return RPCError::Timeout(openraft::error::Timeout::new("request timed out"));
+        }
         if e.is_connect() {
             RPCError::Unreachable(Unreachable::new(&e))
         } else {
             RPCError::Network(NetworkError::new(&e))
         }
     })?;
```

ezraft/examples/kvstore.rs (1)

71-98: State machine implementation is correct. Minor nit: `key.clone()` on line 76 is unnecessary since `insert` takes ownership of the key.

Optional fix:

```diff
 Request::Set { key, value } => {
-    self.data.insert(key.clone(), value);
+    self.data.insert(key, value);
     Response { value: None }
 }
```

ezraft/src/raft.rs (1)

62-150: Consider limiting retries or adding backoff in cluster initialization. Two observations:

1. Line 141: `raft.initialize(nodes).await.ok()` silently ignores all errors. While this handles the restart case, it also swallows unexpected errors. Consider logging or checking the specific error type.
2. The join logic in `request_join` (called on line 111) has an infinite loop that could hang if the cluster is in a bad state.

Optional: Log initialization errors

```diff
 if node_id == 0 {
     use std::collections::BTreeMap;
     let nodes = BTreeMap::from_iter([(node_id, BasicNode::new(http_addr.clone()))]);
-    // Ignore error if already initialized (restart case)
-    raft.initialize(nodes).await.ok();
+    // Ignore "already initialized" error (restart case), but log unexpected errors
+    if let Err(e) = raft.initialize(nodes).await {
+        // InitializeError::NotAllowed is expected on restart
+        tracing::debug!("initialize returned error (expected on restart): {}", e);
+    }
 }
```

ezraft/src/storage.rs (1)

148-170: Lock acquired per-entry causes inefficiency and potential interleaving. The lock is acquired and released for each entry in the loop (line 159), then again in `save_meta` (line 165). This is inefficient for large batches and allows other operations to interleave between entries of the same append batch.

Proposed fix: hold lock for entire batch

```diff
 async fn append<I>(&mut self, entries: I, callback: IOFlushed<OpenRaftTypes<T>>) -> Result<(), std::io::Error>
 where
     I: IntoIterator<Item = <OpenRaftTypes<T> as RaftTypeConfig>::Entry> + Send,
     I::IntoIter: Send,
 {
     let mut last_log_id = None;
+    let mut state = self.storage.lock().await;

     // Save all log entries
     for entry in entries {
         last_log_id = Some(entry.log_id);
         let update = Persist::LogEntry(entry);
-        let mut state = self.storage.lock().await;
         state.storage.persist(update).await?;
     }

     // Update metadata once with the last entry's log_id
     if let Some(log_id) = last_log_id {
-        self.save_meta(|m| m.last_log_id = Some(log_id)).await?;
+        state.cached_meta.last_log_id = Some(log_id);
+        let update = Persist::Meta(state.cached_meta.clone());
+        state.storage.persist(update).await?;
     }

     callback.io_completed(Ok(()));
     Ok(())
 }
```
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
Cargo.tomlezraft/Cargo.tomlezraft/README.mdezraft/examples/kvstore.rsezraft/src/config.rsezraft/src/lib.rsezraft/src/network.rsezraft/src/raft.rsezraft/src/server.rsezraft/src/storage.rsezraft/src/trait_.rsezraft/src/type_config.rsezraft/src/types.rs
🧰 Additional context used
🧬 Code graph analysis (6)
ezraft/src/trait_.rs (2)
ezraft/examples/kvstore.rs (6)
load(134-157)persist(159-180)read_logs(182-195)apply(73-88)build_snapshot(90-92)install_snapshot(94-97)ezraft/src/storage.rs (3)
apply(248-285)build_snapshot(339-362)install_snapshot(295-327)
ezraft/src/lib.rs (1)
ezraft/src/raft.rs (1)
storage(251-253)
ezraft/src/types.rs (1)
ezraft/src/type_config.rs (3)
fmt(74-76)clone(42-44)default(48-50)
ezraft/src/type_config.rs (1)
ezraft/src/types.rs (4)
clone(118-125)default(131-138)fmt(46-48)fmt(55-61)
ezraft/src/raft.rs (3)
ezraft/src/storage.rs (2)
new(83-111)node_id(122-124)ezraft/src/server.rs (3)
new(32-34)run(37-55)run(150-154)ezraft/src/network.rs (1)
new(45-47)
ezraft/src/network.rs (4)
ezraft/src/raft.rs (2)
new(92-150)addr(236-238)ezraft/src/server.rs (1)
new(32-34)multiraft/src/network.rs (1)
target(134-136)openraft/src/error/mod.rs (1)
with_raft_error(348-354)
🪛 GitHub Actions: ci
ezraft/src/server.rs
[warning] 1-1: Rustfmt diff detected in server.rs: formatting changes are suggested.
ezraft/src/raft.rs
[warning] 1-1: Rustfmt diff detected in raft.rs: formatting changes are suggested.
ezraft/src/network.rs
[warning] 1-1: Rustfmt diff detected in network.rs: formatting changes are suggested.
🪛 GitHub Checks: examples (nightly: log-mem, multi-raft-kv, raft-kv-memstore-grpc, raft-kv-memstore-network-v2, raft-kv-memstore-opendal-snapshot-data, raft-kv-memstore-single-threaded, raft-kv-memstore, raft-kv-rocksdb, rocksstore) and lint

All of these checks report the same rustfmt diffs:
- ezraft/src/server.rs: lines 117, 148
- ezraft/src/raft.rs: lines 193, 266, 276, 291
- ezraft/src/network.rs: lines 114, 129
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: Greptile Review
- GitHub Check: examples (stable, raft-kv-memstore-single-threaded)
- GitHub Check: examples (stable, raft-kv-rocksdb)
- GitHub Check: examples (nightly, log-mem)
- GitHub Check: examples (stable, raft-kv-memstore-network-v2)
- GitHub Check: examples (stable, rocksstore)
- GitHub Check: examples (nightly, multi-raft-kv)
- GitHub Check: examples (nightly, raft-kv-memstore-network-v2)
- GitHub Check: examples (nightly, raft-kv-memstore-opendal-snapshot-data)
- GitHub Check: test-crate-tests (nightly, 0, single-term-leader)
- GitHub Check: test-crate-tests (nightly, 30)
- GitHub Check: test-crate-tests (stable, 0)
- GitHub Check: tests-feature-test (nightly, single-term-leader)
- GitHub Check: tests-feature-test (nightly)
- GitHub Check: test-openraft-core-crates (stable)
- GitHub Check: Summary
🔇 Additional comments (29)
Cargo.toml (1)
72-72: LGTM!The workspace member addition is correct and properly integrates the new EzRaft crate.
ezraft/README.md (3)
112-131: LGTM!The EzStorage trait documentation is clear, accurate, and provides good guidance on the separation of concerns between framework and user code.
133-150: LGTM!The EzStateMachine trait documentation clearly explains the interface and responsibilities, making it easy for users to understand what they need to implement.
152-186: LGTM!The configuration, HTTP API, comparison table, and license sections are all well-written and provide valuable context for users evaluating or using EzRaft.
ezraft/src/config.rs (2)
34-51: LGTM!The conversion logic correctly implements the documented 3x-6x election timeout calculation based on heartbeat interval. The
as u64cast on line 40 is safe in practice since heartbeat intervals will never approach the u64 maximum milliseconds (~584 million years). The validation step ensures the resulting config is valid.
26-32: LGTM!The default heartbeat interval of 500ms is a sensible choice for most Raft deployments, balancing responsiveness with network overhead.
ezraft/src/lib.rs (1)
65-72: LGTM!The module organization is clean and logical, grouping related functionality (config, networking, storage, traits, types). Making all modules public provides flexibility during the experimental phase, while the curated re-exports (lines 74-88) establish the main public API surface.
ezraft/src/trait_.rs (3)
1-18: LGTM! Clean module setup with appropriate imports.The module documentation clearly explains the purpose and the imports are well-organized.
19-81: Well-designed storage trait with good documentation.The
EzStoragetrait provides a clean abstraction for persistence with:
- Clear separation of concerns (load, persist, read_logs)
- Appropriate async trait with
Send + Sync + 'staticbounds for concurrent access- Comprehensive documentation including example implementation
The
read_logsrange semantics ([start, end)) are consistent with Rust conventions.
83-132: State machine trait is well-structured.The
EzStateMachinetrait cleanly separates business logic concerns:
applyfor request processingbuild_snapshot/install_snapshotfor state persistenceThe async design allows for flexible implementations. Documentation with the KV store example helps users understand the expected usage pattern.
ezraft/src/network.rs (1)
97-138: RaftNetworkV1 implementation looks correct.The trait implementation properly delegates to the HTTP request helper and handles error mapping. The
unwrap()onInfallibleresults (lines 109, 136) is safe sinceInfalliblecannot be constructed.ezraft/examples/kvstore.rs (4)
1-49: Good example module setup with clear documentation.The usage instructions in the module docs are helpful for beginners. Request/Response types are well-defined with appropriate derives.
100-130: File storage helper methods are well-organized.Clear path conventions for meta, logs, and snapshots.
132-196: EzStorage implementation handles edge cases correctly.
NotFounderrors are properly handled as defaults/skips- JSON serialization is consistent
- Log reading with
continueon missing entries allows for compacted logsNote: The synchronous
cursor.seek()andcursor.read_to_end()on lines 173-175 are acceptable since they operate on in-memoryCursor<Vec<u8>>.
198-231: Main function and CLI setup are clean.The example properly demonstrates cluster formation with seed node joining.
ezraft/src/type_config.rs (3)
1-28: Clean trait definition with appropriate bounds.
EzTypesprovides a minimal interface for users while ensuring all necessary constraints for serialization and async usage.
29-77: Manual trait implementations correctly avoid T bounds.The implementations for
Copy,Clone,Default,PartialEq,Eq,PartialOrd,Ord, andDebugare correctly implemented without requiringTto implement these traits, sinceOpenRaftTypes<T>only containsPhantomData<T>.
79-95: RaftTypeConfig implementation maps types correctly.The configuration provides sensible defaults:
u64for NodeId and TermBasicNodefor NodeCursor<Vec<u8>>for SnapshotData (simple in-memory approach)TokioRuntimefor async operationsezraft/src/types.rs (4)
1-24: Module setup and EzLogId type alias are appropriate.Using
(u64, u64)for log IDs leverages OpenRaft's blanketRaftLogIdimplementation for tuples.
93-139: EzMeta with manual Clone and Default is correct.Manual implementations avoid requiring
T: CloneorT: Defaultbounds while still allowing the struct to be cloned and defaulted.
141-170: Type aliases and Persist enum are well-designed.The
Persistenum provides a clean abstraction for the three types of persistence operations, with appropriateDisplayformatting for logging/debugging.
26-91: EzEntry implementation is well-structured.The
#[serde(bound = "")]annotation correctly avoids requiringT: Serialize + Deserializebounds on the struct itself.The
RaftPayloadandRaftEntrytrait implementations delegate appropriately to the inner payload. Thelog_id.to_type()calls (lines 79, 89) correctly use theRaftLogId::to_type()method from OpenRaft to convert betweenLogId<OpenRaftTypes<T>>and the simplifiedEzLogIdtuple type.ezraft/src/server.rs (3)
1-35: Server structure and constructor are well-organized.The module provides a clean HTTP wrapper around EzRaft.
56-104: RPC handlers correctly delegate to OpenRaft.The append, vote, change_membership, and metrics handlers are properly implemented.
106-164: Join handler uses log index for node ID assignment.This is a clever approach to generate unique node IDs, but note that concurrent join requests could theoretically get assigned to the cluster before all being promoted. The current design adds them as learners only, which is appropriate.
The
JoinResponsetype usingResult<u64, Option<String>>is an unconventional but functional API design.ezraft/src/raft.rs (2)
1-60: EzRaft struct and Clone implementation are correct.The struct properly encapsulates the OpenRaft instance with its supporting components.
152-254: Public API methods are well-documented and correctly implemented.The
write,add_learner,change_membership,is_leader,metrics, andservemethods provide a clean interface. UsingLeaseReadforis_leaderis an appropriate choice for checking leadership.ezraft/src/storage.rs (2)
172-181: Log entries not actually removed from storage on truncate/purge.Both
truncate_afterandpurgeonly update cached metadata without removing the actual log entries from user storage. Whileread_logsrespects these bounds, the underlying storage will accumulate log entries indefinitely.Is this intentional? If the user storage is expected to handle compaction independently, consider documenting this expectation in the
EzStoragetrait. Otherwise, the storage trait may need a method to delete log ranges.
248-285: LGTM!The apply loop correctly handles all entry types (Normal, Membership, Blank), updates
last_applied, and sends responses through the responder. Holding the lock during the entire stream iteration ensures consistency.
ezraft/README.md, lines 28–106:

```rust
use ezraft::{EzRaft, EzConfig, EzStorage, EzStateMachine, EzMeta, EzSnapshot, EzEntry, Persist, EzTypes};
use serde::{Serialize, Deserialize};
use std::collections::BTreeMap;

// 1. Define your request/response types
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Request { Set { key: String, value: String } }

#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct Response { pub value: Option<String> }

// 2. Implement EzTypes trait
struct AppTypes;
impl EzTypes for AppTypes {
    type Request = Request;
    type Response = Response;
}

// 3. Implement storage persistence (3 methods)
struct AppStorage { base_dir: PathBuf }

#[async_trait]
impl EzStorage<AppTypes> for AppStorage {
    async fn load(&mut self) -> Result<(EzMeta<AppTypes>, Option<EzSnapshot<AppTypes>>), io::Error> {
        // Load meta (or default) and optional snapshot from disk
    }

    async fn persist(&mut self, op: Persist<AppTypes>) -> Result<(), io::Error> {
        // Persist operation to disk
    }

    async fn read_logs(&mut self, start: u64, end: u64) -> Result<Vec<EzEntry<AppTypes>>, io::Error> {
        // Read log entries in range [start, end)
    }
}

// 4. Implement state machine (3 methods)
struct AppStateMachine { data: BTreeMap<String, String> }

#[async_trait]
impl EzStateMachine<AppTypes> for AppStateMachine {
    async fn apply(&mut self, req: Request) -> Response {
        match req {
            Request::Set { key, value } => {
                self.data.insert(key, value);
                Response { value: None }
            }
        }
    }

    async fn build_snapshot(&self) -> io::Result<Vec<u8>> {
        // Serialize state machine to bytes
    }

    async fn install_snapshot(&mut self, data: &[u8]) -> io::Result<()> {
        // Restore state machine from bytes
    }
}

// 5. Use it
#[tokio::main]
async fn main() -> Result<()> {
    let state_machine = AppStateMachine { data: BTreeMap::new() };
    let storage = AppStorage { base_dir: "./data".into() };

    // First node (creates cluster)
    let raft = EzRaft::<AppTypes>::new(
        "127.0.0.1:8080",
        state_machine,
        storage,
        EzConfig::default(),
        None, // No seed = first node
    ).await?;

    // Or join existing cluster: Some("127.0.0.1:8080".into())
    raft.serve().await?;
}
```
Quick Start example is missing required imports.
The example will not compile as-is because several imports are missing:
std::path::PathBuf(used on line 48)std::io(used on lines 52, 56, 60, 79, 83)async_trait::async_trait(used on lines 50, 68)- Either
anyhow::Resultor explicitstd::result::Result<(), Box<dyn std::error::Error>>for the main return type (line 90)
📝 Suggested complete import section
use ezraft::{EzRaft, EzConfig, EzStorage, EzStateMachine, EzMeta, EzSnapshot, EzEntry, Persist, EzTypes};
use serde::{Serialize, Deserialize};
use std::collections::BTreeMap;
+use std::path::PathBuf;
+use std::io;
+use async_trait::async_trait;
+use anyhow::Result;📝 Committable suggestion
|
|
||
ezraft/src/lib.rs, line 76:

```rust
// Re-export public API
pub use config::EzConfig;
pub use openraft::RaftTypeConfig;
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, let's see the context around the re-exports in lib.rs
echo "=== lib.rs re-exports section ==="
sed -n '74,88p' ezraft/src/lib.rs
echo -e "\n=== Search for RaftTypeConfig in public signatures ==="
rg "pub (fn|struct|trait|impl|type)" ezraft/src/ -A 3 | grep -B 3 "RaftTypeConfig" | head -40
echo -e "\n=== Search for all RaftTypeConfig usage in codebase ==="
rg "RaftTypeConfig" ezraft/src/ -n
echo -e "\n=== Check if it's used in examples or tests ==="
rg "RaftTypeConfig" ezraft/ -n --include="*.rs"Repository: drmingdrmer/openraft
Length of output: 2347
🏁 Script executed:
#!/bin/bash
echo "=== Check EzStorage trait definition ==="
rg "pub trait EzStorage" ezraft/src/ -A 20 | head -50
echo -e "\n=== Check EzStateMachine trait definition ==="
rg "pub trait EzStateMachine" ezraft/src/ -A 20 | head -50
echo -e "\n=== Look at storage.rs trait methods ==="
sed -n '130,210p' ezraft/src/storage.rs
echo -e "\n=== Check if RaftTypeConfig is in any public method signatures ==="
rg "pub.*fn.*RaftTypeConfig|pub.*type.*RaftTypeConfig" ezraft/src/Repository: drmingdrmer/openraft
Length of output: 5775
🏁 Script executed:
#!/bin/bash
echo "=== Check if RaftTypeConfig appears in documentation or README ==="
fd "README|example" ezraft/ --type f | xargs cat 2>/dev/null | grep -i "RaftTypeConfig" || echo "No matches in docs"
echo -e "\n=== Check examples directory ==="
find ezraft -name "*example*" -o -name "*examples*" 2>/dev/null | head -10
echo -e "\n=== Check if RaftTypeConfig is used in any public APIs or const generics ==="
rg "RaftTypeConfig" ezraft/src/raft.rs
echo -e "\n=== Summary: Check what other OpenRaft types are re-exported and how RaftTypeConfig differs ==="
grep "pub use" ezraft/src/lib.rsRepository: drmingdrmer/openraft
Length of output: 763
Remove RaftTypeConfig from the public API.
This re-export serves no purpose for public API consumers—it doesn't appear in any public function signatures, traits, or documentation. RaftTypeConfig is only used internally for trait implementations in storage.rs and network.rs. Since users work with EzTypes, OpenRaftTypes, and other EzRaft wrappers instead, exposing OpenRaft's RaftTypeConfig unnecessarily leaks internal complexity. Consider removing this re-export entirely or changing it to pub(crate).
ezraft/src/network.rs, lines 50–59:

```rust
impl<T: EzTypes> RaftNetworkFactory<C<T>> for EzNetworkFactory {
    type Network = Adapter<C<T>, Network>;

    async fn new_client(&mut self, target: u64, node: &BasicNode) -> Self::Network {
        let addr = node.addr.clone();
        let client = Client::builder().no_proxy().build().unwrap();

        Network { addr, client, target }.into_v2()
    }
}
```
Avoid unwrap() on client build; consider reusing the HTTP client.
Client::builder().no_proxy().build().unwrap() could panic on misconfiguration. Also, creating a new Client per target node misses connection pooling benefits - reqwest Client is designed to be cloned and shared.
Proposed fix
Consider storing a shared Client in EzNetworkFactory:
-#[derive(Default)]
-pub struct EzNetworkFactory;
+#[derive(Clone)]
+pub struct EzNetworkFactory {
+ client: Client,
+}
impl EzNetworkFactory {
- /// Create a new network factory
pub fn new() -> Self {
- Self
+ Self {
+ client: Client::builder()
+ .no_proxy()
+ .build()
+ .expect("Failed to build HTTP client"),
+ }
}
}
+impl Default for EzNetworkFactory {
+ fn default() -> Self {
+ Self::new()
+ }
+}
impl<T: EzTypes> RaftNetworkFactory<C<T>> for EzNetworkFactory {
type Network = Adapter<C<T>, Network>;
async fn new_client(&mut self, target: u64, node: &BasicNode) -> Self::Network {
let addr = node.addr.clone();
- let client = Client::builder().no_proxy().build().unwrap();
+ let client = self.client.clone();
Network { addr, client, target }.into_v2()
}
}
🤖 Prompt for AI Agents
In @ezraft/src/network.rs around lines 50 - 59, The code currently calls
Client::builder().no_proxy().build().unwrap() inside
EzNetworkFactory::new_client which can panic and also creates a new
reqwest::Client per node; instead, move the Client construction out of
new_client into EzNetworkFactory as a shared field (e.g., store a
reqwest::Client in the EzNetworkFactory struct), construct the client once when
the factory is created and surface any build error there (return a Result or
handle initialization error outside this method) so no unwrap is used, and
change new_client to clone/share that stored client (remove the per-target
Client::builder() call and the unwrap) when constructing Network in new_client.
/// Request to join a cluster
#[derive(Debug, Serialize)]
struct JoinRequest {
    addr: String,
}

/// Join response: Ok(node_id) or Err(leader_addr)
type JoinResponse = Result<u64, Option<String>>;

/// Request to join a cluster via seed node
///
/// Retries with leader if seed is not the leader.
async fn request_join(seed_addr: &str, my_addr: &str) -> Result<u64, io::Error> {
    let client = reqwest::Client::builder()
        .no_proxy()
        .build()
        .map_err(|e| io::Error::other(e.to_string()))?;

    let mut target_addr = seed_addr.to_string();

    loop {
        let url = format!("http://{}/api/join", target_addr);
        let req = JoinRequest { addr: my_addr.to_string() };

        let resp = client
            .post(&url)
            .json(&req)
            .send()
            .await
            .map_err(|e| io::Error::other(format!("join request failed: {}", e)))?;

        if !resp.status().is_success() {
            return Err(io::Error::other(format!(
                "join request failed with status: {}",
                resp.status()
            )));
        }

        let join_resp: JoinResponse = resp
            .json()
            .await
            .map_err(|e| io::Error::other(format!("failed to parse join response: {}", e)))?;

        match join_resp {
            Ok(node_id) => return Ok(node_id),
            Err(Some(leader)) => target_addr = leader,
            Err(None) => return Err(io::Error::other("no leader available")),
        }
    }
}
Add retry limit to request_join to prevent infinite loop.
The join request loop (lines 276-304) follows leader redirects indefinitely. In an unstable cluster or network partition, this could hang forever. Consider adding a maximum retry count.
Proposed fix
async fn request_join(seed_addr: &str, my_addr: &str) -> Result<u64, io::Error> {
let client = reqwest::Client::builder()
.no_proxy()
.build()
.map_err(|e| io::Error::other(e.to_string()))?;
let mut target_addr = seed_addr.to_string();
+ const MAX_REDIRECTS: u32 = 10;
+ let mut redirects = 0;
loop {
+ if redirects >= MAX_REDIRECTS {
+ return Err(io::Error::other("too many leader redirects during join"));
+ }
+
let url = format!("http://{}/api/join", target_addr);
let req = JoinRequest { addr: my_addr.to_string() };
let resp = client
.post(&url)
.json(&req)
.send()
.await
.map_err(|e| io::Error::other(format!("join request failed: {}", e)))?;
if !resp.status().is_success() {
return Err(io::Error::other(format!(
"join request failed with status: {}",
resp.status()
)));
}
let join_resp: JoinResponse = resp
.json()
.await
.map_err(|e| io::Error::other(format!("failed to parse join response: {}", e)))?;
match join_resp {
Ok(node_id) => return Ok(node_id),
- Err(Some(leader)) => target_addr = leader,
+ Err(Some(leader)) => {
+ target_addr = leader;
+ redirects += 1;
+ }
Err(None) => return Err(io::Error::other("no leader available")),
}
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
/// Request to join a cluster
#[derive(Debug, Serialize)]
struct JoinRequest {
    addr: String,
}

/// Join response: Ok(node_id) or Err(leader_addr)
type JoinResponse = Result<u64, Option<String>>;

/// Request to join a cluster via seed node
///
/// Retries with leader if seed is not the leader.
async fn request_join(seed_addr: &str, my_addr: &str) -> Result<u64, io::Error> {
    let client = reqwest::Client::builder()
        .no_proxy()
        .build()
        .map_err(|e| io::Error::other(e.to_string()))?;

    let mut target_addr = seed_addr.to_string();

    const MAX_REDIRECTS: u32 = 10;
    let mut redirects = 0;

    loop {
        if redirects >= MAX_REDIRECTS {
            return Err(io::Error::other("too many leader redirects during join"));
        }

        let url = format!("http://{}/api/join", target_addr);
        let req = JoinRequest { addr: my_addr.to_string() };

        let resp = client
            .post(&url)
            .json(&req)
            .send()
            .await
            .map_err(|e| io::Error::other(format!("join request failed: {}", e)))?;

        if !resp.status().is_success() {
            return Err(io::Error::other(format!(
                "join request failed with status: {}",
                resp.status()
            )));
        }

        let join_resp: JoinResponse = resp
            .json()
            .await
            .map_err(|e| io::Error::other(format!("failed to parse join response: {}", e)))?;

        match join_resp {
            Ok(node_id) => return Ok(node_id),
            Err(Some(leader)) => {
                target_addr = leader;
                redirects += 1;
            }
            Err(None) => return Err(io::Error::other("no leader available")),
        }
    }
}
🧰 Tools
🪛 GitHub Check: lint, plus examples (nightly) for log-mem, multi-raft-kv, raft-kv-memstore, raft-kv-memstore-grpc, raft-kv-memstore-network-v2, raft-kv-memstore-opendal-snapshot-data, raft-kv-memstore-single-threaded, raft-kv-rocksdb, and rocksstore, all reporting the same rustfmt warnings:
[warning] 266-266, 276-276, 291-291:
Diff in /home/runner/work/openraft/openraft/ezraft/src/raft.rs
🤖 Prompt for AI Agents
In @ezraft/src/raft.rs around lines 256 - 305, The request_join function can
loop indefinitely following leader redirects; add a retry limit by introducing a
max_retries constant or parameter and a retry counter inside request_join,
incrementing it each loop iteration and returning an io::Error (e.g., "max join
retries exceeded") when the counter exceeds max_retries; keep using the existing
client, target_addr, JoinRequest, and JoinResponse logic but bail out with a
clear error after the configured number of redirects to prevent infinite loops.
/// Run the HTTP server
pub async fn run(self) -> std::io::Result<()> {
    let addr = self.raft.addr().to_string();
    let server_data = Data::new(self);

    let server = HttpServer::new(move || {
        App::new()
            .app_data(server_data.clone())
            // Raft internal RPC
            .route("/raft/append", web::post().to(Self::handle_append))
            .route("/raft/vote", web::post().to(Self::handle_vote))
            // Admin API
            .route("/api/join", web::post().to(Self::handle_join))
            .route("/api/change_membership", web::post().to(Self::handle_change_membership))
            .route("/api/metrics", web::get().to(Self::handle_metrics))
    })
    .bind(&addr)?;

    server.run().await
}
Missing /raft/snapshot endpoint - snapshot installation will fail.
The Network implementation in network.rs (line 118) sends InstallSnapshotRequest to "raft/snapshot", but this route is not defined in the server. This will cause snapshot-based recovery and catch-up replication to fail.
Proposed fix - add snapshot handler
Add the route and handler:
.route("/raft/append", web::post().to(Self::handle_append))
.route("/raft/vote", web::post().to(Self::handle_vote))
+ .route("/raft/snapshot", web::post().to(Self::handle_snapshot))
    // Admin API
And add the handler method:
/// Raft install snapshot RPC handler
async fn handle_snapshot(
    req: web::Json<raft::InstallSnapshotRequest<C<T>>>,
    ez: Data<Self>,
) -> Result<web::Json<raft::InstallSnapshotResponse<C<T>>>, actix_web::Error> {
    let resp = ez
        .raft
        .inner()
        .install_snapshot(req.into_inner())
        .await
        .map_err(|e| actix_web::error::ErrorInternalServerError(format!("install_snapshot failed: {}", e)))?;
    Ok(web::Json(resp))
}

// Load initial metadata and snapshot
let (cached_meta, snapshot) = user_storage.load().await?;

// Initialize state machine state from snapshot or defaults
let (last_applied, last_membership) = match &snapshot {
    Some(snap) => (snap.meta.last_log_id, snap.meta.last_membership.clone()),
    None => (None, StoredMembership::new(None, Membership::default())),
};

let storage = StorageWithCache {
    storage: Box::new(user_storage),
    cached_meta,
};

let sm_state = StateMachineState {
    user_sm: Box::new(user_sm),
    last_applied,
    membership: last_membership,
};
State machine not restored from snapshot on initialization.
When a snapshot exists, only the metadata (last_applied, membership) is extracted and stored. The actual snapshot data is never applied to user_sm, so the state machine starts empty despite having a valid snapshot to restore from. This causes data loss on node restart.
Proposed fix
// Load initial metadata and snapshot
let (cached_meta, snapshot) = user_storage.load().await?;
- // Initialize state machine state from snapshot or defaults
- let (last_applied, last_membership) = match &snapshot {
- Some(snap) => (snap.meta.last_log_id, snap.meta.last_membership.clone()),
- None => (None, StoredMembership::new(None, Membership::default())),
+ // Initialize state machine state from snapshot or defaults
+ let mut user_sm = Box::new(user_sm);
+ let (last_applied, last_membership) = match &snapshot {
+ Some(snap) => {
+ // Restore user state machine from snapshot data
+ let mut data = Vec::new();
+ let mut cursor = snap.snapshot.clone();
+ cursor.seek(SeekFrom::Start(0))?;
+ cursor.read_to_end(&mut data)?;
+ user_sm.install_snapshot(&data).await?;
+ (snap.meta.last_log_id, snap.meta.last_membership.clone())
+ }
+ None => (None, StoredMembership::new(None, Membership::default())),
};
let storage = StorageWithCache {
storage: Box::new(user_storage),
cached_meta,
};
let sm_state = StateMachineState {
- user_sm: Box::new(user_sm),
+ user_sm,
last_applied,
membership: last_membership,
    };

async fn build_snapshot(&mut self) -> Result<Snapshot<OpenRaftTypes<T>>, std::io::Error> {
    // Get current state machine state and build snapshot data
    let (last_applied, last_membership, snapshot_data) = {
        let sm = self.sm_state.lock().await;
        let data = sm.user_sm.build_snapshot().await?;
        (sm.last_applied, sm.membership.clone(), data)
    };

    let snapshot_id = match last_applied {
        Some(log_id) => format!("{}-{}", log_id.leader_id.term, log_id.index),
        None => "0-0".to_string(),
    };

    let snapshot_meta = SnapshotMeta {
        last_log_id: last_applied,
        last_membership,
        snapshot_id,
    };

    Ok(Snapshot {
        meta: snapshot_meta,
        snapshot: Cursor::new(snapshot_data),
    })
}
🧩 Analysis chain
🌐 Web query:
OpenRaft RaftSnapshotBuilder build_snapshot storage persistence contract
💡 Result:
Summary of the OpenRaft RaftSnapshotBuilder / build_snapshot contract
Key signature
- build_snapshot(&mut self) -> impl Future<Output = Result<Snapshot, StorageError<C::NodeId>>> + Send. [1]
What build_snapshot must provide
- Produce a Snapshot that contains the state of all applied logs (including membership) up to a single last_log_id. The snapshot is the serialized state-machine view used for log compaction and snapshot install. [1][3]
Consistency / correctness requirements
- The snapshot must represent a consistent view of the state machine (a single point-in-time covering all applied logs up to its last_log_id). Implementations must ensure the snapshot builder captures that view so the async build_snapshot can complete independently while new entries may be applied. (Common strategies: clone-in-memory snapshot view or use the DB’s native snapshot mechanism.) [1][4]
Persistence implications (two acceptable patterns)
- Persistent state-machine: apply() persists state to disk on each apply; snapshots need not be persisted.
- Persistent snapshot: apply() may not persist state, but each snapshot produced by build_snapshot must be persisted and later used to rebuild state on startup. Choose one strategy and implement startup recovery accordingly. [2][3]
Practical expectations for implementors
- get_snapshot_builder() (async) should return a builder that holds the consistent view; build_snapshot() then serializes that view into a Snapshot. [3]
- Snapshot metadata must include last_log_id and membership info so OpenRaft can compact logs and recover membership without replaying all logs. [4]
References
- RaftSnapshotBuilder trait (signature, description). [1]
- RaftStateMachine / storage guidance (applied-state, apply, snapshot builder, persistence tradeoffs). [2][3]
- Implementation patterns and snapshot metadata (examples: memstore, RocksDB). [4]
Sources
[1] openraft::storage::RaftSnapshotBuilder (docs.rs).
[2] openraft::storage::RaftStateMachine (docs.rs).
[3] OpenRaft getting started / storage docs (docs.rs).
[4] Implementation notes / guide (DeepWiki / OpenRaft examples).
🏁 Script executed:
rg -A 20 "async fn apply" ezraft/src/storage.rsRepository: drmingdrmer/openraft
Length of output: 901
🏁 Script executed:
rg -A 50 "async fn apply" ezraft/src/storage.rs | head -80Repository: drmingdrmer/openraft
Length of output: 1825
🌐 Web query:
OpenRaft log compaction snapshot creation build_snapshot install_snapshot flow
💡 Result:
Summary of OpenRaft snapshot / log-compaction flow (build_snapshot → transmit → install_snapshot)
- When to snapshot
- SnapshotPolicy controls automatic triggering (e.g. LogsSinceLast(n)) or you can call Raft::trigger_snapshot() manually. [1][6]
- Building the snapshot on the leader
- The storage-backed state machine exposes a RaftSnapshotBuilder (or RaftStateMachine::get_snapshot_builder / build_snapshot) that produces Snapshot objects. build_snapshot must include all applied state (including membership) and may perform log compaction (merge/purge log entries). [2][4]
- Snapshot transmission
- The leader sends the snapshot to the follower(s) via the RaftNetwork snapshot API. OpenRaft supports chunked streaming or a full-snapshot API (full_snapshot / Chunked::send_snapshot). The leader will decide to send InstallSnapshot when a follower lags too far. [5][3]
- Receiving on the follower
- The follower storage implements begin_receiving_snapshot() to get a writable SnapshotData handle; OpenRaft writes the streamed snapshot into that handle. [4]
- Installing the snapshot
- Once streaming is complete (done==true), the follower calls install_snapshot() / Raft::install_full_snapshot(), which must atomically replace the state machine with the snapshot, update applied log id and membership from snapshot meta, persist the snapshot reference (so get_current_snapshot() returns it), and delete/purge older logs as appropriate. If the incoming snapshot is older than current state, it is ignored. [3][4]
- Post-install log compaction semantics
- After installation the leader/follower will truncate/purge log ranges covered by the snapshot (so logs <= snapshot.last_log_id can be purged), preventing unbounded log growth; SnapshotPolicy controls periodic snapshots that enable this compaction. [1][2][6]
Recommended places in the OpenRaft docs/source to read for exact method signatures and examples:
- SnapshotPolicy and overview. [1][6]
- RaftSnapshotBuilder / build_snapshot docs. [2]
- RaftStateMachine methods: begin_receiving_snapshot, install_snapshot, get_current_snapshot. [4]
- Raft::install_snapshot / install_full_snapshot and the upgrade/Chunked guidance for transmission. [3][5]
If you want, I can extract the exact call sequence and pseudo-code for leader and follower (with the specific OpenRaft method names).
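To make the leader/follower halves of this flow concrete at the ezraft level, here is a rough sketch of the two user-facing snapshot hooks for a key-value state machine. It assumes, as the adapter code above suggests, that EzStateMachine's build_snapshot produces raw snapshot bytes and install_snapshot consumes them; the struct name and exact signatures below are illustrative, not the kvstore example's actual code.

use std::collections::BTreeMap;
use std::io;

/// Hypothetical KV state machine, used only to illustrate the snapshot flow.
struct KvSnapshotDemo {
    data: BTreeMap<String, String>,
}

impl KvSnapshotDemo {
    /// Leader side: serialize the whole applied state into snapshot bytes.
    async fn build_snapshot(&self) -> Result<Vec<u8>, io::Error> {
        serde_json::to_vec(&self.data).map_err(io::Error::other)
    }

    /// Follower side: atomically replace local state with the snapshot contents.
    async fn install_snapshot(&mut self, bytes: &[u8]) -> Result<(), io::Error> {
        self.data = serde_json::from_slice(bytes).map_err(io::Error::other)?;
        Ok(())
    }
}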
🌐 Web query:
OpenRaft leader persist snapshot locally after build_snapshot trigger_snapshot
💡 Result:
Short answer: OpenRaft itself does not implicitly write a snapshot file for you — the snapshot is produced/managed by your storage implementation. The Raft core calls your SnapshotBuilder (build_snapshot / get_snapshot_builder) when snapshotting (including after Raft::trigger_snapshot()), and it expects your storage/state-machine to persist the snapshot and make it available via get_current_snapshot / begin_receiving_snapshot / install_snapshot semantics. In practice the leader triggers build_snapshot, receives a Snapshot value from your builder, and your storage implementation is responsible for persisting that snapshot to disk and returning it later; OpenRaft’s docs and traits require that persistent-snapshot implementations persist the snapshot so get_current_snapshot can return it. (docs.rs)
If you want, tell me which OpenRaft version and storage (memstore, RocksDB, custom) you use and I’ll point to the exact functions/examples to change so the leader saves snapshots to a local file.
Citations:
- 1: https://docs.rs/openraft/latest/openraft/storage/trait.RaftSnapshotBuilder.html?utm_source=openai
Persist snapshots in build_snapshot() before returning.
The snapshot returned by build_snapshot() must be persisted to storage. OpenRaft calls build_snapshot() on the leader during snapshot creation (via SnapshotPolicy or trigger_snapshot()), and since apply() does not persist state, ezraft is using the persistent-snapshot pattern where all snapshots must be written to disk. Currently, the snapshot is created in memory and returned without persisting it. If the leader crashes before transmitting the snapshot to followers (or if logs are compacted based on this snapshot), the snapshot data will be lost on restart because get_current_snapshot() will not find it.
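One possible shape for the fix, shown only as a sketch: persist the (meta, data) pair through the storage layer before returning the snapshot. This assumes the state-machine adapter holds (or can be given) a handle to the storage layer; the save_snapshot call is hypothetical and would need to be replaced with whatever persistence hook EzStorage / the storage adapter actually exposes (e.g. the existing save_state-style methods).

async fn build_snapshot(&mut self) -> Result<Snapshot<OpenRaftTypes<T>>, std::io::Error> {
    let (last_applied, last_membership, snapshot_data) = {
        let sm = self.sm_state.lock().await;
        let data = sm.user_sm.build_snapshot().await?;
        (sm.last_applied, sm.membership.clone(), data)
    };

    let snapshot_id = match last_applied {
        Some(log_id) => format!("{}-{}", log_id.leader_id.term, log_id.index),
        None => "0-0".to_string(),
    };

    let snapshot_meta = SnapshotMeta {
        last_log_id: last_applied,
        last_membership,
        snapshot_id,
    };

    // Write the snapshot to durable storage before handing it to Raft, so that
    // get_current_snapshot() can still find it after a restart.
    // `save_snapshot` is a hypothetical helper; substitute the adapter's real
    // EzStorage persistence call here.
    self.storage
        .save_snapshot(&snapshot_meta, &snapshot_data)
        .await?;

    Ok(Snapshot {
        meta: snapshot_meta,
        snapshot: Cursor::new(snapshot_data),
    })
}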