Skip to content

Commit bd47768

Browse files
authored
chore: use rdkafka compression (#90)
* Revert "feat: enable producer side compression for kafka (#82)" This reverts commit 336e9ee. * zstd
1 parent eae04d2 commit bd47768

File tree

7 files changed

+8
-26
lines changed

7 files changed

+8
-26
lines changed

Cargo.lock

Lines changed: 2 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,11 @@ wiremock = "0.6.2"
5555
axum = "0.8.3"
5656

5757
# Kafka and S3 dependencies
58-
rdkafka = { version = "0.37.0", features = ["libz-static", "ssl-vendored"] }
58+
rdkafka = { version = "0.37.0", features = ["zstd", "ssl-vendored"] }
5959
aws-config = "1.1.7"
6060
aws-sdk-s3 = "1.106.0"
6161
aws-credential-types = "1.1.7"
6262
bytes = { version = "1.8.0", features = ["serde"] }
63-
lz4_flex = "0.12"
6463

6564
# tips-ingress
6665
backon = "1.5.2"

crates/audit/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ aws-config = { workspace = true }
3232
aws-sdk-s3 = { workspace = true }
3333
aws-credential-types = { workspace = true }
3434
bytes = { workspace = true }
35-
lz4_flex = { workspace = true }
3635

3736
[dev-dependencies]
3837
testcontainers = { workspace = true }

crates/audit/src/publisher.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ impl KafkaBundleEventPublisher {
2626
async fn send_event(&self, event: &BundleEvent) -> Result<()> {
2727
let bundle_id = event.bundle_id();
2828
let key = event.generate_event_key();
29-
let json_bytes = serde_json::to_vec(event)?;
30-
let payload = lz4_flex::compress_prepend_size(&json_bytes);
29+
let payload = serde_json::to_vec(event)?;
3130

3231
let record = FutureRecord::to(&self.topic).key(&key).payload(&payload);
3332

crates/audit/src/reader.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,7 @@ impl EventReader for KafkaAuditLogReader {
7777
.as_millis() as i64,
7878
};
7979

80-
let json_bytes = lz4_flex::decompress_size_prepended(payload)
81-
.map_err(|e| anyhow::anyhow!("Failed to decompress LZ4: {e}"))?;
82-
let event: BundleEvent = serde_json::from_slice(&json_bytes)?;
80+
let event: BundleEvent = serde_json::from_slice(payload)?;
8381

8482
debug!(
8583
bundle_id = %event.bundle_id(),

crates/core/src/user_ops_types.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ mod tests {
8181
assert_eq!(user_operation.signature, Bytes::from_str("0x01").unwrap());
8282
}
8383
_ => {
84-
panic!("Expected EntryPointV06, got {user_operation:?}");
84+
panic!("Expected EntryPointV06, got {:?}", user_operation);
8585
}
8686
}
8787
}
@@ -132,7 +132,7 @@ mod tests {
132132
assert_eq!(user_operation.pre_verification_gas, Uint::from(0x186a0));
133133
}
134134
_ => {
135-
panic!("Expected EntryPointV07, got {user_operation:?}");
135+
panic!("Expected EntryPointV07, got {:?}", user_operation);
136136
}
137137
}
138138
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
# Kafka configuration properties for ingress audit events
22
bootstrap.servers=host.docker.internal:9094
33
message.timeout.ms=5000
4+
compression.type=zstd

0 commit comments

Comments
 (0)