Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for DLQ #110

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 5.2.5
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
## 5.2.5
## 5.6.0

As per later comment

- Added support for DLQ [#110](https://github.com/logstash-plugins/logstash-output-http/pull/110)

## 5.2.4
- Relax dependency on http_client mixin since current major works on both

Expand Down
38 changes: 37 additions & 1 deletion lib/logstash/outputs/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base
# If encountered as response codes this plugin will retry these requests
config :retryable_codes, :validate => :number, :list => true, :default => [429, 500, 502, 503, 504]

# If encountered as response codes, this plugin will write these events to DLQ
config :dlq_retryable_codes, :validate => :number, :list => true, :default => [400, 403, 404, 401]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
config :dlq_retryable_codes, :validate => :number, :list => true, :default => [400, 403, 404, 401]
config :dlq_retryable_codes, :validate => :number, :list => true, :default => [400, 401, 403, 404]


# If you would like to consider some non-2xx codes to be successes
# enumerate them here. Responses returning these codes will be considered successes
config :ignorable_codes, :validate => :number, :list => true
Expand Down Expand Up @@ -97,7 +100,7 @@ def register
# tokens must be added back by the client on success
@request_tokens = SizedQueue.new(@pool_max)
@pool_max.times {|t| @request_tokens << true }

@dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil
@requests = Array.new

if @content_type.nil?
Expand Down Expand Up @@ -153,6 +156,25 @@ def log_error_response(response, url, event)
:event => event
)
end

# To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior)
def write_to_dlq(url, event, response)
if @dlq_writer
log_failure(
"Sending this non-2xx HTTP code #{response.code} to DLQ",
:response_code => response.code,
:url => url,
:event => event
)
metadata = event.get("@metadata")
metadata.each_pair do |key, value|
event.set("[custom_metadata][#{key}]", value)
end
Comment on lines +169 to +172
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@metadata field is already stored in the DLQ event, so that it can be read back on the downstream DLQ consumer.
No need to copy the @metadata in custom_metadata field before DLQ write.

@dlq_writer.write(event, "Sending #{response.code} erred HTTP request to DLQ, url: #{url}, response: #{response}")
else
log_error_response(response, url, event)
end
end

def send_events(events)
successes = java.util.concurrent.atomic.AtomicInteger.new(0)
Expand Down Expand Up @@ -242,6 +264,9 @@ def send_event(event, attempt)
if retryable_response?(response)
log_retryable_response(response)
return :retry, event, attempt
elsif dlq_retryable_response?(response)
write_to_dlq(url, event, response)
return :failure, event, attempt
else
log_error_response(response, url, event)
return :failure, event, attempt
Expand Down Expand Up @@ -287,6 +312,10 @@ def retryable_response?(response)
@retryable_codes && @retryable_codes.include?(response.code)
end

def dlq_retryable_response?(response)
@dlq_retryable_codes && @dlq_retryable_codes.include?(response.code)
end

def retryable_exception?(exception)
RETRYABLE_MANTICORE_EXCEPTIONS.any? {|me| exception.is_a?(me) }
end
Expand Down Expand Up @@ -379,4 +408,11 @@ def validate_format!
end
end
end

def dlq_enabled?
# this is the only way to determine if current logstash is supporting a dlq and dlq is also enabled
# Reference: https://github.com/elastic/logstash/issues/8064
respond_to?(:execution_context) && execution_context.respond_to?(:dlq_writer) &&
!execution_context.dlq_writer.inner_writer.is_a?(::LogStash::Util::DummyDeadLetterQueueWriter)
end
end
2 changes: 1 addition & 1 deletion logstash-output-http.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-http'
s.version = '5.2.4'
s.version = '5.2.5'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new feature so it deserve a minor version. Actually main has 5.5.0 so the target could be 5.6.0

Suggested change
s.version = '5.2.5'
s.version = '5.6.0'

s.licenses = ['Apache License (2.0)']
s.summary = "Sends events to a generic HTTP or HTTPS endpoint"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
67 changes: 60 additions & 7 deletions spec/outputs/http_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def self.retry_fail_count()
end

multiroute(%w(get post put patch delete), "/bad") do
self.class.last_request = request
[415, "YUP"]
end

multiroute(%w(get post put patch delete), "/dlq_bad") do
self.class.last_request = request
[400, "YUP"]
end
Expand Down Expand Up @@ -117,14 +122,23 @@ def sinatra_run_wait(app, opts)

let(:expected_method) { method.clone.to_sym }
let(:client) { subject.client }
let(:dlq_enabled) { false }
let(:dlq_writer) { double("dlq_writer") }
let(:execution_context) { double("execution_context") }

before do
allow(subject).to receive(:dlq_enabled?).with(any_args).and_return(dlq_enabled)
allow(subject).to receive(:execution_context).with(any_args).and_return(execution_context)
allow(execution_context).to receive(:dlq_writer).with(any_args).and_return(dlq_writer)
allow(dlq_writer).to receive(:write).with(any_args)

subject.register
allow(client).to receive(:send).
with(expected_method, url, anything).
and_call_original
allow(subject).to receive(:log_failure).with(any_args)
allow(subject).to receive(:log_error_response).with(any_args)
allow(subject).to receive(:log_retryable_response).with(any_args)
allow(subject).to receive(:write_to_dlq).with(any_args).and_call_original
end

context 'sending no events' do
Expand All @@ -151,7 +165,7 @@ def sinatra_run_wait(app, opts)
end

it "should not log a failure" do
expect(subject).not_to have_received(:log_failure).with(any_args)
expect(subject).not_to have_received(:log_error_response).with(any_args)
end
end

Expand All @@ -163,20 +177,59 @@ def sinatra_run_wait(app, opts)
end

it "should log a failure" do
expect(subject).to have_received(:log_failure).with(any_args)
expect(subject).to have_received(:log_error_response).with(any_args)
end

it "should not be sent to dlq" do
expect(subject).not_to have_received(:write_to_dlq).with(any_args)
end
end

context "with ignorable failing requests" do
let(:url) { "http://localhost:#{port}/bad"}
let(:verb_behavior_config) { super.merge("ignorable_codes" => [400]) }
let(:verb_behavior_config) { super.merge("ignorable_codes" => [415]) }

before do
subject.multi_receive([event])
end

it "should log a failure" do
expect(subject).not_to have_received(:log_failure).with(any_args)
it "should not log a failure" do
expect(subject).not_to have_received(:log_error_response).with(any_args)
end

it "should not be sent to dlq" do
expect(subject).not_to have_received(:write_to_dlq).with(any_args)
end
end

context "with DLQ qualified failing requests" do
let(:url) { "http://localhost:#{port}/dlq_bad"}
let(:dlq_enabled) { true }
let(:verb_behavior_config) { super.merge("dlq_retryable_codes" => [400]) }

before do
subject.multi_receive([event])
end

it "should write to dlq" do
expect(subject).to have_received(:write_to_dlq).with(any_args)
end

it "should not log a failure" do
expect(subject).not_to have_received(:log_error_response).with(any_args)
end
end

context "when DLQ is not enabled" do
let(:url) { "http://localhost:#{port}/dlq_bad"}
let(:verb_behavior_config) { super.merge("dlq_retryable_codes" => [400]) }

before do
subject.multi_receive([event])
end

it "should not send the event to the DLQ instead, instead log" do
expect(subject).to have_received(:log_error_response).with(any_args)
end
end

Expand Down Expand Up @@ -211,7 +264,7 @@ def sinatra_run_wait(app, opts)
before do
TestApp.last_request = nil
end

let(:events) { [event] }

describe "with a good code" do
Expand Down