Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 99 additions & 1 deletion datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
use datafusion_expr::Expr;
use datafusion_expr::{Expr, SortExpr};

use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_plan::ExecutionPlan;
use itertools::Itertools;

/// A table which can be queried and modified.
///
Expand Down Expand Up @@ -171,6 +172,34 @@ pub trait TableProvider: Debug + Sync + Send {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>>;

async fn scan_with_args(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a great new API and makes a lot of sense as it allows us to iterate / expand the scan API over time with less API disruption

It is also consistent with ScalarUDFImpl::invoke_with_args

To merge this PR I think we should:

  1. document this function (perhaps move the docs from scan here and then direct people to use scan_with_args with new TableProviders
  2. Migrate all existing code in Datafusion to use scan_with_args
  3. Consider deprecating scan

(maybe we can do this as a follow on PR)

&self,
state: &dyn Session,
args: ScanArgs,
) -> Result<ScanResult> {
let ScanArgs {
preferred_ordering: _,
filters,
projection,
limit,
} = args;
let filters = filters.unwrap_or_default();
let unsupported_filters = self
.supports_filters_pushdown(&filters.iter().collect_vec())?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm tempted to also do something about supports_filters_pushdown. I think it could be folded into scan() (and somewhat already set it up that way since scan_with_args gets to return unsupported/inexact filters).

I would rework the filter pushdown optimizer so that it pushes all filters into the TableScan structure (without caring about supported / unsupported) and then then when we go from logical -> physical plan we apply a FilterExec for any non-exact filters and a ProjectionExec for the projections.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend doing this as a follow on PR -- and the key consideration would be how disruptive it would be to existing users vs the benefit existing users and new users would get

.into_iter()
.zip(&filters)
.filter_map(|(support, expr)| match support {
TableProviderFilterPushDown::Inexact
| TableProviderFilterPushDown::Unsupported => Some(expr.clone()),
TableProviderFilterPushDown::Exact => None,
})
.collect_vec();
let plan = self
.scan(state, projection.as_ref(), &filters, limit)
.await?;
Ok(ScanResult::new(plan, unsupported_filters))
}

/// Specify if DataFusion should provide filter expressions to the
/// TableProvider to apply *during* the scan.
///
Expand Down Expand Up @@ -299,6 +328,75 @@ pub trait TableProvider: Debug + Sync + Send {
}
}

#[derive(Debug, Clone, Default)]
pub struct ScanArgs {
preferred_ordering: Option<Vec<SortExpr>>,
filters: Option<Vec<Expr>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
}

impl ScanArgs {
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}

pub fn projection(&self) -> Option<Vec<usize>> {
self.projection.clone()
}

pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self {
self.filters = filters;
self
}

pub fn filters(&self) -> Option<&[Expr]> {
self.filters.as_deref()
}

pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}

pub fn limit(&self) -> Option<usize> {
self.limit
}

pub fn with_preferred_ordering(mut self, ordering: Option<Vec<SortExpr>>) -> Self {
self.preferred_ordering = ordering;
self
}

pub fn preferred_ordering(&self) -> Option<&[SortExpr]> {
self.preferred_ordering.as_deref()
}
}

#[derive(Debug, Clone)]
pub struct ScanResult {
/// The ExecutionPlan to run.
plan: Arc<dyn ExecutionPlan>,
// Remaining filters that were not completely evaluated during `scan_with_args()`.
// These were previously referred to as "unsupported filters" or "inexact filters".
filters: Vec<Expr>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is confusing as the "can the filters be supported" is currently a separate API. I think we should have one or the other but not both.

I realize that scan_with_args is basically the "create the appropriate physical plan" so logically it makes sense here.

}

impl ScanResult {
pub fn new(plan: Arc<dyn ExecutionPlan>, filters: Vec<Expr>) -> Self {
Self { plan, filters }
}

pub fn plan(&self) -> Arc<dyn ExecutionPlan> {
Arc::clone(&self.plan)
}

pub fn filters(&self) -> &[Expr] {
&self.filters
}
}

/// A factory which creates [`TableProvider`]s at runtime given a URL.
///
/// For example, this can be used to create a table "on the fly"
Expand Down
Loading