Skip to content

Commit 273c172

Browse files
authored
Merge pull request #503 from dipendra-singh/add_target_topics
out_kafka2: Broker pool to take care of fetching metadata
2 parents 2dece8b + b0f3a10 commit 273c172

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

lib/fluent/plugin/kafka_producer_ext.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso
9393
@max_buffer_bytesize = max_buffer_bytesize
9494
@compressor = compressor
9595
@partitioner = partitioner
96+
97+
# The set of topics that are produced to.
98+
@target_topics = Set.new
99+
96100
# A buffer organized by topic/partition.
97101
@buffer = MessageBuffer.new
98102

@@ -116,7 +120,8 @@ def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_
116120
if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
117121
raise 'You must trigger begin_transaction before producing messages'
118122
end
119-
123+
124+
@target_topics.add(topic)
120125
@pending_message_queue.write(message)
121126

122127
nil
@@ -187,7 +192,7 @@ def transaction
187192
def deliver_messages_with_retries
188193
attempt = 0
189194

190-
#@cluster.add_target_topics(@target_topics)
195+
@cluster.add_target_topics(@target_topics)
191196

192197
operation = ProduceOperation.new(
193198
cluster: @cluster,

0 commit comments

Comments
 (0)