Skip to content

Add VirtualObjectStore to support routing paths to multiple ObjectStores #17084

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a71db72
feat: Add VirtualObjectStore for routing requests to underlying stores
kosiew Aug 7, 2025
6c7bc72
refactor: Simplify the signature of the resolve method in VirtualObje…
kosiew Aug 7, 2025
81e7234
Merge branch 'main' into virtual-object-store-16991
kosiew Aug 8, 2025
5076fbf
Reorganize imports and update license comments in `virtual_object_sto…
kosiew Aug 8, 2025
d695668
Improve error handling when retrieving object store in `FileScanConfig`
kosiew Aug 8, 2025
bb1daee
Enhance error messages in `VirtualObjectStore::resolve` for better de…
kosiew Aug 8, 2025
0da73b1
Optimize list method in VirtualObjectStore for single store key resol…
kosiew Aug 8, 2025
ee26110
Enhance object transfer in VirtualObjectStore with streaming support …
kosiew Aug 8, 2025
88c9583
Add unit tests for VirtualObjectStore methods including list and mult…
kosiew Aug 8, 2025
55dddbd
Update documentation for VirtualObjectStore with configuration and us…
kosiew Aug 8, 2025
874730b
Add Tokio as a dependency and update usage examples in VirtualObjectS…
kosiew Aug 8, 2025
2a028ed
Normalize prefix handling in VirtualObjectStore to skip empty segment…
kosiew Aug 8, 2025
f4522a0
Refactor path handling in VirtualObjectStore to use Path::child for j…
kosiew Aug 8, 2025
df7fa25
Clarify documentation and error handling in VirtualObjectStore to ind…
kosiew Aug 8, 2025
1778ec5
fix clippy errors
kosiew Aug 8, 2025
cc714fb
Fix taplo errors
kosiew Aug 8, 2025
cf70e2f
Merge branch 'main' into virtual-object-store-16991
kosiew Aug 12, 2025
1ecff8c
Merge branch 'main' into virtual-object-store-16991
kosiew Aug 12, 2025
e8926d4
refactor: Remove TODO comments for unsupported write operations in Vi…
kosiew Aug 12, 2025
5075467
chore(deps): move chrono to runtime deps and enable tokio dev features
kosiew Aug 12, 2025
b199997
refactor: Box errors in VirtualObjectStore::resolve to reduce error size
kosiew Aug 12, 2025
5b1a731
refactor: Change return type of VirtualObjectStore::resolve to use lo…
kosiew Aug 12, 2025
dccc9ed
Merge branch 'main' into virtual-object-store-16991
kosiew Aug 19, 2025
8edd353
Merge branch 'main' into virtual-object-store-16991
kosiew Aug 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 28 additions & 1 deletion datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use datafusion_physical_plan::{
projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
DisplayAs, DisplayFormatType, ExecutionPlan,
};
use object_store::ObjectStore;

use datafusion_physical_plan::coop::cooperative;
use datafusion_physical_plan::execution_plan::SchedulingType;
Expand Down Expand Up @@ -186,6 +187,8 @@ pub struct FileScanConfig {
pub new_lines_in_values: bool,
/// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
pub file_source: Arc<dyn FileSource>,
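/// Optional object store used in place of the one registered for `object_store_url`.
/// When `Some`, this store is used directly (for example, a virtual store that routes
/// paths to several underlying stores); when `None`, the store is looked up in the
/// runtime environment by `object_store_url`.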
pub virtual_store: Option<Arc<dyn ObjectStore>>,
/// Batch size while creating new batches
/// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size.
pub batch_size: Option<usize>,
Expand Down Expand Up @@ -258,6 +261,7 @@ pub struct FileScanConfigBuilder {
/// This probably would be better named `table_schema`
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
virtual_store: Option<Arc<dyn ObjectStore>>,

limit: Option<usize>,
projection: Option<Vec<usize>>,
Expand Down Expand Up @@ -288,6 +292,7 @@ impl FileScanConfigBuilder {
object_store_url,
file_schema,
file_source,
virtual_store: None,
file_groups: vec![],
statistics: None,
output_ordering: vec![],
Expand Down Expand Up @@ -318,6 +323,12 @@ impl FileScanConfigBuilder {
self
}

/// Set the virtual object store for this scan.
///
/// Takes a single `store` (not a mapping) and records it in the builder's
/// `virtual_store` field as `Some(store)`, replacing any value set earlier.
/// Consumes the builder and returns it so calls can be chained.
pub fn with_virtual_store(mut self, store: Arc<dyn ObjectStore>) -> Self {
self.virtual_store = Some(store);
self
}

/// Set the columns on which to project the data. Indexes that are higher than the
/// number of columns of `file_schema` refer to `table_partition_cols`.
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
Expand Down Expand Up @@ -430,6 +441,7 @@ impl FileScanConfigBuilder {
object_store_url,
file_schema,
file_source,
virtual_store,
limit,
projection,
table_partition_cols,
Expand Down Expand Up @@ -458,6 +470,7 @@ impl FileScanConfigBuilder {
object_store_url,
file_schema,
file_source,
virtual_store,
limit,
projection,
table_partition_cols,
Expand All @@ -478,6 +491,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
object_store_url: config.object_store_url,
file_schema: config.file_schema,
file_source: Arc::<dyn FileSource>::clone(&config.file_source),
virtual_store: config.virtual_store,
file_groups: config.file_groups,
statistics: config.file_source.statistics().ok(),
output_ordering: config.output_ordering,
Expand All @@ -499,7 +513,20 @@ impl DataSource for FileScanConfig {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let object_store = context.runtime_env().object_store(&self.object_store_url)?;
let object_store: Arc<dyn ObjectStore> = if let Some(store) = &self.virtual_store
{
Arc::clone(store)
} else {
context
.runtime_env()
.object_store(&self.object_store_url)
.map_err(|e| {
e.context(format!(
"get object store for URL {}",
self.object_store_url
))
})?
};
let batch_size = self
.batch_size
.unwrap_or_else(|| context.session_config().batch_size());
Expand Down
4 changes: 3 additions & 1 deletion datafusion/execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ parquet_encryption = [

[dependencies]
arrow = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
Expand All @@ -57,5 +59,5 @@ tempfile = { workspace = true }
url = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
insta = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "sync"] }
1 change: 1 addition & 0 deletions datafusion/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub mod parquet_encryption;
pub mod runtime_env;
mod stream;
mod task;
pub mod virtual_object_store;

pub mod registry {
pub use datafusion_expr::registry::{
Expand Down
Loading