@@ -74,9 +74,9 @@ function buildSequenceRanges(total: number, chunkSize: number): string[] {
7474}
7575
7676export default class ImapEmailsFetcher {
77- private readonly CONNECTION_TIMEOUT_MS = 10000 ;
77+ private readonly CONNECTION_TIMEOUT_MS = 20000 ;
7878
79- private readonly EMAIL_TEXT_MAX_LENGTH = 4000 ;
79+ private readonly EMAIL_TEXT_MAX_LENGTH = 3000 ;
8080
8181 private readonly userIdentifier: string;
8282
@@ -90,6 +90,8 @@ export default class ImapEmailsFetcher {
9090
9191 private isCanceled = false;
9292
93+ private hasAuthFailureLogged = false;
94+
9395 private activeConnections = new Set<Connection>();
9496
9597 private readonly bodies = ['HEADER'];
@@ -124,7 +126,7 @@ export default class ImapEmailsFetcher {
124126 this.processSetKey = `caching:${miningId}`;
125127
126128 if (this.fetchEmailBody) {
127- this.bodies.push('');
129+ this.bodies.push('TEXT ');
128130 }
129131 }
130132
@@ -182,6 +184,30 @@ export default class ImapEmailsFetcher {
182184 }
183185 }
184186
187+ /**
188+ * Checks if an error represents a fatal authentication failure.
189+ * @param error - The error to check
190+ * @returns True if this is an authentication failure
191+ */
192+ private static isAuthFailure(error: unknown): boolean {
193+ if (typeof error !== 'object' || error === null) {
194+ return false;
195+ }
196+
197+ const err = error as Record<string, unknown>;
198+
199+ return (
200+ err.authenticationFailed === true ||
201+ err.serverResponseCode === 'AUTHENTICATIONFAILED' ||
202+ (err.responseStatus === 'NO' &&
203+ typeof err.responseText === 'string' &&
204+ err.responseText.includes('Invalid credentials')) ||
205+ (typeof err.message === 'string' &&
206+ (err.message.includes('AUTHENTICATIONFAILED') ||
207+ err.message.includes('Invalid credentials')))
208+ );
209+ }
210+
185211 /**
186212 * Fetches the total number of messages across the specified folders on an IMAP server.
187213 */
@@ -250,7 +276,7 @@ export default class ImapEmailsFetcher {
250276 for (const folder of this.folders) {
251277 if (this.isCanceled) {
252278 logger.info(
253- `[${this.miningId}] Cancellation detected after processing folder ${folder}. No further folders will be processed. `
279+ `[${this.miningId}] Cancellation requested; stopping before folder ${folder}`
254280 );
255281 break;
256282 }
@@ -265,10 +291,22 @@ export default class ImapEmailsFetcher {
265291 // eslint-disable-next-line no-await-in-loop
266292 await this.process;
267293 } catch (error) {
268- logger.error(
269- `[${this.miningId}] Error when fetching emails from folder ${folder}:`,
270- error
271- );
294+ if (ImapEmailsFetcher.isAuthFailure(error)) {
295+ if (!this.hasAuthFailureLogged) {
296+ logger.error(
297+ `[${this.miningId}] Authentication failed; aborting mining task`,
298+ error
299+ );
300+ this.hasAuthFailureLogged = true;
301+ }
302+ this.isCanceled = true;
303+ break;
304+ } else {
305+ logger.error(
306+ `[${this.miningId}] Error when fetching emails from folder ${folder}:`,
307+ error
308+ );
309+ }
272310 }
273311 }
274312
@@ -313,8 +351,15 @@ export default class ImapEmailsFetcher {
313351 source: false,
314352 envelope: true,
315353 headers: true,
316- bodyParts: ['HEADER', 'TEXT']
354+ bodyParts: this.bodies
317355 })) {
356+ if (this.isCanceled) {
357+ logger.info(
358+ `[${this.miningId}] Cancellation detected; stopping range ${range}`
359+ );
360+ break;
361+ }
362+
318363 processedCount += 1;
319364
320365 const { seq, headers, envelope } = msg;
@@ -385,7 +430,7 @@ export default class ImapEmailsFetcher {
385430 })
386431 );
387432
388- if (text && from && date) {
433+ if (text.length && from && date) {
389434 pipeline.xadd(
390435 this.signatureStream,
391436 '*',
@@ -418,15 +463,15 @@ export default class ImapEmailsFetcher {
418463
419464 header = null;
420465
421- if (pipeline.length >= parallelBatchSize) {
466+ if (publishedEmails >= parallelBatchSize) {
422467 await pipeline.exec();
423468 await publishFetchingProgress(this.miningId, publishedEmails);
424469 pipeline = redisClient.multi();
425470 publishedEmails = 0;
426471 }
427472 }
428473
429- if (pipeline.length ) {
474+ if (publishedEmails > 0 ) {
430475 await pipeline.exec();
431476 await publishFetchingProgress(this.miningId, publishedEmails);
432477 }
@@ -558,6 +603,13 @@ export default class ImapEmailsFetcher {
558603 folderPath: string,
559604 totalInFolder: number
560605 ): Promise<void> {
606+ if (this.isCanceled) {
607+ logger.info(
608+ `[${this.miningId}] Cancellation detected; skipping large folder processing for ${folderPath}`
609+ );
610+ return;
611+ }
612+
561613 logger.info(
562614 `[${this.miningId}] Parallel fetching ${totalInFolder} messages from ${folderPath}`
563615 );
@@ -600,6 +652,13 @@ export default class ImapEmailsFetcher {
600652
601653 try {
602654 for (let i = 0; i < ranges.length; i += connections.length) {
655+ if (this.isCanceled) {
656+ logger.info(
657+ `[${this.miningId}] Cancellation detected; stopping batch processing for ${folderPath}`
658+ );
659+ break;
660+ }
661+
603662 const batch = ranges.slice(i, i + connections.length);
604663 const batchTasks = batch.map((range, index) => {
605664 const connection = connections[index];
0 commit comments