Skip to content

Commit 1ac3cad

Browse files
committed
refactor(queue): improve alert queue service and worker registration
- Simplify dependency injection for services in AlertQueueService - Replace queue setup logic with createQueue method and refactor worker processing - Eliminate unused code and streamline error handling in webhook job processing
1 parent 1b9e40a commit 1ac3cad

File tree

6 files changed

+81
-202
lines changed

6 files changed

+81
-202
lines changed
Lines changed: 20 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
2-
import { Job } from 'bullmq';
32
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
43
import { AlertService } from './alert.service';
5-
import { QueueService } from '@/common/queue/queue.service';
6-
import { QueueBullboardService } from '@/common/queue/queue-bullboard.service';
7-
import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service';
4+
import type { IQueueService } from '@/common/queue/queue.interface';
85
import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types';
96
import type { BullBoardInjectedInstance } from '@/common/queue/types';
107
import { env } from '@/env';
@@ -19,11 +16,9 @@ export class AlertQueueService implements OnModuleInit {
1916
private readonly SCHEDULER_ID = 'transaction-monitoring-alert-check';
2017

2118
constructor(
22-
private readonly logger: AppLoggerService,
23-
private readonly alertService: AlertService,
24-
private readonly queueService: QueueService,
25-
private readonly queueBullboardService: QueueBullboardService,
26-
private readonly bullMQPrometheusService: BullMQPrometheusService,
19+
@Inject(AppLoggerService) private readonly logger: AppLoggerService,
20+
@Inject(AlertService) private readonly alertService: AlertService,
21+
@Inject('IQueueService') private readonly queueService: IQueueService,
2722
@Inject(BULLBOARD_INSTANCE_INJECTION_TOKEN)
2823
private bullBoard: BullBoardInjectedInstance,
2924
) {}
@@ -38,32 +33,16 @@ export class AlertQueueService implements OnModuleInit {
3833

3934
private async setupAlertQueue() {
4035
try {
41-
const queue = this.queueService.getQueue<AlertCheckJobData>({
36+
this.queueService.createQueue<AlertCheckJobData>(this.QUEUE_NAME, {
4237
name: this.QUEUE_NAME,
4338
jobOptions: {
4439
attempts: 10,
45-
backoff: {
46-
type: 'exponential',
47-
delay: 10_000,
48-
},
40+
backoff: { type: 'exponential', delay: 10000 },
4941
removeOnComplete: { count: 100, age: 3600 * 24 },
5042
removeOnFail: false,
5143
},
5244
});
5345

54-
await this.queueService.setupJobScheduler(queue, this.SCHEDULER_ID, {
55-
every: 60 * 60 * 1000,
56-
jobName: 'check-transaction-monitoring-alerts',
57-
data: { timestamp: Date.now() },
58-
jobOptions: {
59-
attempts: 10,
60-
backoff: {
61-
type: 'exponential',
62-
delay: 10_000,
63-
},
64-
},
65-
});
66-
6746
this.registerWorker();
6847

6948
this.logger.log('Alert queue system setup complete');
@@ -73,22 +52,21 @@ export class AlertQueueService implements OnModuleInit {
7352
}
7453

7554
private registerWorker() {
76-
this.queueService.registerWorker<AlertCheckJobData>(
77-
this.QUEUE_NAME,
78-
async (job: Job<AlertCheckJobData>) => {
79-
this.logger.log('Processing transaction monitoring alerts check job', {
80-
jobId: job.id,
81-
});
82-
83-
await this.alertService.checkAllAlerts();
55+
this.queueService.registerWorker(this.QUEUE_NAME, this.processAlertCheckJob.bind(this), {
56+
concurrency: 1,
57+
});
58+
}
8459

85-
this.logger.log('Completed transaction monitoring alerts check', {
86-
jobId: job.id,
87-
});
60+
private async processAlertCheckJob(job: any) {
61+
this.logger.log('Processing transaction monitoring alerts check job', { jobId: job.id });
62+
try {
63+
await this.alertService.checkAllAlerts();
64+
this.logger.log('Completed transaction monitoring alerts check', { jobId: job.id });
8865

89-
return { success: true, timestamp: Date.now() };
90-
},
91-
{ concurrency: 1 },
92-
);
66+
return { success: true, timestamp: Date.now() };
67+
} catch (error) {
68+
this.logger.error('Alert check job failed', { jobId: job.id, error });
69+
throw error;
70+
}
9371
}
9472
}

services/workflows-service/src/common/queue/queue-bullboard.service.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,4 @@ export class QueueBullboardService {
2222
this.logger.error(`Error registering queue ${queue.name} with BullBoard`, { error });
2323
}
2424
}
25-
26-
registerQueues(bullBoardInstance: any, queues: Queue[]) {
27-
try {
28-
const adapters = queues.map(queue => new BullMQAdapter(queue));
29-
bullBoardInstance.boardInstance.setQueues(adapters);
30-
this.logger.log(`Registered ${queues.length} queues with BullBoard`);
31-
} catch (error) {
32-
this.logger.error('Error registering queues with BullBoard', { error });
33-
}
34-
}
3525
}

services/workflows-service/src/common/queue/queue.module.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { ExpressAdapter } from '@bull-board/express';
1212
imports: [MonitoringModule],
1313
providers: [
1414
QueueService,
15+
{ provide: 'IQueueService', useExisting: QueueService },
1516
QueueBullboardService,
1617
QueueOtelService,
1718
redisProvider,
@@ -28,6 +29,7 @@ import { ExpressAdapter } from '@bull-board/express';
2829
],
2930
exports: [
3031
QueueService,
32+
'IQueueService',
3133
QueueBullboardService,
3234
QueueOtelService,
3335
redisProvider,

services/workflows-service/src/common/queue/queue.service.ts

Lines changed: 39 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Injectable, OnModuleDestroy, Inject } from '@nestjs/common';
2-
import { Queue, Worker, Job } from 'bullmq';
2+
import { Queue, Worker } from 'bullmq';
33
import IORedis from 'ioredis';
44
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
55
import { env } from '@/env';
@@ -9,8 +9,6 @@ import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.s
99
import type { BullBoardInjectedInstance } from './types';
1010
import { QueueBullboardService } from './queue-bullboard.service';
1111

12-
export type JobProcessor<T = any> = (job: Job<T>) => Promise<any>;
13-
1412
export interface QueueOptions<T = any> {
1513
name: string;
1614
concurrency?: number;
@@ -62,16 +60,16 @@ export class QueueService implements OnModuleDestroy {
6260
return this.shouldProcessJobs;
6361
}
6462

65-
getQueue<T = any, R = any, N extends string = string>(options: QueueOptions<T>): Queue<T, R, N> {
63+
private getQueue<T = any>(options: QueueOptions<T>): Queue<T> {
6664
if (!this.redisClient) {
6765
throw new Error('Redis client not initialized');
6866
}
6967

7068
if (this.queues.has(options.name)) {
71-
return this.queues.get(options.name) as unknown as Queue<T, R, N>;
69+
return this.queues.get(options.name) as unknown as Queue<T>;
7270
}
7371

74-
const queue = new Queue<T, R, N>(options.name, {
72+
const queue = new Queue<T>(options.name, {
7573
connection: this.redisClient as any,
7674
defaultJobOptions: {
7775
attempts: options.jobOptions?.attempts ?? 3,
@@ -98,50 +96,32 @@ export class QueueService implements OnModuleDestroy {
9896
return queue;
9997
}
10098

101-
registerWorker<T = any, R = any, N extends string = string>(
99+
registerWorker<T = any>(
102100
queueName: string,
103-
processor: JobProcessor<T>,
104-
options: {
105-
concurrency?: number;
106-
forceLocalProcessing?: boolean;
107-
} = {},
108-
): Worker<T, R, N> | null {
101+
processor: (job: any) => Promise<any>,
102+
options: { concurrency?: number } = {},
103+
): void {
109104
if (!this.redisClient) {
110105
throw new Error('Redis client not initialized');
111106
}
112107

113-
if (!this.shouldProcessJobs && !options.forceLocalProcessing) {
108+
if (!this.shouldProcessJobs) {
114109
this.logger.debug(
115110
`Skipping worker registration for queue ${queueName} (not a worker instance)`,
116111
);
117112

118-
return null;
113+
return;
119114
}
120115

121116
if (this.workers.has(queueName)) {
122-
return this.workers.get(queueName) as unknown as Worker<T, R, N>;
117+
return;
123118
}
124119

125-
const worker = new Worker<T, R, N>(
126-
queueName,
127-
async job => {
128-
try {
129-
return await processor(job);
130-
} catch (error) {
131-
this.logger.error(`Error processing job ${job.id} in queue ${queueName}`, {
132-
error,
133-
jobId: job.id,
134-
queueName,
135-
});
136-
throw error;
137-
}
138-
},
139-
{
140-
connection: this.redisClient as any,
141-
concurrency: options.concurrency ?? 1,
142-
autorun: true,
143-
},
144-
);
120+
const worker = new Worker<T, any, string>(queueName, processor, {
121+
connection: this.redisClient as any,
122+
concurrency: options.concurrency ?? 1,
123+
autorun: true,
124+
});
145125

146126
worker.on('failed', (job, error) => {
147127
this.logger.error(`Job ${job?.id} failed in queue ${queueName}`, {
@@ -161,69 +141,34 @@ export class QueueService implements OnModuleDestroy {
161141
});
162142
});
163143

164-
this.workers.set(queueName, worker as unknown as Worker);
144+
this.workers.set(queueName, worker as unknown as Worker<T, any, string>);
165145
this.logger.log(`Worker registered for queue: ${queueName}`);
166-
167-
return worker;
168146
}
169147

170-
async setupJobScheduler<T = any>(
171-
queue: Queue,
172-
schedulerId: string,
173-
options: {
174-
every: number;
175-
data?: T;
176-
jobName?: string;
177-
jobOptions?: {
178-
attempts?: number;
179-
backoff?: {
180-
type: 'exponential' | 'fixed';
181-
delay: number;
182-
};
183-
};
184-
},
185-
) {
186-
try {
187-
const schedulers = await queue.getJobSchedulers();
188-
const existingScheduler = schedulers.find(s => s.id === schedulerId);
189-
190-
if (existingScheduler) {
191-
this.logger.log(`Job scheduler already exists: ${schedulerId}`, {
192-
schedulerId,
193-
pattern: existingScheduler.pattern,
194-
every: existingScheduler.every,
195-
});
196-
197-
return existingScheduler;
198-
}
199-
200-
const jobName = options.jobName || 'scheduled-job';
201-
const firstJob = await queue.upsertJobScheduler(
202-
schedulerId,
203-
{ every: options.every, jobId: schedulerId },
204-
{
205-
name: jobName,
206-
data: options.data || { timestamp: Date.now() },
207-
opts: {
208-
attempts: options.jobOptions?.attempts || 10,
209-
backoff: options.jobOptions?.backoff || {
210-
type: 'exponential',
211-
delay: 10000,
212-
},
213-
},
214-
},
215-
);
148+
createQueue<T = any>(queueName: string, options?: QueueOptions<T>): void {
149+
if (this.queues.has(queueName)) {
150+
// Optionally update options if needed
151+
return;
152+
}
216153

217-
this.logger.log(`Created job scheduler: ${schedulerId}`, {
218-
schedulerId,
219-
every: options.every,
220-
firstJobId: firstJob?.id,
221-
});
154+
const queue = new Queue<T, any, string>(queueName, {
155+
connection: this.redisClient as any,
156+
defaultJobOptions: options?.jobOptions ?? {
157+
attempts: 3,
158+
backoff: { type: 'exponential', delay: 5000 },
159+
removeOnComplete: { count: 100, age: 3600 * 24 },
160+
removeOnFail: false,
161+
},
162+
});
163+
this.queues.set(queueName, queue as Queue);
164+
this.logger.log(`Queue created: ${queueName}`);
165+
166+
if (this.bullMQPrometheusService) {
167+
this.bullMQPrometheusService.registerQueue(queue);
168+
}
222169

223-
return firstJob;
224-
} catch (error) {
225-
this.logger.error(`Failed to set up job scheduler: ${schedulerId}`, { error });
226-
throw error;
170+
if (this.shouldProcessJobs && this.bullBoard && this.queueBullboardService) {
171+
this.queueBullboardService.registerQueue(this.bullBoard, queue);
227172
}
228173
}
229174

services/workflows-service/src/common/queue/types.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,3 @@ export interface BullBoardInjectedInstance {
77
boardInstance: ReturnType<typeof createBullBoard>;
88
serverAdapter: ExpressAdapter;
99
}
10-
11-
export const bullBoardProvider = {
12-
provide: BULLBOARD_INSTANCE_INJECTION_TOKEN,
13-
useFactory: (): BullBoardInjectedInstance => {
14-
const serverAdapter = new ExpressAdapter();
15-
serverAdapter.setBasePath('/api/queues');
16-
const boardInstance = createBullBoard({ queues: [], serverAdapter });
17-
18-
return { boardInstance, serverAdapter };
19-
},
20-
};

0 commit comments

Comments
 (0)