Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for dynamic prefix field based s3 directory separation #44

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 115 additions & 44 deletions lib/logstash/outputs/s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require "thread"
require "tmpdir"
require "fileutils"
require "pathname"


# INFORMATION:
Expand Down Expand Up @@ -60,6 +61,7 @@
# time_file => 5 (optional)
# format => "plain" (optional)
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
# no_event_wait => 5 (optional. Defines how many consecutive time_file S3 upload intervals may pass with no events for a prefix before the local watch on that prefix is cleaned up)
# }
#
class LogStash::Outputs::S3 < LogStash::Outputs::Base
Expand Down Expand Up @@ -110,6 +112,9 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
# Specify how many workers to use to upload the files to S3
config :upload_workers_count, :validate => :number, :default => 1

# Specify after how many intervals of time_file a prefix directory should be cleaned up locally if no events are arriving for it
config :no_event_wait, :validate => :number, :default => 5

# Define tags to be appended to the file on the S3 bucket.
#
# Example:
Expand Down Expand Up @@ -149,8 +154,13 @@ def aws_service_endpoint(region)
def write_on_bucket(file)
# find and use the bucket
bucket = @s3.buckets[@bucket]

first = Pathname.new(@temporary_directory)
second = Pathname.new(file)

remote_filename = "#{@prefix}#{File.basename(file)}"
remote_filename_path = second.relative_path_from first

remote_filename = remote_filename_path.to_s

@logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket)

Expand All @@ -170,17 +180,19 @@ def write_on_bucket(file)

# This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file.
public
def create_temporary_file
filename = File.join(@temporary_directory, get_temporary_filename(@page_counter))

@logger.debug("S3: Creating a new temporary file", :filename => filename)

@file_rotation_lock.synchronize do
unless @tempfile.nil?
@tempfile.close
# Create (or rotate to) a fresh temporary file for the given prefix.
#
# Closes any previously open handle for the prefix, then — only while the
# prefix is still tracked in @prefixes — makes sure the per-prefix directory
# exists and opens a new append-mode file named from the prefix's page
# counter. All of it runs under the prefix's rotation lock so it cannot
# race with concurrent writes or cleanup.
#
# prefix - String sub-directory (derived from the event) under
#          @temporary_directory.
def create_temporary_file(prefix)
  filename = File.join(@temporary_directory, prefix, get_temporary_filename(@page_counter[prefix]))

  @file_rotation_lock[prefix].synchronize do
    # Close the previous chunk for this prefix, if one was open.
    @tempfile[prefix].close unless @tempfile[prefix].nil?

    # A prefix that was cleaned up concurrently must not be resurrected
    # here, so only reopen while it is still tracked.
    if @prefixes.include?(prefix)
      dirname = File.dirname(filename)
      FileUtils.mkdir_p(dirname) unless File.directory?(dirname)
      @logger.debug("S3: Creating a new temporary file", :filename => filename)
      @tempfile[prefix] = File.open(filename, "a")
    end
  end
end

Expand All @@ -195,7 +207,11 @@ def register

@s3 = aws_s3_config
@upload_queue = Queue.new
@file_rotation_lock = Mutex.new
@file_rotation_lock = Hash.new
@tempfile = Hash.new
@page_counter = Hash.new
@prefixes = Set.new
@empty_uploads = Hash.new

if @prefix && @prefix =~ S3_INVALID_CHARACTERS
@logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS)
Expand All @@ -207,15 +223,12 @@ def register
end

test_s3_write

restore_from_crashes if @restore == true
reset_page_counter
create_temporary_file
configure_periodic_rotation if time_file != 0
configure_upload_workers

@codec.on_event do |event, encoded_event|
handle_event(encoded_event)
handle_event(encoded_event, event)
end
end

Expand Down Expand Up @@ -252,13 +265,35 @@ def restore_from_crashes
end
end

public

# Predicate: has this prefix accumulated more than @no_event_wait
# consecutive empty time_file uploads? When true, the caller should tear
# down the prefix's local directory and bookkeeping via clean_prefix.
#
# prefix - String prefix whose @empty_uploads counter is consulted.
def need_cleanup?(prefix)
  # Implicit return — last expression is the boolean result.
  @empty_uploads[prefix] > @no_event_wait
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a predicate method it should contain an interrogation mark.
I also suggest we rename it to need_cleanup?(prefix) since it expresses the necessity more clearly.


public
def move_file_to_bucket(file)

@logger.debug("S3: moving to bucket ", :file => file)

basepath = Pathname.new @temporary_directory
dirname = Pathname.new File.dirname(file)
prefix = dirname.relative_path_from(basepath).to_s
@logger.debug("S3: moving the file for prefix", :prefix => prefix)

if !File.zero?(file)
if @prefixes.include? prefix
@empty_uploads[prefix] = 0
end
write_on_bucket(file)
@logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket)
else
if @prefixes.include? prefix
@empty_uploads[prefix] += 1
end
end

@logger.debug("S3: empty_uploads for the prefix ", :prefix => prefix, :empty_uploads => @empty_uploads[prefix])

begin
File.delete(file)
rescue Errno::ENOENT
Expand All @@ -267,6 +302,9 @@ def move_file_to_bucket(file)
rescue Errno::EACCES
@logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
end

clean_prefix(prefix) if need_cleanup?(prefix)

end

public
Expand All @@ -293,9 +331,10 @@ def receive(event)
end

public
def rotate_events_log?
@file_rotation_lock.synchronize do
@tempfile.size > @size_file

# True when the temporary file for +prefix+ has grown past the configured
# size_file threshold and should be shipped to S3 and rotated. The size
# check runs under the per-prefix lock so it cannot race with writers.
def rotate_events_log(prefix)
  lock = @file_rotation_lock[prefix]
  lock.synchronize { @tempfile[prefix].size > @size_file }
end

Expand All @@ -305,12 +344,13 @@ def write_events_to_multiple_files?
end

public
def write_to_tempfile(event)
def write_to_tempfile(event, prefix)

begin
@logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile))
@logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile[prefix]))

@file_rotation_lock.synchronize do
@tempfile.syswrite(event)
@file_rotation_lock[prefix].synchronize do
@tempfile[prefix].syswrite(event)
end
rescue Errno::ENOSPC
@logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
Expand All @@ -322,9 +362,11 @@ def write_to_tempfile(event)
# Shut the output down: stop the upload workers and the periodic rotation
# thread, then flush and close every per-prefix temporary file.
def close
  shutdown_upload_workers
  @periodic_rotation_thread.stop! if @periodic_rotation_thread

  @prefixes.each do |prefix|
    @file_rotation_lock[prefix].synchronize do
      # Guard with ||, not &&: a nil handle must be skipped entirely
      # (nil.closed? raises NoMethodError) and an already-closed handle
      # must not be closed twice (IOError).
      tempfile = @tempfile[prefix]
      tempfile.close unless tempfile.nil? || tempfile.closed?
    end
  end
end

Expand All @@ -335,20 +377,29 @@ def shutdown_upload_workers
end

private
def handle_event(encoded_event)
def handle_event(encoded_event, event)
actualprefix = event.sprintf(@prefix)
if [email protected]? actualprefix
@file_rotation_lock[actualprefix] = Mutex.new
@prefixes.add(actualprefix)
reset_page_counter(actualprefix)
create_temporary_file(actualprefix)
@empty_uploads[actualprefix] = 0
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth it to move all this logic into his own method? What do you think?

initialize_prefix(prefix) or configure_prefix(prefix)


if write_events_to_multiple_files?
if rotate_events_log?
@logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile))
if rotate_events_log(actualprefix)
@logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile[actualprefix]))

move_file_to_bucket_async(@tempfile.path)
next_page
create_temporary_file
move_file_to_bucket_async(@tempfile[actualprefix].path)
next_page(actualprefix)
create_temporary_file(actualprefix)
else
@logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file)
@logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile[actualprefix].size, :size_file => @size_file)
end
end

write_to_tempfile(encoded_event)
write_to_tempfile(encoded_event, actualprefix)
end

private
Expand All @@ -357,15 +408,30 @@ def configure_periodic_rotation
LogStash::Util::set_thread_name("<S3 periodic uploader")

Stud.interval(periodic_interval, :sleep_then_run => true) do
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path)

move_file_to_bucket_async(@tempfile.path)
next_page
create_temporary_file
@tempfile.keys.each do |key|
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile[key].path)
move_file_to_bucket_async(@tempfile[key].path)
next_page(key)
create_temporary_file(key)
end
end
end
end

private

# Tear down local state for a prefix that has gone idle: close its
# temporary file, remove its directory tree under @temporary_directory,
# and drop its bookkeeping entries. Runs under the prefix's lock so it
# cannot race with writers or rotation.
def clean_prefix(prefix)
  path = File.join(@temporary_directory, prefix)
  @logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix)

  @file_rotation_lock[prefix].synchronize do
    # Guard the close: the handle may be missing or already closed.
    tempfile = @tempfile[prefix]
    tempfile.close unless tempfile.nil? || tempfile.closed?

    # remove_dir is recursive, so the files need not be unlinked one by one.
    FileUtils.remove_dir(path) if File.directory?(path)

    @prefixes.delete(prefix)
    @tempfile.delete(prefix)
    @page_counter.delete(prefix)
    # Keep a zeroed counter (rather than deleting the key) so a late
    # in-flight upload for this prefix doesn't hit a nil comparison in
    # need_cleanup?.
    @empty_uploads[prefix] = 0
  end
end

private
def configure_upload_workers
@logger.debug("S3: Configure upload workers")
Expand Down Expand Up @@ -398,20 +464,25 @@ def upload_worker
end

private
def next_page
@page_counter += 1
# Advance the rolling page number used to name +key+'s next temporary file.
def next_page(key)
  @page_counter[key] = @page_counter[key] + 1
end

private
def reset_page_counter
@page_counter = 0
# Restart page numbering for +key+ (used when a prefix is (re)initialized).
def reset_page_counter(key)
  @page_counter.store(key, 0)
end

private
def delete_on_bucket(filename)
bucket = @s3.buckets[@bucket]

first = Pathname.new @temporary_directory
second = Pathname.new filename

remote_filename = "#{@prefix}#{File.basename(filename)}"
remote_filename_path = second.relative_path_from first

remote_filename = remote_filename_path.to_s

@logger.debug("S3: delete file from bucket", :remote_filename => remote_filename, :bucket => @bucket)

Expand Down