Skip to content

Commit 6dfeaa7

Browse files
committed
feat: Kafka message bus for sending webhook events
1 parent 6be3247 commit 6dfeaa7

File tree

14 files changed

+703
-0
lines changed

14 files changed

+703
-0
lines changed

CLAUDE.md

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# CLAUDE.md
2+
3+
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
4+
5+
## Development Commands
6+
7+
### Build and Test
8+
- `cargo build --release` - Build production binary
9+
- `cargo test` - Run all tests including Redis integration tests
10+
- `cargo nextest run -p twmq --profile ci` - Run specific component tests
11+
- `RUST_LOG=debug cargo run` - Run server with debug logging
12+
13+
### Development Setup
14+
Redis is required for development:
15+
```bash
16+
docker run -d --name redis -p 6379:6379 redis:7-alpine
17+
```
18+
19+
Required environment variables:
20+
```bash
21+
export APP__THIRDWEB__SECRET="your_secret_key"
22+
export APP__THIRDWEB__CLIENT_ID="your_client_id"
23+
```
24+
25+
## Architecture Overview
26+
27+
This is a **Rust workspace** with 7 crates providing blockchain transaction infrastructure:
28+
29+
- **`server/`** - Main HTTP API server (Axum-based REST API with OpenAPI docs)
30+
- **`core/`** - Core blockchain functionality (chain management, transactions, UserOps)
31+
- **`aa-core/`** - Account Abstraction engine (ERC-4337 v0.6/v0.7 support)
32+
- **`aa-types/`** - Account Abstraction type definitions
33+
- **`executors/`** - Background job handlers (webhooks, transaction confirmation)
34+
- **`twmq/`** - Thirdweb Message Queue (Redis-backed job queue with lease-based concurrency)
35+
- **`thirdweb-core/`** - Thirdweb service integrations (Vault SDK, IAW)
36+
37+
### Key Technologies
38+
- **Axum** for HTTP server
39+
- **Alloy** for Ethereum interactions
40+
- **Redis** for job queue and state
41+
- **Tokio** for async runtime
42+
- **Vault SDK** for secure key management
43+
44+
## Configuration System
45+
46+
Hierarchical configuration priority:
47+
1. Environment variables (`APP__` prefix)
48+
2. Environment-specific YAML (`server_development.yaml`, `server_production.yaml`)
49+
3. Base YAML (`server_base.yaml`)
50+
51+
Configuration files located in `server/configuration/`
52+
53+
## Transaction Types Supported
54+
55+
- **EOA transactions** - Traditional wallet transactions
56+
- **Account Abstraction** - ERC-4337 smart accounts with gas sponsorship
57+
- **EIP-7702** - Delegated transaction execution
58+
59+
## Key Development Areas
60+
61+
### API Routes
62+
Located in `server/src/http/routes/` - follows RESTful patterns with OpenAPI documentation
63+
64+
### Background Jobs
65+
Implemented in `executors/src/` - uses TWMQ for reliable job processing with Redis persistence
66+
67+
### Blockchain Core
68+
`core/src/` contains chain management, transaction building, and UserOperation support
69+
70+
### Account Abstraction
71+
`aa-core/src/` implements complete ERC-4337 flow including bundler integration
72+
73+
## Error Handling
74+
75+
Uses comprehensive error types with context. All errors are structured and logged with tracing spans.
76+
77+
## Testing
78+
79+
Integration tests require Redis. Tests cover job queue operations, transaction building, and API endpoints.
80+
81+
## Production Features
82+
83+
- Horizontal scaling via shared Redis backend
84+
- Graceful shutdown with job completion guarantees
85+
- Vault-backed private key management
86+
- Comprehensive metrics and health checks
87+
- Docker support with multi-stage builds

Cargo.lock

Lines changed: 48 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

executors/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ uuid = { version = "1.17.0", features = ["v4"] }
2222
chrono = "0.4.41"
2323
tokio = { version = "1.45.0", features = ["full"] }
2424
futures = "0.3.31"
25+
async-trait = "0.1.83"

executors/src/external_bundler/confirm.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@ use twmq::{
1515
};
1616

1717
use crate::{
18+
kafka_integration::{SharedEventSender, TransactionConfirmedEvent},
1819
transaction_registry::TransactionRegistry,
1920
webhook::{
2021
WebhookJobHandler,
2122
envelope::{ExecutorStage, HasWebhookOptions, WebhookCapable},
2223
},
2324
};
2425

26+
use std::time::{SystemTime, UNIX_EPOCH};
27+
2528
use super::deployment::RedisDeploymentLock;
2629

2730
// --- Job Payload ---
@@ -103,6 +106,7 @@ where
103106
pub transaction_registry: Arc<TransactionRegistry>,
104107
pub max_confirmation_attempts: u32,
105108
pub confirmation_retry_delay: Duration,
109+
pub event_sender: SharedEventSender,
106110
}
107111

108112
impl<CS> UserOpConfirmationHandler<CS>
@@ -114,6 +118,7 @@ where
114118
deployment_lock: RedisDeploymentLock,
115119
webhook_queue: Arc<Queue<WebhookJobHandler>>,
116120
transaction_registry: Arc<TransactionRegistry>,
121+
event_sender: SharedEventSender,
117122
) -> Self {
118123
Self {
119124
chain_service,
@@ -122,6 +127,7 @@ where
122127
transaction_registry,
123128
max_confirmation_attempts: 20, // ~5 minutes with 15 second delays
124129
confirmation_retry_delay: Duration::from_secs(5),
130+
event_sender,
125131
}
126132
}
127133

@@ -246,6 +252,20 @@ where
246252
);
247253
}
248254

255+
// Send Kafka transaction confirmed event
256+
let confirmed_event = TransactionConfirmedEvent {
257+
transaction_id: job.job.data.transaction_id.clone(),
258+
user_op_hash: success_data.result.user_op_hash.clone(),
259+
receipt: success_data.result.receipt.clone(),
260+
deployment_lock_released: success_data.result.deployment_lock_released,
261+
timestamp: SystemTime::now()
262+
.duration_since(UNIX_EPOCH)
263+
.unwrap()
264+
.as_secs(),
265+
};
266+
267+
self.event_sender.send_transaction_confirmed(confirmed_event).await;
268+
249269
// Queue success webhook
250270
if let Err(e) = self.queue_success_webhook(job, success_data, tx) {
251271
tracing::error!(

executors/src/external_bundler/send.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@ use twmq::{
2727
};
2828

2929
use crate::{
30+
kafka_integration::{SharedEventSender, TransactionSentEvent},
3031
transaction_registry::TransactionRegistry,
3132
webhook::{
3233
WebhookJobHandler,
3334
envelope::{ExecutorStage, HasTransactionMetadata, HasWebhookOptions, WebhookCapable},
3435
},
3536
};
3637

38+
use std::time::{SystemTime, UNIX_EPOCH};
39+
3740
use super::{
3841
confirm::{UserOpConfirmationHandler, UserOpConfirmationJobData},
3942
deployment::{RedisDeploymentCache, RedisDeploymentLock},
@@ -199,6 +202,7 @@ where
199202
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
200203
pub confirm_queue: Arc<Queue<UserOpConfirmationHandler<CS>>>,
201204
pub transaction_registry: Arc<TransactionRegistry>,
205+
pub event_sender: SharedEventSender,
202206
}
203207

204208
impl<CS> ExternalBundlerSendHandler<CS>
@@ -533,6 +537,22 @@ where
533537
);
534538
}
535539

540+
// Send Kafka transaction sent event
541+
let sent_event = TransactionSentEvent {
542+
transaction_id: job.job.data.transaction_id.clone(),
543+
chain_id: job.job.data.chain_id,
544+
account_address: success_data.result.account_address,
545+
user_op_hash: success_data.result.user_op_hash.clone(),
546+
nonce: success_data.result.nonce,
547+
deployment_lock_acquired: success_data.result.deployment_lock_acquired,
548+
timestamp: SystemTime::now()
549+
.duration_since(UNIX_EPOCH)
550+
.unwrap()
551+
.as_secs(),
552+
};
553+
554+
self.event_sender.send_transaction_sent(sent_event).await;
555+
536556
if let Err(e) = self.queue_success_webhook(job, success_data, tx) {
537557
tracing::error!(
538558
transaction_id = %job.job.data.transaction_id,

executors/src/kafka_integration.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use alloy::primitives::{Address, Bytes, U256};
2+
use engine_core::rpc_clients::UserOperationReceipt;
3+
use serde::Serialize;
4+
use std::sync::Arc;
5+
6+
/// Trait for sending transaction events to external messaging systems
7+
#[async_trait::async_trait]
8+
pub trait TransactionEventSender: Send + Sync {
9+
async fn send_transaction_sent(&self, message: TransactionSentEvent);
10+
async fn send_transaction_confirmed(&self, message: TransactionConfirmedEvent);
11+
}
12+
13+
#[derive(Debug, Clone, Serialize)]
14+
#[serde(rename_all = "camelCase")]
15+
pub struct TransactionSentEvent {
16+
pub transaction_id: String,
17+
pub chain_id: u64,
18+
pub account_address: Address,
19+
pub user_op_hash: Bytes,
20+
pub nonce: U256,
21+
pub deployment_lock_acquired: bool,
22+
pub timestamp: u64,
23+
}
24+
25+
#[derive(Debug, Clone, Serialize)]
26+
#[serde(rename_all = "camelCase")]
27+
pub struct TransactionConfirmedEvent {
28+
pub transaction_id: String,
29+
pub user_op_hash: Bytes,
30+
pub receipt: UserOperationReceipt,
31+
pub deployment_lock_released: bool,
32+
pub timestamp: u64,
33+
}
34+
35+
/// No-op implementation for when messaging is disabled
36+
pub struct NoOpEventSender;
37+
38+
#[async_trait::async_trait]
39+
impl TransactionEventSender for NoOpEventSender {
40+
async fn send_transaction_sent(&self, _message: TransactionSentEvent) {
41+
// Do nothing
42+
}
43+
44+
async fn send_transaction_confirmed(&self, _message: TransactionConfirmedEvent) {
45+
// Do nothing
46+
}
47+
}
48+
49+
pub type SharedEventSender = Arc<dyn TransactionEventSender>;

executors/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod eip7702_executor;
22
pub mod eoa;
33
pub mod external_bundler;
4+
pub mod kafka_integration;
45
pub mod transaction_registry;
56
pub mod webhook;

server/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ vault-sdk = { workspace = true }
1313
vault-types = { workspace = true }
1414
engine-core = { path = "../core" }
1515
engine-aa-core = { path = "../aa-core" }
16+
engine-aa-types = { path = "../aa-types" }
1617
engine-executors = { path = "../executors" }
1718
twmq = { path = "../twmq" }
1819
thirdweb-core = { path = "../thirdweb-core" }
@@ -40,3 +41,5 @@ utoipa = { version = "5.4.0", features = [
4041
] }
4142
utoipa-axum = "0.2.0"
4243
utoipa-scalar = { version = "0.3.0", features = ["axum"] }
44+
rdkafka = { version = "0.36", features = ["ssl"] }
45+
async-trait = "0.1.83"

0 commit comments

Comments
 (0)