Skip to content

Commit bf3ea19

Browse files
authored
Merge branch 'main' into feat/delete-action
2 parents dcd2b90 + 3d47be5 commit bf3ea19

File tree

10 files changed

+24
-30
lines changed

10 files changed

+24
-30
lines changed

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
2727
use crate::{Error, ErrorKind, Result};
2828

2929
/// Builder for `DataFileWriter`.
30-
#[derive(Clone, Debug)]
30+
#[derive(Debug)]
3131
pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
3232
inner: RollingFileWriterBuilder<B, L, F>,
3333
}
@@ -53,9 +53,9 @@ where
5353
{
5454
type R = DataFileWriter<B, L, F>;
5555

56-
async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
56+
async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
5757
Ok(DataFileWriter {
58-
inner: Some(self.inner.clone().build()),
58+
inner: Some(self.inner.build()),
5959
partition_key,
6060
})
6161
}

crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::writer::{IcebergWriter, IcebergWriterBuilder};
3434
use crate::{Error, ErrorKind, Result};
3535

3636
/// Builder for `EqualityDeleteWriter`.
37-
#[derive(Clone, Debug)]
37+
#[derive(Debug)]
3838
pub struct EqualityDeleteFileWriterBuilder<
3939
B: FileWriterBuilder,
4040
L: LocationGenerator,
@@ -60,7 +60,7 @@ where
6060
}
6161

6262
/// Config for `EqualityDeleteWriter`.
63-
#[derive(Clone, Debug)]
63+
#[derive(Debug)]
6464
pub struct EqualityDeleteWriterConfig {
6565
// Field ids used to determine row equality in equality delete files.
6666
equality_ids: Vec<i32>,
@@ -123,11 +123,11 @@ where
123123
{
124124
type R = EqualityDeleteFileWriter<B, L, F>;
125125

126-
async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
126+
async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
127127
Ok(EqualityDeleteFileWriter {
128-
inner: Some(self.inner.clone().build()),
129-
projector: self.config.projector,
130-
equality_ids: self.config.equality_ids,
128+
inner: Some(self.inner.build()),
129+
projector: self.config.projector.clone(),
130+
equality_ids: self.config.equality_ids.clone(),
131131
partition_key,
132132
})
133133
}

crates/iceberg/src/writer/file_writer/location_generator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::Result;
2424
use crate::spec::{DataFileFormat, PartitionKey, TableMetadata};
2525

2626
/// `LocationGenerator` used to generate the location of data file.
27-
pub trait LocationGenerator: Clone + Send + 'static {
27+
pub trait LocationGenerator: Clone + Send + Sync + 'static {
2828
/// Generate an absolute path for the given file name that includes the partition path.
2929
///
3030
/// # Arguments
@@ -94,7 +94,7 @@ impl LocationGenerator for DefaultLocationGenerator {
9494
}
9595

9696
/// `FileNameGeneratorTrait` used to generate file name for data file. The file name can be passed to `LocationGenerator` to generate the location of the file.
97-
pub trait FileNameGenerator: Clone + Send + 'static {
97+
pub trait FileNameGenerator: Clone + Send + Sync + 'static {
9898
/// Generate a file name.
9999
fn generate_file_name(&self) -> String;
100100
}

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ pub mod rolling_writer;
3636
type DefaultOutput = Vec<DataFileBuilder>;
3737

3838
/// File writer builder trait.
39-
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
39+
pub trait FileWriterBuilder<O = DefaultOutput>: Clone + Send + Sync + 'static {
4040
/// The associated file writer type.
4141
type R: FileWriter<O>;
4242
/// Build file writer.
43-
fn build(self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
43+
fn build(&self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
4444
}
4545

4646
/// File writer focus on writing record batch to different physical file format.(Such as parquet. orc)

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,11 @@ impl ParquetWriterBuilder {
8181
impl FileWriterBuilder for ParquetWriterBuilder {
8282
type R = ParquetWriter;
8383

84-
async fn build(self, output_file: OutputFile) -> Result<Self::R> {
84+
async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
8585
Ok(ParquetWriter {
8686
schema: self.schema.clone(),
8787
inner_writer: None,
88-
writer_properties: self.props,
88+
writer_properties: self.props.clone(),
8989
current_row_num: 0,
9090
output_file,
9191
nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),

crates/iceberg/src/writer/file_writer/rolling_writer.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,15 @@ where
103103
}
104104

105105
/// Build a new [`RollingFileWriter`].
106-
pub fn build(self) -> RollingFileWriter<B, L, F> {
106+
pub fn build(&self) -> RollingFileWriter<B, L, F> {
107107
RollingFileWriter {
108108
inner: None,
109-
inner_builder: self.inner_builder,
109+
inner_builder: self.inner_builder.clone(),
110110
target_file_size: self.target_file_size,
111111
data_file_builders: vec![],
112-
file_io: self.file_io,
113-
location_generator: self.location_generator,
114-
file_name_generator: self.file_name_generator,
112+
file_io: self.file_io.clone(),
113+
location_generator: self.location_generator.clone(),
114+
file_name_generator: self.file_name_generator.clone(),
115115
}
116116
}
117117
}
@@ -192,7 +192,6 @@ where
192192
// initialize inner writer
193193
self.inner = Some(
194194
self.inner_builder
195-
.clone()
196195
.build(self.new_output_file(partition_key)?)
197196
.await?,
198197
);
@@ -206,7 +205,6 @@ where
206205
// start a new writer
207206
self.inner = Some(
208207
self.inner_builder
209-
.clone()
210208
.build(self.new_output_file(partition_key)?)
211209
.await?,
212210
);

crates/iceberg/src/writer/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@
148148
//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
149149
//! type R = LatencyRecordWriter<B::R>;
150150
//!
151-
//! async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
151+
//! async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
152152
//! Ok(LatencyRecordWriter {
153153
//! inner_writer: self.inner_writer_builder.build(partition_key).await?,
154154
//! })
@@ -398,13 +398,11 @@ type DefaultOutput = Vec<DataFile>;
398398

399399
/// The builder for iceberg writer.
400400
#[async_trait::async_trait]
401-
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
402-
Send + Clone + 'static
403-
{
401+
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>: Send + Sync + 'static {
404402
/// The associated writer type.
405403
type R: IcebergWriter<I, O>;
406404
/// Build the iceberg writer with an optional partition key.
407-
async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
405+
async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
408406
}
409407

410408
/// The iceberg writer used to write data to iceberg table.

crates/iceberg/src/writer/partitioning/clustered_writer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ where
118118
// Create a new writer for the new partition
119119
self.current_writer = Some(
120120
self.inner_builder
121-
.clone()
122121
.build(Some(partition_key.clone()))
123122
.await?,
124123
);

crates/iceberg/src/writer/partitioning/fanout_writer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ where
7373
if !self.partition_writers.contains_key(partition_key.data()) {
7474
let writer = self
7575
.inner_builder
76-
.clone()
7776
.build(Some(partition_key.clone()))
7877
.await?;
7978
self.partition_writers

crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ where
7575
pub async fn write(&mut self, input: I) -> Result<()> {
7676
// Lazily create writer on first write
7777
if self.writer.is_none() {
78-
self.writer = Some(self.inner_builder.clone().build(None).await?);
78+
self.writer = Some(self.inner_builder.build(None).await?);
7979
}
8080

8181
// Write directly to inner writer

0 commit comments

Comments
 (0)