Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/clients/consumer/messages-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import {
type ConsumeOptions,
type CorruptedMessageHandler,
type GroupAssignment,
type Offsets
type Offsets,
type MessagesStreamBreakModeValue,
MessagesStreamBreakModes
} from './types.ts'

// Don't move this function as being in the same file will enable V8 to remove.
Expand All @@ -46,6 +48,7 @@ export function defaultCorruptedMessageHandler (): boolean {
export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable {
#consumer: Consumer<Key, Value, HeaderKey, HeaderValue>
#mode: string
#breakMode: MessagesStreamBreakModeValue
#fallbackMode: string
#options: ConsumeOptions<Key, Value, HeaderKey, HeaderValue>
#topics: string[]
Expand Down Expand Up @@ -82,6 +85,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
super({ objectMode: true, highWaterMark: options.highWaterMark ?? defaultConsumerOptions.highWaterMark })
this.#consumer = consumer
this.#mode = mode ?? MessagesStreamModes.LATEST
this.#breakMode = options.breakMode ?? MessagesStreamBreakModes.MANUAL
this.#fallbackMode = fallbackMode ?? MessagesStreamFallbackModes.LATEST
this.#offsetsToCommit = new Map()
this.#topics = structuredClone(options.topics)
Expand Down Expand Up @@ -283,6 +287,13 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
return super[Symbol.asyncIterator]()
}

collect (): Promise<Message<Key, Value, HeaderKey, HeaderValue>[]> {
if (this.#breakMode === MessagesStreamBreakModes.MANUAL) {
throw new UserError('Cannot collect messages when the stream is in MANUAL break mode.')
}
return Array.fromAsync(this)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I would add this. As you noted, a user could just do Array.fromAsync(), which would do the trick. It would be good to document it in the docs.

The place where we should add this would be in the consumer as collectNBatch(), which would be easier.


_construct (callback: (error?: Error) => void) {
this.#refreshOffsets(callback as Callback<void>)
}
Expand Down Expand Up @@ -402,6 +413,10 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
}

this.#pushRecords(metadata, topicIds, response, requestedOffsets)

if (this.#breakMode === MessagesStreamBreakModes.AFTER_FIRST_BATCH && this.#inflightNodes.size === 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be generalized to collecting the first n batches.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.

this.push(null)
}
})
}
})
Expand Down Expand Up @@ -520,7 +535,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
this.#autocommit()
}

if (canPush && !(this.#shouldClose || this.closed || this.destroyed)) {
if (canPush && this.#breakMode !== MessagesStreamBreakModes.AFTER_FIRST_BATCH && !(this.#shouldClose || this.closed || this.destroyed)) {
process.nextTick(() => {
this.#fetch()
})
Expand Down
9 changes: 9 additions & 0 deletions src/clients/consumer/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ export const MessagesStreamFallbackModes = {
export type MessagesStreamFallbackMode = keyof typeof MessagesStreamFallbackModes
export type MessagesStreamFallbackModeValue =
(typeof MessagesStreamFallbackModes)[keyof typeof MessagesStreamFallbackModes]

export const MessagesStreamBreakModes = {
MANUAL: 'manual',
AFTER_FIRST_BATCH: 'after-first-batch',
} as const
export type MessagesStreamBreakMode = keyof typeof MessagesStreamBreakModes
export type MessagesStreamBreakModeValue = (typeof MessagesStreamBreakModes)[keyof typeof MessagesStreamBreakModes]

export interface GroupOptions {
sessionTimeout?: number
rebalanceTimeout?: number
Expand All @@ -83,6 +91,7 @@ export interface ConsumeBaseOptions<Key, Value, HeaderKey, HeaderValue> {
export interface StreamOptions {
topics: string[]
mode?: MessagesStreamModeValue
breakMode?: MessagesStreamBreakModeValue
fallbackMode?: MessagesStreamFallbackModeValue
offsets?: TopicWithPartitionAndOffset[]
onCorruptedMessage?: CorruptedMessageHandler
Expand Down
79 changes: 78 additions & 1 deletion test/clients/consumer/messages-stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { deepStrictEqual, ok, rejects, strictEqual, throws } from 'node:assert'
import { deepStrictEqual, equal, ok, rejects, strictEqual, throws } from 'node:assert'
import { randomUUID } from 'node:crypto'
import { once } from 'node:events'
import { Readable } from 'node:stream'
Expand Down Expand Up @@ -659,6 +659,83 @@ test('should support asyncIterator interface', async t => {
}
})

test('should automatically break in non-manual break mode', async t => {
const groupId = createTestGroupId()
const topic = await createTopic(t, true)

// Produce test messages
await produceTestMessages(t, topic)

const consumer = createConsumer(t, groupId, { deserializers: stringDeserializers })

await consumer.topics.trackAll(topic)
await consumer.joinGroup({})

const stream = await consumer.consume({
topics: [topic],
mode: MessagesStreamModes.EARLIEST,
maxWaitTime: 1000,
breakMode: 'after-first-batch',
})

const messages = []
for await (const message of stream) {
messages.push(message)
}
})

test('should collect messages', async t => {
const groupId = createTestGroupId()
const topic = await createTopic(t, true)

// Produce test messages
await produceTestMessages(t, topic)

const consumer = createConsumer(t, groupId, { deserializers: stringDeserializers })

await consumer.topics.trackAll(topic)
await consumer.joinGroup({})

const stream = await consumer.consume({
topics: [topic],
mode: MessagesStreamModes.EARLIEST,
maxWaitTime: 1000,
breakMode: 'after-first-batch',
})

const messages = await stream.collect()
equal(messages.length > 0, true, 'Should collect messages from the stream')
})

test('collecting messages should throw an error if breakMode is manual', async t => {
const groupId = createTestGroupId()
const topic = await createTopic(t, true)

// Produce test messages
await produceTestMessages(t, topic)

const consumer = createConsumer(t, groupId, { deserializers: stringDeserializers })

await consumer.topics.trackAll(topic)
await consumer.joinGroup({})

const stream = await consumer.consume({
topics: [topic],
mode: MessagesStreamModes.EARLIEST,
maxWaitTime: 1000,
breakMode: 'manual',
})

await throws(
() => stream.collect(),
(error: any) => {
ok(error instanceof UserError)
strictEqual(error.message, 'Cannot collect messages when the stream is in MANUAL break mode.')
return true
}
)
})

test('should handle deserialization errors', async t => {
const groupId = createTestGroupId()
const topic = await createTopic(t, true)
Expand Down