@@ -88,10 +88,8 @@ async def _flush_after_interval(self) -> None:
8888 if the batch has not already been drained by the size trigger.
8989 """
9090 try :
91- logger .debug ("timer sleeping" )
9291 await asyncio .sleep (self ._batch_interval_s )
9392 except asyncio .CancelledError :
94- logger .debug ("timer caught cancelled error" )
9593 return
9694
9795 batch : list [Input ] | None = None
@@ -102,6 +100,7 @@ async def _flush_after_interval(self) -> None:
102100 self ._flush_timer = None
103101
104102 if batch :
103+ logger .debug ("Flushing messages based on timer" )
105104 await self ._flush_batch (batch )
106105
107106 def _drain_locked (self ) -> list [Input ]:
@@ -136,6 +135,9 @@ async def add(self, item: Input) -> None:
136135 batch_to_flush = self ._drain_locked ()
137136
138137 if batch_to_flush :
138+ logger .debug ("Flushing messages based on backlog size" )
139+ logger .debug ("Remaining items in _batch_buffer: %d" , len (self ._batch_buffer ))
140+ logger .debug ("Batch size to flush after: %d" , self ._batch_size )
139141 await self ._flush_batch (batch_to_flush )
140142
141143 async def flush (self ) -> None :
@@ -150,6 +152,7 @@ async def flush(self) -> None:
150152 if self ._batch_buffer :
151153 batch_to_flush = self ._drain_locked ()
152154 if batch_to_flush :
155+ logger .debug ("Flushing messages based on manual trigger" )
153156 await self ._flush_batch (batch_to_flush )
154157
155158 async def _process_batch (self , batch : list [Input ]) -> list [Output ]:
0 commit comments