Skip to content

Commit 86665fc

Browse files
authored
Merge pull request #63 from fluent/support-event-time
Support event time
2 parents 1c3d83b + c1a2587 commit 86665fc

File tree

4 files changed

+82
-9
lines changed

4 files changed

+82
-9
lines changed

README.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,20 @@ Fluent::Logger::ConsoleLogger.open(io)
126126
Fluent::Logger::NullLogger.open
127127
```
128128

129-
## Buffer overflow
129+
## Tips
130+
131+
### Use nanosecond-precision time
132+
133+
To send events with nanosecond-precision time (Fluent 0.14 and up), specify `nanosecond_precision` to `FluentLogger` constructor.
134+
135+
```rb
136+
log = Fluent::Logger::FluentLogger.new(nil, :host => 'localhost', :port => 24224, :nanosecond_precision => true)
137+
# Use nanosecond time instead
138+
log.post("myapp.access", {"agent" => "foo"})
139+
log.post_with_time("myapp.access", {"agent" => "foo"}, Time.now) # Need Time object for post_with_time
140+
```
141+
142+
### Buffer overflow
130143

131144
You can inject your own custom proc to handle buffer overflow in the event of connection failure. This will mitigate the loss of data instead of simply throwing data away.
132145

fluent-logger.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ EOF
3030
gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
3131
gem.require_paths = ['lib']
3232

33-
gem.add_dependency "msgpack", ">= 0.5.6", "< 2"
33+
gem.add_dependency "msgpack", ">= 1.0.0", "< 2"
3434
gem.add_development_dependency 'rake', '>= 0.9.2'
3535
gem.add_development_dependency 'rspec', '>= 3.0.0'
3636
gem.add_development_dependency 'rspec-its', '>= 1.1.0'

lib/fluent/logger/fluent_logger.rb

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,30 @@
2323

2424
module Fluent
2525
module Logger
26+
class EventTime
27+
TYPE = 0
28+
29+
def initialize(raw_time)
30+
@time = raw_time
31+
end
32+
33+
def to_msgpack(io = nil)
34+
@time.to_i.to_msgpack(io)
35+
end
36+
37+
def to_msgpack_ext
38+
[@time.to_i, @time.nsec].pack('NN')
39+
end
40+
41+
def self.from_msgpack_ext(data)
42+
new(*data.unpack('NN'))
43+
end
44+
45+
def to_json(*args)
46+
@time.to_i
47+
end
48+
end
49+
2650
class FluentLogger < LoggerBase
2751
BUFFER_LIMIT = 8*1024*1024
2852
RECONNECT_WAIT = 0.5
@@ -55,6 +79,13 @@ def initialize(tag_prefix = nil, *args)
5579
@host = options[:host]
5680
@port = options[:port]
5781
@socket_path = options[:socket_path]
82+
@nanosecond_precision = options[:nanosecond_precision]
83+
84+
@factory = MessagePack::Factory.new
85+
if @nanosecond_precision
86+
@factory.register_type(EventTime::TYPE, EventTime)
87+
end
88+
@packer = @factory.packer
5889

5990
@mon = Monitor.new
6091
@pending = nil
@@ -98,7 +129,11 @@ def last_error
98129
def post_with_time(tag, map, time)
99130
@logger.debug { "event: #{tag} #{map.to_json}" rescue nil } if @logger.debug?
100131
tag = "#{@tag_prefix}.#{tag}" if @tag_prefix
101-
write [tag, time.to_i, map]
132+
if @nanosecond_precision && time.is_a?(Time)
133+
write [tag, EventTime.new(time), map]
134+
else
135+
write [tag, time.to_i, map]
136+
end
102137
end
103138

104139
def close
@@ -147,7 +182,11 @@ def pending_bytesize
147182

148183
def to_msgpack(msg)
149184
begin
150-
msg.to_msgpack
185+
@mon.synchronize {
186+
res = @packer.pack(msg).to_s
187+
@packer.clear
188+
res
189+
}
151190
rescue NoMethodError
152191
JSON.parse(JSON.generate(msg)).to_msgpack
153192
end

spec/fluent_logger_spec.rb

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,26 @@
1212
DummyFluentd.new
1313
}
1414

15-
let(:logger) {
15+
let(:internal_logger) {
1616
@logger_io = StringIO.new
17-
logger = ::Logger.new(@logger_io)
18-
Fluent::Logger::FluentLogger.new('logger-test', {
17+
::Logger.new(@logger_io)
18+
}
19+
20+
let(:logger_config) {
21+
{
1922
:host => 'localhost',
2023
:port => fluentd.port,
21-
:logger => logger,
24+
:logger => internal_logger,
2225
:buffer_overflow_handler => buffer_overflow_handler
23-
})
26+
}
27+
}
28+
29+
let(:logger) {
30+
Fluent::Logger::FluentLogger.new('logger-test', logger_config)
31+
}
32+
33+
let(:logger_with_nanosec) {
34+
Fluent::Logger::FluentLogger.new('logger-test', logger_config.merge(:nanosecond_precision => true))
2435
}
2536

2637
let(:buffer_overflow_handler) { nil }
@@ -65,6 +76,16 @@
6576
expect(logger.pending_bytesize).to eq 0
6677
}
6778

79+
if defined?(Fluent::EventTime)
80+
it ('success with nanosecond') {
81+
expect(logger_with_nanosec.pending_bytesize).to eq 0
82+
expect(logger_with_nanosec.post('tag', {'a' => 'b'})).to be true
83+
fluentd.wait_transfer
84+
expect(fluentd.queue.last).to eq ['logger-test.tag', {'a' => 'b'}]
85+
expect(fluentd.output.emits.first[1]).to be_a_kind_of(Fluent::EventTime)
86+
}
87+
end
88+
6889
it ('close after post') {
6990
expect(logger).to be_connect
7091
logger.close

0 commit comments

Comments
 (0)