-
Notifications
You must be signed in to change notification settings - Fork 2.2k
feat: support additional eth_subscribe handlers #20777
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
mattsse
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great start
| /// Fallback handler for unknown subscription kinds or params. | ||
| fallback_handler: Option<FallbackHandler>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need a way to inject additional handlers in here, so I think this could be like an Arc<Rwlock<HashMap<String, Handler> for example
| pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self { | ||
| let inner = EthPubSubInner { eth_api, subscription_task_spawner }; | ||
| Self::with_spawner_and_fallback(eth_api, subscription_task_spawner, None) | ||
| } | ||
|
|
||
| /// Creates a new, shareable instance with a fallback handler for unknown subscription kinds or | ||
| /// params. | ||
| pub fn with_spawner_and_fallback_handler<F>( | ||
| eth_api: Eth, | ||
| subscription_task_spawner: Box<dyn TaskSpawner>, | ||
| fallback_handler: F, | ||
| ) -> Self | ||
| where | ||
| F: Fn(String, Option<Box<JsonRawValue>>) -> Result<PubSubStream, ErrorObject<'static>> | ||
| + Send | ||
| + Sync |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can keep all of this as is, and instead add smth like register_handler() that inserts a new handler
this is similar to the Methods type
pub struct Methods {
callbacks: Arc<FxHashMap<&'static str, MethodCallback>>,
reth/crates/rpc/rpc-builder/src/lib.rs
Lines 1755 to 1760 in 485eb2e
| /// Merge the given [`Methods`] in all configured methods. | |
| /// | |
| /// Fails if any of the methods in other is present already. | |
| pub fn merge_configured( | |
| &mut self, | |
| other: impl Into<Methods>, |
basically, we want a way to inject additional handlers after the fact, like:
reth/examples/exex-subscription/src/main.rs
Line 172 in 485eb2e
| ctx.modules.merge_configured(StorageWatcherApiServer::into_rpc(rpc))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah should do it
|
pending @mattsse |
mattsse
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool, a few more style suggestions
crates/rpc/rpc/src/eth/pubsub.rs
Outdated
| type PubSubStream = Box<dyn Stream<Item = Box<JsonRawValue>> + Send + Unpin>; | ||
| type SubscriptionHandler = Arc< | ||
| dyn Fn(Option<Box<JsonRawValue>>) -> Result<PubSubStream, ErrorObject<'static>> + Send + Sync, | ||
| >; | ||
| type FallbackHandler = Arc< | ||
| dyn Fn(String, Option<Box<JsonRawValue>>) -> Result<PubSubStream, ErrorObject<'static>> | ||
| + Send | ||
| + Sync, | ||
| >; | ||
|
|
||
| struct ParsedSubscription { | ||
| kind: SubscriptionKind, | ||
| params: Option<Params>, | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| enum ParseSubscriptionError { | ||
| UnsupportedKind, | ||
| InvalidParams(&'static str), | ||
| } | ||
|
|
||
| impl ParseSubscriptionError { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move these private types below the ethpubinner impl block so that the order is
pub
private
| /// Creates a new, shareable instance with a fallback handler for unknown subscription kinds or | ||
| /// params. | ||
| pub fn with_spawner_and_fallback_handler<F>( | ||
| eth_api: Eth, | ||
| subscription_task_spawner: Box<dyn TaskSpawner>, | ||
| fallback_handler: F, | ||
| ) -> Self | ||
| where | ||
| F: Fn(String, Option<Box<JsonRawValue>>) -> Result<PubSubStream, ErrorObject<'static>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can do without this fn
| /// Dispatches validated subscription kinds to their concrete stream implementations. | ||
| async fn handle_parsed( | ||
| &self, | ||
| accepted_sink: SubscriptionSink, | ||
| parsed: ParsedSubscription, | ||
| ) -> Result<(), ErrorObject<'static>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should remain unchanged entirely, easier to review that way if we dont alter existing code too much, so this should remain
pub async fn handle_accepted(
&self,
accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
with the same body
| /// Fallback handler for unknown subscription kinds or params. | ||
| fallback_handler: Option<FallbackHandler>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this we don't need because if no additional_handlers match then we return an error
This closes #20750
Changes
Parse the subscription
kindand raw params at the callsite: if thekindmaps to a knownSubscriptionKind(newHeads/logs/newPendingTransactions/syncing), we validate and normalize params upfront and then invoke the standard handlers.If the
kindis unknown, we try a per‑kind custom handler (registered viaregister_handler) first, then fall back to a globalfallback_handlerif configured; otherwise we returninvalid_params.invalid_paramsto preserve strict RPC semantics and avoid masking client errors. Custom subscriptions are only accepted when explicitly registered (per‑kind handler or global fallback), and they must provide their own subscription stream; otherwise the request is rejected.Invalid paths are explicitly:
logsexpectsFilter/none;newPendingTransactionsexpectsbool/none; deserialization/type mismatch) →invalid_params.invalid_params.Err(ErrorObject)→ forwarded as‑is (e.g.,invalid_paramsor other error).cc @mattsse