From 3bac7cd834a542ad1644c844a310eba34f25f544 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=AC=A3?= Date: Fri, 2 Nov 2018 12:17:12 +0800 Subject: [PATCH 1/3] fix kafka client_id. remove thread idxnum --- lib/logstash/inputs/kafka.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 745460a..0aec29d 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -221,7 +221,7 @@ def register public def run(logstash_queue) - @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") } + @runner_consumers = consumer_threads.times.map { |i| create_consumer(client_id) } @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) } @runner_threads.each { |t| t.join } end # def run From 7adc397ef5c91862398db4d2d319e66691b2befa Mon Sep 17 00:00:00 2001 From: raistlinzx Date: Fri, 2 Nov 2018 12:17:12 +0800 Subject: [PATCH 2/3] fix kafka client_id. remove thread idxnum --- lib/logstash/inputs/kafka.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 745460a..0aec29d 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -221,7 +221,7 @@ def register public def run(logstash_queue) - @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") } + @runner_consumers = consumer_threads.times.map { |i| create_consumer(client_id) } @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) } @runner_threads.each { |t| t.join } end # def run From 312dcf40332df29ac1da42af019de1fd7a62e45f Mon Sep 17 00:00:00 2001 From: raistlinzx Date: Fri, 2 Nov 2018 18:50:31 +0800 Subject: [PATCH 3/3] fix spec --- spec/integration/inputs/kafka_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb index 1fa3015..ffec2c4 100644 --- a/spec/integration/inputs/kafka_spec.rb +++ b/spec/integration/inputs/kafka_spec.rb @@ -82,7 +82,7 @@ def thread_it(kafka_input, queue) wait(timeout_seconds).for {queue.length}.to eq(num_events) expect(queue.length).to eq(num_events) kafka_input.kafka_consumers.each_with_index do |consumer, i| - expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}") + expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec") end ensure t.kill