Skip to content

Commit d6ff4cc

Browse files
Move schema and batch_size into file scan config
1 parent 2b92b1d commit d6ff4cc

File tree

13 files changed

+130
-168
lines changed

13 files changed

+130
-168
lines changed

datafusion-examples/examples/csv_json_opener.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,13 @@ async fn csv_opener() -> Result<()> {
6161
Arc::new(CsvSource::default()),
6262
)
6363
.with_projection(Some(vec![12, 0]))
64+
.with_batch_size(Some(8192))
6465
.with_limit(Some(5))
6566
.with_file(PartitionedFile::new(path.display().to_string(), 10))
6667
.build();
6768

6869
let config = CsvSource::new(true, b',', b'"')
6970
.with_comment(Some(b'#'))
70-
.with_schema(schema)
71-
.with_batch_size(8192)
7271
.with_projection(&scan_config);
7372

7473
let opener = config.create_file_opener(object_store, &scan_config, 0);

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use datafusion_datasource::as_file_source;
2424
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
2525

2626
use arrow::buffer::Buffer;
27-
use arrow::datatypes::SchemaRef;
2827
use arrow_ipc::reader::FileDecoder;
2928
use datafusion_datasource::file::FileSource;
3029
use datafusion_datasource::file_scan_config::FileScanConfig;
@@ -66,14 +65,6 @@ impl FileSource for ArrowSource {
6665
self
6766
}
6867

69-
fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
70-
Arc::new(Self { ..self.clone() })
71-
}
72-
73-
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
74-
Arc::new(Self { ..self.clone() })
75-
}
76-
7768
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
7869
Arc::new(Self { ..self.clone() })
7970
}

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,9 @@ mod tests {
185185
source = source.with_bloom_filter_on_read(false);
186186
}
187187

188-
source.with_schema(Arc::clone(&table_schema))
188+
// source.with_schema(Arc::clone(&table_schema))
189+
190+
Arc::new(source) as Arc<dyn FileSource>
189191
}
190192

191193
fn build_parquet_exec(

datafusion/core/src/test_util/parquet.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use crate::physical_plan::metrics::MetricsSet;
3737
use crate::physical_plan::ExecutionPlan;
3838
use crate::prelude::{Expr, SessionConfig, SessionContext};
3939

40-
use datafusion_datasource::file::FileSource;
4140
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
4241
use datafusion_datasource::source::DataSourceExec;
4342
use object_store::path::Path;
@@ -185,8 +184,8 @@ impl TestParquetFile {
185184
let source = Arc::new(
186185
ParquetSource::new(parquet_options)
187186
.with_predicate(Arc::clone(&physical_filter_expr)),
188-
)
189-
.with_schema(Arc::clone(&self.schema));
187+
);
188+
190189
let config = scan_config_builder.with_source(source).build();
191190
let parquet_exec = DataSourceExec::from_data_source(config);
192191

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use arrow::datatypes::SchemaRef;
1919
use arrow::error::ArrowError;
2020
use arrow::{array::RecordBatch, compute::concat_batches};
2121
use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
22-
use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics};
22+
use datafusion_common::{config::ConfigOptions, internal_err, Result};
2323
use datafusion_datasource::{
2424
file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig,
2525
file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
@@ -107,10 +107,7 @@ impl FileOpener for TestOpener {
107107
pub struct TestSource {
108108
support: bool,
109109
predicate: Option<Arc<dyn PhysicalExpr>>,
110-
statistics: Option<Statistics>,
111-
batch_size: Option<usize>,
112110
batches: Vec<RecordBatch>,
113-
schema: Option<SchemaRef>,
114111
metrics: ExecutionPlanMetricsSet,
115112
projection: Option<Vec<usize>>,
116113
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
@@ -131,13 +128,13 @@ impl FileSource for TestSource {
131128
fn create_file_opener(
132129
&self,
133130
_object_store: Arc<dyn ObjectStore>,
134-
_base_config: &FileScanConfig,
131+
base_config: &FileScanConfig,
135132
_partition: usize,
136133
) -> Arc<dyn FileOpener> {
137134
Arc::new(TestOpener {
138135
batches: self.batches.clone(),
139-
batch_size: self.batch_size,
140-
schema: self.schema.clone(),
136+
batch_size: base_config.batch_size,
137+
schema: Some(Arc::clone(&base_config.file_schema)),
141138
projection: self.projection.clone(),
142139
})
143140
}
@@ -146,20 +143,6 @@ impl FileSource for TestSource {
146143
todo!("should not be called")
147144
}
148145

149-
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
150-
Arc::new(TestSource {
151-
batch_size: Some(batch_size),
152-
..self.clone()
153-
})
154-
}
155-
156-
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
157-
Arc::new(TestSource {
158-
schema: Some(schema),
159-
..self.clone()
160-
})
161-
}
162-
163146
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
164147
Arc::new(TestSource {
165148
projection: config.projection.clone(),
@@ -175,7 +158,12 @@ impl FileSource for TestSource {
175158
"test"
176159
}
177160

178-
fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
161+
fn fmt_extra(
162+
&self,
163+
t: DisplayFormatType,
164+
f: &mut Formatter,
165+
_config: &FileScanConfig,
166+
) -> std::fmt::Result {
179167
match t {
180168
DisplayFormatType::Default | DisplayFormatType::Verbose => {
181169
let support = format!(", pushdown_supported={}", self.support);
@@ -202,6 +190,7 @@ impl FileSource for TestSource {
202190
&self,
203191
mut filters: Vec<Arc<dyn PhysicalExpr>>,
204192
config: &ConfigOptions,
193+
_fs_config: &FileScanConfig,
205194
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
206195
if self.support && config.execution.parquet.pushdown_filters {
207196
if let Some(internal) = self.predicate.as_ref() {

datafusion/datasource-avro/src/source.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ use object_store::ObjectStore;
3636
/// AvroSource holds the extra configuration that is necessary for opening avro files
3737
#[derive(Clone, Default)]
3838
pub struct AvroSource {
39-
schema: Option<SchemaRef>,
40-
batch_size: Option<usize>,
4139
projection: Option<Vec<String>>,
4240
metrics: ExecutionPlanMetricsSet,
4341
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
@@ -49,11 +47,16 @@ impl AvroSource {
4947
Self::default()
5048
}
5149

52-
fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
50+
fn open<R: std::io::Read>(
51+
&self,
52+
reader: R,
53+
file_schema: SchemaRef,
54+
batch_size: Option<usize>,
55+
) -> Result<AvroReader<'static, R>> {
5356
AvroReader::try_new(
5457
reader,
55-
Arc::clone(self.schema.as_ref().expect("Schema must set before open")),
56-
self.batch_size.expect("Batch size must set before open"),
58+
file_schema,
59+
batch_size.expect("Batch size must set before open"),
5760
self.projection.clone(),
5861
)
5962
}
@@ -63,31 +66,21 @@ impl FileSource for AvroSource {
6366
fn create_file_opener(
6467
&self,
6568
object_store: Arc<dyn ObjectStore>,
66-
_base_config: &FileScanConfig,
69+
base_config: &FileScanConfig,
6770
_partition: usize,
6871
) -> Arc<dyn FileOpener> {
6972
Arc::new(private::AvroOpener {
70-
config: Arc::new(self.clone()),
73+
source: Arc::new(self.clone()),
7174
object_store,
75+
file_schema: base_config.file_schema.clone(),
76+
batch_size: base_config.batch_size,
7277
})
7378
}
7479

7580
fn as_any(&self) -> &dyn Any {
7681
self
7782
}
7883

79-
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
80-
let mut conf = self.clone();
81-
conf.batch_size = Some(batch_size);
82-
Arc::new(conf)
83-
}
84-
85-
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
86-
let mut conf = self.clone();
87-
conf.schema = Some(schema);
88-
Arc::new(conf)
89-
}
90-
9184
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
9285
let mut conf = self.clone();
9386
conf.projection = config.projected_file_column_names();
@@ -138,8 +131,11 @@ mod private {
138131
use object_store::{GetResultPayload, ObjectStore};
139132

140133
pub struct AvroOpener {
141-
pub config: Arc<AvroSource>,
134+
pub source: Arc<AvroSource>,
142135
pub object_store: Arc<dyn ObjectStore>,
136+
137+
pub file_schema: SchemaRef,
138+
pub batch_size: Option<usize>,
143139
}
144140

145141
impl FileOpener for AvroOpener {
@@ -148,18 +144,21 @@ mod private {
148144
file_meta: FileMeta,
149145
_file: PartitionedFile,
150146
) -> Result<FileOpenFuture> {
151-
let config = Arc::clone(&self.config);
147+
let source = Arc::clone(&self.source);
152148
let object_store = Arc::clone(&self.object_store);
149+
let file_schema = Arc::clone(&self.file_schema);
150+
let batch_size = self.batch_size;
153151
Ok(Box::pin(async move {
154152
let r = object_store.get(file_meta.location()).await?;
155153
match r.payload {
156154
GetResultPayload::File(file, _) => {
157-
let reader = config.open(file)?;
155+
let reader = source.open(file, file_schema, batch_size)?;
158156
Ok(futures::stream::iter(reader).boxed())
159157
}
160158
GetResultPayload::Stream(_) => {
161159
let bytes = r.bytes().await?;
162-
let reader = config.open(bytes.reader())?;
160+
let reader =
161+
source.open(bytes.reader(), file_schema, batch_size)?;
163162
Ok(futures::stream::iter(reader).boxed())
164163
}
165164
}

0 commit comments

Comments (0)