diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 8538d4b4..36426914 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -8,6 +8,7 @@ require "thread" require "tmpdir" require "fileutils" +require "pathname" # INFORMATION: @@ -59,6 +60,7 @@ # size_file => 2048 (optional) # time_file => 5 (optional) # canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) +# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no events for the prefix, before cleaning up the watch on that) # } # class LogStash::Outputs::S3 < LogStash::Outputs::Base @@ -109,15 +111,8 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # Specify how many workers to use to upload the files to S3 config :upload_workers_count, :validate => :number, :default => 1 - # Define tags to be appended to the file on the S3 bucket. - # - # Example: - # tags => ["elasticsearch", "logstash", "kibana"] - # - # Will generate this file: - # "ls.s3.logstash.local.2015-01-01T00.00.tag_elasticsearch.logstash.kibana.part0.txt" - # - config :tags, :validate => :array, :default => [] + # Specify after how many intervals of time_file a prefix directory should be cleaned up locally if no events are happening for it + config :no_event_wait, :validate => :number, :default => 5 # Exposed attributes for testing purpose. 
attr_accessor :tempfile @@ -148,8 +143,13 @@ def aws_service_endpoint(region) def write_on_bucket(file) # find and use the bucket bucket = @s3.buckets[@bucket] + + first = Pathname.new(@temporary_directory) + second = Pathname.new(file) - remote_filename = "#{@prefix}#{File.basename(file)}" + remote_filename_path = second.relative_path_from first + + remote_filename = remote_filename_path.to_s @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket) @@ -169,17 +169,19 @@ def write_on_bucket(file) # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. public - def create_temporary_file - filename = File.join(@temporary_directory, get_temporary_filename(@page_counter)) - - @logger.debug("S3: Creating a new temporary file", :filename => filename) - - @file_rotation_lock.synchronize do - unless @tempfile.nil? - @tempfile.close + def create_temporary_file(prefix) + filename = File.join(@temporary_directory, prefix, get_temporary_filename(@page_counter[prefix])) + @file_rotation_lock[prefix].synchronize do + unless @tempfile[prefix].nil? 
+ @tempfile[prefix].close + end + + if @prefixes.include?(prefix) + dirname = File.dirname(filename) + FileUtils.mkdir_p(dirname) unless File.directory?(dirname) + @logger.debug("S3: Creating a new temporary file", :filename => filename) + @tempfile[prefix] = File.open(filename, "a") end - - @tempfile = File.open(filename, "a") end end @@ -194,7 +196,11 @@ def register @s3 = aws_s3_config @upload_queue = Queue.new - @file_rotation_lock = Mutex.new + @file_rotation_lock = Hash.new + @tempfile = Hash.new + @page_counter = Hash.new + @prefixes = Set.new + @empty_uploads = Hash.new if @prefix && @prefix =~ S3_INVALID_CHARACTERS @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS) @@ -206,15 +212,12 @@ def register end test_s3_write - restore_from_crashes if @restore == true - reset_page_counter - create_temporary_file configure_periodic_rotation if time_file != 0 configure_upload_workers @codec.on_event do |event, encoded_event| - handle_event(encoded_event) + handle_event(encoded_event, event) end end @@ -251,13 +254,35 @@ def restore_from_crashes end end + public + def need_cleanup?(prefix) + return @empty_uploads[prefix] > @no_event_wait + end + public def move_file_to_bucket(file) + + @logger.debug("S3: moving to bucket ", :file => file) + + basepath = Pathname.new @temporary_directory + dirname = Pathname.new File.dirname(file) + prefix = dirname.relative_path_from(basepath).to_s + @logger.debug("S3: moving the file for prefix", :prefix => prefix) + if !File.zero?(file) + if @prefixes.include? prefix + @empty_uploads[prefix] = 0 + end write_on_bucket(file) @logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket) + else + if @prefixes.include? 
prefix + @empty_uploads[prefix] += 1 + end end + @logger.debug("S3: empty_uploads for the prefix ", :prefix => prefix, :empty_uploads => @empty_uploads[prefix]) + begin File.delete(file) rescue Errno::ENOENT @@ -266,6 +291,9 @@ def move_file_to_bucket(file) rescue Errno::EACCES @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory) end + + clean_prefix(prefix) if need_cleanup?(prefix) + end public @@ -292,9 +320,10 @@ def receive(event) end public - def rotate_events_log? - @file_rotation_lock.synchronize do - @tempfile.size > @size_file + + def rotate_events_log(prefix) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].size > @size_file end end @@ -304,12 +333,13 @@ def write_events_to_multiple_files? end public - def write_to_tempfile(event) + def write_to_tempfile(event, prefix) + begin - @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile)) + @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile[prefix])) - @file_rotation_lock.synchronize do - @tempfile.syswrite(event) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].syswrite(event) end rescue Errno::ENOSPC @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) @@ -318,12 +348,14 @@ def write_to_tempfile(event) end public - def close + def close() shutdown_upload_workers @periodic_rotation_thread.stop! if @periodic_rotation_thread - - @file_rotation_lock.synchronize do - @tempfile.close unless @tempfile.nil? && @tempfile.closed? + + @prefixes.each do |prefix| + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed? 
+ end end end @@ -334,20 +366,29 @@ def shutdown_upload_workers end private - def handle_event(encoded_event) + def handle_event(encoded_event, event) + actualprefix = event.sprintf(@prefix) + if !@prefixes.include? actualprefix + @file_rotation_lock[actualprefix] = Mutex.new + @prefixes.add(actualprefix) + reset_page_counter(actualprefix) + create_temporary_file(actualprefix) + @empty_uploads[actualprefix] = 0 + end + if write_events_to_multiple_files? - if rotate_events_log? - @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile)) + if rotate_events_log(actualprefix) + @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile[actualprefix])) - move_file_to_bucket_async(@tempfile.path) - next_page - create_temporary_file + move_file_to_bucket_async(@tempfile[actualprefix].path) + next_page(actualprefix) + create_temporary_file(actualprefix) else - @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file) + @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile[actualprefix].size, :size_file => @size_file) end end - write_to_tempfile(encoded_event) + write_to_tempfile(encoded_event, actualprefix) end private @@ -355,16 +396,33 @@ def configure_periodic_rotation @periodic_rotation_thread = Stud::Task.new do LogStash::Util::set_thread_name("<S3 periodic uploader") - Stud.interval(periodic_interval, :sleep_then_run => true) do - @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path) - - move_file_to_bucket_async(@tempfile.path) - next_page - create_temporary_file + Stud.interval(periodic_interval, :sleep_then_run => true) do + + @tempfile.keys.each do |key| + @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile[key].path) + move_file_to_bucket_async(@tempfile[key].path) + next_page(key) + create_temporary_file(key) + end + end end end + private + def clean_prefix(prefix) + path = 
File.join(@temporary_directory, prefix) + @logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].close + Dir.foreach(path) {|f| fn = File.join(path, f); File.delete(fn) if f != '.' && f != '..'} + FileUtils.remove_dir(path) + @prefixes.delete(prefix) + @tempfile.delete(prefix) + @empty_uploads[prefix] = 0 + end + end + private def configure_upload_workers @logger.debug("S3: Configure upload workers") @@ -397,20 +455,25 @@ def upload_worker end private - def next_page - @page_counter += 1 + def next_page(key) + @page_counter[key] += 1 end private - def reset_page_counter - @page_counter = 0 + def reset_page_counter(key) + @page_counter[key] = 0 end private def delete_on_bucket(filename) bucket = @s3.buckets[@bucket] + + first = Pathname.new @temporary_directory + second = Pathname.new filename - remote_filename = "#{@prefix}#{File.basename(filename)}" + remote_filename_path = second.relative_path_from first + + remote_filename = remote_filename_path.to_s @logger.debug("S3: delete file from bucket", :remote_filename => remote_filename, :bucket => @bucket)