Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 66 additions & 24 deletions bin/tailf2kafka
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/usr/bin/env ruby

require 'optparse'
require 'poseidon'
require 'kafka'
require 'diplomat'
require 'yaml'
require 'hash_symbolizer'
require 'schash'
Expand Down Expand Up @@ -42,6 +43,7 @@ unless @config
end

@logger = Logger.new(STDOUT)
@logger.level = @loglevel

@settings = YAML.load_file(@config).symbolize_keys(true)

Expand All @@ -52,20 +54,23 @@ validator = Schash::Validator.new do
topic: string,
prefix: string,
suffix: optional(string),
time_pattern: string,
time_pattern: string
}),
position_file: string,
flush_interval: optional(integer),
max_batch_lines: optional(integer),
max_batches: optional(integer),
from_begining: boolean,
delete_old_tailed_files: optional(boolean),
post_delete_command: optional(string),
post_delete_command: optional(string)
},
kafka: {
brokers: array_of(string),
producer_type: match(/^(sync|async)$/),
produce: optional(boolean),
brokers: optional(array_of(string)),
consul_service: optional(string),
consul_tag: optional(string),
connect_timeout: optional(integer),
socket_timeout: optional(integer),
produce: optional(boolean)
}
}
end
Expand All @@ -82,7 +87,6 @@ end

@create_notifier = INotify::Notifier.new
@delete_notifier = INotify::Notifier.new
@tailf_notifier = INotify::Notifier.new

@dirs = {}
@files = {}
Expand All @@ -93,10 +97,29 @@ end
@max_batches = @settings[:tailf].has_key?(:max_batches) ? @settings[:tailf][:max_batches] : 10
@from_begining = @settings[:tailf][:from_begining]
@delete_old_tailed_files = @settings[:tailf].has_key?(:delete_old_tailed_files) ? @settings[:tailf][:delete_old_tailed_files] : false
@brokers = @settings[:kafka][:brokers]
@producer_type = @settings[:kafka][:producer_type].to_sym
@produce = @settings[:kafka].has_key?(:produce) ? @settings[:kafka][:produce] : true

# Connects to Kafka and initializes @kafka and @producer.
#
# Seed brokers come from the static :brokers setting when present;
# otherwise they are discovered from the Consul service catalog via
# :consul_service (optionally filtered by :consul_tag).
#
# Reads @settings, @logger and @max_batch_lines; sets @kafka and @producer.
def kafka_connect
  if @settings[:kafka].has_key?(:brokers)
    kafka_nodes = @settings[:kafka][:brokers]
  else
    # Build "host:port" seed-broker strings from the Consul service entries.
    kafka_nodes = Diplomat::Service.get(@settings[:kafka][:consul_service], :all, {:tag => @settings[:kafka][:consul_tag]}).inject([]) {|hosts,h| hosts << "#{h.Node}:#{h.ServicePort}"; hosts}
  end
  @kafka = Kafka.new(
    seed_brokers: kafka_nodes,
    logger: @logger,
    # FIX: previously read the :socket_timeout setting here, so a configured
    # :connect_timeout (declared optional(integer) in the schema) was ignored.
    connect_timeout: @settings[:kafka].has_key?(:connect_timeout) ? @settings[:kafka][:connect_timeout] : 10,
    socket_timeout: @settings[:kafka].has_key?(:socket_timeout) ? @settings[:kafka][:socket_timeout] : 60
  )
  @producer = @kafka.producer(
    required_acks: 0,            # fire-and-forget: do not wait for broker acks
    max_retries: 2,
    retry_backoff: 1,
    compression_codec: :snappy,
    # One extra slot so a full batch never trips the buffer-size limit.
    max_buffer_size: @max_batch_lines + 1
  )
end

def write_position_file
@mutex.synchronize do
File.open(@position_file, 'w') do |file|
Expand All @@ -123,19 +146,31 @@ end
load_position_file

@topics = @settings[:tailf][:files].map{|tailf_file| tailf_file[:topic]}
@producer = Poseidon::Producer.new(@brokers, "#{Socket.gethostname}", :type => @producer_type, :compression_codec => :snappy, :compressed_topics => @topics) if @produce

kafka_connect if @produce

@producer_queue = SizedQueue.new(@max_batches * 10)

@partition_key = 0

@producer_thread = Thread.new do
loop do
batch = @producer_queue.pop
begin
@producer.send_messages(batch[:messages]) if @produce
rescue Poseidon::Errors::UnableToFetchMetadata
@logger.warn("Got Poseidon::Errors::UnableToFetchMetadata while trying to produce kafka messages, retrying in 1 second ...")
sleep 1
retry
if @produce
@partition_key += 1
partition = @partition_key.to_s
puts batch[:messages].size
begin
batch[:messages].each do |msg|
@producer.produce(msg, topic: batch[:topic], partition_key: partition)
end
batch[:messages] = []
@producer.deliver_messages
rescue Kafka::DeliveryFailed
@logger.warn("Producer failed to deliver messages to brokers, retrying in 1 second ...")
sleep 1
retry
end
end
@files[batch[:path]][:offset] = batch[:offset]
end
Expand All @@ -151,13 +186,13 @@ def kafka_produce(path, buffer, offset)
truncated = msg
else
msg = msg + buffer.shift
messages << Poseidon::MessageToSend.new(@files[path][:topic], msg.strip)
messages << msg.strip
end
else
messages << Poseidon::MessageToSend.new(@files[path][:topic], msg.strip)
messages << msg.strip
end
end
@producer_queue.push({ :path => path, :messages => messages, :offset => offset})
@producer_queue.push({ :topic => @files[path][:topic], :path => path, :messages => messages, :offset => offset })

truncated
end
Expand All @@ -175,8 +210,10 @@ def tailf(path)
truncated = kafka_produce(path, batch, file.pos)
end

notifier = INotify::Notifier.new

mutex = Mutex.new
@tailf_notifier.watch(path, :modify) do |event|
notifier.watch(path, :modify) do |event|
mutex.synchronize do
unless file.closed?
(1..@max_batches).each do |i|
Expand All @@ -190,6 +227,14 @@ def tailf(path)
end
end
end

loop do
time_before = Time.now
notifier.process
time_after = Time.now
time_left = @settings[:tailf][:flush_interval] - (time_before - time_after)
sleep time_left if time_left > 0
end
end

@time_regexp_hash = {
Expand Down Expand Up @@ -298,7 +343,4 @@ Thread.new { loop { @timers.wait } }

end

Thread.new { @create_notifier.run }
Thread.new { @delete_notifier.run }

@tailf_notifier.run
[ Thread.new { @create_notifier.run }, Thread.new { @delete_notifier.run } ].each{|t| t.join}
2 changes: 1 addition & 1 deletion lib/tailf2kafka/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Tailf2Kafka
VERSION ||= '0.1.10'
VERSION ||= '0.2.0'
end
3 changes: 2 additions & 1 deletion tailf2kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ Gem::Specification.new do |s|
s.license = 'MIT'
s.has_rdoc = false

s.add_dependency('poseidon')
s.add_dependency('ruby-kafka')
s.add_dependency('snappy')
s.add_dependency('hash_symbolizer')
s.add_dependency('schash')
s.add_dependency('rb-inotify')
s.add_dependency('timers')
s.add_dependency('mixlib-shellout')
s.add_dependency('activesupport', '~> 4.2.6')
s.add_dependency('diplomat')

s.add_development_dependency('rake')

Expand Down