Skip to content

Commit 605f327

Browse files
feat: implement compressed CSV/JSON export functionality (#7162)
* feat: implement compressed CSV/JSON export functionality
  - Add CompressedWriter for real-time compression during CSV/JSON export
  - Support GZIP, BZIP2, XZ, ZSTD compression formats
  - Remove LazyBufferedWriter dependency for simplified architecture
  - Implement Encoder -> Compressor -> FileWriter data flow
  - Add tests for compressed CSV/JSON export

  Signed-off-by: McKnight22 <[email protected]>

* feat: implement compressed CSV/JSON export functionality
  - Refactor and extend compressed_writer tests
  - Add coverage for Bzip2 and Xz compression

  Signed-off-by: McKnight22 <[email protected]>

* feat: implement compressed CSV/JSON export functionality
  - Switch to threshold-based chunked flushing
  - Avoid unnecessary writes on empty buffers
  - Replace direct write_all() calls with the new helper for consistency

  Signed-off-by: McKnight22 <[email protected]>

* feat: implement compressed CSV/JSON import (COPY FROM) functionality
  - Add support for reading compressed CSV and JSON in COPY FROM
  - Support GZIP, BZIP2, XZ, ZSTD compression formats
  - Add tests for compressed CSV/JSON import

  Signed-off-by: McKnight22 <[email protected]>

* feat: implement compressed CSV/JSON export/import functionality
  - Fix review comments

  Signed-off-by: McKnight22 <[email protected]>

* feat: implement compressed CSV/JSON export/import functionality
  - Move temp_dir out of the loop

  Signed-off-by: McKnight22 <[email protected]>

* feat: implement compressed CSV/JSON export/import functionality
  - Fix unreasonable locking logic

  Co-authored-by: jeremyhi <[email protected]>
  Signed-off-by: McKnight22 <[email protected]>

---------

Signed-off-by: McKnight22 <[email protected]>
Co-authored-by: jeremyhi <[email protected]>
1 parent 4e9f419 commit 605f327

19 files changed

+1632
-242
lines changed

src/common/datasource/src/buffered_writer.rs

Lines changed: 1 addition & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,11 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::future::Future;
16-
1715
use arrow::record_batch::RecordBatch;
1816
use async_trait::async_trait;
1917
use datafusion::parquet::format::FileMetaData;
20-
use snafu::{OptionExt, ResultExt};
21-
use tokio::io::{AsyncWrite, AsyncWriteExt};
22-
23-
use crate::error::{self, Result};
24-
use crate::share_buffer::SharedBuffer;
2518

26-
pub struct LazyBufferedWriter<T, U, F> {
27-
path: String,
28-
writer_factory: F,
29-
writer: Option<T>,
30-
/// None stands for [`LazyBufferedWriter`] closed.
31-
encoder: Option<U>,
32-
buffer: SharedBuffer,
33-
rows_written: usize,
34-
bytes_written: u64,
35-
threshold: usize,
36-
}
19+
use crate::error::Result;
3720

3821
pub trait DfRecordBatchEncoder {
3922
fn write(&mut self, batch: &RecordBatch) -> Result<()>;
@@ -43,126 +26,3 @@ pub trait DfRecordBatchEncoder {
4326
pub trait ArrowWriterCloser {
4427
async fn close(mut self) -> Result<FileMetaData>;
4528
}
46-
47-
impl<
48-
T: AsyncWrite + Send + Unpin,
49-
U: DfRecordBatchEncoder + ArrowWriterCloser,
50-
F: Fn(String) -> Fut,
51-
Fut: Future<Output = Result<T>>,
52-
> LazyBufferedWriter<T, U, F>
53-
{
54-
/// Closes `LazyBufferedWriter` and optionally flushes all data to underlying storage
55-
/// if any row's been written.
56-
pub async fn close_with_arrow_writer(mut self) -> Result<(FileMetaData, u64)> {
57-
let encoder = self
58-
.encoder
59-
.take()
60-
.context(error::BufferedWriterClosedSnafu)?;
61-
let metadata = encoder.close().await?;
62-
63-
// It's important to shut down! flushes all pending writes
64-
self.close_inner_writer().await?;
65-
Ok((metadata, self.bytes_written))
66-
}
67-
}
68-
69-
impl<
70-
T: AsyncWrite + Send + Unpin,
71-
U: DfRecordBatchEncoder,
72-
F: Fn(String) -> Fut,
73-
Fut: Future<Output = Result<T>>,
74-
> LazyBufferedWriter<T, U, F>
75-
{
76-
/// Closes the writer and flushes the buffer data.
77-
pub async fn close_inner_writer(&mut self) -> Result<()> {
78-
// Use `rows_written` to keep a track of if any rows have been written.
79-
// If no row's been written, then we can simply close the underlying
80-
// writer without flush so that no file will be actually created.
81-
if self.rows_written != 0 {
82-
self.bytes_written += self.try_flush(true).await?;
83-
}
84-
85-
if let Some(writer) = &mut self.writer {
86-
writer.shutdown().await.context(error::AsyncWriteSnafu)?;
87-
}
88-
Ok(())
89-
}
90-
91-
pub fn new(
92-
threshold: usize,
93-
buffer: SharedBuffer,
94-
encoder: U,
95-
path: impl AsRef<str>,
96-
writer_factory: F,
97-
) -> Self {
98-
Self {
99-
path: path.as_ref().to_string(),
100-
threshold,
101-
encoder: Some(encoder),
102-
buffer,
103-
rows_written: 0,
104-
bytes_written: 0,
105-
writer_factory,
106-
writer: None,
107-
}
108-
}
109-
110-
pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
111-
let encoder = self
112-
.encoder
113-
.as_mut()
114-
.context(error::BufferedWriterClosedSnafu)?;
115-
encoder.write(batch)?;
116-
self.rows_written += batch.num_rows();
117-
self.bytes_written += self.try_flush(false).await?;
118-
Ok(())
119-
}
120-
121-
async fn try_flush(&mut self, all: bool) -> Result<u64> {
122-
let mut bytes_written: u64 = 0;
123-
124-
// Once buffered data size reaches threshold, split the data in chunks (typically 4MB)
125-
// and write to underlying storage.
126-
while self.buffer.buffer.lock().unwrap().len() >= self.threshold {
127-
let chunk = {
128-
let mut buffer = self.buffer.buffer.lock().unwrap();
129-
buffer.split_to(self.threshold)
130-
};
131-
let size = chunk.len();
132-
133-
self.maybe_init_writer()
134-
.await?
135-
.write_all(&chunk)
136-
.await
137-
.context(error::AsyncWriteSnafu)?;
138-
139-
bytes_written += size as u64;
140-
}
141-
142-
if all {
143-
bytes_written += self.try_flush_all().await?;
144-
}
145-
Ok(bytes_written)
146-
}
147-
148-
/// Only initiates underlying file writer when rows have been written.
149-
async fn maybe_init_writer(&mut self) -> Result<&mut T> {
150-
if let Some(ref mut writer) = self.writer {
151-
Ok(writer)
152-
} else {
153-
let writer = (self.writer_factory)(self.path.clone()).await?;
154-
Ok(self.writer.insert(writer))
155-
}
156-
}
157-
158-
async fn try_flush_all(&mut self) -> Result<u64> {
159-
let remain = self.buffer.buffer.lock().unwrap().split();
160-
let size = remain.len();
161-
self.maybe_init_writer()
162-
.await?
163-
.write_all(&remain)
164-
.await
165-
.context(error::AsyncWriteSnafu)?;
166-
Ok(size as u64)
167-
}
168-
}
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::io;
16+
use std::pin::Pin;
17+
use std::task::{Context, Poll};
18+
19+
use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
20+
use snafu::ResultExt;
21+
use tokio::io::{AsyncWrite, AsyncWriteExt};
22+
23+
use crate::compression::CompressionType;
24+
use crate::error::{self, Result};
25+
26+
/// A compressed writer that wraps an underlying async writer with compression.
27+
///
28+
/// This writer supports multiple compression formats including GZIP, BZIP2, XZ, and ZSTD.
29+
/// It provides transparent compression for any async writer implementation.
30+
pub struct CompressedWriter {
31+
inner: Box<dyn AsyncWrite + Unpin + Send>,
32+
compression_type: CompressionType,
33+
}
34+
35+
impl CompressedWriter {
36+
/// Creates a new compressed writer with the specified compression type.
37+
///
38+
/// # Arguments
39+
///
40+
/// * `writer` - The underlying writer to wrap with compression
41+
/// * `compression_type` - The type of compression to apply
42+
pub fn new(
43+
writer: impl AsyncWrite + Unpin + Send + 'static,
44+
compression_type: CompressionType,
45+
) -> Self {
46+
let inner: Box<dyn AsyncWrite + Unpin + Send> = match compression_type {
47+
CompressionType::Gzip => Box::new(GzipEncoder::new(writer)),
48+
CompressionType::Bzip2 => Box::new(BzEncoder::new(writer)),
49+
CompressionType::Xz => Box::new(XzEncoder::new(writer)),
50+
CompressionType::Zstd => Box::new(ZstdEncoder::new(writer)),
51+
CompressionType::Uncompressed => Box::new(writer),
52+
};
53+
54+
Self {
55+
inner,
56+
compression_type,
57+
}
58+
}
59+
60+
/// Returns the compression type used by this writer.
61+
pub fn compression_type(&self) -> CompressionType {
62+
self.compression_type
63+
}
64+
65+
/// Flush the writer and shutdown compression
66+
pub async fn shutdown(mut self) -> Result<()> {
67+
self.inner
68+
.shutdown()
69+
.await
70+
.context(error::AsyncWriteSnafu)?;
71+
Ok(())
72+
}
73+
}
74+
75+
impl AsyncWrite for CompressedWriter {
76+
fn poll_write(
77+
mut self: Pin<&mut Self>,
78+
cx: &mut Context<'_>,
79+
buf: &[u8],
80+
) -> Poll<io::Result<usize>> {
81+
Pin::new(&mut self.inner).poll_write(cx, buf)
82+
}
83+
84+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
85+
Pin::new(&mut self.inner).poll_flush(cx)
86+
}
87+
88+
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
89+
Pin::new(&mut self.inner).poll_shutdown(cx)
90+
}
91+
}
92+
93+
/// A trait for converting async writers into compressed writers.
94+
///
95+
/// This trait is automatically implemented for all types that implement [`AsyncWrite`].
96+
pub trait IntoCompressedWriter {
97+
/// Converts this writer into a [`CompressedWriter`] with the specified compression type.
98+
///
99+
/// # Arguments
100+
///
101+
/// * `self` - The underlying writer to wrap with compression
102+
/// * `compression_type` - The type of compression to apply
103+
fn into_compressed_writer(self, compression_type: CompressionType) -> CompressedWriter
104+
where
105+
Self: AsyncWrite + Unpin + Send + 'static + Sized,
106+
{
107+
CompressedWriter::new(self, compression_type)
108+
}
109+
}
110+
111+
impl<W: AsyncWrite + Unpin + Send + 'static> IntoCompressedWriter for W {}
112+
113+
#[cfg(test)]
mod tests {
    use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex};

    use super::*;

    /// Pushes `original` through a [`CompressedWriter`] configured with
    /// `compression_type` and returns the bytes observed by the wrapped
    /// writer. Shared scaffold for every codec test below.
    async fn compress_roundtrip(compression_type: CompressionType, original: &[u8]) -> Vec<u8> {
        let (duplex_writer, mut duplex_reader) = duplex(1024);
        let mut writer = duplex_writer.into_compressed_writer(compression_type);

        writer.write_all(original).await.unwrap();
        // Shutdown flushes the encoder's end-of-stream trailer.
        writer.shutdown().await.unwrap();

        let mut buffer = Vec::new();
        duplex_reader.read_to_end(&mut buffer).await.unwrap();
        buffer
    }

    /// Asserts the codec actually transformed the input: output is non-empty
    /// and differs from the original bytes.
    async fn assert_compressed(compression_type: CompressionType, original: &[u8]) {
        let buffer = compress_roundtrip(compression_type, original).await;
        // The compressed data should be different from the original.
        assert_ne!(buffer, original);
        assert!(!buffer.is_empty());
    }

    #[tokio::test]
    async fn test_compressed_writer_gzip() {
        assert_compressed(CompressionType::Gzip, b"test data for gzip compression").await;
    }

    #[tokio::test]
    async fn test_compressed_writer_bzip2() {
        assert_compressed(CompressionType::Bzip2, b"test data for bzip2 compression").await;
    }

    #[tokio::test]
    async fn test_compressed_writer_xz() {
        assert_compressed(CompressionType::Xz, b"test data for xz compression").await;
    }

    #[tokio::test]
    async fn test_compressed_writer_zstd() {
        assert_compressed(CompressionType::Zstd, b"test data for zstd compression").await;
    }

    #[tokio::test]
    async fn test_compressed_writer_uncompressed() {
        let original = b"test data for uncompressed";
        let buffer = compress_roundtrip(CompressionType::Uncompressed, original).await;

        // Pass-through mode must reproduce the input byte-for-byte.
        assert_eq!(buffer, original);
    }
}

src/common/datasource/src/error.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,12 +194,6 @@ pub enum Error {
194194
location: Location,
195195
},
196196

197-
#[snafu(display("Buffered writer closed"))]
198-
BufferedWriterClosed {
199-
#[snafu(implicit)]
200-
location: Location,
201-
},
202-
203197
#[snafu(display("Failed to write parquet file, path: {}", path))]
204198
WriteParquet {
205199
path: String,
@@ -208,6 +202,14 @@ pub enum Error {
208202
#[snafu(source)]
209203
error: parquet::errors::ParquetError,
210204
},
205+
206+
#[snafu(display("Failed to build file stream"))]
207+
BuildFileStream {
208+
#[snafu(implicit)]
209+
location: Location,
210+
#[snafu(source)]
211+
error: datafusion::error::DataFusionError,
212+
},
211213
}
212214

213215
pub type Result<T> = std::result::Result<T, Error>;
@@ -239,7 +241,7 @@ impl ErrorExt for Error {
239241
| ReadRecordBatch { .. }
240242
| WriteRecordBatch { .. }
241243
| EncodeRecordBatch { .. }
242-
| BufferedWriterClosed { .. }
244+
| BuildFileStream { .. }
243245
| OrcReader { .. } => StatusCode::Unexpected,
244246
}
245247
}

0 commit comments

Comments
 (0)