From 93542f9328c1989cd5ed08202ad350b963babcda Mon Sep 17 00:00:00 2001 From: RashmiRam Date: Tue, 24 Mar 2020 04:59:53 +0530 Subject: [PATCH 1/4] Added support for DLQ 1. Used the same logic as elasticsearch output plugin to find out if dlq is enabled or not [execution_context? && execution_context.dlq_writer? && execution_context.dlq_writer is not a dummy writer?] 2. if dlq is enabled, send it to dlq_writer or log and drop the the events otherwise. Fixes logstash-plugins#109 Signed-off-by: RashmiRam --- lib/logstash/outputs/http.rb | 28 +++++++++++++++++- spec/outputs/http_spec.rb | 57 ++++++++++++++++++++++++++++++++++-- 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 3f37af7..40ceb3a 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -60,6 +60,9 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base # If encountered as response codes this plugin will retry these requests config :retryable_codes, :validate => :number, :list => true, :default => [429, 500, 502, 503, 504] + # If encountered as response codes, this plugin will write these events to DLQ + config :dlq_retryable_codes, :validate => :number, :list => true, :default => [400, 403, 404, 401] + # If you would like to consider some non-2xx codes to be successes # enumerate them here. Responses returning these codes will be considered successes config :ignorable_codes, :validate => :number, :list => true @@ -97,7 +100,7 @@ def register # tokens must be added back by the client on success @request_tokens = SizedQueue.new(@pool_max) @pool_max.times {|t| @request_tokens << true } - + @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil @requests = Array.new if @content_type.nil? @@ -154,6 +157,15 @@ def log_error_response(response, url, event) ) end + def write_to_dlq(url, event, response) + # To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior) + if @dlq_writer + @dlq_writer.write(event, "Sending #{response.code} erred HTTP request to DLQ, url: #{url}, response: #{response}") + else + log_error_response(response, url, event) + end + end + def send_events(events) successes = java.util.concurrent.atomic.AtomicInteger.new(0) failures = java.util.concurrent.atomic.AtomicInteger.new(0) @@ -242,6 +254,9 @@ def send_event(event, attempt) if retryable_response?(response) log_retryable_response(response) return :retry, event, attempt + elsif dlq_retryable_response?(response) + write_to_dlq(url, event, response) + return :failure, event, attempt else log_error_response(response, url, event) return :failure, event, attempt @@ -287,6 +302,10 @@ def retryable_response?(response) @retryable_codes && @retryable_codes.include?(response.code) end + def dlq_retryable_response?(response) + @dlq_retryable_codes && @dlq_retryable_codes.include?(response.code) + end + def retryable_exception?(exception) RETRYABLE_MANTICORE_EXCEPTIONS.any? {|me| exception.is_a?(me) } end @@ -379,4 +398,11 @@ def validate_format! end end end + + def dlq_enabled? + # this is the only way to determine if current logstash is supporting a dlq and dlq is also enabled + # Reference: https://github.com/elastic/logstash/issues/8064 + respond_to?(:execution_context) && execution_context.respond_to?(:dlq_writer) && + !execution_context.dlq_writer.inner_writer.is_a?(::LogStash::Util::DummyDeadLetterQueueWriter) + end end diff --git a/spec/outputs/http_spec.rb b/spec/outputs/http_spec.rb index 24aa344..6913adb 100644 --- a/spec/outputs/http_spec.rb +++ b/spec/outputs/http_spec.rb @@ -56,6 +56,11 @@ def self.retry_fail_count() end multiroute(%w(get post put patch delete), "/bad") do + self.class.last_request = request + [415, "YUP"] + end + + multiroute(%w(get post put patch delete), "/dlq_bad") do self.class.last_request = request [400, "YUP"] end @@ -117,14 +122,23 @@ def sinatra_run_wait(app, opts) let(:expected_method) { method.clone.to_sym } let(:client) { subject.client } + let(:dlq_enabled) { false } + let(:dlq_writer) { double("dlq_writer") } + let(:execution_context) { double("execution_context") } before do + allow(subject).to receive(:dlq_enabled?).with(any_args).and_return(dlq_enabled) + allow(subject).to receive(:execution_context).with(any_args).and_return(execution_context) + allow(execution_context).to receive(:dlq_writer).with(any_args).and_return(dlq_writer) + allow(dlq_writer).to receive(:write).with(any_args) + subject.register allow(client).to receive(:send). with(expected_method, url, anything). and_call_original allow(subject).to receive(:log_failure).with(any_args) allow(subject).to receive(:log_retryable_response).with(any_args) + allow(subject).to receive(:write_to_dlq).with(any_args).and_call_original end context 'sending no events' do @@ -165,21 +179,60 @@ def sinatra_run_wait(app, opts) it "should log a failure" do expect(subject).to have_received(:log_failure).with(any_args) end + + it "should not be sent to dlq" do + expect(subject).not_to have_received(:write_to_dlq).with(any_args) + end end context "with ignorable failing requests" do let(:url) { "http://localhost:#{port}/bad"} - let(:verb_behavior_config) { super.merge("ignorable_codes" => [400]) } + let(:verb_behavior_config) { super.merge("ignorable_codes" => [415]) } before do subject.multi_receive([event]) end - it "should log a failure" do + it "should not log a failure" do + expect(subject).not_to have_received(:log_failure).with(any_args) + end + + it "should not be sent to dlq" do + expect(subject).not_to have_received(:write_to_dlq).with(any_args) + end + end + + context "with DLQ qualified failing requests" do + let(:url) { "http://localhost:#{port}/dlq_bad"} + let(:dlq_enabled) { true } + let(:verb_behavior_config) { super.merge("dlq_retryable_codes" => [400]) } + + before do + subject.multi_receive([event]) + end + + it "should write to dlq" do + expect(subject).to have_received(:write_to_dlq).with(any_args) + end + + it "should not log a failure" do expect(subject).not_to have_received(:log_failure).with(any_args) end end + context "when DLQ is not enabled" do + let(:url) { "http://localhost:#{port}/dlq_bad"} + let(:verb_behavior_config) { super.merge("dlq_retryable_codes" => [400]) } + + before do + subject.multi_receive([event]) + end + + it "should not send the event to the DLQ instead, instead log" do + expect(subject).to have_received(:log_failure).with(any_args) + end + end + context "with retryable failing requests" do let(:url) { "http://localhost:#{port}/retry"} From ee4b8e4049077830bfe71cb7427ccb16266be610 Mon Sep 17 00:00:00 2001 From: RashmiRam Date: Thu, 26 Mar 2020 17:10:33 +0530 Subject: [PATCH 2/4] Modified comments Signed-off-by: RashmiRam --- lib/logstash/outputs/http.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 40ceb3a..2d6f9a0 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -156,9 +156,9 @@ def log_error_response(response, url, event) :event => event ) end - + + # To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior) def write_to_dlq(url, event, response) - # To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior) if @dlq_writer @dlq_writer.write(event, "Sending #{response.code} erred HTTP request to DLQ, url: #{url}, response: #{response}") else From e93f84c49bb030177fd50e92997fb7e14b3b2ebf Mon Sep 17 00:00:00 2001 From: RashmiRam Date: Sun, 29 Mar 2020 13:26:46 +0530 Subject: [PATCH 3/4] Added changelog and bumped up version Signed-off-by: RashmiRam --- CHANGELOG.md | 3 +++ logstash-output-http.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1442dbd..4302fb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 5.2.5 + - Added support for DLQ [#110](https://github.com/logstash-plugins/logstash-output-http/pull/110) + ## 5.2.4 - Relax dependency on http_client mixin since current major works on both diff --git a/logstash-output-http.gemspec b/logstash-output-http.gemspec index c5d2d87..6c0dfa5 100644 --- a/logstash-output-http.gemspec +++ b/logstash-output-http.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-http' - s.version = '5.2.4' + s.version = '5.2.5' s.licenses = ['Apache License (2.0)'] s.summary = "Sends events to a generic HTTP or HTTPS endpoint" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From 6e56f6754abd241e4f24cbf2b0f2f8e296b34655 Mon Sep 17 00:00:00 2001 From: RashmiRam Date: Wed, 1 Jul 2020 00:39:59 +0530 Subject: [PATCH 4/4] Logging when DLQ is enabled + copied metadata to custom metadata in dlq event --- lib/logstash/outputs/http.rb | 10 ++++++++++ spec/outputs/http_spec.rb | 14 +++++++------- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 2d6f9a0..39fe108 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -160,6 +160,16 @@ def log_error_response(response, url, event) # To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior) def write_to_dlq(url, event, response) if @dlq_writer + log_failure( + "Sending this non-2xx HTTP code #{response.code} to DLQ", + :response_code => response.code, + :url => url, + :event => event + ) + metadata = event.get("@metadata") + metadata.each_pair do |key, value| + event.set("[custom_metadata][#{key}]", value) + end @dlq_writer.write(event, "Sending #{response.code} erred HTTP request to DLQ, url: #{url}, response: #{response}") else log_error_response(response, url, event) diff --git a/spec/outputs/http_spec.rb b/spec/outputs/http_spec.rb index 6913adb..3ef9c9c 100644 --- a/spec/outputs/http_spec.rb +++ b/spec/outputs/http_spec.rb @@ -136,7 +136,7 @@ def sinatra_run_wait(app, opts) allow(client).to receive(:send). with(expected_method, url, anything). and_call_original - allow(subject).to receive(:log_failure).with(any_args) + allow(subject).to receive(:log_error_response).with(any_args) allow(subject).to receive(:log_retryable_response).with(any_args) allow(subject).to receive(:write_to_dlq).with(any_args).and_call_original end @@ -165,7 +165,7 @@ def sinatra_run_wait(app, opts) end it "should not log a failure" do - expect(subject).not_to have_received(:log_failure).with(any_args) + expect(subject).not_to have_received(:log_error_response).with(any_args) end end @@ -177,7 +177,7 @@ def sinatra_run_wait(app, opts) end it "should log a failure" do - expect(subject).to have_received(:log_failure).with(any_args) + expect(subject).to have_received(:log_error_response).with(any_args) end it "should not be sent to dlq" do @@ -194,7 +194,7 @@ def sinatra_run_wait(app, opts) end it "should not log a failure" do - expect(subject).not_to have_received(:log_failure).with(any_args) + expect(subject).not_to have_received(:log_error_response).with(any_args) end it "should not be sent to dlq" do @@ -216,7 +216,7 @@ def sinatra_run_wait(app, opts) end it "should not log a failure" do - expect(subject).not_to have_received(:log_failure).with(any_args) + expect(subject).not_to have_received(:log_error_response).with(any_args) end end @@ -229,7 +229,7 @@ def sinatra_run_wait(app, opts) end it "should not send the event to the DLQ instead, instead log" do - expect(subject).to have_received(:log_failure).with(any_args) + expect(subject).to have_received(:log_error_response).with(any_args) end end @@ -264,7 +264,7 @@ def sinatra_run_wait(app, opts) before do TestApp.last_request = nil end - + let(:events) { [event] } describe "with a good code" do