153 changes: 152 additions & 1 deletion src/mito2/src/read/scan_util.rs
@@ -14,13 +14,17 @@

//! Utilities for scanners.

use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::timestamp::timestamp_array_to_primitive;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
@@ -33,11 +37,13 @@ use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
-use crate::read::range::{RangeBuilderList, RowGroupIndex};
+use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Verbose scan metrics for a partition.
@@ -697,6 +703,71 @@ pub(crate) fn scan_flat_mem_ranges(
    }
}

/// Files with at least this many rows can contribute to the estimation.
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Number of series threshold for splitting batches.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average batch size after splitting. The threshold is below 60 because a series
/// may have only 60 samples per hour.
const BATCH_SIZE_THRESHOLD: u64 = 50;

/// Returns true if splitting flat record batches may improve merge performance.
pub(crate) fn should_split_flat_batches_for_merge(
    stream_ctx: &Arc<StreamContext>,
    range_meta: &RangeMeta,
) -> bool {
    // Number of files to split and scan.
    let mut num_files_to_split = 0;
    let mut num_mem_rows = 0;
    let mut num_mem_series = 0;
    // Checks each file range and returns early if any range is not splittable.
    // For mem ranges, we collect the total number of rows and series because a single
    // mem range may have too few rows.
    for index in &range_meta.row_group_indices {
        if stream_ctx.is_mem_range_index(*index) {
            // This is a mem range.
            let memtable = &stream_ctx.input.memtables[index.index];
            let stats = memtable.stats();
            num_mem_rows += stats.num_rows();
            num_mem_series += stats.series_count();
        } else if stream_ctx.is_file_range_index(*index) {
            // This is a file range.
            let file_index = index.index - stream_ctx.input.num_memtables();
            let file = &stream_ctx.input.files[file_index];
            if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
                // The file doesn't have enough rows, or the number of series is unavailable; skips it.
                continue;
            }
            debug_assert!(file.meta_ref().num_rows > 0);
            if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
                // We can't split batches in this file.
                return false;
            } else {
                num_files_to_split += 1;
            }
        }
        // Skips non-file and non-mem ranges.
    }

    if num_files_to_split > 0 {
        // We mainly consider file ranges because they have enough data for sampling.
        true
    } else if num_mem_series > 0 && num_mem_rows > 0 {
        // If there are no files to scan, decides by the memtable statistics.
        can_split_series(num_mem_rows as u64, num_mem_series as u64)
    } else {
        false
    }
}

fn can_split_series(num_rows: u64, num_series: u64) -> bool {
    assert!(num_series > 0);
    assert!(num_rows > 0);

    // Either there aren't too many series, or each series still has enough rows
    // per batch on average.
    num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
}
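
A worked example of the heuristic above, to make the constants concrete. This test module is illustrative only, not part of the PR: 5,000 series is under `NUM_SERIES_THRESHOLD`, so splitting is accepted without checking the average batch size, while 1,000,000 rows spread over 50,000 series averages only 20 rows per series, below `BATCH_SIZE_THRESHOLD`, so splitting is rejected.

#[cfg(test)]
mod split_heuristic_examples {
    use super::can_split_series;

    // Hypothetical values chosen to land on both sides of the thresholds.
    #[test]
    fn examples() {
        assert!(can_split_series(1_000_000, 5_000));
        assert!(!can_split_series(1_000_000, 50_000));
    }
}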

/// Scans file ranges at `index`.
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
@@ -876,3 +947,83 @@ pub(crate) async fn maybe_scan_flat_other_ranges(
    }
    .fail()
}

/// A stream wrapper that splits record batches from an inner stream.
pub(crate) struct SplitRecordBatchStream<S> {
    /// The inner stream that yields record batches.
    inner: S,
    /// Buffer for split batches.
    batches: VecDeque<RecordBatch>,
}

impl<S> SplitRecordBatchStream<S> {
    /// Creates a new splitting stream wrapper.
    pub(crate) fn new(inner: S) -> Self {
        Self {
            inner,
            batches: VecDeque::new(),
        }
    }
}

impl<S> Stream for SplitRecordBatchStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // First, returns a buffered split batch if there is one.
            if let Some(batch) = self.batches.pop_front() {
                return Poll::Ready(Some(Ok(batch)));
            }

            // Polls the inner stream for the next batch.
            let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
                Some(Ok(batch)) => batch,
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => return Poll::Ready(None),
            };

            // Splits the batch and buffers the results, then continues the loop
            // to return the first split batch.
            split_record_batch(record_batch, &mut self.batches);
        }
    }
}
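
A minimal usage sketch of the wrapper. `drain` is a hypothetical helper, not part of the PR; it assumes `futures::StreamExt` and the crate's `Result`. Split batches come out in their original order because the buffer is a FIFO `VecDeque`:

async fn drain<S>(stream: S) -> Result<Vec<RecordBatch>>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    use futures::StreamExt;

    // Wraps the input stream; each polled item is a `Result<RecordBatch>`.
    let mut split = SplitRecordBatchStream::new(stream);
    let mut out = Vec::new();
    while let Some(batch) = split.next().await {
        out.push(batch?);
    }
    Ok(out)
}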

/// Splits the batch by timestamps: a new batch starts wherever the timestamp decreases,
/// which marks the first row of the next series.
///
/// # Panics
/// Panics if the timestamp array is invalid.
pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
    let batch_rows = record_batch.num_rows();
    if batch_rows == 0 {
        return;
    }
    if batch_rows < 2 {
        batches.push_back(record_batch);
        return;
    }

    let time_index_pos = time_index_column_index(record_batch.num_columns());
    let timestamps = record_batch.column(time_index_pos);
    let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
    let mut offsets = Vec::with_capacity(16);
    offsets.push(0);
    let values = ts_values.values();
    for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
        if value > values[i + 1] {
            offsets.push(i + 1);
        }
    }
    offsets.push(values.len());

    // Slices the batch by consecutive offset pairs.
    for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
        let end = offsets[i + 1];
        let rows_in_batch = end - start;
        batches.push_back(record_batch.slice(start, rows_in_batch));
    }
}
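
The boundary detection above can be read in isolation. Here is a self-contained sketch, where `split_offsets` is hypothetical and works on a plain `i64` slice instead of an arrow timestamp array: a new slice starts wherever the timestamp decreases, i.e. at the first row of the next series in a series-ordered flat batch.

// Hypothetical stand-alone version of the offset computation.
fn split_offsets(ts: &[i64]) -> Vec<usize> {
    let mut offsets = vec![0];
    for i in 0..ts.len().saturating_sub(1) {
        if ts[i] > ts[i + 1] {
            // The timestamp decreased, so row i + 1 starts a new series run.
            offsets.push(i + 1);
        }
    }
    offsets.push(ts.len());
    offsets
}

For example, `split_offsets(&[1, 2, 3, 1, 2, 1])` returns `[0, 3, 5, 6]`, slicing the batch into rows 0..3, 3..5, and 5..6.
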
22 changes: 19 additions & 3 deletions src/mito2/src/read/seq_scan.rs
@@ -44,8 +44,9 @@ use crate::read::merge::MergeReaderBuilder;
use crate::read::range::{RangeBuilderList, RangeMeta};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
-    PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
-    scan_flat_mem_ranges, scan_mem_ranges,
+    PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
+    scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
+    should_split_flat_batches_for_merge,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{
@@ -818,6 +819,7 @@ pub(crate) async fn build_flat_sources(
        return Ok(());
    }

    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
    sources.reserve(num_indices);
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
@@ -874,8 +876,22 @@ }
    }

    for stream in ordered_sources.into_iter().flatten() {
-        sources.push(stream);
+        if should_split {
+            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
+        } else {
+            sources.push(stream);
+        }
    }

    if should_split {
        common_telemetry::debug!(
            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
            stream_ctx.input.region_metadata().region_id,
            sources.len(),
            part_range,
        );
    }

    Ok(())
}
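
For context, a sketch of why the wrapper drops in without changing the source type. `maybe_split` is hypothetical; it assumes `BoxedRecordBatchStream` is the crate's pinned, boxed record-batch stream alias, whose `Stream` item type matches the wrapper's.

// Hypothetical helper showing that both branches yield the same stream type.
fn maybe_split(
    stream: BoxedRecordBatchStream,
    should_split: bool,
) -> BoxedRecordBatchStream {
    if should_split {
        Box::pin(SplitRecordBatchStream::new(stream))
    } else {
        stream
    }
}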
