Skip to content

Commit 36cecb6

Browse files
authored
add: fetch using concurrent connection & better sse management (#2368)
1 parent 864b344 commit 36cecb6

File tree

11 files changed

+582
-155
lines changed

11 files changed

+582
-155
lines changed

.env.master.dev

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@ FRONTEND_HOST = http://localhost:8082 # ( REQUIRED )
2121
LEADMINER_API_PORT = 8081 # ( REQUIRED )
2222
LEADMINER_API_HOST = http://localhost:8081 # ( REQUIRED )
2323

24-
LEADMINER_FETCH_BATCH_SIZE = 100 # ( REQUIRED ) Sends notification every x processed items to the frontend
2524
LEADMINER_API_HASH_SECRET = change_me # ( REQUIRED ) Used for hashing secrets
2625
LEADMINER_MINING_ID_GENERATOR_LENGTH = 10 # ( REQUIRED ) Length of the task id
2726

27+
FETCHING_BATCH_SIZE_TO_SEND = 100 # ( REQUIRED ) Sends notification every x processed items to the frontend
28+
FETCHING_CHUNK_SIZE_PER_CONNECTION = 10000
29+
FETCHING_MAX_CONNECTIONS_PER_FOLDER = 5
30+
2831
## LOGGING ##
2932
# GRAFANA_LOKI_HOST = # Use Loki transport for logging (Default is console)
3033
LEADMINER_API_LOG_LEVEL = debug # Logging level (debug, info, notice, warning...)

.env.master.prod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@ FRONTEND_HOST = # ( REQUIRED )
1919
LEADMINER_API_PORT = 8081 # ( REQUIRED )
2020
LEADMINER_API_HOST = # ( REQUIRED )
2121

22-
LEADMINER_FETCH_BATCH_SIZE = 100 # ( REQUIRED ) Sends notification every x processed items to the frontend
2322
LEADMINER_API_HASH_SECRET = change_me # ( REQUIRED ) Used for hashing secrets
2423
LEADMINER_MINING_ID_GENERATOR_LENGTH = 10 # ( REQUIRED ) Length of the task id
2524

25+
FETCHING_BATCH_SIZE_TO_SEND = 100 # ( REQUIRED ) Sends notification every x processed items to the frontend
26+
FETCHING_CHUNK_SIZE_PER_CONNECTION = 10000
27+
FETCHING_MAX_CONNECTIONS_PER_FOLDER = 5
28+
2629
## LOGGING ##
2730
GRAFANA_LOKI_HOST = # Use Loki transport for logging (Default is console)
2831
LEADMINER_API_LOG_LEVEL = debug # Logging level (debug, info, notice, warning...)

backend/package-lock.json

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
"mailparser": "^3.7.4",
5757
"nanoid": "^3.3.11",
5858
"nodemon": "^3.1.10",
59+
"p-limit": "^7.1.0",
5960
"pg": "^8.16.1",
6061
"pg-format": "^1.0.4",
6162
"planer": "^1.2.0",

backend/src/config/schema.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,13 @@ const schema = z.object({
1010
LEADMINER_API_HOST: z.string().url(),
1111
LEADMINER_API_HASH_SECRET: z.string().min(1),
1212
LEADMINER_MINING_ID_GENERATOR_LENGTH: number(),
13-
LEADMINER_FETCH_BATCH_SIZE: number(),
1413
FRONTEND_HOST: z.string().url(),
1514

15+
/* FETCHING */
16+
FETCHING_BATCH_SIZE_TO_SEND: number(),
17+
FETCHING_CHUNK_SIZE_PER_CONNECTION: number(),
18+
FETCHING_MAX_CONNECTIONS_PER_FOLDER: number(),
19+
1620
/* IMAP */
1721
IMAP_AUTH_TIMEOUT: number(),
1822
IMAP_CONNECTION_TIMEOUT: number(),

backend/src/controllers/mining.controller.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ export default function initializeMiningController(
135135

136136
const sanitizedHost = sanitizeImapInput(host);
137137
const sanitizedEmail = sanitizeImapInput(email);
138-
const sanitizedPassword = sanitizeImapInput(password);
138+
const sanitizedPassword = password;
139139

140140
try {
141141
// Validate & Get the valid IMAP login connection before creating the pool.
@@ -262,7 +262,7 @@ export default function initializeMiningController(
262262
boxes: sanitizedFolders,
263263
userId: user.id,
264264
email: miningSourceCredentials.email,
265-
batchSize: ENV.LEADMINER_FETCH_BATCH_SIZE,
265+
batchSize: ENV.FETCHING_BATCH_SIZE_TO_SEND,
266266
fetchEmailBody: ENV.IMAP_FETCH_BODY
267267
};
268268
miningTask = await tasksManager.createTask(imapEmailsFetcherOptions);

backend/src/controllers/stream.controller.ts

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,20 @@ export default function initializeStreamController(
2525
logger.debug(`Attaching sse connection for taskId ${taskId}`);
2626
manager.attachSSE(taskId, { req, res });
2727
} catch (error) {
28+
res.setHeader('Content-Type', 'text/event-stream');
29+
res.setHeader('Cache-Control', 'no-cache');
30+
res.setHeader('Connection', 'keep-alive');
31+
2832
res.status(404);
29-
res.write('id: 0\n');
33+
res.write('id: 404-not-found\n');
3034
res.write('event: close\n');
3135
res.write(`data: ${JSON.stringify((error as Error).message)}\n\n`);
3236
res.flushHeaders();
3337
res.end();
3438
}
3539

3640
req.on('close', async () => {
37-
try {
38-
logger.debug(
39-
`Closing sse connection and deleting task for taskId ${taskId}`
40-
);
41-
await manager.deleteTask(taskId, null);
42-
} catch (error) {
43-
logger.error(
44-
`Error when disconnecting from the stream with miningId ${taskId}`,
45-
error
46-
);
47-
}
41+
logger.warn(`SSE Connection lost for mining task with id: ${taskId}`);
4842
});
4943
}
5044
};

0 commit comments

Comments
 (0)