Skip to content

Commit 58cbbee

Browse files
committed
feat(queue): implement job scheduler in alert queue service
- Add job scheduling setup for alert-check jobs with specific intervals - Update relevant imports and refactor the QueueService to utilize RedisService - Enhance IQueueService interface with additional documentation and job-related methods
1 parent 1ac3cad commit 58cbbee

File tree

7 files changed

+102
-27
lines changed

7 files changed

+102
-27
lines changed

pnpm-lock.yaml

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/workflows-service/src/alert/alert-queue.service.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
22
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
33
import { AlertService } from './alert.service';
4-
import type { IQueueService } from '@/common/queue/queue.interface';
4+
import type { IQueueService, BullBoardInjectedInstance } from '@/common/queue/types';
55
import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types';
6-
import type { BullBoardInjectedInstance } from '@/common/queue/types';
76
import { env } from '@/env';
87

98
export interface AlertCheckJobData extends Record<string, unknown> {
@@ -43,6 +42,13 @@ export class AlertQueueService implements OnModuleInit {
4342
},
4443
});
4544

45+
const queue = (this.queueService as any).getQueue({ name: this.QUEUE_NAME });
46+
await (this.queueService as any).setupJobScheduler(queue, this.SCHEDULER_ID, {
47+
every: 60 * 60 * 1000,
48+
jobName: 'alert-check',
49+
data: { timestamp: Date.now() },
50+
});
51+
4652
this.registerWorker();
4753

4854
this.logger.log('Alert queue system setup complete');

services/workflows-service/src/app.worker.module.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1-
import { MiddlewareConsumer, Module } from '@nestjs/common';
21
import { ConfigModule } from '@nestjs/config';
32
import { ClsModule } from 'nestjs-cls';
43

54
import { AnalyticsModule } from '@/common/analytics-logger/analytics.module';
65
import { AppLoggerModule } from '@/common/app-logger/app-logger.module';
76
import { QueueModule } from '@/common/queue/queue.module';
8-
import { configs, env } from '@/env';
7+
import { configs } from '@/env';
98
import { validate } from '@/env-validate';
109
import { SecretsManagerModule } from '@/secrets-manager/secrets-manager.module';
1110
import { SentryModule } from '@/sentry/sentry.module';
1211
import { WebhooksModule } from '@/webhooks/webhooks.module';
1312
import { HealthModule } from './health/health.module';
1413
import { PrismaModule } from './prisma/prisma.module';
1514
import { AlertModule } from './alert/alert.module';
16-
import { MetricsAuthMiddleware } from './common/middlewares/metrics-auth.middleware';
15+
import { Module } from '@nestjs/common';
1716

1817
@Module({
1918
imports: [

services/workflows-service/src/common/queue/queue.module.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,19 @@ import { Module } from '@nestjs/common';
22
import { QueueService } from './queue.service';
33
import { QueueBullboardService } from './queue-bullboard.service';
44
import { QueueOtelService } from './otel.service';
5-
import { redisProvider } from './redis.provider';
65
import { MonitoringModule } from '@/common/monitoring/monitoring.module';
76
import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from './types';
87
import { createBullBoard } from '@bull-board/api';
98
import { ExpressAdapter } from '@bull-board/express';
9+
import { RedisModule } from '../redis/redis.module';
1010

1111
@Module({
12-
imports: [MonitoringModule],
12+
imports: [MonitoringModule, RedisModule],
1313
providers: [
1414
QueueService,
1515
{ provide: 'IQueueService', useExisting: QueueService },
1616
QueueBullboardService,
1717
QueueOtelService,
18-
redisProvider,
1918
{
2019
provide: BULLBOARD_INSTANCE_INJECTION_TOKEN,
2120
useFactory: () => {
@@ -32,7 +31,6 @@ import { ExpressAdapter } from '@bull-board/express';
3231
'IQueueService',
3332
QueueBullboardService,
3433
QueueOtelService,
35-
redisProvider,
3634
BULLBOARD_INSTANCE_INJECTION_TOKEN,
3735
],
3836
})

services/workflows-service/src/common/queue/queue.service.ts

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ import { Queue, Worker } from 'bullmq';
33
import IORedis from 'ioredis';
44
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
55
import { env } from '@/env';
6-
import { QueueOtelService } from './otel.service';
7-
import { REDIS_CLIENT } from './redis.provider';
6+
import { RedisService } from '../redis/redis.service';
87
import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service';
98
import type { BullBoardInjectedInstance } from './types';
109
import { QueueBullboardService } from './queue-bullboard.service';
@@ -33,15 +32,14 @@ export class QueueService implements OnModuleDestroy {
3332

3433
constructor(
3534
private readonly logger: AppLoggerService,
36-
private readonly queueOtelService: QueueOtelService,
37-
@Inject(REDIS_CLIENT) redisClient: IORedis | null,
35+
private readonly redisService: RedisService,
3836
private readonly bullMQPrometheusService: BullMQPrometheusService,
3937
@Inject('BULLBOARD_INSTANCE') private readonly bullBoard?: BullBoardInjectedInstance,
4038
private readonly queueBullboardService?: QueueBullboardService,
4139
) {
4240
this.shouldProcessJobs = this.determineIfShouldProcessJobs();
4341
this.logger.log(`Queue worker mode: ${this.shouldProcessJobs ? 'ENABLED' : 'DISABLED'}`);
44-
this.redisClient = redisClient;
42+
this.redisClient = this.redisService.client;
4543
}
4644

4745
private determineIfShouldProcessJobs(): boolean {
@@ -147,7 +145,6 @@ export class QueueService implements OnModuleDestroy {
147145

148146
createQueue<T = any>(queueName: string, options?: QueueOptions<T>): void {
149147
if (this.queues.has(queueName)) {
150-
// Optionally update options if needed
151148
return;
152149
}
153150

@@ -182,12 +179,51 @@ export class QueueService implements OnModuleDestroy {
182179
);
183180

184181
await Promise.all([...workerClosePromises, ...queueClosePromises]);
182+
}
183+
184+
async setupJobScheduler<T = any>(
185+
queue: Queue,
186+
schedulerId: string,
187+
options: {
188+
every: number;
189+
data?: T;
190+
jobName?: string;
191+
jobOptions?: {
192+
attempts?: number;
193+
backoff?: {
194+
type: 'exponential' | 'fixed';
195+
delay: number;
196+
};
197+
};
198+
},
199+
) {
200+
try {
201+
const jobName = options.jobName || 'scheduled-job';
202+
const firstJob = await queue.upsertJobScheduler(
203+
schedulerId,
204+
{ every: options.every, jobId: schedulerId },
205+
{
206+
name: jobName,
207+
data: options.data || { timestamp: Date.now() },
208+
opts: {
209+
attempts: options.jobOptions?.attempts || 10,
210+
backoff: options.jobOptions?.backoff || {
211+
type: 'exponential',
212+
delay: 10000,
213+
},
214+
},
215+
},
216+
);
217+
this.logger.log(`Created job scheduler: ${schedulerId}`, {
218+
schedulerId,
219+
every: options.every,
220+
jobId: firstJob?.id,
221+
});
185222

186-
if (this.redisClient) {
187-
await this.redisClient
188-
.quit()
189-
.catch(err => this.logger.error(`Error closing Redis connection`, { err }));
190-
this.redisClient = null;
223+
return firstJob;
224+
} catch (error) {
225+
this.logger.error(`Failed to set up job scheduler: ${schedulerId}`, { error });
226+
throw error;
191227
}
192228
}
193229
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,32 @@
11
import { createBullBoard } from '@bull-board/api';
22
import { ExpressAdapter } from '@bull-board/express';
3+
import type { QueueOptions } from './queue.service';
34

45
export const BULLBOARD_INSTANCE_INJECTION_TOKEN = 'BULLBOARD_INSTANCE';
56

67
export interface BullBoardInjectedInstance {
78
boardInstance: ReturnType<typeof createBullBoard>;
89
serverAdapter: ExpressAdapter;
910
}
11+
export interface IQueueService {
12+
/**
13+
* Add a job to the queue.
14+
*/
15+
addJob<T = any>(queueName: string, data: T, opts?: any): Promise<any>;
16+
17+
/**
18+
* Register a worker for a queue. The processor should be a standalone function for testability and abstraction.
19+
*/
20+
registerWorker<T = any>(
21+
queueName: string,
22+
processor: (job: any) => Promise<any>,
23+
options?: { concurrency?: number },
24+
): void;
25+
26+
isWorkerEnabled(): boolean;
27+
28+
/**
29+
* Explicitly create/configure a queue with options.
30+
*/
31+
createQueue<T = any>(queueName: string, options?: QueueOptions<T>): void;
32+
}

services/workflows-service/src/webhooks/webhooks.service.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ import { Job } from 'bullmq';
77

88
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
99
import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types';
10-
import type { BullBoardInjectedInstance } from '@/common/queue/types';
10+
import type { BullBoardInjectedInstance, IQueueService } from '@/common/queue/types';
1111
import { env } from '@/env';
12-
import { type OutgoingWebhookJobData, type OutgoingWebhookPayloads } from './types/webhook';
13-
import type { IQueueService } from '@/common/queue/queue.interface';
12+
import {
13+
WebhookError,
14+
type OutgoingWebhookJobData,
15+
type OutgoingWebhookPayloads,
16+
} from './types/webhook';
1417

1518
const captureWebhookFailureWithSentry = (errorPayload: Record<string, unknown>) => {
1619
Sentry.captureException(
@@ -88,6 +91,16 @@ export class WebhooksService implements OnModuleInit {
8891
return res.data;
8992
} catch (error) {
9093
this.handleWebhookJobError(job, error);
94+
95+
if (isAxiosError(error)) {
96+
const webhookError = new WebhookError('Webhook request failed');
97+
webhookError.cause = error;
98+
webhookError.statusCode = error.response?.status;
99+
webhookError.responseData = error.response?.data;
100+
webhookError.headers = error.response?.headers;
101+
throw webhookError;
102+
}
103+
91104
throw error;
92105
}
93106
}

0 commit comments

Comments
 (0)