Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions datafusion-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,40 @@ fn test_backtrace_output(#[case] query: &str) {
stderr
);
}

/// Integration test: a `CREATE EXTERNAL TABLE` whose S3 location is a bare
/// prefix (no trailing slash) must still resolve to the files stored under
/// that prefix. The CLI run is checked against a recorded snapshot.
#[tokio::test]
async fn test_s3_url_fallback() {
// Opt-in only: without TEST_STORAGE_INTEGRATION set, the test returns early
// and therefore passes without exercising anything.
if env::var("TEST_STORAGE_INTEGRATION").is_err() {
eprintln!("Skipping external storages integration tests");
return;
}

// NOTE(review): presumably starts a MinIO (S3-compatible) container that
// already holds the `data` bucket contents -- confirm in `setup_minio_container`.
let container = setup_minio_container().await;

// Use a dedicated snapshot suffix for this test. `_bound` is kept alive until
// the end of the function so the settings stay bound while the snapshot
// assertion below runs.
let mut settings = make_settings();
settings.set_snapshot_suffix("s3_url_fallback");
let _bound = settings.bind_to_scope();

// Host-side port mapped to the container's port 9000; used to build the
// endpoint URL handed to the CLI.
let port = container.get_host_port_ipv4(9000).await.unwrap();

// Create a table using a prefix path (without trailing slash)
// This should trigger the fallback logic where head() fails on the prefix
// and list() is used to discover the actual files
let input = r#"CREATE EXTERNAL TABLE partitioned_data
STORED AS CSV
LOCATION 's3://data/partitioned_csv'
OPTIONS (
'format.has_header' 'false'
);

SELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;
"#;

// Run the CLI with a cleared environment so only the credentials and endpoint
// set here are visible to it, feed the SQL on stdin, and compare the result
// with the stored snapshot.
assert_cmd_snapshot!(cli()
.env_clear()
.env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
.env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
.env("AWS_ENDPOINT", format!("http://localhost:{port}"))
.env("AWS_ALLOW_HTTP", "true")
.pass_stdin(input));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
source: datafusion-cli/tests/cli_integration.rs
info:
program: datafusion-cli
args: []
env:
AWS_ACCESS_KEY_ID: TEST-DataFusionLogin
AWS_ALLOW_HTTP: "true"
AWS_ENDPOINT: "http://localhost:32771"
AWS_SECRET_ACCESS_KEY: TEST-DataFusionPassword
stdin: "CREATE EXTERNAL TABLE partitioned_data\nSTORED AS CSV\nLOCATION 's3://data/partitioned_csv'\nOPTIONS (\n 'format.has_header' 'false'\n);\n\nSELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;\n"
---
success: true
exit_code: 0
----- stdout -----
[CLI_VERSION]
0 row(s) fetched.
[ELAPSED]

+----------+----------+----------+
| column_1 | column_2 | column_3 |
+----------+----------+----------+
| 0 | 0 | true |
| 0 | 1 | false |
| 0 | 2 | true |
| 0 | 3 | false |
| 0 | 4 | true |
+----------+----------+----------+
5 row(s) fetched.
[ELAPSED]

\q

----- stderr -----
232 changes: 213 additions & 19 deletions datafusion/datasource/src/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,33 +242,27 @@ impl ListingTableUrl {
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
let exec_options = &ctx.config_options().execution;
let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
// If the prefix is a file, use a head request, otherwise list
let list = match self.is_collection() {
true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
None => store.list(Some(&self.prefix)),
Some(cache) => {
if let Some(res) = cache.get(&self.prefix) {
debug!("Hit list all files cache");
futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
.boxed()
} else {
let list_res = store.list(Some(&self.prefix));
let vec = list_res.try_collect::<Vec<ObjectMeta>>().await?;
cache.put(&self.prefix, Arc::new(vec.clone()));
futures::stream::iter(vec.into_iter().map(Ok)).boxed()
}
}
},
false => futures::stream::once(store.head(&self.prefix)).boxed(),

let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
list_with_cache(ctx, store, &self.prefix).await?
} else {
match store.head(&self.prefix).await {
Ok(meta) => futures::stream::once(async { Ok(meta) })
.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
.boxed(),
// If the head command fails, it is likely that object doesn't exist.
// Retry as though it were a prefix (aka a collection)
Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
}
};

Ok(list
.try_filter(move |meta| {
let path = &meta.location;
let extension_match = path.as_ref().ends_with(file_extension);
let glob_match = self.contains(path, ignore_subdirectory);
futures::future::ready(extension_match && glob_match)
})
.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
.boxed())
}

Expand Down Expand Up @@ -306,6 +300,33 @@ impl ListingTableUrl {
}
}

/// Lists the objects under `prefix` in `store`, going through the session's
/// list-files cache when one is configured.
///
/// * Without a cache, the store's listing is returned as a stream, with
///   object-store errors wrapped in [`DataFusionError::ObjectStore`].
/// * With a cache, a hit is served from the cached entries; a miss collects
///   the full listing, stores it in the cache under `prefix`, and then yields
///   the collected entries.
///
/// # Errors
///
/// With a cache configured, a failure while collecting the listing is returned
/// from this function; without one, errors surface as items of the stream.
async fn list_with_cache<'b>(
    ctx: &'b dyn Session,
    store: &'b dyn ObjectStore,
    prefix: &'b Path,
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
    let Some(cache) = ctx.runtime_env().cache_manager.get_list_files_cache() else {
        // No cache configured: hand back the store's listing stream directly.
        let uncached = store
            .list(Some(prefix))
            .map(|item| item.map_err(|err| DataFusionError::ObjectStore(Box::new(err))))
            .boxed();
        return Ok(uncached);
    };

    let entries = match cache.get(prefix) {
        Some(cached) => {
            debug!("Hit list all files cache");
            cached.as_ref().clone()
        }
        None => {
            // Cache miss: materialize the listing so it can be both cached and returned.
            let listed = store
                .list(Some(prefix))
                .try_collect::<Vec<ObjectMeta>>()
                .await?;
            cache.put(prefix, Arc::new(listed.clone()));
            listed
        }
    };

    Ok(futures::stream::iter(entries.into_iter().map(Ok)).boxed())
}

/// Creates a file URL from a potentially relative filesystem path
#[cfg(not(target_arch = "wasm32"))]
fn url_from_filesystem_path(s: &str) -> Option<Url> {
Expand Down Expand Up @@ -384,6 +405,18 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
#[cfg(test)]
mod tests {
use super::*;
use datafusion_common::config::TableOptions;
use datafusion_common::DFSchema;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;
use object_store::PutPayload;
use std::any::Any;
use std::collections::HashMap;
use tempfile::tempdir;

#[test]
Expand Down Expand Up @@ -597,4 +630,165 @@ mod tests {
"file path ends with .ext - extension is ext",
);
}

/// Checks `list_all_files` against an in-memory store: a prefix must produce
/// the same listing with and without a trailing slash, and a prefix that
/// matches nothing must produce an empty listing rather than an error.
#[tokio::test]
async fn test_list_files() {
    let store = object_store::memory::InMemory::new();

    // Populate the store: one file at the root and three under `t/`.
    for path in ["a.parquet", "/t/b.parquet", "/t/c.csv", "/t/d.csv"] {
        create_file(&store, path).await;
    }

    // Each case is (url, file extension, expected listing).
    let cases = vec![
        ("/", "parquet", vec!["a.parquet"]),
        // parquet files under `t`, with and without trailing slash
        ("/t/", "parquet", vec!["t/b.parquet"]),
        ("/t", "parquet", vec!["t/b.parquet"]),
        // csv files under `t`, with and without trailing slash
        ("/t", "csv", vec!["t/c.csv", "t/d.csv"]),
        ("/t/", "csv", vec!["t/c.csv", "t/d.csv"]),
        // a prefix that does not exist
        ("/NonExisting", "csv", vec![]),
        ("/NonExisting/", "csv", vec![]),
    ];

    for (url, extension, expected) in cases {
        assert_eq!(list_all_files(url, &store, extension).await, expected);
    }
}

/// Writes an object containing the bytes `hello world` to `path` in
/// `object_store`.
///
/// # Panics
///
/// Panics if the store rejects the write.
async fn create_file(object_store: &dyn ObjectStore, path: &str) {
    let location = Path::from(path);
    let payload = PutPayload::from_static(b"hello world");
    object_store
        .put(&location, payload)
        .await
        .expect("failed to create test file");
}

/// Lists the files matching `file_extension` under `url` and returns their
/// paths as strings.
///
/// # Panics
///
/// Panics if the listing fails.
async fn list_all_files(
    url: &str,
    store: &dyn ObjectStore,
    file_extension: &str,
) -> Vec<String> {
    let outcome = try_list_all_files(url, store, file_extension).await;
    outcome.unwrap()
}

/// Parses `url` as a [`ListingTableUrl`], runs `list_all_files` on it with a
/// fresh [`MockSession`], and returns the location of every listed object as
/// a string.
///
/// # Errors
///
/// Returns an error if the URL cannot be parsed or if listing fails.
async fn try_list_all_files(
    url: &str,
    store: &dyn ObjectStore,
    file_extension: &str,
) -> Result<Vec<String>> {
    let session = MockSession::new();
    let table_url = ListingTableUrl::parse(url)?;

    let listing = table_url
        .list_all_files(&session, store, file_extension)
        .await?;
    let metas = listing.try_collect::<Vec<_>>().await?;

    let mut paths = Vec::with_capacity(metas.len());
    for meta in metas {
        paths.push(meta.location.as_ref().to_string());
    }
    Ok(paths)
}

/// Minimal `Session` stand-in for the tests in this module. It holds only the
/// state that its `Session` implementation hands out by reference.
struct MockSession {
// Returned by `Session::config`.
config: SessionConfig,
// Returned by `Session::runtime_env`.
runtime_env: Arc<RuntimeEnv>,
}

impl MockSession {
    /// Builds a mock session from `SessionConfig::new()` and a default
    /// `RuntimeEnv`.
    fn new() -> Self {
        let config = SessionConfig::new();
        let runtime_env = Arc::new(RuntimeEnv::default());
        Self {
            config,
            runtime_env,
        }
    }
}

// `Session` implementation for the test mock. Only `config` and `runtime_env`
// are backed by real state; every other method panics via `unimplemented!()`.
// NOTE(review): this presumably covers everything `list_all_files` touches
// (config options and the runtime env's cache manager) -- confirm if the
// listing code starts calling other `Session` methods.
#[async_trait::async_trait]
impl Session for MockSession {
// Not implemented for the mock; panics if called.
fn session_id(&self) -> &str {
unimplemented!()
}

// Returns the config created in `MockSession::new`.
fn config(&self) -> &SessionConfig {
&self.config
}

// Not implemented for the mock; panics if called.
async fn create_physical_plan(
&self,
_logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}

// Not implemented for the mock; panics if called.
fn create_physical_expr(
&self,
_expr: Expr,
_df_schema: &DFSchema,
) -> Result<Arc<dyn PhysicalExpr>> {
unimplemented!()
}

// Not implemented for the mock; panics if called.
fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
unimplemented!()
}

// Not implemented for the mock; panics if called.
fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
unimplemented!()
}

// Not implemented for the mock; panics if called.
fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
unimplemented!()
}

// Returns the runtime environment created in `MockSession::new`.
fn runtime_env(&self) -> &Arc<RuntimeEnv> {
&self.runtime_env
}

// Not implemented for the mock; panics if called.
fn execution_props(&self) -> &ExecutionProps {
unimplemented!()
}

// Not implemented for the mock; panics if called.
fn as_any(&self) -> &dyn Any {
unimplemented!()
}

// Not implemented for the mock; panics if called.
fn table_options(&self) -> &TableOptions {
unimplemented!()
}

// Not implemented for the mock; panics if called.
fn table_options_mut(&mut self) -> &mut TableOptions {
unimplemented!()
}

// Not implemented for the mock; panics if called.
fn task_ctx(&self) -> Arc<TaskContext> {
unimplemented!()
}
}
}