Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 181 additions & 40 deletions datafusion/datasource/src/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,52 +243,15 @@ impl ListingTableUrl {
let exec_options = &ctx.config_options().execution;
let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;

async fn list_with_cache<'b>(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to its own function and simplified it

ctx: &'b dyn Session,
store: &'b dyn ObjectStore,
prefix: &'b Path,
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
match ctx.runtime_env().cache_manager.get_list_files_cache() {
None => Ok(store
.list(Some(prefix))
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
.boxed()),
Some(cache) => {
if let Some(res) = cache.get(prefix) {
debug!("Hit list all files cache");
Ok(futures::stream::iter(
res.as_ref().clone().into_iter().map(Ok),
)
.map(|res| {
res.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
})
.boxed())
} else {
let vec = store
.list(Some(prefix))
.map(|res| {
res.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
})
.try_collect::<Vec<ObjectMeta>>()
.await?;
cache.put(prefix, Arc::new(vec.clone()));
Ok(futures::stream::iter(vec.into_iter().map(Ok))
.map(|res| {
res.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
})
.boxed())
}
}
}
}

let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
list_with_cache(ctx, store, &self.prefix).await?
} else {
match store.head(&self.prefix).await {
Ok(meta) => futures::stream::once(async { Ok(meta) })
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
.boxed(),
// If the head command fails, it is likely that object doesn't exist.
// Retry as though it were a prefix (aka a collection)
Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
}
};
Expand Down Expand Up @@ -337,6 +300,33 @@ impl ListingTableUrl {
}
}

async fn list_with_cache<'b>(
ctx: &'b dyn Session,
store: &'b dyn ObjectStore,
prefix: &'b Path,
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
match ctx.runtime_env().cache_manager.get_list_files_cache() {
None => Ok(store
.list(Some(prefix))
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
.boxed()),
Some(cache) => {
let vec = if let Some(res) = cache.get(prefix) {
debug!("Hit list all files cache");
res.as_ref().clone()
} else {
let vec = store
.list(Some(prefix))
.try_collect::<Vec<ObjectMeta>>()
.await?;
cache.put(prefix, Arc::new(vec.clone()));
vec
};
Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
}
}
}

/// Creates a file URL from a potentially relative filesystem path
#[cfg(not(target_arch = "wasm32"))]
fn url_from_filesystem_path(s: &str) -> Option<Url> {
Expand Down Expand Up @@ -415,6 +405,18 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
#[cfg(test)]
mod tests {
use super::*;
use datafusion_common::config::TableOptions;
use datafusion_common::DFSchema;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;
use object_store::PutPayload;
use std::any::Any;
use std::collections::HashMap;
use tempfile::tempdir;

#[test]
Expand Down Expand Up @@ -628,4 +630,143 @@ mod tests {
"file path ends with .ext - extension is ext",
);
}

#[tokio::test]
async fn test_list_files() {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wrote several tests for the new code.

let store = object_store::memory::InMemory::new();
// Create some files:
create_file(&store, "a.parquet").await;
create_file(&store, "/t/b.parquet").await;
create_file(&store, "/t/c.csv").await;
create_file(&store, "/t/d.csv").await;

let url = ListingTableUrl::parse("/t").unwrap();
assert_eq!(
list_all_files("/", &store, "parquet").await,
vec!["a.parquet"],
);

// test with and without trailing slash
assert_eq!(
list_all_files("/t/", &store, "parquet").await,
vec!["t/b.parquet"],
);
assert_eq!(
list_all_files("/t", &store, "parquet").await,
vec!["t/b.parquet"],
);

// test with and without trailing slash
assert_eq!(
list_all_files("/t", &store, "csv").await,
vec!["t/c.csv", "t/d.csv"],
);
assert_eq!(
list_all_files("/t/", &store, "csv").await,
vec!["t/c.csv", "t/d.csv"],
);
}

/// Creates a file with "hello world" content at the specified path
async fn create_file(object_store: &dyn ObjectStore, path: &str) {
object_store
.put(&Path::from(path), PutPayload::from_static(b"hello world"))
.await
.expect("failed to create test file");
}

/// Runs "list_all_files" and returns their paths
async fn list_all_files(
url: &str,
store: &dyn ObjectStore,
file_extension: &str,
) -> Vec<String> {
let session = MockSession::new();
let url = ListingTableUrl::parse(url).unwrap();
url.list_all_files(&session, store, file_extension)
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap()
.into_iter()
.map(|meta| meta.location.as_ref().to_string())
.collect()
}

struct MockSession {
config: SessionConfig,
runtime_env: Arc<RuntimeEnv>,
}

impl MockSession {
fn new() -> Self {
Self {
config: SessionConfig::new(),
runtime_env: Arc::new(RuntimeEnv::default()),
}
}
}

#[async_trait::async_trait]
impl Session for MockSession {
fn session_id(&self) -> &str {
unimplemented!()
}

fn config(&self) -> &SessionConfig {
&self.config
}

async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}

fn create_physical_expr(
&self,
expr: Expr,
df_schema: &DFSchema,
) -> Result<Arc<dyn PhysicalExpr>> {
unimplemented!()
}

fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
unimplemented!()
}

fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
unimplemented!()
}

fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
unimplemented!()
}

fn runtime_env(&self) -> &Arc<RuntimeEnv> {
&self.runtime_env
}

fn execution_props(&self) -> &ExecutionProps {
unimplemented!()
}

fn as_any(&self) -> &dyn Any {
unimplemented!()
}

fn table_options(&self) -> &TableOptions {
unimplemented!()
}

fn table_options_mut(&mut self) -> &mut TableOptions {
unimplemented!()
}

fn task_ctx(&self) -> Arc<TaskContext> {
unimplemented!()
}
}
}
Loading