@@ -14,13 +14,17 @@

//! Utilities for scanners.

+ use std::collections::VecDeque;
use std::fmt;
+ use std::pin::Pin;
use std::sync::{Arc, Mutex};
+ use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
+ use datatypes::timestamp::timestamp_array_to_primitive;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
@@ -33,11 +37,13 @@ use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
- use crate::read::range::{RangeBuilderList, RowGroupIndex};
+ use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
+ use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
+ use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Verbose scan metrics for a partition.
@@ -697,6 +703,71 @@ pub(crate) fn scan_flat_mem_ranges(
    }
}

+ /// Files with row count greater than this threshold can contribute to the estimation.
+ const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
+ /// Number of series threshold for splitting batches.
+ const NUM_SERIES_THRESHOLD: u64 = 10240;
+ /// Minimum batch size after splitting. The threshold is below 60 because a series may
+ /// only have 60 samples per hour (e.g. one sample per minute).
+ const BATCH_SIZE_THRESHOLD: u64 = 50;
+
+ /// Returns true if splitting flat record batches may improve merge performance.
+ pub(crate) fn should_split_flat_batches_for_merge(
+     stream_ctx: &Arc<StreamContext>,
+     range_meta: &RangeMeta,
+ ) -> bool {
+     // Number of files to split and scan.
+     let mut num_files_to_split = 0;
+     let mut num_mem_rows = 0;
+     let mut num_mem_series = 0;
+     // Checks each file range and returns early if any file range is not splittable.
+     // For mem ranges, we collect the total number of rows and series because the number of
+     // rows in a single mem range may be too small.
+     for index in &range_meta.row_group_indices {
+         if stream_ctx.is_mem_range_index(*index) {
+             // This is a mem range.
+             let memtable = &stream_ctx.input.memtables[index.index];
+             let stats = memtable.stats();
+             num_mem_rows += stats.num_rows();
+             num_mem_series += stats.series_count();
+         } else if stream_ctx.is_file_range_index(*index) {
+             // This is a file range.
+             let file_index = index.index - stream_ctx.input.num_memtables();
+             let file = &stream_ctx.input.files[file_index];
+             if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
+                 // The file doesn't have enough rows, or the number of series is
+                 // unavailable; skips it.
+                 continue;
+             }
+             debug_assert!(file.meta_ref().num_rows > 0);
+             if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
+                 // We can't split batches in this file.
+                 return false;
+             }
+             num_files_to_split += 1;
+         }
+         // Skips ranges that are neither file nor mem ranges.
+     }
+
+     if num_files_to_split > 0 {
+         // We mainly consider file ranges because they have enough data for sampling.
+         true
+     } else if num_mem_series > 0 && num_mem_rows > 0 {
+         // If we don't have files to scan, we decide whether to split by the memtables.
+         can_split_series(num_mem_rows as u64, num_mem_series as u64)
+     } else {
+         false
+     }
+ }
+
+ fn can_split_series(num_rows: u64, num_series: u64) -> bool {
+     assert!(num_series > 0);
+     assert!(num_rows > 0);
+
+     // Either there aren't too many series, or each series still has enough rows per
+     // batch after splitting.
+     num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
+ }
+
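To make the thresholds concrete, here is a minimal sketch of how `can_split_series` behaves on some illustrative, hypothetical inputs (the numbers are not from the source):

    // Illustrative inputs for the heuristic above (hypothetical numbers).
    assert!(can_split_series(1_000, 100)); // 100 series < NUM_SERIES_THRESHOLD => split
    assert!(can_split_series(2_000_000, 20_000)); // 100 rows/series >= BATCH_SIZE_THRESHOLD => split
    assert!(!can_split_series(500_000, 20_000)); // 20_000 series at 25 rows/series => don't split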
/// Scans file ranges at `index`.
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
@@ -876,3 +947,83 @@ pub(crate) async fn maybe_scan_flat_other_ranges(
    }
    .fail()
}
+
+ /// A stream wrapper that splits record batches from an inner stream.
+ pub(crate) struct SplitRecordBatchStream<S> {
+     /// The inner stream that yields record batches.
+     inner: S,
+     /// Buffer for split batches.
+     batches: VecDeque<RecordBatch>,
+ }
+
+ impl<S> SplitRecordBatchStream<S> {
+     /// Creates a new splitting stream wrapper.
+     pub(crate) fn new(inner: S) -> Self {
+         Self {
+             inner,
+             batches: VecDeque::new(),
+         }
+     }
+ }
+
+ impl<S> Stream for SplitRecordBatchStream<S>
+ where
+     S: Stream<Item = Result<RecordBatch>> + Unpin,
+ {
+     type Item = Result<RecordBatch>;
+
+     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+         loop {
+             // First, checks if we have buffered split batches.
+             if let Some(batch) = self.batches.pop_front() {
+                 return Poll::Ready(Some(Ok(batch)));
+             }
+
+             // Polls the inner stream for the next batch.
+             let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
+                 Some(Ok(batch)) => batch,
+                 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
+                 None => return Poll::Ready(None),
+             };
+
+             // Splits the batch and buffers the results, then continues the loop to
+             // return the first split batch.
+             split_record_batch(record_batch, &mut self.batches);
+         }
+     }
+ }
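A minimal usage sketch, assuming an `Unpin` inner stream of `Result<RecordBatch>`; `source_stream` is a hypothetical placeholder for however the caller obtains the inner stream:

    use futures::StreamExt;

    // Wraps the inner stream so downstream merge operators receive batches whose
    // timestamps are non-decreasing within each batch.
    let mut stream = SplitRecordBatchStream::new(source_stream); // `source_stream` is hypothetical
    while let Some(batch) = stream.next().await {
        let batch = batch?; // each item is a time-sorted slice of an inner batch
        // ... feed `batch` to the merge operator ...
    }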
+
+ /// Splits the batch at points where the timestamp decreases, so that each output batch
+ /// has non-decreasing timestamps.
+ ///
+ /// # Panics
+ /// Panics if the timestamp array is invalid.
+ pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
+     let batch_rows = record_batch.num_rows();
+     if batch_rows == 0 {
+         return;
+     }
+     if batch_rows < 2 {
+         batches.push_back(record_batch);
+         return;
+     }
+
+     let time_index_pos = time_index_column_index(record_batch.num_columns());
+     let timestamps = record_batch.column(time_index_pos);
+     let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
+     let mut offsets = Vec::with_capacity(16);
+     offsets.push(0);
+     let values = ts_values.values();
+     // Records a split offset wherever a timestamp is greater than its successor.
+     for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
+         if value > values[i + 1] {
+             offsets.push(i + 1);
+         }
+     }
+     offsets.push(values.len());
+
+     // Splits the batch by offsets.
+     for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
+         let end = offsets[i + 1];
+         let rows_in_batch = end - start;
+         batches.push_back(record_batch.slice(start, rows_in_batch));
+     }
+ }
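For intuition, the offset computation records a split point wherever a timestamp is greater than its successor, so every output slice is internally non-decreasing. A standalone sketch of the same logic over a plain `i64` slice (not using `RecordBatch`):

    // Computes split offsets exactly like the loop above, on a plain slice.
    fn split_offsets(values: &[i64]) -> Vec<usize> {
        let mut offsets = vec![0];
        for i in 0..values.len().saturating_sub(1) {
            if values[i] > values[i + 1] {
                offsets.push(i + 1);
            }
        }
        offsets.push(values.len());
        offsets
    }

    // [1, 2, 3, 1, 2, 5, 0] splits into [1, 2, 3], [1, 2, 5] and [0]:
    assert_eq!(split_offsets(&[1, 2, 3, 1, 2, 5, 0]), vec![0, 3, 6, 7]);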