Skip to content

Commit 851bcc9

Browse files
committed
fix(parquet): write single file if option is set
#13323
1 parent d376a32 commit 851bcc9

File tree

4 files changed

+107
-3
lines changed

4 files changed

+107
-3
lines changed

datafusion/common/src/file_options/file_type.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
3030
pub const DEFAULT_JSON_EXTENSION: &str = ".json";
3131
/// The default file extension of parquet files
3232
pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
33+
// An internal file extension for extensionless single files
34+
pub const SINGLE_FILE_EXTENSION: &str = ".single";
3335

3436
/// Define each `FileType`/`FileCompressionType`'s extension
3537
pub trait GetExt {

datafusion/core/src/dataframe/parquet.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ use super::{
2626
};
2727

2828
use datafusion_common::config::TableParquetOptions;
29-
use datafusion_common::not_impl_err;
29+
use datafusion_common::file_options::file_type::SINGLE_FILE_EXTENSION;
30+
use datafusion_common::{not_impl_err, DEFAULT_PARQUET_EXTENSION};
3031
use datafusion_expr::dml::InsertOp;
3132

3233
impl DataFrame {
@@ -82,6 +83,16 @@ impl DataFrame {
8283
.build()?
8384
};
8485

86+
let path = if file_type.get_ext() != DEFAULT_PARQUET_EXTENSION
87+
&& options.single_file_output
88+
{
89+
let mut path = path.to_owned();
90+
path.push_str(SINGLE_FILE_EXTENSION);
91+
path
92+
} else {
93+
path.to_owned()
94+
};
95+
8596
let plan = LogicalPlanBuilder::copy_to(
8697
plan,
8798
path.into(),

datafusion/core/src/execution/context/parquet.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ mod tests {
113113
};
114114
use datafusion_execution::config::SessionConfig;
115115

116+
use datafusion_expr::Partitioning;
116117
use tempfile::{tempdir, TempDir};
117118

118119
#[tokio::test]
@@ -216,6 +217,85 @@ mod tests {
216217
Ok(())
217218
}
218219

220+
#[tokio::test]
221+
async fn write_multiple_file_parquet_no_extensions() -> Result<()> {
222+
let ctx = SessionContext::new();
223+
let df = ctx.read_batch(RecordBatch::try_new(
224+
Arc::new(Schema::new(vec![
225+
Field::new("purchase_id", DataType::Int32, false),
226+
Field::new("price", DataType::Float32, false),
227+
Field::new("quantity", DataType::Int32, false),
228+
])),
229+
vec![
230+
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
231+
Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])),
232+
Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
233+
],
234+
)?)?;
235+
236+
// Repartition to have the desired number of partitions.
237+
let partitioned_df = df.repartition(Partitioning::RoundRobinBatch(2))?;
238+
let tmp_dir = tempdir()?;
239+
let path = tmp_dir
240+
.path()
241+
.join("no_ext_parquet")
242+
.to_str()
243+
.unwrap()
244+
.to_string();
245+
246+
let options = DataFrameWriteOptions::new();
247+
248+
partitioned_df.write_parquet(&path, options, None).await?;
249+
250+
let test_path = std::path::Path::new(&path);
251+
assert_eq!(
252+
test_path.is_dir(),
253+
true,
254+
"No extension and default DataFrameWriteOptions should have yielded a dir."
255+
);
256+
257+
Ok(())
258+
}
259+
260+
#[tokio::test]
261+
async fn write_single_file_parquet_no_extensions() -> Result<()> {
262+
let ctx = SessionContext::new();
263+
let df = ctx.read_batch(RecordBatch::try_new(
264+
Arc::new(Schema::new(vec![
265+
Field::new("purchase_id", DataType::Int32, false),
266+
Field::new("price", DataType::Float32, false),
267+
Field::new("quantity", DataType::Int32, false),
268+
])),
269+
vec![
270+
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
271+
Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])),
272+
Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
273+
],
274+
)?)?;
275+
// Repartition to have multiple partitions.
276+
let partitioned_df = df.repartition(Partitioning::RoundRobinBatch(2))?;
277+
let tmp_dir = tempdir()?;
278+
let path = tmp_dir
279+
.path()
280+
.join("no_ext_parquet")
281+
.to_str()
282+
.unwrap()
283+
.to_string();
284+
285+
let options = DataFrameWriteOptions::new().with_single_file_output(true);
286+
287+
partitioned_df.write_parquet(&path, options, None).await?;
288+
289+
let test_path = std::path::Path::new(&path);
290+
assert_eq!(
291+
test_path.is_file(),
292+
true,
293+
"No extension and DataFrameWriteOptions::with_single_file_output(true) should have yielded a single file."
294+
);
295+
296+
Ok(())
297+
}
298+
219299
#[tokio::test]
220300
async fn read_from_different_file_extension() -> Result<()> {
221301
let ctx = SessionContext::new();

datafusion/datasource/src/write/demux.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,19 @@ pub(crate) fn start_demuxer_task(
101101
let file_extension = config.file_extension.clone();
102102
let base_output_path = config.table_paths[0].clone();
103103
let task = if config.table_partition_cols.is_empty() {
104-
let single_file_output = !base_output_path.is_collection()
105-
&& base_output_path.file_extension().is_some();
104+
let single_file_output = (!base_output_path.is_collection()
105+
&& base_output_path.file_extension().is_some())
106+
|| base_output_path.file_extension() == Some("single");
107+
108+
let file_extension = if single_file_output {
109+
file_extension.replace(".single", "")
110+
} else {
111+
file_extension
112+
};
113+
114+
let base_output_path =
115+
ListingTableUrl::parse(base_output_path.as_str().replace(".single", ""))
116+
.unwrap_or(base_output_path);
106117
SpawnedTask::spawn(async move {
107118
row_count_demuxer(
108119
tx,

0 commit comments

Comments
 (0)