diff --git a/bin/tailf2kafka b/bin/tailf2kafka index e5fce4a..f154e9e 100755 --- a/bin/tailf2kafka +++ b/bin/tailf2kafka @@ -1,7 +1,8 @@ #!/usr/bin/env ruby require 'optparse' -require 'poseidon' +require 'kafka' +require 'diplomat' require 'yaml' require 'hash_symbolizer' require 'schash' @@ -42,6 +43,7 @@ unless @config end @logger = Logger.new(STDOUT) +@logger.level = @loglevel @settings = YAML.load_file(@config).symbolize_keys(true) @@ -52,7 +54,7 @@ validator = Schash::Validator.new do topic: string, prefix: string, suffix: optional(string), - time_pattern: string, + time_pattern: string }), position_file: string, flush_interval: optional(integer), @@ -60,12 +62,15 @@ validator = Schash::Validator.new do max_batches: optional(integer), from_begining: boolean, delete_old_tailed_files: optional(boolean), - post_delete_command: optional(string), + post_delete_command: optional(string) }, kafka: { - brokers: array_of(string), - producer_type: match(/^(sync|async)$/), - produce: optional(boolean), + brokers: optional(array_of(string)), + consul_service: optional(string), + consul_tag: optional(string), + connect_timeout: optional(integer), + socket_timeout: optional(integer), + produce: optional(boolean) } } end @@ -82,7 +87,6 @@ end @create_notifier = INotify::Notifier.new @delete_notifier = INotify::Notifier.new -@tailf_notifier = INotify::Notifier.new @dirs = {} @files = {} @@ -93,10 +97,29 @@ end @max_batches = @settings[:tailf].has_key?(:max_batches) ? @settings[:tailf][:max_batches] : 10 @from_begining = @settings[:tailf][:from_begining] @delete_old_tailed_files = @settings[:tailf].has_key?(:delete_old_tailed_files) ? @settings[:tailf][:delete_old_tailed_files] : false -@brokers = @settings[:kafka][:brokers] -@producer_type = @settings[:kafka][:producer_type].to_sym @produce = @settings[:kafka].has_key?(:produce) ? 
@settings[:kafka][:produce] : true +def kafka_connect + if @settings[:kafka].has_key?(:brokers) + kafka_nodes = @settings[:kafka][:brokers] + else + kafka_nodes = Diplomat::Service.get(@settings[:kafka][:consul_service], :all, {:tag => @settings[:kafka][:consul_tag]}).inject([]) {|hosts,h| hosts << "#{h.Node}:#{h.ServicePort}"; hosts} + end + @kafka = Kafka.new( + seed_brokers: kafka_nodes, + logger: @logger, + connect_timeout: @settings[:kafka].has_key?(:connect_timeout) ? @settings[:kafka][:connect_timeout] : 10, + socket_timeout: @settings[:kafka].has_key?(:socket_timeout) ? @settings[:kafka][:socket_timeout] : 60 + ) + @producer = @kafka.producer( + required_acks: 0, + max_retries: 2, + retry_backoff: 1, + compression_codec: :snappy, + max_buffer_size: @max_batch_lines + 1 + ) +end + def write_position_file @mutex.synchronize do File.open(@position_file, 'w') do |file| @@ -123,19 +146,31 @@ end load_position_file @topics = @settings[:tailf][:files].map{|tailf_file| tailf_file[:topic]} -@producer = Poseidon::Producer.new(@brokers, "#{Socket.gethostname}", :type => @producer_type, :compression_codec => :snappy, :compressed_topics => @topics) if @produce + +kafka_connect if @produce @producer_queue = SizedQueue.new(@max_batches * 10) +@partition_key = 0 + @producer_thread = Thread.new do loop do batch = @producer_queue.pop - begin - @producer.send_messages(batch[:messages]) if @produce - rescue Poseidon::Errors::UnableToFetchMetadata - @logger.warn("Got Poseidon::Errors::UnableToFetchMetadata while trying to produce kafka messages, retrying in 1 second ...") - sleep 1 - retry + if @produce + @partition_key += 1 + partition = @partition_key.to_s + @logger.debug("Producing batch of #{batch[:messages].size} messages") + begin + batch[:messages].each do |msg| + @producer.produce(msg, topic: batch[:topic], partition_key: partition) + end + batch[:messages] = [] + @producer.deliver_messages + rescue Kafka::DeliveryFailed + @logger.warn("Producer failed to deliver messages to brokers, retrying in 1 second ...") + 
sleep 1 + retry + end end @files[batch[:path]][:offset] = batch[:offset] end @@ -151,13 +186,13 @@ def kafka_produce(path, buffer, offset) truncated = msg else msg = msg + buffer.shift - messages << Poseidon::MessageToSend.new(@files[path][:topic], msg.strip) + messages << msg.strip end else - messages << Poseidon::MessageToSend.new(@files[path][:topic], msg.strip) + messages << msg.strip end end - @producer_queue.push({ :path => path, :messages => messages, :offset => offset}) + @producer_queue.push({ :topic => @files[path][:topic], :path => path, :messages => messages, :offset => offset }) truncated end @@ -175,8 +210,10 @@ def tailf(path) truncated = kafka_produce(path, batch, file.pos) end + notifier = INotify::Notifier.new + mutex = Mutex.new - @tailf_notifier.watch(path, :modify) do |event| + notifier.watch(path, :modify) do |event| mutex.synchronize do unless file.closed? (1..@max_batches).each do |i| @@ -190,6 +227,14 @@ def tailf(path) end end end + + loop do + time_before = Time.now + notifier.process + time_after = Time.now + time_left = @settings[:tailf][:flush_interval] - (time_after - time_before) + sleep time_left if time_left > 0 + end + end end @time_regexp_hash = { @@ -298,7 +343,4 @@ Thread.new { loop { @timers.wait } } end -Thread.new { @create_notifier.run } -Thread.new { @delete_notifier.run } - -@tailf_notifier.run +[ Thread.new { @create_notifier.run }, Thread.new { @delete_notifier.run } ].each{|t| t.join} diff --git a/lib/tailf2kafka/version.rb b/lib/tailf2kafka/version.rb index df1642a..e84c404 100644 --- a/lib/tailf2kafka/version.rb +++ b/lib/tailf2kafka/version.rb @@ -1,3 +1,3 @@ module Tailf2Kafka - VERSION ||= '0.1.10' + VERSION ||= '0.2.0' end diff --git a/tailf2kafka.gemspec b/tailf2kafka.gemspec index c605e4e..7235d63 100644 --- a/tailf2kafka.gemspec +++ b/tailf2kafka.gemspec @@ -15,7 +15,7 @@ Gem::Specification.new do |s| s.license = 'MIT' s.has_rdoc = false - s.add_dependency('poseidon') + s.add_dependency('ruby-kafka') 
s.add_dependency('snappy') s.add_dependency('hash_symbolizer') s.add_dependency('schash') @@ -23,6 +23,7 @@ Gem::Specification.new do |s| s.add_dependency('timers') s.add_dependency('mixlib-shellout') s.add_dependency('activesupport', '~> 4.2.6') + s.add_dependency('diplomat') s.add_development_dependency('rake')