diff --git a/backend/package.json b/backend/package.json index 154cc90..784699d 100644 --- a/backend/package.json +++ b/backend/package.json @@ -12,6 +12,10 @@ "start:dev": "nest start --watch", "start:debug": "nest start --debug --watch", "start:prod": "node dist/api/main", + "job:data-ingestion": "node dist/jobs/entrypoints/data-ingestion", + "job:cra-file-transfer": "node dist/jobs/entrypoints/cra-file-transfer", + "job:cra-response-poll": "node dist/jobs/entrypoints/cra-response-poll", + "job:retry-failed": "node dist/jobs/entrypoints/retry-failed", "lint": "eslint \"{src,apps,libs,test}/**/*.ts\"", "lint:fix": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix", "make-badges": "istanbul-badges-readme --logo=vitest --exitCode=1", diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index b81e938..ac36581 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -19,7 +19,7 @@ model Contact { akaLastName String @map("aka_last_name") akaFirstName String @map("aka_first_name") personIdIcm String @map("person_id_icm") - personIdIms String @map("person_id_ims") + personIdMis String @map("person_id_mis") caseNumber String @map("case_number") caseType String @map("case_type") caseStatus String @map("case_status") @@ -137,3 +137,26 @@ model TransferFile { @@schema("csa") @@map("transfer_files") } + +model JobRun { + id Int @id @default(autoincrement()) + jobType String @map("job_type") + status String + parentJobId Int? @map("parent_job_id") + jobTrigger String @map("job_trigger") + retryCount Int @default(0) @map("retry_count") + error String? + metadata Json @default("{}") @db.JsonB + createdAt DateTime @default(now()) @map("created_at") + startedAt DateTime @map("started_at") + completedAt DateTime? @map("completed_at") + + parentJob JobRun? @relation("ChildJobs", fields: [parentJobId], references: [id]) + childJobs JobRun[] @relation("ChildJobs") + + @@index([status]) + @@index([parentJobId]) + @@index([jobType, status]) + @@schema("csa") + @@map("job_runs") +} diff --git a/backend/prisma/seed.ts b/backend/prisma/seed.ts index a8643e0..10a578d 100644 --- a/backend/prisma/seed.ts +++ b/backend/prisma/seed.ts @@ -121,7 +121,7 @@ async function seedContacts() { akaFirstName: faker.person.firstName(), personIdIcm: faker.string.alphanumeric(10).toUpperCase(), - personIdIms: faker.string.alphanumeric(10).toUpperCase(), + personIdMis: faker.string.alphanumeric(10).toUpperCase(), gender: faker.helpers.arrayElement(GENDERS), dateOfBirth: birthDate, diff --git a/backend/src/api/contacts/contacts.service.ts b/backend/src/api/contacts/contacts.service.ts index 8301305..5cfed75 100644 --- a/backend/src/api/contacts/contacts.service.ts +++ b/backend/src/api/contacts/contacts.service.ts @@ -189,7 +189,7 @@ export class ContactsService { const data = await this.prisma.$queryRaw` SELECT id, last_name as "lastName", given_names as "givenNames", middle_name as "middleName", aka_last_name as "akaLastName", aka_first_name as "akaFirstName", - person_id_icm as "personIdIcm", person_id_ims as "personIdIms", + person_id_icm as "personIdIcm", person_id_mis as "personIdMis", gender, date_of_birth as "dateOfBirth", age, case_number as "caseNumber", legacy_file_number as "legacyFileNumber", case_type as "caseType", case_status as "caseStatus", case_load as "caseLoad", diff --git a/backend/src/api/contacts/dto/contact.dto.ts b/backend/src/api/contacts/dto/contact.dto.ts index e6b76c6..c35c589 100644 --- a/backend/src/api/contacts/dto/contact.dto.ts +++ b/backend/src/api/contacts/dto/contact.dto.ts @@ -24,7 +24,7 @@ export class ContactDto { personIdIcm: string @ApiProperty({ description: 'IMS person identifier' }) - personIdIms: string + personIdMis: string @ApiProperty({ description: 'Primary case number associated with the contact' }) caseNumber: string diff --git a/backend/src/cra/cra.module.spec.ts b/backend/src/cra/cra.module.spec.ts new file mode 100644 index 0000000..db61e40 --- /dev/null +++ b/backend/src/cra/cra.module.spec.ts @@ -0,0 +1,49 @@ +import type { TestingModule } from '@nestjs/testing' +import { Test } from '@nestjs/testing' +import { JobType } from 'src/jobs/enums/job-type.enum' +import { JobRegistry } from 'src/jobs/job-registry.service' +import { JobsModule } from 'src/jobs/jobs.module' +import { CraModule } from './cra.module' +import { PollCraResponseHandler } from './handlers/poll-cra-response.handler' +import { SendCraFileHandler } from './handlers/send-cra-file.handler' + +describe('CraModule', () => { + let module: TestingModule + let registry: JobRegistry + + beforeEach(async () => { + module = await Test.createTestingModule({ + imports: [JobsModule, CraModule], + }).compile() + + registry = module.get(JobRegistry) + await module.init() // Trigger onModuleInit + }) + + afterEach(async () => { + await module.close() + }) + + it('should be defined', () => { + const craModule = module.get(CraModule) + expect(craModule).toBeDefined() + }) + + it('should register both CRA handlers', () => { + expect(registry.hasHandler(JobType.SEND_CRA_FILE)).toBe(true) + expect(registry.hasHandler(JobType.POLL_CRA_RESPONSE)).toBe(true) + }) + + it('should register handlers with correct types', () => { + const sendHandler = registry.getHandler(JobType.SEND_CRA_FILE) + const pollHandler = registry.getHandler(JobType.POLL_CRA_RESPONSE) + + expect(sendHandler).toBeInstanceOf(SendCraFileHandler) + expect(pollHandler).toBeInstanceOf(PollCraResponseHandler) + }) + + it('should export all handler providers', () => { + expect(module.get(SendCraFileHandler)).toBeDefined() + expect(module.get(PollCraResponseHandler)).toBeDefined() + }) +}) diff --git a/backend/src/cra/cra.module.ts b/backend/src/cra/cra.module.ts new file mode 100644 index 0000000..4736de1 --- /dev/null +++ b/backend/src/cra/cra.module.ts @@ -0,0 +1,28 @@ +import { Module, OnModuleInit } from '@nestjs/common' +import { JobRegistry } from 'src/jobs/job-registry.service' +import { JobsModule } from 'src/jobs/jobs.module' +import { PollCraResponseHandler } from './handlers/poll-cra-response.handler' +import { SendCraFileHandler } from './handlers/send-cra-file.handler' + +/* + * Generates and sends files to CRA + * Polls and process response files from CRA + */ +@Module({ + imports: [JobsModule], + providers: [SendCraFileHandler, PollCraResponseHandler], + exports: [SendCraFileHandler, PollCraResponseHandler], +}) +export class CraModule implements OnModuleInit { + constructor( + private readonly registry: JobRegistry, + private readonly sendCraFileHandler: SendCraFileHandler, + private readonly pollCraResponseHandler: PollCraResponseHandler, + ) {} + + onModuleInit() { + // Register CRA-related job handlers + this.registry.register(this.sendCraFileHandler.jobType, this.sendCraFileHandler) + this.registry.register(this.pollCraResponseHandler.jobType, this.pollCraResponseHandler) + } +} diff --git a/backend/src/cra/handlers/poll-cra-response.handler.ts b/backend/src/cra/handlers/poll-cra-response.handler.ts new file mode 100644 index 0000000..2fc14bf --- /dev/null +++ b/backend/src/cra/handlers/poll-cra-response.handler.ts @@ -0,0 +1,35 @@ +import { Injectable } from '@nestjs/common' +import { BaseJob } from 'src/jobs/base-job' +import { JobType } from 'src/jobs/enums/job-type.enum' +import { JobResult } from 'src/jobs/interfaces/job-result.interface' +import { JobContext } from 'src/jobs/interfaces/job.interface' + +/* + * Checks for response files from CRA and processes them + * Triggered by CronJob POLL_CRA_RESPONSE + */ +@Injectable() +export class PollCraResponseHandler extends BaseJob { + readonly jobType = JobType.POLL_CRA_RESPONSE + + async execute(_context: JobContext): Promise { + // TODO: Implement CRA response polling + // 1. Poll CRA endpoint for response files + // 2. Download new response files + // 3. Parse and validate response data + // 4. Update contact records with CRA responses + // 5. Return metadata: { files_processed, records_updated, errors } + + this.logger.log('POLL_CRA_RESPONSE stub - not yet implemented') + + return { + success: true, + message: 'CRA response polling stub', + metadata: { + files_processed: 0, + records_updated: 0, + errors: [], + }, + } + } +} diff --git a/backend/src/cra/handlers/send-cra-file.handler.ts b/backend/src/cra/handlers/send-cra-file.handler.ts new file mode 100644 index 0000000..d17e77b --- /dev/null +++ b/backend/src/cra/handlers/send-cra-file.handler.ts @@ -0,0 +1,35 @@ +import { Injectable } from '@nestjs/common' +import { BaseJob } from 'src/jobs/base-job' +import { JobType } from 'src/jobs/enums/job-type.enum' +import { JobResult } from 'src/jobs/interfaces/job-result.interface' +import { JobContext } from 'src/jobs/interfaces/job.interface' + +/* + * Triggered by CronJob SEND_CRA_FILE + * Creates a CRA-formatted file with eligible contacts and send it for tranfer + */ +@Injectable() +export class SendCraFileHandler extends BaseJob { + readonly jobType = JobType.SEND_CRA_FILE + + async execute(_context: JobContext): Promise { + // TODO: Implement CRA file generation and transfer + // 1. Query eligible contacts + // 2. Format data according to CRA specifications + // 3. Write to file storage + // 4. Transfer file to CRA destination + // 5. Return metadata: { file_path, record_count, transfer_status } + + this.logger.log('SEND_CRA_FILE stub - not yet implemented') + + return { + success: true, + message: 'CRA file send stub', + metadata: { + file_path: null, + record_count: 0, + transfer_status: 'pending', + }, + } + } +} diff --git a/backend/src/jobs/base-job.ts b/backend/src/jobs/base-job.ts new file mode 100644 index 0000000..b6dc09f --- /dev/null +++ b/backend/src/jobs/base-job.ts @@ -0,0 +1,31 @@ +import { Logger } from '@nestjs/common' +import { JobType } from './enums/job-type.enum' +import { JobResult } from './interfaces/job-result.interface' +import { Job, JobContext } from './interfaces/job.interface' + +export abstract class BaseJob implements Job { + protected readonly logger: Logger + + abstract readonly jobType: JobType + readonly inlineRetryAttempts: number = 2 + + constructor() { + this.logger = new Logger(this.constructor.name) + } + + abstract execute(context: JobContext): Promise + + async onStart(context: JobContext): Promise { + this.logger.log(`Starting job ${context.jobRunId} [${this.jobType}]`) + } + + async onSuccess(context: JobContext, result: JobResult): Promise { + this.logger.log(`Job ${context.jobRunId} completed successfully: ${result.message ?? 'OK'}`) + } + + async onFailure(context: JobContext, error: Error): Promise { + this.logger.warn( + `Job ${context.jobRunId} failed (attempt ${context.retryCount + 1}): ${error.message}`, + ) + } +} diff --git a/backend/src/jobs/entrypoints/cra-file-transfer.ts b/backend/src/jobs/entrypoints/cra-file-transfer.ts new file mode 100644 index 0000000..7f53765 --- /dev/null +++ b/backend/src/jobs/entrypoints/cra-file-transfer.ts @@ -0,0 +1,42 @@ +#!/usr/bin/env node +import { Logger } from '@nestjs/common' +import { NestFactory } from '@nestjs/core' +import { CraModule } from 'src/cra/cra.module' +import { JobTrigger } from '../enums/job-trigger.enum' +import { JobType } from '../enums/job-type.enum' +import { JobRunner } from '../job-runner.service' + +// Generates and sends CRA file +async function bootstrap() { + const logger = new Logger('CRAFileTransferJob') + + try { + logger.log('Bootstrapping CRA file transfer job...') + + // Create NestJS application context (no HTTP server) + const app = await NestFactory.createApplicationContext(CraModule, { + logger: ['log', 'error', 'warn'], + }) + + // Get JobRunner from DI container + const jobRunner = app.get(JobRunner) + + // Run SEND_CRA_FILE job + const result = await jobRunner.runJobType(JobType.SEND_CRA_FILE, JobTrigger.CRON) + + await app.close() + + if (result.success) { + logger.log('CRA file transfer completed successfully') + process.exit(0) + } else { + logger.error(`CRA file transfer failed: ${result.message}`) + process.exit(1) + } + } catch (error) { + logger.error(`Fatal error: ${error.message}`, error.stack) + process.exit(1) + } +} + +bootstrap() diff --git a/backend/src/jobs/entrypoints/cra-response-poll.ts b/backend/src/jobs/entrypoints/cra-response-poll.ts new file mode 100644 index 0000000..1d14567 --- /dev/null +++ b/backend/src/jobs/entrypoints/cra-response-poll.ts @@ -0,0 +1,42 @@ +#!/usr/bin/env node +import { Logger } from '@nestjs/common' +import { NestFactory } from '@nestjs/core' +import { CraModule } from 'src/cra/cra.module' +import { JobTrigger } from '../enums/job-trigger.enum' +import { JobType } from '../enums/job-type.enum' +import { JobRunner } from '../job-runner.service' + +// Polls for and processes CRA response files +async function bootstrap() { + const logger = new Logger('CRAResponsePollJob') + + try { + logger.log('Bootstrapping CRA response poll job...') + + // Create NestJS application context (no HTTP server) + const app = await NestFactory.createApplicationContext(CraModule, { + logger: ['log', 'error', 'warn'], + }) + + // Get JobRunner from DI container + const jobRunner = app.get(JobRunner) + + // Run POLL_CRA_RESPONSE job + const result = await jobRunner.runJobType(JobType.POLL_CRA_RESPONSE, JobTrigger.CRON) + + await app.close() + + if (result.success) { + logger.log('CRA response poll completed successfully') + process.exit(0) + } else { + logger.error(`CRA response poll failed: ${result.message}`) + process.exit(1) + } + } catch (error) { + logger.error(`Fatal error: ${error.message}`, error.stack) + process.exit(1) + } +} + +bootstrap() diff --git a/backend/src/jobs/entrypoints/data-ingestion.ts b/backend/src/jobs/entrypoints/data-ingestion.ts new file mode 100644 index 0000000..5b102b7 --- /dev/null +++ b/backend/src/jobs/entrypoints/data-ingestion.ts @@ -0,0 +1,42 @@ +#!/usr/bin/env node +import { Logger } from '@nestjs/common' +import { NestFactory } from '@nestjs/core' +import { SyncModule } from 'src/sync/sync.module' +import { JobTrigger } from '../enums/job-trigger.enum' +import { JobType } from '../enums/job-type.enum' +import { JobRunner } from '../job-runner.service' + +// Orchestrates: INGEST_ICM, INGEST_MIS, RUN_ELIGIBILITY, SYNC_ICM +async function bootstrap() { + const logger = new Logger('DataIngestionJob') + + try { + logger.log('Bootstrapping data ingestion job...') + + // Create NestJS application context (no HTTP server) + const app = await NestFactory.createApplicationContext(SyncModule, { + logger: ['log', 'error', 'warn'], + }) + + // Get JobRunner from DI container + const jobRunner = app.get(JobRunner) + + // Run INGEST_DATA job + const result = await jobRunner.runJobType(JobType.INGEST_DATA, JobTrigger.CRON) + + await app.close() + + if (result.success) { + logger.log('Data ingestion completed successfully') + process.exit(0) + } else { + logger.error(`Data ingestion failed: ${result.message}`) + process.exit(1) + } + } catch (error) { + logger.error(`Fatal error: ${error.message}`, error.stack) + process.exit(1) + } +} + +bootstrap() diff --git a/backend/src/jobs/entrypoints/retry-failed.ts b/backend/src/jobs/entrypoints/retry-failed.ts new file mode 100644 index 0000000..616eedb --- /dev/null +++ b/backend/src/jobs/entrypoints/retry-failed.ts @@ -0,0 +1,42 @@ +#!/usr/bin/env node +import { Logger } from '@nestjs/common' +import { NestFactory } from '@nestjs/core' +import { JobTrigger } from '../enums/job-trigger.enum' +import { JobType } from '../enums/job-type.enum' +import { JobRunner } from '../job-runner.service' +import { JobsModule } from '../jobs.module' + +// Marks stuck jobs as failed and retries all failed jobs +async function bootstrap() { + const logger = new Logger('RetryFailedJob') + + try { + logger.log('Bootstrapping retry failed jobs...') + + // Create NestJS application context (no HTTP server) + const app = await NestFactory.createApplicationContext(JobsModule, { + logger: ['log', 'error', 'warn'], + }) + + // Get JobRunner from DI container + const jobRunner = app.get(JobRunner) + + // Run RETRY_FAILED job + const result = await jobRunner.runJobType(JobType.RETRY_FAILED, JobTrigger.CRON) + + await app.close() + + if (result.success) { + logger.log('Retry failed jobs completed successfully') + process.exit(0) + } else { + logger.error(`Retry failed jobs failed: ${result.message}`) + process.exit(1) + } + } catch (error) { + logger.error(`Fatal error: ${error.message}`, error.stack) + process.exit(1) + } +} + +bootstrap() diff --git a/backend/src/jobs/enums/job-status.enum.ts b/backend/src/jobs/enums/job-status.enum.ts new file mode 100644 index 0000000..b02d5d3 --- /dev/null +++ b/backend/src/jobs/enums/job-status.enum.ts @@ -0,0 +1,5 @@ +export enum JobStatus { + RUNNING = 'RUNNING', + SUCCESS = 'SUCCESS', + FAILED = 'FAILED', +} diff --git a/backend/src/jobs/enums/job-trigger.enum.ts b/backend/src/jobs/enums/job-trigger.enum.ts new file mode 100644 index 0000000..5a2bdb2 --- /dev/null +++ b/backend/src/jobs/enums/job-trigger.enum.ts @@ -0,0 +1,5 @@ +export enum JobTrigger { + CRON = 'CRON', + SYSTEM = 'SYSTEM', + END_USER = 'END_USER', +} diff --git a/backend/src/jobs/enums/job-type.enum.ts b/backend/src/jobs/enums/job-type.enum.ts new file mode 100644 index 0000000..e1704a3 --- /dev/null +++ b/backend/src/jobs/enums/job-type.enum.ts @@ -0,0 +1,10 @@ +export enum JobType { + INGEST_DATA = 'INGEST_DATA', + INGEST_ICM = 'INGEST_ICM', + INGEST_MIS = 'INGEST_MIS', + RUN_ELIGIBILITY = 'RUN_ELIGIBILITY', + SYNC_ICM = 'SYNC_ICM', + SEND_CRA_FILE = 'SEND_CRA_FILE', + POLL_CRA_RESPONSE = 'POLL_CRA_RESPONSE', + RETRY_FAILED = 'RETRY_FAILED', +} diff --git a/backend/src/jobs/handlers/retry-failed.handler.ts b/backend/src/jobs/handlers/retry-failed.handler.ts new file mode 100644 index 0000000..f0df234 --- /dev/null +++ b/backend/src/jobs/handlers/retry-failed.handler.ts @@ -0,0 +1,42 @@ +import { Injectable } from '@nestjs/common' +import { BaseJob } from '../base-job' +import { JobType } from '../enums/job-type.enum' +import { JobResult } from '../interfaces/job-result.interface' +import { JobContext } from '../interfaces/job.interface' +import { JobRunner } from '../job-runner.service' + +/* + * RETRY_FAILED - Retry failed jobs + * 1. Marks stuck RUNNING jobs as FAILED + * 2. Retries all FAILED jobs + */ +@Injectable() +export class RetryFailedHandler extends BaseJob { + readonly jobType = JobType.RETRY_FAILED + + constructor(private readonly jobRunner: JobRunner) { + super() + } + + async execute(_context: JobContext): Promise { + try { + // This handles both stuck job detection and retry logic + await this.jobRunner.processFailedJobs() + + return { + success: true, + message: 'Failed job processing completed', + } + } catch (error) { + this.logger.error(`Error processing failed jobs: ${error.message}`, error.stack) + return { + success: false, + message: error.message, + metadata: { + errorStack: error.stack, + errorName: error.name, + }, + } + } + } +} diff --git a/backend/src/jobs/interfaces/job-result.interface.ts b/backend/src/jobs/interfaces/job-result.interface.ts new file mode 100644 index 0000000..6b13968 --- /dev/null +++ b/backend/src/jobs/interfaces/job-result.interface.ts @@ -0,0 +1,5 @@ +export interface JobResult { + success: boolean + message?: string + metadata?: Record +} diff --git a/backend/src/jobs/interfaces/job.interface.ts b/backend/src/jobs/interfaces/job.interface.ts new file mode 100644 index 0000000..860cfc3 --- /dev/null +++ b/backend/src/jobs/interfaces/job.interface.ts @@ -0,0 +1,21 @@ +import { JobTrigger } from '../enums/job-trigger.enum' +import { JobType } from '../enums/job-type.enum' +import { JobResult } from './job-result.interface' + +export interface JobContext { + jobRunId: number + parentJobId?: number + jobType: JobType + jobTrigger: JobTrigger + retryCount: number + metadata?: Record +} + +export interface Job { + readonly jobType: JobType + readonly inlineRetryAttempts: number + execute(context: JobContext): Promise + onStart?(context: JobContext): Promise + onSuccess?(context: JobContext, result: JobResult): Promise + onFailure?(context: JobContext, error: Error): Promise +} diff --git a/backend/src/jobs/job-registry.service.ts b/backend/src/jobs/job-registry.service.ts new file mode 100644 index 0000000..e176458 --- /dev/null +++ b/backend/src/jobs/job-registry.service.ts @@ -0,0 +1,26 @@ +import { Injectable, Logger } from '@nestjs/common' +import { JobType } from './enums/job-type.enum' +import { Job } from './interfaces/job.interface' + +@Injectable() +export class JobRegistry { + private readonly logger = new Logger(JobRegistry.name) + private readonly handlers = new Map() + + register(jobType: JobType, handler: Job): void { + this.handlers.set(jobType, handler) + this.logger.log(`Registered handler for ${jobType}`) + } + + getHandler(jobType: JobType): Job | undefined { + return this.handlers.get(jobType) + } + + hasHandler(jobType: JobType): boolean { + return this.handlers.has(jobType) + } + + getRegisteredTypes(): JobType[] { + return Array.from(this.handlers.keys()) + } +} diff --git a/backend/src/jobs/job-runner.service.spec.ts b/backend/src/jobs/job-runner.service.spec.ts new file mode 100644 index 0000000..1f07062 --- /dev/null +++ b/backend/src/jobs/job-runner.service.spec.ts @@ -0,0 +1,192 @@ +import type { TestingModule } from '@nestjs/testing' +import { Test } from '@nestjs/testing' +import { JobStatus } from './enums/job-status.enum' +import { JobTrigger } from './enums/job-trigger.enum' +import { JobType } from './enums/job-type.enum' +import { JobResult } from './interfaces/job-result.interface' +import { Job } from './interfaces/job.interface' +import { JobRegistry } from './job-registry.service' +import { JobRunner } from './job-runner.service' +import { JobsService } from './jobs.service' + +describe('JobRunner', () => { + let runner: JobRunner + let jobsService: JobsService + let jobRegistry: JobRegistry + + const mockJobRun = { + id: 1, + jobType: JobType.INGEST_DATA, + status: JobStatus.RUNNING, + parentJobId: null, + jobTrigger: JobTrigger.CRON, + retryCount: 0, + error: null, + metadata: {}, + createdAt: new Date(), + startedAt: new Date(), + completedAt: null, + childJobs: [], + parentJob: null, + } + + const mockHandler: Job = { + jobType: JobType.INGEST_DATA, + inlineRetryAttempts: 2, + execute: vi.fn(), + onStart: vi.fn(), + onSuccess: vi.fn(), + onFailure: vi.fn(), + } + + beforeEach(async () => { + vi.clearAllMocks() + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + JobRunner, + { + provide: JobsService, + useValue: { + createJob: vi.fn().mockResolvedValue(mockJobRun), + getJob: vi.fn().mockResolvedValue(mockJobRun), + markSuccess: vi.fn().mockResolvedValue(mockJobRun), + markFailed: vi.fn().mockResolvedValue(mockJobRun), + getFailedJobs: vi.fn().mockResolvedValue([]), + markStuckJobsAsFailed: vi.fn().mockResolvedValue({ count: 0 }), + }, + }, + { + provide: JobRegistry, + useValue: { + getHandler: vi.fn().mockReturnValue(mockHandler), + hasHandler: vi.fn().mockReturnValue(true), + }, + }, + ], + }).compile() + + runner = module.get(JobRunner) + jobsService = module.get(JobsService) + jobRegistry = module.get(JobRegistry) + }) + + it('should be defined', () => { + expect(runner).toBeDefined() + }) + + describe('executeJob', () => { + it('should execute job successfully', async () => { + const successResult: JobResult = { success: true, message: 'Done' } + vi.mocked(mockHandler.execute).mockResolvedValue(successResult) + + const result = await runner.executeJob(1) + + expect(jobsService.getJob).toHaveBeenCalledWith(1) + expect(mockHandler.onStart).toHaveBeenCalled() + expect(mockHandler.execute).toHaveBeenCalled() + expect(jobsService.markSuccess).toHaveBeenCalledWith(1, undefined) + expect(mockHandler.onSuccess).toHaveBeenCalled() + expect(result.success).toBe(true) + }) + + it('should throw error if job not found', async () => { + vi.mocked(jobsService.getJob).mockResolvedValue(null) + + await expect(runner.executeJob(999)).rejects.toThrow('Job 999 not found') + }) + + it('should mark as FAILED if no handler registered', async () => { + vi.mocked(jobRegistry.getHandler).mockReturnValue(undefined) + + await expect(runner.executeJob(1)).rejects.toThrow('No handler registered') + expect(jobsService.markFailed).toHaveBeenCalled() + }) + + it('should retry inline on failure and then mark as failed', async () => { + vi.mocked(mockHandler.execute).mockRejectedValue(new Error('Connection failed')) + + const result = await runner.executeJob(1) + + // Should have tried inlineRetryAttempts + 1 times + expect(mockHandler.execute).toHaveBeenCalledTimes(3) // 1 initial + 2 retries + expect(jobsService.markFailed).toHaveBeenCalled() + expect(mockHandler.onFailure).toHaveBeenCalled() + expect(result.success).toBe(false) + }) + }) + + describe('runJobType', () => { + it('should create job and execute it', async () => { + const successResult: JobResult = { success: true, message: 'Done' } + vi.mocked(mockHandler.execute).mockResolvedValue(successResult) + + const result = await runner.runJobType(JobType.INGEST_DATA, JobTrigger.CRON) + + expect(jobsService.createJob).toHaveBeenCalledWith({ + jobType: JobType.INGEST_DATA, + jobTrigger: JobTrigger.CRON, + parentJobId: undefined, + metadata: undefined, + }) + expect(result.success).toBe(true) + }) + + it('should throw if no handler for job type', async () => { + vi.mocked(jobRegistry.getHandler).mockReturnValue(undefined) + + await expect(runner.runJobType(JobType.SEND_CRA_FILE, JobTrigger.CRON)).rejects.toThrow( + 'No handler registered', + ) + }) + + it('should pass metadata to created job', async () => { + const successResult: JobResult = { success: true } + vi.mocked(mockHandler.execute).mockResolvedValue(successResult) + const metadata = { batchId: 123 } + + await runner.runJobType(JobType.INGEST_DATA, JobTrigger.CRON, { metadata }) + + expect(jobsService.createJob).toHaveBeenCalledWith(expect.objectContaining({ metadata })) + }) + }) + + describe('processFailedJobs', () => { + it('should mark stuck jobs as failed and retry failed jobs', async () => { + const failedJob = { + id: 2, + jobType: JobType.INGEST_DATA, + jobTrigger: JobTrigger.CRON, + retryCount: 1, + metadata: {}, + parentJobId: null, + } + vi.mocked(jobsService.getFailedJobs).mockResolvedValue([failedJob]) + vi.mocked(jobsService.getJob).mockResolvedValue({ ...mockJobRun, id: 2 }) + vi.mocked(mockHandler.execute).mockResolvedValue({ success: true }) + + await runner.processFailedJobs() + + expect(jobsService.markStuckJobsAsFailed).toHaveBeenCalledWith(60) + expect(jobsService.getFailedJobs).toHaveBeenCalled() + // Should create a new job for retry + expect(jobsService.createJob).toHaveBeenCalled() + }) + + it('should handle errors during retry gracefully', async () => { + const failedJob = { + id: 2, + jobType: JobType.INGEST_DATA, + jobTrigger: JobTrigger.CRON, + retryCount: 1, + metadata: {}, + parentJobId: null, + } + vi.mocked(jobsService.getFailedJobs).mockResolvedValue([failedJob]) + vi.mocked(jobsService.createJob).mockRejectedValue(new Error('DB error')) + + // Should not throw + await expect(runner.processFailedJobs()).resolves.toBeUndefined() + }) + }) +}) diff --git a/backend/src/jobs/job-runner.service.ts b/backend/src/jobs/job-runner.service.ts new file mode 100644 index 0000000..69bc5c0 --- /dev/null +++ b/backend/src/jobs/job-runner.service.ts @@ -0,0 +1,137 @@ +import { Injectable, Logger } from '@nestjs/common' +import { JobTrigger } from './enums/job-trigger.enum' +import { JobType } from './enums/job-type.enum' +import { JobResult } from './interfaces/job-result.interface' +import { JobContext } from './interfaces/job.interface' +import { JobRegistry } from './job-registry.service' +import { JobsService } from './jobs.service' + +@Injectable() +export class JobRunner { + private readonly logger = new Logger(JobRunner.name) + + constructor( + private readonly jobsService: JobsService, + private readonly jobRegistry: JobRegistry, + ) {} + + // Execute a job with inline retry + async executeJob(jobId: number): Promise { + const job = await this.jobsService.getJob(jobId) + if (!job) { + throw new Error(`Job ${jobId} not found`) + } + + const handler = this.jobRegistry.getHandler(job.jobType as JobType) + + if (!handler) { + const errorMsg = `No handler registered for job type: ${job.jobType}` + await this.jobsService.markFailed(jobId, errorMsg) + throw new Error(errorMsg) + } + + const context: JobContext = { + jobRunId: job.id, + parentJobId: job.parentJobId ?? undefined, + jobType: job.jobType as JobType, + jobTrigger: job.jobTrigger as JobTrigger, + retryCount: job.retryCount ?? 0, + metadata: job.metadata as Record | undefined, + } + + await handler.onStart?.(context) + + // Inline retry loop + let lastError: Error | null = null + for (let attempt = 0; attempt <= handler.inlineRetryAttempts; attempt++) { + try { + if (attempt > 0) { + this.logger.log(`Inline retry attempt ${attempt} for job ${jobId}`) + // Short delay between inline retries (exponential: 1s, 2s, 4s) + // TODO: to increase ? + await this.sleep(1000 * Math.pow(2, attempt - 1)) + } + + const result = await handler.execute(context) + + if (result.success) { + await this.jobsService.markSuccess(jobId, result.metadata) + await handler.onSuccess?.(context, result) + return result + } else { + lastError = new Error(result.message || 'Job execution returned unsuccessful result') + } + } catch (error) { + lastError = error instanceof Error ? error : new Error(String(error)) + this.logger.warn(`Job ${jobId} inline attempt ${attempt + 1} failed: ${lastError.message}`) + } + } + + // All inline retries exhausted - mark as failed + await this.jobsService.markFailed(jobId, lastError!.message) + await handler.onFailure?.(context, lastError!) + + return { + success: false, + message: `Failed after ${handler.inlineRetryAttempts + 1} attempts: ${lastError!.message}`, + } + } + + async runJobType( + jobType: JobType, + jobTrigger: JobTrigger, + options?: { + parentJobId?: number + metadata?: Record + }, + ): Promise { + const handler = this.jobRegistry.getHandler(jobType) + if (!handler) { + throw new Error(`No handler registered for job type: ${jobType}`) + } + + // Create a new job run record (starts as RUNNING) + const jobRun = await this.jobsService.createJob({ + jobType, + jobTrigger, + parentJobId: options?.parentJobId, + metadata: options?.metadata, + }) + + this.logger.log(`Created job run ${jobRun.id} for ${jobType}`) + + return this.executeJob(jobRun.id) + } + + // Process failed jobs and stuck running jobs + // TODO: define when to consider jobs as failed + async processFailedJobs(): Promise { + // First, mark stuck RUNNING jobs as FAILED + const stuckResult = await this.jobsService.markStuckJobsAsFailed(60) // 1 hour threshold + if (stuckResult.count > 0) { + this.logger.log(`Marked ${stuckResult.count} stuck jobs as FAILED`) + } + + // Then, retry all FAILED jobs + const failedJobs = await this.jobsService.getFailedJobs() + this.logger.log(`Found ${failedJobs.length} failed jobs to retry`) + + for (const job of failedJobs) { + try { + this.logger.log(`Retrying job ${job.id} [${job.jobType}]`) + + // Re-run the job (creates a new job run) + await this.runJobType(job.jobType as JobType, job.jobTrigger as JobTrigger, { + parentJobId: job.parentJobId ?? undefined, + metadata: job.metadata as Record, + }) + } catch (error) { + this.logger.error(`Error retrying job ${job.id}: ${error}`) + } + } + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) + } +} diff --git a/backend/src/jobs/jobs.module.ts b/backend/src/jobs/jobs.module.ts new file mode 100644 index 0000000..cbfe3ed --- /dev/null +++ b/backend/src/jobs/jobs.module.ts @@ -0,0 +1,31 @@ +import { Module, OnModuleInit } from '@nestjs/common' +import { PrismaModule } from 'src/common/database/prisma.module' +import { RetryFailedHandler } from './handlers/retry-failed.handler' +import { JobRegistry } from './job-registry.service' +import { JobRunner } from './job-runner.service' +import { JobsService } from './jobs.service' + +/* + * JobsModule provides the job framework infrastructure + * + * Exports: + * - JobsService: Database operations for job_runs + * - JobRunner: Execute jobs with retry logic + * - JobRegistry: Register and retrieve job handlers + * - Register their handlers (cross-cutting jobs) with JobRegistry in onModuleInit() + */ +@Module({ + imports: [PrismaModule], + providers: [JobsService, JobRunner, JobRegistry, RetryFailedHandler], + exports: [JobsService, JobRunner, JobRegistry, RetryFailedHandler], +}) +export class JobsModule implements OnModuleInit { + constructor( + private readonly registry: JobRegistry, + private readonly retryFailedHandler: RetryFailedHandler, + ) {} + + onModuleInit() { + this.registry.register(this.retryFailedHandler.jobType, this.retryFailedHandler) + } +} diff --git a/backend/src/jobs/jobs.service.spec.ts b/backend/src/jobs/jobs.service.spec.ts new file mode 100644 index 0000000..5ff03cc --- /dev/null +++ b/backend/src/jobs/jobs.service.spec.ts @@ -0,0 +1,214 @@ +import type { TestingModule } from '@nestjs/testing' +import { Test } from '@nestjs/testing' +import { PrismaService } from 'src/common/database/prisma.service' +import { JobStatus } from './enums/job-status.enum' +import { JobTrigger } from './enums/job-trigger.enum' +import { JobType } from './enums/job-type.enum' +import { JobsService } from './jobs.service' + +describe('JobsService', () => { + let service: JobsService + let prisma: PrismaService + + const mockJobRun = { + id: 1, + jobType: JobType.INGEST_DATA, + status: JobStatus.RUNNING, + parentJobId: null, + jobTrigger: JobTrigger.CRON, + retryCount: 0, + error: null, + metadata: {}, + createdAt: new Date(), + startedAt: new Date(), + completedAt: null, + } + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + JobsService, + { + provide: PrismaService, + useValue: { + jobRun: { + create: vi.fn().mockResolvedValue(mockJobRun), + findUnique: vi.fn().mockResolvedValue(mockJobRun), + findFirst: vi.fn().mockResolvedValue(mockJobRun), + findMany: vi.fn().mockResolvedValue([mockJobRun]), + update: vi.fn().mockResolvedValue(mockJobRun), + updateMany: vi.fn().mockResolvedValue({ count: 1 }), + }, + }, + }, + ], + }).compile() + + service = module.get(JobsService) + prisma = module.get(PrismaService) + }) + + it('should be defined', () => { + expect(service).toBeDefined() + }) + + describe('createJob', () => { + it('should create a job with RUNNING status', async () => { + const result = await service.createJob({ + jobType: JobType.INGEST_DATA, + jobTrigger: JobTrigger.CRON, + }) + + expect(prisma.jobRun.create).toHaveBeenCalledWith({ + data: expect.objectContaining({ + jobType: JobType.INGEST_DATA, + jobTrigger: JobTrigger.CRON, + status: JobStatus.RUNNING, + retryCount: 0, + }), + }) + expect(result).toEqual(mockJobRun) + }) + + it('should create a child job with parent reference', async () => { + await service.createJob({ + jobType: JobType.INGEST_ICM, + jobTrigger: JobTrigger.CRON, + parentJobId: 1, + }) + + expect(prisma.jobRun.create).toHaveBeenCalledWith({ + data: expect.objectContaining({ + jobType: JobType.INGEST_ICM, + parentJobId: 1, + }), + }) + }) + }) + + describe('getJob', () => { + it('should return job with child jobs and parent', async () => { + await service.getJob(1) + + expect(prisma.jobRun.findUnique).toHaveBeenCalledWith({ + where: { id: 1 }, + include: { childJobs: true, parentJob: true }, + }) + }) + }) + + describe('markSuccess', () => { + it('should update status to SUCCESS with completedAt', async () => { + await service.markSuccess(1) + + expect(prisma.jobRun.update).toHaveBeenCalledWith({ + where: { id: 1 }, + data: expect.objectContaining({ + status: JobStatus.SUCCESS, + completedAt: expect.any(Date), + }), + }) + }) + + it('should update metadata if provided', async () => { + await service.markSuccess(1, { recordsProcessed: 100 }) + + expect(prisma.jobRun.update).toHaveBeenCalledWith({ + where: { id: 1 }, + data: expect.objectContaining({ + status: JobStatus.SUCCESS, + metadata: { recordsProcessed: 100 }, + }), + }) + }) + }) + + describe('markFailed', () => { + it('should update status to FAILED with error and increment retry count', async () => { + await service.markFailed(1, 'Connection timeout') + + expect(prisma.jobRun.update).toHaveBeenCalledWith({ + where: { id: 1 }, + data: expect.objectContaining({ + status: JobStatus.FAILED, + error: 'Connection timeout', + retryCount: { increment: 1 }, + completedAt: expect.any(Date), + }), + }) + }) + }) + + describe('getFailedJobs', () => { + it('should return all failed jobs', async () => { + await service.getFailedJobs() + + expect(prisma.jobRun.findMany).toHaveBeenCalledWith({ + where: { status: JobStatus.FAILED }, + select: { + id: true, + jobType: true, + jobTrigger: true, + retryCount: true, + metadata: true, + parentJobId: true, + }, + orderBy: { completedAt: 'asc' }, + }) + }) + }) + + describe('markStuckJobsAsFailed', () => { + it('should mark stuck RUNNING jobs as FAILED', async () => { + await service.markStuckJobsAsFailed(60) + + expect(prisma.jobRun.updateMany).toHaveBeenCalledWith({ + where: { + status: JobStatus.RUNNING, + startedAt: { lt: expect.any(Date) }, + }, + data: { + status: JobStatus.FAILED, + error: 'Job timed out (stuck)', + completedAt: expect.any(Date), + }, + }) + }) + }) + + describe('getLastSuccessfulJob', () => { + it('should return the most recent successful job of a type', async () => { + await service.getLastSuccessfulJob(JobType.INGEST_DATA) + + expect(prisma.jobRun.findFirst).toHaveBeenCalledWith({ + where: { + jobType: JobType.INGEST_DATA, + status: JobStatus.SUCCESS, + }, + orderBy: { completedAt: 'desc' }, + }) + }) + }) + + describe('getLastSuccessTimestamp', () => { + it('should return completed_at of last successful job of given type', async () => { + const completedAt = new Date() + vi.spyOn(prisma.jobRun, 'findFirst').mockResolvedValue({ + ...mockJobRun, + completedAt, + }) + + const timestamp = await service.getLastSuccessTimestamp(JobType.INGEST_DATA) + + expect(timestamp).toEqual(completedAt) + }) + + it('should return null if no successful job exists', async () => { + vi.spyOn(prisma.jobRun, 'findFirst').mockResolvedValue(null) + + const timestamp = await service.getLastSuccessTimestamp(JobType.INGEST_DATA) + + expect(timestamp).toBeNull() + }) + }) +}) diff --git a/backend/src/jobs/jobs.service.ts b/backend/src/jobs/jobs.service.ts new file mode 100644 index 0000000..ca58d1d --- /dev/null +++ b/backend/src/jobs/jobs.service.ts @@ -0,0 +1,138 @@ +import { Injectable } from '@nestjs/common' +import { PrismaService } from 'src/common/database/prisma.service' +import { JobStatus } from './enums/job-status.enum' +import { JobTrigger } from './enums/job-trigger.enum' +import { JobType } from './enums/job-type.enum' + +export interface CreateJobDto { + jobType: JobType + jobTrigger: JobTrigger + parentJobId?: number + metadata?: Record +} + +@Injectable() +export class JobsService { + constructor(private readonly prisma: PrismaService) {} + + async createJob(dto: CreateJobDto) { + const now = new Date() + return this.prisma.jobRun.create({ + data: { + jobType: dto.jobType, + jobTrigger: dto.jobTrigger, + parentJobId: dto.parentJobId, + status: JobStatus.RUNNING, + retryCount: 0, + metadata: (dto.metadata ?? {}) as any, + createdAt: now, + startedAt: now, + }, + }) + } + + async getJob(id: number) { + return this.prisma.jobRun.findUnique({ + where: { id }, + include: { childJobs: true, parentJob: true }, + }) + } + + async markSuccess(id: number, metadata?: Record) { + return this.prisma.jobRun.update({ + where: { id }, + data: { + status: JobStatus.SUCCESS, + completedAt: new Date(), + ...(metadata && { metadata: metadata as any }), + }, + }) + } + + async markFailed(id: number, error: string) { + return this.prisma.jobRun.update({ + where: { id }, + data: { + status: JobStatus.FAILED, + error, + retryCount: { increment: 1 }, + completedAt: new Date(), + }, + }) + } + + async getFailedJobs() { + return this.prisma.jobRun.findMany({ + where: { + status: JobStatus.FAILED, + }, + select: { + id: true, + jobType: true, + jobTrigger: true, + retryCount: true, + metadata: true, + parentJobId: true, + }, + orderBy: { + completedAt: 'asc', + }, + }) + } + + //TODO: define when a job is stuck + //stuckThresholdMinutes + async getStuckRunningJobs(stuckThresholdMinutes: number = 60) { + const threshold = new Date(Date.now() - stuckThresholdMinutes * 60 * 1000) + return this.prisma.jobRun.findMany({ + where: { + status: JobStatus.RUNNING, + startedAt: { + lt: threshold, + }, + }, + }) + } + + async markStuckJobsAsFailed(stuckThresholdMinutes: number = 60) { + const threshold = new Date(Date.now() - stuckThresholdMinutes * 60 * 1000) + return this.prisma.jobRun.updateMany({ + where: { + status: JobStatus.RUNNING, + startedAt: { + lt: threshold, + }, + }, + data: { + status: JobStatus.FAILED, + error: 'Job timed out (stuck)', + completedAt: new Date(), + }, + }) + } + + async getChildJobs(parentJobId: number) { + return this.prisma.jobRun.findMany({ + where: { parentJobId }, + orderBy: { createdAt: 'asc' }, + }) + } + + async getLastSuccessfulJob(jobType: JobType) { + return this.prisma.jobRun.findFirst({ + where: { + jobType, + status: JobStatus.SUCCESS, + }, + orderBy: { + completedAt: 'desc', + }, + }) + } + + // Get the completion timestamp of the last successful job of a given type + async getLastSuccessTimestamp(jobType: JobType): Promise { + const lastJob = await this.getLastSuccessfulJob(jobType) + return lastJob?.completedAt ?? null + } +} diff --git a/backend/src/sync/handlers/ingest-data.handler.spec.ts b/backend/src/sync/handlers/ingest-data.handler.spec.ts new file mode 100644 index 0000000..0a3b840 --- /dev/null +++ b/backend/src/sync/handlers/ingest-data.handler.spec.ts @@ -0,0 +1,156 @@ +import { Test, TestingModule } from '@nestjs/testing' +import { JobTrigger } from 'src/jobs/enums/job-trigger.enum' +import { JobType } from 'src/jobs/enums/job-type.enum' +import { JobContext } from 'src/jobs/interfaces/job.interface' +import { JobRunner } from 'src/jobs/job-runner.service' +import { IngestDataHandler } from './ingest-data.handler' + +describe('IngestDataHandler', () => { + let handler: IngestDataHandler + let jobRunner: JobRunner + + const mockContext: JobContext = { + jobRunId: 1, + jobType: JobType.INGEST_DATA, + jobTrigger: JobTrigger.CRON, + retryCount: 0, + } + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + IngestDataHandler, + { + provide: JobRunner, + useValue: { + runJobType: vi.fn(), + }, + }, + ], + }).compile() + + handler = module.get(IngestDataHandler) + jobRunner = module.get(JobRunner) + }) + + it('should be defined', () => { + expect(handler).toBeDefined() + expect(handler.jobType).toBe(JobType.INGEST_DATA) + }) + + describe('successful execution', () => { + it('should orchestrate all jobs in correct order', async () => { + const runJobTypeSpy = vi.mocked(jobRunner.runJobType) + runJobTypeSpy.mockResolvedValue({ success: true }) + + const result = await handler.execute(mockContext) + + expect(result.success).toBe(true) + expect(result.message).toBe('Data ingestion and sync completed successfully') + + // Verify execution order + expect(runJobTypeSpy).toHaveBeenNthCalledWith(1, JobType.INGEST_ICM, JobTrigger.CRON, { + parentJobId: 1, + }) + expect(runJobTypeSpy).toHaveBeenNthCalledWith(2, JobType.INGEST_MIS, JobTrigger.CRON, { + parentJobId: 1, + }) + expect(runJobTypeSpy).toHaveBeenNthCalledWith(3, JobType.RUN_ELIGIBILITY, JobTrigger.CRON, { + parentJobId: 1, + }) + expect(runJobTypeSpy).toHaveBeenNthCalledWith(4, JobType.SYNC_ICM, JobTrigger.CRON, { + parentJobId: 1, + }) + expect(runJobTypeSpy).toHaveBeenCalledTimes(4) + }) + }) + + describe('failure scenarios', () => { + it('should fail if ICM ingestion fails', async () => { + const runJobTypeSpy = vi.mocked(jobRunner.runJobType) + runJobTypeSpy + .mockResolvedValueOnce({ success: false, message: 'ICM connection failed' }) + .mockResolvedValueOnce({ success: true }) + + const result = await handler.execute(mockContext) + + expect(result.success).toBe(false) + expect(result.message).toBe('Ingestion failed') + expect(result.metadata).toEqual({ + icmResult: { success: false, message: 'ICM connection failed' }, + misResult: { success: true }, + }) + + // Should not proceed to eligibility or sync + expect(runJobTypeSpy).toHaveBeenCalledTimes(2) // Only ICM + MIS + }) + + it('should fail if MIS ingestion fails', async () => { + const runJobTypeSpy = vi.mocked(jobRunner.runJobType) + runJobTypeSpy + .mockResolvedValueOnce({ success: true }) + .mockResolvedValueOnce({ success: false, message: 'MIS timeout' }) + + const result = await handler.execute(mockContext) + + expect(result.success).toBe(false) + expect(result.message).toBe('Ingestion failed') + expect(result.metadata).toEqual({ + icmResult: { success: true }, + misResult: { success: false, message: 'MIS timeout' }, + }) + }) + + it('should fail if eligibility processing fails', async () => { + const runJobTypeSpy = vi.mocked(jobRunner.runJobType) + runJobTypeSpy + .mockResolvedValueOnce({ success: true }) // ICM + .mockResolvedValueOnce({ success: true }) // MIS + .mockResolvedValueOnce({ success: false, message: 'Eligibility rules error' }) // Eligibility + + const result = await handler.execute(mockContext) + + expect(result.success).toBe(false) + expect(result.message).toBe('Eligibility processing failed') + expect(result.metadata).toEqual({ + eligibilityResult: { success: false, message: 'Eligibility rules error' }, + }) + + // Should not proceed to sync + expect(runJobTypeSpy).toHaveBeenCalledTimes(3) + }) + + it('should fail if ICM sync fails', async () => { + const runJobTypeSpy = vi.mocked(jobRunner.runJobType) + runJobTypeSpy + .mockResolvedValueOnce({ success: true }) // ICM + .mockResolvedValueOnce({ success: true }) // MIS + .mockResolvedValueOnce({ success: true }) // Eligibility + .mockResolvedValueOnce({ success: false, message: 'Sync API error' }) // Sync + + const result = await handler.execute(mockContext) + + expect(result.success).toBe(false) + expect(result.message).toBe('ICM sync failed') + expect(result.metadata).toEqual({ + syncResult: { success: false, message: 'Sync API error' }, + }) + }) + + it('should handle unexpected errors with stack trace', async () => { + const error = new Error('Unexpected database error') + error.stack = 'Error: Unexpected database error\n at ...' + + vi.mocked(jobRunner.runJobType).mockRejectedValue(error) + + const result = await handler.execute(mockContext) + + expect(result.success).toBe(false) + expect(result.message).toBe('Unexpected database error') + expect(result.metadata).toEqual({ + errorStack: error.stack, + errorName: 'Error', + }) + }) + }) +}) diff --git a/backend/src/sync/handlers/ingest-data.handler.ts b/backend/src/sync/handlers/ingest-data.handler.ts new file mode 100644 index 0000000..f92b404 --- /dev/null +++ b/backend/src/sync/handlers/ingest-data.handler.ts @@ -0,0 +1,88 @@ +import { Injectable } from '@nestjs/common' +import { BaseJob } from 'src/jobs/base-job' +import { JobTrigger } from 'src/jobs/enums/job-trigger.enum' +import { JobType } from 'src/jobs/enums/job-type.enum' +import { JobResult } from 'src/jobs/interfaces/job-result.interface' +import { JobContext } from 'src/jobs/interfaces/job.interface' +import { JobRunner } from 'src/jobs/job-runner.service' + +/* + * Orchestrates the complete data sync flow: + * 1. INGEST_ICM + INGEST_MIS + * 2. RUN_ELIGIBILITY (after ingestion complete) + * 3. SYNC_ICM (after eligibility) + */ +@Injectable() +export class IngestDataHandler extends BaseJob { + readonly jobType = JobType.INGEST_DATA + + constructor(private readonly jobRunner: JobRunner) { + super() + } + + async execute(context: JobContext): Promise { + const parentJobId = context.jobRunId + + try { + // Step 1: Run ICM and MIS ingestion in parallel + this.logger.log('Starting parallel ingestion from ICM and MIS...') + const [icmResult, misResult] = await Promise.all([ + this.jobRunner.runJobType(JobType.INGEST_ICM, JobTrigger.CRON, { parentJobId }), + this.jobRunner.runJobType(JobType.INGEST_MIS, JobTrigger.CRON, { parentJobId }), + ]) + + if (!icmResult.success || !misResult.success) { + return { + success: false, + message: 'Ingestion failed', + metadata: { icmResult, misResult }, + } + } + + // Step 2: Run eligibility processing + this.logger.log('Running eligibility processing...') + const eligibilityResult = await this.jobRunner.runJobType( + JobType.RUN_ELIGIBILITY, + JobTrigger.CRON, + { parentJobId }, + ) + + if (!eligibilityResult.success) { + return { + success: false, + message: 'Eligibility processing failed', + metadata: { eligibilityResult }, + } + } + + // Step 3: Sync back to ICM + this.logger.log('Syncing back to ICM...') + const syncResult = await this.jobRunner.runJobType(JobType.SYNC_ICM, JobTrigger.CRON, { + parentJobId, + }) + + if (!syncResult.success) { + return { + success: false, + message: 'ICM sync failed', + metadata: { syncResult }, + } + } + + return { + success: true, + message: 'Data ingestion and sync completed successfully', + } + } catch (error) { + this.logger.error(`Unexpected error in INGEST_DATA: ${error.message}`, error.stack) + return { + success: false, + message: error.message, + metadata: { + errorStack: error.stack, + errorName: error.name, + }, + } + } + } +} diff --git a/backend/src/sync/handlers/ingest-icm.handler.ts b/backend/src/sync/handlers/ingest-icm.handler.ts new file mode 100644 index 0000000..e0ea5c1 --- /dev/null +++ b/backend/src/sync/handlers/ingest-icm.handler.ts @@ -0,0 +1,35 @@ +import { Injectable } from '@nestjs/common' +import { BaseJob } from 'src/jobs/base-job' +import { JobType } from 'src/jobs/enums/job-type.enum' +import { JobResult } from 'src/jobs/interfaces/job-result.interface' +import { JobContext } from 'src/jobs/interfaces/job.interface' +import { JobsService } from 'src/jobs/jobs.service' + +// Fetches ICM Data using incremental sync +@Injectable() +export class IngestIcmHandler extends BaseJob { + readonly jobType = JobType.INGEST_ICM + + constructor(private readonly jobsService: JobsService) { + super() + } + + async execute(_context: JobContext): Promise { + // TODO: Implement ICM ingestion logic + // 1. Get last_update_at: await jobsService.getLastSuccessTimestamp(JobType.INGEST_DATA) + // 2. Fetch records from ICM API where modified_at > last_update_at + // 3. Upsert into local database + // 4. Return metadata: { records_fetched, last_update_at_used } + + this.logger.log('INGEST_ICM stub - not yet implemented') + + return { + success: true, + message: 'ICM ingestion stub', + metadata: { + records_fetched: 0, + last_update_at_used: null, + }, + } + } +} diff --git a/backend/src/sync/handlers/ingest-mis.handler.ts b/backend/src/sync/handlers/ingest-mis.handler.ts new file mode 100644 index 0000000..9c28458 --- /dev/null +++ b/backend/src/sync/handlers/ingest-mis.handler.ts @@ -0,0 +1,31 @@ +import { Injectable } from '@nestjs/common' +import { BaseJob } from 'src/jobs/base-job' +import { JobType } from 'src/jobs/enums/job-type.enum' +import { JobResult } from 'src/jobs/interfaces/job-result.interface' +import { JobContext } from 'src/jobs/interfaces/job.interface' +import { JobsService } from 'src/jobs/jobs.service' + +// Fetches MIS data using full reload +@Injectable() +export class IngestMisHandler extends BaseJob { + readonly jobType = JobType.INGEST_MIS + + constructor(private readonly jobsService: JobsService) { + super() + } + + async execute(_context: JobContext): Promise { + // TODO: Implement MIS ingestion logic + + this.logger.log('INGEST_MIS stub - not yet implemented') + + return { + success: true, + message: 'MIS ingestion stub', + metadata: { + records_fetched: 0, + last_updated_at_used: null, + }, + } + } +} diff --git a/backend/src/sync/handlers/run-eligibility.handler.ts b/backend/src/sync/handlers/run-eligibility.handler.ts new file mode 100644 index 0000000..0b6650b --- /dev/null +++ b/backend/src/sync/handlers/run-eligibility.handler.ts @@ -0,0 +1,26 @@ +import { Injectable } from '@nestjs/common' +import { BaseJob } from 'src/jobs/base-job' +import { JobType } from 'src/jobs/enums/job-type.enum' +import { JobResult } from 'src/jobs/interfaces/job-result.interface' +import { JobContext } from 'src/jobs/interfaces/job.interface' + +// Runs after INGEST_ICM and INGEST_MIS complete +@Injectable() +export class RunEligibilityHandler extends BaseJob { + readonly jobType = JobType.RUN_ELIGIBILITY + + async execute(_context: JobContext): Promise { + // TODO: Implement eligibility logic + + this.logger.log('RUN_ELIGIBILITY stub - not yet implemented') + + return { + success: true, + message: 'Eligibility processing stub', + metadata: { + contacts_eligible: 0, + contacts_excluded: 0, + }, + } + } +} diff --git a/backend/src/sync/handlers/sync-icm.handler.ts b/backend/src/sync/handlers/sync-icm.handler.ts new file mode 100644 index 0000000..8a8a2a0 --- /dev/null +++ b/backend/src/sync/handlers/sync-icm.handler.ts @@ -0,0 +1,26 @@ +import { Injectable } from '@nestjs/common' +import { BaseJob } from 'src/jobs/base-job' +import { JobType } from 'src/jobs/enums/job-type.enum' +import { JobResult } from 'src/jobs/interfaces/job-result.interface' +import { JobContext } from 'src/jobs/interfaces/job.interface' + +// Pushes eligibility status and other updates back to ICM +@Injectable() +export class SyncIcmHandler extends BaseJob { + readonly jobType = JobType.SYNC_ICM + + async execute(_context: JobContext): Promise { + // TODO: Implement ICM sync logic + + this.logger.log('SYNC_ICM stub - not yet implemented') + + return { + success: true, + message: 'ICM sync stub', + metadata: { + records_synced: 0, + failures: 0, + }, + } + } +} diff --git a/backend/src/sync/sync.module.spec.ts b/backend/src/sync/sync.module.spec.ts new file mode 100644 index 0000000..a05e281 --- /dev/null +++ b/backend/src/sync/sync.module.spec.ts @@ -0,0 +1,64 @@ +import type { TestingModule } from '@nestjs/testing' +import { Test } from '@nestjs/testing' +import { JobType } from 'src/jobs/enums/job-type.enum' +import { JobRegistry } from 'src/jobs/job-registry.service' +import { JobsModule } from 'src/jobs/jobs.module' +import { IngestDataHandler } from './handlers/ingest-data.handler' +import { IngestIcmHandler } from './handlers/ingest-icm.handler' +import { IngestMisHandler } from './handlers/ingest-mis.handler' +import { RunEligibilityHandler } from './handlers/run-eligibility.handler' +import { SyncIcmHandler } from './handlers/sync-icm.handler' +import { SyncModule } from './sync.module' + +describe('SyncModule', () => { + let module: TestingModule + let registry: JobRegistry + + beforeEach(async () => { + module = await Test.createTestingModule({ + imports: [JobsModule, SyncModule], + }).compile() + + registry = module.get(JobRegistry) + await module.init() // Trigger onModuleInit + }) + + afterEach(async () => { + await module.close() + }) + + it('should be defined', () => { + const syncModule = module.get(SyncModule) + expect(syncModule).toBeDefined() + }) + + it('should register all 5 sync handlers', () => { + expect(registry.hasHandler(JobType.INGEST_DATA)).toBe(true) + expect(registry.hasHandler(JobType.INGEST_ICM)).toBe(true) + expect(registry.hasHandler(JobType.INGEST_MIS)).toBe(true) + expect(registry.hasHandler(JobType.RUN_ELIGIBILITY)).toBe(true) + expect(registry.hasHandler(JobType.SYNC_ICM)).toBe(true) + }) + + it('should register handlers with correct types', () => { + const ingestDataHandler = registry.getHandler(JobType.INGEST_DATA) + const ingestIcmHandler = registry.getHandler(JobType.INGEST_ICM) + const ingestMisHandler = registry.getHandler(JobType.INGEST_MIS) + const eligibilityHandler = registry.getHandler(JobType.RUN_ELIGIBILITY) + const syncIcmHandler = registry.getHandler(JobType.SYNC_ICM) + + expect(ingestDataHandler).toBeInstanceOf(IngestDataHandler) + expect(ingestIcmHandler).toBeInstanceOf(IngestIcmHandler) + expect(ingestMisHandler).toBeInstanceOf(IngestMisHandler) + expect(eligibilityHandler).toBeInstanceOf(RunEligibilityHandler) + expect(syncIcmHandler).toBeInstanceOf(SyncIcmHandler) + }) + + it('should export all handler providers', () => { + expect(module.get(IngestDataHandler)).toBeDefined() + expect(module.get(IngestIcmHandler)).toBeDefined() + expect(module.get(IngestMisHandler)).toBeDefined() + expect(module.get(RunEligibilityHandler)).toBeDefined() + expect(module.get(SyncIcmHandler)).toBeDefined() + }) +}) diff --git a/backend/src/sync/sync.module.ts b/backend/src/sync/sync.module.ts new file mode 100644 index 0000000..f94d8a4 --- /dev/null +++ b/backend/src/sync/sync.module.ts @@ -0,0 +1,51 @@ +import { Module, OnModuleInit } from '@nestjs/common' +import { JobRegistry } from 'src/jobs/job-registry.service' +import { JobsModule } from 'src/jobs/jobs.module' +import { IngestDataHandler } from './handlers/ingest-data.handler' +import { IngestIcmHandler } from './handlers/ingest-icm.handler' +import { IngestMisHandler } from './handlers/ingest-mis.handler' +import { RunEligibilityHandler } from './handlers/run-eligibility.handler' +import { SyncIcmHandler } from './handlers/sync-icm.handler' + +/* + * Ingestion from ICM (CRM) and MIS (payment system) + * Eligibility processing + * Syncing results back to ICM + */ +// TODO: rename handlers +@Module({ + imports: [JobsModule], + providers: [ + IngestDataHandler, + IngestIcmHandler, + IngestMisHandler, + RunEligibilityHandler, + SyncIcmHandler, + ], + exports: [ + IngestDataHandler, + IngestIcmHandler, + IngestMisHandler, + RunEligibilityHandler, + SyncIcmHandler, + ], +}) +export class SyncModule implements OnModuleInit { + constructor( + private readonly registry: JobRegistry, + private readonly ingestDataHandler: IngestDataHandler, + private readonly ingestIcmHandler: IngestIcmHandler, + private readonly ingestMisHandler: IngestMisHandler, + private readonly runEligibilityHandler: RunEligibilityHandler, + private readonly syncIcmHandler: SyncIcmHandler, + ) {} + + onModuleInit() { + // Register all sync-related job handlers + this.registry.register(this.ingestDataHandler.jobType, this.ingestDataHandler) + this.registry.register(this.ingestIcmHandler.jobType, this.ingestIcmHandler) + this.registry.register(this.ingestMisHandler.jobType, this.ingestMisHandler) + this.registry.register(this.runEligibilityHandler.jobType, this.runEligibilityHandler) + this.registry.register(this.syncIcmHandler.jobType, this.syncIcmHandler) + } +} diff --git a/migrations/sql/V1__initial_schema.sql b/migrations/sql/V1__initial_schema.sql index 6aece82..731c261 100644 --- a/migrations/sql/V1__initial_schema.sql +++ b/migrations/sql/V1__initial_schema.sql @@ -8,7 +8,7 @@ CREATE TABLE IF NOT EXISTS csa.contacts ( aka_last_name TEXT NOT NULL, aka_first_name TEXT NOT NULL, person_id_icm TEXT NOT NULL, - person_id_ims TEXT NOT NULL, + person_id_mis TEXT NOT NULL, gender TEXT, date_of_birth DATE, age INTEGER, @@ -93,15 +93,23 @@ CREATE TABLE IF NOT EXISTS csa.contact_batch_details ( ); CREATE TABLE IF NOT EXISTS csa.job_runs ( - id SERIAL PRIMARY KEY, - type TEXT NOT NULL, - status TEXT NOT NULL, - retry_count INTEGER, - last_error TEXT, - created_at TIMESTAMP NOT NULL, - completed_at TIMESTAMP + id SERIAL PRIMARY KEY, + job_type TEXT NOT NULL, + status TEXT NOT NULL, + parent_job_id INTEGER REFERENCES csa.job_runs(id), + job_trigger TEXT NOT NULL, + retry_count INTEGER DEFAULT 0, + error TEXT, + metadata JSONB DEFAULT '{}'::jsonb, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + started_at TIMESTAMP NOT NULL, + completed_at TIMESTAMP ); +CREATE INDEX idx_job_runs_status ON csa.job_runs(status); +CREATE INDEX idx_job_runs_parent ON csa.job_runs(parent_job_id); +CREATE INDEX idx_job_runs_type_status ON csa.job_runs(job_type, status); + CREATE TABLE IF NOT EXISTS csa.transfer_files ( id SERIAL PRIMARY KEY, batch_id INTEGER,