diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 257ef357..fb4a5df0 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -4,10 +4,13 @@ require "logstash/plugin_mixins/aws_config" require "stud/temporary" require "stud/task" -require "socket" # for Socket.gethostname +require "concurrent" +require "socket" require "thread" require "tmpdir" require "fileutils" +require "set" +require "pathname" # INFORMATION: @@ -17,35 +20,34 @@ # Requirements: # * Amazon S3 Bucket and S3 Access Permissions (Typically access_key_id and secret_access_key) # * S3 PutObject permission -# * Run logstash as superuser to establish connection # -# S3 outputs create temporary files into "/opt/logstash/S3_temp/". If you want, you can change the path at the start of register method. +# S3 outputs create temporary files in the OS's temporary directory; you can specify where to save them using the `temporary_directory` option. # # S3 output files have the following format # # ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt # -# ls.s3 : indicate logstash plugin s3 # -# "ip-10-228-27-95" : indicates the ip of your machine. -# "2013-04-18T10.00" : represents the time whenever you specify time_file. -# "tag_hello" : this indicates the event's tag. -# "part0" : this means if you indicate size_file then it will generate more parts if you file.size > size_file. -# When a file is full it will be pushed to the bucket and then deleted from the temporary directory. -# If a file is empty, it is simply deleted. Empty files will not be pushed +# |======= +# | ls.s3 | indicates the logstash s3 plugin | +# | ip-10-228-27-95 | indicates the IP of your machine. | +# | 2013-04-18T10.00 | represents the time whenever you specify time_file. | +# | tag_hello | this indicates the event's tag. | +# | part0 | this means that if you specify size_file then it will generate more parts if your file size > size_file. When a file is full it will be pushed to the bucket and then deleted from the temporary directory. If a file is empty, it is simply deleted. Empty files will not be pushed | +# |======= # # Crash Recovery: -# * This plugin will recover and upload temporary log files after crash/abnormal termination +# * This plugin will recover and upload temporary log files after a crash/abnormal termination when `restore` is set to true # ##[Note regarding time_file and size_file] : # -# Both time_file and size_file settings can trigger a log "file rotation" -# A log rotation pushes the current log "part" to s3 and deleted from local temporary storage. +## Both time_file and size_file settings can trigger a log "file rotation" +## A log rotation pushes the current log "part" to s3 and deletes it from local temporary storage. # ## If you specify BOTH size_file and time_file then it will create file for each tag (if specified). ## When EITHER time_file minutes have elapsed OR log file size > size_file, a log rotation is triggered. ## -## If you ONLY specify time_file but NOT file_size, one file for each tag (if specified) will be created.. +## If you ONLY specify time_file but NOT file_size, one file for each tag (if specified) will be created. ## When time_file minutes elapses, a log rotation will be triggered. # ## If you ONLY specify size_file, but NOT time_file, one files for each tag (if specified) will be created. 
@@ -63,37 +65,54 @@ # access_key_id => "crazy_key" (required) # secret_access_key => "monkey_access_key" (required) # region => "eu-west-1" (optional, default = "us-east-1") -# bucket => "boss_please_open_your_bucket" (required) +# bucket => "your_bucket" (required) # size_file => 2048 (optional) - Bytes # time_file => 5 (optional) - Minutes # format => "plain" (optional) -# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read", "bucket_owner_full_control". Defaults to "private" ) +# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) # } # class LogStash::Outputs::S3 < LogStash::Outputs::Base - include LogStash::PluginMixins::AwsConfig + require "logstash/outputs/s3/writable_directory_validator" + require "logstash/outputs/s3/path_validator" + require "logstash/outputs/s3/write_bucket_permission_validator" + require "logstash/outputs/s3/size_rotation_policy" + require "logstash/outputs/s3/time_rotation_policy" + require "logstash/outputs/s3/size_and_time_rotation_policy" + require "logstash/outputs/s3/temporary_file" + require "logstash/outputs/s3/temporary_file_factory" + require "logstash/outputs/s3/uploader" + require "logstash/outputs/s3/file_repository" + + include LogStash::PluginMixins::AwsConfig::V2 + + PREFIX_KEY_NORMALIZE_CHARACTER = "_" + PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15 + CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new({ + :min_threads => 1, + :max_threads => 2, + :fallback_policy => :caller_runs + }) - TEMPFILE_EXTENSION = "txt" - S3_INVALID_CHARACTERS = /[\^`><]/ config_name "s3" - default :codec, 'line' + default :codec, "line" - concurrency :single + concurrency :shared # S3 bucket - config :bucket, :validate => :string + config :bucket, :validate => :string, :required => true # Set the size of file in bytes, this means that files on bucket when have dimension > file_size, they are stored in two or more file. # If you have tags then it will generate a specific size file for every tags ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket. - config :size_file, :validate => :number, :default => 0 + config :size_file, :validate => :number, :default => 1024 * 1024 * 5 # Set the time, in MINUTES, to close the current sub_time_section of bucket. # If you define file_size you have a number of files in consideration of the section and the current tag. # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket, # for now the only thing this plugin can do is to put the file when logstash restart. - config :time_file, :validate => :number, :default => 0 + config :time_file, :validate => :number, :default => 15 ## IMPORTANT: if you use multiple instance of s3, you should specify on one of them the "restore=> true" and on the others "restore => false". ## This is hack for not destroy the new files after restoring the initial files. @@ -102,7 +121,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base config :restore, :validate => :boolean, :default => false # The S3 canned ACL to use when putting the file. Defaults to "private". 
- config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read", "bucket_owner_full_control"], + config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"], :default => "private" # Specifies wether or not to use S3's AES256 server side encryption. Defaults to false. @@ -113,10 +132,14 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base config :temporary_directory, :validate => :string, :default => File.join(Dir.tmpdir, "logstash") # Specify a prefix to the uploaded filename, this can simulate directories on S3. Prefix does not require leading slash. + # This option support string interpolation, be warned this can created a lot of temporary local files. config :prefix, :validate => :string, :default => '' # Specify how many workers to use to upload the files to S3 - config :upload_workers_count, :validate => :number, :default => 1 + config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).ceil + + # Number of items we can keep in the local queue before uploading them + config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).ceil # The version of the S3 signature hash to use. Normally uses the internal client default, can be explicitly # specified here @@ -135,348 +158,202 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # Specify the content encoding. Supports ("gzip"). Defaults to "none" config :encoding, :validate => ["none", "gzip"], :default => "none" - # Exposed attributes for testing purpose. - attr_accessor :tempfile - attr_reader :page_counter, :upload_workers - attr_reader :s3 - - def aws_s3_config - @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region) - @s3 = AWS::S3.new(full_options) - end - - def full_options - aws_options_hash.merge(signature_options) - end - - def signature_options - if @signature_version - {:s3_signature_version => @signature_version} - else - {} - end - end - - def aws_service_endpoint(region) - return { - :s3_endpoint => region == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region}.amazonaws.com" - } - end - - public - def write_on_bucket(file) - # find and use the bucket - bucket = @s3.buckets[@bucket] - - remote_filename = "#{@prefix}#{File.basename(file)}" + # Define the strategy to use to decide when we need to rotate the file and push it to S3, + # The default strategy is to check for both size and time, the first one to match will rotate the file. + config :rotation_strategy, :validate => ["size_and_time", "size", "time"], :default => "size_and_time" - @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket) + # The common use case is to define permission on the root bucket and give Logstash full access to write its logs. + # In some circonstances you need finer grained permission on subfolder, this allow you to disable the check at startup. + config :validate_credentials_on_root_bucket, :validate => :boolean, :default => true - File.open(file, 'r') do |fileIO| - begin - # prepare for write the file - object = bucket.objects[remote_filename] - object.write(fileIO, - :acl => @canned_acl, - :server_side_encryption => @server_side_encryption ? :aes256 : nil, - :content_encoding => @encoding == "gzip" ? 
"gzip" : nil) - rescue AWS::Errors::Base => error - @logger.error("S3: AWS error", :error => error) - raise LogStash::Error, "AWS Configuration Error, #{error}" + def register + # I've move the validation of the items into custom classes + # to prepare for the new config validation that will be part of the core so the core can + # be moved easily. + unless @prefix.empty? + if !PathValidator.valid?(prefix) + raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}" end end - @logger.debug("S3: has written remote file in bucket with canned ACL", :remote_filename => remote_filename, :bucket => @bucket, :canned_acl => @canned_acl) - end - - # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. - public - def create_temporary_file - filename = File.join(@temporary_directory, get_temporary_filename(@page_counter)) - - @logger.debug("S3: Creating a new temporary file", :filename => filename) - - @file_rotation_lock.synchronize do - unless @tempfile.nil? - @tempfile.close - end - - if @encoding == "gzip" - @tempfile = Zlib::GzipWriter.open(filename) - else - @tempfile = File.open(filename, "a") - end + if !WritableDirectoryValidator.valid?(@temporary_directory) + raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}" end - end - public - def register - require "aws-sdk" - # required if using ruby version < 2.0 - # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby - AWS.eager_autoload!(AWS::S3) - - @s3 = aws_s3_config - @upload_queue = Queue.new - @file_rotation_lock = Mutex.new - - if @prefix && @prefix =~ S3_INVALID_CHARACTERS - @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS) - raise LogStash::ConfigurationError, "S3: prefix contains invalid characters" + if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.valid?(bucket_resource) + raise LogStash::ConfigurationError, "Logstash must have the privileges to write to root bucket `#{@bucket}`, check you credentials or your permissions." end - if !Dir.exist?(@temporary_directory) - FileUtils.mkdir_p(@temporary_directory) + if @time_file.nil? && @size_file.nil? || @size_file == 0 && @time_file == 0 + raise LogStash::ConfigurationError, "The S3 plugin must have at least one of time_file or size_file set to a value greater than 0" end - test_s3_write - - restore_from_crashes if @restore == true - reset_page_counter - create_temporary_file - configure_periodic_rotation if time_file != 0 - configure_upload_workers - - @codec.on_event do |event, encoded_event| - handle_event(encoded_event) - end - end + @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory) + @rotation = rotation_strategy + @uploader = Uploader.new(bucket_resource, @logger, Concurrent::ThreadPoolExecutor.new({ + :min_threads => 1, + :max_threads => @upload_workers_count, + :max_queue => @upload_queue_size, + :fallback_policy => :caller_runs + })) - # Use the same method that Amazon use to check - # permission on the user bucket by creating a small file - public - def test_s3_write - @logger.debug("S3: Creating a test file on S3") + # Restoring from crash will use a new threadpool to slowly recover + # New events should have more priority. 
+ restore_from_crash if @restore - test_filename = File.join(@temporary_directory, - "logstash-programmatic-access-test-object-#{Time.now.to_i}") + # If we need time based rotation we need to do periodic checks on the files + # to take care of files that were not updated recently + start_periodic_check if @rotation.need_periodic? + end - File.open(test_filename, 'a') do |file| - file.write('test') - end + def multi_receive_encoded(events_and_encoded) + prefix_written_to = Set.new - begin - write_on_bucket(test_filename) + events_and_encoded.each do |event, encoded| + prefix_key = normalize_key(event.sprintf(@prefix)) + prefix_written_to << prefix_key begin - remote_filename = "#{@prefix}#{File.basename(test_filename)}" - bucket = @s3.buckets[@bucket] - bucket.objects[remote_filename].delete - rescue StandardError => e - # we actually only need `put_object`, but if we dont delete them - # we can have a lot of tests files + @file_repository.get_file(prefix_key) { |file| file.write(encoded) } + # The output should stop accepting new events coming in, since it cannot do anything with them anymore. + # Log the error and rethrow it. + rescue Errno::ENOSPC => e + @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) + raise e end - ensure - File.delete(test_filename) end + + # Groups IO calls to optimize fstat checks + rotate_if_needed(prefix_written_to) end - public - def restore_from_crashes - @logger.debug("S3: Checking for temp files from a previoius crash...") + def close + stop_periodic_check if @rotation.need_periodic? - Dir[File.join(@temporary_directory, "*.#{get_tempfile_extension}")].each do |file| - name_file = File.basename(file) - @logger.warn("S3: Found temporary file from crash. Uploading file to S3.", :filename => name_file) - move_file_to_bucket_async(file) - end - end + @logger.debug("Uploading current workspace") - public - def move_file_to_bucket(file) - if !File.zero?(file) - write_on_bucket(file) - @logger.debug("S3: File was put on the upload thread", :filename => File.basename(file), :bucket => @bucket) + # The plugin has stopped receiving new events, but we still have + # data on disk; let's make sure it gets to S3. + # If Logstash gets interrupted, the `restore_from_crash` method (when `restore` is set to true) will pick up + # the content in the temporary directory and upload it. + # This will block the shutdown until all uploads are done or the user force quits. + @file_repository.each_files do |file| + upload_file(file) end - begin - File.delete(file) - rescue Errno::ENOENT - # Something else deleted the file, logging but not raising the issue - @logger.warn("S3: Cannot delete the temporary file since it doesn't exist on disk", :filename => File.basename(file)) - rescue Errno::EACCES - @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory) - end - end + @file_repository.shutdown - public - def periodic_interval - @time_file * 60 + @uploader.stop # wait until all the current uploads are complete + @crash_uploader.stop if @restore # we might still have work to do for recovery so wait until we are done end - private - def get_tempfile_extension - @encoding == "gzip" ? 
"#{TEMPFILE_EXTENSION}.gz" : "#{TEMPFILE_EXTENSION}" + def full_options + options = { :credentials => credentials } + options[:s3_signature_version] = @signature_version if @signature_version + options.merge(aws_options_hash) end - public - def get_temporary_filename(page_counter = 0) - current_time = Time.now - filename = "ls.s3.#{Socket.gethostname}.#{current_time.strftime("%Y-%m-%dT%H.%M")}" - - if @tags.size > 0 - return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{get_tempfile_extension}" - else - return "#{filename}.part#{page_counter}.#{get_tempfile_extension}" - end + def normalize_key(prefix_key) + prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER) end - public - def receive(event) + private + # We start a task in the background for check for stale files and make sure we rotate them to S3 if needed. + def start_periodic_check + @logger.debug("Start periodic rotation check") - @codec.encode(event) - end + @periodic_check = Concurrent::TimerTask.new(:execution_interval => PERIODIC_CHECK_INTERVAL_IN_SECONDS) do + @logger.debug("Periodic check for stale files") - public - def rotate_events_log? - @file_rotation_lock.synchronize do - tempfile_size > @size_file + rotate_if_needed(@file_repository.keys) end - end - private - def tempfile_size - if @tempfile.instance_of? File - @tempfile.size - elsif @tempfile.instance_of? Zlib::GzipWriter - @tempfile.tell - else - raise LogStash::Error, "Unable to get size of temp file of type #{@tempfile.class}" - end + @periodic_check.execute end - public - def write_events_to_multiple_files? - @size_file > 0 + def stop_periodic_check + @periodic_check.shutdown end - public - def write_to_tempfile(event) - begin - @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile.path)) - - @file_rotation_lock.synchronize do - @tempfile.write(event) - end - rescue Errno::ENOSPC - @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) - close - end + def bucket_resource + Aws::S3::Bucket.new(@bucket, { :credentials => credentials }.merge(aws_options_hash)) end - public - def close - shutdown_upload_workers - @periodic_rotation_thread.stop! if @periodic_rotation_thread - - @file_rotation_lock.synchronize do - @tempfile.close unless @tempfile.nil? && @tempfile.closed? - end + def aws_service_endpoint(region) + { :s3_endpoint => region == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region}.amazonaws.com"} end - private - def shutdown_upload_workers - @logger.debug("S3: Gracefully shutdown the upload workers") - @upload_queue << LogStash::SHUTDOWN + def upload_options + { + :acl => @cannel_acl, + :server_side_encryption => @server_side_encryption ? :aes256 : nil, + :content_encoding => @encoding == "gzip" ? "gzip" : nil + } end - private - def handle_event(encoded_event) - if write_events_to_multiple_files? - if rotate_events_log? - @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile.path)) - - tempfile_path = @tempfile.path - # close and start next file before sending the previous one - next_page - create_temporary_file - - # send to s3 - move_file_to_bucket_async(tempfile_path) - else - @logger.debug("S3: tempfile file size report.", :tempfile_size => tempfile_size, :size_file => @size_file) + def rotate_if_needed(prefixes) + prefixes.each do |prefix| + # Each file access is thread safe, until the rotation is done then only + # one thread has access to the resource. 
+ @file_repository.get_factory(prefix) do |factory| + temp_file = factory.current + + if @rotation.rotate?(temp_file) + @logger.debug("Rotate file", + :strategy => @rotation.class.name, + :key => temp_file.key, + :path => temp_file.path) + + upload_file(temp_file) + factory.rotate! + end end end - - write_to_tempfile(encoded_event) end - private - def configure_periodic_rotation - @periodic_rotation_thread = Stud::Task.new do - LogStash::Util::set_thread_name(" true) do - @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path) - - tempfile_path = @tempfile.path - # close and start next file before sending the previous one - next_page - create_temporary_file + def upload_file(temp_file) + @logger.debug("Queue for upload", :path => temp_file.path) - # send to s3 - move_file_to_bucket_async(tempfile_path) - end + # if the queue is full the calling thread will be used to upload + temp_file.fsync # make sure we flush the fd before uploading it. + if temp_file.size > 0 + @uploader.upload_async(temp_file, + :on_complete => method(:clean_temporary_file), + :upload_options => upload_options ) end end - private - def configure_upload_workers - @logger.debug("S3: Configure upload workers") - - @upload_workers = @upload_workers_count.times.map do |worker_id| - Stud::Task.new do - LogStash::Util::set_thread_name(" worker_id) - - continue = upload_worker - end - end + def rotation_strategy + case @rotation_strategy + when "size" + SizeRotationPolicy.new(size_file) + when "time" + TimeRotationPolicy.new(time_file) + when "size_and_time" + SizeAndTimeRotationPolicy.new(size_file, time_file) end end - private - def upload_worker - file = nil - begin - file = @upload_queue.deq - - if file == LogStash::SHUTDOWN - @logger.debug("S3: upload worker is shutting down gracefuly") - @upload_queue.enq(LogStash::SHUTDOWN) - false - else - @logger.debug("S3: upload working is uploading a new file", :filename => File.basename(file)) - move_file_to_bucket(file) - true - end - rescue Exception => ex - @logger.error("failed to upload, will re-enqueue #{file} for upload", - :ex => ex, :backtrace => ex.backtrace) - unless file.nil? # Rare case if the first line of the begin doesn't execute - @upload_queue.enq(file) - end - true - end + def clean_temporary_file(file) + @logger.debug("Removing temporary file", :file => file.path) + file.delete! end - private - def next_page - @page_counter += 1 - end + # The upload process will use a separate uploader/threadpool with less resource allocated to it. + # but it will use an unbounded queue for the work, it may take some time before all the older files get processed. 
+ def restore_from_crash + @crash_uploader = Uploader.new(bucket_resource, @logger, CRASH_RECOVERY_THREADPOOL) - private - def reset_page_counter - @page_counter = 0 - end + temp_folder_path = Pathname.new(@temporary_directory) + Dir.glob(::File.join(@temporary_directory, "**/*")) do |file| + if ::File.file?(file) + key_parts = Pathname.new(file).relative_path_from(temp_folder_path).to_s.split(::File::SEPARATOR) + temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r"), key_parts.slice(0, 1)) - private - def move_file_to_bucket_async(file) - @logger.debug("S3: Sending the file to the upload queue.", :filename => File.basename(file)) - @upload_queue.enq(file) + @logger.debug("Recovering from crash and uploading", :file => temp_file.path) + @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file)) + end + end end end diff --git a/lib/logstash/outputs/s3/file_repository.rb b/lib/logstash/outputs/s3/file_repository.rb new file mode 100644 index 00000000..1b1cd5c0 --- /dev/null +++ b/lib/logstash/outputs/s3/file_repository.rb @@ -0,0 +1,120 @@ +# encoding: utf-8 +require "java" +require "concurrent" +require "concurrent/timer_task" +require "logstash/util" + +ConcurrentHashMap = java.util.concurrent.ConcurrentHashMap + +module LogStash + module Outputs + class S3 + class FileRepository + DEFAULT_STATE_SWEEPER_INTERVAL_SECS = 60 + DEFAULT_STALE_TIME_SECS = 15 * 60 + # Ensure that all access or work done + # on a factory is threadsafe + class PrefixedValue + def initialize(factory, stale_time) + @factory = factory + @lock = Mutex.new + @stale_time = stale_time + end + + def with_lock + @lock.synchronize { + yield @factory + } + end + + def stale? + with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) } + end + + def apply(prefix) + return self + end + + def delete! + with_lock{ |factory| factory.current.delete! } + end + end + + class FactoryInitializer + def initialize(tags, encoding, temporary_directory, stale_time) + @tags = tags + @encoding = encoding + @temporary_directory = temporary_directory + @stale_time = stale_time + end + + def apply(prefix_key) + PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) + end + end + + def initialize(tags, encoding, temporary_directory, + stale_time = DEFAULT_STALE_TIME_SECS, + sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) + # The path need to contains the prefix so when we start + # logtash after a crash we keep the remote structure + @prefixed_factories = ConcurrentHashMap.new + + @sweeper_interval = sweeper_interval + + @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time) + + start_stale_sweeper + end + + def keys + @prefixed_factories.keySet + end + + def each_files + @prefixed_factories.elements.each do |prefixed_file| + prefixed_file.with_lock { |factory| yield factory.current } + end + end + + # Return the file factory + def get_factory(prefix_key) + @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory } + end + + def get_file(prefix_key) + get_factory(prefix_key) { |factory| yield factory.current } + end + + def shutdown + stop_stale_sweeper + end + + def size + @prefixed_factories.size + end + + def remove_stale(k, v) + if v.stale? + @prefixed_factories.remove(k, v) + v.delete! 
+ end + end + + def start_stale_sweeper + @stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do + LogStash::Util.set_thread_name("S3, Stale factory sweeper") + + @prefixed_factories.forEach{|k,v| remove_stale(k,v)} + end + + @stale_sweeper.execute + end + + def stop_stale_sweeper + @stale_sweeper.shutdown + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/path_validator.rb b/lib/logstash/outputs/s3/path_validator.rb new file mode 100644 index 00000000..0311e450 --- /dev/null +++ b/lib/logstash/outputs/s3/path_validator.rb @@ -0,0 +1,18 @@ +# encoding: utf-8 +module LogStash + module Outputs + class S3 + class PathValidator + INVALID_CHARACTERS = "\^`><" + + def self.valid?(name) + name.match(matches_re).nil? + end + + def self.matches_re + /[#{Regexp.escape(INVALID_CHARACTERS)}]/ + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/size_and_time_rotation_policy.rb b/lib/logstash/outputs/s3/size_and_time_rotation_policy.rb new file mode 100644 index 00000000..356d86cf --- /dev/null +++ b/lib/logstash/outputs/s3/size_and_time_rotation_policy.rb @@ -0,0 +1,24 @@ +# encoding: utf-8 +require "logstash/outputs/s3/size_rotation_policy" +require "logstash/outputs/s3/time_rotation_policy" + +module LogStash + module Outputs + class S3 + class SizeAndTimeRotationPolicy + def initialize(file_size, time_file) + @size_strategy = SizeRotationPolicy.new(file_size) + @time_strategy = TimeRotationPolicy.new(time_file) + end + + def rotate?(file) + @size_strategy.rotate?(file) || @time_strategy.rotate?(file) + end + + def need_periodic? + true + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/size_rotation_policy.rb b/lib/logstash/outputs/s3/size_rotation_policy.rb new file mode 100644 index 00000000..2b1dbdbc --- /dev/null +++ b/lib/logstash/outputs/s3/size_rotation_policy.rb @@ -0,0 +1,26 @@ +# encoding: utf-8 +module LogStash + module Outputs + class S3 + class SizeRotationPolicy + attr_reader :size_file + + def initialize(size_file) + if size_file <= 0 + raise LogStash::ConfigurationError, "`size_file` need to be greather than 0" + end + + @size_file = size_file + end + + def rotate?(file) + file.size >= size_file + end + + def need_periodic? + false + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/temporary_file.rb b/lib/logstash/outputs/s3/temporary_file.rb new file mode 100644 index 00000000..176db6b3 --- /dev/null +++ b/lib/logstash/outputs/s3/temporary_file.rb @@ -0,0 +1,51 @@ +# encoding: utf-8 +require "thread" +require "forwardable" +require "fileutils" + +module LogStash + module Outputs + class S3 + # Wrap the actual file descriptor into an utility classe + # It make it more OOP and easier to reason with the paths. + class TemporaryFile + extend Forwardable + DELEGATES_METHODS = [:path, :write, :close, :size, :fsync] + + def_delegators :@fd, *DELEGATES_METHODS + + def initialize(key, fd, temp_path) + @fd = fd + @key = key + @temp_path = temp_path + @created_at = Time.now + end + + def ctime + @created_at + end + + def temp_path + @temp_path + end + + def key + @key.gsub(/^\//, "") + end + + # Each temporary file is made inside a directory named with an UUID, + # instead of deleting the file directly and having the risk of deleting other files + # we delete the root of the UUID, using a UUID also remove the risk of deleting unwanted file, it acts as + # a sandbox. + def delete! + @fd.close + ::FileUtils.rm_rf(@temp_path, :secure => true) + end + + def empty? 
+ size == 0 + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/temporary_file_factory.rb b/lib/logstash/outputs/s3/temporary_file_factory.rb new file mode 100644 index 00000000..3b34eb9e --- /dev/null +++ b/lib/logstash/outputs/s3/temporary_file_factory.rb @@ -0,0 +1,93 @@ +# encoding: utf-8 +require "socket" +require "securerandom" +require "fileutils" + +module LogStash + module Outputs + class S3 + # Since the file can contains dynamic part, we have to handle a more local structure to + # allow a nice recovery from a crash. + # + # The local structure will look like this. + # + # ///ls.s3.localhost.%Y-%m-%dT%H.%m.tag_es_fb.part1.txt.gz + # + # Since the UUID should be fairly unique I can destroy the whole path when an upload is complete. + # I do not have to mess around to check if the other directory have file in it before destroying them. + class TemporaryFileFactory + FILE_MODE = "a" + GZIP_ENCODING = "gzip" + GZIP_EXTENSION = "txt.gz" + TXT_EXTENSION = "txt" + STRFTIME = "%Y-%m-%dT%H.%M" + + attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current + + def initialize(prefix, tags, encoding, temporary_directory) + @counter = 0 + @prefix = prefix + + @tags = tags + @encoding = encoding + @temporary_directory = temporary_directory + @lock = Mutex.new + + rotate! + end + + def rotate! + @lock.synchronize { + @current = new_file + increment_counter + @current + } + end + + private + def extension + gzip? ? GZIP_EXTENSION : TXT_EXTENSION + end + + def gzip? + encoding == GZIP_ENCODING + end + + def increment_counter + @counter += 1 + end + + def current_time + Time.now.strftime(STRFTIME) + end + + def generate_name + filename = "ls.s3.#{SecureRandom.uuid}.#{current_time}" + + if tags.size > 0 + "#{filename}.tag_#{tags.join('.')}.part#{counter}.#{extension}" + else + "#{filename}.part#{counter}.#{extension}" + end + end + + def new_file + uuid = SecureRandom.uuid + name = generate_name + path = ::File.join(temporary_directory, uuid) + key = ::File.join(prefix, name) + + FileUtils.mkdir_p(::File.join(path, prefix)) + + io = if gzip? + Zlib::GzipWriter.open(::File.join(path, key)) + else + ::File.open(::File.join(path, key), FILE_MODE) + end + + TemporaryFile.new(key, io, path) + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/time_rotation_policy.rb b/lib/logstash/outputs/s3/time_rotation_policy.rb new file mode 100644 index 00000000..9fb8adc0 --- /dev/null +++ b/lib/logstash/outputs/s3/time_rotation_policy.rb @@ -0,0 +1,26 @@ +# encoding: utf-8 +module LogStash + module Outputs + class S3 + class TimeRotationPolicy + attr_reader :time_file + + def initialize(time_file) + if time_file <= 0 + raise LogStash::ConfigurationError, "`time_file` need to be greather than 0" + end + + @time_file = time_file * 60 + end + + def rotate?(file) + file.size > 0 && (Time.now - file.ctime) >= time_file + end + + def need_periodic? 
+ true + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/uploader.rb b/lib/logstash/outputs/s3/uploader.rb new file mode 100644 index 00000000..3a7251a2 --- /dev/null +++ b/lib/logstash/outputs/s3/uploader.rb @@ -0,0 +1,60 @@ +# encoding: utf-8 +require "logstash/util" +require "aws-sdk-resources" + +module LogStash + module Outputs + class S3 + class Uploader + TIME_BEFORE_RETRYING_SECONDS = 1 + DEFAULT_THREADPOOL = Concurrent::ThreadPoolExecutor.new({ + :min_threads => 1, + :max_threads => 8, + :max_queue => 1, + :fallback_policy => :caller_runs + }) + + + attr_reader :bucket, :upload_options, :logger + + def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL) + @bucket = bucket + @workers_pool = threadpool + @logger = logger + end + + def upload_async(file, options = {}) + @workers_pool.post do + LogStash::Util.set_thread_name("S3 output uploader, file: #{file.path}") + upload(file, options) + end + end + + def upload(file, options = {}) + upload_options = options.fetch(:upload_options, {}) + + begin + obj = bucket.object(file.key) + obj.upload_file(file.path, upload_options) + rescue => e + # When we get here it usually mean that S3 tried to do some retry by himself (default is 3) + # When the retry limit is reached or another error happen we will wait and retry. + # + # Thread might be stuck here, but I think its better than losing anything + # its either a transient errors or something bad really happened. + sleep(TIME_BEFORE_RETRYING_SECONDS) + logger.error("Uploading failed, retrying", :exception => e, :path => file.path) + retry + end + + options[:on_complete].call(file) unless options[:on_complete].nil? + end + + def stop + @workers_pool.shutdown + @workers_pool.wait_for_termination(nil) # block until its done + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/writable_directory_validator.rb b/lib/logstash/outputs/s3/writable_directory_validator.rb new file mode 100644 index 00000000..bdcf4256 --- /dev/null +++ b/lib/logstash/outputs/s3/writable_directory_validator.rb @@ -0,0 +1,17 @@ +# encoding: utf-8 +module LogStash + module Outputs + class S3 + class WritableDirectoryValidator + def self.valid?(path) + begin + FileUtils.mkdir_p(path) unless Dir.exist?(path) + ::File.writable?(path) + rescue + false + end + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/write_bucket_permission_validator.rb b/lib/logstash/outputs/s3/write_bucket_permission_validator.rb new file mode 100644 index 00000000..0b52a736 --- /dev/null +++ b/lib/logstash/outputs/s3/write_bucket_permission_validator.rb @@ -0,0 +1,49 @@ +# encoding: utf-8 +require "stud/temporary" +require "socket" +require "fileutils" + +module LogStash + module Outputs + class S3 + class WriteBucketPermissionValidator + def self.valid?(bucket_resource) + begin + upload_test_file(bucket_resource) + true + rescue + false + end + end + + private + def self.upload_test_file(bucket_resource) + generated_at = Time.now + + key = "logstash-programmatic-access-test-object-#{generated_at}" + content = "Logstash permission check on #{generated_at}, by #{Socket.gethostname}" + + begin + f = Stud::Temporary.file + f.write(content) + f.fsync + f.close + + obj = bucket_resource.object(key) + obj.upload_file(f) + + begin + obj.delete + rescue + # Try to remove the files on the remote bucket, + # but don't raise any errors if that doesn't work. + # since we only really need `putobject`. 
+ end + ensure + FileUtils.rm_rf(f.path) + end + end + end + end + end +end diff --git a/logstash-output-s3.gemspec b/logstash-output-s3.gemspec index 3ed6eab6..17a7bb5f 100644 --- a/logstash-output-s3.gemspec +++ b/logstash-output-s3.gemspec @@ -22,6 +22,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-mixin-aws' + s.add_runtime_dependency "concurrent-ruby" s.add_runtime_dependency 'stud', '~> 0.0.22' s.add_development_dependency 'logstash-devutils' s.add_development_dependency 'logstash-input-generator' diff --git a/spec/integration/dynamic_prefix_spec.rb b/spec/integration/dynamic_prefix_spec.rb new file mode 100644 index 00000000..71dec731 --- /dev/null +++ b/spec/integration/dynamic_prefix_spec.rb @@ -0,0 +1,92 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "Dynamic Prefix", :integration => true do + include_context "setup plugin" + + let(:options) { main_options.merge({ "rotation_strategy" => "size" }) } + let(:sandbox) { "test" } + + before do + clean_remote_files(sandbox) + subject.register + subject.multi_receive_encoded(batch) + subject.close + end + + context "With field string" do + let(:prefix) { "/#{sandbox}/%{server}/%{language}" } + let(:batch) do + b = {} + e1 = LogStash::Event.new({ "server" => "es1", "language" => "ruby"}) + b[e1] = "es1-ruby" + e2 = LogStash::Event.new({ "server" => "es2", "language" => "java"}) + b[e2] = "es2-ruby" + b + end + + it "creates a specific quantity of files" do + expect(bucket_resource.objects(:prefix => sandbox).count).to eq(batch.size) + end + + it "creates specific keys" do + re = Regexp.union(/^es1\/ruby\/ls.s3.sashimi/, /^es2\/java\/ls.s3.sashimi/) + + bucket_resource.objects(:prefix => sandbox) do |obj| + expect(obj.key).to match(re) + end + end + + it "Persists all events" do + download_directory = Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => sandbox).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(batch.size) + end + end + + context "with unsupported char" do + let(:prefix) { "/#{sandbox}/%{server}/%{language}" } + let(:batch) do + b = {} + e1 = LogStash::Event.new({ "server" => "e>s1", "language" => "ruby"}) + b[e1] = "es2-ruby" + b + end + + it "convert them to underscore" do + re = Regexp.union(/^e_s1\/ruby\/ls.s3.sashimi/) + + bucket_resource.objects(:prefix => sandbox) do |obj| + expect(obj.key).to match(re) + end + end + end + + context "with dates" do + let(:prefix) { "/#{sandbox}/%{+YYYY-MM-d}" } + + let(:batch) do + b = {} + e1 = LogStash::Event.new({ "server" => "e>s1", "language" => "ruby"}) + b[e1] = "es2-ruby" + b + end + + it "creates dated path" do + re = /^#{sandbox}\/\d{4}-\d{2}-\d{2}\/ls\.s3\./ + expect(bucket_resource.objects(:prefix => sandbox).first.key).to match(re) + end + end +end diff --git a/spec/integration/restore_from_crash_spec.rb b/spec/integration/restore_from_crash_spec.rb new file mode 100644 index 00000000..a119be6f --- /dev/null +++ b/spec/integration/restore_from_crash_spec.rb @@ -0,0 +1,39 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require 
"logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "Restore from crash", :integration => true do + include_context "setup plugin" + + let(:options) { main_options.merge({ "restore" => true }) } + + let(:number_of_files) { 5 } + let(:dummy_content) { "foobar\n" * 100 } + + before do + clean_remote_files(prefix) + # Use the S3 factory to create mutliples files with dummy content + factory = LogStash::Outputs::S3::TemporaryFileFactory.new(prefix, tags, "none", temporary_directory) + + # Creating a factory always create a file + factory.current.write(dummy_content) + factory.current.fsync + + (number_of_files - 1).times do + factory.current.write(dummy_content) + factory.current.fsync + factory.rotate! + end + end + + it "uploads the file to the bucket" do + subject.register + try(20) do + expect(Dir.glob(File.join(temporary_directory, "*")).size).to eq(0) + expect(bucket_resource.objects(:prefix => prefix).count).to eq(number_of_files) + end + end +end + diff --git a/spec/integration/s3_spec.rb b/spec/integration/s3_spec.rb deleted file mode 100644 index 1fe208ba..00000000 --- a/spec/integration/s3_spec.rb +++ /dev/null @@ -1,97 +0,0 @@ -# encoding: utf-8 -require "logstash/devutils/rspec/spec_helper" -require "logstash/outputs/s3" -require 'socket' -require "aws-sdk" -require "fileutils" -require "stud/temporary" -require_relative "../supports/helpers" - -describe LogStash::Outputs::S3, :integration => true, :s3 => true do - before do - Thread.abort_on_exception = true - end - - let!(:minimal_settings) { { "access_key_id" => ENV['AWS_ACCESS_KEY_ID'], - "secret_access_key" => ENV['AWS_SECRET_ACCESS_KEY'], - "bucket" => ENV['AWS_LOGSTASH_TEST_BUCKET'], - "region" => ENV["AWS_REGION"] || "us-east-1", - "temporary_directory" => Stud::Temporary.pathname('temporary_directory') }} - - let!(:s3_object) do - s3output = LogStash::Outputs::S3.new(minimal_settings) - s3output.register - s3output.s3 - end - - after(:each) do - delete_matching_keys_on_bucket('studtmp') - delete_matching_keys_on_bucket('my-prefix') - end - - describe "#register" do - it "write a file on the bucket to check permissions" do - s3 = LogStash::Outputs::S3.new(minimal_settings) - expect(s3.register).not_to raise_error - end - end - - describe "#write_on_bucket" do - after(:each) do - File.unlink(fake_data.path) - end - - let!(:fake_data) { Stud::Temporary.file } - - it "should prefix the file on the bucket if a prefix is specified" do - prefix = "my-prefix" - - config = minimal_settings.merge({ - "prefix" => prefix, - }) - - s3 = LogStash::Outputs::S3.new(config) - s3.register - s3.write_on_bucket(fake_data) - - expect(key_exists_on_bucket?("#{prefix}#{File.basename(fake_data.path)}")).to eq(true) - end - - it 'should use the same local filename if no prefix is specified' do - s3 = LogStash::Outputs::S3.new(minimal_settings) - s3.register - s3.write_on_bucket(fake_data) - - expect(key_exists_on_bucket?(File.basename(fake_data.path))).to eq(true) - end - end - - describe "#move_file_to_bucket" do - let!(:s3) { LogStash::Outputs::S3.new(minimal_settings) } - - before do - s3.register - end - - it "should upload the file if the size > 0" do - tmp = Stud::Temporary.file - allow(File).to receive(:zero?).and_return(false) - s3.move_file_to_bucket(tmp) - expect(key_exists_on_bucket?(File.basename(tmp.path))).to eq(true) - end - end - - describe "#restore_from_crashes" do - it "read the temp directory and upload the matching file to s3" do - Stud::Temporary.pathname do |temp_path| - tempfile = 
File.open(File.join(temp_path, 'A'), 'w+') { |f| f.write('test')} - - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => temp_path })) - s3.restore_from_crashes - - expect(File.exist?(tempfile.path)).to eq(false) - expect(key_exists_on_bucket?(File.basename(tempfile.path))).to eq(true) - end - end - end -end diff --git a/spec/integration/size_rotation_spec.rb b/spec/integration/size_rotation_spec.rb new file mode 100644 index 00000000..85dced03 --- /dev/null +++ b/spec/integration/size_rotation_spec.rb @@ -0,0 +1,58 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "Size rotation", :integration => true do + include_context "setup plugin" + + let(:size_file) { batch_size.times.inject(0) { |sum, i| sum + "#{event_encoded}\n".bytesize } } + let(:options) { main_options.merge({ "rotation_strategy" => "size" }) } + let(:number_of_events) { 5000 } + let(:batch_size) { 125 } + let(:event_encoded) { "Hello world" } + let(:batch) do + b = {} + number_of_events.times do + event = LogStash::Event.new({ "message" => event_encoded }) + b[event] = "#{event_encoded}\n" + end + b + end + let(:number_of_files) { number_of_events / batch_size } + + before do + clean_remote_files(prefix) + subject.register + batch.each_slice(batch_size) do |smaller_batch| + subject.multi_receive_encoded(smaller_batch) + end + subject.close + end + + it "creates a specific quantity of files" do + expect(bucket_resource.objects(:prefix => prefix).count).to eq(number_of_files) + end + + it "Rotates the files based on size" do + bucket_resource.objects(:prefix => prefix).each do |f| + expect(f.size).to eq(size_file) + end + end + + it "Persists all events" do + download_directory = Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => prefix).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(number_of_events) + end +end diff --git a/spec/integration/stress_test_spec.rb b/spec/integration/stress_test_spec.rb new file mode 100644 index 00000000..02e74c62 --- /dev/null +++ b/spec/integration/stress_test_spec.rb @@ -0,0 +1,60 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "Upload current file on shutdown", :integration => true, :slow => true do + include_context "setup plugin" + let(:stress_time) { ENV["RUNTIME"] || 1 * 60 } + let(:options) { main_options } + + let(:time_file) { 15 } + let(:batch_size) { 125 } + let(:event_encoded) { "Hello world" } + let(:batch) do + b = {} + batch_size.times do + event = LogStash::Event.new({ "message" => event_encoded }) + b[event] = "#{event_encoded}\n" + end + b + end + let(:workers) { 3 } + + it "Persists all events" do + started_at = Time.now + events_sent = {} + + clean_remote_files(prefix) + subject.register + + workers.times do + Thread.new do + events_sent[Thread.current] = 0 + + while Time.now - started_at < stress_time + subject.multi_receive_encoded(batch) + events_sent[Thread.current] += batch_size + end + end + end + + sleep(1) while Time.now - started_at < stress_time + + subject.close + + download_directory = 
Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => prefix).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(events_sent.values.inject(0, :+)) + end +end diff --git a/spec/integration/time_based_rotation_with_constant_write_spec.rb b/spec/integration/time_based_rotation_with_constant_write_spec.rb new file mode 100644 index 00000000..e3efd2cf --- /dev/null +++ b/spec/integration/time_based_rotation_with_constant_write_spec.rb @@ -0,0 +1,60 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "File Time rotation with constant write", :integration => true do + include_context "setup plugin" + + let(:time_file) { 0.5 } + let(:options) { main_options.merge({ "rotation_strategy" => "time" }) } + let(:number_of_events) { 5000 } + let(:batch_size) { 125 } + let(:event_encoded) { "Hello world" } + let(:batch) do + b = {} + number_of_events.times do + event = LogStash::Event.new({ "message" => event_encoded }) + b[event] = "#{event_encoded}\n" + end + b + end + let(:minimum_number_of_time_rotation) { 3 } + let(:batch_step) { (number_of_events / minimum_number_of_time_rotation).ceil } + + before do + clean_remote_files(prefix) + subject.register + + # simulate batch read/write + batch.each_slice(batch_step) do |batch_time| + batch_time.each_slice(batch_size) do |smaller_batch| + subject.multi_receive_encoded(smaller_batch) + end + sleep(time_file * 2) + end + + subject.close + end + + it "creates multiples files" do + # using close will upload the current file + expect(bucket_resource.objects(:prefix => prefix).count).to be_between(minimum_number_of_time_rotation, minimum_number_of_time_rotation + 1).inclusive + end + + it "Persists all events" do + download_directory = Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => prefix).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(number_of_events) + end +end diff --git a/spec/integration/time_based_rotation_with_stale_write_spec.rb b/spec/integration/time_based_rotation_with_stale_write_spec.rb new file mode 100644 index 00000000..f008e6bd --- /dev/null +++ b/spec/integration/time_based_rotation_with_stale_write_spec.rb @@ -0,0 +1,55 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "File Time rotation with stale write", :integration => true do + include_context "setup plugin" + + let(:time_file) { 1 } + let(:options) { main_options.merge({ "rotation_strategy" => "time" }) } + let(:number_of_events) { 5000 } + let(:batch_size) { 125 } + let(:event_encoded) { "Hello world" } + let(:batch) do + b = {} + number_of_events.times do + event = LogStash::Event.new({ "message" => event_encoded }) + b[event] = "#{event_encoded}\n" + end + b + end + + before do + stub_const('LogStash::Outputs::S3::PERIODIC_CHECK_INTERVAL_IN_SECONDS', 
1) + clean_remote_files(prefix) + subject.register + subject.multi_receive_encoded(batch) + sleep(time_file * 5) # the periodic check should have kick int + end + + after do + subject.close + end + + it "create one file" do + # using close will upload the current file + expect(bucket_resource.objects(:prefix => prefix).count).to eq(1) + end + + it "Persists all events" do + download_directory = Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => prefix).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(number_of_events) + end +end diff --git a/spec/integration/upload_current_file_on_shutdown_spec.rb b/spec/integration/upload_current_file_on_shutdown_spec.rb new file mode 100644 index 00000000..efd32993 --- /dev/null +++ b/spec/integration/upload_current_file_on_shutdown_spec.rb @@ -0,0 +1,53 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "Upload current file on shutdown", :integration => true do + include_context "setup plugin" + let(:options) { main_options } + + let(:size_file) { 1000000 } + let(:time_file) { 100000 } + let(:number_of_events) { 5000 } + let(:batch_size) { 125 } + let(:event_encoded) { "Hello world" } + let(:batch) do + b = {} + number_of_events.times do + event = LogStash::Event.new({ "message" => event_encoded }) + b[event] = "#{event_encoded}\n" + end + b + end + + before do + clean_remote_files(prefix) + subject.register + batch.each_slice(batch_size) do |smaller_batch| + subject.multi_receive_encoded(smaller_batch) + end + subject.close + end + + it "creates a specific quantity of files" do + # Since we have really big value of time_file and size_file + expect(bucket_resource.objects(:prefix => prefix).count).to eq(1) + end + + it "Persists all events" do + download_directory = Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => prefix).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(number_of_events) + end +end diff --git a/spec/outputs/s3/file_repository_spec.rb b/spec/outputs/s3/file_repository_spec.rb new file mode 100644 index 00000000..49d7c096 --- /dev/null +++ b/spec/outputs/s3/file_repository_spec.rb @@ -0,0 +1,145 @@ +# encoding: utf-8 +require "logstash/outputs/s3" +require "stud/temporary" +require "fileutils" +require_relative "../../spec_helper" + +describe LogStash::Outputs::S3::FileRepository do + let(:tags) { ["secret", "service"] } + let(:encoding) { "none" } + let(:temporary_directory) { Stud::Temporary.pathname } + let(:prefix_key) { "a-key" } + + before do + FileUtils.mkdir_p(temporary_directory) + end + + subject { described_class.new(tags, encoding, temporary_directory) } + + it "returns a temporary file" do + subject.get_file(prefix_key) do |file| + expect(file).to be_kind_of(LogStash::Outputs::S3::TemporaryFile) + end + end + + it "returns the same file for the same prefix key" do + file_path = nil + + 
subject.get_file(prefix_key) do |file| + file_path = file.path + end + + subject.get_file(prefix_key) do |file| + expect(file.path).to eq(file_path) + end + end + + it "returns the same file for the same dynamic prefix key" do + prefix = "%{type}/%{+YYYY}/%{+MM}/%{+dd}/" + event = LogStash::Event.new({ "type" => "syslog"}) + key = event.sprintf(prefix) + file_path = nil + + + subject.get_file(key) do |file| + file_path = file.path + end + + subject.get_file(key) do |file| + expect(file.path).to eq(file_path) + end + end + + it "returns different file for different prefix keys" do + file_path = nil + + subject.get_file(prefix_key) do |file| + file_path = file.path + end + + subject.get_file("another_prefix_key") do |file| + expect(file.path).not_to eq(file_path) + end + end + + it "allows to get the file factory for a specific prefix" do + subject.get_factory(prefix_key) do |factory| + expect(factory).to be_kind_of(LogStash::Outputs::S3::TemporaryFileFactory) + end + end + + it "returns a different file factory for a different prefix keys" do + factory = nil + + subject.get_factory(prefix_key) do |f| + factory = f + end + + subject.get_factory("another_prefix_key") do |f| + expect(factory).not_to eq(f) + end + end + + it "returns the number of prefix keys" do + expect(subject.size).to eq(0) + subject.get_file(prefix_key) { |file| file.write("something") } + expect(subject.size).to eq(1) + end + + it "returns all available keys" do + subject.get_file(prefix_key) { |file| file.write("something") } + expect(subject.keys.toArray).to eq([prefix_key]) + end + + it "clean stale factories" do + @file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1) + expect(@file_repository.size).to eq(0) + path = "" + @file_repository.get_factory(prefix_key) do |factory| + factory.current.write("hello") + # force a rotation so we get an empty file that will get stale. + factory.rotate! + path = factory.current.temp_path + end + + @file_repository.get_file("another-prefix") { |file| file.write("hello") } + expect(@file_repository.size).to eq(2) + @file_repository.keys.each do |k| + puts k + end + try(10) { expect(@file_repository.size).to eq(1) } + expect(File.directory?(path)).to be_falsey + end +end + + +describe LogStash::Outputs::S3::FileRepository::PrefixedValue do + let(:factory) { spy("factory", :current => file) } + subject { described_class.new(factory, 1) } + + context "#stale?" 
do + context "the file is empty and older than stale time" do + let(:file) { double("file", :size => 0, :ctime => Time.now - 5) } + + it "returns true" do + expect(subject.stale?).to be_truthy + end + end + + context "when the file has data in it" do + let(:file) { double("file", :size => 200, :ctime => Time.now - 5) } + + it "returns false" do + expect(subject.stale?).to be_falsey + end + end + + context "when the file is not old enough" do + let(:file) { double("file", :size => 0, :ctime => Time.now + 100) } + + it "returns false" do + expect(subject.stale?).to be_falsey + end + end + end +end diff --git a/spec/outputs/s3/size_and_time_rotation_policy_spec.rb b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb new file mode 100644 index 00000000..3e822783 --- /dev/null +++ b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb @@ -0,0 +1,77 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/size_and_time_rotation_policy" +require "logstash/outputs/s3/temporary_file" + +describe LogStash::Outputs::S3::SizeAndTimeRotationPolicy do + let(:file_size) { 10 } + let(:time_file) { 1 } + subject { described_class.new(file_size, time_file) } + + let(:temporary_directory) { Stud::Temporary.pathname } + let(:temporary_file) { Stud::Temporary.file } + let(:name) { "foobar" } + let(:content) { "hello" * 1000 } + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) } + + it "raises an exception if the `time_file` is set to 0" do + expect { described_class.new(100, 0) }.to raise_error(LogStash::ConfigurationError, /time_file/) + end + + it "raises an exception if the `time_file` is < 0" do + expect { described_class.new(100, -100) }.to raise_error(LogStash::ConfigurationError, /time_file/) + end + + it "raises an exception if the `size_file` is 0" do + expect { described_class.new(0, 100) }.to raise_error(LogStash::ConfigurationError, /size_file/) + end + + it "raises an exception if the `size_file` is < 0" do + expect { described_class.new(-100, 100) }.to raise_error(LogStash::ConfigurationError, /size_file/) + end + + it "returns true if the size on disk is higher than the `file_size`" do + file.write(content) + file.fsync + expect(subject.rotate?(file)).to be_truthy + end + + it "returns false if the size is inferior than the `file_size`" do + expect(subject.rotate?(file)).to be_falsey + end + + context "when the size of the file is superior to 0" do + let(:file_size) { 10000 } + + before :each do + file.write(content) + file.fsync + end + + it "returns true if the file old enough" do + allow(file).to receive(:ctime).and_return(Time.now - (time_file * 2 * 60) ) + expect(subject.rotate?(file)).to be_truthy + end + + it "returns false is not old enough" do + allow(file).to receive(:ctime).and_return(Time.now + time_file * 10) + expect(subject.rotate?(file)).to be_falsey + end + end + + context "When the size of the file is 0" do + it "returns false if the file old enough" do + expect(subject.rotate?(file)).to be_falsey + end + + it "returns false is not old enough" do + expect(subject.rotate?(file)).to be_falsey + end + end + + context "#need_periodic?" 
+    it "returns true" do
+      expect(subject.need_periodic?).to be_truthy
+    end
+  end
+end
diff --git a/spec/outputs/s3/size_rotation_policy_spec.rb b/spec/outputs/s3/size_rotation_policy_spec.rb
new file mode 100644
index 00000000..e8ca74a3
--- /dev/null
+++ b/spec/outputs/s3/size_rotation_policy_spec.rb
@@ -0,0 +1,41 @@
+# encoding: utf-8
+require "logstash/devutils/rspec/spec_helper"
+require "logstash/outputs/s3/size_rotation_policy"
+require "logstash/outputs/s3/temporary_file"
+require "fileutils"
+
+describe LogStash::Outputs::S3::SizeRotationPolicy do
+  subject { described_class.new(size_file) }
+
+  let(:temporary_directory) { Stud::Temporary.directory }
+  let(:temporary_file) { Stud::Temporary.file }
+  let(:name) { "foobar" }
+  let(:content) { "hello" * 1000 }
+  let(:size_file) { 10 } # in bytes
+  let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) }
+
+  it "returns true if the size on disk is larger than the `size_file`" do
+    file.write(content)
+    file.fsync
+    expect(subject.rotate?(file)).to be_truthy
+  end
+
+  it "returns false if the size is smaller than the `size_file`" do
+    expect(subject.rotate?(file)).to be_falsey
+  end
+
+  it "raises an exception if the `size_file` is 0" do
+    expect { described_class.new(0) }.to raise_error(LogStash::ConfigurationError, /need to be greather than 0/)
+  end
+
+  it "raises an exception if the `size_file` is < 0" do
+    expect { described_class.new(-100) }.to raise_error(LogStash::ConfigurationError, /need to be greather than 0/)
+  end
+
+  context "#need_periodic?" do
+    it "returns false" do
+      expect(subject.need_periodic?).to be_falsey
+    end
+  end
+
+end
diff --git a/spec/outputs/s3/temporary_file_factory_spec.rb b/spec/outputs/s3/temporary_file_factory_spec.rb
new file mode 100644
index 00000000..25b7c5a9
--- /dev/null
+++ b/spec/outputs/s3/temporary_file_factory_spec.rb
@@ -0,0 +1,85 @@
+# encoding: utf-8
+require "logstash/outputs/s3/temporary_file_factory"
+require "logstash/outputs/s3/temporary_file"
+require "stud/temporary"
+require "fileutils"
+
+describe LogStash::Outputs::S3::TemporaryFileFactory do
+  let(:prefix) { "foobar" }
+  let(:tags) { [] }
+  let(:temporary_directory) { Stud::Temporary.pathname }
+
+  before do
+    FileUtils.mkdir_p(temporary_directory)
+  end
+
+  subject { described_class.new(prefix, tags, encoding, temporary_directory) }
+
+  shared_examples "file factory" do
+    it "creates the file on disk" do
+      expect(File.exist?(subject.current.path)).to be_truthy
+    end
+
+    it "creates a temporary file when initialized" do
+      expect(subject.current).to be_kind_of(LogStash::Outputs::S3::TemporaryFile)
+    end
+
+    it "creates a file in the right format" do
+      expect(subject.current.path).to match(extension)
+    end
+
+    it "allows rotating the file" do
+      file_path = subject.current.path
+      expect(subject.rotate!.path).not_to eq(file_path)
+    end
+
+    it "increments the part name on rotation" do
+      expect(subject.current.path).to match(/part0/)
+      expect(subject.rotate!.path).to match(/part1/)
+    end
+
+    it "includes the date" do
+      n = Time.now
+      expect(subject.current.path).to include(n.strftime("%Y-%m-%dT"))
+    end
+
+    it "includes the file key in the path" do
+      file = subject.current
+      expect(file.path).to match(/#{file.key}/)
+    end
+
+    it "creates a unique directory in the temporary directory for each file" do
+      uuid = "hola"
+      expect(SecureRandom).to receive(:uuid).and_return(uuid).twice
+      expect(subject.current.path).to include(uuid)
+    end
+
+    context "with tags supplied" do
+      let(:tags) { ["secret",
"service"] } + + it "adds tags to the filename" do + expect(subject.current.path).to match(/tag_#{tags.join('.')}.part/) + end + end + + context "without tags" do + it "doesn't add tags to the filename" do + expect(subject.current.path).not_to match(/tag_/) + end + end + end + + context "when gzip" do + let(:encoding) { "gzip" } + let(:extension) { /\.txt.gz$/ } + + include_examples "file factory" + end + + context "when encoding set to `none`" do + let(:encoding) { "none" } + let(:extension) { /\.txt$/ } + + include_examples "file factory" + end +end diff --git a/spec/outputs/s3/temporary_file_spec.rb b/spec/outputs/s3/temporary_file_spec.rb new file mode 100644 index 00000000..fd88e1e0 --- /dev/null +++ b/spec/outputs/s3/temporary_file_spec.rb @@ -0,0 +1,46 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/temporary_file" +require "stud/temporary" +require "fileutils" +require "securerandom" + +describe LogStash::Outputs::S3::TemporaryFile do + let(:content) { "hello world" } + let(:key) { "foo" } + let(:uuid) { SecureRandom.uuid } + let(:temporary_file) { ::File.open(::File.join(temporary_directory, uuid, key), "w+") } + let(:temporary_directory) { Stud::Temporary.directory } + + before :each do + FileUtils.mkdir_p(::File.join(temporary_directory, uuid)) + end + + subject { described_class.new(key, temporary_file, temporary_directory) } + + it "returns the key of the file" do + expect(subject.key).to eq(key) + end + + it "saves content to a file" do + subject.write(content) + subject.close + expect(File.read(subject.path).strip).to eq(content) + end + + it "deletes a file" do + expect(File.exist?(subject.path)).to be_truthy + subject.delete! + expect(File.exist?(subject.path)).to be_falsey + end + + described_class::DELEGATES_METHODS.each do |method_name| + it "delegates method `#{method_name}` to file descriptor" do + expect(subject.respond_to?(method_name)).to be_truthy + end + end + + it "returns the creation time" do + expect(subject.ctime).to be < Time.now + 0.5 + end +end diff --git a/spec/outputs/s3/time_rotation_policy_spec.rb b/spec/outputs/s3/time_rotation_policy_spec.rb new file mode 100644 index 00000000..d6f3407a --- /dev/null +++ b/spec/outputs/s3/time_rotation_policy_spec.rb @@ -0,0 +1,56 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/time_rotation_policy" +require "logstash/outputs/s3/temporary_file" + +describe LogStash::Outputs::S3::TimeRotationPolicy do + subject { described_class.new(max_time) } + + let(:max_time) { 1 } + let(:temporary_directory) { Stud::Temporary.directory } + let(:temporary_file) { Stud::Temporary.file } + let(:name) { "foobar" } + let(:content) { "hello" * 1000 } + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) } + + it "raises an exception if the `file_time` is set to 0" do + expect { described_class.new(0) }.to raise_error(LogStash::ConfigurationError, /`time_file` need to be greather than 0/) + end + + it "raises an exception if the `file_time` is < 0" do + expect { described_class.new(-100) }.to raise_error(LogStash::ConfigurationError, /`time_file` need to be greather than 0/) + end + + context "when the size of the file is superior to 0" do + before :each do + file.write(content) + file.fsync + end + + it "returns true if the file old enough" do + allow(file).to receive(:ctime).and_return(Time.now - (max_time * 2 * 60)) + expect(subject.rotate?(file)).to be_truthy + end + + it "returns false 
if the file is not old enough" do
+      expect(subject.rotate?(file)).to be_falsey
+    end
+  end
+
+  context "when the size of the file is 0" do
+    it "returns false even if the file is old enough" do
+      allow(file).to receive(:ctime).and_return(Time.now - (max_time * 2 * 60))
+      expect(subject.rotate?(file)).to be_falsey
+    end
+
+    it "returns false if the file is not old enough" do
+      expect(subject.rotate?(file)).to be_falsey
+    end
+  end
+
+  context "#need_periodic?" do
+    it "returns true" do
+      expect(subject.need_periodic?).to be_truthy
+    end
+  end
+end
diff --git a/spec/outputs/s3/uploader_spec.rb b/spec/outputs/s3/uploader_spec.rb
new file mode 100644
index 00000000..16da5427
--- /dev/null
+++ b/spec/outputs/s3/uploader_spec.rb
@@ -0,0 +1,57 @@
+# encoding: utf-8
+require "logstash/devutils/rspec/spec_helper"
+require "logstash/outputs/s3/uploader"
+require "logstash/outputs/s3/temporary_file"
+require "aws-sdk"
+require "stud/temporary"
+
+describe LogStash::Outputs::S3::Uploader do
+  let(:logger) { spy(:logger) }
+  let(:max_upload_workers) { 1 }
+  let(:bucket_name) { "foobar-bucket" }
+  let(:client) { Aws::S3::Client.new(stub_responses: true) }
+  let(:bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) }
+  let(:temporary_directory) { Stud::Temporary.pathname }
+  let(:temporary_file) { Stud::Temporary.file }
+  let(:key) { "foobar" }
+  let(:upload_options) { {} }
+  let(:threadpool) do
+    Concurrent::ThreadPoolExecutor.new({
+      :min_threads => 1,
+      :max_threads => 8,
+      :max_queue => 1,
+      :fallback_policy => :caller_runs
+    })
+  end
+
+  let(:file) do
+    f = LogStash::Outputs::S3::TemporaryFile.new(key, temporary_file, temporary_directory)
+    f.write("random content")
+    f.fsync
+    f
+  end
+
+  subject { described_class.new(bucket, logger, threadpool) }
+
+  it "uploads the file to the S3 bucket" do
+    subject.upload(file)
+  end
+
+  it "executes a callback when the upload is complete" do
+    callback = proc { |f| }
+
+    expect(callback).to receive(:call).with(file)
+    subject.upload(file, { :on_complete => callback })
+  end
+
+  it "retries errors indefinitely" do
+    s3 = double("s3").as_null_object
+
+    expect(logger).to receive(:error).with(any_args).once
+    expect(bucket).to receive(:object).with(file.key).and_return(s3).twice
+    expect(s3).to receive(:upload_file).with(any_args).and_raise(StandardError)
+    expect(s3).to receive(:upload_file).with(any_args).and_return(true)
+
+    subject.upload(file)
+  end
+end
diff --git a/spec/outputs/s3/writable_directory_validator_spec.rb b/spec/outputs/s3/writable_directory_validator_spec.rb
new file mode 100644
index 00000000..96762177
--- /dev/null
+++ b/spec/outputs/s3/writable_directory_validator_spec.rb
@@ -0,0 +1,40 @@
+# encoding: utf-8
+require "logstash/devutils/rspec/spec_helper"
+require "logstash/outputs/s3/writable_directory_validator"
+require "stud/temporary"
+
+describe LogStash::Outputs::S3::WritableDirectoryValidator do
+  let(:temporary_directory) { File.join(Stud::Temporary.directory, Time.now.to_i.to_s) }
+
+  subject { described_class }
+
+  context "when the directory doesn't exist" do
+    it "creates the directory" do
+      expect(Dir.exist?(temporary_directory)).to be_falsey
+      expect(subject.valid?(temporary_directory)).to be_truthy
+      expect(Dir.exist?(temporary_directory)).to be_truthy
+    end
+  end
+
+  context "when the directory exists" do
+    before do
+      FileUtils.mkdir_p(temporary_directory)
+    end
+
+    it "doesn't change the directory" do
+      expect(Dir.exist?(temporary_directory)).to be_truthy
+      expect(subject.valid?(temporary_directory)).to be_truthy
+
expect(Dir.exist?(temporary_directory)).to be_truthy + end + end + + it "return false if the directory is not writable" do + expect(::File).to receive(:writable?).with(temporary_directory).and_return(false) + expect(subject.valid?(temporary_directory)).to be_falsey + end + + it "return true if the directory is writable" do + expect(::File).to receive(:writable?).with(temporary_directory).and_return(true) + expect(subject.valid?(temporary_directory)).to be_truthy + end +end diff --git a/spec/outputs/s3/write_bucket_permission_validator_spec.rb b/spec/outputs/s3/write_bucket_permission_validator_spec.rb new file mode 100644 index 00000000..7f36f39f --- /dev/null +++ b/spec/outputs/s3/write_bucket_permission_validator_spec.rb @@ -0,0 +1,38 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/write_bucket_permission_validator" +require "aws-sdk" + +describe LogStash::Outputs::S3::WriteBucketPermissionValidator do + let(:bucket_name) { "foobar" } + let(:obj) { double("s3_object") } + let(:client) { Aws::S3::Client.new(stub_responses: true) } + let(:bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) } + + subject { described_class } + + before do + expect(bucket).to receive(:object).with(any_args).and_return(obj) + end + + context "when permissions are sufficient" do + it "returns true" do + expect(obj).to receive(:upload_file).with(any_args).and_return(true) + expect(obj).to receive(:delete).and_return(true) + expect(subject.valid?(bucket)).to be_truthy + end + + it "hides delete errors" do + expect(obj).to receive(:upload_file).with(any_args).and_return(true) + expect(obj).to receive(:delete).and_raise(StandardError) + expect(subject.valid?(bucket)).to be_truthy + end + end + + context "when permission aren't sufficient" do + it "returns false" do + expect(obj).to receive(:upload_file).with(any_args).and_raise(StandardError) + expect(subject.valid?(bucket)).to be_falsey + end + end +end diff --git a/spec/outputs/s3_spec.rb b/spec/outputs/s3_spec.rb index 77803174..641707d0 100644 --- a/spec/outputs/s3_spec.rb +++ b/spec/outputs/s3_spec.rb @@ -1,371 +1,78 @@ # encoding: utf-8 -require "logstash/devutils/rspec/spec_helper" require "logstash/outputs/s3" +require "logstash/event" require "logstash/codecs/line" -require "logstash/pipeline" -require "aws-sdk" -require "fileutils" -require_relative "../supports/helpers" +require "stud/temporary" describe LogStash::Outputs::S3 do + let(:prefix) { "super/%{server}" } + let(:region) { "us-east-1" } + let(:bucket_name) { "mybucket" } + let(:options) { { "region" => region, "bucket" => bucket_name, "prefix" => prefix } } + let(:client) { Aws::S3::Client.new(stub_responses: true) } + let(:mock_bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) } + let(:event) { LogStash::Event.new({ "server" => "overwatch" }) } + let(:event_encoded) { "super hype" } + let(:events_and_encoded) { { event => event_encoded } } + + subject { described_class.new(options) } + before do - # We stub all the calls from S3, for more information see: - # http://ruby.awsblog.com/post/Tx2SU6TYJWQQLC3/Stubbing-AWS-Responses - AWS.stub! 
- Thread.abort_on_exception = true + allow(subject).to receive(:bucket_resource).and_return(mock_bucket) + allow(LogStash::Outputs::S3::WriteBucketPermissionValidator).to receive(:valid?).with(mock_bucket).and_return(true) end - let(:minimal_settings) { { "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "my-bucket" } } - - describe "configuration" do - let!(:config) { { "region" => "sa-east-1" } } - + context "#register configuration validation" do describe "signature version" do it "should set the signature version if specified" do - s3 = LogStash::Outputs::S3.new(config.merge({ "signature_version" => 'v4' })) - expect(s3.full_options[:s3_signature_version]).to eql('v4') + ["v2", "v4"].each do |version| + s3 = described_class.new(options.merge({ "signature_version" => version })) + expect(s3.full_options).to include(:s3_signature_version => version) + end end it "should omit the option completely if not specified" do - s3 = LogStash::Outputs::S3.new(config) + s3 = described_class.new(options) expect(s3.full_options.has_key?(:s3_signature_version)).to eql(false) end end - end - - describe "#register" do - it "should create the tmp directory if it doesn't exist" do - temporary_directory = Stud::Temporary.pathname("temporary_directory") - - config = { - "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "logstash", - "size_file" => 10, - "temporary_directory" => temporary_directory - } - - s3 = LogStash::Outputs::S3.new(config) - allow(s3).to receive(:test_s3_write) - s3.register - - expect(Dir.exist?(temporary_directory)).to eq(true) - s3.close - FileUtils.rm_r(temporary_directory) - end - - it "should raise a ConfigurationError if the prefix contains one or more '\^`><' characters" do - config = { - "prefix" => "`no\><^" - } - - s3 = LogStash::Outputs::S3.new(config) - - expect { - s3.register - }.to raise_error(LogStash::ConfigurationError) - end - end - - describe "#generate_temporary_filename" do - before do - allow(Socket).to receive(:gethostname) { "logstash.local" } - end - - it "should add tags to the filename if present" do - config = minimal_settings.merge({ "tags" => ["elasticsearch", "logstash", "kibana"], "temporary_directory" => "/tmp/logstash"}) - s3 = LogStash::Outputs::S3.new(config) - expect(s3.get_temporary_filename).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.tag_#{config["tags"].join("\.")}\.part0\.txt\Z/) - end - - it "should not add the tags to the filename" do - config = minimal_settings.merge({ "tags" => [], "temporary_directory" => "/tmp/logstash" }) - s3 = LogStash::Outputs::S3.new(config) - expect(s3.get_temporary_filename(3)).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.part3\.txt\Z/) - end - - it "normalized the temp directory to include the trailing slash if missing" do - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash" })) - expect(s3.get_temporary_filename).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.part0\.txt\Z/) - end - end - - describe "#write_on_bucket" do - let!(:fake_data) { Stud::Temporary.file } - - let(:fake_bucket) do - s3 = double('S3Object') - allow(s3).to receive(:write) - s3 - end - - it "should prefix the file on the bucket if a prefix is specified" do - prefix = "my-prefix" - - config = minimal_settings.merge({ - "prefix" => prefix, - "bucket" => "my-bucket" - }) - - expect_any_instance_of(AWS::S3::ObjectCollection).to 
receive(:[]).with("#{prefix}#{File.basename(fake_data)}") { fake_bucket } - - s3 = LogStash::Outputs::S3.new(config) - allow(s3).to receive(:test_s3_write) - s3.register - s3.write_on_bucket(fake_data) - end - - it 'should use the same local filename if no prefix is specified' do - config = minimal_settings.merge({ - "bucket" => "my-bucket" - }) - - expect_any_instance_of(AWS::S3::ObjectCollection).to receive(:[]).with(File.basename(fake_data)) { fake_bucket } - - s3 = LogStash::Outputs::S3.new(minimal_settings) - allow(s3).to receive(:test_s3_write) - s3.register - s3.write_on_bucket(fake_data) - end - end - - describe "#write_events_to_multiple_files?" do - it 'returns true if the size_file is != 0 ' do - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 200 })) - expect(s3.write_events_to_multiple_files?).to eq(true) - end - - it 'returns false if size_file is zero or not set' do - s3 = LogStash::Outputs::S3.new(minimal_settings) - expect(s3.write_events_to_multiple_files?).to eq(false) - end - end - - describe "#write_to_tempfile" do - it "should append the event to a file" do - Stud::Temporary.file("logstash", "a+") do |tmp| - s3 = LogStash::Outputs::S3.new(minimal_settings) - allow(s3).to receive(:test_s3_write) - s3.register - s3.tempfile = tmp - s3.write_to_tempfile("test-write") - tmp.rewind - expect(tmp.read).to eq("test-write") - end - end - end - - describe "#rotate_events_log" do - - context "having a single worker" do - let(:s3) { LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 1024 })) } - - before(:each) do - s3.register - end - - it "returns true if the tempfile is over the file_size limit" do - Stud::Temporary.file do |tmp| - allow(tmp).to receive(:size) { 2024001 } - - s3.tempfile = tmp - expect(s3.rotate_events_log?).to be(true) - end - end - - it "returns false if the tempfile is under the file_size limit" do - Stud::Temporary.file do |tmp| - allow(tmp).to receive(:size) { 100 } - - s3.tempfile = tmp - expect(s3.rotate_events_log?).to eq(false) - end - end - end - context "having periodic rotations" do - let(:s3) { LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 1024, "time_file" => 6e-10 })) } - let(:tmp) { Tempfile.new('s3_rotation_temp_file') } + describe "temporary directory" do + let(:temporary_directory) { Stud::Temporary.pathname } + let(:options) { super.merge({ "temporary_directory" => temporary_directory }) } - before(:each) do - s3.tempfile = tmp - s3.register + it "creates the directory when it doesn't exist" do + expect(Dir.exist?(temporary_directory)).to be_falsey + subject.register + expect(Dir.exist?(temporary_directory)).to be_truthy end - after(:each) do - s3.close - tmp.close - tmp.unlink - end - - it "raises no error when periodic rotation happen" do - 1000.times do - expect { s3.rotate_events_log? 
}.not_to raise_error - end + it "raises an error if we cannot write to the directory" do + expect(LogStash::Outputs::S3::WritableDirectoryValidator).to receive(:valid?).with(temporary_directory).and_return(false) + expect { subject.register }.to raise_error(LogStash::ConfigurationError) end end - end - - describe "#move_file_to_bucket" do - subject { LogStash::Outputs::S3.new(minimal_settings) } - - it "should always delete the source file" do - tmp = Stud::Temporary.file - allow(File).to receive(:zero?).and_return(true) - expect(File).to receive(:delete).with(tmp) - - subject.move_file_to_bucket(tmp) + it "validates the prefix" do + s3 = described_class.new(options.merge({ "prefix" => "`no\><^" })) + expect { s3.register }.to raise_error(LogStash::ConfigurationError) end - it 'should not upload the file if the size of the file is zero' do - temp_file = Stud::Temporary.file - allow(temp_file).to receive(:zero?).and_return(true) - - expect(subject).not_to receive(:write_on_bucket) - subject.move_file_to_bucket(temp_file) - end - - it "should upload the file if the size > 0" do - tmp = Stud::Temporary.file - - allow(File).to receive(:zero?).and_return(false) - expect(subject).to receive(:write_on_bucket) - - subject.move_file_to_bucket(tmp) - end - end - - describe "#restore_from_crashes" do - it "read the temp directory and upload the matching file to s3" do - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash/" })) - - expect(Dir).to receive(:[]).with("/tmp/logstash/*.txt").and_return(["/tmp/logstash/01.txt"]) - expect(s3).to receive(:move_file_to_bucket_async).with("/tmp/logstash/01.txt") - - - s3.restore_from_crashes - end - end - - describe "#receive" do - it "should send the event through the codecs" do - data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} - event = LogStash::Event.new(data) - - expect_any_instance_of(LogStash::Codecs::Line).to receive(:encode).with(event) - - s3 = LogStash::Outputs::S3.new(minimal_settings) - allow(s3).to receive(:test_s3_write) + it "allow to not validate credentials" do + s3 = described_class.new(options.merge({"validate_credentials_on_root_bucket" => false})) + expect(LogStash::Outputs::S3::WriteBucketPermissionValidator).not_to receive(:valid?).with(any_args) s3.register - - s3.receive(event) end end - describe "when rotating the temporary file" do - before { allow(File).to receive(:delete) } - - it "doesn't skip events if using the size_file option" do - Stud::Temporary.directory do |temporary_directory| - size_file = rand(200..20000) - event_count = rand(300..15000) - - config = %Q[ - input { - generator { - count => #{event_count} - } - } - output { - s3 { - access_key_id => "1234" - secret_access_key => "secret" - size_file => #{size_file} - codec => line - temporary_directory => '#{temporary_directory}' - bucket => 'testing' - } - } - ] - - pipeline = LogStash::Pipeline.new(config) - - pipeline_thread = Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? 
- pipeline_thread.join - - events_written_count = events_in_files(Dir[File.join(temporary_directory, 'ls.*.txt')]) - expect(events_written_count).to eq(event_count) - end - end - - describe "closing" do - let(:options) do - { - "access_key_id" => 1234, - "secret_access_key" => "secret", - "bucket" => "mahbucket" - } - end - subject do - ::LogStash::Outputs::S3.new(options) - end - - before do - subject.register - end - - it "should be clean" do - subject.do_close - end - - it "should remove all worker threads" do - subject.do_close - sleep 1 - expect(subject.upload_workers.map(&:thread).any?(&:alive?)).to be false - end + context "receiving events" do + before do + subject.register end - it "doesn't skip events if using the time_file option", :tag => :slow do - Stud::Temporary.directory do |temporary_directory| - time_file = rand(1..2) - number_of_rotation = rand(2..5) - - config = { - "time_file" => time_file, - "codec" => "line", - "temporary_directory" => temporary_directory, - "bucket" => "testing" - } - - s3 = LogStash::Outputs::S3.new(minimal_settings.merge(config)) - # Make the test run in seconds intead of minutes.. - expect(s3).to receive(:periodic_interval).and_return(time_file) - s3.register - - # Force to have a few files rotation - stop_time = Time.now + (number_of_rotation * time_file) - event_count = 0 - - event = LogStash::Event.new("message" => "Hello World") - - until Time.now > stop_time do - s3.receive(event) - event_count += 1 - end - s3.close - - generated_files = Dir[File.join(temporary_directory, 'ls.*.txt')] - - events_written_count = events_in_files(generated_files) - - # Skew times can affect the number of rotation.. - expect(generated_files.count).to be_within(number_of_rotation).of(number_of_rotation + 1) - expect(events_written_count).to eq(event_count) - end + it "uses `Event#sprintf` for the prefix" do + expect(event).to receive(:sprintf).with(prefix).and_return("super/overwatch") + subject.multi_receive_encoded(events_and_encoded) end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 00000000..36a4a794 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,3 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require_relative "supports/helpers" diff --git a/spec/supports/helpers.rb b/spec/supports/helpers.rb index e34f1ff3..09aa6bc7 100644 --- a/spec/supports/helpers.rb +++ b/spec/supports/helpers.rb @@ -1,14 +1,38 @@ -def delete_matching_keys_on_bucket(prefix) - s3_object.buckets[minimal_settings["bucket"]].objects.with_prefix(prefix).each do |obj| - obj.delete +# encoding: utf-8 +shared_context "setup plugin" do + let(:temporary_directory) { Stud::Temporary.pathname } + + let(:bucket) { ENV["AWS_LOGSTASH_TEST_BUCKET"] } + let(:access_key_id) { ENV["AWS_ACCESS_KEY_ID"] } + let(:secret_access_key) { ENV["AWS_SECRET_ACCESS_KEY"] } + let(:size_file) { 100 } + let(:time_file) { 100 } + let(:tags) { [] } + let(:prefix) { "home" } + let(:region) { "us-east-1" } + + let(:main_options) do + { + "bucket" => bucket, + "prefix" => prefix, + "temporary_directory" => temporary_directory, + "access_key_id" => access_key_id, + "secret_access_key" => secret_access_key, + "size_file" => size_file, + "time_file" => time_file, + "region" => region, + "tags" => [] + } end -end -def key_exists_on_bucket?(key) - s3_object.buckets[minimal_settings["bucket"]].objects[key].exists? 
-end + let(:client_credentials) { Aws::Credentials.new(access_key_id, secret_access_key) } + let(:bucket_resource) { Aws::S3::Bucket.new(bucket, { :credentials => client_credentials, :region => region }) } -def events_in_files(files) - files.collect { |file| File.foreach(file).count }.inject(&:+) + subject { LogStash::Outputs::S3.new(options) } end +def clean_remote_files(prefix = "") + bucket_resource.objects(:prefix => prefix).each do |object| + object.delete + end +end