datafusion-examples/examples/advanced_parquet_index.rs (22 changes: 11 additions & 11 deletions)

@@ -55,6 +55,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
 use bytes::Bytes;
+use datafusion::config::TableParquetOptions;
 use datafusion::datasource::memory::DataSourceExec;
 use futures::future::BoxFuture;
 use futures::FutureExt;
@@ -491,23 +492,22 @@ impl TableProvider for IndexTableProvider {
             CachedParquetFileReaderFactory::new(Arc::clone(&self.object_store))
                 .with_file(indexed_file);

-        let file_source = Arc::new(
-            ParquetSource::default()
+        let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema)
+            .with_limit(limit)
+            .with_projection(projection.cloned())
+            .with_file(partitioned_file)
+            .build();
+
+        let file_source =
+            ParquetSource::new(TableParquetOptions::default(), file_scan_config.clone())
                 // provide the predicate so the DataSourceExec can try and prune
                 // row groups internally
                 .with_predicate(predicate)
                 // provide the factory to create parquet reader without re-reading metadata
-                .with_parquet_file_reader_factory(Arc::new(reader_factory)),
-        );
-        let file_scan_config =
-            FileScanConfigBuilder::new(object_store_url, schema, file_source)
-                .with_limit(limit)
-                .with_projection(projection.cloned())
-                .with_file(partitioned_file)
-                .build();
+                .with_parquet_file_reader_factory(Arc::new(reader_factory));

Comment on lines -494 to +507 (Contributor):

This is just moving code around to initialize the FileScanConfig before the ParquetSource ✅


         // Finally, put it all together into a DataSourceExec
-        Ok(DataSourceExec::from_data_source(file_scan_config))

Comment on lines -494 to -510 (Contributor, Author):

You'll see this pattern a lot in this PR.

The old flow goes something like:

  1. Define the file source
  2. Define the file scan config and move the file source inside
  3. Derive a data source plan from the file scan config

Inside this flow, there's a circular dependency: the config holds the file source, but creating a file opener from the file source also requires passing the config back in.

The new flow goes something like:

  1. Define the config
  2. Define the file source which now owns the config
  3. Derive a data source plan from the file source
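
A condensed sketch of the two flows side by side (illustrative only: the variable names are made up, and the signatures are the ones shown in the diffs in this PR):

// Old flow: the config consumed the file source at build time, yet the
// source also needed the config back to create a file opener -- the
// circular dependency described above.
let source = Arc::new(ParquetSource::default().with_predicate(predicate));
let config = FileScanConfigBuilder::new(url, schema, source)
    .with_file(file)
    .build();
let plan = DataSourceExec::from_data_source(config);

// New flow: build the config first, hand ownership of it to the file
// source, and derive the plan from the source.
let config = FileScanConfigBuilder::new(url, schema)
    .with_file(file)
    .build();
let source = ParquetSource::new(TableParquetOptions::default(), config)
    .with_predicate(predicate);
let plan = DataSourceExec::from_data_source(source);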

Comment (Member):

The biggest point of the refactor

+        Ok(DataSourceExec::from_data_source(file_source))
     }

     /// Tell DataFusion to push filters down to the scan method

datafusion-examples/examples/csv_json_opener.rs (28 changes: 11 additions & 17 deletions)

@@ -24,7 +24,7 @@ use datafusion::{
         file_format::file_compression_type::FileCompressionType,
         listing::PartitionedFile,
         object_store::ObjectStoreUrl,
-        physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource},
+        physical_plan::{CsvSource, FileSource, FileStream, JsonOpener},
     },
     error::Result,
     physical_plan::metrics::ExecutionPlanMetricsSet,
@@ -58,20 +58,17 @@ async fn csv_opener() -> Result<()> {
     let scan_config = FileScanConfigBuilder::new(
         ObjectStoreUrl::local_filesystem(),
         Arc::clone(&schema),
-        Arc::new(CsvSource::default()),
     )
     .with_projection(Some(vec![12, 0]))
     .with_batch_size(Some(8192))
     .with_limit(Some(5))
     .with_file(PartitionedFile::new(path.display().to_string(), 10))
     .build();

-    let config = CsvSource::new(true, b',', b'"')
-        .with_comment(Some(b'#'))
-        .with_schema(schema)
-        .with_batch_size(8192)
-        .with_projection(&scan_config);

Comment on lines -70 to -72 (Contributor):

It makes sense that the schema, batch size and projection are not properties inherent to CSVs and thus should be part of FileScanConfig. In fact they are currently duplicated!

+    let source =
+        CsvSource::new(true, b',', b'"', scan_config.clone()).with_comment(Some(b'#'));

-    let opener = config.create_file_opener(object_store, &scan_config, 0);
+    let opener = source.create_file_opener(object_store, 0);

     let mut result = vec![];
     let mut stream =
@@ -121,15 +118,12 @@ async fn json_opener() -> Result<()> {
         Arc::new(object_store),
     );

-    let scan_config = FileScanConfigBuilder::new(
-        ObjectStoreUrl::local_filesystem(),
-        schema,
-        Arc::new(JsonSource::default()),
-    )
-    .with_projection(Some(vec![1, 0]))
-    .with_limit(Some(5))
-    .with_file(PartitionedFile::new(path.to_string(), 10))
-    .build();
+    let scan_config =
+        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema)
+            .with_projection(Some(vec![1, 0]))
+            .with_limit(Some(5))
+            .with_file(PartitionedFile::new(path.to_string(), 10))
+            .build();

     let mut stream = FileStream::new(
         &scan_config,

datafusion-examples/examples/custom_file_format.rs (10 changes: 6 additions & 4 deletions)

@@ -111,9 +111,11 @@ impl FileFormat for TSVFileFormat {
     async fn create_physical_plan(
         &self,
         state: &dyn Session,
-        conf: FileScanConfig,
+        source: Arc<dyn FileSource>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        self.csv_file_format.create_physical_plan(state, conf).await
+        self.csv_file_format
+            .create_physical_plan(state, source)
+            .await
     }

     async fn create_writer_physical_plan(
@@ -128,8 +130,8 @@ impl FileFormat for TSVFileFormat {
             .await
     }

-    fn file_source(&self) -> Arc<dyn FileSource> {
-        self.csv_file_format.file_source()
+    fn file_source(&self, config: FileScanConfig) -> Arc<dyn FileSource> {
+        self.csv_file_format.file_source(config)
     }
 }

datafusion-examples/examples/default_column_values.rs (21 changes: 10 additions & 11 deletions)

@@ -29,6 +29,7 @@ use datafusion::catalog::{Session, TableProvider};
 use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion::common::DFSchema;
 use datafusion::common::{Result, ScalarValue};
+use datafusion::config::TableParquetOptions;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
 use datafusion::execution::context::SessionContext;
@@ -235,10 +236,6 @@ impl TableProvider for DefaultValueTableProvider {
             &df_schema,
         )?;

-        let parquet_source = ParquetSource::default()
-            .with_predicate(filter)
-            .with_pushdown_filters(true);
-
         let object_store_url = ObjectStoreUrl::parse("memory://")?;
         let store = state.runtime_env().object_store(object_store_url)?;

@@ -255,19 +252,21 @@
             .map(|file| PartitionedFile::new(file.location.clone(), file.size))
             .collect();

-        let file_scan_config = FileScanConfigBuilder::new(
+        let config = FileScanConfigBuilder::new(
             ObjectStoreUrl::parse("memory://")?,
             self.schema.clone(),
-            Arc::new(parquet_source),
         )
         .with_projection(projection.cloned())
         .with_limit(limit)
         .with_file_group(file_group)
-        .with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));
-
-        Ok(Arc::new(DataSourceExec::new(Arc::new(
-            file_scan_config.build(),
-        ))))
+        .with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _))
+        .build();
+
+        Ok(DataSourceExec::from_data_source(
+            ParquetSource::new(TableParquetOptions::default(), config.clone())
+                .with_predicate(filter)
+                .with_pushdown_filters(true),
+        ))
     }
 }

datafusion-examples/examples/json_shredding.rs (33 changes: 15 additions & 18 deletions)

@@ -29,6 +29,7 @@ use datafusion::common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
 use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::config::TableParquetOptions;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
 use datafusion::execution::context::SessionContext;
@@ -243,10 +244,6 @@ impl TableProvider for ExampleTableProvider {
             &df_schema,
         )?;

-        let parquet_source = ParquetSource::default()
-            .with_predicate(filter)
-            .with_pushdown_filters(true);
-
         let object_store_url = ObjectStoreUrl::parse("memory://")?;

         let store = state.runtime_env().object_store(object_store_url)?;
@@ -264,20 +261,20 @@
             .map(|file| PartitionedFile::new(file.location.clone(), file.size))
             .collect();

-        let file_scan_config = FileScanConfigBuilder::new(
-            ObjectStoreUrl::parse("memory://")?,
-            schema,
-            Arc::new(parquet_source),
-        )
-        .with_projection(projection.cloned())
-        .with_limit(limit)
-        .with_file_group(file_group)
-        // if the rewriter needs a reference to the table schema you can bind self.schema() here
-        .with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _));
-
-        Ok(Arc::new(DataSourceExec::new(Arc::new(
-            file_scan_config.build(),
-        ))))
+        let config =
+            FileScanConfigBuilder::new(ObjectStoreUrl::parse("memory://")?, schema)
+                .with_projection(projection.cloned())
+                .with_limit(limit)
+                .with_file_group(file_group)
+                // if the rewriter needs a reference to the table schema you can bind self.schema() here
+                .with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _))
+                .build();
+
+        Ok(DataSourceExec::from_data_source(
+            ParquetSource::new(TableParquetOptions::default(), config.clone())
+                .with_predicate(filter)
+                .with_pushdown_filters(true),
+        ))
     }
 }

datafusion-examples/examples/parquet_embedded_index.rs (11 changes: 8 additions & 3 deletions)

@@ -117,6 +117,7 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use async_trait::async_trait;
 use datafusion::catalog::{Session, TableProvider};
 use datafusion::common::{exec_err, HashMap, HashSet, Result};
+use datafusion::config::TableParquetOptions;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::memory::DataSourceExec;
 use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
@@ -426,8 +427,8 @@ impl TableProvider for DistinctIndexTable {

         // Build ParquetSource to actually read the files
         let url = ObjectStoreUrl::parse("file://")?;
-        let source = Arc::new(ParquetSource::default().with_enable_page_index(true));
-        let mut builder = FileScanConfigBuilder::new(url, self.schema.clone(), source);
+
+        let mut builder = FileScanConfigBuilder::new(url, self.schema.clone());
         for file in files_to_scan {
             let path = self.dir.join(file);
             let len = std::fs::metadata(&path)?.len();
@@ -438,7 +439,11 @@
                 PartitionedFile::new(path.to_str().unwrap().to_string(), len);
             builder = builder.with_file(partitioned_file);
         }
-        Ok(DataSourceExec::from_data_source(builder.build()))
+
+        Ok(DataSourceExec::from_data_source(
+            ParquetSource::new(TableParquetOptions::default(), builder.build())
+                .with_enable_page_index(true),
+        ))
     }

     /// Tell DataFusion that we can handle filters on the "category" column

datafusion-examples/examples/parquet_exec_visitor.rs (6 changes: 4 additions & 2 deletions)

@@ -19,7 +19,7 @@ use std::sync::Arc;

 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
-use datafusion::datasource::physical_plan::{FileGroup, ParquetSource};
+use datafusion::datasource::physical_plan::{FileGroup, FileSource, ParquetSource};
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionContext;
@@ -98,9 +98,11 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
     fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
         // If needed match on a specific `ExecutionPlan` node type
         if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
-            if let Some((file_config, _)) =
+            if let Some(parquet_source) =
                 data_source_exec.downcast_to_file_source::<ParquetSource>()
             {
+                let file_config = parquet_source.config();
+
                 self.file_groups = Some(file_config.file_groups.clone());

Comment on lines +104 to 106 (Contributor):

This seems like a valid usage. The only question I have is whether we should make FileScanConfig an implementation detail and have a file_groups() method on DataSource. We could even have a config() on FileSource so that impl<T: FileSource> DataSource for &T { fn file_groups(&self) -> ? { self.config().file_groups.clone() } }, or something like that.

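A rough sketch of that idea (hypothetical throughout: the stand-in types and the config()/file_groups() methods are not DataFusion's actual API, and the comment's &T is simplified to T):

// Stand-in types so the sketch is self-contained.
#[derive(Clone)]
struct FileGroup;
struct FileScanConfig {
    file_groups: Vec<FileGroup>,
}

trait FileSource {
    // Hypothetical accessor: every file source exposes the config it owns.
    fn config(&self) -> &FileScanConfig;
}

trait DataSource {
    fn file_groups(&self) -> Vec<FileGroup>;
}

// Blanket impl: every FileSource gets file_groups() by delegating to its
// FileScanConfig, which stays an implementation detail.
impl<T: FileSource> DataSource for T {
    fn file_groups(&self) -> Vec<FileGroup> {
        self.config().file_groups.clone()
    }
}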

Comment (Contributor, Author):

Hm, interesting point. With 5d4e834, I'm starting to see a stratified API for T: FileSource, where certain methods are called directly on T and others via T::FileScanConfig.

The statistics getters were the only blatant cases where those functions were better served living on T. I think having a file_groups() on T makes sense, but so do a lot of the functions that live in T::FileScanConfig.

Will think more about this


                 let metrics = match data_source_exec.metrics() {

datafusion-examples/examples/parquet_index.rs (12 changes: 9 additions & 3 deletions)

@@ -27,6 +27,7 @@ use datafusion::common::pruning::PruningStatistics;
 use datafusion::common::{
     internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
 };
+use datafusion::config::TableParquetOptions;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::memory::DataSourceExec;
 use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
@@ -243,9 +244,9 @@ impl TableProvider for IndexTableProvider {
         let files = self.index.get_files(predicate.clone())?;

         let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let source = Arc::new(ParquetSource::default().with_predicate(predicate));
+
         let mut file_scan_config_builder =
-            FileScanConfigBuilder::new(object_store_url, self.schema(), source)
+            FileScanConfigBuilder::new(object_store_url, self.schema())
                 .with_projection(projection.cloned())
                 .with_limit(limit);

@@ -258,8 +259,13 @@
                 PartitionedFile::new(canonical_path.display().to_string(), file_size),
             );
         }
+
         Ok(DataSourceExec::from_data_source(
-            file_scan_config_builder.build(),
+            ParquetSource::new(
+                TableParquetOptions::default(),
+                file_scan_config_builder.build(),
+            )
+            .with_predicate(predicate),
         ))
     }

datafusion/core/src/datasource/file_format/arrow.rs (20 changes: 3 additions & 17 deletions)

@@ -49,7 +49,7 @@ use datafusion_common::{
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
 use datafusion_datasource::display::FileGroupDisplay;
 use datafusion_datasource::file::FileSource;
-use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
+use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::sink::{DataSink, DataSinkExec};
 use datafusion_datasource::write::ObjectWriterBuilder;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -58,7 +58,6 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;

 use async_trait::async_trait;
 use bytes::Bytes;
-use datafusion_datasource::source::DataSourceExec;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
@@ -173,19 +172,6 @@ impl FileFormat for ArrowFormat {
         Ok(Statistics::new_unknown(&table_schema))
     }

-    async fn create_physical_plan(
-        &self,
-        _state: &dyn Session,
-        conf: FileScanConfig,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let source = Arc::new(ArrowSource::default());
-        let config = FileScanConfigBuilder::from(conf)
-            .with_source(source)
-            .build();
-
-        Ok(DataSourceExec::from_data_source(config))
-    }
-
     async fn create_writer_physical_plan(
         &self,
         input: Arc<dyn ExecutionPlan>,
@@ -202,8 +188,8 @@
         Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
     }

-    fn file_source(&self) -> Arc<dyn FileSource> {
-        Arc::new(ArrowSource::default())
+    fn file_source(&self, config: FileScanConfig) -> Arc<dyn FileSource> {
+        Arc::new(ArrowSource::new(config))
     }
 }

datafusion/core/src/datasource/file_format/mod.rs (20 changes: 7 additions & 13 deletions)

@@ -80,22 +80,16 @@ pub(crate) mod test_util {
         }]
         .into()];

-        let exec = format
-            .create_physical_plan(
-                state,
-                FileScanConfigBuilder::new(
-                    ObjectStoreUrl::local_filesystem(),
-                    file_schema,
-                    format.file_source(),
-                )
+        let config =
+            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_schema)
                 .with_file_groups(file_groups)
-                .with_statistics(statistics)
+                .with_file_source_projected_statistics(statistics)
                 .with_projection(projection)
                 .with_limit(limit)
-                .build(),
-            )
-            .await?;
-        Ok(exec)
+                .build();
+
+        let source = format.file_source(config);
+        format.create_physical_plan(state, source).await
     }
 }

Expand Down
Loading