Skip to content

Commit 7b45934

Browse files
authored
Add TableProvider::scan_with_args (#17336)
1 parent e711f14 commit 7b45934

File tree

3 files changed

+176
-10
lines changed

3 files changed

+176
-10
lines changed

datafusion/catalog/src/table.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,37 @@ pub trait TableProvider: Debug + Sync + Send {
171171
limit: Option<usize>,
172172
) -> Result<Arc<dyn ExecutionPlan>>;
173173

174+
/// Create an [`ExecutionPlan`] for scanning the table using structured arguments.
175+
///
176+
/// This method uses [`ScanArgs`] to pass scan parameters in a structured way
177+
/// and returns a [`ScanResult`] containing the execution plan.
178+
///
179+
/// Table providers can override this method to take advantage of additional
180+
/// parameters like the upcoming `preferred_ordering` that may not be available through
181+
/// other scan methods.
182+
///
183+
/// # Arguments
184+
/// * `state` - The session state containing configuration and context
185+
/// * `args` - Structured scan arguments including projection, filters, limit, and ordering preferences
186+
///
187+
/// # Returns
188+
/// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table
189+
///
190+
/// See [`Self::scan`] for detailed documentation about projection, filters, and limits.
191+
async fn scan_with_args<'a>(
192+
&self,
193+
state: &dyn Session,
194+
args: ScanArgs<'a>,
195+
) -> Result<ScanResult> {
196+
let filters = args.filters().unwrap_or(&[]);
197+
let projection = args.projection().map(|p| p.to_vec());
198+
let limit = args.limit();
199+
let plan = self
200+
.scan(state, projection.as_ref(), filters, limit)
201+
.await?;
202+
Ok(plan.into())
203+
}
204+
174205
/// Specify if DataFusion should provide filter expressions to the
175206
/// TableProvider to apply *during* the scan.
176207
///
@@ -299,6 +330,114 @@ pub trait TableProvider: Debug + Sync + Send {
299330
}
300331
}
301332

333+
/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
334+
#[derive(Debug, Clone, Default)]
335+
pub struct ScanArgs<'a> {
336+
filters: Option<&'a [Expr]>,
337+
projection: Option<&'a [usize]>,
338+
limit: Option<usize>,
339+
}
340+
341+
impl<'a> ScanArgs<'a> {
342+
/// Set the column projection for the scan.
343+
///
344+
/// The projection is a list of column indices from [`TableProvider::schema`]
345+
/// that should be included in the scan results. If `None`, all columns are included.
346+
///
347+
/// # Arguments
348+
/// * `projection` - Optional slice of column indices to project
349+
pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
350+
self.projection = projection;
351+
self
352+
}
353+
354+
/// Get the column projection for the scan.
355+
///
356+
/// Returns a reference to the projection column indices, or `None` if
357+
/// no projection was specified (meaning all columns should be included).
358+
pub fn projection(&self) -> Option<&'a [usize]> {
359+
self.projection
360+
}
361+
362+
/// Set the filter expressions for the scan.
363+
///
364+
/// Filters are boolean expressions that should be evaluated during the scan
365+
/// to reduce the number of rows returned. All expressions are combined with AND logic.
366+
/// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`].
367+
///
368+
/// # Arguments
369+
/// * `filters` - Optional slice of filter expressions
370+
pub fn with_filters(mut self, filters: Option<&'a [Expr]>) -> Self {
371+
self.filters = filters;
372+
self
373+
}
374+
375+
/// Get the filter expressions for the scan.
376+
///
377+
/// Returns a reference to the filter expressions, or `None` if no filters were specified.
378+
pub fn filters(&self) -> Option<&'a [Expr]> {
379+
self.filters
380+
}
381+
382+
/// Set the maximum number of rows to return from the scan.
383+
///
384+
/// If specified, the scan should return at most this many rows. This is typically
385+
/// used to optimize queries with `LIMIT` clauses.
386+
///
387+
/// # Arguments
388+
/// * `limit` - Optional maximum number of rows to return
389+
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
390+
self.limit = limit;
391+
self
392+
}
393+
394+
/// Get the maximum number of rows to return from the scan.
395+
///
396+
/// Returns the row limit, or `None` if no limit was specified.
397+
pub fn limit(&self) -> Option<usize> {
398+
self.limit
399+
}
400+
}
401+
402+
/// Result of a table scan operation from [`TableProvider::scan_with_args`].
403+
#[derive(Debug, Clone)]
404+
pub struct ScanResult {
405+
/// The ExecutionPlan to run.
406+
plan: Arc<dyn ExecutionPlan>,
407+
}
408+
409+
impl ScanResult {
410+
/// Create a new `ScanResult` with the given execution plan.
411+
///
412+
/// # Arguments
413+
/// * `plan` - The execution plan that will perform the table scan
414+
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
415+
Self { plan }
416+
}
417+
418+
/// Get a reference to the execution plan for this scan result.
419+
///
420+
/// Returns a reference to the [`ExecutionPlan`] that will perform
421+
/// the actual table scanning and data retrieval.
422+
pub fn plan(&self) -> &Arc<dyn ExecutionPlan> {
423+
&self.plan
424+
}
425+
426+
/// Consume this ScanResult and return the execution plan.
427+
///
428+
/// Returns the owned [`ExecutionPlan`] that will perform
429+
/// the actual table scanning and data retrieval.
430+
pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
431+
self.plan
432+
}
433+
}
434+
435+
/// Wrap an execution plan in a [`ScanResult`], so a plan can be converted
/// with `.into()` wherever a `ScanResult` is expected.
impl From<Arc<dyn ExecutionPlan>> for ScanResult {
    fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
        Self::new(plan)
    }
}
440+
302441
/// A factory which creates [`TableProvider`]s at runtime given a URL.
303442
///
304443
/// For example, this can be used to create a table "on the fly"

datafusion/core/src/datasource/listing/table.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::{
2929
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
3030
use arrow_schema::Schema;
3131
use async_trait::async_trait;
32-
use datafusion_catalog::{Session, TableProvider};
32+
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
3333
use datafusion_common::{
3434
config_datafusion_err, config_err, internal_err, plan_err, project_schema,
3535
stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
@@ -1169,6 +1169,22 @@ impl TableProvider for ListingTable {
11691169
filters: &[Expr],
11701170
limit: Option<usize>,
11711171
) -> Result<Arc<dyn ExecutionPlan>> {
1172+
let options = ScanArgs::default()
1173+
.with_projection(projection.map(|p| p.as_slice()))
1174+
.with_filters(Some(filters))
1175+
.with_limit(limit);
1176+
Ok(self.scan_with_args(state, options).await?.into_inner())
1177+
}
1178+
1179+
async fn scan_with_args<'a>(
1180+
&self,
1181+
state: &dyn Session,
1182+
args: ScanArgs<'a>,
1183+
) -> Result<ScanResult> {
1184+
let projection = args.projection().map(|p| p.to_vec());
1185+
let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
1186+
let limit = args.limit();
1187+
11721188
// extract types of partition columns
11731189
let table_partition_cols = self
11741190
.options
@@ -1181,6 +1197,7 @@ impl TableProvider for ListingTable {
11811197
.iter()
11821198
.map(|field| field.name().as_str())
11831199
.collect::<Vec<_>>();
1200+
11841201
// If the filters can be resolved using only partition cols, there is no need to
11851202
// pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
11861203
let (partition_filters, filters): (Vec<_>, Vec<_>) =
@@ -1198,8 +1215,8 @@ impl TableProvider for ListingTable {
11981215

11991216
// if no files need to be read, return an `EmptyExec`
12001217
if partitioned_file_lists.is_empty() {
1201-
let projected_schema = project_schema(&self.schema(), projection)?;
1202-
return Ok(Arc::new(EmptyExec::new(projected_schema)));
1218+
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
1219+
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
12031220
}
12041221

12051222
let output_ordering = self.try_create_output_ordering()?;
@@ -1233,13 +1250,16 @@ impl TableProvider for ListingTable {
12331250
let Some(object_store_url) =
12341251
self.table_paths.first().map(ListingTableUrl::object_store)
12351252
else {
1236-
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
1253+
return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
1254+
Schema::empty(),
1255+
)))));
12371256
};
12381257

12391258
let file_source = self.create_file_source_with_schema_adapter()?;
12401259

12411260
// create the execution plan
1242-
self.options
1261+
let plan = self
1262+
.options
12431263
.format
12441264
.create_physical_plan(
12451265
state,
@@ -1251,14 +1271,16 @@ impl TableProvider for ListingTable {
12511271
.with_file_groups(partitioned_file_lists)
12521272
.with_constraints(self.constraints.clone())
12531273
.with_statistics(statistics)
1254-
.with_projection(projection.cloned())
1274+
.with_projection(projection)
12551275
.with_limit(limit)
12561276
.with_output_ordering(output_ordering)
12571277
.with_table_partition_cols(table_partition_cols)
12581278
.with_expr_adapter(self.expr_adapter_factory.clone())
12591279
.build(),
12601280
)
1261-
.await
1281+
.await?;
1282+
1283+
Ok(ScanResult::new(plan))
12621284
}
12631285

12641286
fn supports_filters_pushdown(

datafusion/core/src/physical_planner.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by;
6060
use arrow::array::{builder::StringBuilder, RecordBatch};
6161
use arrow::compute::SortOptions;
6262
use arrow::datatypes::{Schema, SchemaRef};
63+
use datafusion_catalog::ScanArgs;
6364
use datafusion_common::display::ToStringifiedPlan;
6465
use datafusion_common::tree_node::{
6566
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
@@ -459,9 +460,13 @@ impl DefaultPhysicalPlanner {
459460
// doesn't know (nor should care) how the relation was
460461
// referred to in the query
461462
let filters = unnormalize_cols(filters.iter().cloned());
462-
source
463-
.scan(session_state, projection.as_ref(), &filters, *fetch)
464-
.await?
463+
let filters_vec = filters.into_iter().collect::<Vec<_>>();
464+
let opts = ScanArgs::default()
465+
.with_projection(projection.as_deref())
466+
.with_filters(Some(&filters_vec))
467+
.with_limit(*fetch);
468+
let res = source.scan_with_args(session_state, opts).await?;
469+
Arc::clone(res.plan())
465470
}
466471
LogicalPlan::Values(Values { values, schema }) => {
467472
let exec_schema = schema.as_ref().to_owned().into();

0 commit comments

Comments
 (0)