Conversation
packages/wavs/src/dispatcher.rs
Outdated
```rust
// Allocate HD index from the registry (single source of truth for key derivation indices)
let hd_index = self.service_registry.append(service_manager)?;

self.add_service_direct(service.clone(), Some(hd_index))
    .await?;
```
If add_service_direct fails for any reason, the registry entry has already been written. On the next restart the node would try to restore a service that doesn't exist.
Great catch! Added a rollback here
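For readers following along, a minimal sketch of the kind of rollback meant here, assuming `remove` takes the same `ServiceManager` that was appended (the actual code in the PR may differ):

```rust
// Sketch only: undo the registry append if service registration fails, so a
// later restart doesn't try to restore a service that was never fully added.
let hd_index = self.service_registry.append(service_manager.clone())?;

if let Err(err) = self
    .add_service_direct(service.clone(), Some(hd_index))
    .await
{
    // Best-effort compensation; log if the rollback itself fails.
    if let Err(rollback_err) = self.service_registry.remove(&service_manager) {
        tracing::error!("Failed to roll back registry entry: {:?}", rollback_err);
    }
    return Err(err);
}
```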
```rust
// Store components
if let Err(err) = self
    .engine_manager
    .store_components_for_service(&service)
    .await
{
    tracing::warn!(
        "Failed to store components for restored service {}: {:?}",
        service.name,
        err
    );
}
```
Should we just warn in this case?
Thinking about it from a node operator's POV:
- they enable persistent storage so they can recover quickly from a restart
- for whatever reason there is a problem with the DB, so some services get stored and some don't; functionality isn't affected and the warning traces are easy to miss
- the operator restarts the node and, surprise, some services are restored and some aren't

My point is: I think the feature should be opt-in, but if it is enabled, an error here should fail adding the new service entirely.
What do you think?
Yeah, looking back at this, I like the tracing error better too. If any part fails, we just log and skip.
I'm not sure I see the use-case for optional persistent storage atm (maybe a future PR?). If anything, we should probably add some retry mechanisms to improve reliability if we notice this is the case.
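If a retry mechanism does get added later, a rough sketch of a bounded retry around the component store call (attempt count and delay are illustrative, nothing here is decided in this PR):

```rust
// Sketch only: retry a few times before logging and skipping this service.
const MAX_ATTEMPTS: u32 = 3;

let mut stored = false;
for attempt in 1..=MAX_ATTEMPTS {
    match self.engine_manager.store_components_for_service(&service).await {
        Ok(_) => {
            stored = true;
            break;
        }
        Err(err) => {
            tracing::warn!(
                "Attempt {}/{} to store components for {} failed: {:?}",
                attempt,
                MAX_ATTEMPTS,
                service.name,
                err
            );
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }
    }
}
if !stored {
    tracing::error!("Giving up on storing components for {}", service.name);
}
```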
Pull request overview
Introduces a persisted ServiceRegistry to retain registered ServiceManager entries and their signer HD derivation indices across node restarts, then restores services on startup by re-fetching definitions from chain and re-registering subsystems with the original HD indices.
Changes:
- Add JSON-backed `ServiceRegistry` (`service_registry.json`) with append/remove + unit tests (persisted shape sketched below).
- Update `Dispatcher` startup to restore services from the registry and allocate HD indices from it on registration/removal.
- Adjust submission HD-index counter handling for explicit indices; update call sites for the new `add_service_direct(..., hd_index)` signature.
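For orientation, the persisted file is roughly an entry list plus a next-index counter. A sketch of what the serde-backed types behind `service_registry.json` could look like — the struct and field names here are illustrative, not necessarily the ones used in `service_registry.rs`, and `ServiceManager` is the existing type from this codebase, assumed to be serializable:

```rust
use serde::{Deserialize, Serialize};

// Sketch only: illustrative shape of the data persisted to service_registry.json.
#[derive(Serialize, Deserialize, Clone, PartialEq)]
pub struct RegistryEntry {
    pub service_manager: ServiceManager,
    pub hd_index: u32,
}

#[derive(Serialize, Deserialize, Default)]
struct RegistryFile {
    entries: Vec<RegistryEntry>,
    next_hd_index: u32,
}
```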
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| packages/wavs/src/service_registry.rs | New persisted registry implementation + tests. |
| packages/wavs/src/dispatcher.rs | Load registry, restore services at startup, allocate/remove HD indices via registry, update add_service_direct signature. |
| packages/wavs/src/subsystems/submission.rs | Ensure signer HD-index counter advances past explicitly assigned indices. |
| packages/wavs/src/http/handlers/service/add.rs | Update dev direct-add handler for new add_service_direct signature. |
| packages/wavs/src/lib.rs | Export service_registry module. |
| packages/wavs/tests/wavs_systems/mock_app.rs | Update test helper to pass new add_service_direct arg. |
| packages/wavs/tests/mock_e2e.rs | Update e2e test to pass new add_service_direct arg. |
| packages/wavs/tests/dispatcher_tests.rs | Update dispatcher test to pass new add_service_direct arg. |
| packages/wavs/benches/dev_triggers/setup.rs | Update bench setup to pass new add_service_direct arg. |
```rust
// This is a no-op for auto-incremented indices but critical for
// explicit indices during restoration from the service registry.
self.signing_mnemonic_hd_index_count
    .fetch_max(hd_index + 1, std::sync::atomic::Ordering::SeqCst);
```
hd_index + 1 can overflow (u32::MAX) in debug builds (panic) and wrap in release builds. Even if unlikely, it’s easy to make this robust by using saturating_add(1)/checked_add(1) and handling the max-index case explicitly.
Suggested change:

```diff
-    .fetch_max(hd_index + 1, std::sync::atomic::Ordering::SeqCst);
+    .fetch_max(hd_index.saturating_add(1), std::sync::atomic::Ordering::SeqCst);
```
```rust
let hd_index = *next;
entries.push(RegistryEntry {
    service_manager: sm,
    hd_index,
});
*next = hd_index + 1;

self.write_locked(&entries, *next)?;
```
*next = hd_index + 1 can overflow once next_hd_index reaches u32::MAX, which would wrap/panic depending on build settings. Consider using checked_add/saturating_add and returning a dedicated error when the HD index space is exhausted.
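A minimal sketch of the checked variant, assuming a dedicated error variant is added to `RegistryError` (the `HdIndexSpaceExhausted` name is made up for illustration):

```rust
// Sketch only: compute the next index first and fail explicitly when the
// u32 HD-index space is exhausted, instead of wrapping or panicking.
let hd_index = *next;
let new_next = hd_index
    .checked_add(1)
    .ok_or(RegistryError::HdIndexSpaceExhausted)?;

entries.push(RegistryEntry {
    service_manager: sm,
    hd_index,
});
*next = new_next;

self.write_locked(&entries, *next)?;
Ok(hd_index)
```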
```rust
// Allocate HD index from the registry (single source of truth for key derivation indices)
let hd_index = self.service_registry.append(service_manager.clone())?;
```
add_service calls service_registry.append(...), which performs synchronous filesystem I/O. Since this runs on the async request path, it can block Tokio worker threads. Consider moving registry writes to spawn_blocking, using async fs, or batching writes so service registration doesn’t block the runtime.
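One possible shape, sketched under the assumption that the registry handle is cheaply cloneable (e.g. `Arc`-wrapped) and that `RegistryError` converts into the dispatcher's error type:

```rust
// Sketch only: run the synchronous registry write on the blocking thread pool
// so it doesn't stall a Tokio worker thread on the request path.
let registry = self.service_registry.clone();
let sm = service_manager.clone();
let hd_index = tokio::task::spawn_blocking(move || registry.append(sm))
    .await
    .expect("registry append task panicked")?;

self.add_service_direct(service.clone(), Some(hd_index))
    .await?;
```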
```rust
self.remove_service_inner(id.clone())?;

// Remove from persistent registry
if let Some(sm) = service_manager {
    self.service_registry.remove(&sm)?;
}
```
remove_service removes the service from in-memory/DB state before mutating the persistent registry. If service_registry.remove fails (I/O), this returns an error after the service is already removed, leaving the registry potentially stale and the call non-atomic. Consider removing from the registry first (or making registry removal best-effort with a warning), or add a compensating rollback.
Suggested change:

```diff
-self.remove_service_inner(id.clone())?;
-
-// Remove from persistent registry
-if let Some(sm) = service_manager {
-    self.service_registry.remove(&sm)?;
-}
+// First remove from persistent registry to avoid leaving it stale if this fails
+if let Some(sm) = service_manager {
+    self.service_registry.remove(&sm)?;
+}
+// Then remove from in-memory/DB state
+self.remove_service_inner(id.clone())?;
```
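The other direction mentioned in the comment, keeping the current order but making the registry removal best-effort, would look roughly like this:

```rust
// Sketch only: remove from in-memory/DB state first, then treat a failed
// registry update as a warning rather than a hard error.
self.remove_service_inner(id.clone())?;

if let Some(sm) = service_manager {
    if let Err(err) = self.service_registry.remove(&sm) {
        tracing::warn!(
            "Service {:?} removed, but the persistent registry could not be updated: {:?}",
            id,
            err
        );
    }
}
```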
```diff
 async fn add_service_direct_inner(state: HttpState, service_hash: String) -> HttpResult<()> {
     let service = get_service_inner_hash(&state, service_hash).await?;
-    state.dispatcher.add_service_direct(service).await?;
+    state.dispatcher.add_service_direct(service, None).await?;
```
handle_add_service_direct registers a service via add_service_direct(service, None) but does not append to the persisted ServiceRegistry, so services added through this endpoint won’t be restored after restart and won’t get stable HD indices. If this endpoint is meant to survive restarts, allocate an HD index via the registry and persist it (or clearly document that /dev/services/* is non-persistent).
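If the dev endpoint is supposed to survive restarts, a hedged sketch of the wiring — the `service_registry()` accessor and the way the `ServiceManager` is pulled off the service are placeholders; the real code layout may differ:

```rust
// Sketch only: persist an HD index for dev-added services as well.
async fn add_service_direct_inner(state: HttpState, service_hash: String) -> HttpResult<()> {
    let service = get_service_inner_hash(&state, service_hash).await?;

    // Hypothetical accessor and field; shown only to illustrate the flow.
    let service_manager = service.manager.clone();
    let hd_index = state
        .dispatcher
        .service_registry()
        .append(service_manager)?;

    state
        .dispatcher
        .add_service_direct(service, Some(hd_index))
        .await?;

    Ok(())
}
```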
```rust
        let mut entries = self.entries.write().unwrap();
        let mut next = self.next_hd_index.write().unwrap();

        // Check for duplicates
        if entries.iter().any(|e| e.service_manager == sm) {
            return Err(RegistryError::AlreadyRegistered);
        }

        let hd_index = *next;
        entries.push(RegistryEntry {
            service_manager: sm,
            hd_index,
        });
        *next = hd_index + 1;

        self.write_locked(&entries, *next)?;
        Ok(hd_index)
    }

    pub fn remove(&self, sm: &ServiceManager) -> Result<(), RegistryError> {
        let mut entries = self.entries.write().unwrap();
        let next = *self.next_hd_index.read().unwrap();

        if let Some(pos) = entries.iter().position(|e| &e.service_manager == sm) {
            entries.remove(pos);
            self.write_locked(&entries, next)?;
```
append/remove hold the RwLock write guard while performing write_locked (filesystem I/O). This blocks other readers/writers for the duration of the disk write and can become problematic under contention. Consider cloning the updated state, dropping the lock before writing, and only committing the in-memory update after the file write succeeds (or otherwise handling write failures).
Suggested change:

```diff
-        let mut entries = self.entries.write().unwrap();
-        let mut next = self.next_hd_index.write().unwrap();
-        // Check for duplicates
-        if entries.iter().any(|e| e.service_manager == sm) {
-            return Err(RegistryError::AlreadyRegistered);
-        }
-        let hd_index = *next;
-        entries.push(RegistryEntry {
-            service_manager: sm,
-            hd_index,
-        });
-        *next = hd_index + 1;
-        self.write_locked(&entries, *next)?;
-        Ok(hd_index)
-    }
-
-    pub fn remove(&self, sm: &ServiceManager) -> Result<(), RegistryError> {
-        let mut entries = self.entries.write().unwrap();
-        let next = *self.next_hd_index.read().unwrap();
-        if let Some(pos) = entries.iter().position(|e| &e.service_manager == sm) {
-            entries.remove(pos);
-            self.write_locked(&entries, next)?;
+        // First, take a snapshot of the new state without holding write locks during I/O.
+        let (new_entries, new_next, hd_index) = {
+            let entries = self.entries.read().unwrap();
+            // Check for duplicates using a read lock.
+            if entries.iter().any(|e| e.service_manager == sm) {
+                return Err(RegistryError::AlreadyRegistered);
+            }
+            let next = *self.next_hd_index.read().unwrap();
+            let mut new_entries = entries.clone();
+            let hd_index = next;
+            new_entries.push(RegistryEntry {
+                service_manager: sm,
+                hd_index,
+            });
+            let new_next = hd_index + 1;
+            (new_entries, new_next, hd_index)
+        };
+        // Persist the new state without holding any locks.
+        self.write_locked(&new_entries, new_next)?;
+        // After successful persistence, commit the new state in memory.
+        {
+            let mut entries = self.entries.write().unwrap();
+            let mut next = self.next_hd_index.write().unwrap();
+            *entries = new_entries;
+            *next = new_next;
+        }
+        Ok(hd_index)
+    }
+
+    pub fn remove(&self, sm: &ServiceManager) -> Result<(), RegistryError> {
+        // Take a snapshot of the updated entries without holding write locks during I/O.
+        let (new_entries, next_opt) = {
+            let entries = self.entries.read().unwrap();
+            let next = *self.next_hd_index.read().unwrap();
+            if let Some(pos) = entries.iter().position(|e| &e.service_manager == sm) {
+                let mut new_entries = entries.clone();
+                new_entries.remove(pos);
+                (Some(new_entries), Some(next))
+            } else {
+                (None, None)
+            }
+        };
+        // If there was nothing to remove, we're done.
+        if let (Some(new_entries), Some(next)) = (new_entries, next_opt) {
+            // Persist the new state without holding any locks.
+            self.write_locked(&new_entries, next)?;
+            // After successful persistence, commit the new entries in memory.
+            let mut entries = self.entries.write().unwrap();
+            *entries = new_entries;
```
```rust
tmp.flush()?;
tmp.persist(&self.path)?;
```
tmp.flush() doesn’t guarantee durability on crash/power loss; the registry file may be lost even though the write returned OK. If the goal is crash-safe persistence, consider calling sync_all() on the temp file before persist, and (on Unix) syncing the parent directory after the rename.
Suggested change:

```diff
-tmp.flush()?;
-tmp.persist(&self.path)?;
+// Ensure data is durably written to the temp file before renaming.
+tmp.as_file_mut().sync_all()?;
+tmp.persist(&self.path)?;
+// On Unix, also sync the parent directory so the rename is durable.
+#[cfg(unix)]
+if let Some(parent) = self.path.parent() {
+    let dir_file = std::fs::File::open(parent)?;
+    dir_file.sync_all()?;
+}
```
```rust
#[test]
fn empty_file_gives_empty_registry() {
    let dir = TempDir::new().unwrap();
    let reg = ServiceRegistry::load(dir.path()).unwrap();
    assert!(reg.entries().is_empty());
    assert_eq!(reg.next_hd_index(), 1);
}
```
The test name empty_file_gives_empty_registry is misleading: it tests the case where the registry file does not exist, not an empty file. Renaming it (or creating an actual empty file and asserting on the error/behavior) would make the intent clearer.
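A possible rename that matches what the test actually exercises (the name is a suggestion only):

```rust
// Sketch only: same assertions, renamed to reflect the missing-file case.
#[test]
fn missing_file_gives_empty_registry() {
    let dir = TempDir::new().unwrap();
    let reg = ServiceRegistry::load(dir.path()).unwrap();
    assert!(reg.entries().is_empty());
    assert_eq!(reg.next_hd_index(), 1);
}
```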
closes #1055
Introduces a `ServiceRegistry` that persists registered `ServiceManager` entries and their HD key derivation indices to a JSON file (`service_registry.json`) in the data directory. On startup, services are restored from the registry by re-fetching their definitions from the chain, re-storing components, and re-registering with all subsystems using their original HD indices.
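A rough sketch of that startup path — the `fetch_service_from_chain` helper name is illustrative, and the actual restore loop in `dispatcher.rs` may be structured differently:

```rust
// Sketch only: restore every service recorded in the registry at startup.
for entry in self.service_registry.entries() {
    // Re-fetch the full service definition from chain for the persisted manager.
    let service = match self.fetch_service_from_chain(&entry.service_manager).await {
        Ok(service) => service,
        Err(err) => {
            tracing::error!(
                "Skipping restore for {:?}: failed to fetch service from chain: {:?}",
                entry.service_manager,
                err
            );
            continue;
        }
    };

    // Re-register with all subsystems using the original HD index so the
    // derived signing keys stay stable across restarts.
    if let Err(err) = self
        .add_service_direct(service, Some(entry.hd_index))
        .await
    {
        tracing::error!("Failed to restore service: {:?}", err);
    }
}
```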