From 95bf768fa4a01a47fdb2bb8008b033f3c4813f29 Mon Sep 17 00:00:00 2001 From: Nitin Goel Date: Thu, 10 Sep 2015 09:17:52 +0530 Subject: [PATCH 1/3] Support for dynamic field values in prefix --- lib/logstash/outputs/s3.rb | 158 +++++++++++++++++++++++++++---------- 1 file changed, 115 insertions(+), 43 deletions(-) diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 1f50d107..114686a8 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -8,6 +8,7 @@ require "thread" require "tmpdir" require "fileutils" +require 'pathname' # INFORMATION: @@ -60,6 +61,7 @@ # time_file => 5 (optional) # format => "plain" (optional) # canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) +# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no eventns for the prefix, before cleaning up the watch on that) # } # class LogStash::Outputs::S3 < LogStash::Outputs::Base @@ -110,6 +112,9 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # Specify how many workers to use to upload the files to S3 config :upload_workers_count, :validate => :number, :default => 1 + # Specify after how many intervals of time_file, a prefix directory should be cleaned up locally if no events happening for it + config :no_event_wait, :validate => :number, :default => 5 + # Define tags to be appended to the file on the S3 bucket. 
# # Example: @@ -149,8 +154,13 @@ def aws_service_endpoint(region) def write_on_bucket(file) # find and use the bucket bucket = @s3.buckets[@bucket] + + first = Pathname.new @temporary_directory + second = Pathname.new file - remote_filename = "#{@prefix}#{File.basename(file)}" + remote_filename_path = second.relative_path_from first + + remote_filename = remote_filename_path.to_s @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket) @@ -170,17 +180,21 @@ def write_on_bucket(file) # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. public - def create_temporary_file - filename = File.join(@temporary_directory, get_temporary_filename(@page_counter)) - - @logger.debug("S3: Creating a new temporary file", :filename => filename) - - @file_rotation_lock.synchronize do - unless @tempfile.nil? - @tempfile.close + def create_temporary_file(prefix) + filename = File.join(@temporary_directory, prefix, get_temporary_filename(@page_counter[prefix])) + @file_rotation_lock[prefix].synchronize do + unless @tempfile[prefix].nil? + @tempfile[prefix].close + end + + if @prefixes.include? 
prefix + dirname = File.dirname(filename) + unless File.directory?(dirname) + FileUtils.mkdir_p(dirname) + end + @logger.debug("S3: Creating a new temporary file", :filename => filename) + @tempfile[prefix] = File.open(filename, "a") end - - @tempfile = File.open(filename, "a") end end @@ -195,7 +209,11 @@ def register @s3 = aws_s3_config @upload_queue = Queue.new - @file_rotation_lock = Mutex.new + @file_rotation_lock = Hash.new + @tempfile = Hash.new + @page_counter = Hash.new + @prefixes = Set.new + @empty_uploads = Hash.new if @prefix && @prefix =~ S3_INVALID_CHARACTERS @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS) @@ -207,15 +225,14 @@ def register end test_s3_write - restore_from_crashes if @restore == true - reset_page_counter - create_temporary_file + #reset_page_counter + #create_temporary_file configure_periodic_rotation if time_file != 0 configure_upload_workers @codec.on_event do |event, encoded_event| - handle_event(encoded_event) + handle_event(encoded_event, event) end end @@ -252,13 +269,36 @@ def restore_from_crashes end end + public + def shouldcleanup(prefix) + return @empty_uploads[prefix] > @no_event_wait + end + public def move_file_to_bucket(file) + + @logger.debug("S3: moving to bucket ", :file => file) + + basepath = Pathname.new @temporary_directory + dirname = Pathname.new File.dirname(file) + prefixpath = dirname.relative_path_from basepath + prefix = prefixpath.to_s + @logger.debug("S3: moving the file for prefix", :prefix => prefix) + if !File.zero?(file) + if @prefixes.include? prefix + @empty_uploads[prefix] = 0 + end write_on_bucket(file) @logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket) + else + if @prefixes.include? 
prefix + @empty_uploads[prefix] += 1 + end end + @logger.debug("S3: empty_uploads for the prefix ", :prefix => prefix, :empty_uploads => @empty_uploads[prefix]) + begin File.delete(file) rescue Errno::ENOENT @@ -267,6 +307,10 @@ def move_file_to_bucket(file) rescue Errno::EACCES @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory) end + + if shouldcleanup(prefix) + cleanprefix(prefix) + end end public @@ -293,9 +337,10 @@ def receive(event) end public - def rotate_events_log? - @file_rotation_lock.synchronize do - @tempfile.size > @size_file + + def rotate_events_log(prefix) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].size > @size_file end end @@ -305,12 +350,13 @@ def write_events_to_multiple_files? end public - def write_to_tempfile(event) + def write_to_tempfile(event, prefix) + begin - @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile)) + @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile[prefix])) - @file_rotation_lock.synchronize do - @tempfile.syswrite(event) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].syswrite(event) end rescue Errno::ENOSPC @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) @@ -322,9 +368,11 @@ def write_to_tempfile(event) def close shutdown_upload_workers @periodic_rotation_thread.stop! if @periodic_rotation_thread - - @file_rotation_lock.synchronize do - @tempfile.close unless @tempfile.nil? && @tempfile.closed? + + for prefix in @prefixes + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed? 
+ end end end @@ -335,20 +383,29 @@ def shutdown_upload_workers end private - def handle_event(encoded_event) + def handle_event(encoded_event, event) + actualprefix = event.sprintf(@prefix) + if not @prefixes.to_a().include? actualprefix + @file_rotation_lock[actualprefix] = Mutex.new + @prefixes.add(actualprefix) + reset_page_counter(actualprefix) + create_temporary_file(actualprefix) + @empty_uploads[actualprefix] = 0 + end + if write_events_to_multiple_files? - if rotate_events_log? - @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile)) + if rotate_events_log(actualprefix) + @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile[actualprefix])) - move_file_to_bucket_async(@tempfile.path) - next_page - create_temporary_file + move_file_to_bucket_async(@tempfile[actualprefix].path) + next_page(actualprefix) + create_temporary_file(actualprefix) else - @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file) + @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile[actualprefix].size, :size_file => @size_file) end end - write_to_tempfile(encoded_event) + write_to_tempfile(encoded_event, actualprefix) end private @@ -357,15 +414,30 @@ def configure_periodic_rotation LogStash::Util::set_thread_name(" true) do - @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path) - - move_file_to_bucket_async(@tempfile.path) - next_page - create_temporary_file + @tempfile.keys.each do |key| + @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile[key].path) + move_file_to_bucket_async(@tempfile[key].path) + next_page(key) + create_temporary_file(key) + end end end end + private + def cleanprefix(prefix) + path = File.join(@temporary_directory, prefix) + @logger.debug("cleaning the directory and prefix ", :dir => path, 
:prefix => prefix) + @file_rotation_lock[prefix].synchronize do + @tempfile[prefix].close + Dir.foreach(path) {|f| fn = File.join(path, f); File.delete(fn) if f != '.' && f != '..'} + FileUtils.remove_dir(path) + @prefixes.delete(prefix) + @tempfile.delete(prefix) + @empty_uploads[prefix] = 0 + end + end + private def configure_upload_workers @logger.debug("S3: Configure upload workers") @@ -398,13 +470,13 @@ def upload_worker end private - def next_page - @page_counter += 1 + def next_page(key) + @page_counter[key] += 1 end private - def reset_page_counter - @page_counter = 0 + def reset_page_counter(key) + @page_counter[key] = 0 end private From b6d430d8facbcbfaf28056bc405a3fba08f4aafa Mon Sep 17 00:00:00 2001 From: Nitin Goel Date: Thu, 10 Sep 2015 09:28:34 +0530 Subject: [PATCH 2/3] Fixing the issue with deletion of temp file along with the dynamic prefix changes --- lib/logstash/outputs/s3.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 114686a8..df149957 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -482,8 +482,13 @@ def reset_page_counter(key) private def delete_on_bucket(filename) bucket = @s3.buckets[@bucket] + + first = Pathname.new @temporary_directory + second = Pathname.new filename - remote_filename = "#{@prefix}#{File.basename(filename)}" + remote_filename_path = second.relative_path_from first + + remote_filename = remote_filename_path.to_s @logger.debug("S3: delete file from bucket", :remote_filename => remote_filename, :bucket => @bucket) From b127992f7951202beb3c83aad255f0656a65d9ec Mon Sep 17 00:00:00 2001 From: Nitin Goel Date: Thu, 26 Nov 2015 09:31:46 +0530 Subject: [PATCH 3/3] Incorporating the review comments for the dynamic prefix based s3 outputs --- lib/logstash/outputs/s3.rb | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/lib/logstash/outputs/s3.rb 
b/lib/logstash/outputs/s3.rb index df149957..7eb7acaf 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -8,7 +8,7 @@ require "thread" require "tmpdir" require "fileutils" -require 'pathname' +require "pathname" # INFORMATION: @@ -61,7 +61,7 @@ # time_file => 5 (optional) # format => "plain" (optional) # canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) -# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no eventns for the prefix, before cleaning up the watch on that) +# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no events for the prefix, before cleaning up the watch on that) # } # class LogStash::Outputs::S3 < LogStash::Outputs::Base @@ -155,8 +155,8 @@ def write_on_bucket(file) # find and use the bucket bucket = @s3.buckets[@bucket] - first = Pathname.new @temporary_directory - second = Pathname.new file + first = Pathname.new(@temporary_directory) + second = Pathname.new(file) remote_filename_path = second.relative_path_from first @@ -187,11 +187,9 @@ def create_temporary_file(prefix) @tempfile[prefix].close end - if @prefixes.include? 
prefix + if @prefixes.include?(prefix) dirname = File.dirname(filename) - unless File.directory?(dirname) - FileUtils.mkdir_p(dirname) - end + FileUtils.mkdir_p(dirname) unless File.directory?(dirname) @logger.debug("S3: Creating a new temporary file", :filename => filename) @tempfile[prefix] = File.open(filename, "a") end @@ -226,8 +224,6 @@ def register test_s3_write restore_from_crashes if @restore == true - #reset_page_counter - #create_temporary_file configure_periodic_rotation if time_file != 0 configure_upload_workers @@ -270,7 +266,7 @@ def restore_from_crashes end public - def shouldcleanup(prefix) + def need_cleanup?(prefix) return @empty_uploads[prefix] > @no_event_wait end @@ -281,8 +277,7 @@ def move_file_to_bucket(file) basepath = Pathname.new @temporary_directory dirname = Pathname.new File.dirname(file) - prefixpath = dirname.relative_path_from basepath - prefix = prefixpath.to_s + prefix = dirname.relative_path_from(basepath).to_s @logger.debug("S3: moving the file for prefix", :prefix => prefix) if !File.zero?(file) @@ -308,9 +303,8 @@ def move_file_to_bucket(file) @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory) end - if shouldcleanup(prefix) - cleanprefix(prefix) - end + clean_prefix(prefix) if need_cleanup?(prefix) + end public @@ -369,7 +363,7 @@ def close shutdown_upload_workers @periodic_rotation_thread.stop! if @periodic_rotation_thread - for prefix in @prefixes + @prefixes.each do |prefix| @file_rotation_lock[prefix].synchronize do @tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed? end @@ -385,7 +379,7 @@ def shutdown_upload_workers private def handle_event(encoded_event, event) actualprefix = event.sprintf(@prefix) - if not @prefixes.to_a().include? actualprefix + if !@prefixes.include? 
actualprefix @file_rotation_lock[actualprefix] = Mutex.new @prefixes.add(actualprefix) reset_page_counter(actualprefix) @@ -425,7 +419,7 @@ def configure_periodic_rotation end private - def cleanprefix(prefix) + def clean_prefix(prefix) path = File.join(@temporary_directory, prefix) @logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix) @file_rotation_lock[prefix].synchronize do