Skip to content

Commit c981755

Browse files
authored
Merge pull request #59 from jcalvert/unix_socket_support
Add UNIX socket support
2 parents d43bb3e + 9d59f27 commit c981755

File tree

4 files changed

+95
-5
lines changed

4 files changed

+95
-5
lines changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,19 @@ end
1919
# output: myapp.access {"agent":"foo"}
2020
```
2121

22+
### UNIX socket
23+
24+
```ruby
25+
require 'fluent-logger'
26+
27+
log = Fluent::Logger::FluentLogger.new(nil, :socket_path => "/tmp/fluent.sock")
28+
unless log.post("myapp.access", {"agent"=>"foo"})
29+
p log.last_error # You can get last error object via last_error method
30+
end
31+
32+
# output: myapp.access {"agent":"foo"}
33+
```
34+
2235
### Singleton
2336
```ruby
2437
require 'fluent-logger'

lib/fluent/logger/fluent_logger.rb

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def initialize(tag_prefix = nil, *args)
5454
@tag_prefix = tag_prefix
5555
@host = options[:host]
5656
@port = options[:port]
57+
@socket_path = options[:socket_path]
5758

5859
@mon = Monitor.new
5960
@pending = nil
@@ -63,7 +64,6 @@ def initialize(tag_prefix = nil, *args)
6364
@log_reconnect_error_threshold = options[:log_reconnect_error_threshold] || RECONNECT_WAIT_MAX_COUNT
6465

6566
@buffer_overflow_handler = options[:buffer_overflow_handler]
66-
6767
if logger = options[:logger]
6868
@logger = logger
6969
else
@@ -108,7 +108,7 @@ def close
108108
send_data(@pending)
109109
rescue => e
110110
set_last_error(e)
111-
@logger.error("FluentLogger: Can't send logs to #{@host}:#{@port}: #{$!}")
111+
@logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
112112
call_buffer_overflow_handler(@pending)
113113
end
114114
end
@@ -123,6 +123,18 @@ def connect?
123123
@con && !@con.closed?
124124
end
125125

126+
def create_socket!
127+
if @socket_path
128+
@con = UNIXSocket.new(@socket_path)
129+
else
130+
@con = TCPSocket.new(@host, @port)
131+
end
132+
end
133+
134+
def connection_string
135+
@socket_path ? "#{@socket_path}" : "#{@host}:#{@port}"
136+
end
137+
126138
private
127139
def to_msgpack(msg)
128140
begin
@@ -170,7 +182,7 @@ def write(msg)
170182
rescue => e
171183
set_last_error(e)
172184
if @pending.bytesize > @limit
173-
@logger.error("FluentLogger: Can't send logs to #{@host}:#{@port}: #{$!}")
185+
@logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
174186
call_buffer_overflow_handler(@pending)
175187
@pending = nil
176188
end
@@ -203,7 +215,7 @@ def send_data(data)
203215
end
204216

205217
def connect!
206-
@con = TCPSocket.new(@host, @port)
218+
create_socket!
207219
@con.sync = true
208220
@connect_error_history.clear
209221
@logged_reconnect_error = false
@@ -230,7 +242,7 @@ def call_buffer_overflow_handler(pending)
230242
end
231243

232244
def log_reconnect_error
233-
@logger.error("FluentLogger: Can't connect to #{@host}:#{@port}(#{@connect_error_history.size} retried): #{$!}")
245+
@logger.error("FluentLogger: Can't connect to #{connection_string}(#{@connect_error_history.size} retried): #{$!}")
234246
end
235247

236248
def set_last_error(e)

spec/fluent_logger_spec.rb

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,4 +230,44 @@ def flush(messages)
230230
end
231231
end
232232
end
233+
234+
context "using socket_path" do
235+
236+
let(:socket_logger) {
237+
@logger_io = StringIO.new
238+
logger = ::Logger.new(@logger_io)
239+
Fluent::Logger::FluentLogger.new('logger-test', {
240+
:socket_path => fluentd.socket_path,
241+
:logger => logger,
242+
:buffer_overflow_handler => buffer_overflow_handler
243+
})
244+
}
245+
246+
context "running fluentd" do
247+
before(:all) do
248+
@serverengine = DummyServerengine.new
249+
@serverengine.startup
250+
end
251+
252+
before(:each) do
253+
fluentd.socket_startup
254+
end
255+
256+
after(:each) do
257+
fluentd.shutdown
258+
end
259+
260+
after(:all) do
261+
@serverengine.shutdown
262+
end
263+
264+
context('post') do
265+
it ('success') {
266+
expect(socket_logger.post('tag', {'b' => 'a'})).to be true
267+
fluentd.wait_transfer
268+
expect(fluentd.queue.last).to eq ['logger-test.tag', {'b' => 'a'}]
269+
}
270+
end
271+
end
272+
end
233273
end

spec/support/dummy_fluentd.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ def initialize
1010
end
1111

1212
WAIT = ENV['WAIT'] ? ENV['WAIT'].to_f : 0.3
13+
SOCKET_PATH = ENV['SOCKET_PATH'] ? ENV['SOCKET_PATH'] : "/tmp/dummy_fluent.sock"
1314

1415
def wait_transfer
1516
sleep WAIT
@@ -29,6 +30,10 @@ def port
2930
@port
3031
end
3132

33+
def socket_path
34+
SOCKET_PATH
35+
end
36+
3237
def output
3338
sleep 0.0001 # next tick
3439
if Fluent::Engine.respond_to?(:match)
@@ -66,6 +71,26 @@ def startup
6671
wait_transfer
6772
end
6873

74+
def socket_startup
75+
config = Fluent::Config.parse(<<EOF, '(logger-spec)', '(logger-spec-dir)', true)
76+
<source>
77+
type unix
78+
path #{socket_path}
79+
</source>
80+
<match logger-test.**>
81+
type test
82+
</match>
83+
EOF
84+
Fluent::Test.setup
85+
Fluent::Engine.run_configure(config)
86+
@coolio_default_loop = nil
87+
@thread = Thread.new {
88+
@coolio_default_loop = Coolio::Loop.default
89+
Fluent::Engine.run
90+
}
91+
wait_transfer
92+
end
93+
6994
def shutdown
7095
@coolio_default_loop.stop rescue nil
7196
begin

0 commit comments

Comments
 (0)