From 4bb9510628d847e44dc484826b7d3e4511df8bef Mon Sep 17 00:00:00 2001
From: didinele
Date: Fri, 18 Jul 2025 16:53:44 +0300
Subject: [PATCH] feat(RedisBroker): poll for unacked events

---
 .../brokers/src/brokers/redis/BaseRedis.ts | 169 ++++++++++++++----
 1 file changed, 136 insertions(+), 33 deletions(-)

diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts
index 9ef109159a93..c3b2fae70291 100644
--- a/packages/brokers/src/brokers/redis/BaseRedis.ts
+++ b/packages/brokers/src/brokers/redis/BaseRedis.ts
@@ -8,10 +8,20 @@ import { ReplyError } from 'ioredis';
 import type { BaseBrokerOptions, IBaseBroker, ToEventMap } from '../Broker.js';
 import { DefaultBrokerOptions } from '../Broker.js';
 
-// For some reason ioredis doesn't have this typed, but it exists
+type RedisReadGroupData = [Buffer, [Buffer, Buffer[]][]][];
+
+// For some reason ioredis doesn't have those typed, but they exist
 declare module 'ioredis' {
 	interface Redis {
-		xreadgroupBuffer(...args: (Buffer | string)[]): Promise<[Buffer, [Buffer, Buffer[]][]][] | null>;
+		xclaimBuffer(
+			key: Buffer | string,
+			group: Buffer | string,
+			consumer: Buffer | string,
+			minIdleTime: number,
+			id: Buffer | string,
+			...args: (Buffer | string)[]
+		): Promise<Buffer[]>;
+		xreadgroupBuffer(...args: (Buffer | string)[]): Promise<RedisReadGroupData | null>;
 	}
 }
 
@@ -23,19 +33,29 @@ export interface RedisBrokerOptions extends BaseBrokerOptions {
 	 * How long to block for messages when polling
 	 */
 	blockTimeout?: number;
-
 	/**
 	 * Consumer group name to use for this broker
 	 *
 	 * @see {@link https://redis.io/commands/xreadgroup/}
 	 */
 	group: string;
-
 	/**
 	 * Max number of messages to poll at once
 	 */
	maxChunk?: number;
-
+	/**
+	 * How many times a message can be delivered to a consumer before it is considered dead.
+	 * This is used to prevent messages from being stuck in the queue forever if a consumer is
+	 * unable to process them.
+	 */
+	maxDeliveredTimes?: number;
+	/**
+	 * How long (in milliseconds) a message should sit idle before another consumer is allowed to claim it.
+	 * Too high a value delays the processing of orphaned messages during a service downscale,
+	 * while too low a value causes messages to be claimed too eagerly by other consumers during an
+	 * instance restart (which is usually not a real problem).
+	 */
+	messageIdleTime?: number;
 	/**
 	 * Unique consumer name.
 	 *
@@ -51,6 +71,8 @@ export const DefaultRedisBrokerOptions = {
 	...DefaultBrokerOptions,
 	name: randomBytes(20).toString('hex'),
 	maxChunk: 10,
+	maxDeliveredTimes: 3,
+	messageIdleTime: 3_000,
 	blockTimeout: 5_000,
 } as const satisfies Required<Omit<RedisBrokerOptions, 'group'>>;
 
@@ -141,7 +163,7 @@ export abstract class BaseRedisBroker<
 	}
 
 	/**
-	 * Begins polling for events, firing them to {@link BaseRedisBroker.listen}
+	 * Begins polling for events, firing them to {@link BaseRedisBroker.emitEvent}
 	 */
 	protected async listen(): Promise<void> {
 		if (this.listening) {
@@ -150,40 +172,24 @@ export abstract class BaseRedisBroker<
 		}
 
 		this.listening = true;
 
+		// Enter regular polling
 		while (this.subscribedEvents.size > 0) {
 			try {
-				const data = await this.streamReadClient.xreadgroupBuffer(
-					'GROUP',
-					this.options.group,
-					this.options.name,
-					'COUNT',
-					String(this.options.maxChunk),
-					'BLOCK',
-					String(this.options.blockTimeout),
-					'STREAMS',
-					...this.subscribedEvents,
-					...Array.from({ length: this.subscribedEvents.size }, () => '>'),
-				);
+				await this.claimAndEmitDeadEvents();
+			} catch (error) {
+				// @ts-expect-error: Intended
+				this.emit('error', error);
+				// We don't break here to keep the loop running even if dead event processing fails
+			}
+
+			try {
+				// As per docs, '>' means "give me a new message"
+				const data = await this.readGroup('>', this.options.blockTimeout);
 				if (!data) {
 					continue;
 				}
 
-				for (const [event, info] of data) {
-					for (const [id, packet] of info) {
-						const idx = packet.findIndex((value, idx) => value.toString('utf8') === 'data' && idx % 2 === 0);
-						if (idx < 0) {
-							continue;
-						}
-
-						const data = packet[idx + 1];
-						if (!data) {
-							continue;
-						}
-
-						this.emitEvent(id, this.options.group, event.toString('utf8'), this.options.decode(data));
-					}
-				}
+				await this.processMessages(data);
 			} catch (error) {
 				// @ts-expect-error: Intended
 				this.emit('error', error);
@@ -194,6 +200,103 @@ export abstract class BaseRedisBroker<
 		this.listening = false;
 	}
 
+	private async readGroup(fromId: string, block: number): Promise<RedisReadGroupData> {
+		const data = await this.streamReadClient.xreadgroupBuffer(
+			'GROUP',
+			this.options.group,
+			this.options.name,
+			'COUNT',
+			String(this.options.maxChunk),
+			'BLOCK',
+			String(block),
+			'STREAMS',
+			...this.subscribedEvents,
+			...Array.from({ length: this.subscribedEvents.size }, () => fromId),
+		);
+
+		return data ?? [];
+	}
+
+	private async processMessages(data: RedisReadGroupData): Promise<void> {
+		for (const [event, messages] of data) {
+			const eventName = event.toString('utf8');
+
+			for (const [id, packet] of messages) {
+				const idx = packet.findIndex((value, idx) => value.toString('utf8') === 'data' && idx % 2 === 0);
+				if (idx < 0) continue;
+
+				const payload = packet[idx + 1];
+				if (!payload) continue;
+
+				this.emitEvent(id, this.options.group, eventName, this.options.decode(payload));
+			}
+		}
+	}
+
+	private async claimAndEmitDeadEvents(): Promise<void> {
+		for (const stream of this.subscribedEvents) {
+			// Get up to N oldest pending messages (note: a pending message is a message that has been read, but never ACKed)
+			const pending = (await this.streamReadClient.xpending(
+				stream,
+				this.options.group,
+				'-',
+				'+',
+				this.options.maxChunk,
+				// See: https://redis.io/docs/latest/commands/xpending/#extended-form-of-xpending
+			)) as [id: string, consumer: string, idleMs: number, deliveredTimes: number][];
+
+			for (const [id, consumer, idleMs, deliveredTimes] of pending) {
+				// Technically xclaim checks for us anyway, but why not avoid an extra call?
+				if (idleMs < this.options.messageIdleTime) {
+					continue;
+				}
+
+				if (deliveredTimes > this.options.maxDeliveredTimes) {
+					// This message is dead. It has repeatedly failed to be processed; ack it so it leaves the pending list.
+					await this.streamReadClient.xack(stream, this.options.group, id);
+					continue;
+				}
+
+				// Try to claim the message if we don't already own it (this may fail if another consumer has already claimed it)
+				if (consumer !== this.options.name) {
+					const claimed = await this.streamReadClient.xclaimBuffer(
+						stream,
+						this.options.group,
+						this.options.name,
+						Math.max(this.options.messageIdleTime, 1),
+						id,
+						'JUSTID',
+					);
+
+					// Another consumer got the message before us
+					if (!claimed?.length) {
+						continue;
+					}
+				}
+
+				// Fetch message body
+				const entries = await this.streamReadClient.xrangeBuffer(stream, id, id);
+				// No idea how this could happen, frankly!
+				if (!entries?.length) {
+					continue;
+				}
+
+				const [msgId, fields] = entries[0]!;
+				const idx = fields.findIndex((value, idx) => value.toString('utf8') === 'data' && idx % 2 === 0);
+				if (idx < 0) {
+					continue;
+				}
+
+				const payload = fields[idx + 1];
+				if (!payload) {
+					continue;
+				}
+
+				this.emitEvent(msgId, this.options.group, stream, this.options.decode(payload));
+			}
+		}
+	}
+
 	/**
 	 * Destroys the broker, closing all connections
 	 */
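Usage sketch (not part of the patch): the snippet below shows how the maxDeliveredTimes and messageIdleTime options introduced here might be wired up against a concrete broker such as PubSubRedisBroker. The options themselves come from this patch; the constructor, subscribe(), and listener shapes shown are assumptions and should be adjusted to the actual @discordjs/brokers API of your version.

import Redis from 'ioredis';
import { PubSubRedisBroker } from '@discordjs/brokers';

// Assumed constructor shape: (ioredis client, RedisBrokerOptions)
const broker = new PubSubRedisBroker(new Redis(), {
	group: 'gateway', // consumer group shared by every instance of this service
	name: 'worker-1', // unique per instance; defaults to a random hex string
	maxDeliveredTimes: 3, // a pending message whose delivery count exceeds this is treated as dead and dropped
	messageIdleTime: 3_000, // ms a pending message must sit idle before another consumer may claim it
});

// Assumed subscribe() shape: the consumer group now comes from the options above
await broker.subscribe(['deploy']);

broker.on('deploy', ({ data, ack }) => {
	console.log(data);
	// Messages that are never acked stay pending: they are re-claimed after messageIdleTime
	// and eventually dropped once they exceed maxDeliveredTimes.
	void ack();
});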