Simple, reliable and efficient concurrent work queue for Prisma + PostgreSQL
- Leverages PostgreSQL SKIP LOCKED feature to reliably dequeue jobs
- Supports crontab syntax for complex scheduled jobs
- Written in TypeScript for static type checking, with exported types throughout the library.
- Built with tsup to provide both CommonJS and ESM packages.
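To make the SKIP LOCKED point concrete, here is a minimal sketch, not the library's actual query. The SQL string assumes a hypothetical `queue_jobs` table with `queue`, `runAt` and `finishedAt` columns, and the `dequeueSkipLocked` helper is an in-memory model of the same idea: each worker takes the first row that no other worker currently holds, instead of waiting on it.

```typescript
// Illustrative SQL only: the table and column names are assumptions.
// It must run inside a transaction so the row lock is held while the job is processed.
const DEQUEUE_SQL = `
  SELECT id
  FROM queue_jobs
  WHERE queue = $1 AND "finishedAt" IS NULL AND "runAt" <= NOW()
  ORDER BY "runAt" ASC
  LIMIT 1
  FOR UPDATE SKIP LOCKED
`;

// Hypothetical in-memory row, standing in for a table row and its lock holder.
type QueueRow = { id: number; lockedBy: string | null };

// In-memory model of the semantics: skip rows that are already locked,
// lock the first free one and return it (or undefined if none is free).
const dequeueSkipLocked = (rows: QueueRow[], worker: string): QueueRow | undefined => {
  const row = rows.find((candidate) => candidate.lockedBy === null);
  if (row) {
    row.lockedBy = worker;
  }
  return row;
};
```

With two pending rows, two workers calling `dequeueSkipLocked` each receive a different row, and a third worker receives `undefined` rather than blocking.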
npm install @mgcrea/prisma-queue --save
# or
pnpm add @mgcrea/prisma-queue
- If you use an old version of Prisma, from 2.29.0 to 4.6.1 (inclusive), you must first add "interactiveTransactions" to the previewFeatures of your schema.prisma client configuration:
generator client {
  provider = "prisma-client-js"
  previewFeatures = ["interactiveTransactions"]
}
- Append the QueueJob model to your schema.prisma file
- Create your queue
import { createQueue } from "@mgcrea/prisma-queue";

type JobPayload = { email: string };
type JobResult = { status: number };

export const emailQueue = createQueue<JobPayload, JobResult>({ name: "email" }, async (job, client) => {
  const { id, payload } = job;
  console.log(`Processing job#${id} with payload=${JSON.stringify(payload)}`);
  // await someAsyncMethod();
  await job.progress(50);
  const status = 200;
  // Simulate a random failure about half of the time
  if (Math.random() > 0.5) {
    throw new Error(`Failed for some unknown reason`);
  }
  console.log(`Finished job#${id} with status=${status}`);
  return { status };
});
- Queue a job
import { emailQueue } from "./emailQueue";

const main = async () => {
  const job = await emailQueue.enqueue({ email: "[email protected]" });
};

main();
- Schedule a recurring job
import { emailQueue } from "./emailQueue";

const main = async () => {
  const nextJob = await emailQueue.schedule(
    { key: "email-schedule", cron: "5 5 * * *" },
    { email: "[email protected]" },
  );
};

main();
- Start queue processing (usually in another process)
import { emailQueue } from "./emailQueue";

const main = async () => {
  await emailQueue.start();
};

main();
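The cron expression "5 5 * * *" used in the scheduling example can be read field by field. The sketch below is not part of the library: `parseCronFields` is a hypothetical helper that assumes the standard five-field crontab form (minute, hour, day of month, month, day of week) and only splits the expression into named fields, without validating or evaluating it.

```typescript
// Named fields of a standard five-field crontab expression.
type CronFields = {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
};

// Hypothetical helper: split an expression into its five fields.
// Assumes the classic five-field form; any other field count is rejected.
const parseCronFields = (expression: string): CronFields => {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts as [string, string, string, string, string];
  return { minute, hour, dayOfMonth, month, dayOfWeek };
};

const dailyFields = parseCronFields("5 5 * * *");
console.log(dailyFields);
```

Here the minute and hour fields are both 5 and the remaining fields are wildcards, so the job recurs once a day at 05:05, in whichever time zone the scheduler evaluates the expression.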
You can easily spin off your workers into separate threads using worker_threads (Node.js >= 12.17.0).
This lets you fully leverage your CPU cores and isolates your main application queue from potential memory leaks or crashes.
import { JobPayload, JobResult, PrismaJob } from "@mgcrea/prisma-queue";
import { Worker } from "node:worker_threads";
import { ROOT_DIR } from "src/config/env";
import { log } from "src/config/log";

const WORKER_SCRIPT = `${ROOT_DIR}/dist/worker.js`;

export const processInWorker = async <P extends JobPayload, R extends JobResult>(
  job: PrismaJob<P, R>,
): Promise<R> =>
  new Promise((resolve, reject) => {
    const workerData = getJobWorkerData(job);
    log.debug(`Starting worker thread for job id=${job.id} in queue=${job.record.queue}`);
    try {
      const worker = new Worker(WORKER_SCRIPT, {
        workerData,
      });
      worker.on("message", resolve);
      worker.on("error", reject);
      worker.on("exit", (code) => {
        if (code !== 0) {
          reject(
            new Error(
              `Worker for job id=${job.id} in queue=${job.record.queue} stopped with exit code ${code}`,
            ),
          );
        }
      });
    } catch (error) {
      reject(error as Error);
    }
  });

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type JobWorkerData<P extends JobPayload = any> = {
  id: bigint;
  payload: P;
  queue: string;
};

const getJobWorkerData = <P extends JobPayload, R extends JobResult>(job: PrismaJob<P, R>): JobWorkerData => {
  // Prepare the job data for structured cloning in worker thread
  return {
    id: job.id,
    payload: job.payload,
    queue: job.record.queue,
  };
};
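The job data is reduced to a plain object because workerData is copied into the worker thread with the structured clone algorithm. The sketch below uses hypothetical sample values and the global structuredClone (available in recent Node.js versions) to show that a bigint id survives cloning. It also shows one possible way to log such data, since JSON.stringify throws on bigint values unless a replacer or a toJSON method converts them; `toLogString` is an illustrative helper, not part of the library.

```typescript
// Hypothetical sample data shaped like the worker payload above.
type SampleJobData = { id: bigint; payload: { email: string }; queue: string };

const sampleData: SampleJobData = {
  id: BigInt(42),
  payload: { email: "user@example.com" },
  queue: "email",
};

// structuredClone applies the same cloning algorithm used for workerData.
const clonedData = structuredClone(sampleData);

// Illustrative helper: serialize for logging, converting bigint values to strings.
const toLogString = (value: unknown): string =>
  JSON.stringify(value, (_key, v) => (typeof v === "bigint" ? v.toString() : v));

console.log(typeof clonedData.id); // "bigint"
console.log(toLogString(clonedData)); // {"id":"42","payload":{"email":"user@example.com"},"queue":"email"}
```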
worker.ts
import { parentPort, workerData } from "node:worker_threads";
import { log } from "src/config/log";
import { workers } from "src/queue";
import { type JobWorkerData } from "src/utils/queue";
import { logMemoryUsage } from "./utils/system";
log.info(`Worker thread started with data=${JSON.stringify(workerData)}`);
const typedWorkerData = workerData as JobWorkerData;
const { queue } = typedWorkerData;
const workerName = queue.replace(/Queue$/, "Worker") as keyof typeof workers;
log.debug(`Importing worker ${workerName} for queue=${queue}`);
const jobWorker = workers[workerName];
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!jobWorker) {
  log.error(`No worker found for queue=${queue}`);
  process.exit(1);
}
log.info(`Running worker for queue=${queue}`);
const result = await jobWorker(typedWorkerData);
log.info(`Worker for queue=${queue} completed with result=${JSON.stringify(result)}`);
parentPort?.postMessage(result);
process.exit(0);
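The worker script derives the handler name from the queue name with queue.replace(/Queue$/, "Worker"), so it relies on a naming convention between queues and the workers registry. The sketch below illustrates that lookup with a hypothetical registry; the real `workers` export from src/queue is application code that is not shown here, and `resolveWorkerName` is only a name given to the expression for illustration.

```typescript
// Hypothetical handler signature and registry, for illustration only.
type WorkerFn = (data: { queue: string }) => Promise<unknown>;

const sampleWorkers: Record<string, WorkerFn> = {
  emailWorker: async ({ queue }) => ({ queue, status: 200 }),
};

// Same expression as in worker.ts: swap a trailing "Queue" for "Worker".
const resolveWorkerName = (queue: string): string => queue.replace(/Queue$/, "Worker");

console.log(resolveWorkerName("emailQueue")); // "emailWorker"
console.log(resolveWorkerName("email")); // "email" (no "Queue" suffix, name unchanged)
```

With this convention, a queue named "emailQueue" resolves to "emailWorker", while a queue named "email" stays "email" and would find no handler in the sample registry, so queue names need to end in "Queue" for the lookup to succeed.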
Inspired by
- pg-queue by
The MIT License
Copyright (c) 2022 Olivier Louvignes <[email protected]>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.