From 1ae92af4453d098810569bde0833cd0ed42784e5 Mon Sep 17 00:00:00 2001 From: Boteng Yao Date: Wed, 24 Jan 2024 10:28:23 +0000 Subject: [PATCH] async http: set buffer limit for response and do not buffer for mirror Signed-off-by: Boteng Yao Signed-off-by: Yan Avlasov Signed-off-by: Ryan Northey --- changelogs/current.yaml | 4 + envoy/http/async_client.h | 14 +- source/common/http/async_client_impl.cc | 37 +++++- source/common/http/async_client_impl.h | 9 +- source/common/router/router.cc | 3 +- source/extensions/common/wasm/context.cc | 5 +- .../rest/rest_api_fetcher.cc | 4 +- .../common/ext_authz/ext_authz_http_impl.cc | 4 +- .../filters/http/gcp_authn/gcp_authn_impl.cc | 5 +- .../tracers/datadog/agent_http_client.cc | 3 + test/common/http/BUILD | 1 + test/common/http/async_client_impl_test.cc | 123 ++++++++++++++++++ .../ext_authz/ext_authz_integration_test.cc | 39 +++++- 13 files changed, 233 insertions(+), 18 deletions(-) diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 1aab8dd553f5..31ce24b5a17a 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -27,6 +27,10 @@ bug_fixes: Only 101 is considered a successful response for websocket handshake for HTTP/1.1, and Envoy as a proxy will proxy the response header from upstream to downstream and then close the request if other status is received. This behavior can be reverted by ``envoy_reloadable_features_check_switch_protocol_websocket_handshake``. +- area: async http client + change: | + Added one option to disable the response body buffering for mirror request. Also introduced a 32MB cap for the response + buffer, which can be changed by the runtime flag ``http.async_response_buffer_limit`` based on the product needs. removed_config_or_runtime: # *Normally occurs at the end of the* :ref:`deprecation period ` diff --git a/envoy/http/async_client.h b/envoy/http/async_client.h index 5da55ee23b6f..8399fbd2ad44 100644 --- a/envoy/http/async_client.h +++ b/envoy/http/async_client.h @@ -45,7 +45,9 @@ class AsyncClient { */ enum class FailureReason { // The stream has been reset. - Reset + Reset, + // The stream exceeds the response buffer limit. + ExceedResponseBufferLimit }; /** @@ -291,6 +293,11 @@ class AsyncClient { return *this; } + StreamOptions& setDiscardResponseBody(bool discard) { + discard_response_body = discard; + return *this; + } + // For gmock test bool operator==(const StreamOptions& src) const { return timeout == src.timeout && buffer_body_for_retry == src.buffer_body_for_retry && @@ -328,6 +335,7 @@ class AsyncClient { OptRef filter_config_; bool is_shadow{false}; + bool discard_response_body{false}; }; /** @@ -391,6 +399,10 @@ class AsyncClient { buffer_limit_ = limit; return *this; } + RequestOptions& setDiscardResponseBody(bool discard) { + discard_response_body = discard; + return *this; + } // For gmock test bool operator==(const RequestOptions& src) const { diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 18167336065a..aec368094b12 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -14,6 +14,9 @@ namespace Envoy { namespace Http { + +const absl::string_view AsyncClientImpl::ResponseBufferLimit = "http.async_response_buffer_limit"; + AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster, Stats::Store& stats_store, Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, @@ -25,7 +28,7 @@ AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster, config_(http_context.asyncClientStatPrefix(), local_info, *stats_store.rootScope(), cm, runtime, random, std::move(shadow_writer), true, false, false, false, false, false, {}, dispatcher.timeSource(), http_context, router_context), - dispatcher_(dispatcher) {} + dispatcher_(dispatcher), runtime_(runtime) {} AsyncClientImpl::~AsyncClientImpl() { while (!active_streams_.empty()) { @@ -77,7 +80,8 @@ AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callba AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks, const AsyncClient::StreamOptions& options) - : parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()), + : parent_(parent), discard_response_body_(options.discard_response_body), + stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()), router_(options.filter_config_ ? *options.filter_config_ : parent.config_, parent.config_.async_stats_), stream_info_(Protocol::Http11, parent.dispatcher().timeSource(), nullptr), @@ -257,7 +261,9 @@ void AsyncStreamImpl::resetStream(Http::StreamResetReason, absl::string_view) { AsyncRequestSharedImpl::AsyncRequestSharedImpl(AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks, const AsyncClient::RequestOptions& options) - : AsyncStreamImpl(parent, *this, options), callbacks_(callbacks) { + : AsyncStreamImpl(parent, *this, options), callbacks_(callbacks), + response_buffer_limit_(parent.runtime_.snapshot().getInteger( + AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse)) { if (nullptr != options.parent_span_) { const std::string child_span_name = options.child_span_name_.empty() @@ -306,7 +312,22 @@ void AsyncRequestSharedImpl::onHeaders(ResponseHeaderMapPtr&& headers, bool) { response_ = std::make_unique(std::move(headers)); } -void AsyncRequestSharedImpl::onData(Buffer::Instance& data, bool) { response_->body().move(data); } +void AsyncRequestSharedImpl::onData(Buffer::Instance& data, bool) { + if (discard_response_body_) { + data.drain(data.length()); + return; + } + + if (response_->body().length() + data.length() > response_buffer_limit_) { + ENVOY_LOG_EVERY_POW_2(warn, "the buffer size limit for async client response body " + "has been exceeded, draining data"); + data.drain(data.length()); + response_buffer_overlimit_ = true; + reset(); + } else { + response_->body().move(data); + } +} void AsyncRequestSharedImpl::onTrailers(ResponseTrailerMapPtr&& trailers) { response_->trailers(std::move(trailers)); @@ -327,8 +348,12 @@ void AsyncRequestSharedImpl::onReset() { Tracing::EgressConfig::get()); if (!cancelled_) { - // In this case we don't have a valid response so we do need to raise a failure. - callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset); + if (response_buffer_overlimit_) { + callbacks_.onFailure(*this, AsyncClient::FailureReason::ExceedResponseBufferLimit); + } else { + // In this case we don't have a valid response so we do need to raise a failure. + callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset); + } } } diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 4d5c80527a33..dfc2e17b025c 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -51,6 +51,8 @@ namespace { // Limit the size of buffer for data used for retries. // This is currently fixed to 64KB. constexpr uint64_t kBufferLimitForRetry = 1 << 16; +// Response buffer limit 32MB. +constexpr uint64_t kBufferLimitForResponse = 32 * 1024 * 1024; } // namespace class AsyncStreamImpl; @@ -74,12 +76,14 @@ class AsyncClientImpl final : public AsyncClient { Singleton::Manager& singleton_manager_; Upstream::ClusterInfoConstSharedPtr cluster_; Event::Dispatcher& dispatcher() override { return dispatcher_; } + static const absl::string_view ResponseBufferLimit; private: template T* internalStartRequest(T* async_request); Router::FilterConfig config_; Event::Dispatcher& dispatcher_; std::list> active_streams_; + Runtime::Loader& runtime_; friend class AsyncStreamImpl; friend class AsyncRequestSharedImpl; @@ -92,7 +96,7 @@ class AsyncClientImpl final : public AsyncClient { class AsyncStreamImpl : public virtual AsyncClient::Stream, public StreamDecoderFilterCallbacks, public Event::DeferredDeletable, - Logger::Loggable, + public Logger::Loggable, public LinkedObject, public ScopeTrackedObject { public: @@ -151,6 +155,7 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream, absl::optional destructor_callback_; // Callback to listen for low/high/overflow watermark events. absl::optional> watermark_callbacks_; + const bool discard_response_body_; private: void cleanup(); @@ -303,6 +308,8 @@ class AsyncRequestSharedImpl : public virtual AsyncClient::Request, Tracing::SpanPtr child_span_; std::unique_ptr response_; bool cancelled_{}; + bool response_buffer_overlimit_{}; + const uint64_t response_buffer_limit_; }; class AsyncOngoingRequestImpl final : public AsyncClient::OngoingRequest, diff --git a/source/common/router/router.cc b/source/common/router/router.cc index 0334aedf5784..5f6fec69aa19 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -769,7 +769,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, .setBufferAccount(callbacks_->account()) // A buffer limit of 1 is set in the case that retry_shadow_buffer_limit_ == 0, // because a buffer limit of zero on async clients is interpreted as no buffer limit. - .setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_); + .setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_) + .setDiscardResponseBody(true); options.setFilterConfig(config_); if (end_stream) { // This is a header-only request, and can be dispatched immediately to the shadow diff --git a/source/extensions/common/wasm/context.cc b/source/extensions/common/wasm/context.cc index e43f296255e9..4a7534905ebe 100644 --- a/source/extensions/common/wasm/context.cc +++ b/source/extensions/common/wasm/context.cc @@ -1849,8 +1849,9 @@ void Context::onHttpCallFailure(uint32_t token, Http::AsyncClient::FailureReason return; } status_code_ = static_cast(WasmResult::BrokenConnection); - // This is the only value currently. - ASSERT(reason == Http::AsyncClient::FailureReason::Reset); + // TODO(botengyao): handle different failure reasons. + ASSERT(reason == Http::AsyncClient::FailureReason::Reset || + reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit); status_message_ = "reset"; // Deferred "after VM call" actions are going to be executed upon returning from // ContextBase::*, which might include deleting Context object via proxy_done(). diff --git a/source/extensions/config_subscription/rest/rest_api_fetcher.cc b/source/extensions/config_subscription/rest/rest_api_fetcher.cc index 92c06f023d19..6b0d63fe73ef 100644 --- a/source/extensions/config_subscription/rest/rest_api_fetcher.cc +++ b/source/extensions/config_subscription/rest/rest_api_fetcher.cc @@ -50,8 +50,8 @@ void RestApiFetcher::onSuccess(const Http::AsyncClient::Request& request, void RestApiFetcher::onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason reason) { - // Currently Http::AsyncClient::FailureReason only has one value: "Reset". - ASSERT(reason == Http::AsyncClient::FailureReason::Reset); + ASSERT(reason == Http::AsyncClient::FailureReason::Reset || + reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit); onFetchFailure(Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr); requestComplete(); } diff --git a/source/extensions/filters/common/ext_authz/ext_authz_http_impl.cc b/source/extensions/filters/common/ext_authz/ext_authz_http_impl.cc index 49a1a95e5218..47c8c7efc41a 100644 --- a/source/extensions/filters/common/ext_authz/ext_authz_http_impl.cc +++ b/source/extensions/filters/common/ext_authz/ext_authz_http_impl.cc @@ -263,7 +263,9 @@ void RawHttpClientImpl::onSuccess(const Http::AsyncClient::Request&, void RawHttpClientImpl::onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason reason) { - ASSERT(reason == Http::AsyncClient::FailureReason::Reset); + // TODO(botengyao): handle different failure reasons. + ASSERT(reason == Http::AsyncClient::FailureReason::Reset || + reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit); callbacks_->onComplete(std::make_unique(errorResponse())); callbacks_ = nullptr; } diff --git a/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc b/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc index 95ce7104f366..29bdb92f57e8 100644 --- a/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc +++ b/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc @@ -90,8 +90,9 @@ void GcpAuthnClient::onSuccess(const Http::AsyncClient::Request&, void GcpAuthnClient::onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason reason) { - // Http::AsyncClient::FailureReason only has one value: "Reset". - ASSERT(reason == Http::AsyncClient::FailureReason::Reset); + // TODO(botengyao): handle different failure reasons. + ASSERT(reason == Http::AsyncClient::FailureReason::Reset || + reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit); ENVOY_LOG(error, "Request failed: stream has been reset"); active_request_ = nullptr; onError(); diff --git a/source/extensions/tracers/datadog/agent_http_client.cc b/source/extensions/tracers/datadog/agent_http_client.cc index 37a9fad328ca..8095aa8a15bc 100644 --- a/source/extensions/tracers/datadog/agent_http_client.cc +++ b/source/extensions/tracers/datadog/agent_http_client.cc @@ -126,6 +126,9 @@ void AgentHTTPClient::onFailure(const Http::AsyncClient::Request& request, case Http::AsyncClient::FailureReason::Reset: message += "The stream has been reset."; break; + case Http::AsyncClient::FailureReason::ExceedResponseBufferLimit: + message += "The stream exceeds the response buffer limit."; + break; default: message += "Unknown error."; } diff --git a/test/common/http/BUILD b/test/common/http/BUILD index d3fce341694f..b92e9a4ac5ea 100644 --- a/test/common/http/BUILD +++ b/test/common/http/BUILD @@ -34,6 +34,7 @@ envoy_cc_test( "//test/mocks/runtime:runtime_mocks", "//test/mocks/stats:stats_mocks", "//test/mocks/upstream:cluster_manager_mocks", + "//test/test_common:test_runtime_lib", "//test/test_common:test_time_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", "@envoy_api//envoy/config/route/v3:pkg_cc_proto", diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 1a56312eff56..4cd1166d95ea 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -218,6 +218,129 @@ TEST_F(AsyncClientImplTest, Basic) { .value()); } +TEST_F(AsyncClientImplTest, NoResponseBodyBuffering) { + message_->body().add("test body"); + Buffer::Instance& data = message_->body(); + + EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _)) + .WillOnce(Invoke( + [&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks, + const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_, + stream_info_, {}); + response_decoder_ = &decoder; + return nullptr; + })); + + TestRequestHeaderMapImpl copy(message_->headers()); + copy.addCopy("x-envoy-internal", "true"); + copy.addCopy("x-forwarded-for", "127.0.0.1"); + copy.addCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(©), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data), true)); + + auto* request = client_.send(std::move(message_), callbacks_, + AsyncClient::RequestOptions().setDiscardResponseBody(true)); + EXPECT_NE(request, nullptr); + + EXPECT_CALL(callbacks_, onBeforeFinalizeUpstreamSpan(_, _)); + EXPECT_CALL(callbacks_, onSuccess_(_, _)) + .WillOnce(Invoke([](const AsyncClient::Request&, ResponseMessage* response) -> void { + // Verify that there is zero response body. + EXPECT_EQ(response->body().length(), 0); + })); + ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}}); + response_decoder_->decodeHeaders(std::move(response_headers), false); + response_decoder_->decodeData(data, true); + + EXPECT_EQ( + 1UL, + cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_200").value()); + EXPECT_EQ(1UL, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("internal.upstream_rq_200") + .value()); +} + +TEST_F(AsyncClientImplTest, LargeResponseBody) { + message_->body().add("test body"); + Buffer::Instance& data = message_->body(); + + EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _)) + .WillOnce(Invoke( + [&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks, + const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_, + stream_info_, {}); + response_decoder_ = &decoder; + return nullptr; + })); + + TestRequestHeaderMapImpl copy(message_->headers()); + copy.addCopy("x-envoy-internal", "true"); + copy.addCopy("x-forwarded-for", "127.0.0.1"); + copy.addCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(©), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data), true)); + ON_CALL(runtime_.snapshot_, + getInteger(AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse)) + .WillByDefault(Return(100)); + + auto* request = client_.send(std::move(message_), callbacks_, AsyncClient::RequestOptions()); + EXPECT_NE(request, nullptr); + + EXPECT_CALL(callbacks_, onBeforeFinalizeUpstreamSpan(_, _)); + EXPECT_CALL(callbacks_, onFailure(_, AsyncClient::FailureReason::ExceedResponseBufferLimit)); + + Buffer::InstancePtr large_body{new Buffer::OwnedImpl(std::string(100 + 1, 'a'))}; + ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}}); + response_decoder_->decodeHeaders(std::move(response_headers), false); + response_decoder_->decodeData(*large_body, true); + EXPECT_EQ(large_body->length(), 0); +} + +TEST_F(AsyncClientImplTest, LargeResponseBodyMultipleRead) { + message_->body().add("test body"); + Buffer::Instance& data = message_->body(); + + EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _)) + .WillOnce(Invoke( + [&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks, + const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_, + stream_info_, {}); + response_decoder_ = &decoder; + return nullptr; + })); + + TestRequestHeaderMapImpl copy(message_->headers()); + copy.addCopy("x-envoy-internal", "true"); + copy.addCopy("x-forwarded-for", "127.0.0.1"); + copy.addCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(©), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data), true)); + ON_CALL(runtime_.snapshot_, + getInteger(AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse)) + .WillByDefault(Return(100)); + + auto* request = client_.send(std::move(message_), callbacks_, AsyncClient::RequestOptions()); + EXPECT_NE(request, nullptr); + + EXPECT_CALL(callbacks_, onBeforeFinalizeUpstreamSpan(_, _)); + EXPECT_CALL(callbacks_, onFailure(_, AsyncClient::FailureReason::ExceedResponseBufferLimit)); + + Buffer::InstancePtr large_body{new Buffer::OwnedImpl(std::string(50, 'a'))}; + Buffer::InstancePtr large_body_second{new Buffer::OwnedImpl(std::string(50, 'a'))}; + Buffer::InstancePtr large_body_third{new Buffer::OwnedImpl(std::string(2, 'a'))}; + ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}}); + response_decoder_->decodeHeaders(std::move(response_headers), false); + response_decoder_->decodeData(*large_body, false); + response_decoder_->decodeData(*large_body_second, false); + response_decoder_->decodeData(*large_body_third, true); +} + TEST_F(AsyncClientImplTest, BasicOngoingRequest) { auto headers = std::make_unique(); HttpTestUtility::addDefaultHeaders(*headers); diff --git a/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc b/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc index 5ce46084511f..5ab4e0954c4d 100644 --- a/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc +++ b/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc @@ -549,7 +549,9 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest, .get(Http::LowerCaseString(std::string("regex-fool")))[0] ->value() .getStringView()); + } + void sendExtAuthzResponse() { // Send back authorization response with "baz" and "bat" headers. // Also add multiple values "append-foo" and "append-bar" for key "x-append-bat". // Also tell Envoy to remove "remove-me" header before sending to upstream. @@ -574,8 +576,8 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest, cleanupUpstreamAndDownstream(); } - void initializeConfig(bool legacy_allowed_headers = true) { - config_helper_.addConfigModifier([this, legacy_allowed_headers]( + void initializeConfig(bool legacy_allowed_headers = true, bool failure_mode_allow = true) { + config_helper_.addConfigModifier([this, legacy_allowed_headers, failure_mode_allow]( envoy::config::bootstrap::v3::Bootstrap& bootstrap) { auto* ext_authz_cluster = bootstrap.mutable_static_resources()->add_clusters(); ext_authz_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); @@ -586,6 +588,8 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest, } else { TestUtility::loadFromYaml(default_config_, proto_config_); } + proto_config_.set_failure_mode_allow(failure_mode_allow); + proto_config_.set_failure_mode_allow_header_add(failure_mode_allow); envoy::config::listener::v3::Filter ext_authz_filter; ext_authz_filter.set_name("envoy.filters.http.ext_authz"); ext_authz_filter.mutable_typed_config()->PackFrom(proto_config_); @@ -601,6 +605,7 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest, initiateClientConnection(); waitForExtAuthzRequest(); + sendExtAuthzResponse(); AssertionResult result = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_); @@ -954,6 +959,7 @@ TEST_P(ExtAuthzHttpIntegrationTest, DEPRECATED_FEATURE_TEST(LegacyDirectReponse) HttpIntegrationTest::initialize(); initiateClientConnection(); waitForExtAuthzRequest(); + sendExtAuthzResponse(); ASSERT_TRUE(response_->waitForEndStream()); EXPECT_TRUE(response_->complete()); @@ -975,6 +981,7 @@ TEST_P(ExtAuthzHttpIntegrationTest, DEPRECATED_FEATURE_TEST(LegacyRedirectRespon HttpIntegrationTest::initialize(); initiateClientConnection(); waitForExtAuthzRequest(); + sendExtAuthzResponse(); ASSERT_TRUE(response_->waitForEndStream()); EXPECT_TRUE(response_->complete()); @@ -1042,12 +1049,39 @@ TEST_P(ExtAuthzHttpIntegrationTest, DirectReponse) { HttpIntegrationTest::initialize(); initiateClientConnection(); waitForExtAuthzRequest(); + sendExtAuthzResponse(); ASSERT_TRUE(response_->waitForEndStream()); EXPECT_TRUE(response_->complete()); EXPECT_EQ("204", response_->headers().Status()->value().getStringView()); } +// Test exceeding the async client buffer limit. +TEST_P(ExtAuthzHttpIntegrationTest, ErrorReponseWithDefultBufferLimit) { + initializeConfig(false, /*failure_mode_allow=*/false); + config_helper_.addRuntimeOverride("http.async_response_buffer_limit", "1024"); + + HttpIntegrationTest::initialize(); + initiateClientConnection(); + waitForExtAuthzRequest(); + + Http::TestResponseHeaderMapImpl response_headers{ + {":status", "200"}, + {"baz", "baz"}, + {"bat", "bar"}, + {"x-append-bat", "append-foo"}, + {"x-append-bat", "append-bar"}, + {"x-envoy-auth-headers-to-remove", "remove-me"}, + }; + ext_authz_request_->encodeHeaders(response_headers, false); + ext_authz_request_->encodeData(2048, true); + + ASSERT_TRUE(response_->waitForEndStream()); + EXPECT_TRUE(response_->complete()); + // A forbidden response since the onFailure is called due to the async client buffer limit. + EXPECT_EQ("403", response_->headers().Status()->value().getStringView()); +} + // (uses new config for allowed_headers). TEST_P(ExtAuthzHttpIntegrationTest, RedirectResponse) { config_helper_.addConfigModifier( @@ -1063,6 +1097,7 @@ TEST_P(ExtAuthzHttpIntegrationTest, RedirectResponse) { HttpIntegrationTest::initialize(); initiateClientConnection(); waitForExtAuthzRequest(); + sendExtAuthzResponse(); ASSERT_TRUE(response_->waitForEndStream()); EXPECT_TRUE(response_->complete());