Skip to content

Commit 0a77b6a

Browse files
makubijsvd
authored andcommitted
Commit offset only if records were consumed
Currently offsets are committed after each poll, even if no records were consumed.
1 parent 0bc0210 commit 0a77b6a

File tree

1 file changed

+1
-0
lines changed

1 file changed

+1
-0
lines changed

lib/logstash/inputs/kafka.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ def thread_runner(logstash_queue, consumer)
250250
codec_instance = @codec.clone
251251
while !stop?
252252
records = consumer.poll(poll_timeout_ms)
253+
next unless records.count > 0
253254
for record in records do
254255
codec_instance.decode(record.value.to_s) do |event|
255256
decorate(event)

0 commit comments

Comments
 (0)