Skip to content

Commit 512eb26

Browse files
authored
feat: add queue for controlling concurrency & multi worker support for signature streams (#2611)
1 parent da52ccb commit 512eb26

File tree

17 files changed

+523
-187
lines changed

17 files changed

+523
-187
lines changed

backend/src/controllers/imap.controller.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,10 @@ export default function initializeImapController(miningSources: MiningSources) {
9494
await upsertMiningSource(
9595
miningSources,
9696
userId,
97-
newToken,
97+
{
98+
...newToken,
99+
refresh_token: newToken.refresh_token ?? refreshToken
100+
},
98101
provider,
99102
data.email
100103
);

backend/src/emailSignatureWorker.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ if (ENV.SIGNATURE_OPENROUTER_API_KEY) {
5050
});
5151
}
5252

53-
const { processStreamData } = initializeEmailSignatureProcessor(
53+
const { processStreamData, handler } = initializeEmailSignatureProcessor(
5454
supabaseClient,
5555
new Signature(logger, signatureEngines),
5656
emailSignatureCache,
@@ -78,7 +78,8 @@ const emailsStreamConsumer = new EmailSignatureConsumer(
7878
ENV.REDIS_EMAIL_SIGNATURE_CONSUMER_BATCH_SIZE,
7979
processStreamData,
8080
redisClient,
81-
logger
81+
logger,
82+
handler
8283
);
8384

8485
(async () => {

backend/src/services/cache/EmailSignatureCache.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ export interface EmailSignatureWithMetadata {
1212
lastSeenDate: string;
1313
}
1414

15+
export interface SignatureProgress {
16+
received: number;
17+
}
18+
1519
export default interface EmailSignatureCache {
1620
/**
1721
* Set or update a signature for an email address and associate it with a mining operation
@@ -47,4 +51,24 @@ export default interface EmailSignatureCache {
4751
* @param miningId - The ID of the mining operation to clear
4852
*/
4953
clearCachedSignature(miningId: string): Promise<void>;
54+
55+
/**
56+
* Increments the received message count for a mining operation
57+
* @param miningId - The ID of the mining operation
58+
* @returns The new received count
59+
*/
60+
incrementReceived(miningId: string): Promise<number>;
61+
62+
/**
63+
* Gets current progress state without modifying it
64+
* @param miningId - The ID of the mining operation
65+
* @returns Current progress state
66+
*/
67+
getProgress(miningId: string): Promise<SignatureProgress>;
68+
69+
/**
70+
* Clears all progress tracking keys for a mining operation
71+
* @param miningId - The ID of the mining operation
72+
*/
73+
clearProgress(miningId: string): Promise<void>;
5074
}

backend/src/services/cache/redis/RedisEmailSignatureCache.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import Redis from 'ioredis';
22
import EmailSignatureCache, {
3-
EmailSignatureWithMetadata
3+
EmailSignatureWithMetadata,
4+
SignatureProgress
45
} from '../EmailSignatureCache';
56

67
export default class RedisEmailSignatureCache implements EmailSignatureCache {
@@ -102,4 +103,35 @@ export default class RedisEmailSignatureCache implements EmailSignatureCache {
102103
await this.client.del(miningKey);
103104
}
104105
}
106+
107+
/**
108+
* Increments the received message count for a mining operation
109+
* Uses Redis INCR for atomicity
110+
*/
111+
public async incrementReceived(miningId: string): Promise<number> {
112+
const key = `mining:${miningId}:received`;
113+
const count = await this.client.incr(key);
114+
return count;
115+
}
116+
117+
/**
118+
* Gets current progress state without modifying it
119+
*/
120+
public async getProgress(miningId: string): Promise<SignatureProgress> {
121+
const [received] = await this.client.mget(`mining:${miningId}:received`);
122+
123+
return {
124+
received: parseInt(received || '0', 10)
125+
};
126+
}
127+
128+
/**
129+
* Clears all progress tracking keys for a mining operation
130+
*/
131+
public async clearProgress(miningId: string): Promise<void> {
132+
await this.client.del(
133+
`mining:${miningId}:received`,
134+
`mining:${miningId}:total`
135+
);
136+
}
105137
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
const TIMEOUT = 10_000; // Default API request timeout
2-
export default TIMEOUT;
1+
export const TIMEOUT = 10_000; // Default API request timeout
2+
export const ZEROBOUNCE_TIMEOUT = 30_000; // Default API request timeout

backend/src/services/email-status/mailercheck/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import axios, { AxiosInstance } from 'axios';
22
import { Logger } from 'winston';
33
import { logError } from '../../../utils/axios';
44
import { IRateLimiter } from '../../rate-limiter';
5-
import TIMEOUT from '../constants';
5+
import { TIMEOUT } from '../constants';
66

77
export default class MailerCheckClient {
88
private static readonly baseURL = 'https://app.mailercheck.com/api/';

backend/src/services/email-status/reacher/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import axios, { AxiosError, AxiosInstance } from 'axios';
22
import { Logger } from 'winston';
33
import { logError } from '../../../utils/axios';
44
import { IRateLimiter } from '../../rate-limiter';
5-
import TIMEOUT from '../constants';
5+
import { TIMEOUT } from '../constants';
66

77
interface BulkSubmitResponse {
88
job_id: string;

backend/src/services/email-status/zerobounce/client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import axios, { AxiosInstance } from 'axios';
33
import assert from 'node:assert';
44
import { logError } from '../../../utils/axios';
55
import { IRateLimiter } from '../../rate-limiter';
6-
import TIMEOUT from '../constants';
6+
import { ZEROBOUNCE_TIMEOUT } from '../constants';
77

88
/**
99
* Configuration options for ZerobounceClient.
@@ -124,7 +124,7 @@ export default class ZerobounceClient {
124124
) {
125125
this.api = axios.create({
126126
baseURL: config.url ?? ZerobounceClient.baseURL,
127-
timeout: TIMEOUT
127+
timeout: ZEROBOUNCE_TIMEOUT
128128
});
129129
}
130130

backend/src/services/rate-limiter/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ export class TokenBucketRateLimiter implements IRateLimiter {
5454
executeEvenly,
5555
distribution
5656
});
57+
58+
logger.debug('Token bucket rate limiter initialized', {
59+
intervalSeconds,
60+
requests,
61+
executeEvenly,
62+
distribution,
63+
uniqueKey
64+
});
65+
5766
this.limiter = new RateLimiterQueue(baseLimiter);
5867
}
5968

backend/src/services/signature/llm/index.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ type OpenRouterResponse = {
172172
export class SignatureLLM implements ExtractSignature {
173173
LLM_ENDPOINT = 'https://openrouter.ai/api/v1/chat/completions';
174174

175+
MAX_OUTPUT_TOKENS = 1000;
176+
175177
private active = true;
176178

177179
constructor(
@@ -210,13 +212,17 @@ export class SignatureLLM implements ExtractSignature {
210212
content: SignaturePrompt.buildUserPrompt(email, signature)
211213
}
212214
],
213-
response_format: SignaturePrompt.response_format
215+
response_format: SignaturePrompt.response_format,
216+
max_tokens: this.MAX_OUTPUT_TOKENS
214217
});
215218
}
216219

217220
private handleResponseError(error: OpenRouterError['error']) {
218221
if ([402, 502, 503].includes(error.code)) {
219222
this.active = false;
223+
this.logger.warn(
224+
'LLM engine down — check credits balance or expired/invalid API token'
225+
);
220226
}
221227
throw new Error(error.message);
222228
}
@@ -279,9 +285,15 @@ export class SignatureLLM implements ExtractSignature {
279285
try {
280286
const content = await this.sendPrompt(email, signature);
281287

282-
this.logger.debug(`extract signature content: ${content}`);
288+
if (!content || content.toLowerCase() === 'null') {
289+
this.logger.debug('Received empty or null response. skipping parse');
290+
return null;
291+
}
283292

284-
if (!content || content.toLowerCase() === 'null') return null;
293+
if (content.length > this.MAX_OUTPUT_TOKENS) {
294+
this.logger.debug('Model response was too large, skipping parse');
295+
return null;
296+
}
285297

286298
const parsed = JSON.parse(content);
287299
const person = Array.isArray(parsed) ? parsed[0] : parsed;

0 commit comments

Comments
 (0)