diff --git a/Cargo.lock b/Cargo.lock index 28981c0d8df..076481e3760 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5777,7 +5777,6 @@ dependencies = [ "dirs 5.0.1", "fastcrypto", "iota-genesis-common", - "iota-grpc-api", "iota-keys", "iota-names", "iota-rest-api", @@ -5830,8 +5829,6 @@ dependencies = [ "iota-execution", "iota-framework", "iota-genesis-builder", - "iota-grpc-api", - "iota-grpc-types", "iota-json-rpc-types", "iota-macros", "iota-metrics", @@ -6396,12 +6393,18 @@ dependencies = [ "anyhow", "async-stream", "bcs", + "fastcrypto", "futures", "iota-config", + "iota-core", "iota-grpc-types", + "iota-json-rpc", "iota-json-rpc-types", + "iota-metrics", + "iota-storage", "iota-types", "move-core-types", + "prometheus", "prost", "serde", "serde_json", @@ -7793,7 +7796,6 @@ dependencies = [ "iota-config", "iota-execution", "iota-genesis-builder", - "iota-grpc-api", "iota-macros", "iota-names", "iota-protocol-config", diff --git a/crates/iota-config/Cargo.toml b/crates/iota-config/Cargo.toml index da9fd3b2087..9db949cae3a 100644 --- a/crates/iota-config/Cargo.toml +++ b/crates/iota-config/Cargo.toml @@ -30,7 +30,6 @@ tracing.workspace = true # internal dependencies consensus-config.workspace = true iota-genesis-common.workspace = true -iota-grpc-api.workspace = true iota-keys.workspace = true iota-names.workspace = true iota-rest-api.workspace = true diff --git a/crates/iota-config/src/node.rs b/crates/iota-config/src/node.rs index 50f4c654b02..766bdd5825d 100644 --- a/crates/iota-config/src/node.rs +++ b/crates/iota-config/src/node.rs @@ -266,7 +266,47 @@ pub struct NodeConfig { default = "default_grpc_api_config", skip_serializing_if = "Option::is_none" )] - pub grpc_api_config: Option, + pub grpc_api_config: Option, +} + +/// Configuration for the gRPC API service +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct GrpcApiConfig { + /// The address to bind the gRPC server to + #[serde(default = "default_grpc_api_address")] + pub address: std::net::SocketAddr, + + /// Buffer size for broadcast channels used for checkpoint streaming + #[serde(default = "default_checkpoint_broadcast_buffer_size")] + pub checkpoint_broadcast_buffer_size: usize, + + /// Buffer size for broadcast channels used for event streaming + #[serde(default = "default_event_broadcast_buffer_size")] + pub event_broadcast_buffer_size: usize, +} + +impl Default for GrpcApiConfig { + fn default() -> Self { + Self { + address: default_grpc_api_address(), + checkpoint_broadcast_buffer_size: default_checkpoint_broadcast_buffer_size(), + event_broadcast_buffer_size: default_event_broadcast_buffer_size(), + } + } +} + +fn default_grpc_api_address() -> std::net::SocketAddr { + use std::net::{IpAddr, Ipv4Addr}; + std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 50051) +} + +fn default_checkpoint_broadcast_buffer_size() -> usize { + 100 +} + +fn default_event_broadcast_buffer_size() -> usize { + 1000 } #[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)] @@ -576,7 +616,7 @@ pub fn default_json_rpc_address() -> SocketAddr { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9000) } -pub fn default_grpc_api_config() -> Option { +pub fn default_grpc_api_config() -> Option { None } diff --git a/crates/iota-core/Cargo.toml b/crates/iota-core/Cargo.toml index 7c9f44bff73..10bdde0089b 100644 --- a/crates/iota-core/Cargo.toml +++ b/crates/iota-core/Cargo.toml @@ -68,8 +68,6 @@ iota-config.workspace = true iota-execution.workspace = true iota-framework.workspace = true iota-genesis-builder.workspace = true -iota-grpc-api.workspace = true -iota-grpc-types.workspace = true iota-json-rpc-types.workspace = true iota-macros.workspace = true iota-metrics.workspace = true diff --git a/crates/iota-core/src/storage.rs b/crates/iota-core/src/storage.rs index db1d6c0686e..6ef6e23ba21 100644 --- a/crates/iota-core/src/storage.rs +++ b/crates/iota-core/src/storage.rs @@ -28,7 +28,7 @@ use tap::Pipe; use tracing::instrument; use crate::{ - authority::AuthorityState, + authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore}, checkpoints::CheckpointStore, epoch::committee_store::CommitteeStore, execution_cache::ExecutionCacheTraitPointers, @@ -380,6 +380,16 @@ impl RestReadStore { iota_types::storage::error::Error::custom("rest index store is disabled") }) } + + /// Get access to the underlying AuthorityState + pub fn authority_state(&self) -> &Arc { + &self.state + } + + /// Load epoch store for transaction processing + pub fn load_epoch_store_one_call_per_task(&self) -> Arc { + self.state.load_epoch_store_one_call_per_task().clone() + } } impl ObjectStore for RestReadStore { @@ -534,6 +544,10 @@ impl RestStateReader for RestReadStore { fn indexes(&self) -> Option<&dyn RestIndexes> { self.index().ok().map(|index| index as _) } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } impl RestIndexes for RestIndexStore { diff --git a/crates/iota-core/src/subscription_handler.rs b/crates/iota-core/src/subscription_handler.rs index 020a597abf9..05c266ec748 100644 --- a/crates/iota-core/src/subscription_handler.rs +++ b/crates/iota-core/src/subscription_handler.rs @@ -4,7 +4,6 @@ use std::sync::Arc; -use iota_grpc_api::EventSubscriber; use iota_json_rpc_types::{ EffectsWithInput, EventFilter, IotaEvent, IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents, TransactionFilter, @@ -14,7 +13,6 @@ use prometheus::{ IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, }; -use tokio_stream::Stream; use tracing::{error, instrument, trace}; use crate::streamer::Streamer; @@ -114,24 +112,17 @@ impl SubscriptionHandler { Ok(()) } - pub fn subscribe_events(&self, filter: EventFilter) -> impl Stream { - self.event_streamer.subscribe(filter) + pub fn subscribe_events( + &self, + filter: EventFilter, + ) -> Box + Send + Unpin> { + Box::new(Box::pin(self.event_streamer.subscribe(filter))) } pub fn subscribe_transactions( &self, filter: TransactionFilter, - ) -> impl Stream { - self.transaction_streamer.subscribe(filter) - } -} - -// Implement EventSubscriber trait for gRPC integration -impl EventSubscriber for SubscriptionHandler { - fn subscribe_events( - &self, - filter: EventFilter, - ) -> Box + Send + Unpin> { - Box::new(Box::pin(self.event_streamer.subscribe(filter))) + ) -> Box + Send + Unpin> { + Box::new(Box::pin(self.transaction_streamer.subscribe(filter))) } } diff --git a/crates/iota-grpc-api/Cargo.toml b/crates/iota-grpc-api/Cargo.toml index c4a4e6ba2b2..fe6cd8a13b9 100644 --- a/crates/iota-grpc-api/Cargo.toml +++ b/crates/iota-grpc-api/Cargo.toml @@ -14,6 +14,7 @@ workspace = true anyhow.workspace = true async-stream = "0.3" bcs.workspace = true +fastcrypto.workspace = true futures.workspace = true prost.workspace = true serde.workspace = true @@ -25,8 +26,13 @@ tonic.workspace = true tracing.workspace = true # internal dependencies +iota-config.workspace = true +iota-core.workspace = true iota-grpc-types.workspace = true +iota-json-rpc.workspace = true iota-json-rpc-types.workspace = true +iota-metrics.workspace = true +iota-storage.workspace = true iota-types.workspace = true move-core-types.workspace = true @@ -34,5 +40,9 @@ move-core-types.workspace = true tonic-build.workspace = true [dev-dependencies] +# external dependencies +prometheus.workspace = true + +# internal dependencies iota-config.workspace = true test-cluster.workspace = true diff --git a/crates/iota-grpc-api/build.rs b/crates/iota-grpc-api/build.rs index 31eb7ee8aeb..888b9f99ec3 100644 --- a/crates/iota-grpc-api/build.rs +++ b/crates/iota-grpc-api/build.rs @@ -8,6 +8,9 @@ fn main() { "proto/common.proto", "proto/checkpoint.proto", "proto/event.proto", + "proto/transaction.proto", + "proto/read.proto", + "proto/write.proto", ], &["proto/"], ) diff --git a/crates/iota-grpc-api/proto/checkpoint.proto b/crates/iota-grpc-api/proto/checkpoint.proto index 35e92f87bdd..d1a93d669ed 100644 --- a/crates/iota-grpc-api/proto/checkpoint.proto +++ b/crates/iota-grpc-api/proto/checkpoint.proto @@ -11,6 +11,7 @@ service CheckpointService { // Checkpoint operations rpc StreamCheckpoints (CheckpointStreamRequest) returns (stream Checkpoint); rpc GetEpochFirstCheckpointSequenceNumber (EpochRequest) returns (CheckpointSequenceNumberResponse); + rpc GetLatestCheckpoint (GetLatestCheckpointRequest) returns (Checkpoint); } message CheckpointStreamRequest { @@ -28,6 +29,12 @@ message CheckpointSequenceNumberResponse { uint64 sequence_number = 1; } +// Get the latest checkpoint information +message GetLatestCheckpointRequest { + // If true, get the full CheckpointData (not just the summary). + bool full = 1; +} + message Checkpoint { uint64 sequence_number = 1; // Indicates whether bcs_data contains full CheckpointData (true) or just CertifiedCheckpointSummary (false) diff --git a/crates/iota-grpc-api/proto/common.proto b/crates/iota-grpc-api/proto/common.proto index bce0d44fe16..28d598aa184 100644 --- a/crates/iota-grpc-api/proto/common.proto +++ b/crates/iota-grpc-api/proto/common.proto @@ -15,7 +15,47 @@ message Address { bytes address = 1; } -// 32-byte Transaction digest -message TransactionDigest { +// Generic 32-byte digest (used for transactions, objects, etc.) +message Digest { bytes digest = 1; } + +// Match all events (catch-all filter) +message AllFilter { +} + +// Filter by address (sender, package, etc.) +message AddressFilter { + Address address = 1; +} + +// Filter by transaction digest +message TransactionDigestFilter { + Digest tx_digest = 1; +} + +// Filter by Move module (package + module) +message MoveModuleFilter { + Address package_id = 1; // Package ID + string module = 2; // Module name +} + +// Filter by Move event type (package + module + event name) +message MoveEventTypeFilter { + Address package_id = 1; // Package ID + string module = 2; // Module name + string name = 3; // Event name +} + +// Filter by Move event module (package + module) +message MoveEventModuleFilter { + Address package_id = 1; // Package ID + string module = 2; // Module name +} + +// Filter by Move function (package + module + function) +message MoveFunctionFilter { + Address package_id = 1; // Package ID + optional string module = 2; // Module name (optional) + optional string function = 3; // Function name (optional) +} \ No newline at end of file diff --git a/crates/iota-grpc-api/proto/event.proto b/crates/iota-grpc-api/proto/event.proto index 629d25e334a..4f91db5d3d3 100644 --- a/crates/iota-grpc-api/proto/event.proto +++ b/crates/iota-grpc-api/proto/event.proto @@ -28,58 +28,24 @@ message Event { message EventID { uint64 event_seq = 1; - iota.grpc.common.TransactionDigest tx_digest = 2; + iota.grpc.common.Digest tx_digest = 2; } // Rich event filter that supports gRPC event filtering message EventFilter { oneof filter { - AllFilter all = 1; - SenderFilter sender = 2; - TransactionFilter transaction = 3; - MoveModuleFilter move_module = 4; - MoveEventTypeFilter move_event_type = 5; - MoveEventModuleFilter move_event_module = 6; + iota.grpc.common.AllFilter all = 1; + iota.grpc.common.AddressFilter sender = 2; + iota.grpc.common.TransactionDigestFilter transaction = 3; + iota.grpc.common.MoveModuleFilter move_module = 4; + iota.grpc.common.MoveEventTypeFilter move_event_type = 5; + iota.grpc.common.MoveEventModuleFilter move_event_module = 6; TimeRangeFilter time_range = 7; } } -// Match all events (catch-all filter) -message AllFilter { - // Empty - matches all events -} - -// Filter by sender address -message SenderFilter { - iota.grpc.common.Address sender = 1; // Sender address -} - -// Filter by transaction digest -message TransactionFilter { - iota.grpc.common.TransactionDigest tx_digest = 1; // Transaction digest -} - -// Filter by transaction execution module (different from event definition module) -message MoveModuleFilter { - iota.grpc.common.Address package_id = 1; // Package ID - string module = 2; // Module name -} - -// Filter by Move event type (package::module::event_name) -message MoveEventTypeFilter { - iota.grpc.common.Address package_id = 1; // Package ID - string module = 2; // Module name (e.g., "request") - string name = 3; // Event name (e.g., "RequestEvent") -} - -// Filter by package and module -message MoveEventModuleFilter { - iota.grpc.common.Address package_id = 1; // Package ID - string module = 2; // Module name -} - // Filter by timestamp range message TimeRangeFilter { uint64 start_time = 1; // Start time in milliseconds since epoch (inclusive) uint64 end_time = 2; // End time in milliseconds since epoch (exclusive) -} +} \ No newline at end of file diff --git a/crates/iota-grpc-api/proto/read.proto b/crates/iota-grpc-api/proto/read.proto new file mode 100644 index 00000000000..4ddff72c64e --- /dev/null +++ b/crates/iota-grpc-api/proto/read.proto @@ -0,0 +1,35 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package iota.grpc.read; + +import "common.proto"; + +service ReadService { + rpc GetObject (GetObjectRequest) returns (GetObjectResponse); +} + +// GetObject +message GetObjectRequest { + // The ID of the queried object + iota.grpc.common.Address object_id = 1; + // Options for specifying the content to be returned + optional ObjectDataOptions options = 2; +} + +message ObjectDataOptions { + bool show_type = 1; + bool show_owner = 2; + bool show_previous_transaction = 3; + bool show_display = 4; + bool show_content = 5; + bool show_bcs = 6; + bool show_storage_rebate = 7; +} + +message GetObjectResponse { + optional string json_data = 1; // JSON-serialized IotaObjectData (success) + optional string json_error = 2; // JSON-serialized IotaObjectResponseError (error) +} diff --git a/crates/iota-grpc-api/proto/transaction.proto b/crates/iota-grpc-api/proto/transaction.proto new file mode 100644 index 00000000000..b8ca3010253 --- /dev/null +++ b/crates/iota-grpc-api/proto/transaction.proto @@ -0,0 +1,85 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package iota.grpc.transactions; + +import "common.proto"; + +// Transaction subscription service +service TransactionService { + rpc StreamTransactions (TransactionStreamRequest) returns (stream Transaction); +} + +// Request to stream transactions with optional filtering +message TransactionStreamRequest { + TransactionFilter filter = 1; +} + +// Transaction data +message Transaction { + string json_data = 1; // JSON-serialized IotaTransactionBlockEffects +} + +// Rich transaction filter that supports gRPC transaction filtering +message TransactionFilter { + oneof filter { + iota.grpc.common.AllFilter all = 1; + CheckpointFilter checkpoint = 2; + iota.grpc.common.MoveFunctionFilter move_function = 3; + InputObjectFilter input_object = 4; + ChangedObjectFilter changed_object = 5; + iota.grpc.common.AddressFilter from_address = 6; + iota.grpc.common.AddressFilter to_address = 7; + FromAndToAddressFilter from_and_to_address = 8; + iota.grpc.common.AddressFilter from_or_to_address = 9; + TransactionKindFilter transaction_kind = 10; + TransactionKindInFilter transaction_kind_in = 11; + } +} + +// Filter by checkpoint sequence number +message CheckpointFilter { + uint64 checkpoint_sequence_number = 1; +} + +// Filter by input object ID +message InputObjectFilter { + iota.grpc.common.Address object_id = 1; +} + +// Filter by changed object ID (created, mutated, unwrapped) +message ChangedObjectFilter { + iota.grpc.common.Address object_id = 1; +} + + +// Filter by sender and recipient addresses +message FromAndToAddressFilter { + iota.grpc.common.Address from_address = 1; + iota.grpc.common.Address to_address = 2; +} + + +// Filter by transaction kind +message TransactionKindFilter { + TransactionKind kind = 1; +} + +// Filter by transaction kinds (any of the specified kinds) +message TransactionKindInFilter { + repeated TransactionKind kinds = 1; +} + +// Transaction kinds +enum TransactionKind { + SYSTEM_TRANSACTION = 0; + PROGRAMMABLE_TRANSACTION = 1; + GENESIS = 2; + CONSENSUS_COMMIT_PROLOGUE_V1 = 3; + AUTHENTICATOR_STATE_UPDATE_V1 = 4; + RANDOMNESS_STATE_UPDATE = 5; + END_OF_EPOCH_TRANSACTION = 6; +} + diff --git a/crates/iota-grpc-api/proto/write.proto b/crates/iota-grpc-api/proto/write.proto new file mode 100644 index 00000000000..75ceca5afcc --- /dev/null +++ b/crates/iota-grpc-api/proto/write.proto @@ -0,0 +1,50 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package iota.grpc.write; + +import "common.proto"; + +service WriteService { + rpc ExecuteTransaction (ExecuteTransactionRequest) returns (ExecuteTransactionResponse); +} + +// ExecuteTransaction +message ExecuteTransactionRequest { + // BCS serialized transaction data bytes + bytes tx_bytes = 1; + // List of signatures + repeated bytes signatures = 2; + // Options for specifying the content to be returned + optional TransactionResponseOptions options = 3; + // The request type + optional ExecuteTransactionRequestType request_type = 4; +} + +message TransactionResponseOptions { + ///Whether to show transaction input data + bool show_input = 1; + /// Whether to show bcs-encoded transaction input data + bool show_raw_input = 2; + // Whether to show transaction effects + bool show_effects = 3; + // Whether to show transaction events + bool show_events = 4; + // Whether to show object changes + bool show_object_changes = 5; + // Whether to show balance changes + bool show_balance_changes = 6; + // Whether to show raw transaction effects + bool show_raw_effects = 7; +} + +enum ExecuteTransactionRequestType { + WAIT_FOR_EFFECTS_CERT = 0; + WAIT_FOR_LOCAL_EXECUTION = 1; +} + +message ExecuteTransactionResponse { + string json_data = 1; // JSON-serialized IotaTransactionBlockResponse +} diff --git a/crates/iota-grpc-api/src/checkpoint_service.rs b/crates/iota-grpc-api/src/checkpoint_service.rs index ad0d238cae0..4a8fd09bc6f 100644 --- a/crates/iota-grpc-api/src/checkpoint_service.rs +++ b/crates/iota-grpc-api/src/checkpoint_service.rs @@ -5,12 +5,12 @@ use std::{pin::Pin, sync::Arc}; use tokio_util::sync::CancellationToken; use tonic::{Request, Response, Status}; -use tracing::{debug, info}; +use tracing::debug; use crate::{ - checkpoint::{ - CheckpointSequenceNumberResponse, CheckpointStreamRequest, EpochRequest, - checkpoint_service_server::CheckpointService, + checkpoints::{ + Checkpoint, CheckpointSequenceNumberResponse, CheckpointStreamRequest, EpochRequest, + GetLatestCheckpointRequest, checkpoint_service_server::CheckpointService, }, types::*, }; @@ -75,7 +75,7 @@ impl CheckpointGrpcService { #[tonic::async_trait] impl CheckpointService for CheckpointGrpcService { type StreamCheckpointsStream = - Pin> + Send>>; + Pin> + Send>>; async fn stream_checkpoints( &self, @@ -130,13 +130,63 @@ impl CheckpointService for CheckpointGrpcService { } }; - info!( - "First checkpoint for epoch {}: seq={}", - epoch, sequence_number - ); + debug!("First checkpoint for epoch {epoch}: seq={sequence_number}"); Ok(Response::new(CheckpointSequenceNumberResponse { sequence_number, })) } + + async fn get_latest_checkpoint( + &self, + request: Request, + ) -> Result, Status> { + debug!("get_latest_checkpoint called"); + let req = request.into_inner(); + + // Get the latest checkpoint sequence number + let sequence_number = self + .reader + .get_latest_checkpoint_sequence_number() + .ok_or_else(|| Status::not_found("No checkpoints available"))?; + + // Create checkpoint response using the same structure as checkpoint streaming + let checkpoint = if req.full { + // Get full checkpoint data + if let Some(data) = self.reader.get_full_checkpoint_data(sequence_number) { + let grpc_data = iota_grpc_types::CheckpointData::from(data); + Checkpoint { + sequence_number, + is_full: true, + bcs_data: Some( + crate::common::BcsData::serialize_from(&grpc_data).map_err(|e| { + Status::internal(format!("BCS serialization failed: {e}")) + })?, + ), + } + } else { + return Err(Status::not_found("Checkpoint data not found")); + } + } else { + // Get checkpoint summary + if let Some(summary) = self.reader.get_checkpoint_summary(sequence_number) { + let grpc_summary = iota_grpc_types::CertifiedCheckpointSummary::from(summary); + Checkpoint { + sequence_number, + is_full: false, + bcs_data: Some( + crate::common::BcsData::serialize_from(&grpc_summary).map_err(|e| { + Status::internal(format!("BCS serialization failed: {e}")) + })?, + ), + } + } else { + return Err(Status::not_found("Checkpoint summary not found")); + } + }; + + debug!("Latest checkpoint: {sequence_number}"); + + Ok(Response::new(checkpoint)) + } } diff --git a/crates/iota-grpc-api/src/client/checkpoint.rs b/crates/iota-grpc-api/src/client/checkpoint.rs index 90bf65170a0..d53e8f285fd 100644 --- a/crates/iota-grpc-api/src/client/checkpoint.rs +++ b/crates/iota-grpc-api/src/client/checkpoint.rs @@ -5,7 +5,9 @@ use futures::{Stream, StreamExt}; use iota_grpc_types::{CertifiedCheckpointSummary, CheckpointData}; use tonic::transport::Channel; -use crate::checkpoint::checkpoint_service_client::CheckpointServiceClient; +use crate::checkpoints::{ + GetLatestCheckpointRequest, checkpoint_service_client::CheckpointServiceClient, +}; /// Enum representing the content of a checkpoint, either full data or summary. #[derive(Debug, Clone)] @@ -46,7 +48,7 @@ impl CheckpointClient { end_sequence_number: Option, full: bool, ) -> Result>, tonic::Status> { - let request = crate::checkpoint::CheckpointStreamRequest { + let request = crate::checkpoints::CheckpointStreamRequest { start_sequence_number, end_sequence_number, full, @@ -67,7 +69,7 @@ impl CheckpointClient { &mut self, epoch: u64, ) -> Result { - let request = crate::checkpoint::EpochRequest { epoch }; + let request = crate::checkpoints::EpochRequest { epoch }; let response = self .client .get_epoch_first_checkpoint_sequence_number(request) @@ -75,6 +77,19 @@ impl CheckpointClient { Ok(response.into_inner().sequence_number) } + /// Get the latest checkpoint. + pub async fn get_latest_checkpoint( + &mut self, + full: bool, + ) -> Result { + let request = GetLatestCheckpointRequest { full }; + let response = self.client.get_latest_checkpoint(request).await?; + let checkpoint = response.into_inner(); + + Self::deserialize_checkpoint(&checkpoint) + .map_err(|e| tonic::Status::internal(format!("Failed to deserialize checkpoint: {e}"))) + } + // ======================================== // Private Helper Methods // ======================================== @@ -83,7 +98,7 @@ impl CheckpointClient { /// summary). Returns either checkpoint data or summary depending on the /// checkpoint type. fn deserialize_checkpoint( - checkpoint: &crate::checkpoint::Checkpoint, + checkpoint: &crate::checkpoints::Checkpoint, ) -> anyhow::Result { let bcs_data = checkpoint .bcs_data diff --git a/crates/iota-grpc-api/src/client/mod.rs b/crates/iota-grpc-api/src/client/mod.rs index 64fdca400c8..877fe700a33 100644 --- a/crates/iota-grpc-api/src/client/mod.rs +++ b/crates/iota-grpc-api/src/client/mod.rs @@ -6,7 +6,13 @@ mod checkpoint; mod event; mod node_client; +mod read; +mod transaction; +mod write; pub use checkpoint::{CheckpointClient, CheckpointContent}; pub use event::EventClient; pub use node_client::NodeClient; +pub use read::ReadClient; +pub use transaction::TransactionClient; +pub use write::WriteClient; diff --git a/crates/iota-grpc-api/src/client/node_client.rs b/crates/iota-grpc-api/src/client/node_client.rs index 20c49608ad8..9c448c6d382 100644 --- a/crates/iota-grpc-api/src/client/node_client.rs +++ b/crates/iota-grpc-api/src/client/node_client.rs @@ -5,7 +5,10 @@ use std::sync::OnceLock; use tonic::transport::Channel; -use super::{checkpoint::CheckpointClient, event::EventClient}; +use super::{ + checkpoint::CheckpointClient, event::EventClient, read::ReadClient, + transaction::TransactionClient, write::WriteClient, +}; /// gRPC client factory for IOTA node operations. pub struct NodeClient { @@ -15,6 +18,12 @@ pub struct NodeClient { checkpoint_client: OnceLock, /// Cached event client (singleton) event_client: OnceLock, + /// Cached transaction client (singleton) + transaction_client: OnceLock, + /// Cached read client (singleton) + read_client: OnceLock, + /// Cached write client (singleton) + write_client: OnceLock, } impl NodeClient { @@ -26,6 +35,9 @@ impl NodeClient { channel, checkpoint_client: OnceLock::new(), event_client: OnceLock::new(), + transaction_client: OnceLock::new(), + read_client: OnceLock::new(), + write_client: OnceLock::new(), }) } @@ -70,4 +82,49 @@ impl NodeClient { .clone(), ) } + + /// Get a transaction service client. + /// + /// Returns `Some(TransactionClient)` if the node supports transaction + /// streaming operations, `None` otherwise. The client is created only + /// once and cached for subsequent calls. + pub fn transaction_client(&self) -> Option { + // For now, always return Some since transaction service is always available + // In the future, this could check node capabilities first + Some( + self.transaction_client + .get_or_init(|| TransactionClient::new(self.channel.clone())) + .clone(), + ) + } + + /// Get a read service client. + /// + /// Returns `Some(ReadClient)` if the node supports read operations, + /// `None` otherwise. The client is created only once and cached for + /// subsequent calls. + pub fn read_client(&self) -> Option { + // For now, always return Some since read service is always available + // In the future, this could check node capabilities first + Some( + self.read_client + .get_or_init(|| ReadClient::new(self.channel.clone())) + .clone(), + ) + } + + /// Get a write service client. + /// + /// Returns `Some(WriteClient)` if the node supports write operations, + /// `None` otherwise. The client is created only once and cached for + /// subsequent calls. + pub fn write_client(&self) -> Option { + // For now, always return Some since write service is always available + // In the future, this could check node capabilities first + Some( + self.write_client + .get_or_init(|| WriteClient::new(self.channel.clone())) + .clone(), + ) + } } diff --git a/crates/iota-grpc-api/src/client/read.rs b/crates/iota-grpc-api/src/client/read.rs new file mode 100644 index 00000000000..58fd4a7d327 --- /dev/null +++ b/crates/iota-grpc-api/src/client/read.rs @@ -0,0 +1,86 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use iota_json_rpc_types::{IotaObjectData, IotaObjectDataOptions, IotaObjectResponse}; +use iota_types::{base_types::ObjectID, error::IotaObjectResponseError}; +use tonic::{Status, transport::Channel}; + +use crate::{ + common::Address, + read::{GetObjectRequest, ObjectDataOptions, read_service_client::ReadServiceClient}, +}; + +/// Dedicated client for read-related gRPC operations. +#[derive(Clone)] +pub struct ReadClient { + client: ReadServiceClient, +} + +impl ReadClient { + /// Create a new ReadClient from a shared gRPC channel. + pub(super) fn new(channel: Channel) -> Self { + Self { + client: ReadServiceClient::new(channel), + } + } + + /// Get an object by ID with specified options. + /// + /// # Arguments + /// * `object_id` - The object ID to retrieve + /// * `options` - Options for what data to include in the response + /// + /// # Returns + /// Result containing IotaObjectResponse + pub async fn get_object( + &mut self, + object_id: ObjectID, + options: Option, + ) -> Result { + // Convert to gRPC request format + let grpc_request = GetObjectRequest { + object_id: Some(Address { + address: object_id.into_bytes().to_vec(), + }), + options: options.map(|opts| ObjectDataOptions { + show_type: opts.show_type, + show_owner: opts.show_owner, + show_previous_transaction: opts.show_previous_transaction, + show_display: opts.show_display, + show_content: opts.show_content, + show_bcs: opts.show_bcs, + show_storage_rebate: opts.show_storage_rebate, + }), + }; + + // Make gRPC call + let response = self.client.get_object(grpc_request).await?; + + let grpc_response = response.into_inner(); + + // Deserialize JSON response + Self::deserialize_response(&grpc_response) + } + + /// Deserialize JSON response into IotaObjectResponse + fn deserialize_response( + response: &crate::read::GetObjectResponse, + ) -> Result { + // Check for success data first + if let Some(json_data) = &response.json_data { + // Deserialize success data + let object_data: IotaObjectData = serde_json::from_str(json_data).map_err(|e| { + Status::internal(format!("Failed to deserialize object data from JSON: {e}")) + })?; + Ok(IotaObjectResponse::new_with_data(object_data)) + } else if let Some(json_error) = &response.json_error { + // Deserialize error + let error: IotaObjectResponseError = serde_json::from_str(json_error).map_err(|e| { + Status::internal(format!("Failed to deserialize error from JSON: {e}")) + })?; + Ok(IotaObjectResponse::new_with_error(error)) + } else { + Err(Status::internal("Response contains neither data nor error")) + } + } +} diff --git a/crates/iota-grpc-api/src/client/transaction.rs b/crates/iota-grpc-api/src/client/transaction.rs new file mode 100644 index 00000000000..1cd3f8654e6 --- /dev/null +++ b/crates/iota-grpc-api/src/client/transaction.rs @@ -0,0 +1,69 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::anyhow; +use futures::{Stream, StreamExt}; +use iota_json_rpc_types::IotaTransactionBlockEffects; +use tonic::transport::Channel; + +use crate::transactions::{ + Transaction, TransactionStreamRequest, transaction_service_client::TransactionServiceClient, +}; + +/// Dedicated client for transaction-related gRPC operations. +/// +/// This client handles all transaction service interactions including streaming +/// transactions with filtering capabilities. +#[derive(Clone)] +pub struct TransactionClient { + client: TransactionServiceClient, +} + +impl TransactionClient { + /// Create a new TransactionClient from a shared gRPC channel. + pub(super) fn new(channel: Channel) -> Self { + Self { + client: TransactionServiceClient::new(channel), + } + } + + /// Stream transaction effects with automatic BCS deserialization and + /// filtering. + /// + /// # Arguments + /// * `filter` - Transaction filter to apply to the stream + /// + /// # Returns + /// A stream of IOTA transaction effects that match the specified filter + pub async fn stream_transactions( + &mut self, + filter: crate::transactions::TransactionFilter, + ) -> Result>, tonic::Status> + { + let request = TransactionStreamRequest { + filter: Some(filter), + }; + let response = self.client.stream_transactions(request).await?; + let stream = response.into_inner(); + + // Transform the stream to deserialize Transaction protobuf messages + // into native IotaTransactionBlockEffects + Ok(stream.map(|transaction_result| { + transaction_result.and_then(|transaction| { + Self::deserialize_transaction(&transaction).map_err(|e| { + tonic::Status::internal(format!("Failed to deserialize transaction: {e}")) + }) + }) + })) + } + + /// Deserialize transaction effects from JSON-serialized data. + fn deserialize_transaction(tx: &Transaction) -> anyhow::Result { + // Extract JSON string directly + let json_data = &tx.json_data; + + // Deserialize directly from JSON string + serde_json::from_str(json_data) + .map_err(|e| anyhow!("Failed to deserialize transaction effects from JSON: {e}")) + } +} diff --git a/crates/iota-grpc-api/src/client/write.rs b/crates/iota-grpc-api/src/client/write.rs new file mode 100644 index 00000000000..fbb4ff8bca4 --- /dev/null +++ b/crates/iota-grpc-api/src/client/write.rs @@ -0,0 +1,53 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use iota_json_rpc_types::IotaTransactionBlockResponse; +use tonic::{Status, transport::Channel}; + +use crate::write::{ + ExecuteTransactionRequest, ExecuteTransactionResponse, write_service_client::WriteServiceClient, +}; + +/// Dedicated client for write-related gRPC operations. +#[derive(Clone)] +pub struct WriteClient { + client: WriteServiceClient, +} + +impl WriteClient { + /// Create a new WriteClient from a shared gRPC channel. + pub(super) fn new(channel: Channel) -> Self { + Self { + client: WriteServiceClient::new(channel), + } + } + + /// Execute a transaction with specified options. + pub async fn execute_transaction( + &mut self, + request: ExecuteTransactionRequest, + ) -> Result { + // Make gRPC call + let response = self.client.execute_transaction(request).await?; + + let grpc_response = response.into_inner(); + + // Deserialize JSON response + Self::deserialize_response(&grpc_response) + } + + /// Deserialize JSON response into IotaTransactionBlockResponse + fn deserialize_response( + response: &ExecuteTransactionResponse, + ) -> Result { + // Extract JSON string directly + let json_data = &response.json_data; + + // Deserialize directly from JSON string + serde_json::from_str(json_data).map_err(|e| { + Status::internal(format!( + "Failed to deserialize transaction response from JSON: {e}" + )) + }) + } +} diff --git a/crates/iota-grpc-api/src/config.rs b/crates/iota-grpc-api/src/config.rs deleted file mode 100644 index 5639a05be3b..00000000000 --- a/crates/iota-grpc-api/src/config.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) 2025 IOTA Stiftung -// SPDX-License-Identifier: Apache-2.0 - -/// Configuration for the gRPC API service -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] -#[serde(rename_all = "kebab-case")] -pub struct Config { - /// The address to bind the gRPC server to - #[serde(default = "default_grpc_api_address")] - pub address: std::net::SocketAddr, - - /// Buffer size for broadcast channels used for checkpoint streaming - #[serde(default = "default_checkpoint_broadcast_buffer_size")] - pub checkpoint_broadcast_buffer_size: usize, - - /// Buffer size for broadcast channels used for event streaming - #[serde(default = "default_event_broadcast_buffer_size")] - pub event_broadcast_buffer_size: usize, -} - -impl Default for Config { - fn default() -> Self { - Self { - address: default_grpc_api_address(), - checkpoint_broadcast_buffer_size: default_checkpoint_broadcast_buffer_size(), - event_broadcast_buffer_size: default_event_broadcast_buffer_size(), - } - } -} - -fn default_grpc_api_address() -> std::net::SocketAddr { - use std::net::{IpAddr, Ipv4Addr}; - std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 50051) -} - -fn default_checkpoint_broadcast_buffer_size() -> usize { - 100 -} - -fn default_event_broadcast_buffer_size() -> usize { - 1000 -} diff --git a/crates/iota-grpc-api/src/event_service.rs b/crates/iota-grpc-api/src/event_service.rs index 066d16a3664..68bd074b9e4 100644 --- a/crates/iota-grpc-api/src/event_service.rs +++ b/crates/iota-grpc-api/src/event_service.rs @@ -4,6 +4,7 @@ use std::{str::FromStr, sync::Arc}; use futures::StreamExt; +use iota_core::subscription_handler::SubscriptionHandler; use iota_json_rpc_types::{EventFilter, IotaEvent}; use iota_types::{ base_types::{IotaAddress, ObjectID}, @@ -15,18 +16,18 @@ use tonic::{Request, Response, Status}; use tracing::debug; use crate::{ + common::{Address, BcsData, Digest}, events::{Event, EventId, EventStreamRequest, event_service_server::EventService}, - types::EventSubscriber, }; pub struct EventGrpcService { - pub event_subscriber: Arc, + pub event_subscriber: Arc, pub cancellation_token: CancellationToken, } impl EventGrpcService { pub fn new( - event_subscriber: Arc, + event_subscriber: Arc, cancellation_token: CancellationToken, ) -> Self { Self { @@ -108,7 +109,7 @@ fn create_event_filter(proto_filter: &crate::events::EventFilter) -> Result Ok(EventFilter::All(vec![])), Some(Filter::Sender(f)) => { - let sender = parse_iota_address(&f.sender, "Sender address")?; + let sender = parse_iota_address(&f.address, "Sender address")?; Ok(EventFilter::Sender(sender)) } Some(Filter::Transaction(f)) => { @@ -148,10 +149,7 @@ fn create_event_filter(proto_filter: &crate::events::EventFilter) -> Result, - field_name: &str, -) -> Result { +fn parse_object_id(address: &Option
, field_name: &str) -> Result { let address = address .as_ref() .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?; @@ -164,10 +162,7 @@ fn parse_identifier(id_str: &str, field_name: &str) -> Result, - field_name: &str, -) -> Result { +fn parse_iota_address(address: &Option
, field_name: &str) -> Result { let address = address .as_ref() .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?; @@ -175,10 +170,7 @@ fn parse_iota_address( .map_err(|e| Status::invalid_argument(format!("Invalid {field_name}: {e}"))) } -fn parse_tx_digest( - digest: &Option, - field_name: &str, -) -> Result { +fn parse_tx_digest(digest: &Option, field_name: &str) -> Result { let digest = digest .as_ref() .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?; @@ -192,21 +184,21 @@ impl From<&IotaEvent> for Event { Event { event_id: Some(EventId { event_seq: event.id.event_seq, - tx_digest: Some(crate::common::TransactionDigest { + tx_digest: Some(Digest { digest: event.id.tx_digest.into_inner().to_vec(), }), }), - package_id: Some(crate::common::Address { + package_id: Some(Address { address: event.package_id.to_vec(), }), transaction_module: event.transaction_module.to_string(), - sender: Some(crate::common::Address { + sender: Some(Address { address: event.sender.to_vec(), }), type_name: event.type_.to_string(), parsed_json: event.parsed_json.to_string(), timestamp_ms: event.timestamp_ms, - event_data: Some(crate::common::BcsData { + event_data: Some(BcsData { data: event.bcs.bytes().to_vec(), }), } diff --git a/crates/iota-grpc-api/src/lib.rs b/crates/iota-grpc-api/src/lib.rs index a12b22366b5..e3d3a93162e 100644 --- a/crates/iota-grpc-api/src/lib.rs +++ b/crates/iota-grpc-api/src/lib.rs @@ -6,7 +6,15 @@ pub mod common { tonic::include_proto!("iota.grpc.common"); } -pub mod checkpoint { +pub mod read { + tonic::include_proto!("iota.grpc.read"); +} + +pub mod write { + tonic::include_proto!("iota.grpc.write"); +} + +pub mod checkpoints { tonic::include_proto!("iota.grpc.checkpoints"); } @@ -14,22 +22,32 @@ pub mod events { tonic::include_proto!("iota.grpc.events"); } +pub mod transactions { + tonic::include_proto!("iota.grpc.transactions"); +} + // Modules pub mod checkpoint_service; pub mod client; -pub mod config; pub mod event_service; +pub mod read_service; pub mod server; +pub mod transaction_service; pub mod types; +pub mod write_service; // Re-export commonly used types and traits pub use checkpoint_service::CheckpointGrpcService; -pub use client::{CheckpointClient, CheckpointContent, EventClient, NodeClient}; -pub use config::Config; +pub use client::{ + CheckpointClient, CheckpointContent, EventClient, NodeClient, ReadClient, TransactionClient, + WriteClient, +}; pub use event_service::EventGrpcService; +pub use read_service::ReadGrpcService; pub use server::{GrpcServerHandle, start_grpc_server}; +pub use transaction_service::TransactionGrpcService; pub use types::{ - CheckpointDataBroadcaster, CheckpointSummaryBroadcaster, EventSubscriber, - GrpcCheckpointDataBroadcaster, GrpcCheckpointSummaryBroadcaster, GrpcReader, GrpcStateReader, - RestStateReaderAdapter, + CheckpointDataBroadcaster, CheckpointSummaryBroadcaster, GrpcCheckpointDataBroadcaster, + GrpcCheckpointSummaryBroadcaster, GrpcReader, }; +pub use write_service::WriteGrpcService; diff --git a/crates/iota-grpc-api/src/read_service.rs b/crates/iota-grpc-api/src/read_service.rs new file mode 100644 index 00000000000..9202ab4421e --- /dev/null +++ b/crates/iota-grpc-api/src/read_service.rs @@ -0,0 +1,237 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use iota_json_rpc_types::{ + DisplayFieldsResponse, EventFilter, IotaObjectData, IotaObjectDataOptions, +}; +use iota_metrics::spawn_monitored_task; +use iota_types::{ + base_types::ObjectID, + display::DisplayVersionUpdatedEvent, + error::IotaObjectResponseError, + object::{Object, ObjectRead}, +}; +use move_core_types::annotated_value::MoveStructLayout; +use tonic::{Request, Response, Status}; +use tracing::{debug, instrument, warn}; + +use crate::{ + GrpcReader, + read::{GetObjectRequest, GetObjectResponse, read_service_server::ReadService}, +}; + +pub struct ReadGrpcService { + pub grpc_reader: Arc, +} + +impl ReadGrpcService { + pub fn new(grpc_reader: Arc) -> Self { + Self { grpc_reader } + } +} + +// The `ReadService` is the auto-generated trait from the protobuf definition. +// It's generated by tonic/protobuf and defines the interface that any gRPC read +// service must implement. +#[tonic::async_trait] +impl ReadService for ReadGrpcService { + #[instrument(skip(self))] + async fn get_object( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + debug!("get_object called"); + + // Extract object ID from gRPC request + let object_id_bytes = req + .object_id + .as_ref() + .ok_or_else(|| Status::invalid_argument("Object ID is required"))?; + + if object_id_bytes.address.len() != 32 { + return Err(Status::invalid_argument(format!( + "Object ID must be 32 bytes, got {}", + object_id_bytes.address.len() + ))); + } + + let mut bytes = [0u8; 32]; + bytes.copy_from_slice(&object_id_bytes.address); + let object_id = ObjectID::from_bytes(bytes) + .map_err(|e| Status::invalid_argument(format!("Invalid object ID: {e}")))?; + + // Convert gRPC options to JSON-RPC options + let options = req.options.map(|opts| IotaObjectDataOptions { + show_type: opts.show_type, + show_owner: opts.show_owner, + show_previous_transaction: opts.show_previous_transaction, + show_display: opts.show_display, + show_content: opts.show_content, + show_bcs: opts.show_bcs, + show_storage_rebate: opts.show_storage_rebate, + }); + + let grpc_reader = self.grpc_reader.clone(); + let object_read = spawn_monitored_task!(async move { + grpc_reader.get_object_read(&object_id).map_err(|e| { + warn!(?object_id, "Failed to get object: {:?}", e); + Status::internal(format!("Failed to read object from storage: {e}")) + }) + }) + .await + .map_err(|e| Status::internal(format!("Task execution failed: {e}")))??; + + let grpc_response = match object_read { + ObjectRead::NotExists(id) => { + // Object not found - return error + let error = IotaObjectResponseError::NotExists { object_id: id }; + serialize_error_to_json(error)? + } + ObjectRead::Exists(object_ref, object, layout) => { + // Object exists + let mut display_fields = None; + if options.as_ref().map(|o| o.show_display).unwrap_or(false) { + match get_display_fields(&self.grpc_reader, &object, &layout).await { + Ok(rendered_fields) => display_fields = Some(rendered_fields), + Err(_e) => { + // Return error response with display error, like JSON-RPC + let data = IotaObjectData::new( + object_ref, + object, + layout, + options.unwrap_or_default(), + None, + ) + .map_err(|e| { + Status::internal(format!("Failed to create object data: {e}")) + })?; + + return Ok(Response::new(GetObjectResponse { + json_data: Some(serde_json::to_string(&data).map_err(|e| { + Status::internal(format!( + "Failed to serialize object data: {e}" + )) + })?), + json_error: Some( + serde_json::to_string(&IotaObjectResponseError::Display { + error: "Failed to compute display fields".to_string(), + }) + .map_err(|e| { + Status::internal(format!("Failed to serialize error: {e}")) + })?, + ), + })); + } + } + } + + let data = IotaObjectData::new( + object_ref, + object, + layout, + options.unwrap_or_default(), + display_fields, + ) + .map_err(|e| Status::internal(format!("Failed to create object data: {e}")))?; + + serialize_data_to_json(data)? + } + ObjectRead::Deleted(object_ref) => { + // Object was deleted - return error + let error = IotaObjectResponseError::Deleted { + object_id: object_ref.0, + version: object_ref.1, + digest: object_ref.2, + }; + serialize_error_to_json(error)? + } + }; + + debug!("Object retrieved successfully"); + Ok(Response::new(grpc_response)) + } +} + +/// Serialize success data to JSON (matches IotaObjectResponse::new_with_data) +fn serialize_data_to_json(data: IotaObjectData) -> Result { + let json_data = serde_json::to_string(&data) + .map_err(|e| Status::internal(format!("Failed to serialize object data to JSON: {e}")))?; + + Ok(GetObjectResponse { + json_data: Some(json_data), + json_error: None, + }) +} + +/// Serialize error to JSON (matches IotaObjectResponse::new_with_error) +fn serialize_error_to_json(error: IotaObjectResponseError) -> Result { + let json_error = serde_json::to_string(&error) + .map_err(|e| Status::internal(format!("Failed to serialize error to JSON: {e}")))?; + + Ok(GetObjectResponse { + json_data: None, + json_error: Some(json_error), + }) +} + +/// Get display fields for an object +async fn get_display_fields( + grpc_reader: &Arc, + original_object: &Object, + original_layout: &Option, +) -> Result { + // Check if we have access to both AuthorityState and TransactionKeyValueStore + let (authority_state, kv_store) = match ( + grpc_reader.authority_state(), + grpc_reader.transaction_kv_store().as_ref(), + ) { + (Some(state), Some(kv_store)) => (state, kv_store), + _ => { + // Graceful fallback: return empty display fields if dependencies not available + return Ok(DisplayFieldsResponse { + data: None, + error: None, + }); + } + }; + + // Extract object type and layout, same logic as JSON RPC + let Some((object_type, layout)) = + iota_json_rpc::read_api::get_object_type_and_struct(original_object, original_layout) + .map_err(|e| Status::internal(format!("Failed to get object type and struct: {e}")))? + else { + return Ok(DisplayFieldsResponse { + data: None, + error: None, + }); + }; + + // Query for display object, same as JSON RPC + let mut events = authority_state + .query_events( + kv_store, + EventFilter::MoveEventType(DisplayVersionUpdatedEvent::type_(&object_type)), + None, + 1, + true, + ) + .await + .map_err(|e| Status::internal(format!("Failed to query display events: {e}")))?; + + // Process display object if found + if let Some(event) = events.pop() { + let display: DisplayVersionUpdatedEvent = bcs::from_bytes(&event.bcs.into_bytes()) + .map_err(|e| Status::internal(format!("Failed to deserialize display event: {e}")))?; + + return iota_json_rpc::read_api::get_rendered_fields(display.fields, &layout) + .map_err(|e| Status::internal(format!("Failed to render display fields: {e}"))); + } + + Ok(DisplayFieldsResponse { + data: None, + error: None, + }) +} diff --git a/crates/iota-grpc-api/src/server.rs b/crates/iota-grpc-api/src/server.rs index 89c09c51fc5..d7fced7147f 100644 --- a/crates/iota-grpc-api/src/server.rs +++ b/crates/iota-grpc-api/src/server.rs @@ -6,6 +6,10 @@ use std::{net::SocketAddr, sync::Arc}; use anyhow::Result; +use iota_core::{ + authority_client::NetworkAuthorityClient, subscription_handler::SubscriptionHandler, + transaction_orchestrator::TransactionOrchestrator, +}; use tokio::sync::broadcast; use tokio_stream::wrappers::TcpListenerStream; use tokio_util::sync::CancellationToken; @@ -13,9 +17,11 @@ use tonic::transport::Server; use crate::{ CheckpointGrpcService, EventGrpcService, GrpcCheckpointDataBroadcaster, - GrpcCheckpointSummaryBroadcaster, GrpcReader, - checkpoint::checkpoint_service_server::CheckpointServiceServer, - events::event_service_server::EventServiceServer, + GrpcCheckpointSummaryBroadcaster, GrpcReader, ReadGrpcService, TransactionGrpcService, + WriteGrpcService, checkpoints::checkpoint_service_server::CheckpointServiceServer, + events::event_service_server::EventServiceServer, read::read_service_server::ReadServiceServer, + transactions::transaction_service_server::TransactionServiceServer, + write::write_service_server::WriteServiceServer, }; /// Handle to control a running gRPC server @@ -58,16 +64,12 @@ impl GrpcServerHandle { } } -/// Start a gRPC server with checkpoint and event services -/// -/// This function creates and starts a gRPC server that hosts checkpoint-related -/// and event streaming services. Currently includes the checkpoint streaming -/// and event streaming services, but can be extended to host additional -/// services in the future. +/// Start a gRPC server with services pub async fn start_grpc_server( grpc_reader: Arc, - event_subscriber: Arc, - config: crate::Config, + event_subscriber: Arc, + transaction_orchestrator: Option>>, + config: iota_config::node::GrpcApiConfig, shutdown_token: CancellationToken, ) -> Result { // Create broadcast channels @@ -79,7 +81,7 @@ pub async fn start_grpc_server( GrpcCheckpointSummaryBroadcaster::new(checkpoint_summary_tx); let checkpoint_data_broadcaster = GrpcCheckpointDataBroadcaster::new(checkpoint_data_tx); - // Create the gRPC services - both get the cancellation token directly from + // Create the gRPC services - all get the cancellation token directly from // server level let checkpoint_service = CheckpointGrpcService::new( grpc_reader.clone(), @@ -87,12 +89,20 @@ pub async fn start_grpc_server( checkpoint_data_broadcaster.clone(), shutdown_token.clone(), ); - let event_service = EventGrpcService::new(event_subscriber, shutdown_token.clone()); + let event_service = EventGrpcService::new(event_subscriber.clone(), shutdown_token.clone()); + let transaction_service = TransactionGrpcService::new(event_subscriber, shutdown_token.clone()); + + // Create new read and write services + let read_service = ReadGrpcService::new(grpc_reader.clone()); + let write_service = WriteGrpcService::new(transaction_orchestrator, grpc_reader.clone()); // Create the server with proper address binding let server_builder = Server::builder() + .add_service(ReadServiceServer::new(read_service)) + .add_service(WriteServiceServer::new(write_service)) .add_service(CheckpointServiceServer::new(checkpoint_service)) - .add_service(EventServiceServer::new(event_service)); + .add_service(EventServiceServer::new(event_service)) + .add_service(TransactionServiceServer::new(transaction_service)); // Bind to the address to get the actual local address (especially important for // port 0) diff --git a/crates/iota-grpc-api/src/transaction_service.rs b/crates/iota-grpc-api/src/transaction_service.rs new file mode 100644 index 00000000000..a6c6300ee4f --- /dev/null +++ b/crates/iota-grpc-api/src/transaction_service.rs @@ -0,0 +1,263 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use futures::StreamExt; +use iota_core::subscription_handler::SubscriptionHandler; +use iota_json_rpc_types::{IotaTransactionBlockEffectsAPI, IotaTransactionKind, TransactionFilter}; +use iota_types::base_types::{IotaAddress, ObjectID}; +use tokio_util::sync::CancellationToken; +use tonic::{Request, Response, Status}; +use tracing::debug; + +use crate::transactions::{ + Transaction, TransactionStreamRequest, transaction_service_server::TransactionService, +}; + +pub struct TransactionGrpcService { + pub event_subscriber: Arc, + pub cancellation_token: CancellationToken, +} + +impl TransactionGrpcService { + pub fn new( + event_subscriber: Arc, + cancellation_token: CancellationToken, + ) -> Self { + Self { + event_subscriber, + cancellation_token, + } + } +} + +// The `TransactionService` is the auto-generated trait from the protobuf +// definition. It's generated by tonic/protobuf and defines the interface that +// any gRPC transaction service must implement. +#[tonic::async_trait] +impl TransactionService for TransactionGrpcService { + type StreamTransactionsStream = + std::pin::Pin> + Send>>; + + async fn stream_transactions( + &self, + request: Request, + ) -> Result, Status> { + let proto_filter = request + .into_inner() + .filter + .ok_or_else(|| Status::invalid_argument("Filter is required"))?; + + let transaction_filter = create_transaction_filter(&proto_filter)?; + debug!("New gRPC client subscribed with filter: {transaction_filter:?}"); + + // Subscribe to transactions using the EventSubscriber trait + let transaction_stream = self + .event_subscriber + .subscribe_transactions(transaction_filter); + let cancellation_token = self.cancellation_token.clone(); + + let stream = async_stream::try_stream! { + // Pin the stream for use with .next() and tokio::select! + // Safe because the stream has Unpin bound + let mut pinned_stream = std::pin::Pin::new(transaction_stream); + + loop { + let transaction_result = tokio::select! { + transaction_option = pinned_stream.next() => transaction_option, + _ = cancellation_token.cancelled() => { + debug!("Transaction stream cancelled"); + None + } + }; + + match transaction_result { + Some(transaction_effects) => { + debug!( + "Transaction matched filter: TX: {}", + transaction_effects.transaction_digest() + ); + + // Convert to protobuf Transaction + let proto_transaction = Transaction::from(&transaction_effects); + + yield proto_transaction; + } + None => { + // Stream ended or cancellation occurred + debug!("Transaction stream ended"); + break; + } + } + } + }; + + Ok(Response::new(Box::pin(stream))) + } +} + +/// Convert protobuf TransactionFilter to iota_json_rpc_types::TransactionFilter +fn create_transaction_filter( + proto_filter: &crate::transactions::TransactionFilter, +) -> Result { + use crate::transactions::transaction_filter::Filter; + + match &proto_filter.filter { + Some(Filter::All(_)) => { + // For "All" transactions, include all possible transaction kinds + Ok(TransactionFilter::TransactionKindIn(vec![ + IotaTransactionKind::SystemTransaction, + IotaTransactionKind::ProgrammableTransaction, + IotaTransactionKind::Genesis, + IotaTransactionKind::ConsensusCommitPrologueV1, + IotaTransactionKind::AuthenticatorStateUpdateV1, + IotaTransactionKind::RandomnessStateUpdate, + IotaTransactionKind::EndOfEpochTransaction, + ])) + } + Some(Filter::Checkpoint(f)) => { + Ok(TransactionFilter::Checkpoint(f.checkpoint_sequence_number)) + } + Some(Filter::MoveFunction(f)) => { + let package_id = parse_object_id(&f.package_id, "Package ID")?; + Ok(TransactionFilter::MoveFunction { + package: package_id, + module: f.module.clone(), + function: f.function.clone(), + }) + } + Some(Filter::InputObject(f)) => { + let object_id = parse_object_id(&f.object_id, "Object ID")?; + Ok(TransactionFilter::InputObject(object_id)) + } + Some(Filter::ChangedObject(f)) => { + let object_id = parse_object_id(&f.object_id, "Object ID")?; + Ok(TransactionFilter::ChangedObject(object_id)) + } + Some(Filter::FromAddress(f)) => { + let address = parse_iota_address(&f.address, "From address")?; + Ok(TransactionFilter::FromAddress(address)) + } + Some(Filter::ToAddress(f)) => { + let address = parse_iota_address(&f.address, "To address")?; + Ok(TransactionFilter::ToAddress(address)) + } + Some(Filter::FromAndToAddress(f)) => { + let from_address = parse_iota_address(&f.from_address, "From address")?; + let to_address = parse_iota_address(&f.to_address, "To address")?; + Ok(TransactionFilter::FromAndToAddress { + from: from_address, + to: to_address, + }) + } + Some(Filter::FromOrToAddress(f)) => { + let address = parse_iota_address(&f.address, "From or to address")?; + Ok(TransactionFilter::FromOrToAddress { addr: address }) + } + Some(Filter::TransactionKind(f)) => { + let kind = match crate::transactions::TransactionKind::try_from(f.kind) { + Ok(crate::transactions::TransactionKind::SystemTransaction) => { + IotaTransactionKind::SystemTransaction + } + Ok(crate::transactions::TransactionKind::ProgrammableTransaction) => { + IotaTransactionKind::ProgrammableTransaction + } + Ok(crate::transactions::TransactionKind::Genesis) => IotaTransactionKind::Genesis, + Ok(crate::transactions::TransactionKind::ConsensusCommitPrologueV1) => { + IotaTransactionKind::ConsensusCommitPrologueV1 + } + Ok(crate::transactions::TransactionKind::AuthenticatorStateUpdateV1) => { + IotaTransactionKind::AuthenticatorStateUpdateV1 + } + Ok(crate::transactions::TransactionKind::RandomnessStateUpdate) => { + IotaTransactionKind::RandomnessStateUpdate + } + Ok(crate::transactions::TransactionKind::EndOfEpochTransaction) => { + IotaTransactionKind::EndOfEpochTransaction + } + Err(_) => return Err(Status::invalid_argument("Invalid transaction kind")), + }; + Ok(TransactionFilter::TransactionKind(kind)) + } + Some(Filter::TransactionKindIn(f)) => { + let mut kinds = Vec::new(); + for k in &f.kinds { + let kind = match crate::transactions::TransactionKind::try_from(*k) { + Ok(crate::transactions::TransactionKind::SystemTransaction) => { + IotaTransactionKind::SystemTransaction + } + Ok(crate::transactions::TransactionKind::ProgrammableTransaction) => { + IotaTransactionKind::ProgrammableTransaction + } + Ok(crate::transactions::TransactionKind::Genesis) => { + IotaTransactionKind::Genesis + } + Ok(crate::transactions::TransactionKind::ConsensusCommitPrologueV1) => { + IotaTransactionKind::ConsensusCommitPrologueV1 + } + Ok(crate::transactions::TransactionKind::AuthenticatorStateUpdateV1) => { + IotaTransactionKind::AuthenticatorStateUpdateV1 + } + Ok(crate::transactions::TransactionKind::RandomnessStateUpdate) => { + IotaTransactionKind::RandomnessStateUpdate + } + Ok(crate::transactions::TransactionKind::EndOfEpochTransaction) => { + IotaTransactionKind::EndOfEpochTransaction + } + Err(_) => { + return Err(Status::invalid_argument("Invalid transaction kind in list")); + } + }; + kinds.push(kind); + } + Ok(TransactionFilter::TransactionKindIn(kinds)) + } + None => { + // Default to all transaction types + Ok(TransactionFilter::TransactionKindIn(vec![ + IotaTransactionKind::SystemTransaction, + IotaTransactionKind::ProgrammableTransaction, + IotaTransactionKind::Genesis, + IotaTransactionKind::ConsensusCommitPrologueV1, + IotaTransactionKind::AuthenticatorStateUpdateV1, + IotaTransactionKind::RandomnessStateUpdate, + IotaTransactionKind::EndOfEpochTransaction, + ])) + } + } +} + +// Helper functions to reduce repetition and improve error messages +fn parse_object_id( + address: &Option, + field_name: &str, +) -> Result { + let address = address + .as_ref() + .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?; + ObjectID::from_bytes(&address.address) + .map_err(|e| Status::invalid_argument(format!("Invalid {field_name}: {e}"))) +} + +fn parse_iota_address( + address: &Option, + field_name: &str, +) -> Result { + let address = address + .as_ref() + .ok_or_else(|| Status::invalid_argument(format!("{field_name} is required")))?; + IotaAddress::from_bytes(&address.address) + .map_err(|e| Status::invalid_argument(format!("Invalid {field_name}: {e}"))) +} + +// Conversion from JSON-RPC types to gRPC protobuf using JSON serialization +impl From<&iota_json_rpc_types::IotaTransactionBlockEffects> for Transaction { + fn from(effects: &iota_json_rpc_types::IotaTransactionBlockEffects) -> Self { + // Serialize JSON-RPC effects directly to JSON + let json_data = serde_json::to_string(effects) + .expect("IotaTransactionBlockEffects should always serialize to JSON"); + + Transaction { json_data } + } +} diff --git a/crates/iota-grpc-api/src/types.rs b/crates/iota-grpc-api/src/types.rs index b63cd66f5ed..a1bc718f4cd 100644 --- a/crates/iota-grpc-api/src/types.rs +++ b/crates/iota-grpc-api/src/types.rs @@ -4,15 +4,21 @@ use std::sync::Arc; use anyhow::Result; +use iota_core::{ + authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore}, + storage::RestReadStore, +}; use iota_grpc_types::{ CertifiedCheckpointSummary as GrpcCertifiedCheckpointSummary, CheckpointData as GrpcCheckpointData, }; -use iota_json_rpc_types::{EventFilter, IotaEvent}; +use iota_storage::key_value_store::TransactionKeyValueStore; use iota_types::{ + base_types::ObjectID, full_checkpoint_content::CheckpointData, messages_checkpoint::CertifiedCheckpointSummary, - storage::{RestStateReader, error::Kind}, + object::{Object, ObjectRead}, + storage::{ReadStore, RestStateReader, error::Kind}, }; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast::{Receiver, Sender, error::RecvError}; @@ -20,7 +26,7 @@ use tokio_util::sync::CancellationToken; use tonic::Status; use tracing::debug; -use crate::{checkpoint::Checkpoint, common::BcsData}; +use crate::{checkpoints::Checkpoint, common::BcsData}; /// Trait for broadcasting checkpoint summaries pub trait CheckpointSummaryBroadcaster { @@ -32,15 +38,6 @@ pub trait CheckpointDataBroadcaster { fn send(&self, data: &CheckpointData) -> anyhow::Result<()>; } -/// Trait for subscribing to event streams (used by gRPC service) -pub trait EventSubscriber: Send + Sync { - /// Subscribe to events with the given filter - fn subscribe_events( - &self, - filter: iota_json_rpc_types::EventFilter, - ) -> Box + Send + Unpin>; -} - /// Wrapper that converts native CertifiedCheckpointSummary to gRPC type before /// broadcasting #[derive(Clone)] @@ -174,17 +171,6 @@ impl CheckpointDataBroadcaster for () { } } -/// No-op implementation for unit type (used in tests and when event -/// subscription is not needed) -impl EventSubscriber for () { - fn subscribe_events( - &self, - _filter: EventFilter, - ) -> Box + Send + Unpin> { - Box::new(Box::pin(futures::stream::empty())) - } -} - impl BcsData { pub fn serialize_from(data: &T) -> Result where @@ -204,107 +190,138 @@ impl BcsData { // Type aliases and utility types pub type CheckpointStreamResult = Result; - -// Storage abstraction traits for gRPC access -// These traits provide an abstraction layer over the storage backend, -// making it easier to implement gRPC services with different storage types -// (e.g., production database vs simulacrum for testing). - -/// Trait for reading checkpoint data from storage -pub trait GrpcStateReader: Send + Sync + 'static { - /// Get the latest checkpoint sequence number - fn get_latest_checkpoint_sequence_number(&self) -> Option; - - /// Get checkpoint summary by sequence number - fn get_checkpoint_summary(&self, seq: u64) -> Option; - - /// Get full checkpoint data by sequence number - fn get_checkpoint_data(&self, seq: u64) -> Option; - - /// Get epoch's last checkpoint for epoch boundary calculations - fn get_epoch_last_checkpoint( - &self, - epoch: u64, - ) -> anyhow::Result>; -} - -/// Adapter that implements GrpcStateReader for RestStateReader -pub struct RestStateReaderAdapter { - inner: Arc, +/// Central gRPC data reader that provides unified access to checkpoint data. +/// It provides methods for streaming both full checkpoint data and checkpoint +/// summaries. +#[derive(Clone)] +pub struct GrpcReader { + state_reader: Arc, + transaction_kv_store: Option>, } -impl GrpcStateReader for RestStateReaderAdapter { - fn get_latest_checkpoint_sequence_number(&self) -> Option { - match self.inner.try_get_latest_checkpoint() { - Ok(checkpoint) => Some(*checkpoint.sequence_number()), - Err(e) => match e.kind() { - // Expected during server initialization when no checkpoints have been executed yet - // Return None to indicate service is not ready rather than panicking - Kind::Missing => None, - // Unexpected storage errors - _ => panic!("Unexpected storage error: {e}"), - }, +impl GrpcReader { + /// Primary constructor for production use with RestReadStore + pub fn new( + rest_read_store: Arc, + transaction_kv_store: Option>, + ) -> Self { + Self { + state_reader: rest_read_store, + transaction_kv_store, } } - fn get_checkpoint_summary(&self, seq: u64) -> Option { - self.inner - .get_checkpoint_by_sequence_number(seq) - .map(CertifiedCheckpointSummary::from) + /// Constructor for tests/mocks with generic RestStateReader + pub fn from_rest_state_reader( + state_reader: Arc, + transaction_kv_store: Option>, + ) -> Self { + Self { + state_reader, + transaction_kv_store, + } } - fn get_checkpoint_data(&self, seq: u64) -> Option { - let summary = self.inner.get_checkpoint_by_sequence_number(seq)?; - let contents = self.inner.get_checkpoint_contents_by_sequence_number(seq)?; - Some(self.inner.get_checkpoint_data(summary, contents)) + /// Load epoch store for transaction processing with graceful fallback + pub fn load_epoch_store_one_call_per_task(&self) -> Option> { + // Downcast to RestReadStore for enhanced functionality + self.state_reader + .as_any() + .downcast_ref::() + .map(|store| store.load_epoch_store_one_call_per_task()) } - fn get_epoch_last_checkpoint( + /// Get epoch's last checkpoint for epoch boundary calculations with + /// gRPC-friendly error handling + pub fn get_epoch_last_checkpoint( &self, epoch: u64, ) -> anyhow::Result> { - match self.inner.get_epoch_last_checkpoint(epoch) { + match self.state_reader.get_epoch_last_checkpoint(epoch) { Ok(Some(checkpoint)) => Ok(Some(CertifiedCheckpointSummary::from(checkpoint))), Ok(None) => Ok(None), Err(e) => Err(e.into()), } } -} -/// Central gRPC data reader that provides unified access to checkpoint data. -/// It provides methods for streaming both full checkpoint data and checkpoint -/// summaries. -#[derive(Clone)] -pub struct GrpcReader { - state_reader: Arc, -} + /// Get full checkpoint data by sequence number with gRPC-friendly error + /// handling + pub fn get_full_checkpoint_data(&self, seq: u64) -> Option { + let summary = self + .state_reader + .try_get_checkpoint_by_sequence_number(seq) + .ok()??; + let contents = self + .state_reader + .try_get_checkpoint_contents_by_sequence_number(seq) + .ok()??; + Some(self.state_reader.get_checkpoint_data(summary, contents)) + } -impl GrpcReader { - pub fn new(state_reader: Arc) -> Self { - Self { state_reader } + /// Get checkpoint summary by sequence number with type conversion + pub fn get_checkpoint_summary(&self, seq: u64) -> Option { + self.state_reader + .try_get_checkpoint_by_sequence_number(seq) + .ok()? + .map(CertifiedCheckpointSummary::from) } - pub fn from_rest_state_reader(state_reader: Arc) -> Self { - Self { - state_reader: Arc::new(RestStateReaderAdapter { - inner: state_reader, - }), + /// Get the latest checkpoint sequence number with gRPC-friendly error + /// handling + pub fn get_latest_checkpoint_sequence_number(&self) -> Option { + match self.state_reader.try_get_latest_checkpoint() { + Ok(checkpoint) => Some(*checkpoint.sequence_number()), + Err(e) => match e.kind() { + // Expected during server initialization when no checkpoints have been executed yet + // Return None to indicate service is not ready rather than panicking + Kind::Missing => None, + // Unexpected storage errors + _ => panic!("Unexpected storage error: {e}"), + }, } } - pub fn get_epoch_last_checkpoint( - &self, - epoch: u64, - ) -> anyhow::Result> { - self.state_reader.get_epoch_last_checkpoint(epoch) + /// Get object data by object ID with anyhow error handling + pub fn get_object(&self, object_id: &ObjectID) -> anyhow::Result> { + match self.state_reader.try_get_object(object_id) { + Ok(object) => Ok(object), + Err(e) => Err(e.into()), + } } - fn get_full_checkpoint_data(&self, seq: u64) -> Option { - self.state_reader.get_checkpoint_data(seq) + /// Access to authority_state for display fields computation + pub fn authority_state(&self) -> Option<&Arc> { + // Downcast to RestReadStore for enhanced functionality + self.state_reader + .as_any() + .downcast_ref::() + .map(|store| store.authority_state()) } - pub fn get_latest_checkpoint_sequence_number(&self) -> Option { - self.state_reader.get_latest_checkpoint_sequence_number() + /// Access to transaction_kv_store for display fields computation + pub fn transaction_kv_store(&self) -> &Option> { + &self.transaction_kv_store + } + + /// Get object with layout information like JSON RPC (when AuthorityState is + /// available) + pub fn get_object_read(&self, object_id: &ObjectID) -> anyhow::Result { + match self.authority_state() { + Some(state) => { + // Use AuthorityState.get_object_read() for full ObjectRead with layout + state.get_object_read(object_id).map_err(Into::into) + } + None => { + // Fallback: use basic object access and construct ObjectRead manually + match self.get_object(object_id)? { + Some(object) => { + let object_ref = object.compute_object_reference(); + Ok(ObjectRead::Exists(object_ref, object, None)) // No layout available + } + None => Ok(ObjectRead::NotExists(*object_id)), + } + } + } } /// Generic checkpoint streaming implementation that works with checkpoint @@ -452,7 +469,6 @@ impl GrpcReader { cancellation_token, |reader, seq| { reader - .state_reader .get_checkpoint_summary(seq) .map(GrpcCertifiedCheckpointSummary::from) .map(Arc::new) diff --git a/crates/iota-grpc-api/src/write_service.rs b/crates/iota-grpc-api/src/write_service.rs new file mode 100644 index 00000000000..01c62c101df --- /dev/null +++ b/crates/iota-grpc-api/src/write_service.rs @@ -0,0 +1,318 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use fastcrypto::traits::ToFromBytes; +use iota_core::{ + authority_client::NetworkAuthorityClient, transaction_orchestrator::TransactionOrchestrator, +}; +use iota_json_rpc::{ObjectProviderCache, get_balance_changes_from_effect, get_object_changes}; +use iota_json_rpc_types::{ + IotaTransactionBlock, IotaTransactionBlockEvents, IotaTransactionBlockResponse, +}; +use iota_metrics::spawn_monitored_task; +use iota_types::{ + base_types::IotaAddress, + effects::TransactionEffectsAPI, + quorum_driver_types::{ + ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1, + IsTransactionExecutedLocally, + }, + signature::GenericSignature, + storage::PostExecutionPackageResolver, + transaction::{InputObjectKind, Transaction, TransactionData, TransactionDataAPI}, +}; +use tonic::{Request, Response, Status}; +use tracing::{Instrument, debug, instrument}; + +use crate::{ + GrpcReader, + write::{ + ExecuteTransactionRequest, ExecuteTransactionResponse, TransactionResponseOptions, + write_service_server::WriteService, + }, +}; + +pub struct WriteGrpcService { + /// Transaction orchestrator + pub transaction_orchestrator: Option>>, + /// GrpcReader for data access including epoch store when available + pub grpc_reader: Arc, +} + +impl WriteGrpcService { + pub fn new( + transaction_orchestrator: Option>>, + grpc_reader: Arc, + ) -> Self { + Self { + transaction_orchestrator, + grpc_reader, + } + } + + /// Prepare transaction request + #[expect(clippy::type_complexity)] + fn prepare_execute_transaction_request( + &self, + tx_bytes: Vec, + signatures: Vec>, + opts: Option, + ) -> Result< + ( + ExecuteTransactionRequestV1, + TransactionResponseOptions, + IotaAddress, + Vec, + Transaction, + Option, + Vec, + ), + Status, + > { + let opts = opts.unwrap_or_default(); + let tx_data: TransactionData = bcs::from_bytes(&tx_bytes) + .map_err(|e| Status::invalid_argument(format!("Failed to deserialize: {e}")))?; + let sender = tx_data.sender(); + let input_objs = tx_data.input_objects().unwrap_or_default(); + + let mut sigs = Vec::new(); + for sig in signatures { + let signature = GenericSignature::from_bytes(&sig) + .map_err(|e| Status::invalid_argument(format!("Invalid signature: {e}")))?; + sigs.push(signature); + } + let txn = Transaction::from_generic_sig_data(tx_data, sigs); + let raw_transaction = if opts.show_raw_input { + bcs::to_bytes(txn.data()).map_err(|e| { + Status::internal(format!("Raw transaction serialization failed: {e}")) + })? + } else { + vec![] + }; + + let transaction_block = if opts.show_input { + if let Some(epoch_store) = self.grpc_reader.load_epoch_store_one_call_per_task() { + Some( + IotaTransactionBlock::try_from( + txn.data().clone(), + epoch_store.module_cache(), + *txn.digest(), + ) + .map_err(|e| { + Status::internal(format!("Failed to create IotaTransactionBlock: {e}")) + })?, + ) + } else { + return Err(Status::internal("Epoch store not available")); + } + } else { + None + }; + + let request = ExecuteTransactionRequestV1 { + transaction: txn.clone(), + include_events: opts.show_events, + include_input_objects: opts.show_balance_changes || opts.show_object_changes, + include_output_objects: opts.show_balance_changes + || opts.show_object_changes + || opts.show_events, + include_auxiliary_data: false, + }; + + Ok(( + request, + opts, + sender, + input_objs, + txn, + transaction_block, + raw_transaction, + )) + } + + /// Serialize IotaTransactionBlockResponse to JSON + fn serialize_response_to_json( + response: &IotaTransactionBlockResponse, + ) -> Result { + let json_data = serde_json::to_string(response) + .map_err(|e| Status::internal(format!("Failed to serialize response to JSON: {e}")))?; + + Ok(ExecuteTransactionResponse { json_data }) + } + + async fn handle_post_orchestration( + &self, + response: ExecuteTransactionResponseV1, + is_executed_locally: IsTransactionExecutedLocally, + opts: TransactionResponseOptions, + digest: iota_types::base_types::TransactionDigest, + input_objs: Vec, + transaction: Option, + raw_transaction: Vec, + sender: IotaAddress, + ) -> Result, Status> { + let events = if opts.show_events { + tracing::trace!("Resolving events"); + if let (Some(epoch_store), Some(authority_state)) = ( + self.grpc_reader.load_epoch_store_one_call_per_task(), + self.grpc_reader.authority_state().as_ref(), + ) { + let backing_package_store = PostExecutionPackageResolver::new( + authority_state.get_backing_package_store().clone(), + &response.output_objects, + ); + let mut layout_resolver = epoch_store + .executor() + .type_layout_resolver(Box::new(backing_package_store)); + Some( + IotaTransactionBlockEvents::try_from( + response.events.unwrap_or_default(), + digest, + None, + layout_resolver.as_mut(), + ) + .map_err(|e| Status::internal(format!("Failed to convert events: {e}")))?, + ) + } else { + return Err(Status::internal( + "Cannot convert events: missing epoch store or authority state", + )); + } + } else { + None + }; + + let object_cache = { + response.output_objects.and_then(|output_objects| { + self.grpc_reader.authority_state().map(|authority_state| { + ObjectProviderCache::new_with_output_objects( + authority_state.clone(), + output_objects, + ) + }) + }) + }; + + let balance_changes = match &object_cache { + Some(object_cache) if opts.show_balance_changes => Some( + get_balance_changes_from_effect( + object_cache, + &response.effects.effects, + input_objs, + None, + ) + .instrument(tracing::trace_span!("resolving balance changes")) + .await + .map_err(|e| Status::internal(format!("Failed to get balance changes: {e}")))?, + ), + _ => None, + }; + + let object_changes = match &object_cache { + Some(object_cache) if opts.show_object_changes => Some( + get_object_changes( + object_cache, + sender, + response.effects.effects.modified_at_versions(), + response.effects.effects.all_changed_objects(), + response.effects.effects.all_removed_objects(), + ) + .instrument(tracing::trace_span!("resolving object changes")) + .await + .map_err(|e| Status::internal(format!("Failed to get object changes: {e}")))?, + ), + _ => None, + }; + + let raw_effects = if opts.show_raw_effects { + bcs::to_bytes(&response.effects.effects) + .map_err(|e| Status::internal(format!("Raw effects serialization failed: {e}")))? + } else { + vec![] + }; + + let iota_response = + IotaTransactionBlockResponse { + digest, + transaction, + raw_transaction, + effects: opts + .show_effects + .then(|| { + response.effects.effects.try_into().map_err(|e| { + Status::internal(format!("Failed to convert effects: {e}")) + }) + }) + .transpose()?, + events, + object_changes, + balance_changes, + timestamp_ms: None, + confirmed_local_execution: Some(is_executed_locally), + checkpoint: None, + errors: vec![], + raw_effects, + }; + + // Serialize to JSON + let grpc_response = Self::serialize_response_to_json(&iota_response)?; + + debug!("Transaction executed successfully"); + Ok(Response::new(grpc_response)) + } +} + +// The `WriteService` is the auto-generated trait from the protobuf definition. +// It's generated by tonic/protobuf and defines the interface that any gRPC +// write service must implement. +#[tonic::async_trait] +impl WriteService for WriteGrpcService { + #[instrument("grpc_api_execute_transaction", level = "trace", skip_all)] + async fn execute_transaction( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let request_type = req + .request_type + .map(|rt| match rt { + 0 => ExecuteTransactionRequestType::WaitForEffectsCert, + 1 => ExecuteTransactionRequestType::WaitForLocalExecution, + _ => ExecuteTransactionRequestType::WaitForEffectsCert, // fallback to default + }) + .unwrap_or(ExecuteTransactionRequestType::WaitForEffectsCert); + + let (execute_request, opts, sender, input_objs, txn, transaction_block, raw_transaction) = + self.prepare_execute_transaction_request(req.tx_bytes, req.signatures, req.options)?; + + let digest = *txn.digest(); + + let orchestrator = self + .transaction_orchestrator + .clone() + .ok_or_else(|| Status::unimplemented("Transaction execution not available"))?; + + tracing::trace!("Spawning transaction orchestrator task for transaction: {digest}",); + let (response, is_executed_locally) = spawn_monitored_task!( + orchestrator.execute_transaction_block(execute_request, request_type, None) + ) + .await + .map_err(|e| Status::internal(format!("Task execution failed: {e}")))? + .map_err(|e| Status::internal(format!("Transaction execution failed: {e}")))?; + + self.handle_post_orchestration( + response, + is_executed_locally, + opts, + digest, + input_objs, + transaction_block, + raw_transaction, + sender, + ) + .await + } +} diff --git a/crates/iota-grpc-api/tests/checkpoint_e2e.rs b/crates/iota-grpc-api/tests/checkpoint_e2e.rs index daf79942c15..25ae76d28a3 100644 --- a/crates/iota-grpc-api/tests/checkpoint_e2e.rs +++ b/crates/iota-grpc-api/tests/checkpoint_e2e.rs @@ -4,28 +4,16 @@ use std::time::Duration; use futures::StreamExt; -use iota_config::local_ip_utils; -use iota_grpc_api::client::{CheckpointClient, NodeClient}; -use test_cluster::{TestCluster, TestClusterBuilder}; - -async fn setup_test_cluster_and_client() -> (TestCluster, CheckpointClient) { - let localhost = local_ip_utils::localhost_for_testing(); - let grpc_port = local_ip_utils::get_available_port(&localhost); - let grpc_addr = format!("{localhost}:{grpc_port}"); - - // Start a test cluster with gRPC enabled and pruning disabled - let cluster = TestClusterBuilder::new() - .with_fullnode_grpc_api_address(grpc_addr.parse().expect("Invalid gRPC address")) - .disable_fullnode_pruning() - .with_num_validators(1) - .build() - .await; - - let client = NodeClient::connect(&format!("http://{grpc_addr}")) - .await - .expect("connect gRPC"); +use iota_grpc_api::client::CheckpointClient; +use test_cluster::TestCluster; + +mod utils; +use utils::setup_test_cluster_and_client; - let checkpoint_client = client +async fn setup_test_cluster_and_checkpoint_client() -> (TestCluster, CheckpointClient) { + let (cluster, node_client) = setup_test_cluster_and_client().await; + + let checkpoint_client = node_client .checkpoint_client() .expect("Checkpoint client should be available"); @@ -34,7 +22,7 @@ async fn setup_test_cluster_and_client() -> (TestCluster, CheckpointClient) { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn e2e_stream_checkpoints() { - let (_cluster, mut client) = setup_test_cluster_and_client().await; + let (_cluster, mut client) = setup_test_cluster_and_checkpoint_client().await; // Request all checkpoints using the higher-level GrpcNodeClient API let mut stream = client @@ -78,7 +66,7 @@ async fn e2e_stream_checkpoints() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_get_epoch_first_checkpoint_sequence_number() { - let (cluster, mut client) = setup_test_cluster_and_client().await; + let (cluster, mut client) = setup_test_cluster_and_checkpoint_client().await; let sender = cluster.get_address_0(); let receiver = cluster.get_address_1(); @@ -150,7 +138,7 @@ async fn test_get_epoch_first_checkpoint_sequence_number() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_stream_full_checkpoint_data() { - let (_cluster, mut client) = setup_test_cluster_and_client().await; + let (_cluster, mut client) = setup_test_cluster_and_checkpoint_client().await; let mut stream = client .stream_checkpoints(None, Some(2), true) @@ -183,3 +171,74 @@ async fn test_stream_full_checkpoint_data() { .await .expect("waiting for checkpoint data timed out"); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_get_latest_checkpoint() { + let (cluster, mut client) = setup_test_cluster_and_checkpoint_client().await; + + let sender = cluster.get_address_0(); + let receiver = cluster.get_address_1(); + + // Wait for at least a few checkpoints to be available + cluster.wait_for_checkpoint(2, None).await; + + // Execute a transaction to generate some activity + cluster.transfer_iota_must_exceed(sender, receiver, 1).await; + + // Wait for more checkpoints + cluster.wait_for_checkpoint(5, None).await; + + // Test getting the latest checkpoint using the CheckpointClient + let latest_checkpoint = tokio::time::timeout(Duration::from_secs(30), async { + client.get_latest_checkpoint(false).await + }) + .await + .expect("timeout waiting for latest checkpoint") + .expect("get_latest_checkpoint failed"); + + // The latest checkpoint should be at least 5 + let sequence_number = match &latest_checkpoint { + iota_grpc_api::client::CheckpointContent::Summary(summary) => match summary { + iota_grpc_types::CertifiedCheckpointSummary::V1(v1_summary) => { + *v1_summary.data().sequence_number() + } + }, + iota_grpc_api::client::CheckpointContent::Data(data) => match data { + iota_grpc_types::CheckpointData::V1(v1_data) => { + v1_data.checkpoint_summary.sequence_number + } + }, + }; + + assert!( + sequence_number >= 5, + "Latest checkpoint should be at least 5, got {sequence_number}" + ); + + // Test getting the latest checkpoint with full data + let latest_full_checkpoint = tokio::time::timeout(Duration::from_secs(30), async { + client.get_latest_checkpoint(true).await + }) + .await + .expect("timeout waiting for latest full checkpoint") + .expect("get_latest_checkpoint with full data failed"); + + // Should have the same sequence number + let full_sequence_number = match &latest_full_checkpoint { + iota_grpc_api::client::CheckpointContent::Summary(summary) => match summary { + iota_grpc_types::CertifiedCheckpointSummary::V1(v1_summary) => { + *v1_summary.data().sequence_number() + } + }, + iota_grpc_api::client::CheckpointContent::Data(data) => match data { + iota_grpc_types::CheckpointData::V1(v1_data) => { + v1_data.checkpoint_summary.sequence_number + } + }, + }; + + assert_eq!( + full_sequence_number, sequence_number, + "Full checkpoint should have same sequence number" + ); +} diff --git a/crates/iota-grpc-api/tests/checkpoint_stream.rs b/crates/iota-grpc-api/tests/checkpoint_stream.rs index 10d7b19638d..1983573849a 100644 --- a/crates/iota-grpc-api/tests/checkpoint_stream.rs +++ b/crates/iota-grpc-api/tests/checkpoint_stream.rs @@ -7,10 +7,10 @@ use std::{ time::Duration, }; -use iota_config::local_ip_utils; +use iota_config::{local_ip_utils, node::GrpcApiConfig}; +use iota_core::subscription_handler::SubscriptionHandler; use iota_grpc_api::{ - CheckpointDataBroadcaster, CheckpointSummaryBroadcaster, Config, EventSubscriber, GrpcReader, - GrpcServerHandle, + CheckpointDataBroadcaster, CheckpointSummaryBroadcaster, GrpcReader, GrpcServerHandle, client::{CheckpointClient, CheckpointContent, NodeClient}, start_grpc_server, }; @@ -25,6 +25,7 @@ use iota_types::{ }, storage::{RestIndexes, RestStateReader, error::Result as StorageResult}, }; +use prometheus::Registry; use tokio_stream::StreamExt; struct MockRestStateReader { @@ -362,11 +363,15 @@ impl RestStateReader for MockRestStateReader { fn indexes(&self) -> Option<&dyn RestIndexes> { None } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } async fn test_server_and_client_setup>( checkpoint_range: I, - config_customizer: impl FnOnce(&mut Config), + config_customizer: impl FnOnce(&mut GrpcApiConfig), ) -> ( GrpcServerHandle, CheckpointClient, @@ -375,23 +380,25 @@ async fn test_server_and_client_setup>( let mock = Arc::new(MockRestStateReader::new_from_iter(checkpoint_range)); let checkpoints = mock.checkpoints.clone(); let cancellation_token = tokio_util::sync::CancellationToken::new(); - let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(mock)); + let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(mock, None)); let localhost = local_ip_utils::localhost_for_testing(); let grpc_port = local_ip_utils::get_available_port(&localhost); - let mut config = Config { + let mut config = GrpcApiConfig { address: format!("{localhost}:{grpc_port}").parse().unwrap(), - ..Config::default() + ..GrpcApiConfig::default() }; config_customizer(&mut config); - // Use the no-op EventSubscriber implementation for unit type - let dummy_event_subscriber = Arc::new(()) as Arc; + // Create a real SubscriptionHandler for testing + let registry = Registry::new(); + let event_subscriber = Arc::new(SubscriptionHandler::new(®istry)); let server_handle = start_grpc_server( grpc_reader, - dummy_event_subscriber, + event_subscriber, + None, // No write API for tests config, cancellation_token, ) diff --git a/crates/iota-grpc-api/tests/event_e2e.rs b/crates/iota-grpc-api/tests/event_e2e.rs index f1f24a81114..559bc9593f4 100644 --- a/crates/iota-grpc-api/tests/event_e2e.rs +++ b/crates/iota-grpc-api/tests/event_e2e.rs @@ -4,16 +4,18 @@ use std::time::Duration; use futures::StreamExt; -use iota_config::local_ip_utils; use iota_grpc_api::{ - client::{EventClient, NodeClient}, - common::Address, - events::{AllFilter, EventFilter, MoveEventTypeFilter, SenderFilter, event_filter::Filter}, + client::EventClient, + common::{Address, AddressFilter, AllFilter, MoveEventTypeFilter}, + events::{EventFilter, event_filter::Filter}, }; use iota_types::{base_types::ObjectID, effects::TransactionEffectsAPI, transaction::CallArg}; -use test_cluster::{TestCluster, TestClusterBuilder}; +use test_cluster::TestCluster; use tokio::time::timeout; +mod utils; +use utils::setup_test_cluster_and_client; + // Test constants for Move packages and contracts const NFT_PACKAGE: &str = "nft"; const BASICS_PACKAGE: &str = "basics"; @@ -30,22 +32,9 @@ const CREATOR_FIELD: &str = "creator"; const NAME_FIELD: &str = "name"; async fn setup_test_cluster() -> (TestCluster, EventClient, ObjectID, ObjectID) { - let localhost = local_ip_utils::localhost_for_testing(); - let grpc_port = local_ip_utils::get_available_port(&localhost); - let grpc_addr = format!("{localhost}:{grpc_port}"); - - let cluster = TestClusterBuilder::new() - .with_fullnode_grpc_api_address(grpc_addr.parse().expect("Invalid gRPC address")) - .disable_fullnode_pruning() - .with_num_validators(1) - .build() - .await; - - let client = NodeClient::connect(&format!("http://{grpc_addr}")) - .await - .expect("Failed to connect to gRPC"); + let (cluster, node_client) = setup_test_cluster_and_client().await; - let event_client = client + let event_client = node_client .event_client() .expect("Event client should be available"); @@ -113,8 +102,8 @@ async fn test_event_filtering_and_bcs_serialization() { // Client 2: SenderFilter - should receive only events from sender_1 let mut sender_client = event_client.clone(); let sender_filter = EventFilter { - filter: Some(Filter::Sender(SenderFilter { - sender: Some(Address { + filter: Some(Filter::Sender(AddressFilter { + address: Some(Address { address: sender_1.to_vec(), }), })), diff --git a/crates/iota-grpc-api/tests/read_e2e.rs b/crates/iota-grpc-api/tests/read_e2e.rs new file mode 100644 index 00000000000..7d8e53abf01 --- /dev/null +++ b/crates/iota-grpc-api/tests/read_e2e.rs @@ -0,0 +1,132 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +use iota_grpc_api::client::ReadClient; +use iota_json_rpc_types::IotaObjectDataOptions; +use iota_types::{base_types::ObjectID, error::IotaObjectResponseError}; +use test_cluster::TestCluster; + +mod utils; +use utils::setup_test_cluster_and_client; + +async fn setup_test_cluster_and_read_client() -> (TestCluster, ReadClient) { + let (cluster, node_client) = setup_test_cluster_and_client().await; + + let read_client = node_client + .read_client() + .expect("Read client should be available"); + + (cluster, read_client) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_read_service_get_object() { + let (cluster, mut read_client) = setup_test_cluster_and_read_client().await; + + // Get a known object (gas object from account) + let sender = cluster.get_address_0(); + let owned_objects = cluster + .get_owned_objects(sender, None) + .await + .expect("Failed to get owned objects"); + + assert!( + !owned_objects.is_empty(), + "Should have at least one owned object" + ); + + // Find a gas object (coin type) - just use the first object for simplicity + let gas_object = &owned_objects[0]; + + let object_id = gas_object.data.as_ref().unwrap().object_id; + + let options = Some(IotaObjectDataOptions { + show_type: true, + show_owner: true, + show_previous_transaction: false, + show_display: false, + show_content: false, + show_bcs: false, + show_storage_rebate: false, + }); + + let response = tokio::time::timeout( + Duration::from_secs(30), + read_client.get_object(object_id, options), + ) + .await + .expect("timeout waiting for object") + .expect("ReadService get_object should work"); + + // Verify ReadService get_object works correctly + assert!(response.data.is_some(), "Should have data"); + assert!(response.error.is_none(), "Should not have error"); + let iota_object_data = response.data.unwrap(); + + let expected_object_ref = gas_object.data.as_ref().unwrap().object_ref(); + + // Verify object matches expected data + assert_eq!( + iota_object_data.object_id, expected_object_ref.0, + "Object ID should match" + ); + assert_eq!( + iota_object_data.version, expected_object_ref.1, + "Version should match" + ); + assert!( + iota_object_data.type_.is_some(), + "Object type should be present" + ); + assert!( + iota_object_data.owner.is_some(), + "Object owner should be present" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_read_service_nonexistent_object() { + let (_cluster, mut read_client) = setup_test_cluster_and_read_client().await; + + // Use a dummy object ID that doesn't exist + let nonexistent_object_id = [0u8; 32]; + + let nonexistent_id = ObjectID::from_bytes(nonexistent_object_id).unwrap(); + let options = Some(IotaObjectDataOptions { + show_type: true, + show_owner: true, + show_previous_transaction: false, + show_display: false, + show_content: false, + show_bcs: false, + show_storage_rebate: false, + }); + + let response = tokio::time::timeout( + Duration::from_secs(30), + read_client.get_object(nonexistent_id, options), + ) + .await + .expect("timeout waiting for object") + .expect("ReadService get_object should work"); + + // Verify ReadService returns proper error for non-existent object + assert!(response.data.is_none(), "Should not have data"); + assert!(response.error.is_some(), "Should have error"); + let iota_error = response.error.unwrap(); + + match iota_error { + IotaObjectResponseError::NotExists { object_id } => { + let expected_object_id = ObjectID::from_bytes(nonexistent_object_id).unwrap(); + assert_eq!( + object_id, expected_object_id, + "Error object ID should match" + ); + } + other_error => { + panic!("Expected NotExists error, got: {other_error:?}"); + } + } +} diff --git a/crates/iota-grpc-api/tests/transaction_e2e.rs b/crates/iota-grpc-api/tests/transaction_e2e.rs new file mode 100644 index 00000000000..9ef69884d89 --- /dev/null +++ b/crates/iota-grpc-api/tests/transaction_e2e.rs @@ -0,0 +1,249 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +use futures::StreamExt; +use iota_grpc_api::{ + common::{AddressFilter, AllFilter}, + transactions::{TransactionFilter, transaction_filter::Filter}, +}; +use iota_json_rpc_types::IotaTransactionBlockEffectsAPI; +use test_cluster::TestCluster; +use tokio::time::timeout; + +mod utils; +use utils::setup_test_cluster_and_client; + +async fn setup_test_cluster() -> ( + TestCluster, + iota_grpc_api::client::TransactionClient, + iota_types::base_types::IotaAddress, +) { + let (cluster, node_client) = setup_test_cluster_and_client().await; + + let transaction_client = node_client + .transaction_client() + .expect("Transaction client should be available"); + + let sender = cluster.get_address_0(); + + (cluster, transaction_client, sender) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_transaction_filtering_and_bcs_serialization() { + let (cluster, transaction_client, sender_1) = setup_test_cluster().await; + let sender_2 = cluster.get_address_1(); + + // Client 1: AllTransactionsFilter - should receive all transactions + let mut all_client = transaction_client.clone(); + let all_filter = TransactionFilter { + filter: Some(Filter::All(AllFilter {})), + }; + let mut all_stream = all_client + .stream_transactions(all_filter) + .await + .expect("Failed to create all transactions stream"); + + // Client 2: FromAddressFilter - should receive only transactions from sender_1 + let mut sender_client = transaction_client.clone(); + let sender_filter = TransactionFilter { + filter: Some(Filter::FromAddress(AddressFilter { + address: Some(iota_grpc_api::common::Address { + address: sender_1.to_vec(), + }), + })), + }; + let mut sender_stream = sender_client + .stream_transactions(sender_filter) + .await + .expect("Failed to create sender transactions stream"); + + // Generate transactions after subscription is established + let cluster_clone = std::sync::Arc::new(cluster); + let generate_transactions_task = { + let cluster = cluster_clone.clone(); + tokio::spawn(async move { + // Wait for the subscription to be established. + tokio::time::sleep(Duration::from_millis(1000)).await; + + // Generate 2 transactions from sender_1 (using smaller amounts) + for _i in 0..2 { + let tx = cluster + .test_transaction_builder_with_sender(sender_1) + .await + .transfer_iota(Some(100), sender_2) + .build(); + let signed_tx = cluster.sign_transaction(&tx); + cluster.execute_transaction(signed_tx).await; + tokio::time::sleep(Duration::from_millis(500)).await; + } + + // Generate 1 transaction from sender_2 (using smaller amount) + let tx = cluster + .test_transaction_builder_with_sender(sender_2) + .await + .transfer_iota(Some(100), sender_1) + .build(); + let signed_tx = cluster.sign_transaction(&tx); + cluster.execute_transaction(signed_tx).await; + tokio::time::sleep(Duration::from_millis(500)).await; + + // Wait a bit more to ensure all transactions are processed + tokio::time::sleep(Duration::from_millis(2000)).await; + }) + }; + + // Concurrently collect transactions from both clients + let all_transactions_task = tokio::spawn(async move { + let mut all_transactions = Vec::new(); + + let result = timeout(Duration::from_secs(30), async { + while let Some(transaction_result) = all_stream.next().await { + match transaction_result { + Ok(transaction) => { + // Verify transaction data integrity + assert!(!transaction.transaction_digest().to_string().is_empty()); + + all_transactions.push(transaction); + + if all_transactions.len() >= 3 { + break; + } + } + Err(e) => panic!("AllTransactionsFilter client error: {e}"), + } + } + }) + .await; + + assert!( + result.is_ok(), + "AllTransactionsFilter should receive transactions" + ); + (all_transactions.len(), all_transactions) + }); + + let sender_transactions_task = tokio::spawn(async move { + let mut sender_transactions = Vec::new(); + + let result = timeout(Duration::from_secs(30), async { + while let Some(transaction_result) = sender_stream.next().await { + match transaction_result { + Ok(transaction) => { + // Verify transaction data integrity + assert!(!transaction.transaction_digest().to_string().is_empty()); + + sender_transactions.push(transaction); + + if sender_transactions.len() >= 2 { + break; + } + } + Err(e) => panic!("FromAddressFilter client error: {e}"), + } + } + }) + .await; + + assert!( + result.is_ok(), + "FromAddressFilter should receive transactions" + ); + (sender_transactions.len(), sender_transactions) + }); + + // Wait for all tasks to finish + let (all_results, sender_results, _) = tokio::join!( + all_transactions_task, + sender_transactions_task, + generate_transactions_task + ); + let (all_count, _all_transactions) = + all_results.expect("AllTransactionsFilter task should complete"); + let (sender_count, _sender_transactions) = + sender_results.expect("FromAddressFilter task should complete"); + + // Verify individual filter behaviors: + // - AllTransactionsFilter: receives all transactions (2 from sender_1 + 1 from + // sender_2 = 3) + // - FromAddressFilter: receives only transactions from sender_1 (2 + // transactions) + assert_eq!( + all_count, 3, + "AllTransactionsFilter should receive all 3 transactions" + ); + assert_eq!( + sender_count, 2, + "FromAddressFilter should receive 2 transactions from sender_1" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_transaction_kind_filtering() { + let (cluster, transaction_client, sender) = setup_test_cluster().await; + + // Test TransactionKind filter + let mut kind_client = transaction_client.clone(); + let kind_filter = TransactionFilter { + filter: Some(Filter::TransactionKind( + iota_grpc_api::transactions::TransactionKindFilter { + kind: iota_grpc_api::transactions::TransactionKind::ProgrammableTransaction as i32, + }, + )), + }; + let mut kind_stream = kind_client + .stream_transactions(kind_filter) + .await + .expect("Failed to create transaction kind stream"); + + // Generate a programmable transaction + let cluster_clone = std::sync::Arc::new(cluster); + let generate_transaction_task = { + let cluster = cluster_clone.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(1000)).await; + + let tx = cluster + .test_transaction_builder_with_sender(sender) + .await + .transfer_iota(Some(100), cluster.get_address_1()) + .build(); + let signed_tx = cluster.sign_transaction(&tx); + cluster.execute_transaction(signed_tx).await; + + tokio::time::sleep(Duration::from_millis(2000)).await; + }) + }; + + let kind_transactions_task = tokio::spawn(async move { + let mut transactions = Vec::new(); + + let result = timeout(Duration::from_secs(20), async { + if let Some(transaction_result) = kind_stream.next().await { + match transaction_result { + Ok(transaction) => { + assert!(!transaction.transaction_digest().to_string().is_empty()); + transactions.push(transaction); + } + Err(e) => panic!("TransactionKind filter error: {e}"), + } + } + }) + .await; + + assert!( + result.is_ok(), + "TransactionKind filter should receive transactions" + ); + transactions.len() + }); + + let (_, kind_count_result) = tokio::join!(generate_transaction_task, kind_transactions_task); + let kind_count = kind_count_result.expect("Kind transactions task should complete"); + assert_eq!( + kind_count, 1, + "TransactionKind filter should receive 1 programmable transaction" + ); +} diff --git a/crates/iota-grpc-api/tests/utils.rs b/crates/iota-grpc-api/tests/utils.rs new file mode 100644 index 00000000000..8dbaafa62d7 --- /dev/null +++ b/crates/iota-grpc-api/tests/utils.rs @@ -0,0 +1,28 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use iota_config::local_ip_utils; +use iota_grpc_api::client::NodeClient; +use test_cluster::{TestCluster, TestClusterBuilder}; + +/// Basic setup that returns cluster and node client +pub async fn setup_test_cluster_and_client() -> (TestCluster, NodeClient) { + let localhost = local_ip_utils::localhost_for_testing(); + let grpc_port = local_ip_utils::get_available_port(&localhost); + let grpc_addr = format!("{localhost}:{grpc_port}"); + + // Start a test cluster with gRPC enabled and pruning disabled + let cluster = TestClusterBuilder::new() + .with_fullnode_grpc_api_address(grpc_addr.parse().expect("Invalid gRPC address")) + .disable_fullnode_pruning() + .with_num_validators(1) + .build() + .await; + + // Create NodeClient + let node_client = NodeClient::connect(&format!("http://{grpc_addr}")) + .await + .expect("connect gRPC"); + + (cluster, node_client) +} diff --git a/crates/iota-grpc-api/tests/write_e2e.rs b/crates/iota-grpc-api/tests/write_e2e.rs new file mode 100644 index 00000000000..52e8b1cedb1 --- /dev/null +++ b/crates/iota-grpc-api/tests/write_e2e.rs @@ -0,0 +1,158 @@ +// Copyright (c) 2025 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +use iota_grpc_api::{ + client::WriteClient, + write::{ExecuteTransactionRequest, TransactionResponseOptions}, +}; +use test_cluster::TestCluster; + +mod utils; +use utils::setup_test_cluster_and_client; + +async fn setup_test_cluster_and_write_client() -> (TestCluster, WriteClient) { + let (cluster, node_client) = setup_test_cluster_and_client().await; + + let write_client = node_client + .write_client() + .expect("Write client should be available"); + + (cluster, write_client) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_write_service_execute_transaction() { + let (cluster, mut write_client) = setup_test_cluster_and_write_client().await; + + let sender = cluster.get_address_0(); + let receiver = cluster.get_address_1(); + let amount = 1000u64; + + // Build a real transfer transaction using TestCluster's infrastructure + let tx_data = cluster + .test_transaction_builder_with_sender(sender) + .await + .transfer_iota(Some(amount), receiver) + .build(); + + // Sign the transaction to get proper signatures + let signed_tx = cluster.sign_transaction(&tx_data); + + // Extract real transaction bytes and signatures + let tx_bytes = + bcs::to_bytes(&signed_tx.data().intent_message().value).expect("BCS serialization failed"); + let signatures: Vec> = signed_tx + .tx_signatures() + .iter() + .map(|sig| sig.as_ref().to_vec()) + .collect(); + + // Test execute_transaction via WriteService with real transaction data + let tx_result = tokio::time::timeout(Duration::from_secs(30), async { + let request = ExecuteTransactionRequest { + tx_bytes, + signatures, + options: Some(TransactionResponseOptions { + show_input: false, + show_raw_input: false, + show_effects: true, + show_events: false, + show_object_changes: false, + show_balance_changes: false, + show_raw_effects: false, + }), + request_type: None, // Uses default: WaitForEffectsCert + }; + + let response = write_client.execute_transaction(request).await?; + + // Validate the IotaTransactionBlockResponse + assert!( + !response.digest.inner().is_empty(), + "Response should have a valid digest" + ); + + // Since we requested show_effects: true, validate effects are present + assert!( + response.effects.is_some(), + "Effects should be present when show_effects is true" + ); + + // Validate that fields we didn't request are None/empty + assert!( + response.transaction.is_none(), + "Transaction should be None when show_input is false" + ); + assert!( + response.raw_transaction.is_empty(), + "Raw transaction should be empty when show_raw_input is false" + ); + assert!( + response.events.is_none(), + "Events should be None when show_events is false" + ); + assert!( + response.object_changes.is_none(), + "Object changes should be None when show_object_changes is false" + ); + assert!( + response.balance_changes.is_none(), + "Balance changes should be None when show_balance_changes is false" + ); + assert!( + response.raw_effects.is_empty(), + "Raw effects should be empty when show_raw_effects is false" + ); + + Ok::<(), anyhow::Error>(()) + }) + .await + .expect("timeout waiting for transaction"); + + match tx_result { + Ok(()) => {} + Err(e) => { + panic!("WriteService transaction execution failed: {e}"); + } + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_write_service_invalid_transaction() { + let (_cluster, mut write_client) = setup_test_cluster_and_write_client().await; + + // Create invalid transaction data (dummy bytes that won't deserialize properly) + let tx_bytes = vec![0u8; 32]; // Invalid transaction bytes + let signatures = vec![vec![0u8; 64]]; // Invalid signature + + // Test execute_transaction with invalid data via WriteService + let tx_result = tokio::time::timeout(Duration::from_secs(30), async { + let request = ExecuteTransactionRequest { + tx_bytes, + signatures, + options: Some(TransactionResponseOptions { + show_input: false, + show_raw_input: false, + show_effects: true, + show_events: false, + show_object_changes: false, + show_balance_changes: false, + show_raw_effects: false, + }), + request_type: Some(1), // WaitForLocalExecution + }; + + let _response = write_client.execute_transaction(request).await?; + // Should not reach here with invalid transaction data + Ok::<(), anyhow::Error>(()) + }) + .await + .expect("timeout waiting for transaction"); + + if let Ok(()) = tx_result { + // This would be unexpected for invalid transaction data + panic!("WriteService should not succeed with invalid transaction data"); + } +} diff --git a/crates/iota-grpc-types/src/lib.rs b/crates/iota-grpc-types/src/lib.rs index f350190cc92..2d9097b2228 100644 --- a/crates/iota-grpc-types/src/lib.rs +++ b/crates/iota-grpc-types/src/lib.rs @@ -9,6 +9,12 @@ use serde::{Deserialize, Serialize}; +/// Forward-compatible versioned transaction effects for gRPC streaming. +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum TransactionEffects { + V1(iota_types::effects::TransactionEffectsV1), +} + /// Forward-compatible versioned checkpoint data for gRPC streaming. #[derive(Serialize, Deserialize, Clone, Debug)] pub enum CheckpointData { @@ -27,6 +33,12 @@ impl From for CheckpointDat } } +impl From for TransactionEffects { + fn from(effects: iota_types::effects::TransactionEffectsV1) -> Self { + Self::V1(effects) + } +} + impl From for CertifiedCheckpointSummary { @@ -35,6 +47,23 @@ impl From } } +impl TransactionEffects { + /// Extract the V1 transaction effects, returning None for unknown versions + pub fn into_v1(self) -> Option { + match self { + Self::V1(effects) => Some(effects), + } + } + + /// Get a reference to the V1 transaction effects, returning None for + /// unknown versions + pub fn as_v1(&self) -> Option<&iota_types::effects::TransactionEffectsV1> { + match self { + Self::V1(effects) => Some(effects), + } + } +} + impl CheckpointData { /// Extract the V1 checkpoint data, returning None for unknown versions pub fn into_v1(self) -> Option { diff --git a/crates/iota-node/src/lib.rs b/crates/iota-node/src/lib.rs index 4dba533439c..5b16ba57e32 100644 --- a/crates/iota-node/src/lib.rs +++ b/crates/iota-node/src/lib.rs @@ -825,6 +825,9 @@ impl IotaNode { None }; + // Create transaction key-value store early for both HTTP and gRPC servers + let kv_store = build_kv_store(&state, &config, &prometheus_registry)?; + let http_server = build_http_server( state.clone(), state_sync_store.clone(), @@ -833,6 +836,7 @@ impl IotaNode { &prometheus_registry, custom_rpc_runtime, software_version, + kv_store.clone(), ) .await?; @@ -866,8 +870,14 @@ impl IotaNode { let iota_node_metrics = Arc::new(IotaNodeMetrics::new(®istry_service.default_registry())); - let grpc_server_handle = - build_grpc_server(&config, state.clone(), state_sync_store.clone()).await?; + let grpc_server_handle = build_grpc_server( + &config, + state.clone(), + state_sync_store.clone(), + &transaction_orchestrator, + kv_store.clone(), + ) + .await?; let validator_components = if state.is_validator(&epoch_store) { let (components, _) = futures::join!( @@ -2308,6 +2318,8 @@ async fn build_grpc_server( config: &NodeConfig, state: Arc, state_sync_store: RocksDbStore, + transaction_orchestrator: &Option>>, + transaction_kv_store: Arc, ) -> Result> { // Validators do not expose gRPC APIs if config.consensus_config().is_some() || !config.enable_grpc_api { @@ -2323,18 +2335,17 @@ async fn build_grpc_server( // Create cancellation token for proper shutdown hierarchy let shutdown_token = CancellationToken::new(); - // Create GrpcReader - let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(rest_read_store)); + // Create GrpcReader with RestReadStore and TransactionKeyValueStore for full + // functionality + let grpc_reader = Arc::new(GrpcReader::new(rest_read_store, Some(transaction_kv_store))); // Get the subscription handler from the state for event streaming - let event_subscriber = - state.subscription_handler.clone() as Arc; + let event_subscriber = state.subscription_handler.clone(); - // Pass the same token to both GrpcReader (already done above) and - // start_grpc_server let handle = start_grpc_server( grpc_reader, event_subscriber, + transaction_orchestrator.clone(), grpc_config.clone(), shutdown_token, ) @@ -2367,6 +2378,7 @@ pub async fn build_http_server( prometheus_registry: &Registry, _custom_runtime: Option, software_version: &'static str, + kv_store: Arc, ) -> Result> { // Validators do not expose these APIs if config.consensus_config().is_some() { @@ -2383,8 +2395,6 @@ pub async fn build_http_server( config.firewall_config.clone(), ); - let kv_store = build_kv_store(&state, config, prometheus_registry)?; - let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry)); server.register_module(ReadApi::new( state.clone(), diff --git a/crates/iota-swarm-config/Cargo.toml b/crates/iota-swarm-config/Cargo.toml index 63f06762be6..e591c36e3c4 100644 --- a/crates/iota-swarm-config/Cargo.toml +++ b/crates/iota-swarm-config/Cargo.toml @@ -26,7 +26,6 @@ tracing.workspace = true # internal dependencies iota-config.workspace = true iota-genesis-builder.workspace = true -iota-grpc-api.workspace = true iota-macros.workspace = true iota-names.workspace = true iota-protocol-config.workspace = true diff --git a/crates/iota-swarm-config/src/node_config_builder.rs b/crates/iota-swarm-config/src/node_config_builder.rs index 27649f1aa07..b3a581b7867 100644 --- a/crates/iota-swarm-config/src/node_config_builder.rs +++ b/crates/iota-swarm-config/src/node_config_builder.rs @@ -15,7 +15,7 @@ use iota_config::{ AuthorityKeyPairWithPath, AuthorityOverloadConfig, AuthorityStorePruningConfig, CheckpointExecutorConfig, DBCheckpointConfig, DEFAULT_GRPC_CONCURRENCY_LIMIT, ExecutionCacheConfig, ExecutionCacheType, ExpensiveSafetyCheckConfig, Genesis, - KeyPairWithPath, RunWithRange, StateArchiveConfig, StateSnapshotConfig, + GrpcApiConfig, KeyPairWithPath, RunWithRange, StateArchiveConfig, StateSnapshotConfig, default_enable_index_processing, default_end_of_epoch_broadcast_channel_capacity, default_zklogin_oauth_providers, }, @@ -294,7 +294,7 @@ pub struct FullnodeConfigBuilder { data_ingestion_dir: Option, disable_pruning: bool, iota_names_config: Option, - grpc_api_config: Option, + grpc_api_config: Option, } impl FullnodeConfigBuilder { @@ -420,7 +420,7 @@ impl FullnodeConfigBuilder { self } - pub fn with_grpc_api_config(mut self, config: iota_grpc_api::Config) -> Self { + pub fn with_grpc_api_config(mut self, config: GrpcApiConfig) -> Self { self.grpc_api_config = Some(config); self } diff --git a/crates/iota-swarm/src/memory/swarm.rs b/crates/iota-swarm/src/memory/swarm.rs index edd66376aa4..da45db29650 100644 --- a/crates/iota-swarm/src/memory/swarm.rs +++ b/crates/iota-swarm/src/memory/swarm.rs @@ -15,9 +15,8 @@ use anyhow::Result; use futures::future::try_join_all; use iota_config::{ ExecutionCacheConfig, ExecutionCacheType, IOTA_GENESIS_FILENAME, NodeConfig, - node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange}, + node::{AuthorityOverloadConfig, DBCheckpointConfig, GrpcApiConfig, RunWithRange}, }; -use iota_grpc_api; use iota_macros::nondeterministic; use iota_names::config::IotaNamesConfig; use iota_node::IotaNodeHandle; @@ -72,7 +71,7 @@ pub struct SwarmBuilder { state_accumulator_config: StateAccumulatorV1EnabledConfig, disable_fullnode_pruning: bool, iota_names_config: Option, - fullnode_grpc_api_config: Option, + fullnode_grpc_api_config: Option, } impl SwarmBuilder { @@ -314,7 +313,7 @@ impl SwarmBuilder { self } - pub fn with_fullnode_grpc_api_config(mut self, config: iota_grpc_api::Config) -> Self { + pub fn with_fullnode_grpc_api_config(mut self, config: GrpcApiConfig) -> Self { self.fullnode_grpc_api_config = Some(config); self } diff --git a/crates/iota-transactional-test-runner/src/simulator_persisted_store.rs b/crates/iota-transactional-test-runner/src/simulator_persisted_store.rs index 2a212075174..2256e4c0c08 100644 --- a/crates/iota-transactional-test-runner/src/simulator_persisted_store.rs +++ b/crates/iota-transactional-test-runner/src/simulator_persisted_store.rs @@ -710,6 +710,10 @@ impl RestStateReader for PersistedStoreInnerReadOnlyWrapper { ) -> iota_types::storage::error::Result> { todo!() } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } impl PersistedStoreInnerReadOnlyWrapper { diff --git a/crates/iota-types/src/effects/mod.rs b/crates/iota-types/src/effects/mod.rs index 548506bfe94..4847fecd62a 100644 --- a/crates/iota-types/src/effects/mod.rs +++ b/crates/iota-types/src/effects/mod.rs @@ -4,8 +4,7 @@ use std::collections::{BTreeMap, BTreeSet}; -use effects_v1::TransactionEffectsV1; -pub use effects_v1::UnchangedSharedKind; +pub use effects_v1::{TransactionEffectsV1, UnchangedSharedKind}; use enum_dispatch::enum_dispatch; pub use object_change::{EffectsObjectChange, ObjectIn, ObjectOut}; use serde::{Deserialize, Serialize}; diff --git a/crates/iota-types/src/storage/read_store.rs b/crates/iota-types/src/storage/read_store.rs index 45bfc249e5e..bc77dde33fe 100644 --- a/crates/iota-types/src/storage/read_store.rs +++ b/crates/iota-types/src/storage/read_store.rs @@ -814,6 +814,9 @@ pub trait RestStateReader: ObjectStore + ReadStore + Send + Sync { // Get a handle to an instance of the RpcIndexes fn indexes(&self) -> Option<&dyn RestIndexes>; + + /// Enable downcasting to concrete types for enhanced functionality + fn as_any(&self) -> &dyn std::any::Any; } pub trait RestIndexes: Send + Sync { diff --git a/crates/iota/src/iota_commands.rs b/crates/iota/src/iota_commands.rs index e8d6ad3ed03..8ee834c4161 100644 --- a/crates/iota/src/iota_commands.rs +++ b/crates/iota/src/iota_commands.rs @@ -18,7 +18,8 @@ use fastcrypto::traits::KeyPair; use iota_config::{ Config, FULL_NODE_DB_PATH, IOTA_BENCHMARK_GENESIS_GAS_KEYSTORE_FILENAME, IOTA_CLIENT_CONFIG, IOTA_FULLNODE_CONFIG, IOTA_GENESIS_FILENAME, IOTA_KEYSTORE_FILENAME, IOTA_NETWORK_CONFIG, - NodeConfig, PersistedConfig, genesis_blob_exists, iota_config_dir, node::Genesis, + NodeConfig, PersistedConfig, genesis_blob_exists, iota_config_dir, + node::{Genesis, GrpcApiConfig}, p2p::SeedPeer, }; use iota_faucet::{AppState, FaucetConfig, SimpleFaucet, create_wallet_context, start_faucet}; @@ -791,8 +792,8 @@ async fn start( swarm_builder = swarm_builder.with_fullnode_grpc_api_config(grpc_config); } else { warn!("gRPC API enabled but no grpc-api-config provided, using default"); - swarm_builder = swarm_builder - .with_fullnode_grpc_api_config(iota_grpc_api::Config::default()); + swarm_builder = + swarm_builder.with_fullnode_grpc_api_config(GrpcApiConfig::default()); } } } diff --git a/crates/simulacrum/src/lib.rs b/crates/simulacrum/src/lib.rs index e85d9dfd7c1..1ac0e1ee1d2 100644 --- a/crates/simulacrum/src/lib.rs +++ b/crates/simulacrum/src/lib.rs @@ -595,6 +595,10 @@ impl RestStateReader for fn indexes(&self) -> Option<&dyn iota_types::storage::RestIndexes> { None } + + fn as_any(&self) -> &dyn std::any::Any { + todo!() + } } impl Simulacrum { diff --git a/crates/test-cluster/src/lib.rs b/crates/test-cluster/src/lib.rs index 9db52fc318a..adc095882cf 100644 --- a/crates/test-cluster/src/lib.rs +++ b/crates/test-cluster/src/lib.rs @@ -17,7 +17,7 @@ use iota_config::{ Config, ExecutionCacheConfig, ExecutionCacheType, IOTA_CLIENT_CONFIG, IOTA_KEYSTORE_FILENAME, IOTA_NETWORK_CONFIG, NodeConfig, PersistedConfig, genesis::Genesis, - node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange}, + node::{AuthorityOverloadConfig, DBCheckpointConfig, GrpcApiConfig, RunWithRange}, }; use iota_core::{ authority_aggregator::AuthorityAggregator, authority_client::NetworkAuthorityClient, @@ -1002,7 +1002,7 @@ pub struct TestClusterBuilder { fullnode_run_with_range: Option, fullnode_policy_config: Option, fullnode_fw_config: Option, - fullnode_grpc_api_config: Option, + fullnode_grpc_api_config: Option, max_submit_position: Option, submit_delay_step_override_millis: Option, validator_state_accumulator_config: StateAccumulatorV1EnabledConfig,