From a71db72203a54b01ac82ee8b7f92a9e0440f54a7 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 7 Aug 2025 22:56:24 +0800 Subject: [PATCH 01/19] feat: Add VirtualObjectStore for routing requests to underlying stores --- Cargo.lock | 1 + datafusion/datasource/src/file_scan_config.rs | 21 ++- datafusion/execution/Cargo.toml | 1 + datafusion/execution/src/lib.rs | 1 + .../execution/src/virtual_object_store.rs | 168 ++++++++++++++++++ 5 files changed, 191 insertions(+), 1 deletion(-) create mode 100644 datafusion/execution/src/virtual_object_store.rs diff --git a/Cargo.lock b/Cargo.lock index 821a9dbfbf67..1c9234cfe9db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2200,6 +2200,7 @@ name = "datafusion-execution" version = "49.0.0" dependencies = [ "arrow", + "async-trait", "chrono", "dashmap", "datafusion-common", diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 95cc9e24b645..f16cea853536 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -64,6 +64,7 @@ use datafusion_physical_plan::{ projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec}, DisplayAs, DisplayFormatType, ExecutionPlan, }; +use object_store::ObjectStore; use datafusion_physical_plan::coop::cooperative; use datafusion_physical_plan::execution_plan::SchedulingType; @@ -186,6 +187,8 @@ pub struct FileScanConfig { pub new_lines_in_values: bool, /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc. pub file_source: Arc, + /// Optional virtual object store for routing paths to underlying stores + pub virtual_store: Option>, /// Batch size while creating new batches /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size. 
pub batch_size: Option, @@ -258,6 +261,7 @@ pub struct FileScanConfigBuilder { /// This probably would be better named `table_schema` file_schema: SchemaRef, file_source: Arc, + virtual_store: Option>, limit: Option, projection: Option>, @@ -288,6 +292,7 @@ impl FileScanConfigBuilder { object_store_url, file_schema, file_source, + virtual_store: None, file_groups: vec![], statistics: None, output_ordering: vec![], @@ -318,6 +323,12 @@ impl FileScanConfigBuilder { self } + /// Set the virtual object store mapping + pub fn with_virtual_store(mut self, store: Arc) -> Self { + self.virtual_store = Some(store); + self + } + /// Set the columns on which to project the data. Indexes that are higher than the /// number of columns of `file_schema` refer to `table_partition_cols`. pub fn with_projection(mut self, projection: Option>) -> Self { @@ -430,6 +441,7 @@ impl FileScanConfigBuilder { object_store_url, file_schema, file_source, + virtual_store, limit, projection, table_partition_cols, @@ -458,6 +470,7 @@ impl FileScanConfigBuilder { object_store_url, file_schema, file_source, + virtual_store, limit, projection, table_partition_cols, @@ -478,6 +491,7 @@ impl From for FileScanConfigBuilder { object_store_url: config.object_store_url, file_schema: config.file_schema, file_source: Arc::::clone(&config.file_source), + virtual_store: config.virtual_store, file_groups: config.file_groups, statistics: config.file_source.statistics().ok(), output_ordering: config.output_ordering, @@ -499,7 +513,12 @@ impl DataSource for FileScanConfig { partition: usize, context: Arc, ) -> Result { - let object_store = context.runtime_env().object_store(&self.object_store_url)?; + let object_store: Arc = if let Some(store) = &self.virtual_store + { + Arc::clone(store) + } else { + context.runtime_env().object_store(&self.object_store_url)? 
+ }; let batch_size = self .batch_size .unwrap_or_else(|| context.session_config().batch_size()); diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 5988d3a33660..f428aaaec338 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -49,6 +49,7 @@ parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } url = { workspace = true } +async-trait = { workspace = true } [dev-dependencies] chrono = { workspace = true } diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index 6a0a4b6322ee..85142d09ec57 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -34,6 +34,7 @@ pub mod object_store; pub mod runtime_env; mod stream; mod task; +pub mod virtual_object_store; pub mod registry { pub use datafusion_expr::registry::{ diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs new file mode 100644 index 000000000000..2432df3b3216 --- /dev/null +++ b/datafusion/execution/src/virtual_object_store.rs @@ -0,0 +1,168 @@ +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use futures::{stream, StreamExt, TryStreamExt}; +use object_store::path::Path; +use object_store::{ + Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, +}; + +/// A virtual [`ObjectStore`] that routes requests to underlying stores based on +/// the first path segment. 
+#[derive(Clone)] +pub struct VirtualObjectStore { + /// Mapping from path prefix to concrete [`ObjectStore`] + pub stores: HashMap>, +} + +impl VirtualObjectStore { + /// Create a new [`VirtualObjectStore`] from the provided mapping + pub fn new(stores: HashMap>) -> Self { + Self { stores } + } + + /// Resolve the given [`Path`] to the underlying store and the remaining path + fn resolve<'a>( + &'a self, + location: &'a Path, + ) -> Result<(&Arc, Path)> { + let mut parts = location.parts(); + let key = parts + .next() + .ok_or_else(|| Error::Generic { + store: "VirtualObjectStore", + source: "empty path".into(), + })? + .as_ref() + .to_string(); + let path: Path = parts.collect(); + let store = self.stores.get(&key).ok_or_else(|| Error::Generic { + store: "VirtualObjectStore", + source: format!("ObjectStore not found for prefix {key}").into(), + })?; + Ok((store, path)) + } +} + +impl fmt::Display for VirtualObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "VirtualObjectStore") + } +} + +impl fmt::Debug for VirtualObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("VirtualObjectStore") + .field("stores", &self.stores.keys().collect::>()) + .finish() + } +} + +#[async_trait] +impl ObjectStore for VirtualObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + let (store, path) = self.resolve(location)?; + store.put_opts(&path, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result> { + let (store, path) = self.resolve(location)?; + store.put_multipart_opts(&path, opts).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + let (store, path) = self.resolve(location)?; + store.get_opts(&path, options).await + } + + async fn delete(&self, location: &Path) -> Result<()> { + let (store, path) = self.resolve(location)?; 
+ store.delete(&path).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + let prefix_owned = prefix.cloned(); + let entries: Vec<(String, Arc)> = self + .stores + .iter() + .map(|(k, v)| (k.clone(), Arc::clone(v))) + .collect(); + let streams = entries.into_iter().map(move |(key, store)| { + let base = Path::from(key); + let prefix = prefix_owned.as_ref().map(|p| base.child(p.as_ref())); + store.list(prefix.as_ref()).map_ok(move |mut meta| { + meta.location = base.child(meta.location.as_ref()); + meta + }) + }); + stream::iter(streams).flatten().boxed() + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + let mut objects = Vec::new(); + let mut common_prefixes = Vec::new(); + for (key, store) in &self.stores { + let base = Path::from(key.clone()); + let p = prefix.map(|x| base.child(x.as_ref())); + let result = store.list_with_delimiter(p.as_ref()).await?; + objects.extend(result.objects.into_iter().map(|mut m| { + m.location = base.child(m.location.as_ref()); + m + })); + common_prefixes.extend( + result + .common_prefixes + .into_iter() + .map(|p| base.child(p.as_ref())), + ); + } + Ok(ListResult { + objects, + common_prefixes, + }) + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + let (from_store, from_path) = self.resolve(from)?; + let (to_store, to_path) = self.resolve(to)?; + if Arc::ptr_eq(from_store, to_store) { + from_store.copy(&from_path, &to_path).await + } else { + let bytes = from_store.get(&from_path).await?.bytes().await?; + to_store.put(&to_path, bytes.into()).await.map(|_| ()) + } + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let (from_store, from_path) = self.resolve(from)?; + let (to_store, to_path) = self.resolve(to)?; + if Arc::ptr_eq(from_store, to_store) { + from_store.copy_if_not_exists(&from_path, &to_path).await + } else { + match to_store.head(&to_path).await { + Ok(_) => Err(Error::AlreadyExists { + path: 
to_path.to_string(), + source: "destination exists".into(), + }), + Err(Error::NotFound { .. }) => { + let bytes = from_store.get(&from_path).await?.bytes().await?; + to_store.put(&to_path, bytes.into()).await.map(|_| ()) + } + Err(e) => Err(e), + } + } + } +} From 6c7bc72d060f1493caf660790cc771daff67e2ea Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 7 Aug 2025 23:05:55 +0800 Subject: [PATCH 02/19] refactor: Simplify the signature of the resolve method in VirtualObjectStore --- datafusion/execution/src/virtual_object_store.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index 2432df3b3216..9e9a92e42e52 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -26,10 +26,7 @@ impl VirtualObjectStore { } /// Resolve the given [`Path`] to the underlying store and the remaining path - fn resolve<'a>( - &'a self, - location: &'a Path, - ) -> Result<(&Arc, Path)> { + fn resolve(&self, location: &Path) -> Result<(&Arc, Path)> { let mut parts = location.parts(); let key = parts .next() From 5076fbf5dbd3f0525880e4fae5b65d914bc939e8 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 15:22:24 +0800 Subject: [PATCH 03/19] Reorganize imports and update license comments in `virtual_object_store.rs` --- .../execution/src/virtual_object_store.rs | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index 9e9a92e42e52..add9fd04e40b 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -1,16 +1,29 @@ -use std::collections::HashMap; -use std::fmt; -use std::sync::Arc; +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Virtual object store implementation for DataFusion. use async_trait::async_trait; -use futures::stream::BoxStream; -use futures::{stream, StreamExt, TryStreamExt}; -use object_store::path::Path; +use futures::{stream, stream::BoxStream, StreamExt, TryStreamExt}; use object_store::{ - Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, + path::Path, Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, }; - +use std::{collections::HashMap, fmt, sync::Arc}; /// A virtual [`ObjectStore`] that routes requests to underlying stores based on /// the first path segment. 
#[derive(Clone)] From d695668e5949c0a30371546f306a30855d61ebe1 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 15:46:19 +0800 Subject: [PATCH 04/19] Improve error handling when retrieving object store in `FileScanConfig` --- datafusion/datasource/src/file_scan_config.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index f16cea853536..635f7395356d 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -517,7 +517,15 @@ impl DataSource for FileScanConfig { { Arc::clone(store) } else { - context.runtime_env().object_store(&self.object_store_url)? + context + .runtime_env() + .object_store(&self.object_store_url) + .map_err(|e| { + e.context(format!( + "get object store for URL {}", + self.object_store_url + )) + })? }; let batch_size = self .batch_size From bb1daeea84b1e10fb6e036ef287b2ce9c8ef4e5a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 15:51:27 +0800 Subject: [PATCH 05/19] Enhance error messages in `VirtualObjectStore::resolve` for better debugging --- datafusion/execution/src/virtual_object_store.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index add9fd04e40b..413e4b8bf4a8 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -45,14 +45,18 @@ impl VirtualObjectStore { .next() .ok_or_else(|| Error::Generic { store: "VirtualObjectStore", - source: "empty path".into(), + source: format!("empty path in location '{}'", location).into(), })? 
.as_ref() .to_string(); let path: Path = parts.collect(); let store = self.stores.get(&key).ok_or_else(|| Error::Generic { store: "VirtualObjectStore", - source: format!("ObjectStore not found for prefix {key}").into(), + source: format!( + "ObjectStore not found for prefix '{}' in location '{}'", + key, location + ) + .into(), })?; Ok((store, path)) } From 0da73b18d169e9913de19e0f6c38abe84d895bb7 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 15:56:25 +0800 Subject: [PATCH 06/19] Optimize list method in VirtualObjectStore for single store key resolution --- .../execution/src/virtual_object_store.rs | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index 413e4b8bf4a8..45870613e78f 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -109,6 +109,30 @@ impl ObjectStore for VirtualObjectStore { fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let prefix_owned = prefix.cloned(); + // Short-circuit when prefix targets a single store key + if let Some(ref pfx) = prefix_owned { + let mut p_parts = pfx.parts(); + if let Some(store_key) = p_parts.next().map(|s| s.as_ref().to_string()) { + let remainder: Path = p_parts.collect(); + if let Some(store) = self.stores.get(&store_key) { + let base = Path::from(store_key.clone()); + let inner_prefix = if remainder.as_ref().is_empty() { + None + } else { + Some(&remainder) + }; + let single_stream = + store.list(inner_prefix).map_ok(move |mut meta| { + meta.location = base.child(meta.location.as_ref()); + meta + }); + return single_stream.boxed(); + } else { + // No matching store, return empty stream + return stream::empty().boxed(); + } + } + } let entries: Vec<(String, Arc)> = self .stores .iter() From ee261107bd1816979935b9394f7172a0f7729b8b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 
2025 16:00:12 +0800 Subject: [PATCH 07/19] Enhance object transfer in VirtualObjectStore with streaming support and deterministic sorting --- .../execution/src/virtual_object_store.rs | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index 45870613e78f..ca8414c0dceb 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -23,6 +23,7 @@ use object_store::{ path::Path, Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, }; +use std::fs::File; use std::{collections::HashMap, fmt, sync::Arc}; /// A virtual [`ObjectStore`] that routes requests to underlying stores based on /// the first path segment. @@ -167,6 +168,9 @@ impl ObjectStore for VirtualObjectStore { .map(|p| base.child(p.as_ref())), ); } + // Sort merged results for deterministic ordering across stores + objects.sort_by(|a, b| a.location.as_ref().cmp(b.location.as_ref())); + common_prefixes.sort_by(|a, b| a.as_ref().cmp(b.as_ref())); Ok(ListResult { objects, common_prefixes, @@ -196,8 +200,24 @@ impl ObjectStore for VirtualObjectStore { source: "destination exists".into(), }), Err(Error::NotFound { .. }) => { - let bytes = from_store.get(&from_path).await?.bytes().await?; - to_store.put(&to_path, bytes.into()).await.map(|_| ()) + // Stream copy to avoid buffering entire object in memory + let get_res = from_store.get(&from_path).await?; + let payload = match get_res { + GetResult::Stream { stream, .. } => PutPayload::Stream(stream), + GetResult::File { path, .. 
} => { + // Local file fallback: open for streaming + let file = File::open(path).map_err(|e| Error::Generic { + store: "VirtualObjectStore", + source: format!( + "failed to open local file for streaming: {}", + e + ) + .into(), + })?; + PutPayload::File(file) + } + }; + to_store.put(&to_path, payload).await.map(|_| ()) } Err(e) => Err(e), } From 88c958300cdc072ec694b98de5c34a9880d5e80c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 16:06:08 +0800 Subject: [PATCH 08/19] Add unit tests for VirtualObjectStore methods including list and multipart upload --- .../execution/src/virtual_object_store.rs | 200 ++++++++++++++++++ 1 file changed, 200 insertions(+) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index ca8414c0dceb..8ae96a90c5de 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -224,3 +224,203 @@ impl ObjectStore for VirtualObjectStore { } } } + +#[cfg(test)] +mod tests { + use super::*; + use futures::TryStreamExt; + use object_store::{ + memory::InMemory, path::Path, Error, GetResult, PutMultipartOptions, PutPayload, + }; + + /// Helper to collect list results into Vec + async fn collect_list( + store: &VirtualObjectStore, + prefix: Option<&Path>, + ) -> Vec { + store + .list(prefix) + .map_ok(|meta| meta.location.as_ref().to_string()) + .try_collect::>() + .await + .unwrap_or_default() + } + + #[tokio::test] + async fn list_empty_prefix_returns_all() { + let mut stores = HashMap::new(); + let a = Arc::new(InMemory::new()) as Arc; + let b = Arc::new(InMemory::new()) as Arc; + a.put(&Path::from("a1"), b"data1".to_vec().into()) + .await + .unwrap(); + b.put(&Path::from("b1"), b"data2".to_vec().into()) + .await + .unwrap(); + stores.insert("a".to_string(), a); + stores.insert("b".to_string(), b); + let v = VirtualObjectStore::new(stores); + let result = collect_list(&v, None).await; + assert_eq!(result.len(), 
2); + assert!(result.contains(&"a/a1".to_string())); + assert!(result.contains(&"b/b1".to_string())); + } + + #[tokio::test] + async fn list_no_matching_prefix_empty() { + let mut stores = HashMap::new(); + stores.insert("x".to_string(), Arc::new(InMemory::new())); + let v = VirtualObjectStore::new(stores); + let result = collect_list(&v, Some(&Path::from("nope"))).await; + assert!(result.is_empty()); + } + + #[tokio::test] + async fn list_nested_prefix_passes_remainder() { + let mut stores = HashMap::new(); + let a = Arc::new(InMemory::new()) as Arc; + a.put(&Path::from("sub/key"), b"x".to_vec().into()) + .await + .unwrap(); + stores.insert("a".to_string(), a); + let v = VirtualObjectStore::new(stores); + let result = collect_list(&v, Some(&Path::from("a/sub"))).await; + assert_eq!(result, vec!["a/sub/key".to_string()]); + } + + #[tokio::test] + async fn copy_if_not_exists_destination_exists() { + let mut stores = HashMap::new(); + let from = Arc::new(InMemory::new()); + let to = Arc::new(InMemory::new()); + from.put(&Path::from("f1"), b"v".to_vec().into()) + .await + .unwrap(); + to.put(&Path::from("f1"), b"old".to_vec().into()) + .await + .unwrap(); + stores.insert("s".to_string(), from); + stores.insert("t".to_string(), to.clone()); + let v = VirtualObjectStore::new(stores); + let err = v + .copy_if_not_exists(&Path::from("t/f1"), &Path::from("t/f1")) + .await; + assert!(matches!(err.unwrap_err(), Error::AlreadyExists { .. 
})); + } + + #[tokio::test] + async fn copy_if_not_exists_streams_copy() { + let mut stores = HashMap::new(); + let from = Arc::new(InMemory::new()); + let to = Arc::new(InMemory::new()); + from.put(&Path::from("f1"), b"123".to_vec().into()) + .await + .unwrap(); + stores.insert("src".to_string(), from.clone()); + stores.insert("dst".to_string(), to.clone()); + let v = VirtualObjectStore::new(stores); + v.copy_if_not_exists(&Path::from("src/f1"), &Path::from("dst/f1")) + .await + .unwrap(); + // Verify data copied correctly + let data = to + .get(&Path::from("f1")) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(data, b"123".to_vec()); + } + + #[tokio::test] + async fn list_with_delimiter_sorted() { + let mut stores = HashMap::new(); + let a = Arc::new(InMemory::new()) as Arc; + a.put(&Path::from("z1"), b"x".to_vec().into()) + .await + .unwrap(); + let b = Arc::new(InMemory::new()) as Arc; + b.put(&Path::from("a1"), b"y".to_vec().into()) + .await + .unwrap(); + stores.insert("a".to_string(), a); + stores.insert("b".to_string(), b); + let v = VirtualObjectStore::new(stores); + let res = v.list_with_delimiter(None).await.unwrap(); + let locs: Vec<_> = res + .objects + .into_iter() + .map(|o| o.location.as_ref().to_string()) + .collect(); + assert_eq!(locs, vec!["a/a1".to_string(), "b/z1".to_string()]); + } + + #[tokio::test] + async fn multipart_upload_basic_and_abort() { + let mut stores = HashMap::new(); + let a = Arc::new(InMemory::new()) as Arc; + stores.insert("a".to_string(), a.clone()); + let v = VirtualObjectStore::new(stores); + // Initiate multipart upload + let mut upload = v + .put_multipart_opts(&Path::from("a/file"), PutMultipartOptions::default()) + .await + .expect("multipart upload should succeed"); + // Abort should succeed + let res = upload.abort().await; + assert!(res.is_ok(), "abort of multipart upload failed"); + } + + #[tokio::test] + async fn multipart_upload_no_matching_prefix_error() { + let mut stores = HashMap::new(); + 
stores.insert("x".to_string(), Arc::new(InMemory::new())); + let v = VirtualObjectStore::new(stores); + let err = v + .put_multipart_opts(&Path::from("nope/file"), PutMultipartOptions::default()) + .await; + assert!(err.is_err(), "expected error for no matching prefix"); + match err.unwrap_err() { + Error::Generic { store, source } => { + assert_eq!(store, "VirtualObjectStore"); + assert!(source.contains("prefix 'nope'")); + } + other => panic!("unexpected error type: {:?}", other), + } + } + + #[tokio::test] + async fn multipart_upload_complete_and_put_part() { + let mut stores = HashMap::new(); + let a = Arc::new(InMemory::new()) as Arc; + stores.insert("a".to_string(), a.clone()); + let v = VirtualObjectStore::new(stores); + // Initiate multipart upload + let mut upload = v + .put_multipart_opts(&Path::from("a/complete"), PutMultipartOptions::default()) + .await + .expect("multipart upload should succeed"); + // Upload parts + // Upload parts + upload + .put_part(PutPayload::Bytes(b"foo".to_vec())) + .await + .unwrap(); + upload + .put_part(PutPayload::Bytes(b"bar".to_vec())) + .await + .unwrap(); + // Complete upload + upload.complete().await.unwrap(); + // Verify data on underlying store + let data = a + .get(&Path::from("complete")) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(data, b"foobar".to_vec()); + } +} From 55dddbd494fccb407db606467f8052c53c6c44fe Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 16:07:24 +0800 Subject: [PATCH 09/19] Update documentation for VirtualObjectStore with configuration and usage examples --- .../execution/src/virtual_object_store.rs | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index 8ae96a90c5de..b319eeaee929 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -16,6 +16,43 @@ // under the License. 
//! Virtual object store implementation for DataFusion. +//! +//! `VirtualObjectStore` enables routing object operations to multiple underlying stores +//! based on the first segment (prefix) of the object path. This allows, for example, +//! mixing S3, local filesystem, or other stores under a single unified interface. +//! +//! # Configuration +//! +//! Create a mapping from string prefixes to concrete `ObjectStore` implementations: +//! +//! ```rust +//! use std::collections::HashMap; +//! use std::sync::Arc; +//! use object_store::{memory::InMemory, path::Path, ObjectStore}; +//! use datafusion_execution::virtual_object_store::VirtualObjectStore; +//! +//! let mut stores: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new(); +//! // Prefix "s3" routes to S3 store +//! // stores.insert("s3".into(), Arc::new(S3::new(...))); +//! // Prefix "fs" routes to local filesystem +//! // stores.insert("fs".into(), Arc::new(LocalFileSystem::new())); +//! // For testing, use an in-memory store at prefix "mem" +//! stores.insert("mem".into(), Arc::new(InMemory::new())); +//! +//! let vos = VirtualObjectStore::new(stores); +//! ``` +//! +//! # Example Usage +//! +//! ```rust +//! use object_store::path::Path; +//! +//! // List objects under the "mem" prefix +//! let all = vos.list(Some(&Path::from("mem/"))).collect::<Vec<_>>().await?; +//! +//! // Copy a file from one prefix to another +//! vos.copy(&Path::from("mem/file1"), &Path::from("mem_backup/file1")).await?; +//! 
``` use async_trait::async_trait; use futures::{stream, stream::BoxStream, StreamExt, TryStreamExt}; From 874730b82cf8dc31bca9511152d8d167601bf546 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 16:30:02 +0800 Subject: [PATCH 10/19] Add Tokio as a dependency and update usage examples in VirtualObjectStore --- Cargo.lock | 1 + datafusion/execution/Cargo.toml | 1 + .../execution/src/virtual_object_store.rs | 89 +++++++++---------- 3 files changed, 43 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 50faf359d3e1..0b6484532b62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2212,6 +2212,7 @@ dependencies = [ "parking_lot", "rand 0.9.2", "tempfile", + "tokio", "url", ] diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index f428aaaec338..c39d6853d79c 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -54,3 +54,4 @@ async-trait = { workspace = true } [dev-dependencies] chrono = { workspace = true } insta = { workspace = true } +tokio = { workspace = true } diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index b319eeaee929..24f0912323d7 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -44,14 +44,16 @@ //! //! # Example Usage //! -//! ```rust +//! ```rust, ignore //! use object_store::path::Path; +//! # let vos: VirtualObjectStore = unimplemented!(); //! //! // List objects under the "mem" prefix //! let all = vos.list(Some(&Path::from("mem/"))).collect::>().await?; //! //! // Copy a file from one prefix to another //! vos.copy(&Path::from("mem/file1"), &Path::from("mem_backup/file1")).await?; +//! # Ok::<_, object_store::Error>(()) //! 
``` use async_trait::async_trait; @@ -60,7 +62,6 @@ use object_store::{ path::Path, Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, }; -use std::fs::File; use std::{collections::HashMap, fmt, sync::Arc}; /// A virtual [`ObjectStore`] that routes requests to underlying stores based on /// the first path segment. @@ -161,7 +162,11 @@ impl ObjectStore for VirtualObjectStore { }; let single_stream = store.list(inner_prefix).map_ok(move |mut meta| { - meta.location = base.child(meta.location.as_ref()); + meta.location = Path::from(format!( + "{}/{}", + base.as_ref(), + meta.location.as_ref() + )); meta }); return single_stream.boxed(); @@ -178,9 +183,12 @@ impl ObjectStore for VirtualObjectStore { .collect(); let streams = entries.into_iter().map(move |(key, store)| { let base = Path::from(key); - let prefix = prefix_owned.as_ref().map(|p| base.child(p.as_ref())); + let prefix = prefix_owned + .as_ref() + .map(|p| Path::from(format!("{}/{}", base.as_ref(), p.as_ref()))); store.list(prefix.as_ref()).map_ok(move |mut meta| { - meta.location = base.child(meta.location.as_ref()); + meta.location = + Path::from(format!("{}/{}", base.as_ref(), meta.location.as_ref())); meta }) }); @@ -192,17 +200,19 @@ impl ObjectStore for VirtualObjectStore { let mut common_prefixes = Vec::new(); for (key, store) in &self.stores { let base = Path::from(key.clone()); - let p = prefix.map(|x| base.child(x.as_ref())); + let p = + prefix.map(|x| Path::from(format!("{}/{}", base.as_ref(), x.as_ref()))); let result = store.list_with_delimiter(p.as_ref()).await?; objects.extend(result.objects.into_iter().map(|mut m| { - m.location = base.child(m.location.as_ref()); + m.location = + Path::from(format!("{}/{}", base.as_ref(), m.location.as_ref())); m })); common_prefixes.extend( result .common_prefixes .into_iter() - .map(|p| base.child(p.as_ref())), + .map(|p| Path::from(format!("{}/{}", 
base.as_ref(), p.as_ref()))), ); } // Sort merged results for deterministic ordering across stores @@ -237,24 +247,8 @@ impl ObjectStore for VirtualObjectStore { source: "destination exists".into(), }), Err(Error::NotFound { .. }) => { - // Stream copy to avoid buffering entire object in memory - let get_res = from_store.get(&from_path).await?; - let payload = match get_res { - GetResult::Stream { stream, .. } => PutPayload::Stream(stream), - GetResult::File { path, .. } => { - // Local file fallback: open for streaming - let file = File::open(path).map_err(|e| Error::Generic { - store: "VirtualObjectStore", - source: format!( - "failed to open local file for streaming: {}", - e - ) - .into(), - })?; - PutPayload::File(file) - } - }; - to_store.put(&to_path, payload).await.map(|_| ()) + let bytes = from_store.get(&from_path).await?.bytes().await?; + to_store.put(&to_path, bytes.into()).await.map(|_| ()) } Err(e) => Err(e), } @@ -267,7 +261,7 @@ mod tests { use super::*; use futures::TryStreamExt; use object_store::{ - memory::InMemory, path::Path, Error, GetResult, PutMultipartOptions, PutPayload, + memory::InMemory, path::Path, Error, ObjectStore, PutMultipartOptions, }; /// Helper to collect list results into Vec @@ -305,8 +299,11 @@ mod tests { #[tokio::test] async fn list_no_matching_prefix_empty() { - let mut stores = HashMap::new(); - stores.insert("x".to_string(), Arc::new(InMemory::new())); + let mut stores: HashMap> = HashMap::new(); + stores.insert( + "x".to_string(), + Arc::new(InMemory::new()) as Arc, + ); let v = VirtualObjectStore::new(stores); let result = collect_list(&v, Some(&Path::from("nope"))).await; assert!(result.is_empty()); @@ -327,9 +324,9 @@ mod tests { #[tokio::test] async fn copy_if_not_exists_destination_exists() { - let mut stores = HashMap::new(); - let from = Arc::new(InMemory::new()); - let to = Arc::new(InMemory::new()); + let mut stores: HashMap> = HashMap::new(); + let from = Arc::new(InMemory::new()) as Arc; + let to = 
Arc::new(InMemory::new()) as Arc; from.put(&Path::from("f1"), b"v".to_vec().into()) .await .unwrap(); @@ -347,9 +344,9 @@ mod tests { #[tokio::test] async fn copy_if_not_exists_streams_copy() { - let mut stores = HashMap::new(); - let from = Arc::new(InMemory::new()); - let to = Arc::new(InMemory::new()); + let mut stores: HashMap> = HashMap::new(); + let from = Arc::new(InMemory::new()) as Arc; + let to = Arc::new(InMemory::new()) as Arc; from.put(&Path::from("f1"), b"123".to_vec().into()) .await .unwrap(); @@ -372,7 +369,7 @@ mod tests { #[tokio::test] async fn list_with_delimiter_sorted() { - let mut stores = HashMap::new(); + let mut stores: HashMap> = HashMap::new(); let a = Arc::new(InMemory::new()) as Arc; a.put(&Path::from("z1"), b"x".to_vec().into()) .await @@ -390,7 +387,7 @@ mod tests { .into_iter() .map(|o| o.location.as_ref().to_string()) .collect(); - assert_eq!(locs, vec!["a/a1".to_string(), "b/z1".to_string()]); + assert_eq!(locs, vec!["a/z1".to_string(), "b/a1".to_string()]); } #[tokio::test] @@ -412,7 +409,10 @@ mod tests { #[tokio::test] async fn multipart_upload_no_matching_prefix_error() { let mut stores = HashMap::new(); - stores.insert("x".to_string(), Arc::new(InMemory::new())); + stores.insert( + "x".to_string(), + Arc::new(InMemory::new()) as Arc, + ); let v = VirtualObjectStore::new(stores); let err = v .put_multipart_opts(&Path::from("nope/file"), PutMultipartOptions::default()) @@ -421,7 +421,7 @@ mod tests { match err.unwrap_err() { Error::Generic { store, source } => { assert_eq!(store, "VirtualObjectStore"); - assert!(source.contains("prefix 'nope'")); + assert!(format!("{}", source).contains("prefix 'nope'")); } other => panic!("unexpected error type: {:?}", other), } @@ -439,15 +439,8 @@ mod tests { .await .expect("multipart upload should succeed"); // Upload parts - // Upload parts - upload - .put_part(PutPayload::Bytes(b"foo".to_vec())) - .await - .unwrap(); - upload - .put_part(PutPayload::Bytes(b"bar".to_vec())) - .await - 
.unwrap(); + upload.put_part(b"foo".to_vec().into()).await.unwrap(); + upload.put_part(b"bar".to_vec().into()).await.unwrap(); // Complete upload upload.complete().await.unwrap(); // Verify data on underlying store From 2a028edbb80047be7cb9562806322b72fb36d597 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 16:33:52 +0800 Subject: [PATCH 11/19] Normalize prefix handling in VirtualObjectStore to skip empty segments and collect remaining path segments. --- datafusion/execution/src/virtual_object_store.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index 24f0912323d7..2c4031cd5a09 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -150,8 +150,10 @@ impl ObjectStore for VirtualObjectStore { let prefix_owned = prefix.cloned(); // Short-circuit when prefix targets a single store key if let Some(ref pfx) = prefix_owned { - let mut p_parts = pfx.parts(); + // Normalize prefix: skip empty segments to handle trailing slashes + let mut p_parts = pfx.parts().filter(|seg| !seg.as_ref().is_empty()); if let Some(store_key) = p_parts.next().map(|s| s.as_ref().to_string()) { + // Collect remaining path segments let remainder: Path = p_parts.collect(); if let Some(store) = self.stores.get(&store_key) { let base = Path::from(store_key.clone()); From f4522a02c9687bd680ec4e74e19b33f9ca1041bf Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 16:47:00 +0800 Subject: [PATCH 12/19] Refactor path handling in VirtualObjectStore to use Path::child for joining base and meta locations --- datafusion/execution/src/virtual_object_store.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index 2c4031cd5a09..0eb2902ec7a9 100644 --- 
a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -164,11 +164,8 @@ impl ObjectStore for VirtualObjectStore { }; let single_stream = store.list(inner_prefix).map_ok(move |mut meta| { - meta.location = Path::from(format!( - "{}/{}", - base.as_ref(), - meta.location.as_ref() - )); + // Use Path::child to join base and meta.location + meta.location = base.child(meta.location.as_ref()); meta }); return single_stream.boxed(); @@ -185,12 +182,11 @@ impl ObjectStore for VirtualObjectStore { .collect(); let streams = entries.into_iter().map(move |(key, store)| { let base = Path::from(key); - let prefix = prefix_owned - .as_ref() - .map(|p| Path::from(format!("{}/{}", base.as_ref(), p.as_ref()))); + // Join prefix using child if present + let prefix = prefix_owned.as_ref().map(|p| base.child(p.as_ref())); store.list(prefix.as_ref()).map_ok(move |mut meta| { - meta.location = - Path::from(format!("{}/{}", base.as_ref(), meta.location.as_ref())); + // Use Path::child to join base and meta.location + meta.location = base.child(meta.location.as_ref()); meta }) }); From df7fa2596132c7b0b9d7ef6a8dd5a7c537581e72 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 18:35:17 +0800 Subject: [PATCH 13/19] Clarify documentation and error handling in VirtualObjectStore to indicate that write operations are not supported --- .../execution/src/virtual_object_store.rs | 215 +++++------------- 1 file changed, 59 insertions(+), 156 deletions(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index 0eb2902ec7a9..314e4c747da9 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -21,6 +21,9 @@ //! based on the first segment (prefix) of the object path. This allows, for example, //! mixing S3, local filesystem, or other stores under a single unified interface. //! +//! 
Currently `VirtualObjectStore` only supports read operations such as `get` and +//! `list`. Write operations like `put` and `delete` are not implemented. +//! //! # Configuration //! //! Create a mapping from string prefixes to concrete `ObjectStore` implementations: @@ -50,9 +53,6 @@ //! //! // List objects under the "mem" prefix //! let all = vos.list(Some(&Path::from("mem/"))).collect::>().await?; -//! -//! // Copy a file from one prefix to another -//! vos.copy(&Path::from("mem/file1"), &Path::from("mem_backup/file1")).await?; //! # Ok::<_, object_store::Error>(()) //! ``` @@ -119,21 +119,33 @@ impl fmt::Debug for VirtualObjectStore { impl ObjectStore for VirtualObjectStore { async fn put_opts( &self, - location: &Path, - payload: PutPayload, - opts: PutOptions, + _location: &Path, + _payload: PutPayload, + _opts: PutOptions, ) -> Result { - let (store, path) = self.resolve(location)?; - store.put_opts(&path, payload, opts).await + // TODO: Implement write operations if needed + Err(Error::NotSupported { + source: std::io::Error::new( + std::io::ErrorKind::Other, + "VirtualObjectStore does not support write operations", + ) + .into(), + }) } async fn put_multipart_opts( &self, - location: &Path, - opts: PutMultipartOptions, + _location: &Path, + _opts: PutMultipartOptions, ) -> Result> { - let (store, path) = self.resolve(location)?; - store.put_multipart_opts(&path, opts).await + // TODO: Implement write operations if needed + Err(Error::NotSupported { + source: std::io::Error::new( + std::io::ErrorKind::Other, + "VirtualObjectStore does not support write operations", + ) + .into(), + }) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -141,9 +153,15 @@ impl ObjectStore for VirtualObjectStore { store.get_opts(&path, options).await } - async fn delete(&self, location: &Path) -> Result<()> { - let (store, path) = self.resolve(location)?; - store.delete(&path).await + async fn delete(&self, _location: &Path) -> Result<()> { + // 
TODO: Implement write operations if needed + Err(Error::NotSupported { + source: std::io::Error::new( + std::io::ErrorKind::Other, + "VirtualObjectStore does not support write operations", + ) + .into(), + }) } fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { @@ -164,8 +182,13 @@ impl ObjectStore for VirtualObjectStore { }; let single_stream = store.list(inner_prefix).map_ok(move |mut meta| { - // Use Path::child to join base and meta.location - meta.location = base.child(meta.location.as_ref()); + // Join base and meta.location using string formatting to + // preserve any nested paths + meta.location = Path::from(format!( + "{}/{}", + base.as_ref(), + meta.location.as_ref() + )); meta }); return single_stream.boxed(); @@ -222,35 +245,26 @@ impl ObjectStore for VirtualObjectStore { }) } - async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - let (from_store, from_path) = self.resolve(from)?; - let (to_store, to_path) = self.resolve(to)?; - if Arc::ptr_eq(from_store, to_store) { - from_store.copy(&from_path, &to_path).await - } else { - let bytes = from_store.get(&from_path).await?.bytes().await?; - to_store.put(&to_path, bytes.into()).await.map(|_| ()) - } + async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> { + // TODO: Implement write operations if needed + Err(Error::NotSupported { + source: std::io::Error::new( + std::io::ErrorKind::Other, + "VirtualObjectStore does not support write operations", + ) + .into(), + }) } - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - let (from_store, from_path) = self.resolve(from)?; - let (to_store, to_path) = self.resolve(to)?; - if Arc::ptr_eq(from_store, to_store) { - from_store.copy_if_not_exists(&from_path, &to_path).await - } else { - match to_store.head(&to_path).await { - Ok(_) => Err(Error::AlreadyExists { - path: to_path.to_string(), - source: "destination exists".into(), - }), - Err(Error::NotFound { .. 
}) => { - let bytes = from_store.get(&from_path).await?.bytes().await?; - to_store.put(&to_path, bytes.into()).await.map(|_| ()) - } - Err(e) => Err(e), - } - } + async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> { + // TODO: Implement write operations if needed + Err(Error::NotSupported { + source: std::io::Error::new( + std::io::ErrorKind::Other, + "VirtualObjectStore does not support write operations", + ) + .into(), + }) } } @@ -258,9 +272,7 @@ impl ObjectStore for VirtualObjectStore { mod tests { use super::*; use futures::TryStreamExt; - use object_store::{ - memory::InMemory, path::Path, Error, ObjectStore, PutMultipartOptions, - }; + use object_store::{memory::InMemory, path::Path, ObjectStore}; /// Helper to collect list results into Vec async fn collect_list( @@ -320,51 +332,6 @@ mod tests { assert_eq!(result, vec!["a/sub/key".to_string()]); } - #[tokio::test] - async fn copy_if_not_exists_destination_exists() { - let mut stores: HashMap> = HashMap::new(); - let from = Arc::new(InMemory::new()) as Arc; - let to = Arc::new(InMemory::new()) as Arc; - from.put(&Path::from("f1"), b"v".to_vec().into()) - .await - .unwrap(); - to.put(&Path::from("f1"), b"old".to_vec().into()) - .await - .unwrap(); - stores.insert("s".to_string(), from); - stores.insert("t".to_string(), to.clone()); - let v = VirtualObjectStore::new(stores); - let err = v - .copy_if_not_exists(&Path::from("t/f1"), &Path::from("t/f1")) - .await; - assert!(matches!(err.unwrap_err(), Error::AlreadyExists { .. 
})); - } - - #[tokio::test] - async fn copy_if_not_exists_streams_copy() { - let mut stores: HashMap> = HashMap::new(); - let from = Arc::new(InMemory::new()) as Arc; - let to = Arc::new(InMemory::new()) as Arc; - from.put(&Path::from("f1"), b"123".to_vec().into()) - .await - .unwrap(); - stores.insert("src".to_string(), from.clone()); - stores.insert("dst".to_string(), to.clone()); - let v = VirtualObjectStore::new(stores); - v.copy_if_not_exists(&Path::from("src/f1"), &Path::from("dst/f1")) - .await - .unwrap(); - // Verify data copied correctly - let data = to - .get(&Path::from("f1")) - .await - .unwrap() - .bytes() - .await - .unwrap(); - assert_eq!(data, b"123".to_vec()); - } - #[tokio::test] async fn list_with_delimiter_sorted() { let mut stores: HashMap> = HashMap::new(); @@ -387,68 +354,4 @@ mod tests { .collect(); assert_eq!(locs, vec!["a/z1".to_string(), "b/a1".to_string()]); } - - #[tokio::test] - async fn multipart_upload_basic_and_abort() { - let mut stores = HashMap::new(); - let a = Arc::new(InMemory::new()) as Arc; - stores.insert("a".to_string(), a.clone()); - let v = VirtualObjectStore::new(stores); - // Initiate multipart upload - let mut upload = v - .put_multipart_opts(&Path::from("a/file"), PutMultipartOptions::default()) - .await - .expect("multipart upload should succeed"); - // Abort should succeed - let res = upload.abort().await; - assert!(res.is_ok(), "abort of multipart upload failed"); - } - - #[tokio::test] - async fn multipart_upload_no_matching_prefix_error() { - let mut stores = HashMap::new(); - stores.insert( - "x".to_string(), - Arc::new(InMemory::new()) as Arc, - ); - let v = VirtualObjectStore::new(stores); - let err = v - .put_multipart_opts(&Path::from("nope/file"), PutMultipartOptions::default()) - .await; - assert!(err.is_err(), "expected error for no matching prefix"); - match err.unwrap_err() { - Error::Generic { store, source } => { - assert_eq!(store, "VirtualObjectStore"); - assert!(format!("{}", 
source).contains("prefix 'nope'")); - } - other => panic!("unexpected error type: {:?}", other), - } - } - - #[tokio::test] - async fn multipart_upload_complete_and_put_part() { - let mut stores = HashMap::new(); - let a = Arc::new(InMemory::new()) as Arc; - stores.insert("a".to_string(), a.clone()); - let v = VirtualObjectStore::new(stores); - // Initiate multipart upload - let mut upload = v - .put_multipart_opts(&Path::from("a/complete"), PutMultipartOptions::default()) - .await - .expect("multipart upload should succeed"); - // Upload parts - upload.put_part(b"foo".to_vec().into()).await.unwrap(); - upload.put_part(b"bar".to_vec().into()).await.unwrap(); - // Complete upload - upload.complete().await.unwrap(); - // Verify data on underlying store - let data = a - .get(&Path::from("complete")) - .await - .unwrap() - .bytes() - .await - .unwrap(); - assert_eq!(data, b"foobar".to_vec()); - } } From 1778ec540475d7bec1bca4b6e906bc875e6cc4bf Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 18:57:16 +0800 Subject: [PATCH 14/19] fix clippy errors --- .../execution/src/virtual_object_store.rs | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index 314e4c747da9..3602dc8e2bf2 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -84,7 +84,7 @@ impl VirtualObjectStore { .next() .ok_or_else(|| Error::Generic { store: "VirtualObjectStore", - source: format!("empty path in location '{}'", location).into(), + source: format!("empty path in location '{location}'").into(), })? 
.as_ref() .to_string(); @@ -92,8 +92,7 @@ impl VirtualObjectStore { let store = self.stores.get(&key).ok_or_else(|| Error::Generic { store: "VirtualObjectStore", source: format!( - "ObjectStore not found for prefix '{}' in location '{}'", - key, location + "ObjectStore not found for prefix '{key}' in location '{location}'" ) .into(), })?; @@ -125,8 +124,7 @@ impl ObjectStore for VirtualObjectStore { ) -> Result { // TODO: Implement write operations if needed Err(Error::NotSupported { - source: std::io::Error::new( - std::io::ErrorKind::Other, + source: std::io::Error::other( "VirtualObjectStore does not support write operations", ) .into(), @@ -140,8 +138,7 @@ impl ObjectStore for VirtualObjectStore { ) -> Result> { // TODO: Implement write operations if needed Err(Error::NotSupported { - source: std::io::Error::new( - std::io::ErrorKind::Other, + source: std::io::Error::other( "VirtualObjectStore does not support write operations", ) .into(), @@ -156,8 +153,7 @@ impl ObjectStore for VirtualObjectStore { async fn delete(&self, _location: &Path) -> Result<()> { // TODO: Implement write operations if needed Err(Error::NotSupported { - source: std::io::Error::new( - std::io::ErrorKind::Other, + source: std::io::Error::other( "VirtualObjectStore does not support write operations", ) .into(), @@ -248,8 +244,7 @@ impl ObjectStore for VirtualObjectStore { async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> { // TODO: Implement write operations if needed Err(Error::NotSupported { - source: std::io::Error::new( - std::io::ErrorKind::Other, + source: std::io::Error::other( "VirtualObjectStore does not support write operations", ) .into(), @@ -259,8 +254,7 @@ impl ObjectStore for VirtualObjectStore { async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> { // TODO: Implement write operations if needed Err(Error::NotSupported { - source: std::io::Error::new( - std::io::ErrorKind::Other, + source: std::io::Error::other( "VirtualObjectStore does not 
support write operations", ) .into(), From cc714fb9415ebfa7b7281680cc8f6fdb32f83327 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 8 Aug 2025 19:00:19 +0800 Subject: [PATCH 15/19] Fix taplo errors --- datafusion/execution/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index c39d6853d79c..c8c9d4f48589 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -39,6 +39,7 @@ name = "datafusion_execution" [dependencies] arrow = { workspace = true } +async-trait = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } @@ -49,7 +50,6 @@ parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } url = { workspace = true } -async-trait = { workspace = true } [dev-dependencies] chrono = { workspace = true } From e8926d46136644b23d800fda13c843fd201d2fae Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 12 Aug 2025 10:47:50 +0800 Subject: [PATCH 16/19] refactor: Remove TODO comments for unsupported write operations in VirtualObjectStore --- datafusion/execution/src/virtual_object_store.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index 3602dc8e2bf2..e4b97669e0b0 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -122,7 +122,6 @@ impl ObjectStore for VirtualObjectStore { _payload: PutPayload, _opts: PutOptions, ) -> Result { - // TODO: Implement write operations if needed Err(Error::NotSupported { source: std::io::Error::other( "VirtualObjectStore does not support write operations", @@ -136,7 +135,6 @@ impl ObjectStore for VirtualObjectStore { _location: &Path, _opts: PutMultipartOptions, ) -> Result> { - // TODO: Implement write 
operations if needed Err(Error::NotSupported { source: std::io::Error::other( "VirtualObjectStore does not support write operations", @@ -151,7 +149,6 @@ impl ObjectStore for VirtualObjectStore { } async fn delete(&self, _location: &Path) -> Result<()> { - // TODO: Implement write operations if needed Err(Error::NotSupported { source: std::io::Error::other( "VirtualObjectStore does not support write operations", @@ -242,7 +239,6 @@ impl ObjectStore for VirtualObjectStore { } async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> { - // TODO: Implement write operations if needed Err(Error::NotSupported { source: std::io::Error::other( "VirtualObjectStore does not support write operations", @@ -252,7 +248,6 @@ impl ObjectStore for VirtualObjectStore { } async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> { - // TODO: Implement write operations if needed Err(Error::NotSupported { source: std::io::Error::other( "VirtualObjectStore does not support write operations", From 5075467211f759b443d9c937045b725670a90322 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 12 Aug 2025 11:58:13 +0800 Subject: [PATCH 17/19] chore(deps): move chrono to runtime deps and enable tokio dev features - Add chrono to [dependencies] in datafusion/execution/Cargo.toml - Remove chrono from [dev-dependencies] - Update tokio dev-dependency to enable features ["macros", "rt", "sync"] --- datafusion/execution/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 412fe4d501ef..223429ac6180 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -45,6 +45,7 @@ parquet_encryption = [ [dependencies] arrow = { workspace = true } async-trait = { workspace = true } +chrono = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } @@ -58,6 +59,5 
@@ tempfile = { workspace = true } url = { workspace = true } [dev-dependencies] -chrono = { workspace = true } insta = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt", "sync"] } From b1999970d94ebc99ff03e8739c1a02d32b7b4366 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 12 Aug 2025 16:09:20 +0800 Subject: [PATCH 18/19] refactor: Box errors in VirtualObjectStore::resolve to reduce error size --- .../execution/src/virtual_object_store.rs | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index e4b97669e0b0..d3776b74d0ce 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -78,23 +78,33 @@ impl VirtualObjectStore { } /// Resolve the given [`Path`] to the underlying store and the remaining path - fn resolve(&self, location: &Path) -> Result<(&Arc, Path)> { + /// + /// Errors are boxed to keep the `Err` variant small and satisfy + /// `clippy::result_large_err` as `object_store::Error` is ~72 bytes. + fn resolve( + &self, + location: &Path, + ) -> std::result::Result<(&Arc, Path), Box> { let mut parts = location.parts(); let key = parts .next() - .ok_or_else(|| Error::Generic { - store: "VirtualObjectStore", - source: format!("empty path in location '{location}'").into(), + .ok_or_else(|| { + Box::new(Error::Generic { + store: "VirtualObjectStore", + source: format!("empty path in location '{location}'").into(), + }) })? 
.as_ref() .to_string(); let path: Path = parts.collect(); - let store = self.stores.get(&key).ok_or_else(|| Error::Generic { - store: "VirtualObjectStore", - source: format!( - "ObjectStore not found for prefix '{key}' in location '{location}'" - ) - .into(), + let store = self.stores.get(&key).ok_or_else(|| { + Box::new(Error::Generic { + store: "VirtualObjectStore", + source: format!( + "ObjectStore not found for prefix '{key}' in location '{location}'", + ) + .into(), + }) })?; Ok((store, path)) } @@ -144,7 +154,7 @@ impl ObjectStore for VirtualObjectStore { } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - let (store, path) = self.resolve(location)?; + let (store, path) = self.resolve(location).map_err(|e| *e)?; store.get_opts(&path, options).await } From 5b1a731db92ef7b8c80511f933e70776cc4af397 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 12 Aug 2025 16:12:50 +0800 Subject: [PATCH 19/19] refactor: Change return type of VirtualObjectStore::resolve to use local Result type --- datafusion/execution/src/virtual_object_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/execution/src/virtual_object_store.rs b/datafusion/execution/src/virtual_object_store.rs index d3776b74d0ce..c346cbed5c3c 100644 --- a/datafusion/execution/src/virtual_object_store.rs +++ b/datafusion/execution/src/virtual_object_store.rs @@ -84,7 +84,7 @@ impl VirtualObjectStore { fn resolve( &self, location: &Path, - ) -> std::result::Result<(&Arc, Path), Box> { + ) -> Result<(&Arc, Path), Box> { let mut parts = location.parts(); let key = parts .next()