datafusion-examples/examples/csv_json_opener.rs (4 additions, 2 deletions)

@@ -31,7 +31,9 @@ use datafusion::{
     test_util::aggr_test_schema,
 };

-use datafusion::datasource::physical_plan::FileScanConfigBuilder;
+use datafusion::datasource::{
+    physical_plan::FileScanConfigBuilder, table_schema::TableSchema,
+};
 use futures::StreamExt;
 use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

@@ -67,7 +69,7 @@ async fn csv_opener() -> Result<()> {

     let config = CsvSource::new(true, b',', b'"')
         .with_comment(Some(b'#'))
-        .with_schema(schema)
+        .with_schema(TableSchema::new_from_file_schema(schema))
         .with_batch_size(8192)
         .with_projection(&scan_config);

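For readers tracking the call-site change: `with_schema` now takes a `TableSchema` instead of a bare `SchemaRef`. A minimal sketch of the migration, assuming `CsvSource` and `aggr_test_schema` are reachable at the paths this example file already imports from:

```rust
use datafusion::datasource::physical_plan::CsvSource;
use datafusion::datasource::table_schema::TableSchema;
use datafusion::test_util::aggr_test_schema;

fn configure_csv_source() {
    let schema = aggr_test_schema();
    // Before this change: .with_schema(schema)
    // After: wrap the bare SchemaRef in a TableSchema (no partition columns here).
    let _source = CsvSource::new(true, b',', b'"')
        .with_comment(Some(b'#'))
        .with_schema(TableSchema::new_from_file_schema(schema));
}
```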
datafusion/core/src/datasource/mod.rs (1 addition, 0 deletions)

@@ -45,6 +45,7 @@ pub use datafusion_catalog::view;
 pub use datafusion_datasource::schema_adapter;
 pub use datafusion_datasource::sink;
 pub use datafusion_datasource::source;
+pub use datafusion_datasource::table_schema;
 pub use datafusion_execution::object_store;
 pub use datafusion_physical_expr::create_ordering;

datafusion/core/src/datasource/physical_plan/parquet.rs (2 additions, 2 deletions)

@@ -54,7 +54,7 @@ mod tests {
     use datafusion_datasource::source::DataSourceExec;

     use datafusion_datasource::file::FileSource;
-    use datafusion_datasource::{FileRange, PartitionedFile};
+    use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
     use datafusion_datasource_parquet::source::ParquetSource;
     use datafusion_datasource_parquet::{
         DefaultParquetFileReaderFactory, ParquetFileReaderFactory, ParquetFormat,
@@ -186,7 +186,7 @@ mod tests {
             source = source.with_bloom_filter_on_read(false);
         }

-        source.with_schema(Arc::clone(&table_schema))
+        source.with_schema(TableSchema::new(Arc::clone(&table_schema), vec![]))
     }

     fn build_parquet_exec(
datafusion/core/src/test_util/parquet.rs (2 additions, 1 deletion)

@@ -40,6 +40,7 @@ use crate::prelude::{Expr, SessionConfig, SessionContext};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::TableSchema;
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use parquet::arrow::ArrowWriter;
@@ -186,7 +187,7 @@ impl TestParquetFile {
             ParquetSource::new(parquet_options)
                 .with_predicate(Arc::clone(&physical_filter_expr)),
         )
-        .with_schema(Arc::clone(&self.schema));
+        .with_schema(TableSchema::new_from_file_schema(Arc::clone(&self.schema)));
         let config = scan_config_builder.with_source(source).build();
         let parquet_exec = DataSourceExec::from_data_source(config);

@@ -24,6 +24,7 @@ use datafusion_datasource::{
     file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
     file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
     schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile,
+    TableSchema,
 };
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
@@ -156,9 +157,13 @@ impl FileSource for TestSource {
         })
     }

-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
+        assert!(
+            schema.table_partition_cols().is_empty(),
+            "TestSource does not support partition columns"
+        );
         Arc::new(TestSource {
-            schema: Some(schema),
+            schema: Some(schema.file_schema().clone()),
             ..self.clone()
         })
     }
datafusion/datasource-arrow/src/source.rs (2 additions, 2 deletions)

@@ -20,9 +20,9 @@ use std::sync::Arc;

 use datafusion_datasource::as_file_source;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_datasource::TableSchema;

 use arrow::buffer::Buffer;
-use arrow::datatypes::SchemaRef;
 use arrow_ipc::reader::FileDecoder;
 use datafusion_common::error::Result;
 use datafusion_common::{exec_datafusion_err, Statistics};
@@ -73,7 +73,7 @@ impl FileSource for ArrowSource {
         Arc::new(Self { ..self.clone() })
     }

-    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
datafusion/datasource-avro/src/source.rs (4 additions, 2 deletions)

@@ -29,6 +29,7 @@ use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::file_stream::FileOpener;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_datasource::TableSchema;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

@@ -84,9 +85,10 @@ impl FileSource for AvroSource {
         Arc::new(conf)
     }

-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
-        conf.schema = Some(schema);
+        // TableSchema may carry partition columns, but AvroSource does not currently use partition columns or values.
+        conf.schema = Some(Arc::clone(&schema.file_schema()));
         Arc::new(conf)
     }
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
datafusion/datasource-csv/src/source.rs (3 additions, 3 deletions)

@@ -29,7 +29,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::{
     as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
-    RangeCalculation,
+    RangeCalculation, TableSchema,
 };

 use arrow::csv;
@@ -258,9 +258,9 @@ impl FileSource for CsvSource {
         Arc::new(conf)
     }

-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
-        conf.file_schema = Some(schema);
+        conf.file_schema = Some(Arc::clone(&schema.file_schema()));
         Arc::new(conf)
     }

datafusion/datasource-json/src/source.rs (2 additions, 1 deletion)

@@ -32,6 +32,7 @@ use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
     as_file_source, calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
+    TableSchema,
 };
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

@@ -122,7 +123,7 @@ impl FileSource for JsonSource {
         Arc::new(conf)
     }

-    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
datafusion/datasource-parquet/src/source.rs (16 additions, 9 deletions)

@@ -35,11 +35,12 @@ use datafusion_datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };

-use arrow::datatypes::{SchemaRef, TimeUnit};
+use arrow::datatypes::TimeUnit;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::{DataFusionError, Statistics};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::TableSchema;
 use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -275,7 +276,7 @@ pub struct ParquetSource {
     /// The schema of the file.
     /// In particular, this is the schema of the table without partition columns,
     /// *not* the physical schema of the file.
-    pub(crate) file_schema: Option<SchemaRef>,
+    pub(crate) table_schema: Option<TableSchema>,
     /// Optional predicate for row filtering during parquet scan
     pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
     /// Optional user defined parquet file reader factory
@@ -601,9 +602,9 @@ impl FileSource for ParquetSource {
         Arc::new(conf)
     }

-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self {
-            file_schema: Some(schema),
+            table_schema: Some(schema),
             ..self.clone()
         })
     }
@@ -661,9 +662,10 @@ impl FileSource for ParquetSource {
         // the actual predicates are built in reference to the physical schema of
         // each file, which we do not have at this point and hence cannot use.
         // Instead we use the logical schema of the file (the table schema without partition columns).
-        if let (Some(file_schema), Some(predicate)) =
-            (&self.file_schema, &self.predicate)
-        {
+        if let (Some(file_schema), Some(predicate)) = (
+            &self.table_schema.as_ref().map(|ts| ts.file_schema()),
+            &self.predicate,
+        ) {
             let predicate_creation_errors = Count::new();
             if let (Some(pruning_predicate), _) = build_pruning_predicates(
                 Some(predicate),
@@ -700,7 +702,12 @@ impl FileSource for ParquetSource {
         filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
     ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
-        let Some(file_schema) = self.file_schema.clone() else {
+        let Some(table_schema) = self
+            .table_schema
+            .as_ref()
+            .map(|ts| ts.table_schema())
+            .cloned()
+        else {
             return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                 vec![PushedDown::No; filters.len()],
             ));
@@ -720,7 +727,7 @@ impl FileSource for ParquetSource {
         let filters: Vec<PushedDownPredicate> = filters
             .into_iter()
             .map(|filter| {
-                if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) {
+                if can_expr_be_pushed_down_with_schemas(&filter, &table_schema) {
                     PushedDownPredicate::supported(filter)
                 } else {
                     PushedDownPredicate::unsupported(filter)
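The net effect in this file is that ParquetSource keeps the whole `TableSchema` and picks the view each code path needs: pruning predicates are built against the file schema, while filter-pushdown eligibility is checked against the table schema. A hedged sketch of the two accessors, assuming both return `&SchemaRef` as the calls above suggest:

```rust
use arrow::datatypes::SchemaRef;
use datafusion_datasource::TableSchema;

// Which view of the schema each path uses: the file schema excludes
// partition columns; the table schema has them appended at the end.
fn schema_views(ts: &TableSchema) -> (SchemaRef, SchemaRef) {
    (ts.file_schema().clone(), ts.table_schema().clone())
}
```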
datafusion/datasource/src/file.rs (2 additions, 2 deletions)

@@ -26,7 +26,7 @@ use crate::file_groups::FileGroupPartitioner;
 use crate::file_scan_config::FileScanConfig;
 use crate::file_stream::FileOpener;
 use crate::schema_adapter::SchemaAdapterFactory;
-use arrow::datatypes::SchemaRef;
+use crate::TableSchema;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{not_impl_err, Result, Statistics};
 use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
@@ -64,7 +64,7 @@ pub trait FileSource: Send + Sync {
     /// Initialize new type with batch size configuration
     fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
     /// Initialize new instance with a new schema
-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource>;
     /// Initialize new instance with projection information
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
     /// Initialize new instance with projected statistics
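Every `FileSource` implementor must adopt the new signature. A minimal sketch of the pattern the sources in this PR follow, written as an inherent method on a hypothetical `MySource` to keep it short (the real trait has many more required methods):

```rust
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use datafusion_datasource::TableSchema;

#[derive(Clone)]
struct MySource {
    schema: Option<SchemaRef>,
}

impl MySource {
    // Keep only the file schema, as CsvSource and AvroSource do above;
    // the partition columns travel inside the TableSchema and are applied
    // elsewhere (e.g. by FileScanConfig).
    fn with_schema(&self, schema: TableSchema) -> Arc<MySource> {
        Arc::new(MySource {
            schema: Some(schema.file_schema().clone()),
        })
    }
}
```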
datafusion/datasource/src/file_scan_config.rs (7 additions, 3 deletions)

@@ -87,6 +87,7 @@ use log::{debug, warn};
 /// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 /// # use datafusion_datasource::file_stream::FileOpener;
 /// # use datafusion_datasource::source::DataSourceExec;
+/// # use datafusion_datasource::table_schema::TableSchema;
 /// # use datafusion_execution::object_store::ObjectStoreUrl;
 /// # use datafusion_physical_plan::ExecutionPlan;
 /// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -107,7 +108,7 @@ use log::{debug, warn};
 /// # fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
 /// # fn as_any(&self) -> &dyn Any { self }
 /// # fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
-/// # fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
+/// # fn with_schema(&self, _: TableSchema) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
 /// # fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
 /// # fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) }
 /// # fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
@@ -446,14 +447,17 @@ impl FileScanConfigBuilder {
         let statistics =
             statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));

+        // Create TableSchema from file_schema and table_partition_cols
+        let table_schema =
+            TableSchema::new(Arc::clone(&file_schema), table_partition_cols.clone());
+
         let file_source = file_source
             .with_statistics(statistics.clone())
-            .with_schema(Arc::clone(&file_schema));
+            .with_schema(table_schema.clone());
         let file_compression_type =
             file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
         let new_lines_in_values = new_lines_in_values.unwrap_or(false);

-        // Create TableSchema from file_schema and table_partition_cols
-        let table_schema = TableSchema::new(file_schema, table_partition_cols);
-
         FileScanConfig {
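With this change, `build()` constructs the `TableSchema` once and shares the same value between the file source and the resulting `FileScanConfig`. A hedged usage sketch of what that value looks like when partition columns are configured (field names are illustrative):

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use datafusion_datasource::TableSchema;

fn main() {
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Int32,
        false,
    )]));
    let partition_cols: Vec<FieldRef> =
        vec![Arc::new(Field::new("date", DataType::Utf8, false))];

    // The same value build() hands to FileSource::with_schema.
    let table_schema = TableSchema::new(Arc::clone(&file_schema), partition_cols);
    assert_eq!(table_schema.table_schema().fields().len(), 2); // "a" + "date"
}
```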
datafusion/datasource/src/table_schema.rs (26 additions, 0 deletions)

@@ -85,6 +85,11 @@ impl TableSchema {
     /// The table schema is automatically computed by appending the partition columns
     /// to the file schema.
     ///
+    /// You should prefer calling this method over
+    /// chaining [`TableSchema::new_from_file_schema`] and [`TableSchema::with_partition_cols`]
+    /// if you have both the file schema and partition columns available at construction time,
+    /// since it avoids re-computing the table schema.
+    ///
     /// # Arguments
     ///
     /// * `file_schema` - Schema of the data files (without partition columns)
@@ -121,6 +126,27 @@ impl TableSchema {
         }
     }

+    /// Create a new TableSchema with no partition columns.
+    ///
+    /// You should prefer calling [`TableSchema::new`] if you have partition columns at
+    /// construction time, since it avoids re-computing the table schema.
+    pub fn new_from_file_schema(file_schema: SchemaRef) -> Self {
+        Self::new(file_schema, vec![])
+    }
+
+    /// Add partition columns to an existing TableSchema, returning a new instance.
+    ///
+    /// You should prefer calling [`TableSchema::new`] instead of chaining [`TableSchema::new_from_file_schema`]
+    /// into [`TableSchema::with_partition_cols`] if you have partition columns at construction time,
+    /// since it avoids re-computing the table schema.
+    pub fn with_partition_cols(mut self, partition_cols: Vec<FieldRef>) -> Self {
+        self.table_partition_cols = partition_cols;
+        let mut builder = SchemaBuilder::from(self.file_schema.as_ref());
+        builder.extend(self.table_partition_cols.iter().cloned());
+        self.table_schema = Arc::new(builder.finish());
+        self
+    }
+
     /// Get the file schema (without partition columns).
     ///
     /// This is the schema of the actual data files on disk.
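A short sketch contrasting the two construction paths the new docs describe; both produce the same schema, but `new` computes the combined table schema only once:

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use datafusion_datasource::table_schema::TableSchema;

fn main() {
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Int32,
        false,
    )]));
    let part: FieldRef = Arc::new(Field::new("p", DataType::Utf8, false));

    // Preferred: one pass over the fields.
    let ts1 = TableSchema::new(Arc::clone(&file_schema), vec![Arc::clone(&part)]);

    // Equivalent result, but the table schema is computed twice.
    let ts2 = TableSchema::new_from_file_schema(file_schema)
        .with_partition_cols(vec![part]);

    assert_eq!(ts1.table_schema(), ts2.table_schema());
}
```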
datafusion/datasource/src/test_util.rs (3 additions, 2 deletions)

@@ -22,7 +22,8 @@ use crate::{

 use std::sync::Arc;

-use arrow::datatypes::{Schema, SchemaRef};
+use crate::TableSchema;
+use arrow::datatypes::Schema;
 use datafusion_common::{Result, Statistics};
 use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -66,7 +67,7 @@ impl FileSource for MockSource {
         Arc::new(Self { ..self.clone() })
     }

-    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
