-
Notifications
You must be signed in to change notification settings - Fork 261
Bal 4425 bullmq refactor #3452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Bal 4425 bullmq refactor #3452
Conversation
… port config - Remove MetricsAuthMiddleware configuration from WorkerAppModule - Change retrieved port from 'PORT' to 'WORKER_PORT' in main.worker.ts - Update BullBoardAuthMiddleware usage in WebhooksModule configuration
- Remove Redis connection initialization from constructor - Inject Redis client through dependency injection - Improve code structure by eliminating unnecessary methods
…ports - Remove redundant queue registration lines in AlertQueueService - Integrate BullBoard instance creation within QueueModule - Simplify dependency handling in WebhooksService and related modules
- Simplify dependency injection for services in AlertQueueService - Replace queue setup logic with createQueue method and refactor worker processing - Eliminate unused code and streamline error handling in webhook job processing
- Add job scheduling setup for alert-check jobs with specific intervals - Update relevant imports and refactor the QueueService to utilize RedisService - Enhance IQueueService interface with additional documentation and job-related methods
|
WalkthroughThis update refactors the queue management system by introducing a new Redis module and service, formalizing queue operations through an Changes
Sequence Diagram(s)sequenceDiagram
participant App
participant IQueueService
participant RedisService
participant Worker
App->>IQueueService: createQueue(queueName, options)
IQueueService->>RedisService: (uses Redis client)
App->>IQueueService: addJob(queueName, jobName, data)
IQueueService->>RedisService: (enqueue job)
App->>IQueueService: registerWorker(queueName, processor)
IQueueService->>Worker: (register processor)
Worker->>IQueueService: process job
IQueueService->>App: (callback with result/error)
Possibly related PRs
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (5)
🚧 Files skipped from review as they are similar to previous changes (4)
🧰 Additional context used📓 Path-based instructions (1)`services/workflows-service/**/*.service.ts`: Service implementation files must be placed in feature modules and named with the .service.ts suffix.
📄 Source: CodeRabbit Inference Engine (.cursor/rules/workflows-service.mdc) List of files the instruction was applied to:
⏰ Context from checks skipped due to timeout of 90000ms (6)
🔇 Additional comments (10)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
services/workflows-service/src/common/queue/types.ts (1)
11-32: Consider using more specific types instead ofany.The interface uses
anyin several places. Consider:
- Making
optsparameters more specific by defining job option interfaces- Using generics more consistently across all methods
- Adding JSDoc documentation to clarify the purpose and usage of each method
Here's a more type-safe version:
+export interface JobOptions { + delay?: number; + priority?: number; + repeat?: { every: number }; + // Add other common job options +} + export interface IQueueService { - addJob<T = any>(queueName: string, job_name: string, data: T, opts?: any): Promise<any>; + addJob<T = any, R = any>(queueName: string, job_name: string, data: T, opts?: JobOptions): Promise<R>; registerWorker<T = any>( queueName: string, - processor: (job: any) => Promise<any>, + processor: (job: { id: string; data: T; [key: string]: any }) => Promise<any>, options?: { concurrency?: number }, ): void; isWorkerEnabled(): boolean; createQueue<T = any>(queueName: string, options?: QueueOptions<T>): void; setupJobScheduler<T = any>( queueName: string, schedulerId: string, scheduleOpts: { every: number }, jobOpts: { name: string; data: T; - opts?: any; + opts?: JobOptions; }, - ): Promise<any>; + ): Promise<void>; }services/workflows-service/src/common/queue/queue.service.ts (1)
18-24: Fix parameter naming convention.The parameter
job_nameshould follow camelCase convention.Apply this diff:
- async addJob<T = any>(queueName: string, job_name: string, data: T, opts?: any): Promise<any> { + async addJob<T = any>(queueName: string, jobName: string, data: T, opts?: any): Promise<any> { const queue = this.getQueue(queueName); - return queue.add(job_name, data, { + return queue.add(jobName, data, { priority: opts?.priority, }); }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
-
pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (11)
-
services/workflows-service/src/alert/alert-queue.service.ts(3 hunks) -
services/workflows-service/src/app.worker.module.ts(2 hunks) -
services/workflows-service/src/common/queue/queue-bullboard.service.ts(0 hunks) -
services/workflows-service/src/common/queue/queue.module.ts(1 hunks) -
services/workflows-service/src/common/queue/queue.service.ts(3 hunks) -
services/workflows-service/src/common/queue/types.ts(1 hunks) -
services/workflows-service/src/common/redis/redis.module.ts(1 hunks) -
services/workflows-service/src/common/redis/redis.service.ts(1 hunks) -
services/workflows-service/src/main.worker.ts(1 hunks) -
services/workflows-service/src/webhooks/webhooks.module.ts(2 hunks) -
services/workflows-service/src/webhooks/webhooks.service.ts(6 hunks)
💤 Files with no reviewable changes (1)
- services/workflows-service/src/common/queue/queue-bullboard.service.ts
🧰 Additional context used
📓 Path-based instructions (1)
`services/workflows-service/**/*.service.ts`: Service implementation files must be placed in feature modules and named with the .service.ts suffix.
services/workflows-service/**/*.service.ts: Service implementation files must be placed in feature modules and named with the .service.ts suffix.
📄 Source: CodeRabbit Inference Engine (.cursor/rules/workflows-service.mdc)
List of files the instruction was applied to:
services/workflows-service/src/alert/alert-queue.service.tsservices/workflows-service/src/common/redis/redis.service.tsservices/workflows-service/src/webhooks/webhooks.service.tsservices/workflows-service/src/common/queue/queue.service.ts
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Analyze (javascript)
🔇 Additional comments (14)
services/workflows-service/src/app.worker.module.ts (2)
7-7: Good consolidation of configuration imports.Using only
configsinstead of bothenvandconfigsreduces redundancy and centralizes environment variable access.
15-39: Verify metrics and authentication still function after middleware removal.The removal of
MetricsAuthMiddlewareconfiguration simplifies the module, but ensure that:
- Metrics collection is still working through other mechanisms
- Authentication/authorization is handled elsewhere if still needed
Run the following script to check if metrics middleware is configured elsewhere:
#!/bin/bash # Description: Check if MetricsAuthMiddleware is used in other modules # Search for MetricsAuthMiddleware usage ast-grep --pattern 'MetricsAuthMiddleware' # Check for alternative metrics middleware configuration rg -A 5 "configure.*MiddlewareConsumer" --type tsservices/workflows-service/src/common/queue/types.ts (1)
34-47: Well-structured queue configuration options.The
QueueOptionsinterface provides comprehensive configuration including backoff strategies and retention policies. Good use of union types for backoff configuration.services/workflows-service/src/main.worker.ts (1)
36-36: Good separation of worker port configuration.Using
WORKER_PORTinstead ofPORTallows the worker to run on a different port than the main application, improving deployment flexibility and debugging capabilities.services/workflows-service/src/common/redis/redis.module.ts (1)
1-10: ```shell
#!/bin/bashRefined search for the RedisService class definition
Try matching export with decorator using ast-grep
ast-grep --pattern
$'@Injectable() export class RedisService { $ $$ }'Fallback: ripgrep in the redis.service.ts file
rg -n "class RedisService" -A 5 services/workflows-service/src/common/redis/redis.service.ts
</details> <details> <summary>services/workflows-service/src/alert/alert-queue.service.ts (3)</summary> `16-20`: **Good use of dependency injection with interface token.** The injection of `IQueueService` using the token pattern properly decouples this service from the concrete implementation. --- `32-40`: **Verify the reduction in retry attempts is intentional.** The number of retry attempts has been reduced from 10 to 3. This could impact reliability if alert checks frequently fail due to transient issues. Was this reduction intentional? Consider if 3 retries are sufficient for your use case, especially for critical alert monitoring operations. --- `65-76`: **Excellent error handling implementation.** The dedicated `processAlertCheckJob` method with explicit try-catch error handling and logging improves observability and debugging. The error is properly re-thrown to allow the queue system to handle retries. </details> <details> <summary>services/workflows-service/src/webhooks/webhooks.module.ts (1)</summary> `11-25`: **LGTM! Module correctly simplified.** The removal of Bull Board provider and authentication middleware aligns with the queue system refactoring. The module now correctly delegates Bull Board management to the QueueModule while still consuming the instance for route configuration. </details> <details> <summary>services/workflows-service/src/common/redis/redis.service.ts (1)</summary> `39-43`: **Add null check in onModuleDestroy.** The method should check if the client exists before attempting to quit, especially since it can be null when the queue system is disabled. Apply this diff: ```diff async onModuleDestroy() { - if (this.client) { + if (this.client && env.QUEUE_SYSTEM_ENABLED) { await this.client.quit(); } }Likely an incorrect or invalid review comment.
services/workflows-service/src/common/queue/queue.module.ts (1)
11-37: Well-structured module refactoring.The module properly implements the dependency injection pattern with the IQueueService interface and centralizes Bull Board instance creation. The exports align with the new service structure.
services/workflows-service/src/webhooks/webhooks.service.ts (2)
59-67: Verify the reduction in retry attempts.The retry attempts have been reduced to 3 (from what appears to be a higher default based on other parts of the codebase). Ensure this aligns with your webhook reliability requirements.
79-106: Good extraction of webhook processing logic.Extracting the webhook processing into a separate method improves code organization and testability. The error handling with WebhookError provides better context for failures.
services/workflows-service/src/common/queue/queue.service.ts (1)
141-151: Robust cleanup implementation.The concurrent cleanup of workers and queues with proper error handling ensures graceful shutdown.
- Remove unnecessary injection of BullBoard instance in AlertQueueService - Update job options to reduce default attempts for better performance - Enhance queue service methods for clearer job scheduling and worker registration
fed9014 to
9d128ef
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
services/workflows-service/src/common/queue/queue.service.ts (1)
115-139: Queue name validation is still missing.The createQueue method still lacks validation for the queueName parameter as previously noted.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
services/workflows-service/src/alert/alert-queue.service.ts(3 hunks)services/workflows-service/src/common/queue/queue.module.ts(1 hunks)services/workflows-service/src/common/queue/queue.service.ts(3 hunks)services/workflows-service/src/common/queue/types.ts(1 hunks)services/workflows-service/src/common/redis/redis.module.ts(1 hunks)services/workflows-service/src/common/redis/redis.service.ts(1 hunks)services/workflows-service/src/webhooks/webhooks.service.ts(6 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- services/workflows-service/src/common/redis/redis.module.ts
- services/workflows-service/src/common/queue/types.ts
- services/workflows-service/src/alert/alert-queue.service.ts
- services/workflows-service/src/common/redis/redis.service.ts
- services/workflows-service/src/common/queue/queue.module.ts
- services/workflows-service/src/webhooks/webhooks.service.ts
🧰 Additional context used
📓 Path-based instructions (1)
`services/workflows-service/**/*.service.ts`: Service implementation files must be placed in feature modules and named with the .service.ts suffix.
services/workflows-service/**/*.service.ts: Service implementation files must be placed in feature modules and named with the .service.ts suffix.
📄 Source: CodeRabbit Inference Engine (.cursor/rules/workflows-service.mdc)
List of files the instruction was applied to:
services/workflows-service/src/common/queue/queue.service.ts
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: test_linux
- GitHub Check: Analyze (javascript)
- GitHub Check: lint
- GitHub Check: build (ubuntu-latest)
- GitHub Check: format
- GitHub Check: spell_check
🔇 Additional comments (7)
services/workflows-service/src/common/queue/queue.service.ts (7)
1-9: LGTM: Clean import organization.The import changes properly reflect the new dependencies and abstractions introduced in the refactoring.
12-13: LGTM: Proper interface implementation and type refinement.The class correctly implements the
IQueueServiceinterface and the Redis client type is appropriately refined.
28-31: LGTM: Proper dependency injection pattern.The constructor properly injects the Redis service and initializes the client appropriately, following good DI practices.
Also applies to: 35-35
54-64: LGTM: Explicit queue management approach.The change from implicit queue creation to explicit error throwing improves predictability and prevents accidental queue creation.
66-113: LGTM: Simplified worker registration.The worker registration is properly streamlined while maintaining necessary error handling and logging.
141-151: LGTM: Improved concurrent cleanup.The refactored cleanup logic properly handles both workers and queues concurrently with appropriate error handling.
153-192: LGTM: Simplified scheduler setup with appropriate defaults.The method signature changes and simplified options structure improve usability while maintaining necessary functionality.
- Update forceDirect assignment to explicitly check for enabled state - Ensure consistent behavior for document and workflow state changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
services/workflows-service/src/events/workflow-completed-webhook-caller.ts (1)
80-80: Consider clarifying the feature flag logic.The conditional logic is functionally correct but may be confusing. The expression
customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled !== trueforces direct webhook calls when:
- The feature is undefined/null
- The feature exists but
enabledis not strictlytrueThis means the queue system is only used when explicitly enabled (
enabled === true), which appears intentional for the refactoring. However, the negative condition could be clearer.Consider refactoring for better readability:
- forceDirect: customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled !== true, + forceDirect: !customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled,Or add a comment explaining the default behavior:
+ // Default to direct calls unless queue system is explicitly enabled forceDirect: customer.features?.WEBHOOK_QUEUE_SYSTEM_ENABLED?.enabled !== true,
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
services/workflows-service/src/events/document-changed-webhook-caller.ts(2 hunks)services/workflows-service/src/events/workflow-completed-webhook-caller.ts(1 hunks)services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: lint
- GitHub Check: Analyze (javascript)
- GitHub Check: spell_check
- GitHub Check: test_linux
- GitHub Check: build (ubuntu-latest)
- GitHub Check: format
🔇 Additional comments (5)
services/workflows-service/src/events/document-changed-webhook-caller.ts (2)
87-87: LGTM! Feature flag access properly enabled.Adding the
featuresproperty to the select options enables access to customer feature flags, which is necessary for the queue system control logic implemented later in the file.
138-138: LGTM! Defensive queue system control implemented correctly.The logic properly implements feature flag-based control over the webhook queue system:
- Uses optional chaining to safely handle cases where
featuresmight not exist- Employs strict equality check (
!== true) to ensure only explicitly enabled feature flags activate the queue system- Defaults to direct webhook calls (
forceDirect: true) for safety, only using the queue when the feature is explicitly enabledThis defensive approach aligns well with the BullMQ refactor objectives and ensures reliable fallback behavior.
services/workflows-service/src/events/workflow-completed-webhook-caller.ts (1)
44-44: LGTM: Feature flag data retrieval added.The addition of
features: trueto the select clause is appropriate for accessing customer feature flags that control queue behavior.services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts (2)
36-36: LGTM! Feature flags properly included in customer query.Adding the
featuresfield to the select query enables access to customer feature flags, which is necessary for the updated webhook queue logic.
55-55: LGTM! Strict boolean checking ensures correct queue behavior.The logic correctly implements strict boolean comparison (
!== true) to ensure that only when the feature flag is explicitly set to booleantruewill the queue system be used. All other cases (undefined, null, false, truthy non-boolean values) will default to direct sending, which provides a safe fallback behavior.
- Remove deprecated job option settings for consistency - Introduce default job options in queue creation function - Add validation to ensure valid queue names are provided
89ab61c to
89b2632
Compare
Summary by CodeRabbit
New Features
Refactor
Chores