
Commit fa26515

Encapsulate early pruning in its own stream (#17293)

1 parent 8c65a41 commit fa26515

1 file changed: +97 -56 lines

datafusion/datasource-parquet/src/opener.rs (97 additions & 56 deletions)
@@ -17,17 +17,19 @@
 
 //! [`ParquetOpener`] for opening Parquet files
 
-use std::sync::Arc;
-
 use crate::page_filter::PagePruningAccessPlanFilter;
 use crate::row_group_filter::RowGroupAccessPlanFilter;
 use crate::{
     apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
     ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
 };
+use arrow::array::RecordBatch;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
 use arrow::error::ArrowError;

@@ -47,7 +49,7 @@ use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};
 use datafusion_common::config::EncryptionFactoryOptions;
 #[cfg(feature = "parquet_encryption")]
 use datafusion_execution::parquet_encryption::EncryptionFactory;
-use futures::{StreamExt, TryStreamExt};
+use futures::{ready, Stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
@@ -409,64 +411,103 @@ impl FileOpener for ParquetOpener {
                 .with_row_groups(row_group_indexes)
                 .build()?;
 
-            // Create a stateful stream that can check pruning after each batch
-            let adapted = {
-                use futures::stream;
-                let schema_mapping = Some(schema_mapping);
-                let file_pruner = file_pruner;
-                let stream = stream.map_err(|e| ArrowError::ExternalError(Box::new(e)));
-                let files_ranges_pruned_statistics =
-                    file_metrics.files_ranges_pruned_statistics.clone();
-
-                stream::try_unfold(
-                    (
-                        stream,
-                        schema_mapping,
-                        file_pruner,
-                        files_ranges_pruned_statistics,
-                    ),
-                    move |(
-                        mut stream,
-                        schema_mapping_opt,
-                        mut file_pruner,
-                        files_ranges_pruned_statistics,
-                    )| async move {
-                        match stream.try_next().await? {
-                            Some(batch) => {
-                                let schema_mapping = schema_mapping_opt.as_ref().unwrap();
-                                let mapped_batch = schema_mapping.map_batch(batch)?;
-
-                                // Check if we can prune the file now
-                                if let Some(ref mut pruner) = file_pruner {
-                                    if pruner.should_prune()? {
-                                        // File can now be pruned based on updated dynamic filters
-                                        files_ranges_pruned_statistics.add(1);
-                                        // Terminate the stream early
-                                        return Ok(None);
-                                    }
-                                }
-
-                                Ok(Some((
-                                    mapped_batch,
-                                    (
-                                        stream,
-                                        schema_mapping_opt,
-                                        file_pruner,
-                                        files_ranges_pruned_statistics,
-                                    ),
-                                )))
-                            }
-                            None => Ok(None),
-                        }
-                    },
-                )
-            };
+            let stream = stream
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+                .map(move |b| b.and_then(|b| Ok(schema_mapping.map_batch(b)?)));
 
-            Ok(adapted.boxed())
+            if let Some(file_pruner) = file_pruner {
+                Ok(EarlyStoppingStream::new(
+                    stream,
+                    file_pruner,
+                    file_metrics.files_ranges_pruned_statistics.clone(),
+                )
+                .boxed())
+            } else {
+                Ok(stream.boxed())
+            }
         }))
     }
 }
 
+/// Wraps an inner RecordBatchStream and a [`FilePruner`]
+///
+/// This can terminate the scan early when a dynamic filter is updated after
+/// the scan starts and we discover, mid-scan, that the file can be pruned
+/// (it cannot contain matching rows).
+struct EarlyStoppingStream<S> {
+    /// Has the stream finished processing? All subsequent polls will return
+    /// None
+    done: bool,
+    file_pruner: FilePruner,
+    files_ranges_pruned_statistics: Count,
+    /// The inner stream
+    inner: S,
+}
+
+impl<S> EarlyStoppingStream<S> {
+    pub fn new(
+        stream: S,
+        file_pruner: FilePruner,
+        files_ranges_pruned_statistics: Count,
+    ) -> Self {
+        Self {
+            done: false,
+            inner: stream,
+            file_pruner,
+            files_ranges_pruned_statistics,
+        }
+    }
+}
+impl<S> EarlyStoppingStream<S>
+where
+    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
+{
+    fn check_prune(
+        &mut self,
+        input: Result<RecordBatch, ArrowError>,
+    ) -> Result<Option<RecordBatch>, ArrowError> {
+        let batch = input?;
+
+        // Since dynamic filters may have been updated, see if we can stop
+        // reading this stream entirely.
+        if self.file_pruner.should_prune()? {
+            self.files_ranges_pruned_statistics.add(1);
+            self.done = true;
+            Ok(None)
+        } else {
+            // Return the adapted batch
+            Ok(Some(batch))
+        }
+    }
+}
+
+impl<S> Stream for EarlyStoppingStream<S>
+where
+    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
+{
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.done {
+            return Poll::Ready(None);
+        }
+        match ready!(self.inner.poll_next_unpin(cx)) {
+            None => {
+                // input done
+                self.done = true;
+                Poll::Ready(None)
+            }
+            Some(input_batch) => {
+                let output = self.check_prune(input_batch);
+                Poll::Ready(output.transpose())
+            }
+        }
+    }
+}
+
 #[cfg(feature = "parquet_encryption")]
 impl ParquetOpener {
     fn get_file_decryption_properties(
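To see the pattern in isolation, here is a minimal, self-contained sketch of the same early-stopping wrapper, generic over a plain `FnMut() -> bool` stop predicate instead of DataFusion's `FilePruner`. The `EarlyStop` name and the predicate closure are illustrative, not part of the DataFusion API; the sketch depends only on the `futures` crate.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{ready, Stream, StreamExt};

/// Ends an inner stream early once `should_stop` returns true,
/// mirroring the shape of `EarlyStoppingStream` in the diff above.
struct EarlyStop<S, F> {
    /// Once set, all subsequent polls return `None`
    done: bool,
    should_stop: F,
    inner: S,
}

impl<S, F> EarlyStop<S, F> {
    fn new(inner: S, should_stop: F) -> Self {
        Self { done: false, should_stop, inner }
    }
}

impl<S, F> Stream for EarlyStop<S, F>
where
    S: Stream + Unpin,
    F: FnMut() -> bool + Unpin,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // All fields are `Unpin`, so we can safely take a plain `&mut Self`.
        let this = self.get_mut();
        if this.done {
            return Poll::Ready(None);
        }
        match ready!(this.inner.poll_next_unpin(cx)) {
            // Inner stream exhausted: latch `done` and finish.
            None => {
                this.done = true;
                Poll::Ready(None)
            }
            // Re-check the stop condition for every item, just as
            // `check_prune` re-checks `FilePruner::should_prune`; when it
            // fires, drop the item and end the stream.
            Some(item) => {
                if (this.should_stop)() {
                    this.done = true;
                    Poll::Ready(None)
                } else {
                    Poll::Ready(Some(item))
                }
            }
        }
    }
}

fn main() {
    let mut polls = 0;
    // Pretend a "dynamic filter" update makes the rest of the input
    // prunable after two items have been produced.
    let stream = EarlyStop::new(futures::stream::iter(1..=10), move || {
        polls += 1;
        polls > 2
    });
    let collected: Vec<i32> = futures::executor::block_on(stream.collect());
    assert_eq!(collected, vec![1, 2]);
    println!("{collected:?}");
}
```

Compared with the previous `try_unfold` version, the named struct makes the state explicit (the `done` flag, the pruner, the metric counter) instead of threading a tuple through every iteration, and the latched `done` flag keeps the stream fused: callers that poll again after completion still get `Poll::Ready(None)`. In `open()`, both branches are `.boxed()` into the same trait object, so the pruning wrapper is only paid for when a `FilePruner` exists.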
