@@ -53,48 +53,45 @@ impl<B, S: Stream<Item = B>> BatchStream<B, S> {
 impl<B, S: Stream<Item = B>> Stream for BatchStream<B, S> {
     type Item = ShutdownResult<Vec<S::Item>, Vec<S::Item>>;

-    /// Polls the stream for the next batch of items.
+    /// Polls the stream for the next batch of items using a complex state machine.
     ///
-    /// Returns:
-    /// - `Poll::Ready(Some(batch))` when a complete batch is available
-    /// - `Poll::Ready(None)` when the stream has ended
-    /// - `Poll::Pending` when more items are needed to form a batch
-    ///
-    /// The stream will emit a batch when:
-    /// - The batch reaches maximum size
-    /// - A timeout occurs
-    /// - The stream is forcefully stopped
+    /// This method implements a batching algorithm that balances throughput
+    /// and latency by collecting items into batches based on both size and time constraints.
+    /// The polling state machine handles multiple concurrent conditions and ensures proper
+    /// resource cleanup during shutdown scenarios.
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut this = self.as_mut().project();

+        // Fast path: if the inner stream has already ended, we're done
         if *this.inner_stream_ended {
             return Poll::Ready(None);
         }

         loop {
+            // Fast path: if we've been marked as stopped, terminate immediately
             if *this.stream_stopped {
                 return Poll::Ready(None);
             }

-            // If the stream has been asked to stop, we mark the stream as stopped and return the
-            // remaining elements, irrespectively of boundaries.
+            // PRIORITY 1: Check for shutdown signal
+            // Shutdown handling takes priority over all other operations to ensure
+            // graceful termination. We return any accumulated items with shutdown indication.
             if this.shutdown_rx.has_changed().unwrap_or(false) {
                 info!("the stream has been forcefully stopped");

-                // We mark the stream as stopped, in this way any further call with return
-                // `Poll::Ready(None)`.
+                // Mark stream as permanently stopped to prevent further polling
                 *this.stream_stopped = true;

-                // We mark the current value as unchanged, effectively acknowledging that we have
-                // seen it. This does not affect the correctness, but it makes the implementation
-                // semantically more correct.
+                // Acknowledge that we've seen the shutdown signal to maintain watch semantics
                 this.shutdown_rx.mark_unchanged();

-                // Even if we have no items, we return this result, since we signal that a shutdown
-                // signal was received and the consumer side of the stream, can decide what to do.
+                // Return accumulated items (if any) with shutdown indication
+                // Even empty batches are returned to signal shutdown occurred
                 return Poll::Ready(Some(ShutdownResult::Shutdown(std::mem::take(this.items))));
             }

+            // PRIORITY 2: Timer management
+            // Reset the timeout timer when starting a new batch or after emitting a batch
             if *this.reset_timer {
                 this.deadline
                     .set(Some(tokio::time::sleep(Duration::from_millis(
@@ -103,48 +100,63 @@ impl<B, S: Stream<Item = B>> Stream for BatchStream<B, S> {
                 *this.reset_timer = false;
             }

+            // PRIORITY 3: Memory optimization
+            // Pre-allocate batch capacity when starting to collect items
+            // This avoids reallocations during batch collection
             if this.items.is_empty() {
                 this.items.reserve_exact(this.batch_config.max_size);
             }

+            // PRIORITY 4: Poll underlying stream for new items
             match this.stream.as_mut().poll_next(cx) {
-                Poll::Pending => break,
+                Poll::Pending => {
+                    // No more items available right now, check if we should emit due to timeout
+                    break;
+                }
                 Poll::Ready(Some(item)) => {
+                    // New item available - add to current batch
                     this.items.push(item);

-                    // If we reached the `max_batch_size` we want to return the batch and reset the
-                    // timer.
+                    // SIZE-BASED EMISSION: If batch is full, emit immediately
+                    // This provides throughput optimization for high-volume streams
                     if this.items.len() >= this.batch_config.max_size {
-                        *this.reset_timer = true;
+                        *this.reset_timer = true; // Schedule timer reset for next batch
                         return Poll::Ready(Some(ShutdownResult::Ok(std::mem::take(this.items))));
                     }
+                    // Continue loop to collect more items or check other conditions
                 }
                 Poll::Ready(None) => {
+                    // STREAM END: Underlying stream finished
+                    // Return final batch if we have items, otherwise signal completion
                     let last = if this.items.is_empty() {
-                        None
+                        None // No final batch needed
                     } else {
-                        *this.reset_timer = true;
+                        *this.reset_timer = true; // Clean up timer state
                         Some(ShutdownResult::Ok(std::mem::take(this.items)))
                     };

-                    *this.inner_stream_ended = true;
+                    *this.inner_stream_ended = true; // Mark stream as permanently ended

                     return Poll::Ready(last);
                 }
             }
         }

-        // If there are items, we want to check the deadline, if it's met, we return the batch
-        // we currently have in memory, otherwise, we return.
+        // PRIORITY 5: Time-based emission check
+        // If we have items and the timeout has expired, emit the current batch
+        // This provides latency bounds to prevent indefinite delays in low-volume scenarios
         if !this.items.is_empty()
             && let Some(deadline) = this.deadline.as_pin_mut()
         {
+            // Check if timeout has elapsed (this will register waker if not ready)
             ready!(deadline.poll(cx));
-            *this.reset_timer = true;
+
+            *this.reset_timer = true; // Schedule timer reset for next batch

             return Poll::Ready(Some(ShutdownResult::Ok(std::mem::take(this.items))));
         }

+        // No conditions met for batch emission - wait for more items or timeout
        Poll::Pending
    }
}
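
The comments added in this diff describe three emission paths: a full batch (size-based), an expired deadline (time-based), and a flush on shutdown. A minimal consumer sketch is included below to make those paths concrete; the `BatchStream::new` constructor, the `BatchConfig { max_size, debounce_duration_ms }` shape, and the `tokio::sync::watch` wiring are assumptions for illustration only, since the diff covers just the `poll_next` body.

```rust
// Sketch only: `BatchStream::new` and `BatchConfig` are assumed shapes, not
// shown in this diff; the `ShutdownResult` variants come from the code above.
use futures::{stream, StreamExt};
use tokio::sync::watch;

async fn consume_batches() {
    // Shutdown signal delivered through a watch channel (assumed wiring).
    let (_shutdown_tx, shutdown_rx) = watch::channel(());
    let source = stream::iter(0..10u32);

    // Hypothetical construction: batch up to 4 items or flush after 100 ms.
    let mut batches = BatchStream::new(
        source,
        BatchConfig {
            max_size: 4,
            debounce_duration_ms: 100,
        },
        shutdown_rx,
    );

    while let Some(result) = batches.next().await {
        match result {
            // Emitted because the batch filled up or the deadline elapsed.
            ShutdownResult::Ok(batch) => println!("got {} items", batch.len()),
            // Emitted once after a shutdown signal; subsequent polls return `None`.
            ShutdownResult::Shutdown(rest) => {
                println!("shutting down, flushing {} items", rest.len());
                break;
            }
        }
    }
}
```

Note that the `Shutdown` arm can carry an empty `Vec`, matching the comment that even empty batches are returned to signal that shutdown occurred.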