Skip to content

Commit b470d8e

Browse files
authored
Refactored imapEmailFetcher to run as an independent process (#2458)
1 parent 747c354 commit b470d8e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+14612
-171
lines changed

backend/package-lock.json

Lines changed: 21 additions & 21 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
"google-auth-library": "^9.15.1",
4848
"html-entities": "^2.6.0",
4949
"imap": "^0.8.19",
50-
"imapflow": "^1.0.195",
50+
"imapflow": "^1.0.196",
5151
"ioredis": "^5.7.0",
5252
"is-url-http": "^2.3.10",
5353
"jsonwebtoken": "^9.0.2",
@@ -90,7 +90,7 @@
9090
"@types/ioredis-mock": "^8.2.5",
9191
"@types/jsonwebtoken": "^9.0.7",
9292
"@types/mailparser": "^3.4.6",
93-
"@types/node": "^22.10.5",
93+
"@types/node": "^22.18.8",
9494
"@types/pg": "^8.11.10",
9595
"@types/pg-format": "^1.0.5",
9696
"@types/quoted-printable": "^1.0.2",

backend/src/config/schema.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ const schema = z.object({
1313
FRONTEND_HOST: z.string().url(),
1414

1515
/* FETCHING */
16+
EMAIL_FETCHING_SERVICE_URL: z.string().min(1),
17+
EMAIL_FETCHING_SERVICE_API_TOKEN: z.string().min(8),
18+
// TODO: CONSISTENT PREFIX FOR FETCHING RELATED VARS
1619
FETCHING_BATCH_SIZE_TO_SEND: number(),
1720
FETCHING_CHUNK_SIZE_PER_CONNECTION: number(),
1821
FETCHING_MAX_CONNECTIONS_PER_FOLDER: number(),

backend/src/controllers/imap.controller.ts

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import { User } from '@supabase/supabase-js';
22
import { NextFunction, Request, Response } from 'express';
3-
import { ImapFlow as Connection } from 'imapflow';
43
import {
5-
ImapMiningSourceCredentials,
64
MiningSources,
75
OAuthMiningSourceCredentials,
86
OAuthMiningSourceProvider
@@ -23,23 +21,6 @@ type NewToken = {
2321
expires_at: number;
2422
};
2523

26-
async function getImapConnectionProvider(
27-
data: OAuthMiningSourceCredentials | ImapMiningSourceCredentials
28-
) {
29-
if ('accessToken' in data) {
30-
const connection = await new ImapConnectionProvider(data.email).withOAuth(
31-
data
32-
);
33-
return connection;
34-
}
35-
return new ImapConnectionProvider(data.email).withPassword(
36-
data.host,
37-
data.password,
38-
data.tls,
39-
data.port
40-
);
41-
}
42-
4324
function getTokenAndProvider(data: OAuthMiningSourceCredentials) {
4425
const { provider, accessToken, refreshToken, expiresAt } = data;
4526
const client = provider === 'azure' ? azureOAuth2Client : googleOAuth2Client;
@@ -76,9 +57,6 @@ async function upsertMiningSource(
7657
export default function initializeImapController(miningSources: MiningSources) {
7758
return {
7859
async getImapBoxes(req: Request, res: Response, next: NextFunction) {
79-
let imapConnectionProvider: ImapConnectionProvider | null = null;
80-
let imapConnection: Connection | null = null;
81-
8260
const { email } = req.body;
8361

8462
const errors = [validateType('email', email, 'string')].filter(Boolean);
@@ -124,12 +102,25 @@ export default function initializeImapController(miningSources: MiningSources) {
124102
}
125103
}
126104

127-
imapConnectionProvider = await getImapConnectionProvider(data);
128-
imapConnection = await imapConnectionProvider.acquireConnection();
105+
const imapConnection = await ImapConnectionProvider.getSingleConnection(
106+
email,
107+
'accessToken' in data
108+
? {
109+
oauthToken: data.accessToken
110+
}
111+
: {
112+
host: data.host,
113+
password: data.password,
114+
tls: data.tls,
115+
port: data.port
116+
}
117+
);
129118

130-
const imapBoxesFetcher = new ImapBoxesFetcher(imapConnectionProvider);
119+
const imapBoxesFetcher = new ImapBoxesFetcher(imapConnection, logger);
131120
const tree: any = await imapBoxesFetcher.getTree(data.email);
132121

122+
await imapConnection.logout();
123+
133124
logger.info('Mining IMAP tree succeeded.', {
134125
metadata: {
135126
user: hashEmail(data.email, userId)
@@ -140,6 +131,10 @@ export default function initializeImapController(miningSources: MiningSources) {
140131
data: { message: 'IMAP folders fetched successfully!', folders: tree }
141132
});
142133
} catch (error: any) {
134+
logger.error(`Error during inbox fetch: ${(error as Error).message}`, {
135+
error
136+
});
137+
143138
if ([502, 503].includes(error?.output?.payload?.statusCode)) {
144139
return res
145140
.status(error?.output?.payload?.statusCode)
@@ -151,11 +146,6 @@ export default function initializeImapController(miningSources: MiningSources) {
151146
return res.status(generatedError.status).send(generatedError);
152147
}
153148
return next(generatedError);
154-
} finally {
155-
if (imapConnection) {
156-
await imapConnectionProvider?.releaseConnection(imapConnection);
157-
}
158-
await imapConnectionProvider?.cleanPool();
159149
}
160150
}
161151
};

backend/src/controllers/mining.controller.ts

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import {
77
OAuthMiningSourceProvider
88
} from '../db/interfaces/MiningSources';
99
import { ContactFormat } from '../services/extractors/engines/FileImport';
10-
import ImapConnectionProvider from '../services/imap/ImapConnectionProvider';
11-
import { ImapEmailsFetcherOptions } from '../services/imap/types';
1210
import TaskManagerFile from '../services/tasks-manager/TaskManagerFile';
1311
import TasksManager from '../services/tasks-manager/TasksManager';
1412
import { ImapAuthError } from '../utils/errors';
@@ -241,44 +239,15 @@ export default function initializeMiningController(
241239
});
242240
}
243241

244-
const imapConnectionProvider =
245-
'accessToken' in miningSourceCredentials
246-
? await new ImapConnectionProvider(
247-
miningSourceCredentials.email
248-
).withOAuth(miningSourceCredentials, {
249-
miningSources,
250-
userId: user.id
251-
})
252-
: new ImapConnectionProvider(
253-
miningSourceCredentials.email
254-
).withPassword(
255-
miningSourceCredentials.host,
256-
miningSourceCredentials.password,
257-
miningSourceCredentials.tls,
258-
miningSourceCredentials.port
259-
);
260-
261-
let imapConnection = null;
262-
let miningTask = null;
263-
264242
try {
265-
// Connect to validate connection before creating the pool.
266-
imapConnection = await imapConnectionProvider.acquireConnection();
267-
const imapEmailsFetcherOptions: ImapEmailsFetcherOptions = {
268-
imapConnectionProvider,
243+
const miningTask = await tasksManager.createTask({
269244
boxes: sanitizedFolders,
270245
userId: user.id,
271246
email: miningSourceCredentials.email,
272-
batchSize: ENV.FETCHING_BATCH_SIZE_TO_SEND,
273247
fetchEmailBody: extractSignatures && ENV.IMAP_FETCH_BODY
274-
};
275-
miningTask = await tasksManager.createTask(imapEmailsFetcherOptions);
248+
});
249+
return res.status(201).send({ error: null, data: miningTask });
276250
} catch (err) {
277-
if (imapConnection) {
278-
await imapConnectionProvider.releaseConnection(imapConnection);
279-
}
280-
await imapConnectionProvider.cleanPool();
281-
282251
if (
283252
err instanceof Error &&
284253
err.message.toLowerCase().startsWith('invalid credentials')
@@ -298,8 +267,6 @@ export default function initializeMiningController(
298267
res.status(500);
299268
return next(new Error(newError.message));
300269
}
301-
302-
return res.status(201).send({ error: null, data: miningTask });
303270
},
304271

305272
async startMiningFile(req: Request, res: Response, next: NextFunction) {

backend/src/server.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import PgContacts from './db/pg/PgContacts';
77
import PgMiningSources from './db/pg/PgMiningSources';
88
import SupabaseUsers from './db/supabase/users';
99
import SupabaseAuthResolver from './services/auth/SupabaseAuthResolver';
10-
import EmailFetcherFactory from './services/factory/EmailFetcherFactory';
1110
import SSEBroadcasterFactory from './services/factory/SSEBroadcasterFactory';
1211
import TasksManager from './services/tasks-manager/TasksManager';
1312
import { flickrBase58IdGenerator } from './services/tasks-manager/utils';
@@ -16,6 +15,8 @@ import redis from './utils/redis';
1615
import supabaseClient from './utils/supabase';
1716
import SupabaseTasks from './db/supabase/tasks';
1817
import TasksManagerFile from './services/tasks-manager/TaskManagerFile';
18+
import EmailFetcherClient from './services/email-fetching';
19+
import EmailFetcherFactory from './services/factory/EmailFetcherFactory';
1920

2021
// eslint-disable-next-line no-console
2122
console.log(
@@ -47,7 +48,11 @@ console.log(
4748
tasksResolver,
4849
redis.getSubscriberClient(),
4950
redis.getClient(),
50-
new EmailFetcherFactory(),
51+
new EmailFetcherClient(
52+
logger,
53+
ENV.EMAIL_FETCHING_SERVICE_API_TOKEN,
54+
ENV.EMAIL_FETCHING_SERVICE_URL
55+
),
5156
new SSEBroadcasterFactory(),
5257
flickrBase58IdGenerator()
5358
);
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// src/clients/EmailFetcherClient.ts
2+
import { Logger } from 'winston';
3+
import axios, { AxiosInstance } from 'axios';
4+
5+
export interface FetchStartPayload {
6+
userId: string;
7+
miningId: string;
8+
email: string;
9+
boxes: string[];
10+
extractSignatures: boolean;
11+
contactStream: string;
12+
signatureStream: string;
13+
}
14+
15+
export interface FetchStopPayload {
16+
miningId: string;
17+
canceled: boolean;
18+
}
19+
20+
class EmailFetcherClient {
21+
private client: AxiosInstance;
22+
23+
constructor(
24+
private readonly logger: Logger,
25+
apiToken: string,
26+
baseUrl: string
27+
) {
28+
this.client = axios.create({
29+
baseURL: baseUrl,
30+
headers: { 'Content-Type': 'application/json', 'x-api-token': apiToken }
31+
});
32+
}
33+
34+
/**
35+
* Start IMAP fetch job
36+
*/
37+
async startFetch(payload: FetchStartPayload) {
38+
try {
39+
const { data } = await this.client.post('api/imap/fetch/start', payload);
40+
return data;
41+
} catch (error) {
42+
this.logger.error('Start fetching request failed', { error, payload });
43+
throw error;
44+
}
45+
}
46+
47+
/**
48+
* Stop IMAP fetch job
49+
*/
50+
async stopFetch(payload: FetchStopPayload) {
51+
try {
52+
const { data } = await this.client.delete('api/imap/fetch/stop', {
53+
data: payload
54+
});
55+
return data;
56+
} catch (error) {
57+
this.logger.error('Stop fetching request with failed', {
58+
error,
59+
payload
60+
});
61+
throw error;
62+
}
63+
}
64+
}
65+
66+
export default EmailFetcherClient;

0 commit comments

Comments
 (0)