Skip to content

Commit a101bd5

Browse files
authored
signature, imap fetch refactoring & unittest (#2510)
1 parent a6124ed commit a101bd5

File tree

14 files changed

+968
-534
lines changed

14 files changed

+968
-534
lines changed

backend/src/services/cache/EmailSignatureCache.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export interface EmailSignatureWithMetadata {
77
userId: string;
88
signature: string;
99
email: string;
10+
messageId: string;
1011
firstSeenDate: string;
1112
lastSeenDate: string;
1213
}
@@ -24,6 +25,7 @@ export default interface EmailSignatureCache {
2425
userId: string,
2526
email: string,
2627
signature: string,
28+
messageId: string,
2729
messageDate: string,
2830
miningId: string
2931
): Promise<void>;

backend/src/services/cache/redis/RedisEmailSignatureCache.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export default class RedisEmailSignatureCache implements EmailSignatureCache {
1414
userId: string,
1515
email: string,
1616
signature: string,
17+
messageId: string,
1718
messageDate: string,
1819
miningId: string
1920
): Promise<void> {
@@ -33,6 +34,7 @@ export default class RedisEmailSignatureCache implements EmailSignatureCache {
3334
signature,
3435
firstSeenDate,
3536
lastSeenDate: messageDateISO,
37+
messageId,
3638
userId,
3739
email
3840
});
@@ -77,6 +79,7 @@ export default class RedisEmailSignatureCache implements EmailSignatureCache {
7779
signature: data.signature,
7880
firstSeenDate: data.firstSeenDate,
7981
lastSeenDate: data.lastSeenDate,
82+
messageId: data.messageId,
8083
userId: data.userId,
8184
email: data.email
8285
};

backend/src/services/signature/llm/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,10 @@ export class SignatureLLM implements ExtractSignature {
244244

245245
return (data as OpenRouterResponse).choices?.[0]?.message?.content;
246246
} catch (err) {
247-
this.logger.error('SignatureExtractionLLM error:', { error: err });
247+
this.logger.error(
248+
`SignatureExtractionLLM error: ${(err as Error).message}`,
249+
{ error: err }
250+
);
248251
return null;
249252
}
250253
}

backend/src/workers/email-signature/handler.ts

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import planer from 'planer';
77
import EmailSignatureCache from '../../services/cache/EmailSignatureCache';
88
import { Contact } from '../../db/types';
99
import logger from '../../utils/logger';
10-
import { isUsefulSignatureContent, pushNotificationDB } from './utils';
10+
import {
11+
isUsefulSignatureContent,
12+
pushNotificationDB,
13+
upsertSignaturesDB
14+
} from './utils';
1115
import { ExtractSignature } from '../../services/signature/types';
1216
import { DomainStatusVerificationFunction } from '../../services/extractors/engines/EmailMessage';
1317
import { CleanQuotedForwardedReplies } from '../../utils/helpers/emailParsers';
@@ -79,11 +83,13 @@ export class EmailSignatureProcessor {
7983
contacts: Partial<Contact>[] | null;
8084
}> {
8185
const { userId, miningId, data: payload } = data;
82-
const { from, messageDate } = payload.header ?? {};
86+
const { from, messageDate, rawHeader } = payload.header ?? {};
8387

8488
const shouldProcess = await this.isWorthProcessing(data);
8589

8690
if (shouldProcess) {
91+
const [messageId] = rawHeader['message-id'];
92+
8793
this.logging.debug('Processing new signature', {
8894
userId,
8995
miningId,
@@ -96,6 +102,7 @@ export class EmailSignatureProcessor {
96102
miningId,
97103
from?.address,
98104
payload.body,
105+
messageId,
99106
messageDate
100107
);
101108
}
@@ -110,24 +117,36 @@ export class EmailSignatureProcessor {
110117

111118
if (extracted.length) {
112119
try {
120+
const signatures = extracted.map(
121+
([personEmail, messageId, rawSignature, extractedSignature]) => ({
122+
userId,
123+
personEmail,
124+
messageId,
125+
rawSignature,
126+
extractedSignature,
127+
details: { miningId }
128+
})
129+
);
130+
131+
await upsertSignaturesDB(this.supabase, signatures);
132+
113133
await pushNotificationDB(this.supabase, {
114134
userId,
115135
type: 'signature',
116136
details: {
117-
extracted,
118137
signatures: extracted.length
119138
}
120139
});
121140
} catch (err) {
122141
this.logging.error(
123-
`Error when pushing notifications: ${(err as Error).message}`,
142+
`Error when inserting signatures/notifications: ${(err as Error).message}`,
124143
err
125144
);
126145
}
127146
}
128147
return {
129148
finished: true,
130-
contacts: extracted
149+
contacts: extracted.map(([, , , contact]) => contact)
131150
};
132151
}
133152

@@ -136,6 +155,7 @@ export class EmailSignatureProcessor {
136155
miningId: string,
137156
email: string,
138157
body: string,
158+
messageId: string,
139159
messageDate: string
140160
): Promise<void> {
141161
const signature = this.extractSignature(body);
@@ -158,7 +178,14 @@ export class EmailSignatureProcessor {
158178
return;
159179
}
160180

161-
await this.cache.set(userId, email, signature, messageDate, miningId);
181+
await this.cache.set(
182+
userId,
183+
email,
184+
signature,
185+
messageId,
186+
messageDate,
187+
miningId
188+
);
162189
this.logging.info('Cached new signature', {
163190
email,
164191
miningId,
@@ -170,7 +197,7 @@ export class EmailSignatureProcessor {
170197
private async handleBatchUpdate(
171198
userId: string,
172199
miningId: string
173-
): Promise<Partial<Contact>[]> {
200+
): Promise<[string, string, string, Partial<Contact>][]> {
174201
this.logging.debug('handleBatchUpdate()', { userId, miningId });
175202

176203
const all = await this.cache.getAllFromMining(miningId);
@@ -180,25 +207,31 @@ export class EmailSignatureProcessor {
180207
return [];
181208
}
182209

183-
const contacts: (Partial<Contact> | undefined)[] = await Promise.all(
184-
all.map(async ({ email, signature }) => {
185-
try {
186-
const contact = await this.extractContact(userId, email, signature);
187-
if (contact) {
188-
await this.upsertContact(contact);
189-
return contact;
210+
const contacts: ([string, string, string, Partial<Contact>] | undefined)[] =
211+
await Promise.all(
212+
all.map(async ({ email, signature, messageId }) => {
213+
try {
214+
const contact = await this.extractContact(userId, email, signature);
215+
if (contact) {
216+
await this.upsertContact(contact);
217+
return [email, messageId, signature, contact];
218+
}
219+
return undefined;
220+
} catch (err) {
221+
this.logging.error('Error on extract/insert contact', err);
222+
return undefined;
190223
}
191-
return undefined;
192-
} catch (err) {
193-
this.logging.error('Error on extract/insert contact', err);
194-
return undefined;
195-
}
196-
})
197-
);
224+
})
225+
);
198226

199227
await this.cache.clearCachedSignature(miningId);
200228

201-
const successfulContacts = contacts.filter(Boolean) as Contact[];
229+
const successfulContacts = contacts.filter((c) => c && contacts.length) as [
230+
string,
231+
string,
232+
string,
233+
Partial<Contact>
234+
][];
202235

203236
this.logging.info('Batch complete - cache cleared', {
204237
miningId,

backend/src/workers/email-signature/utils.ts

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { SupabaseClient } from '@supabase/supabase-js';
2-
import { Contact } from '../../db/types';
32

43
export type NotificationType = 'enrich' | 'clean' | 'extract' | 'signature';
54

@@ -8,10 +7,44 @@ export interface NotificationPayload {
87
type: NotificationType;
98
details: {
109
signatures: number;
11-
extracted: Partial<Contact>[];
1210
};
1311
}
1412

13+
export interface SignaturePayload {
14+
userId: string;
15+
personEmail: string;
16+
messageId: string;
17+
rawSignature?: string | null;
18+
extractedSignature?: Record<string, string> | null;
19+
details?: Record<string, unknown> | null;
20+
}
21+
22+
/**
23+
* Bulk upsert signatures into `private.signatures`.
24+
* If (user_id, person_email) already exists, updates all fields.
25+
*/
26+
export async function upsertSignaturesDB(
27+
supabase: SupabaseClient,
28+
signatures: SignaturePayload[]
29+
) {
30+
const rows = signatures.map((s) => ({
31+
user_id: s.userId,
32+
person_email: s.personEmail,
33+
message_id: s.messageId,
34+
raw_signature: s.rawSignature,
35+
extracted_signature: s.extractedSignature,
36+
details: s.details
37+
}));
38+
39+
const { error } = await supabase
40+
.schema('private')
41+
.from('signatures')
42+
.upsert(rows, {
43+
onConflict: 'user_id,person_email'
44+
});
45+
if (error) throw error;
46+
}
47+
1548
/**
1649
* Push a new notification to the Supabase `notifications` table.
1750
*/

0 commit comments

Comments
 (0)