Skip to content

Commit 3af7dbd

Browse files
reduce max_inflight to 20 (#84)
1 parent c4cbaa1 commit 3af7dbd

File tree

2 files changed

+227
-1
lines changed

2 files changed

+227
-1
lines changed
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
#!/usr/bin/env bun
2+
3+
import Redis from "ioredis";
4+
5+
if (!process.env.REDIS_URL) {
6+
throw new Error("REDIS_URL is not set");
7+
}
8+
9+
// Configuration
10+
const CONFIG = {
11+
redisUrl: process.env.REDIS_URL,
12+
chainId: 8453,
13+
walletAddress: "0x80c08de1a05Df2bD633CF520754e40fdE3C794d3",
14+
fetchBatchSize: 1000, // batch size for fetching pending txs from sorted set
15+
deleteBatchSize: 100, // batch size for deletions
16+
dryRun: true, // Set to false to actually delete
17+
} as const;
18+
19+
class FailedPendingTxCleanup {
20+
private redis: Redis;
21+
private pendingTxsKey: string;
22+
private stats = {
23+
totalPendingTxs: 0,
24+
totalChecked: 0,
25+
failedTxsFound: 0,
26+
deleted: 0,
27+
errors: 0,
28+
};
29+
30+
constructor() {
31+
this.redis = new Redis(CONFIG.redisUrl);
32+
this.pendingTxsKey = `engine-cloud:eoa_executor:pending_txs:${CONFIG.chainId}:${CONFIG.walletAddress}`;
33+
}
34+
35+
async run(): Promise<void> {
36+
console.log(`🚀 Starting failed pending transaction cleanup (DRY_RUN: ${CONFIG.dryRun})`);
37+
console.log(`🎯 Target:`);
38+
console.log(` Chain ID: ${CONFIG.chainId}`);
39+
console.log(` Wallet: ${CONFIG.walletAddress}`);
40+
console.log(` Pending txs key: ${this.pendingTxsKey}`);
41+
console.log(` Fetch batch size: ${CONFIG.fetchBatchSize}`);
42+
console.log(` Delete batch size: ${CONFIG.deleteBatchSize}`);
43+
console.log("");
44+
45+
try {
46+
// Get total count of pending transactions
47+
const totalCount = await this.redis.zcard(this.pendingTxsKey);
48+
this.stats.totalPendingTxs = totalCount;
49+
50+
if (totalCount === 0) {
51+
console.log("✅ No pending transactions found");
52+
this.printFinalStats();
53+
return;
54+
}
55+
56+
console.log(`📊 Found ${totalCount.toLocaleString()} pending transactions`);
57+
console.log("");
58+
59+
// Process in batches
60+
await this.processPendingTxsInBatches();
61+
62+
this.printFinalStats();
63+
} catch (error) {
64+
console.error(`💥 Fatal error: ${error}`);
65+
throw error;
66+
} finally {
67+
await this.redis.quit();
68+
}
69+
}
70+
71+
private async processPendingTxsInBatches(): Promise<void> {
72+
let offset = 0;
73+
let batchNumber = 1;
74+
const failedTxIds: string[] = [];
75+
76+
while (true) {
77+
console.log(`🔍 Processing batch ${batchNumber} (offset: ${offset})`);
78+
79+
// Fetch batch of transaction IDs from sorted set (sorted by score ascending)
80+
const txIds = await this.redis.zrange(
81+
this.pendingTxsKey,
82+
offset,
83+
offset + CONFIG.fetchBatchSize - 1
84+
);
85+
86+
if (txIds.length === 0) {
87+
break;
88+
}
89+
90+
console.log(` Retrieved ${txIds.length} transaction IDs`);
91+
92+
// Check status for each transaction in parallel using pipeline
93+
const statuses = await this.checkTransactionStatuses(txIds);
94+
95+
// Collect failed transaction IDs
96+
for (let i = 0; i < txIds.length; i++) {
97+
const txId = txIds[i];
98+
const status = statuses[i];
99+
console.log(` txId: ${txId}, status: ${status}`);
100+
101+
this.stats.totalChecked++;
102+
103+
if (status === "failed" && txId) {
104+
failedTxIds.push(txId);
105+
this.stats.failedTxsFound++;
106+
}
107+
}
108+
109+
console.log(` Found ${statuses.filter(s => s === "failed").length} failed transactions in this batch`);
110+
111+
// Delete failed transactions if we've accumulated enough
112+
if (failedTxIds.length >= CONFIG.deleteBatchSize) {
113+
await this.deleteFailedTxsInBatches(failedTxIds);
114+
failedTxIds.length = 0; // Clear the array
115+
}
116+
117+
offset += CONFIG.fetchBatchSize;
118+
batchNumber++;
119+
console.log("");
120+
}
121+
122+
// Delete any remaining failed transactions
123+
if (failedTxIds.length > 0) {
124+
await this.deleteFailedTxsInBatches(failedTxIds);
125+
}
126+
}
127+
128+
private async checkTransactionStatuses(txIds: string[]): Promise<(string | null)[]> {
129+
try {
130+
const pipeline = this.redis.pipeline();
131+
132+
for (const txId of txIds) {
133+
const txDataKey = `engine-cloud:eoa_executor:tx_data:${txId}`;
134+
pipeline.hget(txDataKey, "status");
135+
}
136+
137+
const results = await pipeline.exec();
138+
139+
if (!results) {
140+
console.error(` ⚠️ Pipeline returned null results`);
141+
this.stats.errors += txIds.length;
142+
return new Array(txIds.length).fill(null);
143+
}
144+
145+
return results.map(([err, status], index) => {
146+
if (err) {
147+
console.error(` ⚠️ Error checking status for tx ${txIds[index]}: ${err}`);
148+
this.stats.errors++;
149+
return null;
150+
}
151+
return status as string | null;
152+
});
153+
} catch (error) {
154+
console.error(` 💥 Error in batch status check: ${error}`);
155+
this.stats.errors += txIds.length;
156+
return new Array(txIds.length).fill(null);
157+
}
158+
}
159+
160+
private async deleteFailedTxsInBatches(failedTxIds: string[]): Promise<void> {
161+
let offset = 0;
162+
163+
while (offset < failedTxIds.length) {
164+
const batch = failedTxIds.slice(offset, offset + CONFIG.deleteBatchSize);
165+
166+
if (CONFIG.dryRun) {
167+
console.log(` [DRY RUN] Would delete ${batch.length} failed transactions from pending set`);
168+
console.log(` [DRY RUN] Sample IDs: ${batch.slice(0, 3).join(", ")}${batch.length > 3 ? "..." : ""}`);
169+
this.stats.deleted += batch.length;
170+
} else {
171+
await this.deleteFailedTxs(batch);
172+
}
173+
174+
offset += CONFIG.deleteBatchSize;
175+
}
176+
}
177+
178+
private async deleteFailedTxs(txIds: string[]): Promise<void> {
179+
try {
180+
// Use ZREM to remove multiple members from the sorted set at once
181+
const deletedCount = await this.redis.zrem(this.pendingTxsKey, ...txIds);
182+
183+
console.log(` ✅ Deleted ${deletedCount} failed transactions from pending set`);
184+
185+
if (deletedCount < txIds.length) {
186+
const notFound = txIds.length - deletedCount;
187+
console.log(` ⚠️ ${notFound} transactions were not found in the set (may have been already removed)`);
188+
}
189+
190+
this.stats.deleted += deletedCount;
191+
} catch (error) {
192+
console.error(` 💥 Error deleting batch: ${error}`);
193+
this.stats.errors += txIds.length;
194+
}
195+
}
196+
197+
private printFinalStats(): void {
198+
console.log("📈 Final Statistics:");
199+
console.log(` Total Pending Transactions: ${this.stats.totalPendingTxs.toLocaleString()}`);
200+
console.log(` Total Checked: ${this.stats.totalChecked.toLocaleString()}`);
201+
console.log(` Failed Transactions Found: ${this.stats.failedTxsFound.toLocaleString()}`);
202+
console.log(` ${CONFIG.dryRun ? 'Would Delete' : 'Deleted'}: ${this.stats.deleted.toLocaleString()}`);
203+
if (this.stats.errors > 0) {
204+
console.log(` Errors: ${this.stats.errors}`);
205+
}
206+
console.log("");
207+
208+
if (CONFIG.dryRun) {
209+
console.log("💡 This was a DRY RUN - no data was actually deleted");
210+
console.log("💡 Set CONFIG.dryRun = false to actually delete the failed transactions");
211+
} else {
212+
console.log("✅ CLEANUP COMPLETED - Failed transactions have been permanently removed from pending set");
213+
}
214+
}
215+
}
216+
217+
// Main execution
218+
async function main() {
219+
const cleaner = new FailedPendingTxCleanup();
220+
await cleaner.run();
221+
}
222+
223+
if (import.meta.main) {
224+
main().catch(console.error);
225+
}
226+

server/src/queue/manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ impl QueueManager {
257257
namespace: queue_config.execution_namespace.clone(),
258258
redis: redis_client.get_connection_manager().await?,
259259
authorization_cache,
260-
max_inflight: 100,
260+
max_inflight: 20,
261261
max_recycled_nonces: 50,
262262
eoa_metrics,
263263
kms_client_cache,

0 commit comments

Comments
 (0)