@@ -2,7 +2,7 @@ from __future__ import annotations
22
33import  os 
44from  collections .abc  import  Iterable , Iterator , Sequence 
5- from  datetime  import  datetime 
5+ from  datetime  import  datetime ,  timedelta 
66from  enum  import  Enum 
77from  typing  import  Any , Callable , Optional , Self 
88
@@ -614,6 +614,120 @@ class PyRecordingStream:
614614        Calling operations such as flush or set_sink will result in an error. 
615615        """ 
616616
617+ class  ChunkBatcherConfig :
618+     """Defines the different batching thresholds used within the RecordingStream.""" 
619+ 
620+     def  __init__ (
621+         self ,
622+         flush_tick : int  |  float  |  timedelta  |  None  =  None ,
623+         flush_num_bytes : int  |  None  =  None ,
624+         flush_num_rows : int  |  None  =  None ,
625+         chunk_max_rows_if_unsorted : int  |  None  =  None ,
626+     ) ->  None :
627+         """ 
628+         Initialize the chunk batcher configuration. 
629+ 
630+         Parameters 
631+         ---------- 
632+         flush_tick : int | float | timedelta | None 
633+             Duration of the periodic tick, by default `None`. 
634+             Equivalent to setting: `RERUN_FLUSH_TICK_SECS` environment variable. 
635+ 
636+         flush_num_bytes : int | None 
637+             Flush if the accumulated payload has a size in bytes equal or greater than this, by default `None`. 
638+             Equivalent to setting: `RERUN_FLUSH_NUM_BYTES` environment variable. 
639+ 
640+         flush_num_rows : int | None 
641+             Flush if the accumulated payload has a number of rows equal or greater than this, by default `None`. 
642+             Equivalent to setting: `RERUN_FLUSH_NUM_ROWS` environment variable. 
643+ 
644+         chunk_max_rows_if_unsorted : int | None 
645+             Split a chunk if it contains >= rows than this threshold and one or more of its timelines are unsorted, 
646+             by default `None`. 
647+             Equivalent to setting: `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` environment variable. 
648+ 
649+         """ 
650+ 
651+     @property  
652+     def  flush_tick (self ) ->  timedelta :
653+         """ 
654+         Duration of the periodic tick. 
655+ 
656+         Equivalent to setting: `RERUN_FLUSH_TICK_SECS` environment variable. 
657+         """ 
658+ 
659+     @flush_tick .setter  
660+     def  flush_tick (self , value : float  |  int  |  timedelta ) ->  None :
661+         """ 
662+         Duration of the periodic tick. 
663+ 
664+         Equivalent to setting: `RERUN_FLUSH_TICK_SECS` environment variable. 
665+         """ 
666+ 
667+     @property  
668+     def  flush_num_bytes (self ) ->  int :
669+         """ 
670+         Flush if the accumulated payload has a size in bytes equal or greater than this. 
671+ 
672+         Equivalent to setting: `RERUN_FLUSH_NUM_BYTES` environment variable. 
673+         """ 
674+ 
675+     @flush_num_bytes .setter  
676+     def  flush_num_bytes (self , value : int ) ->  None :
677+         """ 
678+         Flush if the accumulated payload has a size in bytes equal or greater than this. 
679+ 
680+         Equivalent to setting: `RERUN_FLUSH_NUM_BYTES` environment variable. 
681+         """ 
682+ 
683+     @property  
684+     def  flush_num_rows (self ) ->  int :
685+         """ 
686+         Flush if the accumulated payload has a number of rows equal or greater than this. 
687+ 
688+         Equivalent to setting: `RERUN_FLUSH_NUM_ROWS` environment variable. 
689+         """ 
690+ 
691+     @flush_num_rows .setter  
692+     def  flush_num_rows (self , value : int ) ->  None :
693+         """ 
694+         Flush if the accumulated payload has a number of rows equal or greater than this. 
695+ 
696+         Equivalent to setting: `RERUN_FLUSH_NUM_ROWS` environment variable. 
697+         """ 
698+ 
699+     @property  
700+     def  chunk_max_rows_if_unsorted (self ) ->  int :
701+         """ 
702+         Split a chunk if it contains >= rows than this threshold and one or more of its timelines are unsorted. 
703+ 
704+         Equivalent to setting: `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` environment variable. 
705+         """ 
706+ 
707+     @chunk_max_rows_if_unsorted .setter  
708+     def  chunk_max_rows_if_unsorted (self , value : int ) ->  None :
709+         """ 
710+         Split a chunk if it contains >= rows than this threshold and one or more of its timelines are unsorted. 
711+ 
712+         Equivalent to setting: `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` environment variable. 
713+         """ 
714+ 
715+     @staticmethod  
716+     def  DEFAULT () ->  ChunkBatcherConfig :
717+         """Default configuration, applicable to most use cases.""" 
718+ 
719+     @staticmethod  
720+     def  LOW_LATENCY () ->  ChunkBatcherConfig :
721+         """Low-latency configuration, preferred when streaming directly to a viewer.""" 
722+ 
723+     @staticmethod  
724+     def  ALWAYS () ->  ChunkBatcherConfig :
725+         """Always flushes ASAP.""" 
726+ 
727+     @staticmethod  
728+     def  NEVER () ->  ChunkBatcherConfig :
729+         """Never flushes unless manually told to (or hitting one the builtin invariants).""" 
730+ 
617731class  PyMemorySinkStorage :
618732    def  concat_as_bytes (self , concat : Optional [PyMemorySinkStorage ] =  None ) ->  bytes :
619733        """ 
@@ -658,6 +772,7 @@ def new_recording(
658772    make_thread_default : bool  =  True ,
659773    default_enabled : bool  =  True ,
660774    send_properties : bool  =  True ,
775+     batcher_config : Optional [ChunkBatcherConfig ] =  None ,
661776) ->  PyRecordingStream :
662777    """Create a new recording stream.""" 
663778
0 commit comments