Skip to content

STR-404: Yellowstone-grpc-client stream auto-reconnect#717

Open
Ozodimgba wants to merge 3 commits intomasterfrom
str-404
Open

STR-404: Yellowstone-grpc-client stream auto-reconnect#717
Ozodimgba wants to merge 3 commits intomasterfrom
str-404

Conversation

@Ozodimgba
Copy link
Copy Markdown
Contributor

Closes #STR-404

@Ozodimgba
Copy link
Copy Markdown
Contributor Author

@leafaar and @lvboudre this should classify as minor bump...the breaking changes are minimal?

Copy link
Copy Markdown
Contributor

@lvboudre lvboudre left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proposed changes

I propose this:

DedupStream object which implements Stream trait and apply the dedup logic you did.

Since we are return impl Stream you can return really complex type with lots of nested generics too.

#[pin_project]
struct DedupStream<GrpcStream> {
  pub(crate) state: DedupState, // this dedup state can recovered by another object on failure.
  #[pin]
  inner: GrpcStream
}

impl<S> Stream< for DedupStream<S> 
  where S: Stream<Item = Result<SubscribeUpdate, Status>> + Send + Unpin + 'static
{
   type Item = <S as Stream>::Item;


   fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
       let mut this = self.project();
       loop  {
           let poll = this.inner.poll_next(cx);
           match poll {
              Poll::Ready(result) => {
                   // do dedup or propagate the error
              }
             Poll:Pending => // return pending
       }
    }
}

Now for the retry logic, you make make another Stream that wraps you dedup stream.

struct NeedReconnect {
   attempt: usize,
   dedup_state: DedupState,
}

struct ReconnectingFut {
  dedup_state: DedupState,
}

// This is just a prototype but it is essentially a state machine, that each state
// can be represented a concrete type, therefore compiled check.
enum AutoReconnectState<GrpcStream> {
   Connected(DedupStream<GrpcStream>),
   NeedReconnect(NeedReconnect),
   ReconnectingFut(ReconnectingFut) // maybe make it generic?
}


pub struct AutoReconnect<GrpcStream> {
  pub(crate) endpoint: Endpoint, // preconfigured endpoint that builds `Channel`,
  // invariant : Dedup_state can only be `Some` for `inner` is `None`
  state: AutoReconnectState<GrpcStream>,
  reconnect_config: ReconnectionConfig // whatever is the reconnect logic.
}


impl<GrpcStream> Stream for AutoReconnect<GrpcStream> 
   where GrpcStream : // Put all the same condition as before
{
    type  Item = <GrpcStream as Stream>::Item;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
       let mut this = self.project();
       //  either poll if alive or apply your reconnection state machine.
    }
}

Using custom Stream types will make the code more elegant instead of cramming all the reconnection logic/state machine inside a loop.

Dedup window

I didn't see in the case any way to prune the dedup state where you stored all dedup keys. We need a maximum retention window that must be configurable by the customer.

@Ozodimgba Ozodimgba requested a review from lvboudre March 27, 2026 01:36
@Ozodimgba Ozodimgba force-pushed the str-404 branch 2 times, most recently from 4e55b86 to dcad5d3 Compare March 31, 2026 19:34
Copy link
Copy Markdown
Contributor

@leafaar leafaar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some comments, looks good, though I think we need to support DeshredTransactions as well, it's another Subscribe method though, so it can be alittle bit annoying

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants