diff --git a/CHANGELOG.md b/CHANGELOG.md index 1442dbd..4302fb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 5.2.5 + - Added support for DLQ [#110](https://github.com/logstash-plugins/logstash-output-http/pull/110) + ## 5.2.4 - Relax dependency on http_client mixin since current major works on both diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 3f37af7..39fe108 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -60,6 +60,9 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base # If encountered as response codes this plugin will retry these requests config :retryable_codes, :validate => :number, :list => true, :default => [429, 500, 502, 503, 504] + # If encountered as response codes, this plugin will write these events to DLQ + config :dlq_retryable_codes, :validate => :number, :list => true, :default => [400, 403, 404, 401] + # If you would like to consider some non-2xx codes to be successes # enumerate them here. Responses returning these codes will be considered successes config :ignorable_codes, :validate => :number, :list => true @@ -97,7 +100,7 @@ def register # tokens must be added back by the client on success @request_tokens = SizedQueue.new(@pool_max) @pool_max.times {|t| @request_tokens << true } - + @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil @requests = Array.new if @content_type.nil? @@ -153,6 +156,25 @@ def log_error_response(response, url, event) :event => event ) end + + # To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior) + def write_to_dlq(url, event, response) + if @dlq_writer + log_failure( + "Sending this non-2xx HTTP code #{response.code} to DLQ", + :response_code => response.code, + :url => url, + :event => event + ) + metadata = event.get("@metadata") + metadata.each_pair do |key, value| + event.set("[custom_metadata][#{key}]", value) + end + @dlq_writer.write(event, "Sending #{response.code} erred HTTP request to DLQ, url: #{url}, response: #{response}") + else + log_error_response(response, url, event) + end + end def send_events(events) successes = java.util.concurrent.atomic.AtomicInteger.new(0) @@ -242,6 +264,9 @@ def send_event(event, attempt) if retryable_response?(response) log_retryable_response(response) return :retry, event, attempt + elsif dlq_retryable_response?(response) + write_to_dlq(url, event, response) + return :failure, event, attempt else log_error_response(response, url, event) return :failure, event, attempt @@ -287,6 +312,10 @@ def retryable_response?(response) @retryable_codes && @retryable_codes.include?(response.code) end + def dlq_retryable_response?(response) + @dlq_retryable_codes && @dlq_retryable_codes.include?(response.code) + end + def retryable_exception?(exception) RETRYABLE_MANTICORE_EXCEPTIONS.any? {|me| exception.is_a?(me) } end @@ -379,4 +408,11 @@ def validate_format! end end end + + def dlq_enabled? + # this is the only way to determine if current logstash is supporting a dlq and dlq is also enabled + # Reference: https://github.com/elastic/logstash/issues/8064 + respond_to?(:execution_context) && execution_context.respond_to?(:dlq_writer) && + !execution_context.dlq_writer.inner_writer.is_a?(::LogStash::Util::DummyDeadLetterQueueWriter) + end end diff --git a/logstash-output-http.gemspec b/logstash-output-http.gemspec index c5d2d87..6c0dfa5 100644 --- a/logstash-output-http.gemspec +++ b/logstash-output-http.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-http' - s.version = '5.2.4' + s.version = '5.2.5' s.licenses = ['Apache License (2.0)'] s.summary = "Sends events to a generic HTTP or HTTPS endpoint" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/outputs/http_spec.rb b/spec/outputs/http_spec.rb index 24aa344..3ef9c9c 100644 --- a/spec/outputs/http_spec.rb +++ b/spec/outputs/http_spec.rb @@ -56,6 +56,11 @@ def self.retry_fail_count() end multiroute(%w(get post put patch delete), "/bad") do + self.class.last_request = request + [415, "YUP"] + end + + multiroute(%w(get post put patch delete), "/dlq_bad") do self.class.last_request = request [400, "YUP"] end @@ -117,14 +122,23 @@ def sinatra_run_wait(app, opts) let(:expected_method) { method.clone.to_sym } let(:client) { subject.client } + let(:dlq_enabled) { false } + let(:dlq_writer) { double("dlq_writer") } + let(:execution_context) { double("execution_context") } before do + allow(subject).to receive(:dlq_enabled?).with(any_args).and_return(dlq_enabled) + allow(subject).to receive(:execution_context).with(any_args).and_return(execution_context) + allow(execution_context).to receive(:dlq_writer).with(any_args).and_return(dlq_writer) + allow(dlq_writer).to receive(:write).with(any_args) + subject.register allow(client).to receive(:send). with(expected_method, url, anything). and_call_original - allow(subject).to receive(:log_failure).with(any_args) + allow(subject).to receive(:log_error_response).with(any_args) allow(subject).to receive(:log_retryable_response).with(any_args) + allow(subject).to receive(:write_to_dlq).with(any_args).and_call_original end context 'sending no events' do @@ -151,7 +165,7 @@ def sinatra_run_wait(app, opts) end it "should not log a failure" do - expect(subject).not_to have_received(:log_failure).with(any_args) + expect(subject).not_to have_received(:log_error_response).with(any_args) end end @@ -163,20 +177,59 @@ def sinatra_run_wait(app, opts) end it "should log a failure" do - expect(subject).to have_received(:log_failure).with(any_args) + expect(subject).to have_received(:log_error_response).with(any_args) + end + + it "should not be sent to dlq" do + expect(subject).not_to have_received(:write_to_dlq).with(any_args) end end context "with ignorable failing requests" do let(:url) { "http://localhost:#{port}/bad"} - let(:verb_behavior_config) { super.merge("ignorable_codes" => [400]) } + let(:verb_behavior_config) { super.merge("ignorable_codes" => [415]) } before do subject.multi_receive([event]) end - it "should log a failure" do - expect(subject).not_to have_received(:log_failure).with(any_args) + it "should not log a failure" do + expect(subject).not_to have_received(:log_error_response).with(any_args) + end + + it "should not be sent to dlq" do + expect(subject).not_to have_received(:write_to_dlq).with(any_args) + end + end + + context "with DLQ qualified failing requests" do + let(:url) { "http://localhost:#{port}/dlq_bad"} + let(:dlq_enabled) { true } + let(:verb_behavior_config) { super.merge("dlq_retryable_codes" => [400]) } + + before do + subject.multi_receive([event]) + end + + it "should write to dlq" do + expect(subject).to have_received(:write_to_dlq).with(any_args) + end + + it "should not log a failure" do + expect(subject).not_to have_received(:log_error_response).with(any_args) + end + end + + context "when DLQ is not enabled" do + let(:url) { "http://localhost:#{port}/dlq_bad"} + let(:verb_behavior_config) { super.merge("dlq_retryable_codes" => [400]) } + + before do + subject.multi_receive([event]) + end + + it "should not send the event to the DLQ instead, instead log" do + expect(subject).to have_received(:log_error_response).with(any_args) end end @@ -211,7 +264,7 @@ def sinatra_run_wait(app, opts) before do TestApp.last_request = nil end - + let(:events) { [event] } describe "with a good code" do