Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,41 @@ const taskforceConnection = Connect("my connection", "my token", {
});
```

### Using an existing IORedis instance

You can also pass an existing IORedis instance instead of connection options. This is useful when you need more control over the Redis connection configuration:

```ts
import { Connect } from "taskforce-connector";
import Redis from "ioredis";

// Create your own Redis instance with custom configuration
const redisClient = new Redis({
host: "my redis host",
port: 6379,
password: "my redis password",
// Any other IORedis options...
maxRetriesPerRequest: null,
enableReadyCheck: false,
});

const taskforceConnection = Connect("my connection", "my token", redisClient);
```

This also works with Redis Cluster:

```ts
import { Connect } from "taskforce-connector";
import Redis from "ioredis";

const cluster = new Redis.Cluster([
{ host: "node1", port: 6379 },
{ host: "node2", port: 6379 },
]);

const taskforceConnection = Connect("my connection", "my token", cluster);
```

If you are using the On Premises version of Taskforce, you can also specify the backend domain:

```ts
Expand Down
4 changes: 2 additions & 2 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { Integration } from "./interfaces/integration";
import { Socket, Connection } from "./socket";

export { Integration } from "./interfaces/integration";
export { getRedisClient, FoundQueue } from "./queue-factory";
export { getRedisClient, FoundQueue, RedisConnection } from "./queue-factory";
export { WebSocketClient } from "./ws-autoreconnect";
export { Connection } from "./socket";
export { Connection, ConnectionOptions } from "./socket";
export { respond } from "./responders/respond";
export { BullMQResponders } from "./responders/bullmq-responders";

Expand Down
83 changes: 64 additions & 19 deletions lib/queue-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const queueNameRegExp = new RegExp("(.*):(.*):id");
const maxCount = 150000;
const maxTime = 40000;

export type RedisConnection = Redis | Cluster;

// We keep a redis client that we can reuse for all the queues.
let redisClients: Record<string, Redis | Cluster> = {} as any;

Expand Down Expand Up @@ -96,9 +98,10 @@ const getQueueKeys = async (client: Redis | Cluster, queueNames?: string[]) => {
};

export async function getConnectionQueues(
redisOpts: RedisOptions,
redisOpts: RedisOptions | undefined,
clusterNodes?: string[],
queueNames?: string[]
queueNames?: string[],
redisClient?: RedisConnection
): Promise<FoundQueue[]> {
const queues = await execRedisCommand(
redisOpts,
Expand Down Expand Up @@ -129,34 +132,74 @@ export async function getConnectionQueues(
);
return queues;
},
clusterNodes
clusterNodes,
redisClient
);

return queues;
}

export async function ping(redisOpts: RedisOptions, clusterNodes?: string[]) {
return execRedisCommand(redisOpts, (client) => client.ping(), clusterNodes);
export async function ping(
redisOpts: RedisOptions | undefined,
clusterNodes?: string[],
redisClient?: RedisConnection
) {
return execRedisCommand(
redisOpts,
(client) => client.ping(),
clusterNodes,
redisClient
);
}

export async function getRedisInfo(
redisOpts: RedisOptions,
clusterNodes?: string[]
redisOpts: RedisOptions | undefined,
clusterNodes?: string[],
redisClient?: RedisConnection
) {
const info = await execRedisCommand(
redisOpts,
(client) => client.info(),
clusterNodes
clusterNodes,
redisClient
);
return info;
}

/**
* Gets or creates a Redis client for queue operations.
*
* @param redisOpts - Redis connection options. Required if existingClient is not provided.
* @param type - The type of queue ("bull" or "bullmq"), used for client caching key.
* @param clusterNodes - Optional list of cluster node URIs for Redis Cluster.
* @param existingClient - Optional pre-configured Redis/Cluster instance.
* @returns A Redis or Cluster client.
*
* @remarks
* When `existingClient` is provided, it is returned directly without caching.
* This allows the caller to manage the client lifecycle independently.
*
* When `redisOpts` are provided (without existingClient), the created client
* is cached internally using a checksum of the options. Subsequent calls with
* the same options will reuse the cached client.
*/
export function getRedisClient(
redisOpts: RedisOptions,
redisOpts: RedisOptions | undefined,
type: "bull" | "bullmq",
clusterNodes?: string[]
clusterNodes?: string[],
existingClient?: RedisConnection
) {
// Compute checksum for redisOpts
// If an existing client is provided, return it directly.
// We don't cache it since the caller owns its lifecycle.
if (existingClient) {
return existingClient;
}

if (!redisOpts) {
throw new Error("Redis options are required when no client is provided");
}

// Compute checksum for redisOpts to use as cache key
const checksumJson = JSON.stringify(redisOpts);
const checksum = require("crypto")
.createHash("md5")
Expand Down Expand Up @@ -214,32 +257,34 @@ export function getRedisClient(
}

export async function execRedisCommand(
redisOpts: RedisOptions,
redisOpts: RedisOptions | undefined,
cb: (client: Redis | Cluster) => any,
clusterNodes?: string[]
clusterNodes?: string[],
redisClient?: RedisConnection
) {
const redisClient = getRedisClient(redisOpts, "bull", clusterNodes);
const client = getRedisClient(redisOpts, "bull", clusterNodes, redisClient);

const result = await cb(redisClient);
const result = await cb(client);

return result;
}

export function createQueue(
foundQueue: FoundQueue,
redisOpts: RedisOptions,
redisOpts: RedisOptions | undefined,
opts: {
nodes?: string[];
integrations?: {
[key: string]: Integration;
};
redisClient?: RedisConnection;
} = {}
): { queue: Bull.Queue | Queue; responders: Responders } {
const { nodes, integrations } = opts;
const { nodes, integrations, redisClient } = opts;
const createClient = function (type: "client" /*, redisOpts */) {
switch (type) {
case "client":
return getRedisClient(redisOpts, "bull", nodes);
return getRedisClient(redisOpts, "bull", nodes, redisClient);
default:
throw new Error(`Unexpected connection type: ${type}`);
}
Expand All @@ -255,7 +300,7 @@ export function createQueue(

switch (foundQueue.type) {
case "bullmq":
const connection = getRedisClient(redisOpts, "bullmq", nodes);
const connection = getRedisClient(redisOpts, "bullmq", nodes, redisClient);
switch (foundQueue.majorVersion) {
case 0:
return {
Expand Down
25 changes: 20 additions & 5 deletions lib/queues-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import * as Bull from "bull";
import { Queue } from "bullmq";
import { RedisOptions } from "ioredis";
import { keyBy } from "lodash";
import { FoundQueue, createQueue, getConnectionQueues } from "./queue-factory";
import {
FoundQueue,
RedisConnection,
createQueue,
getConnectionQueues,
} from "./queue-factory";
import { Responders } from "./interfaces/responders";
import { Integration } from "./interfaces/integration";

Expand All @@ -21,17 +26,23 @@ export function queueKey(
}

export async function updateQueuesCache(
redisOpts: RedisOptions,
redisOpts: RedisOptions | undefined,
opts: {
nodes?: string[];
integrations?: {
[key: string]: Integration;
};
queueNames?: string[];
} = {}
} = {},
redisClient?: RedisConnection
) {
const { nodes, integrations, queueNames } = opts;
const newQueues = await getConnectionQueues(redisOpts, nodes, queueNames);
const newQueues = await getConnectionQueues(
redisOpts,
nodes,
queueNames,
redisClient
);

queuesCache = queuesCache || {};

Expand Down Expand Up @@ -70,7 +81,11 @@ export async function updateQueuesCache(

toAdd.forEach(function (foundQueue: FoundQueue) {
const key = queueKey(foundQueue);
const queue = createQueue(foundQueue, redisOpts, { nodes, integrations });
const queue = createQueue(foundQueue, redisOpts, {
nodes,
integrations,
redisClient,
});
if (queue) {
queuesCache[key] = queue;
}
Expand Down
33 changes: 22 additions & 11 deletions lib/socket.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { RedisOptions } from "ioredis";
import { Redis, Cluster, RedisOptions } from "ioredis";
import { pick } from "lodash";
import { getCache, updateQueuesCache, queueKey } from "./queues-cache";
import { WebSocketClient } from "./ws-autoreconnect";
import {
FoundQueue,
RedisConnection,
execRedisCommand,
getRedisInfo,
ping,
Expand All @@ -15,7 +16,7 @@ const { version } = require(`${__dirname}/../package.json`);

const chalk = require("chalk");

export interface Connection {
export interface ConnectionOptions {
port?: number;
host?: string;
password?: string;
Expand All @@ -24,6 +25,8 @@ export interface Connection {
tls?: object;
}

export type Connection = ConnectionOptions | RedisConnection;

export const Socket = (
name: string,
server: string,
Expand All @@ -40,7 +43,10 @@ export const Socket = (
) => {
const { team, nodes } = opts;
const ws = new WebSocketClient();
const redisOpts = redisOptsFromConnection(connection);
const redisOpts = isRedisInstance(connection)
? undefined
: redisOptsFromConnection(connection);
const redisClient = isRedisInstance(connection) ? connection : undefined;

ws.open(server, {
headers: {
Expand Down Expand Up @@ -92,7 +98,7 @@ export const Socket = (
//
// Send this connection.
//
const queues = await updateQueuesCache(redisOpts, opts);
const queues = await updateQueuesCache(redisOpts, opts, redisClient);
console.log(
`${chalk.yellow("WebSocket:")} ${chalk.green(
"sending connection:"
Expand Down Expand Up @@ -133,7 +139,7 @@ export const Socket = (
case "jobs":
let cache = getCache();
if (!cache) {
await updateQueuesCache(redisOpts, opts);
await updateQueuesCache(redisOpts, opts, redisClient);
cache = getCache();
if (!cache) {
throw new Error("Unable to update queues");
Expand Down Expand Up @@ -177,12 +183,12 @@ export const Socket = (

switch (data.cmd) {
case "ping":
const pong = await ping(redisOpts, nodes);
const pong = await ping(redisOpts, nodes, redisClient);
respond(msg.id, startTime, pong);
break;
case "getConnection":
{
const queues = await updateQueuesCache(redisOpts, opts);
const queues = await updateQueuesCache(redisOpts, opts, redisClient);

console.log(
`${chalk.yellow("WebSocket:")} ${chalk.green(
Expand All @@ -204,23 +210,24 @@ export const Socket = (
break;
case "getQueues":
{
const queues = await updateQueuesCache(redisOpts, opts);
const queues = await updateQueuesCache(redisOpts, opts, redisClient);

logSendingQueues(queues);

respond(msg.id, startTime, queues);
}
break;
case "getInfo":
const info = await getRedisInfo(redisOpts, nodes);
const info = await getRedisInfo(redisOpts, nodes, redisClient);
respond(msg.id, startTime, info);
break;

case "getQueueType":
const queueType = await execRedisCommand(
redisOpts,
(client) => getQueueType(data.name, data.prefix, client),
nodes
nodes,
redisClient
);
respond(msg.id, startTime, { queueType });
break;
Expand Down Expand Up @@ -249,7 +256,11 @@ export const Socket = (
}
};

function redisOptsFromConnection(connection: Connection): RedisOptions {
function isRedisInstance(connection: Connection): connection is RedisConnection {
return connection instanceof Redis || connection instanceof Cluster;
}

function redisOptsFromConnection(connection: ConnectionOptions): RedisOptions {
let opts: RedisOptions = {
...pick(connection, [
"host",
Expand Down
Loading