Skip to content

Commit 9d128ef

Browse files
committed
refactor(queue): simplify queue configuration and job setup
- Remove unnecessary injection of BullBoard instance in AlertQueueService - Update job options to reduce default attempts for better performance - Enhance queue service methods for clearer job scheduling and worker registration
1 parent 58cbbee commit 9d128ef

File tree

7 files changed

+141
-104
lines changed

7 files changed

+141
-104
lines changed

services/workflows-service/src/alert/alert-queue.service.ts

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
22
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
33
import { AlertService } from './alert.service';
4-
import type { IQueueService, BullBoardInjectedInstance } from '@/common/queue/types';
5-
import { BULLBOARD_INSTANCE_INJECTION_TOKEN } from '@/common/queue/types';
4+
import type { IQueueService } from '@/common/queue/types';
65
import { env } from '@/env';
76

87
export interface AlertCheckJobData extends Record<string, unknown> {
@@ -15,11 +14,9 @@ export class AlertQueueService implements OnModuleInit {
1514
private readonly SCHEDULER_ID = 'transaction-monitoring-alert-check';
1615

1716
constructor(
18-
@Inject(AppLoggerService) private readonly logger: AppLoggerService,
19-
@Inject(AlertService) private readonly alertService: AlertService,
17+
private readonly logger: AppLoggerService,
18+
private readonly alertService: AlertService,
2019
@Inject('IQueueService') private readonly queueService: IQueueService,
21-
@Inject(BULLBOARD_INSTANCE_INJECTION_TOKEN)
22-
private bullBoard: BullBoardInjectedInstance,
2320
) {}
2421

2522
async onModuleInit() {
@@ -35,19 +32,21 @@ export class AlertQueueService implements OnModuleInit {
3532
this.queueService.createQueue<AlertCheckJobData>(this.QUEUE_NAME, {
3633
name: this.QUEUE_NAME,
3734
jobOptions: {
38-
attempts: 10,
35+
attempts: 3,
3936
backoff: { type: 'exponential', delay: 10000 },
4037
removeOnComplete: { count: 100, age: 3600 * 24 },
4138
removeOnFail: false,
4239
},
4340
});
44-
45-
const queue = (this.queueService as any).getQueue({ name: this.QUEUE_NAME });
46-
await (this.queueService as any).setupJobScheduler(queue, this.SCHEDULER_ID, {
47-
every: 60 * 60 * 1000,
48-
jobName: 'alert-check',
49-
data: { timestamp: Date.now() },
50-
});
41+
await this.queueService.setupJobScheduler(
42+
this.QUEUE_NAME,
43+
this.SCHEDULER_ID,
44+
{ every: 60 * 60 * 1000 },
45+
{
46+
name: 'alert-check',
47+
data: { timestamp: Date.now() },
48+
},
49+
);
5150

5251
this.registerWorker();
5352

services/workflows-service/src/common/queue/queue.module.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Module } from '@nestjs/common';
2-
import { QueueService } from './queue.service';
2+
import { BullMQQueueService } from './queue.service';
33
import { QueueBullboardService } from './queue-bullboard.service';
44
import { QueueOtelService } from './otel.service';
55
import { MonitoringModule } from '@/common/monitoring/monitoring.module';
@@ -11,8 +11,8 @@ import { RedisModule } from '../redis/redis.module';
1111
@Module({
1212
imports: [MonitoringModule, RedisModule],
1313
providers: [
14-
QueueService,
15-
{ provide: 'IQueueService', useExisting: QueueService },
14+
BullMQQueueService,
15+
{ provide: 'IQueueService', useExisting: BullMQQueueService },
1616
QueueBullboardService,
1717
QueueOtelService,
1818
{
@@ -27,7 +27,7 @@ import { RedisModule } from '../redis/redis.module';
2727
},
2828
],
2929
exports: [
30-
QueueService,
30+
BullMQQueueService,
3131
'IQueueService',
3232
QueueBullboardService,
3333
QueueOtelService,

services/workflows-service/src/common/queue/queue.service.ts

Lines changed: 36 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,24 @@ import { AppLoggerService } from '@/common/app-logger/app-logger.service';
55
import { env } from '@/env';
66
import { RedisService } from '../redis/redis.service';
77
import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.service';
8-
import type { BullBoardInjectedInstance } from './types';
8+
import type { BullBoardInjectedInstance, IQueueService, QueueOptions } from './types';
99
import { QueueBullboardService } from './queue-bullboard.service';
1010

11-
export interface QueueOptions<T = any> {
12-
name: string;
13-
concurrency?: number;
14-
jobOptions?: {
15-
attempts?: number;
16-
backoff?: {
17-
type: 'exponential' | 'fixed';
18-
delay: number;
19-
};
20-
removeOnComplete?: boolean | number | { count: number; age: number };
21-
removeOnFail?: boolean | number | { count: number; age: number };
22-
priority?: number;
23-
};
24-
}
25-
2611
@Injectable()
27-
export class QueueService implements OnModuleDestroy {
12+
export class BullMQQueueService implements OnModuleDestroy, IQueueService {
2813
private redisClient: IORedis | null;
2914
private queues: Map<string, Queue> = new Map();
3015
private workers: Map<string, Worker> = new Map();
3116
private readonly shouldProcessJobs: boolean;
3217

18+
async addJob<T = any>(queueName: string, job_name: string, data: T, opts?: any): Promise<any> {
19+
const queue = this.getQueue(queueName);
20+
21+
return queue.add(job_name, data, {
22+
priority: opts?.priority,
23+
});
24+
}
25+
3326
constructor(
3427
private readonly logger: AppLoggerService,
3528
private readonly redisService: RedisService,
@@ -58,40 +51,16 @@ export class QueueService implements OnModuleDestroy {
5851
return this.shouldProcessJobs;
5952
}
6053

61-
private getQueue<T = any>(options: QueueOptions<T>): Queue<T> {
54+
public getQueue(queueName: string): Queue {
6255
if (!this.redisClient) {
6356
throw new Error('Redis client not initialized');
6457
}
6558

66-
if (this.queues.has(options.name)) {
67-
return this.queues.get(options.name) as unknown as Queue<T>;
68-
}
69-
70-
const queue = new Queue<T>(options.name, {
71-
connection: this.redisClient as any,
72-
defaultJobOptions: {
73-
attempts: options.jobOptions?.attempts ?? 3,
74-
backoff: options.jobOptions?.backoff ?? {
75-
type: options.jobOptions?.backoff?.type ?? 'exponential',
76-
delay: options.jobOptions?.backoff?.delay ?? 5000,
77-
},
78-
removeOnComplete: options.jobOptions?.removeOnComplete ?? { count: 100, age: 3600 * 24 },
79-
removeOnFail: options.jobOptions?.removeOnFail ?? false,
80-
},
81-
});
82-
83-
this.queues.set(options.name, queue as Queue);
84-
this.logger.log(`Queue created: ${options.name}`);
85-
86-
if (this.bullMQPrometheusService) {
87-
this.bullMQPrometheusService.registerQueue(queue);
88-
}
89-
90-
if (this.shouldProcessJobs && this.bullBoard && this.queueBullboardService) {
91-
this.queueBullboardService.registerQueue(this.bullBoard, queue);
59+
if (this.queues.has(queueName)) {
60+
return this.queues.get(queueName) as Queue;
9261
}
9362

94-
return queue;
63+
throw new Error(`Queue with name '${queueName}' does not exist. Please create it first.`);
9564
}
9665

9766
registerWorker<T = any>(
@@ -115,8 +84,8 @@ export class QueueService implements OnModuleDestroy {
11584
return;
11685
}
11786

118-
const worker = new Worker<T, any, string>(queueName, processor, {
119-
connection: this.redisClient as any,
87+
const worker = new Worker(queueName, processor, {
88+
connection: this.redisClient,
12089
concurrency: options.concurrency ?? 1,
12190
autorun: true,
12291
});
@@ -139,25 +108,25 @@ export class QueueService implements OnModuleDestroy {
139108
});
140109
});
141110

142-
this.workers.set(queueName, worker as unknown as Worker<T, any, string>);
111+
this.workers.set(queueName, worker);
143112
this.logger.log(`Worker registered for queue: ${queueName}`);
144113
}
145114

146-
createQueue<T = any>(queueName: string, options?: QueueOptions<T>): void {
115+
createQueue(queueName: string, options?: QueueOptions): void {
147116
if (this.queues.has(queueName)) {
148117
return;
149118
}
150119

151-
const queue = new Queue<T, any, string>(queueName, {
152-
connection: this.redisClient as any,
120+
const queue = new Queue(queueName, {
121+
connection: this.redisClient as IORedis,
153122
defaultJobOptions: options?.jobOptions ?? {
154123
attempts: 3,
155124
backoff: { type: 'exponential', delay: 5000 },
156125
removeOnComplete: { count: 100, age: 3600 * 24 },
157126
removeOnFail: false,
158127
},
159128
});
160-
this.queues.set(queueName, queue as Queue);
129+
this.queues.set(queueName, queue);
161130
this.logger.log(`Queue created: ${queueName}`);
162131

163132
if (this.bullMQPrometheusService) {
@@ -182,41 +151,36 @@ export class QueueService implements OnModuleDestroy {
182151
}
183152

184153
async setupJobScheduler<T = any>(
185-
queue: Queue,
154+
queueName: string,
186155
schedulerId: string,
187-
options: {
188-
every: number;
189-
data?: T;
190-
jobName?: string;
191-
jobOptions?: {
192-
attempts?: number;
193-
backoff?: {
194-
type: 'exponential' | 'fixed';
195-
delay: number;
196-
};
197-
};
156+
scheduleOpts: { every: number },
157+
jobOpts: {
158+
name: string;
159+
data: T;
160+
opts?: any;
198161
},
199-
) {
162+
): Promise<any> {
200163
try {
201-
const jobName = options.jobName || 'scheduled-job';
164+
const queue = this.getQueue(queueName);
165+
const jobName = jobOpts.name;
202166
const firstJob = await queue.upsertJobScheduler(
203167
schedulerId,
204-
{ every: options.every, jobId: schedulerId },
168+
{ every: scheduleOpts.every, jobId: schedulerId },
205169
{
206170
name: jobName,
207-
data: options.data || { timestamp: Date.now() },
171+
data: jobOpts.data || { timestamp: Date.now() },
208172
opts: {
209-
attempts: options.jobOptions?.attempts || 10,
210-
backoff: options.jobOptions?.backoff || {
173+
attempts: jobOpts.opts?.attempts || 3,
174+
backoff: jobOpts.opts?.backoff || {
211175
type: 'exponential',
212-
delay: 10000,
176+
delay: 3000,
213177
},
214178
},
215179
},
216180
);
217181
this.logger.log(`Created job scheduler: ${schedulerId}`, {
218182
schedulerId,
219-
every: options.every,
183+
every: scheduleOpts.every,
220184
jobId: firstJob?.id,
221185
});
222186

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,47 @@
11
import { createBullBoard } from '@bull-board/api';
22
import { ExpressAdapter } from '@bull-board/express';
3-
import type { QueueOptions } from './queue.service';
43

54
export const BULLBOARD_INSTANCE_INJECTION_TOKEN = 'BULLBOARD_INSTANCE';
65

76
export interface BullBoardInjectedInstance {
87
boardInstance: ReturnType<typeof createBullBoard>;
98
serverAdapter: ExpressAdapter;
109
}
10+
1111
export interface IQueueService {
12-
/**
13-
* Add a job to the queue.
14-
*/
15-
addJob<T = any>(queueName: string, data: T, opts?: any): Promise<any>;
12+
addJob<T = any>(queueName: string, job_name: string, data: T, opts?: any): Promise<any>;
1613

17-
/**
18-
* Register a worker for a queue. The processor should be a standalone function for testability and abstraction.
19-
*/
2014
registerWorker<T = any>(
2115
queueName: string,
2216
processor: (job: any) => Promise<any>,
2317
options?: { concurrency?: number },
2418
): void;
2519

2620
isWorkerEnabled(): boolean;
27-
28-
/**
29-
* Explicitly create/configure a queue with options.
30-
*/
3121
createQueue<T = any>(queueName: string, options?: QueueOptions<T>): void;
22+
setupJobScheduler<T = any>(
23+
queueName: string,
24+
schedulerId: string,
25+
scheduleOpts: { every: number },
26+
jobOpts: {
27+
name: string;
28+
data: T;
29+
opts?: any;
30+
},
31+
): Promise<any>;
32+
}
33+
34+
export interface QueueOptions<T = any> {
35+
name: string;
36+
concurrency?: number;
37+
jobOptions?: {
38+
attempts?: number;
39+
backoff?: {
40+
type: 'exponential' | 'fixed';
41+
delay: number;
42+
};
43+
removeOnComplete?: boolean | number | { count: number; age: number };
44+
removeOnFail?: boolean | number | { count: number; age: number };
45+
priority?: number;
46+
};
3247
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { Module } from '@nestjs/common';
2+
import { RedisService, redisProvider } from './redis.service';
3+
import { AppLoggerModule } from '@/common/app-logger/app-logger.module';
4+
5+
@Module({
6+
imports: [AppLoggerModule],
7+
providers: [RedisService, redisProvider],
8+
exports: [RedisService, redisProvider],
9+
})
10+
export class RedisModule {}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { Injectable, OnModuleDestroy } from '@nestjs/common';
2+
import IORedis from 'ioredis';
3+
import { env } from '@/env';
4+
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
5+
6+
export const REDIS_CLIENT = Symbol('REDIS_CLIENT');
7+
8+
@Injectable()
9+
export class RedisService implements OnModuleDestroy {
10+
public readonly client: IORedis;
11+
12+
constructor(private readonly logger: AppLoggerService) {
13+
if (!env.QUEUE_SYSTEM_ENABLED) {
14+
this.client = null as any;
15+
16+
return;
17+
}
18+
19+
const redisConfig = {
20+
host: env.REDIS_HOST,
21+
port: env.REDIS_PORT,
22+
password: env.REDIS_PASSWORD,
23+
maxRetriesPerRequest: null,
24+
...(env.ENVIRONMENT_NAME !== 'local' ? { tls: {} } : {}),
25+
};
26+
this.client = new IORedis(redisConfig);
27+
28+
this.client.on('error', error => {
29+
this.logger.error('Redis connection error', { error });
30+
});
31+
32+
this.client.on('connect', () => {
33+
this.logger.log('Redis connected successfully');
34+
});
35+
36+
this.logger.log('Redis client initialized via RedisService.');
37+
}
38+
39+
async onModuleDestroy() {
40+
if (this.client) {
41+
await this.client.quit();
42+
}
43+
}
44+
}
45+
46+
export const redisProvider = {
47+
provide: REDIS_CLIENT,
48+
useExisting: RedisService,
49+
};

0 commit comments

Comments
 (0)