Skip to content

Commit 0846204

Browse files
authored
perf: improve audit (#123)
* perf: improve audit * fix metric * wip * add error logs * worker pool * make params configurable * simplify try_join * try_join_all * chore: add more audit logs * perf: deduplicate bundles with the same contents (#137) * chore: use uuid v5 for determinism * feat: add ttl * diffs * fix: in-flight archive task * Revert "fix: in-flight archive task" This reverts commit a759f2b. * tmp: clear backlog by nooping * chore: log out meterbundleres * make ttl configurable
1 parent 3e73a99 commit 0846204

File tree

19 files changed

+398
-99
lines changed

19 files changed

+398
-99
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@ Thumbs.db
2626
/ui/.claude
2727

2828
# e2e / load tests
29-
wallets.json
29+
wallets.json

Cargo.lock

Lines changed: 94 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,5 @@ testcontainers = { version = "0.23.1", default-features = false }
9797
tracing-subscriber = { version = "0.3.20", default-features = false }
9898
testcontainers-modules = { version = "0.11.2", default-features = false }
9999
metrics-exporter-prometheus = { version = "0.17.0", default-features = false }
100+
futures = { version = "0.3.31", default-features = false }
101+
moka = { version = "0.12.12", default-features = false }

bin/tips-audit/src/main.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ struct Args {
5353

5454
#[arg(long, env = "TIPS_AUDIT_METRICS_ADDR", default_value = "0.0.0.0:9002")]
5555
metrics_addr: SocketAddr,
56+
57+
#[arg(long, env = "TIPS_AUDIT_WORKER_POOL_SIZE", default_value = "80")]
58+
worker_pool_size: usize,
59+
60+
#[arg(long, env = "TIPS_AUDIT_CHANNEL_BUFFER_SIZE", default_value = "500")]
61+
channel_buffer_size: usize,
62+
63+
#[arg(long, env = "TIPS_AUDIT_NOOP_ARCHIVE", default_value = "false")]
64+
noop_archive: bool,
5665
}
5766

5867
#[tokio::main]
@@ -82,7 +91,13 @@ async fn main() -> Result<()> {
8291
let s3_bucket = args.s3_bucket.clone();
8392
let writer = S3EventReaderWriter::new(s3_client, s3_bucket);
8493

85-
let mut archiver = KafkaAuditArchiver::new(reader, writer);
94+
let mut archiver = KafkaAuditArchiver::new(
95+
reader,
96+
writer,
97+
args.worker_pool_size,
98+
args.channel_buffer_size,
99+
args.noop_archive,
100+
);
86101

87102
info!("Audit archiver initialized, starting main loop");
88103

crates/audit/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@ metrics-derive.workspace = true
1818
tips-core = { workspace = true, features = ["test-utils"] }
1919
serde = { workspace = true, features = ["std", "derive"] }
2020
tokio = { workspace = true, features = ["full"] }
21-
uuid = { workspace = true, features = ["v4", "serde"] }
21+
uuid = { workspace = true, features = ["v5", "serde"] }
2222
tracing = { workspace = true, features = ["std"] }
2323
anyhow = { workspace = true, features = ["std"] }
2424
serde_json = { workspace = true, features = ["std"] }
2525
rdkafka = { workspace = true, features = ["tokio", "libz", "zstd", "ssl-vendored"] }
2626
alloy-consensus = { workspace = true, features = ["std"] }
2727
alloy-primitives = { workspace = true, features = ["map-foldhash", "serde"] }
2828
aws-sdk-s3 = { workspace = true, features = ["rustls", "default-https-client", "rt-tokio"] }
29+
futures = { workspace = true }
2930

3031
[dev-dependencies]
3132
testcontainers = { workspace = true, features = ["blocking"] }

0 commit comments

Comments
 (0)