Skip to content

Commit 96ea7e8

Browse files
committed
Refactor TableProvider::scan into TableProvider::scan_with_args
1 parent e6c4f0d commit 96ea7e8

File tree

3 files changed

+185
-11
lines changed

3 files changed

+185
-11
lines changed

datafusion/catalog/src/table.rs

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow::datatypes::SchemaRef;
2525
use async_trait::async_trait;
2626
use datafusion_common::Result;
2727
use datafusion_common::{not_impl_err, Constraints, Statistics};
28-
use datafusion_expr::Expr;
28+
use datafusion_expr::{Expr, SortExpr};
2929

3030
use datafusion_expr::dml::InsertOp;
3131
use datafusion_expr::{
@@ -171,6 +171,41 @@ pub trait TableProvider: Debug + Sync + Send {
171171
limit: Option<usize>,
172172
) -> Result<Arc<dyn ExecutionPlan>>;
173173

174+
/// Create an [`ExecutionPlan`] for scanning the table using structured arguments.
175+
///
176+
/// This method uses [`ScanArgs`] to pass scan parameters in a structured way
177+
/// and returns a [`ScanResult`] containing the execution plan. This approach
178+
/// allows for extensible parameter passing and result handling.
179+
///
180+
/// Table providers can override this method to take advantage of additional
181+
/// parameters like `preferred_ordering` that may not be available through
182+
/// other scan methods.
183+
///
184+
/// # Arguments
185+
/// * `state` - The session state containing configuration and context
186+
/// * `args` - Structured scan arguments including projection, filters, limit, and ordering preferences
187+
///
188+
/// # Returns
189+
/// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table
190+
///
191+
/// See [`Self::scan`] for detailed documentation about projection, filters, and limits.
192+
async fn scan_with_args(
193+
&self,
194+
state: &dyn Session,
195+
args: ScanArgs,
196+
) -> Result<ScanResult> {
197+
let ScanArgs {
198+
filters,
199+
projection,
200+
limit,
201+
} = args;
202+
let filters = filters.unwrap_or_default();
203+
let plan = self
204+
.scan(state, projection.as_ref(), &filters, limit)
205+
.await?;
206+
Ok(ScanResult::new(plan))
207+
}
208+
174209
/// Specify if DataFusion should provide filter expressions to the
175210
/// TableProvider to apply *during* the scan.
176211
///
@@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send {
299334
}
300335
}
301336

337+
/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
338+
///
339+
/// `ScanArgs` provides a structured way to pass scan parameters to table providers,
340+
/// replacing the multiple individual parameters used by [`TableProvider::scan`].
341+
/// This struct uses the builder pattern for convenient construction.
342+
///
343+
/// # Examples
344+
///
345+
/// ```
346+
/// # use datafusion_catalog::ScanArgs;
347+
/// # use datafusion_expr::Expr;
348+
/// let args = ScanArgs::default()
349+
/// .with_projection(Some(vec![0, 2, 4]))
350+
/// .with_limit(Some(1000));
351+
/// ```
352+
#[derive(Debug, Clone, Default)]
353+
pub struct ScanArgs {
354+
filters: Option<Vec<Expr>>,
355+
projection: Option<Vec<usize>>,
356+
limit: Option<usize>,
357+
}
358+
359+
impl ScanArgs {
360+
/// Set the column projection for the scan.
361+
///
362+
/// The projection is a list of column indices from [`TableProvider::schema`]
363+
/// that should be included in the scan results. If `None`, all columns are included.
364+
///
365+
/// # Arguments
366+
/// * `projection` - Optional list of column indices to project
367+
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
368+
self.projection = projection;
369+
self
370+
}
371+
372+
/// Get the column projection for the scan.
373+
///
374+
/// Returns a cloned copy of the projection column indices, or `None` if
375+
/// no projection was specified (meaning all columns should be included).
376+
pub fn projection(&self) -> Option<Vec<usize>> {
377+
self.projection.clone()
378+
}
379+
380+
/// Set the filter expressions for the scan.
381+
///
382+
/// Filters are boolean expressions that should be evaluated during the scan
383+
/// to reduce the number of rows returned. All expressions are combined with AND logic.
384+
/// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`].
385+
///
386+
/// # Arguments
387+
/// * `filters` - Optional list of filter expressions
388+
pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self {
389+
self.filters = filters;
390+
self
391+
}
392+
393+
/// Get the filter expressions for the scan.
394+
///
395+
/// Returns a reference to the filter expressions, or `None` if no filters were specified.
396+
pub fn filters(&self) -> Option<&[Expr]> {
397+
self.filters.as_deref()
398+
}
399+
400+
/// Set the maximum number of rows to return from the scan.
401+
///
402+
/// If specified, the scan should return at most this many rows. This is typically
403+
/// used to optimize queries with `LIMIT` clauses.
404+
///
405+
/// # Arguments
406+
/// * `limit` - Optional maximum number of rows to return
407+
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
408+
self.limit = limit;
409+
self
410+
}
411+
412+
/// Get the maximum number of rows to return from the scan.
413+
///
414+
/// Returns the row limit, or `None` if no limit was specified.
415+
pub fn limit(&self) -> Option<usize> {
416+
self.limit
417+
}
418+
}
419+
420+
/// Result of a table scan operation from [`TableProvider::scan_with_args`].
421+
///
422+
/// `ScanResult` encapsulates the [`ExecutionPlan`] produced by a table scan,
423+
/// providing a typed return value instead of returning the plan directly.
424+
/// This allows for future extensibility of scan results without breaking
425+
/// the API.
426+
#[derive(Debug, Clone)]
427+
pub struct ScanResult {
428+
/// The ExecutionPlan to run.
429+
plan: Arc<dyn ExecutionPlan>,
430+
}
431+
432+
impl ScanResult {
433+
/// Create a new `ScanResult` with the given execution plan.
434+
///
435+
/// # Arguments
436+
/// * `plan` - The execution plan that will perform the table scan
437+
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
438+
Self { plan }
439+
}
440+
441+
/// Get the execution plan for this scan result.
442+
///
443+
/// Returns a cloned reference to the [`ExecutionPlan`] that will perform
444+
/// the actual table scanning and data retrieval.
445+
pub fn plan(&self) -> Arc<dyn ExecutionPlan> {
446+
Arc::clone(&self.plan)
447+
}
448+
}
449+
302450
/// A factory which creates [`TableProvider`]s at runtime given a URL.
303451
///
304452
/// For example, this can be used to create a table "on the fly"

datafusion/core/src/datasource/listing/table.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::{
2929
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
3030
use arrow_schema::Schema;
3131
use async_trait::async_trait;
32-
use datafusion_catalog::{Session, TableProvider};
32+
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
3333
use datafusion_common::{
3434
config_datafusion_err, config_err, internal_err, plan_err, project_schema,
3535
stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
@@ -1166,6 +1166,22 @@ impl TableProvider for ListingTable {
11661166
filters: &[Expr],
11671167
limit: Option<usize>,
11681168
) -> Result<Arc<dyn ExecutionPlan>> {
1169+
let options = ScanArgs::default()
1170+
.with_projection(projection.cloned())
1171+
.with_filters(Some(filters.to_vec()))
1172+
.with_limit(limit);
1173+
Ok(self.scan_with_args(state, options).await?.plan())
1174+
}
1175+
1176+
async fn scan_with_args(
1177+
&self,
1178+
state: &dyn Session,
1179+
args: ScanArgs,
1180+
) -> Result<ScanResult> {
1181+
let projection = args.projection();
1182+
let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
1183+
let limit = args.limit();
1184+
11691185
// extract types of partition columns
11701186
let table_partition_cols = self
11711187
.options
@@ -1178,6 +1194,7 @@ impl TableProvider for ListingTable {
11781194
.iter()
11791195
.map(|field| field.name().as_str())
11801196
.collect::<Vec<_>>();
1197+
11811198
// If the filters can be resolved using only partition cols, there is no need to
11821199
// pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
11831200
let (partition_filters, filters): (Vec<_>, Vec<_>) =
@@ -1195,8 +1212,8 @@ impl TableProvider for ListingTable {
11951212

11961213
// if no files need to be read, return an `EmptyExec`
11971214
if partitioned_file_lists.is_empty() {
1198-
let projected_schema = project_schema(&self.schema(), projection)?;
1199-
return Ok(Arc::new(EmptyExec::new(projected_schema)));
1215+
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
1216+
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
12001217
}
12011218

12021219
let output_ordering = self.try_create_output_ordering()?;
@@ -1230,13 +1247,16 @@ impl TableProvider for ListingTable {
12301247
let Some(object_store_url) =
12311248
self.table_paths.first().map(ListingTableUrl::object_store)
12321249
else {
1233-
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
1250+
return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
1251+
Schema::empty(),
1252+
)))));
12341253
};
12351254

12361255
let file_source = self.create_file_source_with_schema_adapter()?;
12371256

12381257
// create the execution plan
1239-
self.options
1258+
let plan = self
1259+
.options
12401260
.format
12411261
.create_physical_plan(
12421262
state,
@@ -1248,14 +1268,16 @@ impl TableProvider for ListingTable {
12481268
.with_file_groups(partitioned_file_lists)
12491269
.with_constraints(self.constraints.clone())
12501270
.with_statistics(statistics)
1251-
.with_projection(projection.cloned())
1271+
.with_projection(projection)
12521272
.with_limit(limit)
12531273
.with_output_ordering(output_ordering)
12541274
.with_table_partition_cols(table_partition_cols)
12551275
.with_expr_adapter(self.expr_adapter_factory.clone())
12561276
.build(),
12571277
)
1258-
.await
1278+
.await?;
1279+
1280+
Ok(ScanResult::new(plan))
12591281
}
12601282

12611283
fn supports_filters_pushdown(

datafusion/core/src/physical_planner.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by;
6060
use arrow::array::{builder::StringBuilder, RecordBatch};
6161
use arrow::compute::SortOptions;
6262
use arrow::datatypes::{Schema, SchemaRef};
63+
use datafusion_catalog::ScanArgs;
6364
use datafusion_common::display::ToStringifiedPlan;
6465
use datafusion_common::tree_node::{
6566
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
@@ -459,9 +460,12 @@ impl DefaultPhysicalPlanner {
459460
// doesn't know (nor should care) how the relation was
460461
// referred to in the query
461462
let filters = unnormalize_cols(filters.iter().cloned());
462-
source
463-
.scan(session_state, projection.as_ref(), &filters, *fetch)
464-
.await?
463+
let opts = ScanArgs::default()
464+
.with_projection(projection.clone())
465+
.with_filters(Some(filters))
466+
.with_limit(*fetch);
467+
let res = source.scan_with_args(session_state, opts).await?;
468+
res.plan()
465469
}
466470
LogicalPlan::Values(Values { values, schema }) => {
467471
let exec_schema = schema.as_ref().to_owned().into();

0 commit comments

Comments
 (0)