diff --git a/Cargo.lock b/Cargo.lock index db7784c..4271df1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -473,15 +473,18 @@ dependencies = [ "fc-rpc-core", "fp-rpc", "frame-system-rpc-runtime-api", + "futures 0.3.16", "hex", "jsonrpc-core 15.1.0", "jsonrpc-core-client 15.1.0", "jsonrpc-derive 15.1.0", "jsonrpc-pubsub 15.1.0", + "log", "pallet-ethereum", "pallet-geode", "pallet-transaction-payment-rpc", "pallet-transfer", + "parity-scale-codec", "sc-client-api", "sc-consensus-babe", "sc-consensus-babe-rpc", diff --git a/node/src/service.rs b/node/src/service.rs index c2e364d..2c3b1ba 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -235,6 +235,8 @@ pub fn new_full(mut config: Configuration) -> Result let is_authority = role.is_authority(); let subscription_task_executor = sc_rpc::SubscriptionTaskExecutor::new(task_manager.spawn_handle()); + let geode_subscription_executor = + sc_rpc::SubscriptionTaskExecutor::new(task_manager.spawn_handle()); let babe_config = babe_link.config().clone(); let shared_epoch_changes = babe_link.epoch_changes().clone(); let justification_stream = grandpa_link.justification_stream(); @@ -277,7 +279,11 @@ pub fn new_full(mut config: Configuration) -> Result }, }; - automata_rpc::create_full(deps, subscription_task_executor.clone()) + automata_rpc::create_full( + deps, + subscription_task_executor.clone(), + geode_subscription_executor.clone(), + ) }) }; diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 9030223..98423ec 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -39,6 +39,9 @@ hex = '0.4.3' sp-std = '3.0.0' frame-system-rpc-runtime-api = { default-features = false, version = '3.0.0' } sp-core = { features = ["full_crypto"], version = '3.0.0' } +log = "0.4.8" +futures = "0.3.9" +codec = { default-features = false, features = ['derive'], package = 'parity-scale-codec', version = '2.0.0' } # local dependencies automata-primitives = { path = "../primitives" } diff --git a/rpc/src/geode.rs b/rpc/src/geode.rs index ccd8a79..94d6abd 100644 --- a/rpc/src/geode.rs +++ b/rpc/src/geode.rs @@ -9,14 +9,29 @@ use sp_runtime::{traits::Block as BlockT, RuntimeDebug}; use sp_std::{collections::btree_map::BTreeMap, prelude::*}; use std::sync::Arc; -// #[cfg(feature = "std")] +use codec::{Decode, Encode}; +use sc_client_api::BlockchainEvents; +use sc_client_api::StorageChangeSet; +use sp_core::storage::StorageKey; +use sp_core::{blake2_128, twox_128}; + +use futures::{future, StreamExt, TryStreamExt}; +use jsonrpc_core::futures::{future::Future, sink::Sink, stream, stream::Stream}; +use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId}; +use log::warn; + use serde::{Deserialize, Serialize}; const RUNTIME_ERROR: i64 = 1; +type GeodeId = [u8; 32]; + #[rpc] /// Geode RPC methods pub trait GeodeServer { + /// RPC Metadata + type Metadata; + /// return the registered geode list #[rpc(name = "registered_geodes")] fn registered_geodes(&self) -> Result>>; @@ -29,6 +44,19 @@ pub trait GeodeServer { /// Return the current state of a geode #[rpc(name = "geode_state")] fn geode_state(&self, geode: [u8; 32]) -> Result>; + + /// Geode state subscription + #[pubsub(subscription = "geode_state", subscribe, name = "geode_subscribeState")] + fn subscribe_geode_state(&self, _: Self::Metadata, _: Subscriber, id: GeodeId); + + /// Unsubscribe from geode state subscription. + #[pubsub( + subscription = "geode_state", + unsubscribe, + name = "geode_unsubscribeState" + )] + fn unsubscribe_geode_state(&self, _: Option, _: SubscriptionId) + -> Result; } /// The geode struct shows its status @@ -71,21 +99,24 @@ impl From> for WrappedGeode { /// An implementation of geode specific RPC methods. pub struct GeodeApi { client: Arc, + manager: SubscriptionManager, } impl GeodeApi { /// Create new `Geode` with the given reference to the client. - pub fn new(client: Arc) -> Self { - GeodeApi { client } + pub fn new(client: Arc, manager: SubscriptionManager) -> Self { + GeodeApi { client, manager } } } impl GeodeServer<::Hash> for GeodeApi where C: Send + Sync + 'static, - C: ProvideRuntimeApi + HeaderBackend, + C: ProvideRuntimeApi + HeaderBackend + BlockchainEvents, C::Api: GeodeRuntimeApi, { + type Metadata = sc_rpc::Metadata; + /// get registered geode list fn registered_geodes(&self) -> Result>> { let api = self.client.runtime_api(); @@ -154,4 +185,99 @@ where Ok(geode_state) } + + fn subscribe_geode_state( + &self, + _metadata: Self::Metadata, + subscriber: Subscriber, + id: GeodeId, + ) { + // get the current state of the geode + // if the geode does not exist, reject the subscription + let initial = match self.geode_state(id.clone()) { + Ok(state) => match state { + Some(initial) => Ok(initial), + None => { + let _ = subscriber.reject(Error::invalid_params("no such geode")); + return; + } + }, + Err(e) => Err(e), + }; + let key: StorageKey = StorageKey(build_storage_key(id.clone())); + let keys = Into::>>::into(vec![key]); + let stream = match self + .client + .storage_changes_notification_stream(keys.as_ref().map(|x| &**x), None) + { + Ok(stream) => stream, + Err(err) => { + let _ = subscriber.reject(client_err(err).into()); + return; + } + }; + + let stream = stream + .filter_map(move |(_block, changes)| match get_geode_state(changes) { + Ok(state) => future::ready(Some(Ok::<_, ()>(Ok(state)))), + Err(_) => future::ready(None), + }) + .compat(); + + self.manager.add(subscriber, |sink| { + sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all(stream::iter_result(vec![Ok(initial)]).chain(stream)) + // we ignore the resulting Stream (if the first stream is over we are unsubscribed) + .map(|_| ()) + }); + } + + fn unsubscribe_geode_state( + &self, + _metadata: Option, + id: SubscriptionId, + ) -> Result { + Ok(self.manager.cancel(id)) + } +} + +fn build_storage_key(id: GeodeId) -> Vec { + let geode_module = twox_128(b"GeodeModule"); + let geodes = twox_128(b"Geodes"); + let geode: AccountId = id.into(); + let geode = blake2_128_concat(&geode.encode()); + + let mut param = vec![]; + param.extend(geode_module); + param.extend(geodes); + param.extend(geode); + param +} + +fn blake2_128_concat(d: &[u8]) -> Vec { + let mut v = blake2_128(d).to_vec(); + v.extend_from_slice(d); + v +} + +fn get_geode_state(changes: StorageChangeSet) -> Result { + for (_, _, data) in changes.iter() { + match data { + Some(data) => { + let mut value: &[u8] = &data.0.clone(); + match Geode::::decode(&mut value) { + Ok(geode) => { + return Ok(geode.state); + } + Err(_) => warn!("unable to decode Geode"), + } + } + None => warn!("empty change set"), + }; + } + Err(Error::internal_error()) +} + +fn client_err(_: sp_blockchain::Error) -> Error { + Error::invalid_request() } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 74b2e86..f881bf9 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -90,6 +90,7 @@ pub struct FullDeps { pub fn create_full( deps: FullDeps, subscription_task_executor: SubscriptionTaskExecutor, + geode_subscription_executor: SubscriptionTaskExecutor, ) -> jsonrpc_core::IoHandler where BE: Backend + 'static, @@ -236,6 +237,7 @@ where io.extend_with(GeodeServer::to_delegate(geode::GeodeApi::new( client.clone(), + SubscriptionManager::new(Arc::new(geode_subscription_executor)), ))); io.extend_with(TransferServer::to_delegate(transfer::TransferApi::new(