2 changes: 1 addition & 1 deletion packages/wavs/benches/dev_triggers/setup.rs
@@ -137,7 +137,7 @@ impl DevTriggersRuntime {
assert_eq!(digest, component_digest);

// Register service
futures::executor::block_on(dispatcher.add_service_direct(service.clone()))
futures::executor::block_on(dispatcher.add_service_direct(service.clone(), None))
.expect("add service to dispatcher");

// Start dispatcher (store handle + context for graceful shutdown)
180 changes: 158 additions & 22 deletions packages/wavs/src/dispatcher.rs
@@ -45,6 +45,7 @@ use wavs_types::{
use wavs_types::{Service, ServiceError, ServiceId, SignerResponse, TriggerAction};

use crate::config::Config;
use crate::service_registry::{RegistryError, ServiceRegistry};
use crate::services::{Services, ServicesError};
use crate::subsystems::aggregator::error::AggregatorError;
use crate::subsystems::aggregator::{Aggregator, AggregatorCommand};
@@ -76,6 +77,7 @@ pub struct Dispatcher<S: CAStorage> {
pub dispatcher_to_submission_tx: crossbeam::channel::Sender<SubmissionCommand>,
pub dispatcher_to_aggregator_tx: crossbeam::channel::Sender<AggregatorCommand>,
pub db_storage: WavsDb,
pub service_registry: ServiceRegistry,
/// Cached EVM HTTP providers per chain to avoid creating new connections for each query
evm_http_providers: Arc<RwLock<HashMap<ChainKey, DynProvider>>>,
/// Cached Cosmos query clients per chain to avoid creating new connections for each query
@@ -117,6 +119,7 @@ impl Dispatcher<FileStorage> {

let file_storage = FileStorage::new(config.data.join("ca"))?;
let db_storage = WavsDb::new()?;
let service_registry = ServiceRegistry::load(&config.data)?;

let services = Services::new(db_storage.clone());

@@ -170,6 +173,7 @@ impl Dispatcher<FileStorage> {
aggregator,
services,
db_storage,
service_registry,
chain_configs: config.chains.clone(),
metrics: metrics.dispatcher.clone(),
ipfs_gateway: config.ipfs_gateway.clone(),
@@ -369,8 +373,113 @@ impl<S: CAStorage + 'static> Dispatcher<S> {
}
});

// populate the initial triggers
let initial_services = self.services.list(Bound::Unbounded, Bound::Unbounded)?;
// Restore services from the persisted registry
let registry_entries = self.service_registry.entries();
let chain_configs_for_restore = self.chain_configs.read().unwrap().clone();
let ipfs_gateway_for_restore = self.ipfs_gateway.clone();
let evm_providers_for_restore = self.evm_http_providers.clone();
let cosmos_clients_for_restore = self.cosmos_query_clients.clone();

let initial_services: Vec<Service> = ctx.rt.block_on(async {
// Fetch all services from chain in parallel (bounded concurrency)
const MAX_CONCURRENT_RESTORES: usize = 10;
let fetched: Vec<_> = stream::iter(&registry_entries)
.map(|entry| {
let chain_configs = &chain_configs_for_restore;
let ipfs_gateway = &ipfs_gateway_for_restore;
let evm_providers = &evm_providers_for_restore;
let cosmos_clients = &cosmos_clients_for_restore;
async move {
let (chain, address) = match &entry.service_manager {
ServiceManager::Evm { chain, address } => {
(chain.clone(), layer_climb::prelude::Address::from(*address))
}
ServiceManager::Cosmos { chain, address } => (
chain.clone(),
layer_climb::prelude::Address::from(address.clone()),
),
};
let result = query_service_from_address(
chain,
address,
chain_configs,
ipfs_gateway,
evm_providers,
cosmos_clients,
)
.await;
(entry, result)
}
})
.buffer_unordered(MAX_CONCURRENT_RESTORES)
.collect::<Vec<_>>()
.await;

// Process results sequentially (DB writes and manager registration are not async-safe to parallelize)
let mut restored = Vec::new();
for (entry, result) in fetched {
match result {
Ok(service) => {
// Store the service in DB
if let Err(err) = self.services.save(&service) {
tracing::error!(
"Failed to save restored service {} to DB: {:?}",
service.name,
err
);
continue;
}

// Store components
if let Err(err) = self
.engine_manager
.store_components_for_service(&service)
.await
{
tracing::error!(
"Failed to store components for restored service {}: {:?}",
service.name,
err
);
continue;
}
Comment on lines 433 to 445
Member

Should we just warn in this case?
I'm thinking from a node operator's point of view:

  • they enable persistent storage to recover quickly from a restart
  • for whatever reason there's a problem with the DB, so some services are stored and some aren't; functionality isn't affected, and warning traces are easy to miss
  • the operator restarts the node and, surprise, some services are restored and some aren't

My point is: I think this feature should be opt-in, but if enabled, it should fail adding the new service entirely in case of error.
What do you think?

Collaborator Author
@ismellike Feb 13, 2026

Yeah, looking back at this, I like the tracing error better too. If any part fails, we just log and skip.

I'm not sure I see the use case for optional persistent storage at the moment (maybe a future PR?). If anything, we should probably add some retry mechanisms to improve reliability if this turns out to be a problem.
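
A retry layer along those lines could be a small wrapper around the chain query used during restore. The sketch below is illustrative only: retry_restore, the attempt count, and the fixed delay are all hypothetical and not part of this PR.

    use std::time::Duration;

    /// Hypothetical helper: retry a fallible async operation a few times with a
    /// fixed delay between attempts, logging each failure before retrying.
    async fn retry_restore<T, E, F, Fut>(mut op: F, attempts: u32, delay: Duration) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
        E: std::fmt::Debug,
    {
        let attempts = attempts.max(1);
        let mut last_err = None;
        for attempt in 1..=attempts {
            match op().await {
                Ok(value) => return Ok(value),
                Err(err) => {
                    tracing::warn!("restore attempt {}/{} failed: {:?}", attempt, attempts, err);
                    last_err = Some(err);
                    if attempt < attempts {
                        tokio::time::sleep(delay).await;
                    }
                }
            }
        }
        Err(last_err.expect("attempts >= 1, so at least one error was recorded"))
    }

At the restore call site this would wrap the query_service_from_address call, retrying two or three times before falling through to the existing log-and-skip branch.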


// Add to managers with explicit HD index from registry
if let Err(err) = add_service_to_managers(
&service,
&self.trigger_manager,
&self.submission_manager,
&self.dispatcher_to_aggregator_tx,
Some(entry.hd_index),
) {
tracing::error!(
"Failed to add restored service {} to managers: {:?}",
service.name,
err
);
continue;
}

tracing::info!(
"Restored service {} [{:?}] with HD index {}",
service.name,
service.manager,
entry.hd_index
);
restored.push(service);
}
Err(err) => {
tracing::error!(
"Failed to restore service from chain for {:?}: {:?}",
entry.service_manager,
err
);
}
}
}
restored
});

let total_workflows: usize = initial_services.iter().map(|s| s.workflows.len()).sum();
tracing::info!(
"Initializing dispatcher: services={}, workflows={}, components={}",
@@ -379,16 +488,6 @@ impl<S: CAStorage + 'static> Dispatcher<S> {
self.list_component_digests()?.len()
);

for service in initial_services.iter() {
add_service_to_managers(
service,
&self.trigger_manager,
&self.submission_manager,
&self.dispatcher_to_aggregator_tx,
None,
)?;
}

// Check ServiceURI for each service at startup and update if needed (bounded concurrency)
let chain_configs = self.chain_configs.read().unwrap().clone();
let ipfs_gateway = self.ipfs_gateway.clone();
@@ -491,13 +590,14 @@ impl<S: CAStorage + 'static> Dispatcher<S> {
&self,
service_manager: ServiceManager,
) -> Result<Service, DispatcherError> {
let (chain, address) = match service_manager {
let (chain, address) = match &service_manager {
ServiceManager::Evm { chain, address } => {
(chain, layer_climb::prelude::Address::from(address))
}
ServiceManager::Cosmos { chain, address } => {
(chain, layer_climb::prelude::Address::from(address))
(chain.clone(), layer_climb::prelude::Address::from(*address))
}
ServiceManager::Cosmos { chain, address } => (
chain.clone(),
layer_climb::prelude::Address::from(address.clone()),
),
};
let chain_configs = self.chain_configs.read().unwrap().clone();
let service = query_service_from_address(
Expand All @@ -510,7 +610,19 @@ impl<S: CAStorage + 'static> Dispatcher<S> {
)
.await?;

self.add_service_direct(service.clone()).await?;
// Allocate HD index from the registry (single source of truth for key derivation indices)
let hd_index = self.service_registry.append(service_manager.clone())?;

Comment on lines +613 to +615
Copilot AI Feb 15, 2026

add_service calls service_registry.append(...), which performs synchronous filesystem I/O. Since this runs on the async request path, it can block Tokio worker threads. Consider moving registry writes to spawn_blocking, using async fs, or batching writes so service registration doesn’t block the runtime.

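One way to act on this, keeping the synchronous registry write off the async worker threads, is tokio's blocking thread pool. The sketch below is a generic helper rather than code from this PR; it assumes the registry handle (and the ServiceManager passed to append) can be moved into a 'static closure, e.g. by cloning.

    use tokio::task;

    /// Illustrative only: run a synchronous, filesystem-backed operation (such as
    /// a ServiceRegistry::append) on the blocking thread pool so it does not stall
    /// async worker threads on the request path.
    async fn run_blocking_append<R, E>(
        append: impl FnOnce() -> Result<R, E> + Send + 'static,
    ) -> Result<R, E>
    where
        R: Send + 'static,
        E: Send + 'static,
    {
        task::spawn_blocking(append)
            .await
            .expect("blocking registry task panicked")
    }

    // Hypothetical call site inside add_service (clonability of the registry is assumed):
    //   let registry = self.service_registry.clone();
    //   let manager = service_manager.clone();
    //   let hd_index = run_blocking_append(move || registry.append(manager)).await?;
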
if let Err(e) = self
.add_service_direct(service.clone(), Some(hd_index))
.await
{
// Roll back the registry entry so we don't leave a stale entry on disk
if let Err(remove_err) = self.service_registry.remove(&service_manager) {
tracing::error!("Failed to roll back registry entry after add_service_direct failure: {remove_err}");
}
return Err(e);
}

// Get current service count for logging
let current_services = self.services.list(Bound::Unbounded, Bound::Unbounded)?;
@@ -524,7 +636,11 @@ impl<S: CAStorage + 'static> Dispatcher<S> {

// this is public just so we can call it from tests
#[instrument(skip(self), fields(subsys = "Dispatcher", service.name = %service.name, service.manager = ?service.manager))]
pub async fn add_service_direct(&self, service: Service) -> Result<(), DispatcherError> {
pub async fn add_service_direct(
&self,
service: Service,
hd_index: Option<u32>,
) -> Result<(), DispatcherError> {
let service_id = service.id();
tracing::info!("Adding service: {} [{:?}]", service.name, service.manager);
// Check if service is already registered
Expand All @@ -546,14 +662,30 @@ impl<S: CAStorage + 'static> Dispatcher<S> {
&self.trigger_manager,
&self.submission_manager,
&self.dispatcher_to_aggregator_tx,
None,
hd_index,
)?;

Ok(())
}

#[instrument(skip(self), fields(subsys = "Dispatcher"))]
pub fn remove_service(&self, id: ServiceId) -> Result<(), DispatcherError> {
// Look up the ServiceManager before removing so we can delete it from the registry
let service_manager = self.services.get(&id).ok().map(|s| s.manager.clone());

self.remove_service_inner(id.clone())?;

// Remove from persistent registry
if let Some(sm) = service_manager {
self.service_registry.remove(&sm)?;
}

Comment on lines +676 to +682
Copilot AI Feb 15, 2026

remove_service removes the service from in-memory/DB state before mutating the persistent registry. If service_registry.remove fails (I/O), this returns an error after the service is already removed, leaving the registry potentially stale and the call non-atomic. Consider removing from the registry first (or making registry removal best-effort with a warning), or add a compensating rollback.

Suggested change
- self.remove_service_inner(id.clone())?;
- // Remove from persistent registry
- if let Some(sm) = service_manager {
-     self.service_registry.remove(&sm)?;
- }
+ // First remove from persistent registry to avoid leaving it stale if this fails
+ if let Some(sm) = service_manager {
+     self.service_registry.remove(&sm)?;
+ }
+ // Then remove from in-memory/DB state
+ self.remove_service_inner(id.clone())?;

Ok(())
}

/// Remove a service from in-memory state without mutating the persistent registry.
/// Used by `change_service_inner` where the ServiceManager hasn't changed.
fn remove_service_inner(&self, id: ServiceId) -> Result<(), DispatcherError> {
self.services.remove(&id)?;
self.engine_manager.engine.remove_storage(&id);
self.trigger_manager.remove_service(id.clone())?;
@@ -642,8 +774,9 @@ impl<S: CAStorage + 'static> Dispatcher<S> {
.store_components_for_service(&service)
.await?;

// Remove the old service - after this, no await points until the new service is added
self.remove_service(service_id.clone())?;
// Remove the old service from in-memory state only (ServiceManager hasn't changed,
// so no registry mutation needed)
self.remove_service_inner(service_id.clone())?;

// Store the service BEFORE setting up triggers/P2P subscription
// This ensures the service is in the database before any triggers can fire
@@ -923,4 +1056,7 @@ pub enum DispatcherError {

#[error("Cosmos query error: {0}")]
CosmosQuery(anyhow::Error),

#[error("Service registry: {0}")]
Registry(#[from] RegistryError),
}
2 changes: 1 addition & 1 deletion packages/wavs/src/http/handlers/service/add.rs
@@ -64,7 +64,7 @@ pub async fn handle_add_service_direct(

async fn add_service_direct_inner(state: HttpState, service_hash: String) -> HttpResult<()> {
let service = get_service_inner_hash(&state, service_hash).await?;
state.dispatcher.add_service_direct(service).await?;
state.dispatcher.add_service_direct(service, None).await?;

Comment on lines 65 to 68
Copilot AI Feb 15, 2026

handle_add_service_direct registers a service via add_service_direct(service, None) but does not append to the persisted ServiceRegistry, so services added through this endpoint won’t be restored after restart and won’t get stable HD indices. If this endpoint is meant to survive restarts, allocate an HD index via the registry and persist it (or clearly document that /dev/services/* is non-persistent).

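If the dev endpoint were made persistent, one option is to route it through the same registry allocation that add_service uses. A rough sketch of the inner handler follows; reaching service_registry through HttpState this way, the ? conversion of its error into HttpResult, and the function's tail are assumptions, not code established by this diff.

    // Hedged sketch only: error conversions and the full function body are assumed.
    async fn add_service_direct_inner(state: HttpState, service_hash: String) -> HttpResult<()> {
        let service = get_service_inner_hash(&state, service_hash).await?;

        // Assumed: allocate and persist an HD index first, mirroring Dispatcher::add_service.
        // A production version would also roll the registry entry back if registration fails.
        let hd_index = state
            .dispatcher
            .service_registry
            .append(service.manager.clone())?;

        state
            .dispatcher
            .add_service_direct(service, Some(hd_index))
            .await?;

        state.metrics.increment_registered_services();

        Ok(())
    }
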
state.metrics.increment_registered_services();

1 change: 1 addition & 0 deletions packages/wavs/src/lib.rs
@@ -7,6 +7,7 @@ pub mod config;
pub mod dispatcher; // where we have the high-level dispatcher
pub mod health;
pub mod http;
pub mod service_registry;
pub mod services;
pub mod subsystems; // subsystems: engine, submission, and trigger // services lookup
