139 changes: 139 additions & 0 deletions datafusion/catalog/src/table.rs
@@ -171,6 +171,37 @@ pub trait TableProvider: Debug + Sync + Send {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>>;

/// Create an [`ExecutionPlan`] for scanning the table using structured arguments.
///
/// This method uses [`ScanArgs`] to pass scan parameters in a structured way
/// and returns a [`ScanResult`] containing the execution plan.
///
/// Table providers can override this method to take advantage of additional
/// parameters, such as the upcoming `preferred_ordering`, that are not
/// exposed through the original [`Self::scan`] method.
///
/// # Arguments
/// * `state` - The session state containing configuration and context
/// * `args` - Structured scan arguments including projection, filters, limit, and ordering preferences
///
/// # Returns
/// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table
///
/// See [`Self::scan`] for detailed documentation about projection, filters, and limits.
async fn scan_with_args<'a>(
&self,
state: &dyn Session,
args: ScanArgs<'a>,
) -> Result<ScanResult> {
let filters = args.filters().unwrap_or(&[]);
let projection = args.projection().map(|p| p.to_vec());
let limit = args.limit();
let plan = self
.scan(state, projection.as_ref(), filters, limit)
.await?;
Ok(plan.into())
}
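For illustration, here is a minimal caller-side sketch of the new entry point. The `scan_preview` helper and its `provider`/`session` arguments are assumptions for the example, not part of this PR; providers that do not override `scan_with_args` transparently fall back to the default implementation above, which forwards to `scan`.

```rust
use std::sync::Arc;

use datafusion_catalog::{ScanArgs, Session, TableProvider};
use datafusion_common::Result;
use datafusion_physical_plan::ExecutionPlan;

// Hypothetical helper: scan the first two columns of any provider,
// capped at ten rows.
async fn scan_preview(
    provider: &dyn TableProvider,
    session: &dyn Session,
) -> Result<Arc<dyn ExecutionPlan>> {
    let projection = [0usize, 1];
    let args = ScanArgs::default()
        .with_projection(Some(&projection))
        .with_limit(Some(10));
    // Falls back to TableProvider::scan unless the provider overrides this.
    Ok(provider.scan_with_args(session, args).await?.into_inner())
}
```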

/// Specify if DataFusion should provide filter expressions to the
/// TableProvider to apply *during* the scan.
///
@@ -299,6 +330,114 @@
}
}

/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
#[derive(Debug, Clone, Default)]
pub struct ScanArgs<'a> {
filters: Option<&'a [Expr]>,
projection: Option<&'a [usize]>,
limit: Option<usize>,
}

impl<'a> ScanArgs<'a> {
/// Set the column projection for the scan.
///
/// The projection is a list of column indices from [`TableProvider::schema`]
/// that should be included in the scan results. If `None`, all columns are included.
///
/// # Arguments
/// * `projection` - Optional slice of column indices to project
pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
self.projection = projection;
self
}

/// Get the column projection for the scan.
///
/// Returns a reference to the projection column indices, or `None` if
/// no projection was specified (meaning all columns should be included).
pub fn projection(&self) -> Option<&'a [usize]> {
self.projection
}

/// Set the filter expressions for the scan.
///
/// Filters are boolean expressions that should be evaluated during the scan
/// to reduce the number of rows returned. All expressions are combined with AND logic.
/// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`].
///
/// # Arguments
/// * `filters` - Optional slice of filter expressions
pub fn with_filters(mut self, filters: Option<&'a [Expr]>) -> Self {
self.filters = filters;
self
}

/// Get the filter expressions for the scan.
///
/// Returns a reference to the filter expressions, or `None` if no filters were specified.
pub fn filters(&self) -> Option<&'a [Expr]> {
self.filters
}

/// Set the maximum number of rows to return from the scan.
///
/// If specified, the scan should return at most this many rows. This is typically
/// used to optimize queries with `LIMIT` clauses.
///
/// # Arguments
/// * `limit` - Optional maximum number of rows to return
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}

/// Get the maximum number of rows to return from the scan.
///
/// Returns the row limit, or `None` if no limit was specified.
pub fn limit(&self) -> Option<usize> {
self.limit
}
}
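Because `ScanArgs` borrows its filter and projection slices rather than owning them, callers keep ownership and can reuse the same slices across several scans. A short sketch of the builder (the `col`/`lit` helpers come from `datafusion_expr`; the column names are illustrative):

```rust
use datafusion_catalog::ScanArgs;
use datafusion_expr::{col, lit, Expr};

fn main() {
    let filters: Vec<Expr> = vec![col("a").gt(lit(5))];
    let projection: Vec<usize> = vec![0, 2];

    // Unset fields default to None, i.e. "no filters / all columns / no limit".
    let args = ScanArgs::default()
        .with_filters(Some(&filters))
        .with_projection(Some(&projection))
        .with_limit(Some(100));

    assert_eq!(args.limit(), Some(100));
    assert_eq!(args.projection(), Some(&projection[..]));
}
```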

/// Result of a table scan operation from [`TableProvider::scan_with_args`].
#[derive(Debug, Clone)]
pub struct ScanResult {
/// The ExecutionPlan to run.
plan: Arc<dyn ExecutionPlan>,
}

impl ScanResult {
/// Create a new `ScanResult` with the given execution plan.
///
/// # Arguments
/// * `plan` - The execution plan that will perform the table scan
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
Self { plan }
}

/// Get a reference to the execution plan for this scan result.
///
/// Returns a reference to the [`ExecutionPlan`] that will perform
/// the actual table scanning and data retrieval.
pub fn plan(&self) -> &Arc<dyn ExecutionPlan> {
&self.plan
}

/// Consume this `ScanResult`, returning the owned execution plan.
///
/// Returns the [`ExecutionPlan`] that will perform
/// the actual table scanning and data retrieval.
pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
self.plan
}
}

impl From<Arc<dyn ExecutionPlan>> for ScanResult {
fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
Self::new(plan)
}
}
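The `From` impl exists so that code which already has an `Arc<dyn ExecutionPlan>` in hand can finish with `.into()`, exactly as the default `scan_with_args` does with `Ok(plan.into())`. A trivial sketch:

```rust
use std::sync::Arc;

use datafusion_catalog::ScanResult;
use datafusion_physical_plan::ExecutionPlan;

fn wrap_plan(plan: Arc<dyn ExecutionPlan>) -> ScanResult {
    // Equivalent to ScanResult::new(plan); the From impl keeps call
    // sites terse when a plan is already built.
    plan.into()
}
```

Wrapping the plan in a struct rather than returning a bare `Arc<dyn ExecutionPlan>` presumably leaves room to add fields later (for example, reporting which filters were handled) without another signature change.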

/// A factory which creates [`TableProvider`]s at runtime given a URL.
///
/// For example, this can be used to create a table "on the fly"
36 changes: 29 additions & 7 deletions datafusion/core/src/datasource/listing/table.rs
@@ -29,7 +29,7 @@ use crate::{
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion_catalog::{Session, TableProvider};
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion_common::{
config_datafusion_err, config_err, internal_err, plan_err, project_schema,
stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
@@ -1169,6 +1169,22 @@ impl TableProvider for ListingTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let options = ScanArgs::default()
.with_projection(projection.map(|p| p.as_slice()))
.with_filters(Some(filters))
.with_limit(limit);
Ok(self.scan_with_args(state, options).await?.into_inner())
}

async fn scan_with_args<'a>(
&self,
state: &dyn Session,
args: ScanArgs<'a>,
) -> Result<ScanResult> {
let projection = args.projection().map(|p| p.to_vec());
let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
let limit = args.limit();

// extract types of partition columns
let table_partition_cols = self
.options
@@ -1181,6 +1197,7 @@
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<_>>();

// If the filters can be resolved using only partition cols, there is no need to
// push them down to the TableScan; otherwise, `unhandled` pruning predicates will be generated
let (partition_filters, filters): (Vec<_>, Vec<_>) =
@@ -1198,8 +1215,8 @@

// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let projected_schema = project_schema(&self.schema(), projection)?;
return Ok(Arc::new(EmptyExec::new(projected_schema)));
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
}

let output_ordering = self.try_create_output_ordering()?;
@@ -1233,13 +1250,16 @@
let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
Schema::empty(),
)))));
};

let file_source = self.create_file_source_with_schema_adapter()?;

// create the execution plan
self.options
let plan = self
.options
.format
.create_physical_plan(
state,
@@ -1251,14 +1271,16 @@
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
.with_statistics(statistics)
.with_projection(projection.cloned())
.with_projection(projection)
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
.with_expr_adapter(self.expr_adapter_factory.clone())
.build(),
)
.await
.await?;

Ok(ScanResult::new(plan))
}

fn supports_filters_pushdown(
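The `ListingTable` changes above illustrate the intended pattern: implement the scan logic once in `scan_with_args` and have the legacy `scan` delegate to it. A condensed sketch of that pattern for a custom provider (the `MyTable` type is hypothetical, and the `EmptyExec` stands in for real plan construction):

```rust
use std::{any::Any, sync::Arc};

use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion_common::Result;
use datafusion_expr::{Expr, TableType};
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan};

/// Hypothetical provider that routes both entry points through scan_with_args.
#[derive(Debug)]
struct MyTable {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for MyTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    // Delegate the legacy entry point, mirroring ListingTable above.
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let args = ScanArgs::default()
            .with_projection(projection.map(|p| p.as_slice()))
            .with_filters(Some(filters))
            .with_limit(limit);
        Ok(self.scan_with_args(state, args).await?.into_inner())
    }

    async fn scan_with_args<'a>(
        &self,
        _state: &dyn Session,
        _args: ScanArgs<'a>,
    ) -> Result<ScanResult> {
        // A real provider would build its plan from the structured args here.
        Ok(ScanResult::new(Arc::new(EmptyExec::new(self.schema()))))
    }
}
```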
11 changes: 8 additions & 3 deletions datafusion/core/src/physical_planner.rs
@@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by;
use arrow::array::{builder::StringBuilder, RecordBatch};
use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_catalog::ScanArgs;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
@@ -459,9 +460,13 @@ impl DefaultPhysicalPlanner {
// doesn't know (nor should care) how the relation was
// referred to in the query
let filters = unnormalize_cols(filters.iter().cloned());
source
.scan(session_state, projection.as_ref(), &filters, *fetch)
.await?
let filters_vec = filters.into_iter().collect::<Vec<_>>();
let opts = ScanArgs::default()
.with_projection(projection.as_deref())
.with_filters(Some(&filters_vec))
.with_limit(*fetch);
let res = source.scan_with_args(session_state, opts).await?;
Arc::clone(res.plan())
}
LogicalPlan::Values(Values { values, schema }) => {
let exec_schema = schema.as_ref().to_owned().into();
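Since the planner now routes every `TableScan` through `scan_with_args`, providers that only implement `scan` keep working via the default method shown earlier. A hedged equivalence sketch (the `check_equivalence` helper and its arguments are assumptions for illustration):

```rust
use datafusion_catalog::{ScanArgs, Session, TableProvider};
use datafusion_common::Result;
use datafusion_expr::Expr;

// For providers that do not override scan_with_args, both entry points
// should produce plans with the same schema.
async fn check_equivalence(
    provider: &dyn TableProvider,
    session: &dyn Session,
    filters: &[Expr],
) -> Result<()> {
    let legacy = provider.scan(session, None, filters, Some(10)).await?;
    let args = ScanArgs::default()
        .with_filters(Some(filters))
        .with_limit(Some(10));
    let structured = provider.scan_with_args(session, args).await?;
    assert_eq!(legacy.schema(), structured.plan().schema());
    Ok(())
}
```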