Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/iota-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ tracing.workspace = true
# internal dependencies
consensus-config.workspace = true
iota-genesis-common.workspace = true
iota-grpc-api.workspace = true
iota-keys.workspace = true
iota-names.workspace = true
iota-rest-api.workspace = true
Expand Down
44 changes: 42 additions & 2 deletions crates/iota-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,47 @@ pub struct NodeConfig {
default = "default_grpc_api_config",
skip_serializing_if = "Option::is_none"
)]
pub grpc_api_config: Option<iota_grpc_api::Config>,
pub grpc_api_config: Option<GrpcApiConfig>,
}

/// Configuration for the gRPC API service
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct GrpcApiConfig {
/// The address to bind the gRPC server to
#[serde(default = "default_grpc_api_address")]
pub address: std::net::SocketAddr,

/// Buffer size for broadcast channels used for checkpoint streaming
#[serde(default = "default_checkpoint_broadcast_buffer_size")]
pub checkpoint_broadcast_buffer_size: usize,

/// Buffer size for broadcast channels used for event streaming
#[serde(default = "default_event_broadcast_buffer_size")]
pub event_broadcast_buffer_size: usize,
}

impl Default for GrpcApiConfig {
fn default() -> Self {
Self {
address: default_grpc_api_address(),
checkpoint_broadcast_buffer_size: default_checkpoint_broadcast_buffer_size(),
event_broadcast_buffer_size: default_event_broadcast_buffer_size(),
}
}
}

fn default_grpc_api_address() -> std::net::SocketAddr {
use std::net::{IpAddr, Ipv4Addr};
std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 50051)
}

fn default_checkpoint_broadcast_buffer_size() -> usize {
100
}

fn default_event_broadcast_buffer_size() -> usize {
1000
}

#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
Expand Down Expand Up @@ -576,7 +616,7 @@ pub fn default_json_rpc_address() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9000)
}

pub fn default_grpc_api_config() -> Option<iota_grpc_api::Config> {
pub fn default_grpc_api_config() -> Option<GrpcApiConfig> {
None
}

Expand Down
2 changes: 0 additions & 2 deletions crates/iota-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ iota-config.workspace = true
iota-execution.workspace = true
iota-framework.workspace = true
iota-genesis-builder.workspace = true
iota-grpc-api.workspace = true
iota-grpc-types.workspace = true
iota-json-rpc-types.workspace = true
iota-macros.workspace = true
iota-metrics.workspace = true
Expand Down
16 changes: 15 additions & 1 deletion crates/iota-core/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tap::Pipe;
use tracing::instrument;

use crate::{
authority::AuthorityState,
authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
checkpoints::CheckpointStore,
epoch::committee_store::CommitteeStore,
execution_cache::ExecutionCacheTraitPointers,
Expand Down Expand Up @@ -380,6 +380,16 @@ impl RestReadStore {
iota_types::storage::error::Error::custom("rest index store is disabled")
})
}

/// Get access to the underlying AuthorityState
pub fn authority_state(&self) -> &Arc<AuthorityState> {
&self.state
}

/// Load epoch store for transaction processing
pub fn load_epoch_store_one_call_per_task(&self) -> Arc<AuthorityPerEpochStore> {
self.state.load_epoch_store_one_call_per_task().clone()
}
Comment on lines +384 to +392
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not move these methods and make them a part of RestStateReader trait?

}

impl ObjectStore for RestReadStore {
Expand Down Expand Up @@ -534,6 +544,10 @@ impl RestStateReader for RestReadStore {
fn indexes(&self) -> Option<&dyn RestIndexes> {
self.index().ok().map(|index| index as _)
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
}

impl RestIndexes for RestIndexStore {
Expand Down
23 changes: 7 additions & 16 deletions crates/iota-core/src/subscription_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use std::sync::Arc;

use iota_grpc_api::EventSubscriber;
use iota_json_rpc_types::{
EffectsWithInput, EventFilter, IotaEvent, IotaTransactionBlockEffects,
IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents, TransactionFilter,
Expand All @@ -14,7 +13,6 @@ use prometheus::{
IntCounterVec, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry,
};
use tokio_stream::Stream;
use tracing::{error, instrument, trace};

use crate::streamer::Streamer;
Expand Down Expand Up @@ -114,24 +112,17 @@ impl SubscriptionHandler {
Ok(())
}

pub fn subscribe_events(&self, filter: EventFilter) -> impl Stream<Item = IotaEvent> {
self.event_streamer.subscribe(filter)
pub fn subscribe_events(
&self,
filter: EventFilter,
) -> Box<dyn futures::Stream<Item = IotaEvent> + Send + Unpin> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be BoxStream<'_, IotaEvent>? Or is Unpin important here?

Box::new(Box::pin(self.event_streamer.subscribe(filter)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double Box? Doesn't look right. With BoxStream we can just use one Box::pin.

}

pub fn subscribe_transactions(
&self,
filter: TransactionFilter,
) -> impl Stream<Item = IotaTransactionBlockEffects> {
self.transaction_streamer.subscribe(filter)
}
}

// Implement EventSubscriber trait for gRPC integration
impl EventSubscriber for SubscriptionHandler {
fn subscribe_events(
&self,
filter: EventFilter,
) -> Box<dyn futures::Stream<Item = IotaEvent> + Send + Unpin> {
Box::new(Box::pin(self.event_streamer.subscribe(filter)))
) -> Box<dyn futures::Stream<Item = IotaTransactionBlockEffects> + Send + Unpin> {
Box::new(Box::pin(self.transaction_streamer.subscribe(filter)))
Comment on lines +125 to +126
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

}
}
10 changes: 10 additions & 0 deletions crates/iota-grpc-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true
anyhow.workspace = true
async-stream = "0.3"
bcs.workspace = true
fastcrypto.workspace = true
futures.workspace = true
prost.workspace = true
serde.workspace = true
Expand All @@ -25,14 +26,23 @@ tonic.workspace = true
tracing.workspace = true

# internal dependencies
iota-config.workspace = true
iota-core.workspace = true
iota-grpc-types.workspace = true
iota-json-rpc.workspace = true
iota-json-rpc-types.workspace = true
iota-metrics.workspace = true
iota-storage.workspace = true
iota-types.workspace = true
move-core-types.workspace = true

[build-dependencies]
tonic-build.workspace = true

[dev-dependencies]
# external dependencies
prometheus.workspace = true

# internal dependencies
iota-config.workspace = true
test-cluster.workspace = true
3 changes: 3 additions & 0 deletions crates/iota-grpc-api/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ fn main() {
"proto/common.proto",
"proto/checkpoint.proto",
"proto/event.proto",
"proto/transaction.proto",
"proto/read.proto",
"proto/write.proto",
],
&["proto/"],
)
Expand Down
7 changes: 7 additions & 0 deletions crates/iota-grpc-api/proto/checkpoint.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ service CheckpointService {
// Checkpoint operations
rpc StreamCheckpoints (CheckpointStreamRequest) returns (stream Checkpoint);
rpc GetEpochFirstCheckpointSequenceNumber (EpochRequest) returns (CheckpointSequenceNumberResponse);
rpc GetLatestCheckpoint (GetLatestCheckpointRequest) returns (Checkpoint);
}

message CheckpointStreamRequest {
Expand All @@ -28,6 +29,12 @@ message CheckpointSequenceNumberResponse {
uint64 sequence_number = 1;
}

// Get the latest checkpoint information
message GetLatestCheckpointRequest {
// If true, get the full CheckpointData (not just the summary).
bool full = 1;
}

message Checkpoint {
uint64 sequence_number = 1;
// Indicates whether bcs_data contains full CheckpointData (true) or just CertifiedCheckpointSummary (false)
Expand Down
44 changes: 42 additions & 2 deletions crates/iota-grpc-api/proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,47 @@ message Address {
bytes address = 1;
}

// 32-byte Transaction digest
message TransactionDigest {
// Generic 32-byte digest (used for transactions, objects, etc.)
message Digest {
bytes digest = 1;
}

// Match all events (catch-all filter)
message AllFilter {
}

// Filter by address (sender, package, etc.)
message AddressFilter {
Address address = 1;
}

// Filter by transaction digest
message TransactionDigestFilter {
Digest tx_digest = 1;
}

// Filter by Move module (package + module)
message MoveModuleFilter {
Address package_id = 1; // Package ID
string module = 2; // Module name
}

// Filter by Move event type (package + module + event name)
message MoveEventTypeFilter {
Address package_id = 1; // Package ID
string module = 2; // Module name
string name = 3; // Event name
}

// Filter by Move event module (package + module)
message MoveEventModuleFilter {
Address package_id = 1; // Package ID
string module = 2; // Module name
}

// Filter by Move function (package + module + function)
message MoveFunctionFilter {
Address package_id = 1; // Package ID
optional string module = 2; // Module name (optional)
optional string function = 3; // Function name (optional)
}
50 changes: 8 additions & 42 deletions crates/iota-grpc-api/proto/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,58 +28,24 @@ message Event {

message EventID {
uint64 event_seq = 1;
iota.grpc.common.TransactionDigest tx_digest = 2;
iota.grpc.common.Digest tx_digest = 2;
}

// Rich event filter that supports gRPC event filtering
message EventFilter {
oneof filter {
AllFilter all = 1;
SenderFilter sender = 2;
TransactionFilter transaction = 3;
MoveModuleFilter move_module = 4;
MoveEventTypeFilter move_event_type = 5;
MoveEventModuleFilter move_event_module = 6;
iota.grpc.common.AllFilter all = 1;
iota.grpc.common.AddressFilter sender = 2;
iota.grpc.common.TransactionDigestFilter transaction = 3;
iota.grpc.common.MoveModuleFilter move_module = 4;
iota.grpc.common.MoveEventTypeFilter move_event_type = 5;
iota.grpc.common.MoveEventModuleFilter move_event_module = 6;
TimeRangeFilter time_range = 7;
}
}

// Match all events (catch-all filter)
message AllFilter {
// Empty - matches all events
}

// Filter by sender address
message SenderFilter {
iota.grpc.common.Address sender = 1; // Sender address
}

// Filter by transaction digest
message TransactionFilter {
iota.grpc.common.TransactionDigest tx_digest = 1; // Transaction digest
}

// Filter by transaction execution module (different from event definition module)
message MoveModuleFilter {
iota.grpc.common.Address package_id = 1; // Package ID
string module = 2; // Module name
}

// Filter by Move event type (package::module::event_name)
message MoveEventTypeFilter {
iota.grpc.common.Address package_id = 1; // Package ID
string module = 2; // Module name (e.g., "request")
string name = 3; // Event name (e.g., "RequestEvent")
}

// Filter by package and module
message MoveEventModuleFilter {
iota.grpc.common.Address package_id = 1; // Package ID
string module = 2; // Module name
}

// Filter by timestamp range
message TimeRangeFilter {
uint64 start_time = 1; // Start time in milliseconds since epoch (inclusive)
uint64 end_time = 2; // End time in milliseconds since epoch (exclusive)
}
}
Loading
Loading