From 02d73acdf109fb3b63d46f15e7b6e3f322ac24ea Mon Sep 17 00:00:00 2001 From: Frank Reno Date: Fri, 14 Jun 2019 18:50:41 -0600 Subject: [PATCH 1/2] metadata support --- lib/fluent/plugin/out_sumologic.rb | 44 ++++++++++++++++--- test/plugin/test_out_sumologic.rb | 69 +++++++++++++++++++++++++++++- 2 files changed, 106 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/out_sumologic.rb b/lib/fluent/plugin/out_sumologic.rb index a2ea15a..abf553c 100644 --- a/lib/fluent/plugin/out_sumologic.rb +++ b/lib/fluent/plugin/out_sumologic.rb @@ -12,14 +12,14 @@ def initialize(endpoint, verify_ssl, connect_timeout, proxy_uri, disable_cookies create_http_client(verify_ssl, connect_timeout, proxy_uri, disable_cookies) end - def publish(raw_data, source_host=nil, source_category=nil, source_name=nil, data_type, metric_data_type) - response = http.post(@endpoint, raw_data, request_headers(source_host, source_category, source_name, data_type, metric_data_type)) + def publish(raw_data, source_host=nil, source_category=nil, source_name=nil, data_type, metric_data_type, collected_fields) + response = http.post(@endpoint, raw_data, request_headers(source_host, source_category, source_name, data_type, metric_data_type, collected_fields)) unless response.ok? raise RuntimeError, "Failed to send data to HTTP Source. #{response.code} - #{response.body}" end end - def request_headers(source_host, source_category, source_name, data_type, metric_data_format) + def request_headers(source_host, source_category, source_name, data_type, metric_data_format, collected_fields) headers = { 'X-Sumo-Name' => source_name, 'X-Sumo-Category' => source_category, @@ -38,6 +38,9 @@ def request_headers(source_host, source_category, source_name, data_type, metric raise RuntimeError, "Invalid #{metric_data_format}, must be graphite or carbon2 or prometheus" end end + unless collected_fields.nil? + headers['X-Sumo-Fields'] = collected_fields + end return headers end @@ -114,8 +117,8 @@ def configure(conf) if conf['data_type'].nil? || conf['data_type'] == LOGS_DATA_TYPE unless conf['log_format'].nil? - unless conf['log_format'] =~ /\A(?:json|text|json_merge)\z/ - raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json or json_merge" + unless conf['log_format'] =~ /\A(?:json|text|json_merge|fields)\z/ + raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json, json_merge or fields" end end end @@ -200,9 +203,28 @@ def sumo_timestamp(time) time.to_s.length == 13 ? time : time * 1000 end + def sumo_fields(sumo_metadata) + fields = sumo_metadata['fields'] || "" + Hash[ + fields.split(',').map do |pair| + k, v = pair.split('=', 2) + [k, v] + end + ] + end + + def dump_collected_fields(log_fields) + if log_fields.nil? + log_fields + else + log_fields.map{|k,v| "#{k}=#{v}"}.join(',') + end + end + # This method is called every flush interval. Write the buffer chunk def write(chunk) messages_list = {} + log_fields = nil # Sort messages chunk.msgpack_each do |time, record| @@ -213,6 +235,10 @@ def write(chunk) key = sumo_key(sumo_metadata, chunk) log_format = sumo_metadata['log_format'] || @log_format + if log_format.eql? 'fields' + log_fields = sumo_fields(sumo_metadata) + end + # Strip any unwanted newlines record[@log_key].chomp! if record[@log_key] && record[@log_key].respond_to?(:chomp!) @@ -229,6 +255,11 @@ def write(chunk) record = { @timestamp_key => sumo_timestamp(time) }.merge(record) end log = dump_log(merge_json(record)) + when 'fields' + if @add_timestamp + record = { :timestamp => sumo_timestamp(time) }.merge(record) + end + log = dump_log(record) else if @add_timestamp record = { @timestamp_key => sumo_timestamp(time) }.merge(record) @@ -261,7 +292,8 @@ def write(chunk) source_category =source_category, source_name =source_name, data_type =@data_type, - metric_data_format =@metric_data_format + metric_data_format =@metric_data_format, + collected_fields =dump_collected_fields(log_fields) ) end diff --git a/test/plugin/test_out_sumologic.rb b/test/plugin/test_out_sumologic.rb index 2f92344..2502d52 100644 --- a/test/plugin/test_out_sumologic.rb +++ b/test/plugin/test_out_sumologic.rb @@ -37,7 +37,7 @@ def test_invalid_log_format_configure log_format foo } exception = assert_raise(Fluent::ConfigError) {create_driver(config)} - assert_equal("Invalid log_format foo must be text, json or json_merge", exception.message) + assert_equal("Invalid log_format foo must be text, json, json_merge or fields", exception.message) end def test_invalid_metrics_data_type @@ -116,6 +116,27 @@ def test_emit_json times:1 end + def test_emit_empty_fields + config = %{ + endpoint https://collectors.sumologic.com/v1/receivers/http/1234 + log_format fields + source_category test + source_host test + source_name test + + } + driver = create_driver(config) + time = event_time + stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') + driver.run do + driver.feed("output.test", time, {'message' => 'test'}) + end + assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", + headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'}, + body: /\A{"timestamp":\d+.,"message":"test"}\z/, + times:1 + end + def test_emit_json_double_encoded config = %{ endpoint https://endpoint3.collection.us2.sumologic.com/receiver/v1/http/1234 @@ -200,6 +221,52 @@ def test_emit_json_merge_timestamp times:1 end + def test_emit_with_sumo_metadata_with_fields_json_format + config = %{ + endpoint https://collectors.sumologic.com/v1/receivers/http/1234 + log_format json + } + driver = create_driver(config) + time = event_time + stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') + ENV['HOST'] = "foo" + driver.run do + driver.feed("output.test", time, {'foo' => 'bar', 'message' => 'test', '_sumo_metadata' => { + "host": "#{ENV['HOST']}", + "source": "${tag}", + "category": "test", + "fields": "foo=bar, sumo = logic" + }}) + end + assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", + headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test'}, + body: /\A{"timestamp":\d+.,"foo":"bar","message":"test"}\z/, + times:1 + end + + def test_emit_with_sumo_metadata_with_fields_fields_format + config = %{ + endpoint https://collectors.sumologic.com/v1/receivers/http/1234 + log_format fields + } + driver = create_driver(config) + time = event_time + stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') + ENV['HOST'] = "foo" + driver.run do + driver.feed("output.test", time, {'foo' => 'shark', 'message' => 'test', '_sumo_metadata' => { + "host": "#{ENV['HOST']}", + "source": "${tag}", + "category": "test", + "fields": "foo=bar, sumo = logic" + }}) + end + assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", + headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test', 'X-Sumo-Fields' => 'foo=bar, sumo = logic'}, + body: /\A{"timestamp":\d+.,"foo":"shark","message":"test"}\z/, + times:1 + end + def test_emit_with_sumo_metadata config = %{ endpoint https://collectors.sumologic.com/v1/receivers/http/1234 From 95cd815411a4757fcf8fc4dcbd94f5e553246b16 Mon Sep 17 00:00:00 2001 From: Frank Reno Date: Wed, 19 Jun 2019 21:07:10 -0700 Subject: [PATCH 2/2] Fix comments --- lib/fluent/plugin/out_sumologic.rb | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/out_sumologic.rb b/lib/fluent/plugin/out_sumologic.rb index abf553c..2512737 100644 --- a/lib/fluent/plugin/out_sumologic.rb +++ b/lib/fluent/plugin/out_sumologic.rb @@ -235,10 +235,6 @@ def write(chunk) key = sumo_key(sumo_metadata, chunk) log_format = sumo_metadata['log_format'] || @log_format - if log_format.eql? 'fields' - log_fields = sumo_fields(sumo_metadata) - end - # Strip any unwanted newlines record[@log_key].chomp! if record[@log_key] && record[@log_key].respond_to?(:chomp!) @@ -256,8 +252,9 @@ def write(chunk) end log = dump_log(merge_json(record)) when 'fields' + log_fields = sumo_fields(sumo_metadata) if @add_timestamp - record = { :timestamp => sumo_timestamp(time) }.merge(record) + record = { @timestamp_key => sumo_timestamp(time) }.merge(record) end log = dump_log(record) else