Skip to content

Commit

Permalink
Merge pull request #49 from SumoLogic/add-log-metadata-support
Browse files Browse the repository at this point in the history
metadata support
  • Loading branch information
frankreno authored Jun 24, 2019
2 parents 6cb9846 + 95cd815 commit 1cd96a8
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 7 deletions.
41 changes: 35 additions & 6 deletions lib/fluent/plugin/out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ def initialize(endpoint, verify_ssl, connect_timeout, proxy_uri, disable_cookies
create_http_client(verify_ssl, connect_timeout, proxy_uri, disable_cookies)
end

def publish(raw_data, source_host=nil, source_category=nil, source_name=nil, data_type, metric_data_type)
response = http.post(@endpoint, raw_data, request_headers(source_host, source_category, source_name, data_type, metric_data_type))
def publish(raw_data, source_host=nil, source_category=nil, source_name=nil, data_type, metric_data_type, collected_fields)
response = http.post(@endpoint, raw_data, request_headers(source_host, source_category, source_name, data_type, metric_data_type, collected_fields))
unless response.ok?
raise RuntimeError, "Failed to send data to HTTP Source. #{response.code} - #{response.body}"
end
end

def request_headers(source_host, source_category, source_name, data_type, metric_data_format)
def request_headers(source_host, source_category, source_name, data_type, metric_data_format, collected_fields)
headers = {
'X-Sumo-Name' => source_name,
'X-Sumo-Category' => source_category,
Expand All @@ -38,6 +38,9 @@ def request_headers(source_host, source_category, source_name, data_type, metric
raise RuntimeError, "Invalid #{metric_data_format}, must be graphite or carbon2 or prometheus"
end
end
unless collected_fields.nil?
headers['X-Sumo-Fields'] = collected_fields
end
return headers
end

Expand Down Expand Up @@ -114,8 +117,8 @@ def configure(conf)

if conf['data_type'].nil? || conf['data_type'] == LOGS_DATA_TYPE
unless conf['log_format'].nil?
unless conf['log_format'] =~ /\A(?:json|text|json_merge)\z/
raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json or json_merge"
unless conf['log_format'] =~ /\A(?:json|text|json_merge|fields)\z/
raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json, json_merge or fields"
end
end
end
Expand Down Expand Up @@ -200,9 +203,28 @@ def sumo_timestamp(time)
time.to_s.length == 13 ? time : time * 1000
end

def sumo_fields(sumo_metadata)
fields = sumo_metadata['fields'] || ""
Hash[
fields.split(',').map do |pair|
k, v = pair.split('=', 2)
[k, v]
end
]
end

def dump_collected_fields(log_fields)
if log_fields.nil?
log_fields
else
log_fields.map{|k,v| "#{k}=#{v}"}.join(',')
end
end

# This method is called every flush interval. Write the buffer chunk
def write(chunk)
messages_list = {}
log_fields = nil

# Sort messages
chunk.msgpack_each do |time, record|
Expand All @@ -229,6 +251,12 @@ def write(chunk)
record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
end
log = dump_log(merge_json(record))
when 'fields'
log_fields = sumo_fields(sumo_metadata)
if @add_timestamp
record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
end
log = dump_log(record)
else
if @add_timestamp
record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
Expand Down Expand Up @@ -261,7 +289,8 @@ def write(chunk)
source_category =source_category,
source_name =source_name,
data_type =@data_type,
metric_data_format =@metric_data_format
metric_data_format =@metric_data_format,
collected_fields =dump_collected_fields(log_fields)
)
end

Expand Down
69 changes: 68 additions & 1 deletion test/plugin/test_out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_invalid_log_format_configure
log_format foo
}
exception = assert_raise(Fluent::ConfigError) {create_driver(config)}
assert_equal("Invalid log_format foo must be text, json or json_merge", exception.message)
assert_equal("Invalid log_format foo must be text, json, json_merge or fields", exception.message)
end

def test_invalid_metrics_data_type
Expand Down Expand Up @@ -116,6 +116,27 @@ def test_emit_json
times:1
end

def test_emit_empty_fields
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
log_format fields
source_category test
source_host test
source_name test
}
driver = create_driver(config)
time = event_time
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
driver.run do
driver.feed("output.test", time, {'message' => 'test'})
end
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'},
body: /\A{"timestamp":\d+.,"message":"test"}\z/,
times:1
end

def test_emit_json_double_encoded
config = %{
endpoint https://endpoint3.collection.us2.sumologic.com/receiver/v1/http/1234
Expand Down Expand Up @@ -200,6 +221,52 @@ def test_emit_json_merge_timestamp
times:1
end

def test_emit_with_sumo_metadata_with_fields_json_format
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
log_format json
}
driver = create_driver(config)
time = event_time
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
ENV['HOST'] = "foo"
driver.run do
driver.feed("output.test", time, {'foo' => 'bar', 'message' => 'test', '_sumo_metadata' => {
"host": "#{ENV['HOST']}",
"source": "${tag}",
"category": "test",
"fields": "foo=bar, sumo = logic"
}})
end
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test'},
body: /\A{"timestamp":\d+.,"foo":"bar","message":"test"}\z/,
times:1
end

def test_emit_with_sumo_metadata_with_fields_fields_format
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
log_format fields
}
driver = create_driver(config)
time = event_time
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
ENV['HOST'] = "foo"
driver.run do
driver.feed("output.test", time, {'foo' => 'shark', 'message' => 'test', '_sumo_metadata' => {
"host": "#{ENV['HOST']}",
"source": "${tag}",
"category": "test",
"fields": "foo=bar, sumo = logic"
}})
end
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test', 'X-Sumo-Fields' => 'foo=bar, sumo = logic'},
body: /\A{"timestamp":\d+.,"foo":"shark","message":"test"}\z/,
times:1
end

def test_emit_with_sumo_metadata
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
Expand Down

0 comments on commit 1cd96a8

Please sign in to comment.