
Conversation

friendlymatthew
Contributor

Which issue does this PR close?

Rationale for this change

This PR simplifies the relationship between FileSources <--> FileScanConfig <--> DataSource.

Currently, FileScanConfig is a struct used to group common parameters shared across different file sources. However, the existing design also makes FileScanConfig impl DataSource. This means that to construct a data source execution plan, you must derive it from a configuration struct.

This PR removes that indirection. Instead, each FileSource struct holds a FileScanConfig field, and all types that impl FileSource also impl DataSource.

This redesign removes a lot of redundant code. For instance, AvroSource previously duplicated fields from FileScanConfig, which required additional boilerplate to manually get/set values:

Screenshot 2025-08-19 at 3 00 26 PM

--

We still maintain an abstraction boundary between FileSource and DataSource: the DataSource impl remains generic over any T: FileSource.
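
To make the shape concrete, here is a minimal, self-contained sketch with stand-in types (the real traits in this PR carry more methods, e.g. create_file_opener):

use std::sync::Arc;

// Stand-ins for the real DataFusion types, just to show the ownership shape.
#[derive(Clone, Default)]
struct FileScanConfig {
    limit: Option<usize>,
}

trait FileSource {
    fn config(&self) -> &FileScanConfig;
    fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource>;
}

trait DataSource {
    fn fetch(&self) -> Option<usize>;
}

// The blanket impl: every FileSource is automatically a DataSource, with the
// shared behavior written once against the FileScanConfig it owns.
impl<T: FileSource> DataSource for T {
    fn fetch(&self) -> Option<usize> {
        self.config().limit
    }
}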

Are there any user-facing changes?

Yes -- and they are substantial.

Note: to keep the review process clearer, the current diff does not yet include deprecation strategies for existing methods.

@github-actions github-actions bot added core Core DataFusion crate substrait Changes to the substrait crate proto Related to proto crate datasource Changes to the datasource crate labels Aug 19, 2025
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch 6 times, most recently from 93b9263 to 152548d Compare August 20, 2025 14:51
@github-actions github-actions bot removed the substrait Changes to the substrait crate label Aug 20, 2025
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch 4 times, most recently from b138003 to d6ff4cc Compare August 20, 2025 19:35
@github-actions github-actions bot added documentation Improvements or additions to documentation substrait Changes to the substrait crate labels Aug 21, 2025
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch from ae16eba to 3177ca6 Compare August 21, 2025 18:42
@friendlymatthew
Contributor Author

friendlymatthew commented Aug 22, 2025

I was able to identify the root cause behind several test failures related to roundtripping physical plans in datafusion/proto.

Impacted test cases:

cases::roundtrip_physical_plan::roundtrip_empty_projection
cases::roundtrip_physical_plan::roundtrip_parquet_select_projection
cases::roundtrip_physical_plan::roundtrip_parquet_select_projection_predicate
cases::roundtrip_physical_plan::roundtrip_parquet_select_star
cases::roundtrip_physical_plan::roundtrip_parquet_select_star_predicate
cases::roundtrip_physical_plan::test_round_trip_date_part_display
cases::roundtrip_physical_plan::test_round_trip_groups_display
cases::roundtrip_physical_plan::test_round_trip_human_display
cases::roundtrip_physical_plan::test_round_trip_tpch_queries

tl;dr, we expect FileScanConfig to be lossless when converting to/from protobuf. However, FileSource is not lossless. Now that FileSource implements DataSource, roundtripping physical plans loses some information, which is expected, but the current assert_eq fails because of it.

On main, FileScanConfig implements DataSource directly, and when we roundtrip physical plans, we compare the Display output of FileScanConfig. Although FileScanConfig holds a file source struct, we display minimal info (e.g., source type: "parquet"). However, in this PR the ownership model changes: now all FileSource structs own a FileScanConfig, and they implement DataSource directly. Because we #[derive(Debug)] for FileSource types, the debug output includes fields that are not preserved through serialization.

For example, in ParquetSource, the parquet_file_reader_factory field is not retained after deserialization. Only the TableParquetOptions are serialized and restored; everything else is set to None.

Thoughts on fixes

I see 2 possible directions for resolving the test failures:

  1. Loosen the assertion when verifying DataSourceExec
    Instead of comparing the entire debug output, we could fall back to comparing the Display output of the underlying FileScanConfig, as is done on main. This would ignore non-serialized fields (as sketched below).

  2. Encode extra metadata into TableParquetOptions
    We could explicitly encode some of the additional fields (e.g. the presence of a custom reader factory) into the serialized metadata. But this feels futile: since these fields aren't actually restored on deserialization, tracking their prior existence is extra overhead.

I'm leaning towards option 1, since it reflects the actual lossless boundary and avoids overcomplicating serialization. But I'm curious if people have any opinions.
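
For illustration, option 1 could look something like this (a sketch with a generic helper; it assumes the config type has a Display-style rendering, which is effectively what main compares today):

use std::fmt::Display;

// Compare only the lossless, human-readable rendering of the scan config,
// rather than the full Debug output of the source (which includes
// non-serialized fields like parquet_file_reader_factory).
fn assert_roundtrip_eq<C: Display>(original: &C, restored: &C) {
    assert_eq!(original.to_string(), restored.to_string());
}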

cc @adriangb @berkaysynnada @mbrubeck @xudong963 @comphead @blaginin @alamb

@adriangb
Contributor

I agree that there were always losses in serialization/deserialization, e.g. loss of a custom SchemaAdapterFactory. As long as there aren't more losses now than before, we should adapt the assertion to roughly match this expectation.

@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch 4 times, most recently from ccf7308 to a152b7d Compare August 22, 2025 20:10
@friendlymatthew friendlymatthew changed the title Have T: impl FileSource implement DataSource and own FileScanConfig Change ownership model between FileScanConfig and FileSources Aug 22, 2025
@friendlymatthew friendlymatthew changed the title Change ownership model between FileScanConfig and FileSources Redesign ownership model between FileScanConfig and FileSources Aug 22, 2025
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch from 513b7aa to 66eac9f Compare August 22, 2025 20:38
@xudong963 xudong963 self-requested a review August 23, 2025 14:30
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch 2 times, most recently from fa40cc8 to 5cfd81d Compare August 25, 2025 14:41
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch from 5cfd81d to c7ea40a Compare August 25, 2025 14:43
@friendlymatthew friendlymatthew marked this pull request as ready for review August 25, 2025 15:10
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch 2 times, most recently from b417b26 to 1bbd3ed Compare August 25, 2025 15:17
Contributor Author

@friendlymatthew friendlymatthew left a comment

Hi, this PR is ready for review. It's quite sizable and touches 55+ files.

I've split up the PR into 2 commits:

  • Move all fields shared across FileSource into FileScanConfig
  • Have FileSource own FileScanConfig and directly impl DataSource

I would recommend starting from datafusion/datasource/src/file_scan_config.rs to see which fields were added to FileScanConfig. Then look at the concrete types that impl FileSource, like ParquetSource.

datafusion/datasource/src/file.rs is the main change, as it implements DataSource for all T: FileSource.

Comment on lines -494 to -510
- let file_source = Arc::new(
-     ParquetSource::default()
-         // provide the predicate so the DataSourceExec can try and prune
-         // row groups internally
-         .with_predicate(predicate)
-         // provide the factory to create parquet reader without re-reading metadata
-         .with_parquet_file_reader_factory(Arc::new(reader_factory)),
- );
- let file_scan_config =
-     FileScanConfigBuilder::new(object_store_url, schema, file_source)
-         .with_limit(limit)
-         .with_projection(projection.cloned())
-         .with_file(partitioned_file)
-         .build();
+ let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema)
+     .with_limit(limit)
+     .with_projection(projection.cloned())
+     .with_file(partitioned_file)
+     .build();
+
+ let file_source =
+     ParquetSource::new(TableParquetOptions::default(), file_scan_config.clone())
+         // provide the predicate so the DataSourceExec can try and prune
+         // row groups internally
+         .with_predicate(predicate)
+         // provide the factory to create parquet reader without re-reading metadata
+         .with_parquet_file_reader_factory(Arc::new(reader_factory));

  // Finally, put it all together into a DataSourceExec
- Ok(DataSourceExec::from_data_source(file_scan_config))
+ Ok(DataSourceExec::from_data_source(file_source))
Contributor Author

You'll see this pattern a lot in this PR.

The old flow goes something like:

  1. Define the file source
  2. Define the file scan config and move the file source inside
  3. Derive a data source plan from the file scan config

Inside this flow, there's a circular dependency: the config holds the file source, but creating a file opener from the file source requires passing the config back in.

The new flow goes something like:

  1. Define the config
  2. Define the file source which now owns the config
  3. Derive a data source plan from the file source
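
In code, the new flow looks roughly like this (a sketch reusing the builder APIs from the diff above; the object_store_url, schema, and partitioned_file variables are assumed to be in scope):

// 1. Define the config
let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema)
    .with_file(partitioned_file)
    .build();

// 2. Define the file source, which now owns the config
let file_source =
    ParquetSource::new(TableParquetOptions::default(), file_scan_config);

// 3. Derive a data source plan from the file source
let plan = DataSourceExec::from_data_source(file_source);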

Member

The biggest point of the refactor

@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch from 1bbd3ed to 0cc2125 Compare August 25, 2025 20:06
Contributor

@adriangb adriangb left a comment

just small comments for now. will review more tomorrow

Comment on lines -494 to +507
- let file_source = Arc::new(
-     ParquetSource::default()
-         // provide the predicate so the DataSourceExec can try and prune
-         // row groups internally
-         .with_predicate(predicate)
-         // provide the factory to create parquet reader without re-reading metadata
-         .with_parquet_file_reader_factory(Arc::new(reader_factory)),
- );
- let file_scan_config =
-     FileScanConfigBuilder::new(object_store_url, schema, file_source)
-         .with_limit(limit)
-         .with_projection(projection.cloned())
-         .with_file(partitioned_file)
-         .build();
+ let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema)
+     .with_limit(limit)
+     .with_projection(projection.cloned())
+     .with_file(partitioned_file)
+     .build();
+
+ let file_source =
+     ParquetSource::new(TableParquetOptions::default(), file_scan_config.clone())
+         // provide the predicate so the DataSourceExec can try and prune
+         // row groups internally
+         .with_predicate(predicate)
+         // provide the factory to create parquet reader without re-reading metadata
+         .with_parquet_file_reader_factory(Arc::new(reader_factory));
Contributor

This is just moving code around to initialize the FileScanConfig before the ParquetSource ✅

Comment on lines -70 to -72
- .with_schema(schema)
- .with_batch_size(8192)
- .with_projection(&scan_config);
Contributor

It makes sense that the schema, batch size and projection are not properties inherent to CSVs and thus should be part of FileScanConfig. In fact they are currently duplicated!

@xudong963
Member

I'll start reviewing in the next two days.

Contributor

@adriangb adriangb left a comment

Overall this looks really nice!

Big picture the idea of having the shared behavior via impl <T: FileSource> DataSource for &T is really smart: it gives us code sharing / common behavior while allowing each FileSource to override the implementations to specialize (e.g. Parquet for projection pushdown).

I have left some comments and questions, and would like to see some other reviews as well before approving.

But again, overall, I commend you on the attention to detail in this PR. It is unfortunately massive and hard to review, but I think the organization into stacked commits is good and I don't see any way to split this up better.

Comment on lines 39 to 56
- #[derive(Clone, Default)]
+ #[derive(Clone)]
  pub struct ArrowSource {
      metrics: ExecutionPlanMetricsSet,
      schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+     config: FileScanConfig,
  }
+
+ impl ArrowSource {
+     /// Returns a [`ArrowSource`]
+     pub fn new(config: FileScanConfig) -> Self {
+         Self {
+             metrics: Default::default(),
+             schema_adapter_factory: None,
+             config,
+         }
+     }
+ }
Contributor

This makes sense - all of the FileSource implementations now need a FileScanConfig -> they can't impl Default anymore

Comment on lines +65 to +74
fn config(&self) -> &FileScanConfig {
    &self.config
}

fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource> {
    let mut this = self.clone();
    this.config = config;

    Arc::new(this)
}
Contributor

These are the APIs I'm not so sure about

      _partition: usize,
  ) -> Arc<dyn FileOpener> {
      Arc::new(ArrowOpener {
          object_store,
-         projection: base_config.file_column_projection_indices(),
+         projection: self.config().file_column_projection_indices(),
Contributor

Use self.config instead? There seems to be no point in calling the public method.

Member

Both approaches work in this case because:

  • The trait method config() just returns &self.config
  • So self.config() and self.config access the same data

Use self.config() for consistency with the trait interface, especially since:

  1. It maintains abstraction - using the trait method rather than direct field access
  2. It would keep working even if the trait implementation changed (though that hasn't happened here)

Comment on lines -68 to +95
- fn metrics(&self) -> &ExecutionPlanMetricsSet {
+ fn metrics_inner(&self) -> &ExecutionPlanMetricsSet {
Contributor

Hmm is metrics_inner now a method on FileSource?

Comment on lines +277 to +283
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
    let config = FileScanConfigBuilder::from(self.config().clone())
        .with_limit(limit)
        .build();

    Some(self.with_config(config).as_data_source())
}
Contributor

I wonder if we should invert this:

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        Some(Arc::new(FileSource::with_fetch(self, limit)) as Arc<dyn DataSource>)
    }

Where FileSource::with_fetch is something along the lines of:

self.config = self.config.with_fetch(limit)

Basically, can we make FileScanConfig an implementation detail one level lower?
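
One way to realize that inversion (a sketch, not the PR's code) is a provided method on FileSource that rebuilds the config it owns, so DataSource::with_fetch never touches FileScanConfig directly:

// Provided method on the FileSource trait (hypothetical): push the new limit
// into the owned config and return the updated source.
fn with_fetch(&self, limit: Option<usize>) -> Arc<dyn FileSource> {
    let config = FileScanConfigBuilder::from(self.config().clone())
        .with_limit(limit)
        .build();
    self.with_config(config)
}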

Comment on lines +183 to +185
fn as_file_source(&self) -> Option<Arc<dyn FileSource>> {
    None
}
Contributor

Wondering / questioning this method

Contributor Author

MemorySourceConfig impls DataSource but not FileSource.

Contributor

Right, but I wonder if this is the right way to go about that, or if whatever needs this sort of downcasting should be a part of the trait.

Comment on lines +298 to +300
if let Some(file_source) = self.data_source.as_file_source() {
    if let Some(file_group) = file_source.config().file_groups.get(partition) {
Contributor

I guess the point is that not all data sources are file sources, and for the ones that aren't, partitioning doesn't necessarily apply.

Can't we make this a method on the DataSource trait so that we can implement it as Ok(Statistics::new_unknown()) for MemoryDataSource and copy this code into impl <T: FileSource> DataSource for &T?
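
A sketch of that suggestion (hypothetical method name and simplified signatures, mirroring the Statistics::new_unknown() call above):

// On the DataSource trait: a per-partition statistics hook with a default
// implementation, so DataSourceExec never has to downcast to a FileSource.
trait DataSource {
    fn partition_statistics(&self, _partition: usize) -> Result<Statistics> {
        // Default for non-file sources such as MemorySourceConfig.
        Ok(Statistics::new_unknown())
    }
}

The file-based blanket impl would then override this by looking up file_groups.get(partition) from its own config.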

Comment on lines +472 to 474
pub fn downcast_to_file_source<T: FileSource + 'static>(&self) -> Option<&T> {
    self.data_source().as_any().downcast_ref::<T>()
}
Contributor

Same about this method; I wonder where it is used. It's a nice convenience, but to me this sort of downcasting is always a smell of missing design on the trait / a leaky abstraction.

Contributor Author

I don't think this is necessarily a bad code smell. It's used whenever we need to downcast to a FileSource. It just reduces the lines of code.

Contributor

I mean that the need for this sort of downcasting is a code smell.

Comment on lines 185 to 187
fn parquet_exec_multiple_sorted(
    output_ordering: Vec<LexOrdering>,
) -> Arc<DataSourceExec> {
Contributor

Not a problem for today, but again, the fact that optimizer rules have to special-case execution node implementations via downcasting is IMO a smell.

@mbutrovich mbutrovich self-requested a review August 26, 2025 16:25
- fn data_source_statistics(&self) -> Result<Statistics> {
-     Ok(self.config().projected_stats(self.file_source_statistics()))
+ fn data_source_statistics(&self) -> Statistics {
+     let file_source_statistics = self.file_source_statistics();
Contributor Author

I don't love data_source_statistics and file_source_statistics; curious if people have thoughts on better names here.

Member

Both statistics()s are good to me

@xudong963
Member

xudong963 commented Aug 28, 2025

It would be better to have a high-level diagram to describe the current relationship. Before the PR, it looks like
image

After the PR -> ?

If we have this, it'll definitely be helpful for review, and especially for our users to understand the changes in the PR.


Also, if we wanna have more eyes on this redesign, we can send an email to https://lists.apache.org/[email protected].

@friendlymatthew
Contributor Author

It would be better to have a high-level diagram to describe the current relationship.

Fwiw, this change involves removing the FileScanConfig node and moving it inside the arrow source...parquet source nodes

Here's a diagram that captures the redesign:

Screenshot 2025-08-28 at 10 11 46 AM

You can check it out here: https://www.figma.com/board/lGQyasgDL0fwlPWqhAq0Mm/FileSource-to-DataSource-redesign?node-id=0-1&t=7MNOhWGSqdFSWH7R-1

@xudong963
Member


I'll review tomorrow following the diagram

Comment on lines +69 to +74
fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource> {
    let mut this = self.clone();
    this.config = config;

    Arc::new(this)
}
Member

I don't see a reason why we don't give this a default implementation in FileSource.

Comment on lines +65 to +66
fn config(&self) -> &FileScanConfig {
    &self.config
Member

ditto

Comment on lines +76 to +78
fn as_data_source(&self) -> Arc<dyn DataSource> {
    Arc::new(self.clone())
}
Member

ditto


  /// Return execution plan metrics
- fn metrics(&self) -> &ExecutionPlanMetricsSet;
+ fn metrics_inner(&self) -> &ExecutionPlanMetricsSet;
Member

Can we avoid this method here? It seems that we could add the metrics to FileScanConfig.

let mut source = source
    .as_any()
    .downcast_ref::<ParquetSource>()
    .unwrap()
Member

This would be better replaced with proper error handling.

Comment on lines -503 to -508
- // Apply schema adapter factory before building the new config
- let file_source = source.apply_schema_adapter(&conf)?;
-
- let conf = FileScanConfigBuilder::from(conf)
-     .with_source(file_source)
-     .build();
Member

Note for later reviewers: these steps have been pulled up into create_file_source_with_schema_adapter.

- fn file_source_statistics(&self, config: &FileScanConfig) -> Statistics {
-     let statistics = config.file_source_projected_statistics.clone();
+ fn file_source_statistics(&self) -> Statistics {
+     let statistics = self.config.file_source_projected_statistics.clone();
Member

Seems like just a naming issue.

/// Batch size while creating new batches
/// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size.
pub batch_size: Option<usize>,
/// Expression adapter used to adapt filters and projections that are pushed down into the scan
/// from the logical schema to the physical schema of the file.
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,

pub file_source_projected_statistics: Statistics,
Member

projected_statistics is good to me


@comphead
Contributor

Here's a diagram that captures the redesign:

Thanks @friendlymatthew @xudong963 for the diagrams, that's incredibly helpful for a high-level review. A couple of questions:

  • Should FileOpener be a separate block? For the reader it is not clear what FileOpener is (trait, class, API, etc.)
  • Wondering if we should have a source config to hold the common parts of MemorySourceConfig and FileSourceConfig, like batch size, etc. Currently MemorySourceConfig implements DataSource, which is slightly confusing?
  • It would be beneficial to show what TableProvider provides to FileSource, like table folder/paths, etc.
  • We probably need to reorganize the RecordBatch block: as per the diagram it doesn't participate in the flow, but the RecordBatch is exactly what we're waiting for from the DataSourceExec

@waynexia
Member

waynexia commented Sep 2, 2025

Thank you for this innovative discussion! Learned a lot from it 💯

After digging a bit into the related code, I have a few different opinions. First, I was wondering how DataSourceExec yields a RecordBatchStream after all. As @comphead mentioned regarding the previous diagram:

We probably need to reorganize the RecordBatch block: as per the diagram it doesn't participate in the flow, but the RecordBatch is exactly what we're waiting for from the DataSourceExec

image

Here is a brief walk-through of the current implementation:

DataSourceExec --holds--> DataSource
  --use--> FileScanConfig::open
  --holds--> FileSource --call--> FileSource::create_file_opener(.., FileScanConfig)
  --get--> FileOpener
  --used by--> FileStream
  --get--> Future<RecordBatchStream>

Some unreasonable points to me are:

  • Too many abstractions: we have File{Format|Source|Stream|Opener|ScanConfig}. Some of them are very shallow, and even duplicate each other, like batch size, projection, limit, etc. between FileScanConfig and FileSource. These duplications are removed in this PR (by the way, the partition parameter is still duplicated). Those types are:
    • FileScanConfig: general, base config, one of the DataSource implementations
    • FileSource: per-format, format-specific configs. Can be an empty struct for simple formats.
    • FileOpener: per-format, uses static configs (FileScanConfig and FileSource) and other runtime info (like FileMeta) to get a future of RecordBatchStream. One-to-one associated with FileSource and FileFormat
    • FileStream: general, iterates over file segments (stored in FileScanConfig) obtained from FileOpener
  • FileScanConfig holds FileSource, but it's also a parameter of FileSource::create_file_opener
  • FileOpener returns a future of a stream (pub type FileOpenFuture = BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>)
  • FileScanConfig is acting as the top-level entrance for end users, but it's actually the middle layer (a common part of the config) between the specific file format and the actual data stream. That might be the cause of the second point

And the final one: DataSourceExec doesn't look like a good abstraction for end users to me, because it

  • can't be used where file and memory are both available (e.g., cached record batches).
  • is not the best choice when I just want to read some files. I made an example to prove this in refactor: simplify json_shredding example by using ListingTable #17369
  • from a functionality aspect, should just be a stream merger (an exec plan that merges multiple RecordBatchStreams, either memory or file, like CombinedRecordBatchStream) and an adapter (proxying property APIs on ExecutionPlan to the record batch provider).

I'd like to propose a few changes based on the current PR (as a discussion in this refactor topic, not as a review comment; this might be too large for one PR).

  • Remove FileSource trait

    A fundamental thing we want to achieve from those types is Base Config + Format Config + Runtime Info -> File Data. FileSource as an optional format config doesn't need to be a general trait. Moving it to FileFormat looks more natural and closer to where it's being used.

  • Use FileFormat as the top-level entrance

    FileFormat takes FileScanConfig and the format config stored in it to produce the associated FileOpener.

  • Change the return type of FileOpener::open to just a stream to simplify the logic of FileStream

    This can likely simplify the logic of FileStream by a lot; the current implementation is below, and a type sketch of the change follows this list.

    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match &mut self.state {
                FileStreamState::Idle => {
                    self.file_stream_metrics.time_opening.start();

                    match self.start_next_file().transpose() {
                        Ok(Some((future, partition_values))) => {
                            self.state = FileStreamState::Open {
                                future,
                                partition_values,
                            }
                        }
                        Ok(None) => return Poll::Ready(None),
                        Err(e) => {
                            self.state = FileStreamState::Error;
                            return Poll::Ready(Some(Err(e)));
                        }
                    }
                }
                FileStreamState::Open {
                    future,
                    partition_values,
                } => match ready!(future.poll_unpin(cx)) {
                    Ok(reader) => {
                        let partition_values = mem::take(partition_values);

                        // include time needed to start opening in `start_next_file`
                        self.file_stream_metrics.time_opening.stop();
                        let next = self.start_next_file().transpose();
                        self.file_stream_metrics.time_scanning_until_data.start();
                        self.file_stream_metrics.time_scanning_total.start();

                        match next {
                            Ok(Some((next_future, next_partition_values))) => {
                                self.state = FileStreamState::Scan {
                                    partition_values,
                                    reader,
                                    next: Some((
                                        NextOpen::Pending(next_future),
                                        next_partition_values,
                                    )),
                                };
                            }
                            Ok(None) => {
                                self.state = FileStreamState::Scan {
                                    reader,
                                    partition_values,
                                    next: None,
                                };
                            }
                            Err(e) => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                    Err(e) => {
                        self.file_stream_metrics.file_open_errors.add(1);
                        match self.on_error {
                            OnError::Skip => {
                                self.file_stream_metrics.time_opening.stop();
                                self.state = FileStreamState::Idle
                            }
                            OnError::Fail => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                },
                FileStreamState::Scan {
                    reader,
                    partition_values,
                    next,
                } => {
                    // We need to poll the next `FileOpenFuture` here to drive it forward
                    if let Some((next_open_future, _)) = next {
                        if let NextOpen::Pending(f) = next_open_future {
                            if let Poll::Ready(reader) = f.as_mut().poll(cx) {
                                *next_open_future = NextOpen::Ready(reader);
                            }
                        }
                    }
                    match ready!(reader.poll_next_unpin(cx)) {
                        Some(Ok(batch)) => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();
                            let result = self
                                .pc_projector
                                .project(batch, partition_values)
                                .map_err(|e| ArrowError::ExternalError(e.into()))
                                .map(|batch| match &mut self.remain {
                                    Some(remain) => {
                                        if *remain > batch.num_rows() {
                                            *remain -= batch.num_rows();
                                            batch
                                        } else {
                                            let batch = batch.slice(0, *remain);
                                            self.state = FileStreamState::Limit;
                                            *remain = 0;
                                            batch
                                        }
                                    }
                                    None => batch,
                                });

                            if result.is_err() {
                                // If the partition value projection fails, this is not governed by
                                // the `OnError` behavior
                                self.state = FileStreamState::Error
                            }
                            self.file_stream_metrics.time_scanning_total.start();
                            return Poll::Ready(Some(result.map_err(Into::into)));
                        }
                        Some(Err(err)) => {
                            self.file_stream_metrics.file_scan_errors.add(1);
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match self.on_error {
                                // If `OnError::Skip` we skip the file as soon as we hit the first error
                                OnError::Skip => match mem::take(next) {
                                    Some((future, partition_values)) => {
                                        self.file_stream_metrics.time_opening.start();

                                        match future {
                                            NextOpen::Pending(future) => {
                                                self.state = FileStreamState::Open {
                                                    future,
                                                    partition_values,
                                                }
                                            }
                                            NextOpen::Ready(reader) => {
                                                self.state = FileStreamState::Open {
                                                    future: Box::pin(std::future::ready(
                                                        reader,
                                                    )),
                                                    partition_values,
                                                }
                                            }
                                        }
                                    }
                                    None => return Poll::Ready(None),
                                },
                                OnError::Fail => {
                                    self.state = FileStreamState::Error;
                                    return Poll::Ready(Some(Err(err.into())));
                                }
                            }
                        }
                        None => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match mem::take(next) {
                                Some((future, partition_values)) => {
                                    self.file_stream_metrics.time_opening.start();

                                    match future {
                                        NextOpen::Pending(future) => {
                                            self.state = FileStreamState::Open {
                                                future,
                                                partition_values,
                                            }
                                        }
                                        NextOpen::Ready(reader) => {
                                            self.state = FileStreamState::Open {
                                                future: Box::pin(std::future::ready(
                                                    reader,
                                                )),
                                                partition_values,
                                            }
                                        }
                                    }
                                }
                                None => return Poll::Ready(None),
                            }
                        }
                    }
                }
                FileStreamState::Error | FileStreamState::Limit => {
                    return Poll::Ready(None)
                }
            }
        }
    }

  • FileFormat::create_physical_plan can be simplified. Its implementations are identical.
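
On the FileOpener return type, the proposed flattening could be sketched like this (FileOpenFuture is quoted from the list above; FileOpenStream is a hypothetical name, and the imports assume the futures, arrow, and datafusion_common crates):

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use futures::future::BoxFuture;
use futures::stream::BoxStream;

// Today: `open` yields a future that resolves to a stream, so FileStream has
// to drive the pending open future and the active scan stream as two states.
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;

// Proposed: `open` returns the stream directly; any async setup happens inside
// the stream itself, which collapses the Open/Scan state split in poll_inner.
pub type FileOpenStream = BoxStream<'static, Result<RecordBatch, ArrowError>>;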


A diagram covering the above proposed changes (drawio XML embedded):

Untitled Diagram (3)

@adriangb
Contributor

adriangb commented Sep 2, 2025

@waynexia thanks so much for the input! I agree with most of your points, let's see what @friendlymatthew thinks

FileFormat takes FileScanConfig and the format config stored in it to produce the associated FileOpener

Don't we need a FileOpener per partition? So we need something that is produced by FileFormat::make_thing(FileScanConfig) that can then create a FileOpener per partition, right?

This can likely simplify the logic of FileStream by a lot

FWIW I also think we should get rid of the partition value handling in FileStream; instead we should just replace references to partition columns with literal values, like we now do with filters. Those can then be expanded into an array if the projection demands it.

@waynexia
Member

waynexia commented Sep 2, 2025

FileFormat takes FileScanConfig and the format config stored in it to produce the associated FileOpener

Don't we need a FileOpener per partition? So we need something that is produced by FileFormat::make_thing(FileScanConfig) that can then create a FileOpener per partition, right?

Partition is closer to runtime config (passed from the execution plan's exec), so it should be OK to have only one FileFormat for all partitions. The partition number is used to get a set of file segments from FileScanConfig and to mark metrics.

@comphead
Contributor

comphead commented Sep 2, 2025

Thanks @waynexia for the diagram and explanation.
Definitely agree on the simplification; the abstractions are indeed overly flexible, more than needed, and getting this simplified would be awesome. For instance, all the details related to a specific data source can be encapsulated in the DataSource provider implementation.

For example, if a user would like to onboard the ORC file format,

For the diagram, it might still be confusing to have the memory data source under the file source configs. File sources are format-dependent and thus have specific readers and configs per format, whereas memory has no dependency on the format, opener, etc. It should still depend on some memory scan config, though.

Making some changes to the proposal:

                   ┌────────────────────┐
                   │   TableProvider    │
                   │  (File / Memory)   │
                   └─────────┬──────────┘
                             │
          ┌──────────────────┴──────────────────┐
          │                                     │
 ┌────────▼─────────┐                  ┌────────▼───────┐
 │ FileTableProvider│                  │ MemoryProvider │
 └────────┬─────────┘                  └────────┬───────┘
          │                                     │
          │ uses                                │ uses
          ▼                                     ▼
    ┌───────────────────┐              ┌───────────────────┐
    │   FileScanConfig  │              │  MemoryScanConfig │
    └──────────┬────────┘              └──────────┬────────┘
               │                                  │
               └────────────┬─────────────────────┘
                            │
                    ┌───────▼─────────┐
                    │   ScanConfig    │ (trait)
                    └───────┬─────────┘
                            │
       ┌────────────────────┼───────────────────┐
       │                    │                   │
  ┌────▼─────┐        ┌─────▼────┐        ┌─────▼─────┐
  │FileFormat│        │FileOpener│        │FileStream │
  │(Parquet, │        └──────────┘        └───────────┘
  │ Avro,    │
  │ JSON, ..)│
  └──────────┘

However, as you correctly mentioned, FileFormat, FileOpener, and FileStream can probably be encapsulated into some facade object taking a config and providing a RecordBatchStream, hiding all the specifics inside.


Successfully merging this pull request may close these issues:

  • Unify how various FileSources are applying projections?
  • Clean up APIs around FileScanConfigBuilder, FileScanConfig and FileSource