Skip to content

Commit b79c65f

Browse files
author
Nevins Bartolomeo
committed
making changes sugguested in logstash-plugins#102
1 parent 014fde5 commit b79c65f

File tree

7 files changed

+29
-20
lines changed

7 files changed

+29
-20
lines changed

lib/logstash/outputs/s3.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
## Both time_file and size_file settings can trigger a log "file rotation"
4545
## A log rotation pushes the current log "part" to s3 and deleted from local temporary storage.
4646
#
47-
## If you specify BOTH size_file and time_file then it will create file for each tag (if specified).
47+
## If you specify BOTH size_file and time_file then it will create file for each tag (if specified).
4848
## When EITHER time_file minutes have elapsed OR log file size > size_file, a log rotation is triggered.
4949
##
5050
## If you ONLY specify time_file but NOT file_size, one file for each tag (if specified) will be created.
@@ -136,7 +136,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
136136
config :prefix, :validate => :string, :default => ''
137137

138138
# Specify how many workers to use to upload the files to S3
139-
config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.25).round
139+
config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).round
140140

141141
# Number of items we can keep in the local queue before uploading them
142142
config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).round
@@ -171,7 +171,7 @@ def register
171171
# to prepare for the new config validation that will be part of the core so the core can
172172
# be moved easily.
173173
unless @prefix.empty?
174-
if !PathValidator.valid?(prefix)
174+
if !PathValidator.valid?(prefix)
175175
raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}"
176176
end
177177
end
@@ -180,7 +180,7 @@ def register
180180
raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}"
181181
end
182182

183-
if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.valid?(bucket_resource)
183+
if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.valid?(bucket_resource)
184184
raise LogStash::ConfigurationError, "Logstash must have the privileges to write to root bucket `#{@bucket}`, check you credentials or your permissions."
185185
end
186186

lib/logstash/outputs/s3/file_repository.rb

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
# encoding: utf-8
2+
require "java"
23
require "concurrent"
3-
require "concurrent/map"
44
require "concurrent/timer_task"
55
require "logstash/util"
66

7+
java_import "java.util.concurrent.ConcurrentHashMap"
8+
79
module LogStash
810
module Outputs
911
class S3
@@ -28,14 +30,18 @@ def with_lock
2830
def stale?
2931
with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) }
3032
end
33+
34+
def apply(prefix)
35+
return self
36+
end
3137
end
3238

3339
def initialize(tags, encoding, temporary_directory,
3440
stale_time = DEFAULT_STALE_TIME_SECS,
3541
sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
3642
# The path need to contains the prefix so when we start
3743
# logtash after a crash we keep the remote structure
38-
@prefixed_factories = Concurrent::Map.new
44+
@prefixed_factories = ConcurrentHashMap.new
3945

4046
@tags = tags
4147
@encoding = encoding
@@ -48,19 +54,20 @@ def initialize(tags, encoding, temporary_directory,
4854
end
4955

5056
def keys
51-
@prefixed_factories.keys
57+
arr = []
58+
@prefixed_factories.keySet.each {|k| arr << k}
59+
arr
5260
end
5361

5462
def each_files
55-
@prefixed_factories.each_value do |prefixed_file|
63+
@prefixed_factories.values do |prefixed_file|
5664
prefixed_file.with_lock { |factory| yield factory.current }
5765
end
5866
end
5967

6068
# Return the file factory
6169
def get_factory(prefix_key)
62-
@prefixed_factories.compute_if_absent(prefix_key) { PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) }
63-
.with_lock { |factory| yield factory }
70+
@prefixed_factories.computeIfAbsent(prefix_key, PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)).with_lock { |factory| yield factory }
6471
end
6572

6673
def get_file(prefix_key)
@@ -79,8 +86,10 @@ def start_stale_sweeper
7986
@stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
8087
LogStash::Util.set_thread_name("S3, Stale factory sweeper")
8188

82-
@prefixed_factories.each_pair do |k, v|
83-
@prefixed_factories.delete_pair(k, v) if v.stale?
89+
@prefixed_factories.entrySet.each do |s|
90+
if s.getValue.stale?
91+
@prefixed_factories.remove(s.getKey, s.getValue)
92+
end
8493
end
8594
end
8695

lib/logstash/outputs/s3/temporary_file_factory.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class S3
1111
#
1212
# The local structure will look like this.
1313
#
14-
# <TEMPORARY_PATH>/<UUID>/<prefix>/ls.s3.localhost.%Y-%m-%dT%H.%m.tag_es_fb.part1.txt.gz
14+
# <TEMPORARY_PATH>/<UUID>/<prefix>/ls.s3.localhost.%Y-%m-%dT%H.%m.tag_es_fb.part1.txt.gz
1515
#
1616
# Since the UUID should be fairly unique I can destroy the whole path when an upload is complete.
1717
# I do not have to mess around to check if the other directory have file in it before destroying them.
@@ -59,7 +59,7 @@ def current_time
5959
end
6060

6161
def generate_name
62-
filename = "ls.s3.#{Socket.gethostname}.#{current_time}"
62+
filename = "ls.s3.#{SecureRandom.uuid}.#{current_time}"
6363

6464
if tags.size > 0
6565
"#{filename}.tag_#{tags.join('.')}.part#{counter}.#{extension}"

lib/logstash/outputs/s3/time_rotation_policy.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def initialize(time_file)
1414
end
1515

1616
def rotate?(file)
17-
file.size > 0 && Time.now - file.ctime >= time_file
17+
file.size > 0 && ((Time.now - file.ctime)/60).floor >= time_file
1818
end
1919

2020
def need_periodic?

spec/outputs/s3/size_and_time_rotation_policy_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
end
4949

5050
it "returns true if the file old enough" do
51-
allow(file).to receive(:ctime).and_return(Time.now - time_file * 2)
51+
allow(file).to receive(:ctime).and_return(Time.now - (time_file * 2 * 60) )
5252
expect(subject.rotate?(file)).to be_truthy
5353
end
5454

spec/outputs/s3/temporary_file_factory_spec.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
it "includes the date" do
4242
n = Time.now
43-
expect(subject.current.path).to match(/ls.s3.#{Socket.gethostname}.#{n.strftime("%Y-%m-%dT")}\d+\.\d+\./)
43+
expect(subject.current.path).to include(n.strftime("%Y-%m-%dT"))
4444
end
4545

4646
it "include the file key in the path" do
@@ -50,7 +50,7 @@
5050

5151
it "create a unique directory in the temporary directory for each file" do
5252
uuid = "hola"
53-
expect(SecureRandom).to receive(:uuid).and_return(uuid)
53+
expect(SecureRandom).to receive(:uuid).and_return(uuid).twice
5454
expect(subject.current.path).to include(uuid)
5555
end
5656

spec/outputs/s3/time_rotation_policy_spec.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
end
2828

2929
it "returns true if the file old enough" do
30-
allow(file).to receive(:ctime).and_return(Time.now - max_time * 2)
30+
allow(file).to receive(:ctime).and_return(Time.now - (max_time * 2 * 60))
3131
expect(subject.rotate?(file)).to be_truthy
3232
end
3333

@@ -38,7 +38,7 @@
3838

3939
context "When the size of the file is 0" do
4040
it "returns false if the file old enough" do
41-
allow(file).to receive(:ctime).and_return(Time.now - max_time * 2)
41+
allow(file).to receive(:ctime).and_return(Time.now - (max_time * 2 * 60))
4242
expect(subject.rotate?(file)).to be_falsey
4343
end
4444

0 commit comments

Comments
 (0)