diff --git a/README.md b/README.md index 3efbc31..a576b7e 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,41 @@ const taskforceConnection = Connect("my connection", "my token", { }); ``` +### Using an existing IORedis instance + +You can also pass an existing IORedis instance instead of connection options. This is useful when you need more control over the Redis connection configuration: + +```ts +import { Connect } from "taskforce-connector"; +import Redis from "ioredis"; + +// Create your own Redis instance with custom configuration +const redisClient = new Redis({ + host: "my redis host", + port: 6379, + password: "my redis password", + // Any other IORedis options... + maxRetriesPerRequest: null, + enableReadyCheck: false, +}); + +const taskforceConnection = Connect("my connection", "my token", redisClient); +``` + +This also works with Redis Cluster: + +```ts +import { Connect } from "taskforce-connector"; +import Redis from "ioredis"; + +const cluster = new Redis.Cluster([ + { host: "node1", port: 6379 }, + { host: "node2", port: 6379 }, +]); + +const taskforceConnection = Connect("my connection", "my token", cluster); +``` + If you are using the On Premises version of Taskforce, you can also specify the backend domain: ```ts diff --git a/lib/index.ts b/lib/index.ts index c5cd222..33742e8 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -2,9 +2,9 @@ import { Integration } from "./interfaces/integration"; import { Socket, Connection } from "./socket"; export { Integration } from "./interfaces/integration"; -export { getRedisClient, FoundQueue } from "./queue-factory"; +export { getRedisClient, FoundQueue, RedisConnection } from "./queue-factory"; export { WebSocketClient } from "./ws-autoreconnect"; -export { Connection } from "./socket"; +export { Connection, ConnectionOptions } from "./socket"; export { respond } from "./responders/respond"; export { BullMQResponders } from "./responders/bullmq-responders"; diff --git a/lib/queue-factory.ts b/lib/queue-factory.ts index db64dff..620eb75 100644 --- a/lib/queue-factory.ts +++ b/lib/queue-factory.ts @@ -13,6 +13,8 @@ const queueNameRegExp = new RegExp("(.*):(.*):id"); const maxCount = 150000; const maxTime = 40000; +export type RedisConnection = Redis | Cluster; + // We keep a redis client that we can reuse for all the queues. let redisClients: Record = {} as any; @@ -96,9 +98,10 @@ const getQueueKeys = async (client: Redis | Cluster, queueNames?: string[]) => { }; export async function getConnectionQueues( - redisOpts: RedisOptions, + redisOpts: RedisOptions | undefined, clusterNodes?: string[], - queueNames?: string[] + queueNames?: string[], + redisClient?: RedisConnection ): Promise { const queues = await execRedisCommand( redisOpts, @@ -129,34 +132,74 @@ export async function getConnectionQueues( ); return queues; }, - clusterNodes + clusterNodes, + redisClient ); return queues; } -export async function ping(redisOpts: RedisOptions, clusterNodes?: string[]) { - return execRedisCommand(redisOpts, (client) => client.ping(), clusterNodes); +export async function ping( + redisOpts: RedisOptions | undefined, + clusterNodes?: string[], + redisClient?: RedisConnection +) { + return execRedisCommand( + redisOpts, + (client) => client.ping(), + clusterNodes, + redisClient + ); } export async function getRedisInfo( - redisOpts: RedisOptions, - clusterNodes?: string[] + redisOpts: RedisOptions | undefined, + clusterNodes?: string[], + redisClient?: RedisConnection ) { const info = await execRedisCommand( redisOpts, (client) => client.info(), - clusterNodes + clusterNodes, + redisClient ); return info; } +/** + * Gets or creates a Redis client for queue operations. + * + * @param redisOpts - Redis connection options. Required if existingClient is not provided. + * @param type - The type of queue ("bull" or "bullmq"), used for client caching key. + * @param clusterNodes - Optional list of cluster node URIs for Redis Cluster. + * @param existingClient - Optional pre-configured Redis/Cluster instance. + * @returns A Redis or Cluster client. + * + * @remarks + * When `existingClient` is provided, it is returned directly without caching. + * This allows the caller to manage the client lifecycle independently. + * + * When `redisOpts` are provided (without existingClient), the created client + * is cached internally using a checksum of the options. Subsequent calls with + * the same options will reuse the cached client. + */ export function getRedisClient( - redisOpts: RedisOptions, + redisOpts: RedisOptions | undefined, type: "bull" | "bullmq", - clusterNodes?: string[] + clusterNodes?: string[], + existingClient?: RedisConnection ) { - // Compute checksum for redisOpts + // If an existing client is provided, return it directly. + // We don't cache it since the caller owns its lifecycle. + if (existingClient) { + return existingClient; + } + + if (!redisOpts) { + throw new Error("Redis options are required when no client is provided"); + } + + // Compute checksum for redisOpts to use as cache key const checksumJson = JSON.stringify(redisOpts); const checksum = require("crypto") .createHash("md5") @@ -214,32 +257,34 @@ export function getRedisClient( } export async function execRedisCommand( - redisOpts: RedisOptions, + redisOpts: RedisOptions | undefined, cb: (client: Redis | Cluster) => any, - clusterNodes?: string[] + clusterNodes?: string[], + redisClient?: RedisConnection ) { - const redisClient = getRedisClient(redisOpts, "bull", clusterNodes); + const client = getRedisClient(redisOpts, "bull", clusterNodes, redisClient); - const result = await cb(redisClient); + const result = await cb(client); return result; } export function createQueue( foundQueue: FoundQueue, - redisOpts: RedisOptions, + redisOpts: RedisOptions | undefined, opts: { nodes?: string[]; integrations?: { [key: string]: Integration; }; + redisClient?: RedisConnection; } = {} ): { queue: Bull.Queue | Queue; responders: Responders } { - const { nodes, integrations } = opts; + const { nodes, integrations, redisClient } = opts; const createClient = function (type: "client" /*, redisOpts */) { switch (type) { case "client": - return getRedisClient(redisOpts, "bull", nodes); + return getRedisClient(redisOpts, "bull", nodes, redisClient); default: throw new Error(`Unexpected connection type: ${type}`); } @@ -255,7 +300,7 @@ export function createQueue( switch (foundQueue.type) { case "bullmq": - const connection = getRedisClient(redisOpts, "bullmq", nodes); + const connection = getRedisClient(redisOpts, "bullmq", nodes, redisClient); switch (foundQueue.majorVersion) { case 0: return { diff --git a/lib/queues-cache.ts b/lib/queues-cache.ts index 32bcb8a..b5aed77 100644 --- a/lib/queues-cache.ts +++ b/lib/queues-cache.ts @@ -2,7 +2,12 @@ import * as Bull from "bull"; import { Queue } from "bullmq"; import { RedisOptions } from "ioredis"; import { keyBy } from "lodash"; -import { FoundQueue, createQueue, getConnectionQueues } from "./queue-factory"; +import { + FoundQueue, + RedisConnection, + createQueue, + getConnectionQueues, +} from "./queue-factory"; import { Responders } from "./interfaces/responders"; import { Integration } from "./interfaces/integration"; @@ -21,17 +26,23 @@ export function queueKey( } export async function updateQueuesCache( - redisOpts: RedisOptions, + redisOpts: RedisOptions | undefined, opts: { nodes?: string[]; integrations?: { [key: string]: Integration; }; queueNames?: string[]; - } = {} + } = {}, + redisClient?: RedisConnection ) { const { nodes, integrations, queueNames } = opts; - const newQueues = await getConnectionQueues(redisOpts, nodes, queueNames); + const newQueues = await getConnectionQueues( + redisOpts, + nodes, + queueNames, + redisClient + ); queuesCache = queuesCache || {}; @@ -70,7 +81,11 @@ export async function updateQueuesCache( toAdd.forEach(function (foundQueue: FoundQueue) { const key = queueKey(foundQueue); - const queue = createQueue(foundQueue, redisOpts, { nodes, integrations }); + const queue = createQueue(foundQueue, redisOpts, { + nodes, + integrations, + redisClient, + }); if (queue) { queuesCache[key] = queue; } diff --git a/lib/socket.ts b/lib/socket.ts index f9edfef..3e2304a 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -1,9 +1,10 @@ -import { RedisOptions } from "ioredis"; +import { Redis, Cluster, RedisOptions } from "ioredis"; import { pick } from "lodash"; import { getCache, updateQueuesCache, queueKey } from "./queues-cache"; import { WebSocketClient } from "./ws-autoreconnect"; import { FoundQueue, + RedisConnection, execRedisCommand, getRedisInfo, ping, @@ -15,7 +16,7 @@ const { version } = require(`${__dirname}/../package.json`); const chalk = require("chalk"); -export interface Connection { +export interface ConnectionOptions { port?: number; host?: string; password?: string; @@ -24,6 +25,8 @@ export interface Connection { tls?: object; } +export type Connection = ConnectionOptions | RedisConnection; + export const Socket = ( name: string, server: string, @@ -40,7 +43,10 @@ export const Socket = ( ) => { const { team, nodes } = opts; const ws = new WebSocketClient(); - const redisOpts = redisOptsFromConnection(connection); + const redisOpts = isRedisInstance(connection) + ? undefined + : redisOptsFromConnection(connection); + const redisClient = isRedisInstance(connection) ? connection : undefined; ws.open(server, { headers: { @@ -92,7 +98,7 @@ export const Socket = ( // // Send this connection. // - const queues = await updateQueuesCache(redisOpts, opts); + const queues = await updateQueuesCache(redisOpts, opts, redisClient); console.log( `${chalk.yellow("WebSocket:")} ${chalk.green( "sending connection:" @@ -133,7 +139,7 @@ export const Socket = ( case "jobs": let cache = getCache(); if (!cache) { - await updateQueuesCache(redisOpts, opts); + await updateQueuesCache(redisOpts, opts, redisClient); cache = getCache(); if (!cache) { throw new Error("Unable to update queues"); @@ -177,12 +183,12 @@ export const Socket = ( switch (data.cmd) { case "ping": - const pong = await ping(redisOpts, nodes); + const pong = await ping(redisOpts, nodes, redisClient); respond(msg.id, startTime, pong); break; case "getConnection": { - const queues = await updateQueuesCache(redisOpts, opts); + const queues = await updateQueuesCache(redisOpts, opts, redisClient); console.log( `${chalk.yellow("WebSocket:")} ${chalk.green( @@ -204,7 +210,7 @@ export const Socket = ( break; case "getQueues": { - const queues = await updateQueuesCache(redisOpts, opts); + const queues = await updateQueuesCache(redisOpts, opts, redisClient); logSendingQueues(queues); @@ -212,7 +218,7 @@ export const Socket = ( } break; case "getInfo": - const info = await getRedisInfo(redisOpts, nodes); + const info = await getRedisInfo(redisOpts, nodes, redisClient); respond(msg.id, startTime, info); break; @@ -220,7 +226,8 @@ export const Socket = ( const queueType = await execRedisCommand( redisOpts, (client) => getQueueType(data.name, data.prefix, client), - nodes + nodes, + redisClient ); respond(msg.id, startTime, { queueType }); break; @@ -249,7 +256,11 @@ export const Socket = ( } }; -function redisOptsFromConnection(connection: Connection): RedisOptions { +function isRedisInstance(connection: Connection): connection is RedisConnection { + return connection instanceof Redis || connection instanceof Cluster; +} + +function redisOptsFromConnection(connection: ConnectionOptions): RedisOptions { let opts: RedisOptions = { ...pick(connection, [ "host",