diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index 0dd53a34fa..3e2865f9c8 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -330,8 +330,7 @@ pub(crate) mod tests {
 
         let file_scan_tasks = vec![
             FileScanTask {
-                start: 0,
-                length: 0,
+                file_range: None,
                 record_count: None,
                 data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
                 data_file_format: DataFileFormat::Parquet,
@@ -341,8 +340,7 @@ pub(crate) mod tests {
                 deletes: vec![pos_del_1, pos_del_2.clone()],
             },
             FileScanTask {
-                start: 0,
-                length: 0,
+                file_range: None,
                 record_count: None,
                 data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
                 data_file_format: DataFileFormat::Parquet,
diff --git a/crates/iceberg/src/arrow/group_pruner.rs b/crates/iceberg/src/arrow/group_pruner.rs
new file mode 100644
index 0000000000..d2755430c1
--- /dev/null
+++ b/crates/iceberg/src/arrow/group_pruner.rs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use futures::{TryStreamExt, try_join};
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::file::metadata::ParquetMetaData;
+use parquet::schema::types::SchemaDescriptor;
+
+use crate::Result;
+use crate::arrow::{ArrowFileReader, CollectFieldIdVisitor, build_field_id_map};
+use crate::expr::BoundPredicate;
+use crate::expr::visitors::bound_predicate_visitor::visit;
+use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
+use crate::io::FileIO;
+use crate::scan::{FileScanGroup, FileScanTask, FileScanTaskStream, ParquetFileScanGroup};
+use crate::spec::DataFileFormat;
+
+/// Prunes the row groups of Parquet files based on the scan predicate.
+#[derive(Clone)]
+pub struct ParquetGroupPruner {
+    file_io: FileIO,
+}
+
+impl ParquetGroupPruner {
+    /// Create a new ParquetGroupPruner
+    pub fn new(file_io: FileIO) -> Self {
+        ParquetGroupPruner { file_io }
+    }
+
+    fn build_field_id_set_and_map(
+        parquet_schema: &SchemaDescriptor,
+        predicate: &BoundPredicate,
+    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
+        // Collects all Iceberg field IDs referenced in the filter predicate
+        let mut collector = CollectFieldIdVisitor {
+            field_ids: HashSet::default(),
+        };
+        visit(&mut collector, predicate)?;
+
+        let iceberg_field_ids = collector.field_ids();
+        let field_id_map = build_field_id_map(parquet_schema)?;
+
+        Ok((iceberg_field_ids, field_id_map))
+    }
+
+    fn get_selected_row_group_indices(
+        &self,
+        task: &FileScanTask,
+        parquet_schema: &SchemaDescriptor,
+        parquet_metadata: &Arc<ParquetMetaData>,
+    ) -> Result<Vec<usize>> {
+        if let Some(predicate) = &task.predicate {
+            let (_iceberg_field_ids, field_id_map) =
+                Self::build_field_id_set_and_map(parquet_schema, predicate)?;
+
+            let row_groups_metadata = parquet_metadata.row_groups();
+            let mut results = Vec::with_capacity(parquet_metadata.row_groups().len());
+
+            for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
+                if RowGroupMetricsEvaluator::eval(
+                    predicate,
+                    row_group_metadata,
+                    &field_id_map,
+                    &task.schema,
+                )? {
+                    results.push(idx);
+                }
+            }
+
+            Ok(results)
+        } else {
+            Ok((0..parquet_metadata.num_row_groups()).collect())
+        }
+    }
+
+    /// Prune the row groups of one Parquet file scan task based on its predicate.
+    pub async fn prune(&self, mut task: FileScanTask) -> Result<FileScanTask> {
+        // Create a stream reader for the Parquet file metadata
+        let parquet_file = self.file_io.new_input(&task.data_file_path)?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
+            .with_preload_column_index(true)
+            .with_preload_offset_index(true)
+            .with_preload_page_index(false);
+        let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            parquet_file_reader,
+            ArrowReaderOptions::new(),
+        )
+        .await?;
+
+        let row_group_indices = self.get_selected_row_group_indices(
+            &task,
+            record_batch_stream_builder.parquet_schema(),
+            record_batch_stream_builder.metadata(),
+        )?;
+
+        task.file_range = Some(FileScanGroup::Parquet(ParquetFileScanGroup {
+            row_group_indexes: row_group_indices,
+        }));
+
+        Ok(task)
+    }
+}
+
+/// Prunes the groups of data files based on the scan predicate.
+pub struct GroupPruner {
+    parquet_pruner: ParquetGroupPruner,
+}
+
+impl GroupPruner {
+    /// Create a new GroupPruner
+    pub fn new(file_io: FileIO) -> Self {
+        GroupPruner {
+            parquet_pruner: ParquetGroupPruner::new(file_io),
+        }
+    }
+
+    /// Prune the groups of each file scan task based on the task's info (e.g. its predicate).
+    pub fn prune(&self, file_tasks_stream: FileScanTaskStream) -> FileScanTaskStream {
+        let parquet_pruner = self.parquet_pruner.clone();
+
+        let pruned_stream = file_tasks_stream
+            .map_ok(move |task| {
+                let parquet_pruner = parquet_pruner.clone();
+                async move {
+                    match task.data_file_format {
+                        DataFileFormat::Parquet => {
+                            // Use the parquet pruner to prune row groups
+                            parquet_pruner.prune(task).await
+                        }
+                        DataFileFormat::Avro | DataFileFormat::Orc | DataFileFormat::Puffin => {
+                            // For other formats, return the task unchanged as no pruning is supported
+                            Ok(task)
+                        }
+                    }
+                }
+            })
+            .try_buffer_unordered(10); // Process up to 10 tasks concurrently
+
+        Box::pin(pruned_stream)
+    }
+}
diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index 949f842412..92adef83a7 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -34,3 +34,6 @@ mod value;
 
 pub use reader::*;
 pub use value::*;
+
+mod group_pruner;
+pub use group_pruner::*;
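For context, here is a minimal sketch of how the new `GroupPruner` could be driven on its own, assuming a `FileIO` handle and an already planned `FileScanTaskStream` (for example from `TableScan::plan_files`); the helper name is hypothetical and error handling is condensed:

```rust
use futures::TryStreamExt;
use iceberg::Result;
use iceberg::arrow::GroupPruner;
use iceberg::io::FileIO;
use iceberg::scan::FileScanTaskStream;

// Hypothetical helper: prune row groups for every planned task before reading.
async fn prune_planned_tasks(file_io: FileIO, planned: FileScanTaskStream) -> Result<()> {
    let pruner = GroupPruner::new(file_io);
    // For Parquet tasks, `prune` fills `file_range` with the row groups that
    // survive the predicate; other formats pass through unchanged.
    let mut pruned = pruner.prune(planned);
    while let Some(task) = pruned.try_next().await? {
        println!(
            "{}: {:?} row groups kept",
            task.data_file_path,
            task.file_range.as_ref().map(|group| group.len())
        );
    }
    Ok(())
}
```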
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 4327184058..a0de776e8e 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -52,7 +52,7 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
+use crate::scan::{ArrowRecordBatchStream, FileScanGroup, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
@@ -68,7 +68,7 @@ pub struct ArrowReaderBuilder {
 
 impl ArrowReaderBuilder {
     /// Create a new ArrowReaderBuilder
-    pub(crate) fn new(file_io: FileIO) -> Self {
+    pub fn new(file_io: FileIO) -> Self {
         let num_cpus = available_parallelism().get();
 
         ArrowReaderBuilder {
@@ -238,7 +238,10 @@ impl ArrowReader {
         // equality delete files OR (there is a scan predicate AND row_selection_enabled),
         // since the only implemented method of applying positional deletes is
         // by using a `RowSelection`.
-        let mut selected_row_group_indices = None;
+        let mut selected_row_group_indices = task.file_range.as_ref().map(|file_range| {
+            let FileScanGroup::Parquet(parquet_file_scan_range) = file_range;
+            parquet_file_scan_range.row_group_indexes.clone()
+        });
         let mut row_selection = None;
 
         if let Some(predicate) = final_predicate {
@@ -261,6 +264,7 @@ impl ArrowReader {
                 record_batch_stream_builder.metadata(),
                 &field_id_map,
                 &task.schema,
+                &selected_row_group_indices,
             )?;
 
             selected_row_group_indices = Some(result);
@@ -649,8 +653,25 @@ impl ArrowReader {
         parquet_metadata: &Arc<ParquetMetaData>,
         field_id_map: &HashMap<i32, usize>,
         snapshot_schema: &Schema,
+        selected_row_groups: &Option<Vec<usize>>,
     ) -> Result<Vec<usize>> {
-        let row_groups_metadata = parquet_metadata.row_groups();
+        let row_groups_metadata = if let Some(selected_row_groups) = selected_row_groups {
+            parquet_metadata
+                .row_groups()
+                .iter()
+                .enumerate()
+                .filter_map(|(idx, row_group_metadata)| {
+                    if selected_row_groups.contains(&idx) {
+                        Some(row_group_metadata)
+                    } else {
+                        None
+                    }
+                })
+                .collect::<Vec<_>>()
+        } else {
+            parquet_metadata.row_groups().iter().collect::<Vec<_>>()
+        };
+
         let mut results = Vec::with_capacity(row_groups_metadata.len());
 
         for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
@@ -730,7 +751,7 @@ impl ArrowReader {
 }
 
 /// Build the map of parquet field id to Parquet column index in the schema.
-fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> {
+pub fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> {
     let mut column_map = HashMap::new();
     for (idx, field) in parquet_schema.columns().iter().enumerate() {
         let field_type = field.self_type();
@@ -765,12 +786,12 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> {
     Ok(column_map)
 }
 
-struct CollectFieldIdVisitor {
-    field_ids: HashSet<i32>,
+pub(crate) struct CollectFieldIdVisitor {
+    pub field_ids: HashSet<i32>,
 }
 
 impl CollectFieldIdVisitor {
-    fn field_ids(self) -> HashSet<i32> {
+    pub fn field_ids(self) -> HashSet<i32> {
         self.field_ids
     }
 }
@@ -1736,8 +1757,7 @@ message schema {
     ) -> Vec<RecordBatch> {
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
-                start: 0,
-                length: 0,
+                file_range: None,
                 record_count: None,
                 data_file_path: format!("{}/1.parquet", table_location),
                 data_file_format: DataFileFormat::Parquet,
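The reader change above seeds `selected_row_group_indices` from `task.file_range` and then restricts the row-group metrics evaluation to that subset. The same narrowing step, written as a standalone illustrative helper (not part of the patch):

```rust
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};

// Illustrative only: mirror the `filter_map` added to
// `get_selected_row_group_indices`, keeping only pre-selected row groups.
fn row_groups_to_evaluate<'a>(
    parquet_metadata: &'a ParquetMetaData,
    selected_row_groups: &Option<Vec<usize>>,
) -> Vec<&'a RowGroupMetaData> {
    match selected_row_groups {
        Some(selected) => parquet_metadata
            .row_groups()
            .iter()
            .enumerate()
            .filter(|(idx, _)| selected.contains(idx))
            .map(|(_, row_group)| row_group)
            .collect(),
        // No pre-selection: every row group is still a candidate.
        None => parquet_metadata.row_groups().iter().collect(),
    }
}
```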
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index 1e4ef41b2b..752645989f 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -114,8 +114,7 @@ impl ManifestEntryContext {
             .await;
 
         Ok(FileScanTask {
-            start: 0,
-            length: self.manifest_entry.file_size_in_bytes(),
+            file_range: None,
             record_count: Some(self.manifest_entry.record_count()),
 
             data_file_path: self.manifest_entry.file_path().to_string(),
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index e987de859f..5f4517014c 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -1757,8 +1757,6 @@ pub mod tests {
         let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
 
         assert_eq!(task.data_file_path, deserialized.data_file_path);
-        assert_eq!(task.start, deserialized.start);
-        assert_eq!(task.length, deserialized.length);
         assert_eq!(task.project_field_ids, deserialized.project_field_ids);
         assert_eq!(task.predicate, deserialized.predicate);
         assert_eq!(task.schema, deserialized.schema);
@@ -1777,8 +1775,7 @@ pub mod tests {
         );
         let task = FileScanTask {
             data_file_path: "data_file_path".to_string(),
-            start: 0,
-            length: 100,
+            file_range: None,
             project_field_ids: vec![1, 2, 3],
             predicate: None,
             schema: schema.clone(),
@@ -1791,8 +1788,7 @@ pub mod tests {
         // with predicate
         let task = FileScanTask {
             data_file_path: "data_file_path".to_string(),
-            start: 0,
-            length: 100,
+            file_range: None,
             project_field_ids: vec![1, 2, 3],
             predicate: Some(BoundPredicate::AlwaysTrue),
             schema,
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 7b111e4f04..d0b66f3aeb 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use futures::stream::BoxStream;
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 
 use crate::Result;
@@ -25,13 +26,87 @@ use crate::spec::{DataContentType, DataFileFormat, ManifestEntryRef, Schema, SchemaRef};
 /// A stream of [`FileScanTask`].
 pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
 
+/// Row groups to scan in a Parquet file.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct ParquetFileScanGroup {
+    /// The indexes of row groups to scan.
+    pub row_group_indexes: Vec<usize>,
+}
+
+impl ParquetFileScanGroup {
+    /// Split off the first `target_count` row groups into a new file scan group.
+    pub fn split(&mut self, target_count: usize) -> FileScanGroup {
+        let mut new_group_indexes = Vec::new();
+        let take_count = target_count.min(self.row_group_indexes.len());
+        new_group_indexes.extend(self.row_group_indexes.drain(..take_count));
+        FileScanGroup::Parquet(ParquetFileScanGroup {
+            row_group_indexes: new_group_indexes,
+        })
+    }
+
+    /// Merge multiple file scan groups into one file scan group.
+    pub fn merge<'a, I>(ranges: I) -> Option<FileScanGroup>
+    where I: IntoIterator<Item = &'a Option<FileScanGroup>> {
+        let mut merged_row_groups = Vec::new();
+
+        for range_opt in ranges {
+            // If any range is None, return None
+            let range = range_opt.as_ref()?;
+
+            // Extract row group indexes from Parquet ranges
+            match range {
+                FileScanGroup::Parquet(parquet_range) => {
+                    merged_row_groups.extend(&parquet_range.row_group_indexes);
+                }
+            }
+        }
+
+        // Sort and deduplicate the row group indexes
+        merged_row_groups.sort_unstable();
+        merged_row_groups.dedup();
+
+        Some(FileScanGroup::Parquet(ParquetFileScanGroup {
+            row_group_indexes: merged_row_groups,
+        }))
+    }
+}
+
+/// A group of work to scan within a file.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub enum FileScanGroup {
+    /// A group of row groups to scan in a Parquet file.
+    Parquet(ParquetFileScanGroup),
+}
+
+impl FileScanGroup {
+    /// Returns the number of row groups in the file scan group.
+    pub fn len(&self) -> usize {
+        match self {
+            FileScanGroup::Parquet(range) => range.row_group_indexes.len(),
+        }
+    }
+
+    /// Returns true if the file scan group is empty.
+    pub fn is_empty(&self) -> bool {
+        match self {
+            FileScanGroup::Parquet(range) => range.row_group_indexes.is_empty(),
+        }
+    }
+
+    /// Split off the first `target_count` groups into a new file scan group.
+    pub fn split(&mut self, target_count: usize) -> Self {
+        match self {
+            FileScanGroup::Parquet(range) => range.split(target_count),
+        }
+    }
+}
+
 /// A task to scan part of file.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct FileScanTask {
-    /// The start offset of the file to scan.
-    pub start: u64,
-    /// The length of the file to scan.
-    pub length: u64,
+    /// A FileScanTask contains multiple file scan groups.
+    /// A group is a unit of work to scan and it cannot be split further.
+    pub file_range: Option<FileScanGroup>,
     /// The number of records in the file to scan.
     ///
     /// This is an optional field, and only available if we are
@@ -81,6 +156,58 @@ impl FileScanTask {
     pub fn schema_ref(&self) -> SchemaRef {
         self.schema.clone()
     }
+
+    /// Split out a task whose group count equals `target_count`.
+    pub fn split(&mut self, target_count: usize) -> Result<Option<Self>> {
+        let Some(range) = &mut self.file_range else {
+            // Can't split if we don't have range info
+            return Ok(None);
+        };
+
+        // Use a reference to self to avoid a partial move error; clone self for each range
+        let new_range = range.split(target_count);
+
+        Ok(Some(Self {
+            file_range: Some(new_range),
+            ..self.clone()
+        }))
+    }
+
+    /// Merge multiple file scan tasks into one file scan task per data file.
+    pub fn merge<I>(file_scan_tasks: I) -> Result<Vec<Self>>
+    where I: IntoIterator<Item = Self> {
+        let tasks = file_scan_tasks
+            .into_iter()
+            .into_group_map_by(|task| task.data_file_path.clone());
+
+        let mut merged_tasks = Vec::with_capacity(tasks.len());
+        for (_, tasks) in tasks {
+            if tasks.is_empty() {
+                continue;
+            }
+
+            let file_type = tasks.first().unwrap().data_file_format;
+            let merged_range = match file_type {
+                DataFileFormat::Parquet => {
+                    ParquetFileScanGroup::merge(tasks.iter().map(|task| &task.file_range))
+                }
+                _ => todo!(),
+            };
+
+            if let Some(merged_range) = &merged_range {
+                if merged_range.is_empty() {
+                    continue;
+                }
+            }
+
+            merged_tasks.push(Self {
+                file_range: merged_range,
+                ..tasks.first().unwrap().clone()
+            });
+        }
+
+        Ok(merged_tasks)
+    }
 }
 
 #[derive(Debug)]
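To make the intended split/merge contract concrete, here is a sketch that exercises the new `FileScanTask` API; the path, schema, and row-group indexes are placeholder values, and the function name is illustrative:

```rust
use std::sync::Arc;

use iceberg::scan::{FileScanGroup, FileScanTask, ParquetFileScanGroup};
use iceberg::spec::{DataFileFormat, Schema};

fn demo_split_and_merge() -> iceberg::Result<()> {
    // A toy task covering four Parquet row groups.
    let mut task = FileScanTask {
        file_range: Some(FileScanGroup::Parquet(ParquetFileScanGroup {
            row_group_indexes: vec![0, 1, 2, 3],
        })),
        record_count: None,
        data_file_path: "s3://bucket/data/1.parquet".to_string(),
        data_file_format: DataFileFormat::Parquet,
        schema: Arc::new(Schema::builder().build()?),
        project_field_ids: vec![],
        predicate: None,
        deletes: vec![],
    };

    // `split` drains the first `target_count` row groups into a new task.
    let head = task.split(2)?.expect("task has range info");
    assert_eq!(head.file_range.as_ref().map(FileScanGroup::len), Some(2));
    assert_eq!(task.file_range.as_ref().map(FileScanGroup::len), Some(2));

    // `merge` groups tasks by data file and unions their row-group indexes.
    let merged = FileScanTask::merge(vec![head, task])?;
    assert_eq!(merged.len(), 1);
    assert_eq!(merged[0].file_range.as_ref().map(FileScanGroup::len), Some(4));
    Ok(())
}
```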
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index d4751a19c8..05f9780677 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -16,12 +16,12 @@
 // under the License.
 
 use std::any::Any;
-use std::pin::Pin;
 use std::sync::Arc;
 use std::vec;
 
-use datafusion::arrow::array::RecordBatch;
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::catalog::Session;
+use datafusion::config::ConfigOptions;
 use datafusion::error::Result as DFResult;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::EquivalenceProperties;
@@ -29,13 +29,27 @@ use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
 use datafusion::prelude::Expr;
-use futures::{Stream, TryStreamExt};
+use futures::TryStreamExt;
+use iceberg::arrow::{ArrowReaderBuilder, GroupPruner};
 use iceberg::expr::Predicate;
+use iceberg::scan::{FileScanTask, FileScanTaskStream};
 use iceberg::table::Table;
 
 use super::expr_to_predicate::convert_filters_to_predicate;
 use crate::to_datafusion_error;
 
+#[derive(Debug, Clone)]
+struct PartitionTask {
+    file_scan_tasks: Vec<FileScanTask>,
+}
+
+impl PartitionTask {
+    pub fn to_file_scan_task_stream(&self) -> FileScanTaskStream {
+        let file_scan_tasks = self.file_scan_tasks.clone();
+        Box::pin(futures::stream::iter(file_scan_tasks.into_iter().map(Ok)))
+    }
+}
+
 /// Manages the scanning process of an Iceberg [`Table`], encapsulating the
 /// necessary details and computed properties required for execution planning.
 #[derive(Debug)]
@@ -51,32 +65,63 @@ pub struct IcebergTableScan {
     projection: Option<Vec<String>>,
     /// Filters to apply to the table scan
     predicates: Option<Predicate>,
+
+    /// The tasks to scan the table.
+    partition_tasks: Vec<PartitionTask>,
 }
 
 impl IcebergTableScan {
     /// Creates a new [`IcebergTableScan`] object.
-    pub(crate) fn new(
+    pub(crate) async fn new(
+        session_state: &dyn Session,
         table: Table,
         snapshot_id: Option<i64>,
         schema: ArrowSchemaRef,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
-    ) -> Self {
+    ) -> DFResult<Self> {
+        // Create the table scan
        let output_schema = match projection {
             None => schema.clone(),
             Some(projection) => Arc::new(schema.project(projection).unwrap()),
         };
-        let plan_properties = Self::compute_properties(output_schema.clone());
         let projection = get_column_names(schema.clone(), projection);
         let predicates = convert_filters_to_predicate(filters);
+        let scan_builder = match snapshot_id {
+            Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
+            None => table.scan(),
+        };
+        let mut scan_builder = match &projection {
+            Some(column_names) => scan_builder.select(column_names.clone()),
+            None => scan_builder.select_all(),
+        };
+        if let Some(pred) = &predicates {
+            scan_builder = scan_builder.with_filter(pred.clone());
+        }
+        let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
+
+        // Create the file scan tasks and partition them
+        let file_tasks_stream = table_scan.plan_files().await.map_err(to_datafusion_error)?;
+        let pruner = GroupPruner::new(table.file_io().clone());
+        let target_partitions = session_state.config().target_partitions();
+        let pruned_file_tasks = pruner
+            .prune(file_tasks_stream)
+            .try_collect::<Vec<_>>()
+            .await
+            .map_err(to_datafusion_error)?;
+        let partition_tasks = partition_file_tasks_stream(pruned_file_tasks, target_partitions);
+
+        let plan_properties =
+            Self::compute_properties(output_schema.clone(), partition_tasks.len());
 
-        Self {
+        Ok(Self {
             table,
             snapshot_id,
             plan_properties,
             projection,
             predicates,
-        }
+            partition_tasks,
+        })
     }
 
     pub fn table(&self) -> &Table {
@@ -96,13 +141,13 @@ impl IcebergTableScan {
     }
 
     /// Computes [`PlanProperties`] used in query optimization.
-    fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
+    fn compute_properties(schema: ArrowSchemaRef, target_partitions: usize) -> PlanProperties {
         // TODO:
         // This is more or less a placeholder, to be replaced
         // once we support output-partitioning
         PlanProperties::new(
             EquivalenceProperties::new(schema),
-            Partitioning::UnknownPartitioning(1),
+            Partitioning::UnknownPartitioning(target_partitions),
             EmissionType::Incremental,
             Boundedness::Bounded,
         )
@@ -133,18 +178,44 @@ impl ExecutionPlan for IcebergTableScan {
         &self.plan_properties
     }
 
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        _config: &ConfigOptions,
+    ) -> DFResult<Option<Arc<dyn ExecutionPlan>>> {
+        let original_tasks = self
+            .partition_tasks
+            .clone()
+            .into_iter()
+            .flat_map(|p_tasks| p_tasks.file_scan_tasks);
+
+        let merged_tasks = FileScanTask::merge(original_tasks).unwrap();
+
+        let partition_tasks = partition_file_tasks_stream(merged_tasks, target_partitions);
+
+        Ok(Some(Arc::new(IcebergTableScan {
+            table: self.table.clone(),
+            snapshot_id: self.snapshot_id,
+            plan_properties: self.plan_properties.clone(),
+            projection: self.projection.clone(),
+            predicates: self.predicates.clone(),
+            partition_tasks,
+        })))
+    }
+
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
-        let fut = get_batch_stream(
-            self.table.clone(),
-            self.snapshot_id,
-            self.projection.clone(),
-            self.predicates.clone(),
-        );
-        let stream = futures::stream::once(fut).try_flatten();
+        let partition_task = &self.partition_tasks[partition];
+        let file_scan_task_stream = partition_task.to_file_scan_task_stream();
+
+        let arrow_reader = ArrowReaderBuilder::new(self.table.file_io().clone()).build();
+
+        let stream = futures::stream::once(arrow_reader.read(file_scan_task_stream))
+            .try_flatten()
+            .map_err(to_datafusion_error);
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
@@ -172,39 +243,6 @@ impl DisplayAs for IcebergTableScan {
     }
 }
 
-/// Asynchronously retrieves a stream of [`RecordBatch`] instances
-/// from a given table.
-///
-/// This function initializes a [`TableScan`], builds it,
-/// and then converts it into a stream of Arrow [`RecordBatch`]es.
-async fn get_batch_stream(
-    table: Table,
-    snapshot_id: Option<i64>,
-    column_names: Option<Vec<String>>,
-    predicates: Option<Predicate>,
-) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
-    let scan_builder = match snapshot_id {
-        Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
-        None => table.scan(),
-    };
-
-    let mut scan_builder = match column_names {
-        Some(column_names) => scan_builder.select(column_names),
-        None => scan_builder.select_all(),
-    };
-    if let Some(pred) = predicates {
-        scan_builder = scan_builder.with_filter(pred);
-    }
-    let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
-
-    let stream = table_scan
-        .to_arrow()
-        .await
-        .map_err(to_datafusion_error)?
-        .map_err(to_datafusion_error);
-
-    Ok(Box::pin(stream))
-}
-
 fn get_column_names(
     schema: ArrowSchemaRef,
     projection: Option<&Vec<usize>>,
@@ -215,3 +253,128 @@ fn get_column_names(
 ) -> Option<Vec<String>> {
     projection.map(|v| {
         v.iter()
             .map(|p| schema.field(*p).name().clone())
             .collect::<Vec<String>>()
     })
 }
+
+/// Partition the file scan tasks into multiple partitions.
+fn partition_file_tasks_stream(
+    file_tasks: Vec<FileScanTask>,
+    target_partitions: usize,
+) -> Vec<PartitionTask> {
+    // If no file tasks, return empty vector
+    if file_tasks.is_empty() {
+        return Vec::new();
+    }
+
+    // Calculate total number of units (splittable ranges + non-splittable tasks)
+    let sum_unit = file_tasks
+        .iter()
+        .map(|task| {
+            task.file_range
+                .as_ref()
+                .map(|range| range.len())
+                .unwrap_or(1)
+        })
+        .sum::<usize>();
+
+    // Use the minimum of target_partitions and sum_unit to avoid empty partitions
+    let num_partitions = target_partitions.min(sum_unit).max(1);
+
+    // Calculate target units per partition
+    let units_per_partition = sum_unit / num_partitions;
+    let extra_units = sum_unit % num_partitions;
+
+    // Initialize partition tasks vector
+    let mut partition_tasks: Vec<PartitionTask> = (0..num_partitions)
+        .map(|_| PartitionTask {
+            file_scan_tasks: Vec::new(),
+        })
+        .collect();
+
+    let mut current_partition = 0;
+    let mut current_partition_units = 0;
+    let mut target_units_for_current = units_per_partition
+        + if current_partition < extra_units {
+            1
+        } else {
+            0
+        };
+
+    for file_task in file_tasks {
+        if let Some(mut file_range) = file_task.file_range.clone() {
+            // Task has file_range, it can be split
+            let range_len = file_range.len();
+            let mut remaining_units = range_len;
+            let mut current_task = file_task.clone();
+
+            while remaining_units > 0 {
+                let available_space = target_units_for_current - current_partition_units;
+
+                if available_space == 0 {
+                    // Move to next partition
+                    current_partition += 1;
+                    if current_partition >= num_partitions {
+                        // If we've filled all partitions, put remaining in the last partition
+                        current_partition = num_partitions - 1;
+                        target_units_for_current = usize::MAX; // Allow unlimited units in last partition
+                        current_partition_units = 0;
+                    } else {
+                        current_partition_units = 0;
+                        target_units_for_current = units_per_partition
+                            + if current_partition < extra_units {
+                                1
+                            } else {
+                                0
+                            };
+                    }
+                    continue;
+                }
+
+                let units_to_take = available_space.min(remaining_units);
+
+                if units_to_take == remaining_units {
+                    // Take all remaining units - add the current task
+                    current_task.file_range = Some(file_range.clone());
+                    partition_tasks[current_partition]
+                        .file_scan_tasks
+                        .push(current_task.clone());
+                    current_partition_units += units_to_take;
+                    remaining_units = 0;
+                } else {
+                    // Split the task - take part of the range
+                    let split_range = file_range.split(units_to_take);
+                    let mut split_task = current_task.clone();
+                    split_task.file_range = Some(split_range);
+                    partition_tasks[current_partition]
+                        .file_scan_tasks
+                        .push(split_task);
+                    current_partition_units += units_to_take;
+                    remaining_units -= units_to_take;
+                }
+            }
+        } else {
+            // Task has no file_range, it cannot be split - treat as 1 unit
+            if current_partition_units >= target_units_for_current {
+                // Move to next partition
+                current_partition += 1;
+                if current_partition >= num_partitions {
+                    // If we've filled all partitions, put in the last partition
+                    current_partition = num_partitions - 1;
+                } else {
+                    current_partition_units = 0;
+                    target_units_for_current = units_per_partition
+                        + if current_partition < extra_units {
+                            1
+                        } else {
+                            0
+                        };
+                }
+            }
+
+            partition_tasks[current_partition]
+                .file_scan_tasks
+                .push(file_task);
+            current_partition_units += 1;
+        }
+    }
+
+    partition_tasks
+}
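As a sanity check on the balancing rule used by `partition_file_tasks_stream`, the per-partition unit targets can be reproduced with this small self-contained sketch (the helper and test names are illustrative, not part of the patch):

```rust
// Illustrative arithmetic only: with `sum_unit` total units (row groups plus
// unsplittable tasks) and `target_partitions` partitions, the first
// `sum_unit % num_partitions` partitions receive one extra unit.
fn partition_targets(sum_unit: usize, target_partitions: usize) -> Vec<usize> {
    let num_partitions = target_partitions.min(sum_unit).max(1);
    let units_per_partition = sum_unit / num_partitions;
    let extra_units = sum_unit % num_partitions;
    (0..num_partitions)
        .map(|p| units_per_partition + usize::from(p < extra_units))
        .collect()
}

#[test]
fn partition_targets_balance_units() {
    // 10 units over 4 partitions: two partitions get 3 units, two get 2.
    assert_eq!(partition_targets(10, 4), vec![3, 3, 2, 2]);
    // Never create more partitions than there are units.
    assert_eq!(partition_targets(3, 8), vec![1, 1, 1]);
}
```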
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 7f741a534a..dbb5ec0129 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -140,13 +140,17 @@ impl TableProvider for IcebergTableProvider {
         filters: &[Expr],
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(IcebergTableScan::new(
-            self.table.clone(),
-            self.snapshot_id,
-            self.schema.clone(),
-            projection,
-            filters,
-        )))
+        Ok(Arc::new(
+            IcebergTableScan::new(
+                _state,
+                self.table.clone(),
+                self.snapshot_id,
+                self.schema.clone(),
+                projection,
+                filters,
+            )
+            .await?,
+        ))
     }
 
     fn supports_filters_pushdown(