From 14baad74f93a44a32328d062e45ce969ff3ca175 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 22 Aug 2025 13:36:53 -0400 Subject: [PATCH] Encapsulate early pruning in its own stream --- datafusion/datasource-parquet/src/opener.rs | 153 +++++++++++++------- 1 file changed, 97 insertions(+), 56 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 694b14f6a5c2..42b5776abe15 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -17,17 +17,19 @@ //! [`ParquetOpener`] for opening Parquet files -use std::sync::Arc; - use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_group_filter::RowGroupAccessPlanFilter; use crate::{ apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, }; +use arrow::array::RecordBatch; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit}; use arrow::error::ArrowError; @@ -47,7 +49,7 @@ use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate}; use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use futures::{StreamExt, TryStreamExt}; +use futures::{ready, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; @@ -409,64 +411,103 @@ impl FileOpener for ParquetOpener { .with_row_groups(row_group_indexes) .build()?; - // Create a stateful stream that can check pruning after each batch - let adapted = { - use futures::stream; - let schema_mapping = 
Some(schema_mapping); - let file_pruner = file_pruner; - let stream = stream.map_err(|e| ArrowError::ExternalError(Box::new(e))); - let files_ranges_pruned_statistics = - file_metrics.files_ranges_pruned_statistics.clone(); - - stream::try_unfold( - ( - stream, - schema_mapping, - file_pruner, - files_ranges_pruned_statistics, - ), - move |( - mut stream, - schema_mapping_opt, - mut file_pruner, - files_ranges_pruned_statistics, - )| async move { - match stream.try_next().await? { - Some(batch) => { - let schema_mapping = schema_mapping_opt.as_ref().unwrap(); - let mapped_batch = schema_mapping.map_batch(batch)?; - - // Check if we can prune the file now - if let Some(ref mut pruner) = file_pruner { - if pruner.should_prune()? { - // File can now be pruned based on updated dynamic filters - files_ranges_pruned_statistics.add(1); - // Terminate the stream early - return Ok(None); - } - } - - Ok(Some(( - mapped_batch, - ( - stream, - schema_mapping_opt, - file_pruner, - files_ranges_pruned_statistics, - ), - ))) - } - None => Ok(None), - } - }, - ) - }; + let stream = stream + .map_err(|e| ArrowError::ExternalError(Box::new(e))) + .map(move |b| b.and_then(|b| Ok(schema_mapping.map_batch(b)?))); - Ok(adapted.boxed()) + if let Some(file_pruner) = file_pruner { + Ok(EarlyStoppingStream::new( + stream, + file_pruner, + file_metrics.files_ranges_pruned_statistics.clone(), + ) + .boxed()) + } else { + Ok(stream.boxed()) + } })) } } +/// Wraps an inner RecordBatchStream and a [`FilePruner`] +/// +/// This can terminate the scan early when some dynamic filters is updated after +/// the scan starts, so we discover after the scan starts that the file can be +/// pruned (can't have matching rows). +struct EarlyStoppingStream<S> { + /// Has the stream finished processing? 
All subsequent polls will return + /// None + done: bool, + file_pruner: FilePruner, + files_ranges_pruned_statistics: Count, + /// The inner stream + inner: S, +} + +impl<S> EarlyStoppingStream<S> { + pub fn new( + stream: S, + file_pruner: FilePruner, + files_ranges_pruned_statistics: Count, + ) -> Self { + Self { + done: false, + inner: stream, + file_pruner, + files_ranges_pruned_statistics, + } + } +} +impl<S> EarlyStoppingStream<S> +where + S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin, +{ + fn check_prune( + &mut self, + input: Result<RecordBatch, ArrowError>, + ) -> Result<Option<RecordBatch>, ArrowError> { + let batch = input?; + + // Since dynamic filters may have been updated, see if we can stop + // reading this stream entirely. + if self.file_pruner.should_prune()? { + self.files_ranges_pruned_statistics.add(1); + self.done = true; + Ok(None) + } else { + // Return the adapted batch + Ok(Some(batch)) + } + } +} + +impl<S> Stream for EarlyStoppingStream<S> +where + S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin, +{ + type Item = Result<RecordBatch, ArrowError>; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + if self.done { + return Poll::Ready(None); + } + match ready!(self.inner.poll_next_unpin(cx)) { + None => { + // input done + self.done = true; + Poll::Ready(None) + } + Some(input_batch) => { + let output = self.check_prune(input_batch); + Poll::Ready(output.transpose()) + } + } + } +} + #[cfg(feature = "parquet_encryption")] impl ParquetOpener { fn get_file_decryption_properties(