Skip to content

feat(RedisBroker): poll for unacked events #11004

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 136 additions & 33 deletions packages/brokers/src/brokers/redis/BaseRedis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,20 @@ import { ReplyError } from 'ioredis';
import type { BaseBrokerOptions, IBaseBroker, ToEventMap } from '../Broker.js';
import { DefaultBrokerOptions } from '../Broker.js';

// Shape of the raw XREADGROUP reply: one entry per stream, each carrying
// [streamName, [entryId, flatFieldValueArray][]]. ioredis doesn't type this.
type RedisReadGroupData = [Buffer, [Buffer, Buffer[]][]][];

// For some reason ioredis doesn't have those typed, but they exist
declare module 'ioredis' {
	interface Redis {
		xclaimBuffer(
			key: Buffer | string,
			group: Buffer | string,
			consumer: Buffer | string,
			minIdleTime: number,
			id: Buffer | string,
			...args: (Buffer | string)[]
		): Promise<string[]>;
		xreadgroupBuffer(...args: (Buffer | string)[]): Promise<RedisReadGroupData | null>;
	}
}

Expand All @@ -23,19 +33,29 @@ export interface RedisBrokerOptions extends BaseBrokerOptions {
* How long to block for messages when polling
*/
blockTimeout?: number;

/**
* Consumer group name to use for this broker
*
* @see {@link https://redis.io/commands/xreadgroup/}
*/
group: string;

/**
* Max number of messages to poll at once
*/
maxChunk?: number;

/**
* How many times a message can be delivered to a consumer before it is considered dead.
* This is used to prevent messages from being stuck in the queue forever if a consumer is
* unable to process them.
*/
maxDeliveredTimes?: number;
/**
* How long a message should be idle for before allowing it to be claimed by another consumer.
* Note that too high of a value can lead to a high delay in processing messages during a service downscale,
* while too low of a value can lead to messages being too eagerly claimed by other consumers during an instance
* restart (which is most likely not actually that problematic)
*/
messageIdleTime?: number;
/**
* Unique consumer name.
*
Expand All @@ -51,6 +71,8 @@ export const DefaultRedisBrokerOptions = {
...DefaultBrokerOptions,
name: randomBytes(20).toString('hex'),
maxChunk: 10,
maxDeliveredTimes: 3,
messageIdleTime: 3_000,
blockTimeout: 5_000,
} as const satisfies Required<Omit<RedisBrokerOptions, 'group'>>;

Expand Down Expand Up @@ -141,7 +163,7 @@ export abstract class BaseRedisBroker<
}

/**
* Begins polling for events, firing them to {@link BaseRedisBroker.listen}
* Begins polling for events, firing them to {@link BaseRedisBroker.emitEvent}
*/
protected async listen(): Promise<void> {
if (this.listening) {
Expand All @@ -150,40 +172,24 @@ export abstract class BaseRedisBroker<

this.listening = true;

// Enter regular polling
while (this.subscribedEvents.size > 0) {
try {
const data = await this.streamReadClient.xreadgroupBuffer(
'GROUP',
this.options.group,
this.options.name,
'COUNT',
String(this.options.maxChunk),
'BLOCK',
String(this.options.blockTimeout),
'STREAMS',
...this.subscribedEvents,
...Array.from({ length: this.subscribedEvents.size }, () => '>'),
);
await this.claimAndEmitDeadEvents();
} catch (error) {
// @ts-expect-error: Intended
this.emit('error', error);
// We don't break here to keep the loop running even if dead event processing fails
}

try {
// As per docs, '>' means "give me a new message"
const data = await this.readGroup('>', this.options.blockTimeout);
if (!data) {
continue;
}

for (const [event, info] of data) {
for (const [id, packet] of info) {
const idx = packet.findIndex((value, idx) => value.toString('utf8') === 'data' && idx % 2 === 0);
if (idx < 0) {
continue;
}

const data = packet[idx + 1];
if (!data) {
continue;
}

this.emitEvent(id, this.options.group, event.toString('utf8'), this.options.decode(data));
}
}
await this.processMessages(data);
} catch (error) {
// @ts-expect-error: Intended
this.emit('error', error);
Expand All @@ -194,6 +200,103 @@ export abstract class BaseRedisBroker<
this.listening = false;
}

private async readGroup(fromId: string, block: number): Promise<RedisReadGroupData> {
const data = await this.streamReadClient.xreadgroupBuffer(
'GROUP',
this.options.group,
this.options.name,
'COUNT',
String(this.options.maxChunk),
'BLOCK',
String(block),
'STREAMS',
...this.subscribedEvents,
...Array.from({ length: this.subscribedEvents.size }, () => fromId),
);

return data ?? [];
}

private async processMessages(data: RedisReadGroupData): Promise<void> {
for (const [event, messages] of data) {
const eventName = event.toString('utf8');

for (const [id, packet] of messages) {
const idx = packet.findIndex((value, idx) => value.toString('utf8') === 'data' && idx % 2 === 0);
if (idx < 0) continue;

const payload = packet[idx + 1];
if (!payload) continue;

this.emitEvent(id, this.options.group, eventName, this.options.decode(payload));
}
}
}

private async claimAndEmitDeadEvents(): Promise<void> {
for (const stream of this.subscribedEvents) {
// Get up to N oldest pending messages (note: a pending message is a message that has been read, but never ACKed)
const pending = (await this.streamReadClient.xpending(
stream,
this.options.group,
'-',
'+',
this.options.maxChunk,
// See: https://redis.io/docs/latest/commands/xpending/#extended-form-of-xpending
)) as [id: string, consumer: string, idleMs: number, deliveredTimes: number][];

for (const [id, consumer, idleMs, deliveredTimes] of pending) {
// Technically xclaim checks for us anyway, but why not avoid an extra call?
if (idleMs < this.options.messageIdleTime) {
continue;
}

if (deliveredTimes > this.options.maxDeliveredTimes) {
// This message is dead. It has repeatedly failed being processed by a consumer.
await this.streamReadClient.xdel(stream, this.options.group, id);
continue;
}

// Try to claim the message if we don't already own it (this may fail if another consumer has already claimed it)
if (consumer !== this.options.name) {
const claimed = await this.streamReadClient.xclaimBuffer(
stream,
this.options.group,
this.options.name,
Math.max(this.options.messageIdleTime, 1),
id,
'JUSTID',
);

// Another consumer got the message before us
if (!claimed?.length) {
continue;
}
}

// Fetch message body
const entries = await this.streamReadClient.xrangeBuffer(stream, id, id);
// No idea how this could happen, frankly!
if (!entries?.length) {
continue;
}

const [msgId, fields] = entries[0]!;
const idx = fields.findIndex((value, idx) => value.toString('utf8') === 'data' && idx % 2 === 0);
if (idx < 0) {
continue;
}

const payload = fields[idx + 1];
if (!payload) {
continue;
}

this.emitEvent(msgId, this.options.group, stream, this.options.decode(payload));
}
}
}

/**
* Destroys the broker, closing all connections
*/
Expand Down