Skip to content

madeindjs/nestjs-graphile-worker

Repository files navigation

Graphile Worker for Nest.js

npm version Test status

A Nest.js wrapper for Graphile Worker.

What is Graphile worker ?

Job queue for PostgreSQL running on Node.js - allows you to run jobs (e.g. sending emails, performing calculations, generating PDFs, etc) "in the background" so that your HTTP response/application code is not held up. Can be used with any PostgreSQL-backed application. Pairs beautifully with PostGraphile or PostgREST.

Why you should prefer Graphile Worker instead of Bull ?

  1. You already have a PostgreSQL in your stack (and you don't want to add a Redis server)

Features

  • provide a module GraphileWorkerModule to setup the runner using asRoot or asRootAsync
  • provide a WorkerService to add jobs or start runner
  • provide a @OnWorkerEvent decorator to add custom behavior on job:success for example
  • provide a @Task(name) decorator to define your injectable tasks
  • provide middleware support to control job execution for cross-cutting concerns (e.g. context enrichment)

Installation

npm install nestjs-graphile-worker
yarn add nestjs-graphile-worker
pnpm add nestjs-graphile-worker

Usage

1. Setup the module

In order, to setup the library, you need to import and initialize GraphileWorkerModule.

You can do it using forRoot method:

// src/app.module.ts
import { GraphileWorkerModule } from "nest-graphile-worker";
import { Module } from "@nestjs/common";
import { AppController } from "./app.controller";

@Module({
  imports: [
    GraphileWorkerModule.forRoot({
      connectionString: "postgres://example:password@postgres/example",
    }),
  ],
  controllers: [AppController],
  providers: [],
})
export class AppModule {}

Or using forRootAsync:

// src/app.module.ts
import { GraphileWorkerModule } from "nestjs-graphile-worker";
import { Module } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { AppController } from "./app.controller";
import { helloTask } from "./hello.task";

@Module({
  imports: [
    ConfigModule.forRoot(),
    GraphileWorkerModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        connectionString: config.get("PG_CONNECTION"),
        taskList: {
          hello: helloTask,
        },
      }),
    }),
  ],
  controllers: [AppController],
  providers: [],
})
export class AppModule {}

The module configuration is GraphileWorkerConfiguration, which is a wrapper around Graphile's RunnerOptions

type GraphileWorkerConfiguration = Omit<RunnerOptions, "events" | "taskList">;

This means you can pass any configuration to the runner, like Recurring tasks (crontab):

// src/app.module.ts
@Module({
  imports: [
    GraphileWorkerModule.forRoot({
      connectionString: "postgres://example:password@postgres/example",
      crontab: [' * * * * * taskIdentifier ?priority=1 {"foo": "bar"}'].join(
        "\n",
      ),
    }),
  ],
  controllers: [AppController],
  providers: [],
})
export class AppModule {}

2. Create task

To create task you need to define an @Injectable class with @Task(name) decorator containing a decorated method with @TaskHandler:

// src/hello.task.ts
import { Injectable, Logger } from "@nestjs/common";
import type { JobHelpers } from "graphile-worker";
import { Task, TaskHandler } from "../../src/index";

@Injectable()
@Task("hello")
export class HelloTask {
  private logger = new Logger(HelloTask.name);

  @TaskHandler()
  handler(payload: any, _helpers: JobHelpers) {
    this.logger.log(`handle ${JSON.stringify(payload)}`);
  }
}

Then do not forget to register this class as provider in your module:

// src/app.module.ts
import { Module } from "@nestjs/common";
import { HelloTask } from "./hello.task";
// ...

@Module({
  imports: [
    /* ... */
  ],
  controllers: [
    /* ... */
  ],
  providers: [HelloTask],
})
export class AppModule {}

3. Create jobs

You can use WorkerService which is a wrapper of graphile-worker's Runner instance. WorkerService let you add job easily.

import { WorkerService } from "nestjs-graphile-worker";
import { Controller, HttpCode, Post } from "@nestjs/common";

@Controller()
export class AppController {
  constructor(private readonly graphileWorker: WorkerService) {}

  @Post()
  @HttpCode(201)
  async addJob() {
    await this.graphileWorker.addJob("hello", { hello: "world" });
  }

  @Post("bulk")
  @HttpCode(201)
  async addJobs() {
    const jobs = new Array(100)
      .fill(undefined)
      .map((_, i) => ({ identifier: "hello", payload: { hello: i } }));

    return this.graphileWorker.addJobs(jobs);
  }
}

4. Start runner

Add WorkerService.run in main.ts file:

import { WorkerService } from "nestjs-graphile-worker";
import { NestFactory } from "@nestjs/core";
import { AppModule } from "./app.module";

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.get(WorkerService).run();
  await app.listen(3000);
}
bootstrap();

5. Listen any Graphile's event

You can use @OnWorkerEvent decorator to listen any Graphile Worker event. You simply have to:

  1. @GraphileWorkerListener decorator on your class
  2. set @OnWorkerEvent(eventName) on your method
import { Injectable, Logger } from "@nestjs/common";
import { WorkerEventMap } from "graphile-worker";
import { GraphileWorkerListener, OnWorkerEvent } from "../../src/index";

@Injectable()
@GraphileWorkerListener()
export class AppService {
  private readonly logger = new Logger(AppService.name);

  @OnWorkerEvent("job:success")
  onJobSuccess({ job }: WorkerEventMap["job:success"]) {
    this.logger.debug(`job #${job.id} finished`);
  }

  @OnWorkerEvent("job:error")
  onJobError({ job, error }: WorkerEventMap["job:error"]) {
    this.logger.error(`job #${job.id} fail ${JSON.stringify(error)}`);
  }
}

Job middlewares

Middlewares can help implement cross-cutting concerns like:

  • Context enrichment - Add fresh runtime data to the job payload, or use the job data to enrich some higher-level context context (e.g. include the jobId in a logger context)
  • Error handling - Add error handling applying to all jobs, e.g. to handle the last attempt failed gracefully as recommended in the Graphile Worker documentation
  • Conditional execution - Skip or modify job execution based on runtime conditions
  • Rate limiting and throttling - Prevent job execution under certain conditions

The advantage of middlewares is that they execute as part of the job execution flow, giving you full control over the job's context and execution.
In contrast, WorkerEvent listeners execute as separate event handlers, making them more suitable for side effects (like notifications or monitoring) but not for controlling or modifying job execution itself.

Usage

To create a middleware, you should define a class with the decorator @Middleware({ global: true }) for global middlewares or @Middleware() for handler-specific middlewares. Global middlewares will automatically apply to all your task handlers.

For those middlewares that are not defined as global, you have to annotate the specific task handlers the middlewares should apply to, by using the decorator @UseMiddlewares([MyLocalMiddleware]) (with the list of the class references of the middlewares you want to apply).

You can also bypass specific global middlewares for individual handlers using the bypassGlobalMiddlewares option: @UseMiddlewares([MyLocalMiddleware], { bypassGlobalMiddlewares: [MyGlobalMiddleware] }). This provides maximum flexibility for controlling middleware execution on a per-handler basis if needed.

Here is an example:

import { Injectable } from "@nestjs/common";
import { Middleware, MiddlewareProvider } from "nestjs-graphile-worker";
import { JobHelpers } from "graphile-worker";

@Injectable()
@Middleware({ global: true })
export class ContextMiddleware implements MiddlewareProvider {
  async use(payload: any, helpers: JobHelpers, next: (payload: any) => Promise<void>) {
    // Add job execution context that handlers can use
    const enrichedPayload = {
      ...payload,
      _jobContext: {
        jobId: helpers.job.id,
      }
    };
    
    setLoggerContext({jobId: helpers.job.id}); // some logging util to set logging context

    await next(enrichedPayload);
  }
}

@Injectable()
@Middleware()
export class GracefulLastAttemptFailureMiddleware implements MiddlewareProvider {
  async use(payload: any, helpers: JobHelpers, next: (payload: any) => Promise<void>) {
    try {
      return await next(payload);
    } catch (error) {
      const { job, logger } = helpers;

      if (job.attempts < job.max_attempts) {
        throw error; // Re-throw if not the last attempt
      }
      // Handle the last attempt error gracefully, and avoid a permafailed job being stored in the jobs table
      logger.error(`Permanently failed to handle task ${job.task_identifier}`, { payload });
      instrumentation.onJobFailedWithGracefulExit(job.task_identifier); // some instrumentation util for monitoring
    }
  }
}

@Injectable()
@Task('myTask')
export class MyTask  {

  @UseMiddlewares([GracefulLastAttemptFailureMiddleware])
  @TaskHandler()
  async handler(payload: any, helpers: JobHelpers) {
    // do something
  }

  // Bypass specific global middlewares for this handler
  @UseMiddlewares(
    [GracefulLastAttemptFailureMiddleware], 
    { bypassGlobalMiddlewares: [ContextMiddleware] }
  )
  @TaskHandler()
  async handlerWithBypass(payload: any, helpers: JobHelpers) {
    // This handler will skip 'ContextMiddleware' but still use 'GracefulLastAttemptFailureMiddleware'
  }

  // Trick: Bypass a global middleware but use it in the array of handler-specific middlewares to control execution order
  @UseMiddlewares(
    [GracefulLastAttemptFailureMiddleware, ContextMiddleware], 
    { bypassGlobalMiddlewares: [ContextMiddleware] }
  )
  @TaskHandler()
  async handlerWithBypass(payload: any, helpers: JobHelpers) {
    // Bypass `myGlobalMiddleware` but include it locally to control execution order: while global middlewares execute
    // before local handlers, here `myGlobalMiddleware` gets executed as a local handler after `myLocalMiddleware`
  }
}

@Module({
  imports: [
    GraphileWorkerModule.forRoot({
      connectionString: "postgres://example:password@postgres/example",
    }),
  ],
  providers: [
    ContextMiddleware,
    GracefulLastAttemptFailureMiddleware,
    // ... other providers
  ],
  // ... other module config
})
export class AppModule {}

Middleware execution order

Global middlewares are always executed first, then handler-specific middlewares.

Important: Global middleware execution order is determined by the order in which they are declared in the providers array. Middlewares execute in the same order they appear in the array.

@Module({
  providers: [
    FirstGlobalMiddleware,    // Executes first
    SecondGlobalMiddleware,   // Executes second  
    // ... other providers
  ],
})
export class AppModule {}

Middleware troubleshooting: common issues

Take care of handling errors appropriately in your middleware.
Also keep your middleware lightweight and avoid heavy computations.

  1. Middleware not executing: Ensure the middleware class is decorated with @Middleware() and registered as a provider in your module.

  2. Tasks hanging: Make sure every middleware calls next() or throws an error.

  3. Performance issues: Check for heavy operations in middleware that might slow down task execution.

Sample

You can find a sample using this library. To run it, simply npm install and then:

docker-compose up

About

A Nest.js wrapper for Graphile Worker

Topics

Resources

License

Stars

Watchers

Forks

Contributors 13