Skip to content

Commit 91aa6fd

Browse files
alambandygrove
andauthored
Benchmarks for Arrow IPC writer (#7090)
* Add benchmarks for Arrow IPC writer * Add benchmarks for Arrow IPC writer * reuse target buffer * rename, etc * Add compression type * update --------- Co-authored-by: Andy Grove <[email protected]>
1 parent 0bbfc03 commit 91aa6fd

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed

arrow-ipc/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,10 @@ default = []
4747
lz4 = ["lz4_flex"]
4848

4949
[dev-dependencies]
50+
criterion = "0.5.1"
5051
tempfile = "3.3"
52+
tokio = "1.43.0"
53+
54+
[[bench]]
55+
name = "ipc_writer"
56+
harness = false

arrow-ipc/benches/ipc_writer.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
19+
use arrow_array::{builder::StringBuilder, RecordBatch};
20+
use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
21+
use arrow_ipc::CompressionType;
22+
use arrow_schema::{DataType, Field, Schema};
23+
use criterion::{criterion_group, criterion_main, Criterion};
24+
use std::sync::Arc;
25+
26+
fn criterion_benchmark(c: &mut Criterion) {
27+
let mut group = c.benchmark_group("arrow_ipc_stream_writer");
28+
29+
group.bench_function("StreamWriter/write_10", |b| {
30+
let batch = create_batch(8192, true);
31+
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
32+
b.iter(move || {
33+
buffer.clear();
34+
let mut writer = StreamWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
35+
for _ in 0..10 {
36+
writer.write(&batch).unwrap();
37+
}
38+
writer.finish().unwrap();
39+
})
40+
});
41+
42+
group.bench_function("StreamWriter/write_10/zstd", |b| {
43+
let batch = create_batch(8192, true);
44+
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
45+
b.iter(move || {
46+
buffer.clear();
47+
let options = IpcWriteOptions::default()
48+
.try_with_compression(Some(CompressionType::ZSTD))
49+
.unwrap();
50+
let mut writer =
51+
StreamWriter::try_new_with_options(&mut buffer, batch.schema().as_ref(), options)
52+
.unwrap();
53+
for _ in 0..10 {
54+
writer.write(&batch).unwrap();
55+
}
56+
writer.finish().unwrap();
57+
})
58+
});
59+
60+
group.bench_function("FileWriter/write_10", |b| {
61+
let batch = create_batch(8192, true);
62+
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
63+
b.iter(move || {
64+
buffer.clear();
65+
let mut writer = FileWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
66+
for _ in 0..10 {
67+
writer.write(&batch).unwrap();
68+
}
69+
writer.finish().unwrap();
70+
})
71+
});
72+
}
73+
74+
fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
75+
let schema = Arc::new(Schema::new(vec![
76+
Field::new("c0", DataType::Int32, true),
77+
Field::new("c1", DataType::Utf8, true),
78+
Field::new("c2", DataType::Date32, true),
79+
Field::new("c3", DataType::Decimal128(11, 2), true),
80+
]));
81+
let mut a = Int32Builder::new();
82+
let mut b = StringBuilder::new();
83+
let mut c = Date32Builder::new();
84+
let mut d = Decimal128Builder::new()
85+
.with_precision_and_scale(11, 2)
86+
.unwrap();
87+
for i in 0..num_rows {
88+
a.append_value(i as i32);
89+
c.append_value(i as i32);
90+
d.append_value((i * 1000000) as i128);
91+
if allow_nulls && i % 10 == 0 {
92+
b.append_null();
93+
} else {
94+
b.append_value(format!("this is string number {i}"));
95+
}
96+
}
97+
let a = a.finish();
98+
let b = b.finish();
99+
let c = c.finish();
100+
let d = d.finish();
101+
RecordBatch::try_new(
102+
schema.clone(),
103+
vec![Arc::new(a), Arc::new(b), Arc::new(c), Arc::new(d)],
104+
)
105+
.unwrap()
106+
}
107+
108+
fn config() -> Criterion {
109+
Criterion::default()
110+
}
111+
112+
criterion_group! {
113+
name = benches;
114+
config = config();
115+
targets = criterion_benchmark
116+
}
117+
criterion_main!(benches);

0 commit comments

Comments
 (0)