diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 1f50d107..7eb7acaf 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -8,6 +8,7 @@ require "thread" require "tmpdir" require "fileutils" +require "pathname" # INFORMATION: @@ -60,6 +61,7 @@ # time_file => 5 (optional) # format => "plain" (optional) # canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) +# no_event_wait => 5 (optional. Number of consecutive time_file uploads that may be empty for a prefix before its local tempfile and directory are cleaned up. Defaults to 5) # } # class LogStash::Outputs::S3 < LogStash::Outputs::Base @@ -110,6 +112,9 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # Specify how many workers to use to upload the files to S3 config :upload_workers_count, :validate => :number, :default => 1 + # Specify how many consecutive time_file intervals without events are tolerated before a prefix's local directory is cleaned up + config :no_event_wait, :validate => :number, :default => 5 + # Define tags to be appended to the file on the S3 bucket. # # Example: @@ -149,8 +154,13 @@ def aws_service_endpoint(region) def write_on_bucket(file) # find and use the bucket bucket = @s3.buckets[@bucket] + + first = Pathname.new(@temporary_directory) + second = Pathname.new(file) - remote_filename = "#{@prefix}#{File.basename(file)}" + remote_filename_path = second.relative_path_from first + + remote_filename = remote_filename_path.to_s @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket) @@ -170,17 +180,19 @@ def write_on_bucket(file) # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. 
public - def create_temporary_file - filename = File.join(@temporary_directory, get_temporary_filename(@page_counter)) - - @logger.debug("S3: Creating a new temporary file", :filename => filename) - - @file_rotation_lock.synchronize do - unless @tempfile.nil? - @tempfile.close + def create_temporary_file(prefix) + filename = File.join(@temporary_directory, prefix, get_temporary_filename(@page_counter[prefix])) + @file_rotation_lock[prefix].synchronize do + unless @tempfile[prefix].nil? + @tempfile[prefix].close + end + + if @prefixes.include?(prefix) + dirname = File.dirname(filename) + FileUtils.mkdir_p(dirname) unless File.directory?(dirname) + @logger.debug("S3: Creating a new temporary file", :filename => filename) + @tempfile[prefix] = File.open(filename, "a") end - - @tempfile = File.open(filename, "a") end end @@ -195,7 +207,11 @@ def register @s3 = aws_s3_config @upload_queue = Queue.new - @file_rotation_lock = Mutex.new + @file_rotation_lock = Hash.new + @tempfile = Hash.new + @page_counter = Hash.new + @prefixes = Set.new + @empty_uploads = Hash.new if @prefix && @prefix =~ S3_INVALID_CHARACTERS @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS) @@ -207,15 +223,12 @@ def register end test_s3_write - restore_from_crashes if @restore == true - reset_page_counter - create_temporary_file configure_periodic_rotation if time_file != 0 configure_upload_workers @codec.on_event do |event, encoded_event| - handle_event(encoded_event) + handle_event(encoded_event, event) end end @@ -252,13 +265,35 @@ def restore_from_crashes end end + public + def need_cleanup?(prefix) + return @empty_uploads.fetch(prefix, 0) > @no_event_wait # a path-derived prefix may have no counter (e.g. "." for files directly in the temporary directory); treat it as 0 + end + public def move_file_to_bucket(file) + + @logger.debug("S3: moving to bucket ", :file => file) + + basepath = Pathname.new @temporary_directory + dirname = Pathname.new File.dirname(file) + prefix = dirname.relative_path_from(basepath).to_s + @logger.debug("S3: moving the file for 
prefix", :prefix => prefix) + if !File.zero?(file) + if @prefixes.include? prefix + @empty_uploads[prefix] = 0 + end write_on_bucket(file) @logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket) + else + if @prefixes.include? prefix + @empty_uploads[prefix] += 1 + end end + @logger.debug("S3: empty_uploads for the prefix ", :prefix => prefix, :empty_uploads => @empty_uploads[prefix]) + begin File.delete(file) rescue Errno::ENOENT @@ -267,6 +302,9 @@ def move_file_to_bucket(file) rescue Errno::EACCES @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory) end + + clean_prefix(prefix) if need_cleanup?(prefix) + end public @@ -293,9 +331,10 @@ def receive(event) end public - def rotate_events_log? - @file_rotation_lock.synchronize do - @tempfile.size > @size_file + + def rotate_events_log(prefix) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].size > @size_file end end @@ -305,12 +344,13 @@ def write_events_to_multiple_files? end public - def write_to_tempfile(event) + def write_to_tempfile(event, prefix) + begin - @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile)) + @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile[prefix])) - @file_rotation_lock.synchronize do - @tempfile.syswrite(event) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].syswrite(event) end rescue Errno::ENOSPC @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) @@ -322,9 +362,11 @@ def write_to_tempfile(event) def close shutdown_upload_workers @periodic_rotation_thread.stop! if @periodic_rotation_thread - - @file_rotation_lock.synchronize do - @tempfile.close unless @tempfile.nil? && @tempfile.closed? 
+ + @prefixes.each do |prefix| + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].close unless @tempfile[prefix].nil? || @tempfile[prefix].closed? # skip tempfiles that are missing or already closed + end end end @@ -335,20 +377,29 @@ def shutdown_upload_workers end private - def handle_event(encoded_event) + def handle_event(encoded_event, event) + actualprefix = event.sprintf(@prefix) + if !@prefixes.include? actualprefix + @file_rotation_lock[actualprefix] = Mutex.new + @prefixes.add(actualprefix) + reset_page_counter(actualprefix) + create_temporary_file(actualprefix) + @empty_uploads[actualprefix] = 0 + end + if write_events_to_multiple_files? - if rotate_events_log? - @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile)) + if rotate_events_log(actualprefix) + @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile[actualprefix])) - move_file_to_bucket_async(@tempfile.path) - next_page - create_temporary_file + move_file_to_bucket_async(@tempfile[actualprefix].path) + next_page(actualprefix) + create_temporary_file(actualprefix) else - @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file) + @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile[actualprefix].size, :size_file => @size_file) end end - write_to_tempfile(encoded_event) + write_to_tempfile(encoded_event, actualprefix) end private @@ -357,15 +408,30 @@ def configure_periodic_rotation LogStash::Util::set_thread_name(" true) do - @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path) - - move_file_to_bucket_async(@tempfile.path) - next_page - create_temporary_file + @tempfile.keys.each do |key| + @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile[key].path) + move_file_to_bucket_async(@tempfile[key].path) + next_page(key) + create_temporary_file(key) + end end end end + 
private + def clean_prefix(prefix) + path = File.join(@temporary_directory, prefix) + @logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].close + Dir.foreach(path) {|f| fn = File.join(path, f); File.delete(fn) if File.file?(fn)} # File.delete raises on directories, so only regular files are unlinked + FileUtils.remove_dir(path) if Dir.entries(path).size <= 2 # only "." and ".." left; keep the directory while nested prefix directories still live in it + @prefixes.delete(prefix) + @tempfile.delete(prefix) + @empty_uploads[prefix] = 0 + end + end + private def configure_upload_workers @logger.debug("S3: Configure upload workers") @@ -398,20 +464,25 @@ def upload_worker end private - def next_page - @page_counter += 1 + def next_page(key) + @page_counter[key] += 1 end private - def reset_page_counter - @page_counter = 0 + def reset_page_counter(key) + @page_counter[key] = 0 end private def delete_on_bucket(filename) bucket = @s3.buckets[@bucket] + + first = Pathname.new @temporary_directory + second = Pathname.new filename - remote_filename = "#{@prefix}#{File.basename(filename)}" + remote_filename_path = second.relative_path_from first + + remote_filename = remote_filename_path.to_s @logger.debug("S3: delete file from bucket", :remote_filename => remote_filename, :bucket => @bucket)