Skip to content

Commit b56fdb3

Browse files
author
Nitin Goel
committed
Support for dynamic field values in prefix
1 parent 6b63cce commit b56fdb3

File tree

1 file changed

+120
-45
lines changed
  • lib/logstash/outputs

1 file changed

+120
-45
lines changed

lib/logstash/outputs/s3.rb

Lines changed: 120 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require "thread"
88
require "tmpdir"
99
require "fileutils"
10+
require 'pathname'
1011

1112

1213
# INFORMATION:
@@ -60,6 +61,7 @@
6061
# time_file => 5 (optional)
6162
# format => "plain" (optional)
6263
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
64+
# no_event_wait => 5 (optional. Defines the number of consecutive time_file upload intervals that may pass with no events for a prefix before the watch on that prefix is cleaned up)
6365
# }
6466
#
6567
class LogStash::Outputs::S3 < LogStash::Outputs::Base
@@ -110,6 +112,9 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
110112
# Specify how many workers to use to upload the files to S3
111113
config :upload_workers_count, :validate => :number, :default => 1
112114

115+
# Specify after how many time_file intervals a prefix directory should be cleaned up locally if no events are happening for it
116+
config :no_event_wait, :validate => :number, :default => 5
117+
113118
# Exposed attributes for testing purpose.
114119
attr_accessor :tempfile
115120
attr_reader :page_counter
@@ -139,8 +144,13 @@ def aws_service_endpoint(region)
139144
def write_on_bucket(file)
140145
# find and use the bucket
141146
bucket = @s3.buckets[@bucket]
147+
148+
first = Pathname.new @temporary_directory
149+
second = Pathname.new file
142150

143-
remote_filename = "#{@prefix}#{File.basename(file)}"
151+
remote_filename_path = second.relative_path_from first
152+
153+
remote_filename = remote_filename_path.to_s
144154

145155
@logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket)
146156

@@ -160,17 +170,21 @@ def write_on_bucket(file)
160170

161171
# This method is used to create a new empty temporary file for use. A flag is needed to indicate a new time_file subsection.
162172
public
163-
def create_temporary_file
164-
filename = File.join(@temporary_directory, get_temporary_filename(@page_counter))
165-
166-
@logger.debug("S3: Creating a new temporary file", :filename => filename)
167-
168-
@file_rotation_lock.synchronize do
169-
unless @tempfile.nil?
170-
@tempfile.close
173+
def create_temporary_file(prefix)
174+
filename = File.join(@temporary_directory, prefix, get_temporary_filename(@page_counter[prefix]))
175+
@file_rotation_lock[prefix].synchronize do
176+
unless @tempfile[prefix].nil?
177+
@tempfile[prefix].close
178+
end
179+
180+
if @prefixes.include? prefix
181+
dirname = File.dirname(filename)
182+
unless File.directory?(dirname)
183+
FileUtils.mkdir_p(dirname)
184+
end
185+
@logger.debug("S3: Creating a new temporary file", :filename => filename)
186+
@tempfile[prefix] = File.open(filename, "a")
171187
end
172-
173-
@tempfile = File.open(filename, "a")
174188
end
175189
end
176190

@@ -185,7 +199,11 @@ def register
185199

186200
@s3 = aws_s3_config
187201
@upload_queue = Queue.new
188-
@file_rotation_lock = Mutex.new
202+
@file_rotation_lock = Hash.new
203+
@tempfile = Hash.new
204+
@page_counter = Hash.new
205+
@prefixes = Set.new
206+
@empty_uploads = Hash.new
189207

190208
if @prefix && @prefix =~ S3_INVALID_CHARACTERS
191209
@logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS)
@@ -197,15 +215,14 @@ def register
197215
end
198216

199217
test_s3_write
200-
201218
restore_from_crashes if @restore == true
202-
reset_page_counter
203-
create_temporary_file
219+
#reset_page_counter
220+
#create_temporary_file
204221
configure_periodic_rotation if time_file != 0
205222
configure_upload_workers
206223

207224
@codec.on_event do |event, encoded_event|
208-
handle_event(encoded_event)
225+
handle_event(encoded_event, event)
209226
end
210227
end
211228

@@ -242,13 +259,36 @@ def restore_from_crashes
242259
end
243260
end
244261

262+
public
263+
def shouldcleanup(prefix)
264+
return @empty_uploads[prefix] > @no_event_wait
265+
end
266+
245267
public
246268
def move_file_to_bucket(file)
269+
270+
@logger.debug("S3: moving to bucket ", :file => file)
271+
272+
basepath = Pathname.new @temporary_directory
273+
dirname = Pathname.new File.dirname(file)
274+
prefixpath = dirname.relative_path_from basepath
275+
prefix = prefixpath.to_s
276+
@logger.debug("S3: moving the file for prefix", :prefix => prefix)
277+
247278
if !File.zero?(file)
279+
if @prefixes.include? prefix
280+
@empty_uploads[prefix] = 0
281+
end
248282
write_on_bucket(file)
249283
@logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket)
284+
else
285+
if @prefixes.include? prefix
286+
@empty_uploads[prefix] += 1
287+
end
250288
end
251289

290+
@logger.debug("S3: empty_uploads for the prefix ", :prefix => prefix, :empty_uploads => @empty_uploads[prefix])
291+
252292
begin
253293
File.delete(file)
254294
rescue Errno::ENOENT
@@ -257,6 +297,10 @@ def move_file_to_bucket(file)
257297
rescue Errno::EACCES
258298
@logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
259299
end
300+
301+
if shouldcleanup(prefix)
302+
cleanprefix(prefix)
303+
end
260304
end
261305

262306
public
@@ -283,9 +327,10 @@ def receive(event)
283327
end
284328

285329
public
286-
def rotate_events_log?
287-
@file_rotation_lock.synchronize do
288-
@tempfile.size > @size_file
330+
331+
def rotate_events_log(prefix)
332+
@file_rotation_lock[prefix].synchronize do
333+
@tempfile[prefix].size > @size_file
289334
end
290335
end
291336

@@ -295,12 +340,13 @@ def write_events_to_multiple_files?
295340
end
296341

297342
public
298-
def write_to_tempfile(event)
343+
def write_to_tempfile(event, prefix)
344+
299345
begin
300-
@logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile))
346+
@logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile[prefix]))
301347

302-
@file_rotation_lock.synchronize do
303-
@tempfile.syswrite(event)
348+
@file_rotation_lock[prefix].synchronize do
349+
@tempfile[prefix].syswrite(event)
304350
end
305351
rescue Errno::ENOSPC
306352
@logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
@@ -309,13 +355,16 @@ def write_to_tempfile(event)
309355
end
310356

311357
public
312-
def teardown
358+
def teardown()
313359
shutdown_upload_workers
314360
@periodic_rotation_thread.stop! if @periodic_rotation_thread
315-
316-
@file_rotation_lock.synchronize do
317-
@tempfile.close unless @tempfile.nil? && @tempfile.closed?
361+
362+
for prefix in @prefixes
363+
@file_rotation_lock[prefix].synchronize do
364+
@tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed?
365+
end
318366
end
367+
319368
finished
320369
end
321370

@@ -326,37 +375,63 @@ def shutdown_upload_workers
326375
end
327376

328377
private
329-
def handle_event(encoded_event)
378+
def handle_event(encoded_event, event)
379+
actualprefix = event.sprintf(@prefix)
380+
if not @prefixes.to_a().include? actualprefix
381+
@file_rotation_lock[actualprefix] = Mutex.new
382+
@prefixes.add(actualprefix)
383+
reset_page_counter(actualprefix)
384+
create_temporary_file(actualprefix)
385+
@empty_uploads[actualprefix] = 0
386+
end
387+
330388
if write_events_to_multiple_files?
331-
if rotate_events_log?
332-
@logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile))
389+
if rotate_events_log(actualprefix)
390+
@logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile[actualprefix]))
333391

334-
move_file_to_bucket_async(@tempfile.path)
335-
next_page
336-
create_temporary_file
392+
move_file_to_bucket_async(@tempfile[actualprefix].path)
393+
next_page(actualprefix)
394+
create_temporary_file(actualprefix)
337395
else
338-
@logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file)
396+
@logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile[actualprefix].size, :size_file => @size_file)
339397
end
340398
end
341399

342-
write_to_tempfile(encoded_event)
400+
write_to_tempfile(encoded_event, actualprefix)
343401
end
344402

345403
private
346404
def configure_periodic_rotation
347405
@periodic_rotation_thread = Stud::Task.new do
348406
LogStash::Util::set_thread_name("<S3 periodic uploader")
349407

350-
Stud.interval(periodic_interval, :sleep_then_run => true) do
351-
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path)
352-
353-
move_file_to_bucket_async(@tempfile.path)
354-
next_page
355-
create_temporary_file
408+
Stud.interval(periodic_interval, :sleep_then_run => true) do
409+
410+
@tempfile.keys.each do |key|
411+
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile[key].path)
412+
move_file_to_bucket_async(@tempfile[key].path)
413+
next_page(key)
414+
create_temporary_file(key)
415+
end
416+
356417
end
357418
end
358419
end
359420

421+
private
422+
def cleanprefix(prefix)
423+
path = File.join(@temporary_directory, prefix)
424+
@logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix)
425+
@file_rotation_lock[prefix].synchronize do
426+
@tempfile[prefix].close
427+
Dir.foreach(path) {|f| fn = File.join(path, f); File.delete(fn) if f != '.' && f != '..'}
428+
FileUtils.remove_dir(path)
429+
@prefixes.delete(prefix)
430+
@tempfile.delete(prefix)
431+
@empty_uploads[prefix] = 0
432+
end
433+
end
434+
360435
private
361436
def configure_upload_workers
362437
@logger.debug("S3: Configure upload workers")
@@ -389,13 +464,13 @@ def upload_worker
389464
end
390465

391466
private
392-
def next_page
393-
@page_counter += 1
467+
def next_page(key)
468+
@page_counter[key] += 1
394469
end
395470

396471
private
397-
def reset_page_counter
398-
@page_counter = 0
472+
def reset_page_counter(key)
473+
@page_counter[key] = 0
399474
end
400475

401476
private

0 commit comments

Comments
 (0)