From 8d03a44ae57a246b6e65122b9d9b1ba618385369 Mon Sep 17 00:00:00 2001 From: hknlof Date: Fri, 1 Aug 2025 09:31:30 +0200 Subject: [PATCH 1/3] fix(parquet): write single file if option is set https://github.com/apache/datafusion/issues/13323 --- .../common/src/file_options/file_type.rs | 2 + datafusion/core/src/dataframe/parquet.rs | 13 ++- .../core/src/execution/context/parquet.rs | 80 +++++++++++++++++++ datafusion/datasource/src/write/demux.rs | 15 +++- 4 files changed, 107 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs index 2648f7289798..3abcb1d72455 100644 --- a/datafusion/common/src/file_options/file_type.rs +++ b/datafusion/common/src/file_options/file_type.rs @@ -30,6 +30,8 @@ pub const DEFAULT_CSV_EXTENSION: &str = ".csv"; pub const DEFAULT_JSON_EXTENSION: &str = ".json"; /// The default file extension of parquet files pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; +// An internal file extension for extensionless single files +pub const SINGLE_FILE_EXTENSION: &str = ".single"; /// Define each `FileType`/`FileCompressionType`'s extension pub trait GetExt { diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 83bb60184fb9..59d134415755 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -26,7 +26,8 @@ use super::{ }; use datafusion_common::config::TableParquetOptions; -use datafusion_common::not_impl_err; +use datafusion_common::file_options::file_type::SINGLE_FILE_EXTENSION; +use datafusion_common::{not_impl_err, DEFAULT_PARQUET_EXTENSION}; use datafusion_expr::dml::InsertOp; impl DataFrame { @@ -82,6 +83,16 @@ impl DataFrame { .build()? 
}; + let path = if file_type.get_ext() != DEFAULT_PARQUET_EXTENSION + && options.single_file_output + { + let mut path = path.to_owned(); + path.push_str(SINGLE_FILE_EXTENSION); + path + } else { + path.to_owned() + }; + let plan = LogicalPlanBuilder::copy_to( plan, path.into(), diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 731f7e59ecfa..07e7dd1a4458 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -113,6 +113,7 @@ mod tests { }; use datafusion_execution::config::SessionConfig; + use datafusion_expr::Partitioning; use tempfile::{tempdir, TempDir}; #[tokio::test] @@ -216,6 +217,85 @@ mod tests { Ok(()) } + #[tokio::test] + async fn write_multiple_file_parquet_no_extensions() -> Result<()> { + let ctx = SessionContext::new(); + let df = ctx.read_batch(RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("purchase_id", DataType::Int32, false), + Field::new("price", DataType::Float32, false), + Field::new("quantity", DataType::Int32, false), + ])), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])), + Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])), + ], + )?)?; + + // Repartition to have the desired number of partitions. + let partitioned_df = df.repartition(Partitioning::RoundRobinBatch(2))?; + let tmp_dir = tempdir()?; + let path = tmp_dir + .path() + .join("no_ext_parquet") + .to_str() + .unwrap() + .to_string(); + + let options = DataFrameWriteOptions::new(); + + partitioned_df.write_parquet(&path, options, None).await?; + + let test_path = std::path::Path::new(&path); + assert_eq!( + test_path.is_dir(), + true, + "No extension and default DataFrameWriteOptions should have yielded a dir." 
); + + Ok(()) + } + + #[tokio::test] + async fn write_single_file_parquet_no_extensions() -> Result<()> { + let ctx = SessionContext::new(); + let df = ctx.read_batch(RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("purchase_id", DataType::Int32, false), + Field::new("price", DataType::Float32, false), + Field::new("quantity", DataType::Int32, false), + ])), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])), + Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])), + ], + )?)?; + // Repartition to have two partitions + let partitioned_df = df.repartition(Partitioning::RoundRobinBatch(2))?; + let tmp_dir = tempdir()?; + let path = tmp_dir + .path() + .join("no_ext_parquet") + .to_str() + .unwrap() + .to_string(); + + let options = DataFrameWriteOptions::new().with_single_file_output(true); + + partitioned_df.write_parquet(&path, options, None).await?; + + let test_path = std::path::Path::new(&path); + assert_eq!( + test_path.is_file(), + true, + "No extension and DataFrameWriteOptions::with_single_file_output(true) should have yielded a single file." 
); + + Ok(()) + } + #[tokio::test] async fn read_from_different_file_extension() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs index 75fb557b63d2..440fbb7bc37b 100644 --- a/datafusion/datasource/src/write/demux.rs +++ b/datafusion/datasource/src/write/demux.rs @@ -101,8 +101,19 @@ pub(crate) fn start_demuxer_task( let file_extension = config.file_extension.clone(); let base_output_path = config.table_paths[0].clone(); let task = if config.table_partition_cols.is_empty() { - let single_file_output = !base_output_path.is_collection() - && base_output_path.file_extension().is_some(); + let single_file_output = (!base_output_path.is_collection() + && base_output_path.file_extension().is_some()) + || base_output_path.file_extension() == Some("single"); + + let file_extension = if single_file_output { + file_extension.replace(".single", "") + } else { + file_extension + }; + + let base_output_path = + ListingTableUrl::parse(base_output_path.as_str().replace(".single", "")) + .unwrap_or(base_output_path); SpawnedTask::spawn(async move { row_count_demuxer( tx, From 64d7a22740a01566333167bae915c9734f427f55 Mon Sep 17 00:00:00 2001 From: hknlof Date: Mon, 4 Aug 2025 16:30:12 +0200 Subject: [PATCH 2/3] chore: clippy improvements --- datafusion/core/src/dataframe/parquet.rs | 2 +- datafusion/core/src/execution/context/parquet.rs | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 59d134415755..0092afda85ab 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -95,7 +95,7 @@ impl DataFrame { let plan = LogicalPlanBuilder::copy_to( plan, - path.into(), + path, file_type, Default::default(), options.partition_by, diff --git a/datafusion/core/src/execution/context/parquet.rs 
b/datafusion/core/src/execution/context/parquet.rs index 07e7dd1a4458..f175ea78b099 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -248,9 +248,8 @@ mod tests { partitioned_df.write_parquet(&path, options, None).await?; let test_path = std::path::Path::new(&path); - assert_eq!( + assert!( test_path.is_dir(), - true, "No extension and default DataFrameWriteOptions should have yielded a dir." ); @@ -287,9 +286,8 @@ mod tests { partitioned_df.write_parquet(&path, options, None).await?; let test_path = std::path::Path::new(&path); - assert_eq!( + assert!( test_path.is_file(), - true, "No extension and DataFrameWriteOptions::with_single_file_output(true) should have yielded a single file." ); From 651cfb80e9e33b36a104eb639d05e2507673bfcf Mon Sep 17 00:00:00 2001 From: hknlof Date: Wed, 20 Aug 2025 18:47:59 +0200 Subject: [PATCH 3/3] fix: remove magic strings --- datafusion/datasource/src/write/demux.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs index 440fbb7bc37b..5d8cdfa47485 100644 --- a/datafusion/datasource/src/write/demux.rs +++ b/datafusion/datasource/src/write/demux.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use crate::url::ListingTableUrl; use crate::write::FileSinkConfig; use datafusion_common::error::Result; +use datafusion_common::file_options::file_type::SINGLE_FILE_EXTENSION; use datafusion_physical_plan::SendableRecordBatchStream; use arrow::array::{ @@ -101,19 +102,21 @@ pub(crate) fn start_demuxer_task( let file_extension = config.file_extension.clone(); let base_output_path = config.table_paths[0].clone(); let task = if config.table_partition_cols.is_empty() { + let dot_free = SINGLE_FILE_EXTENSION.replace(".", ""); let single_file_output = (!base_output_path.is_collection() + && base_output_path.file_extension().is_some()) + || base_output_path.file_extension() == 
Some("single"); + || base_output_path.file_extension() == Some(&dot_free); let file_extension = if single_file_output { - file_extension.replace(".single", "") + file_extension.replace(SINGLE_FILE_EXTENSION, "") } else { file_extension }; - let base_output_path = - ListingTableUrl::parse(base_output_path.as_str().replace(".single", "")) - .unwrap_or(base_output_path); + let base_output_path = ListingTableUrl::parse( + base_output_path.as_str().replace(SINGLE_FILE_EXTENSION, ""), + ) + .unwrap_or(base_output_path); SpawnedTask::spawn(async move { row_count_demuxer( tx,