diff --git a/changelogs/current.yaml b/changelogs/current.yaml
index 1aab8dd553f5..31ce24b5a17a 100644
--- a/changelogs/current.yaml
+++ b/changelogs/current.yaml
@@ -27,6 +27,10 @@ bug_fixes:
     Only 101 is considered a successful response for websocket handshake for HTTP/1.1, and Envoy as a proxy will proxy the response
     header from upstream to downstream and then close the request if other status is received. This behavior can be
     reverted by ``envoy_reloadable_features_check_switch_protocol_websocket_handshake``.
+- area: async http client
+  change: |
+    Added one option to disable the response body buffering for mirror request. Also introduced a 32MB cap for the response
+    buffer, which can be changed by the runtime flag ``http.async_response_buffer_limit`` based on the product needs.
 
 removed_config_or_runtime:
 # *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
diff --git a/envoy/http/async_client.h b/envoy/http/async_client.h
index 5da55ee23b6f..8399fbd2ad44 100644
--- a/envoy/http/async_client.h
+++ b/envoy/http/async_client.h
@@ -45,7 +45,9 @@ class AsyncClient {
    */
   enum class FailureReason {
     // The stream has been reset.
-    Reset
+    Reset,
+    // The stream exceeds the response buffer limit.
+    ExceedResponseBufferLimit
   };
 
   /**
@@ -291,6 +293,11 @@ class AsyncClient {
       return *this;
     }
 
+    StreamOptions& setDiscardResponseBody(bool discard) {
+      discard_response_body = discard;
+      return *this;
+    }
+
     // For gmock test
     bool operator==(const StreamOptions& src) const {
       return timeout == src.timeout && buffer_body_for_retry == src.buffer_body_for_retry &&
@@ -328,6 +335,7 @@ class AsyncClient {
     OptRef<Router::FilterConfig> filter_config_;
 
     bool is_shadow{false};
+    bool discard_response_body{false};
   };
 
   /**
@@ -391,6 +399,10 @@ class AsyncClient {
       buffer_limit_ = limit;
       return *this;
     }
+    RequestOptions& setDiscardResponseBody(bool discard) {
+      discard_response_body = discard;
+      return *this;
+    }
 
     // For gmock test
     bool operator==(const RequestOptions& src) const {
diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc
index 18167336065a..aec368094b12 100644
--- a/source/common/http/async_client_impl.cc
+++ b/source/common/http/async_client_impl.cc
@@ -14,6 +14,9 @@
 
 namespace Envoy {
 namespace Http {
+
+const absl::string_view AsyncClientImpl::ResponseBufferLimit = "http.async_response_buffer_limit";
+
 AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster,
                                  Stats::Store& stats_store, Event::Dispatcher& dispatcher,
                                  const LocalInfo::LocalInfo& local_info,
@@ -25,7 +28,7 @@ AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster,
       config_(http_context.asyncClientStatPrefix(), local_info, *stats_store.rootScope(), cm,
               runtime, random, std::move(shadow_writer), true, false, false, false, false, false,
               {}, dispatcher.timeSource(), http_context, router_context),
-      dispatcher_(dispatcher) {}
+      dispatcher_(dispatcher), runtime_(runtime) {}
 
 AsyncClientImpl::~AsyncClientImpl() {
   while (!active_streams_.empty()) {
@@ -77,7 +80,8 @@ AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callba
 
 AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
                                  const AsyncClient::StreamOptions& options)
-    : parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
+    : parent_(parent), discard_response_body_(options.discard_response_body),
+      stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
       router_(options.filter_config_ ? *options.filter_config_ : parent.config_,
               parent.config_.async_stats_),
       stream_info_(Protocol::Http11, parent.dispatcher().timeSource(), nullptr),
@@ -257,7 +261,9 @@ void AsyncStreamImpl::resetStream(Http::StreamResetReason, absl::string_view) {
 AsyncRequestSharedImpl::AsyncRequestSharedImpl(AsyncClientImpl& parent,
                                                AsyncClient::Callbacks& callbacks,
                                                const AsyncClient::RequestOptions& options)
-    : AsyncStreamImpl(parent, *this, options), callbacks_(callbacks) {
+    : AsyncStreamImpl(parent, *this, options), callbacks_(callbacks),
+      response_buffer_limit_(parent.runtime_.snapshot().getInteger(
+          AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse)) {
   if (nullptr != options.parent_span_) {
     const std::string child_span_name =
         options.child_span_name_.empty()
@@ -306,7 +312,22 @@ void AsyncRequestSharedImpl::onHeaders(ResponseHeaderMapPtr&& headers, bool) {
   response_ = std::make_unique<ResponseMessageImpl>(std::move(headers));
 }
 
-void AsyncRequestSharedImpl::onData(Buffer::Instance& data, bool) { response_->body().move(data); }
+void AsyncRequestSharedImpl::onData(Buffer::Instance& data, bool) {
+  if (discard_response_body_) {
+    data.drain(data.length());
+    return;
+  }
+
+  if (response_->body().length() + data.length() > response_buffer_limit_) {
+    ENVOY_LOG_EVERY_POW_2(warn, "the buffer size limit for async client response body "
+                                "has been exceeded, draining data");
+    data.drain(data.length());
+    response_buffer_overlimit_ = true;
+    reset();
+  } else {
+    response_->body().move(data);
+  }
+}
 
 void AsyncRequestSharedImpl::onTrailers(ResponseTrailerMapPtr&& trailers) {
   response_->trailers(std::move(trailers));
@@ -327,8 +348,12 @@ void AsyncRequestSharedImpl::onReset() {
                                                    Tracing::EgressConfig::get());
 
   if (!cancelled_) {
-    // In this case we don't have a valid response so we do need to raise a failure.
-    callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset);
+    if (response_buffer_overlimit_) {
+      callbacks_.onFailure(*this, AsyncClient::FailureReason::ExceedResponseBufferLimit);
+    } else {
+      // In this case we don't have a valid response so we do need to raise a failure.
+      callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset);
+    }
   }
 }
 
diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h
index 4d5c80527a33..dfc2e17b025c 100644
--- a/source/common/http/async_client_impl.h
+++ b/source/common/http/async_client_impl.h
@@ -51,6 +51,8 @@ namespace {
 // Limit the size of buffer for data used for retries.
 // This is currently fixed to 64KB.
 constexpr uint64_t kBufferLimitForRetry = 1 << 16;
+// Response buffer limit 32MB.
+constexpr uint64_t kBufferLimitForResponse = 32 * 1024 * 1024;
 } // namespace
 
 class AsyncStreamImpl;
@@ -74,12 +76,14 @@ class AsyncClientImpl final : public AsyncClient {
   Singleton::Manager& singleton_manager_;
   Upstream::ClusterInfoConstSharedPtr cluster_;
   Event::Dispatcher& dispatcher() override { return dispatcher_; }
+  static const absl::string_view ResponseBufferLimit;
 
 private:
   template <typename T> T* internalStartRequest(T* async_request);
   Router::FilterConfig config_;
   Event::Dispatcher& dispatcher_;
   std::list<std::unique_ptr<AsyncStreamImpl>> active_streams_;
+  Runtime::Loader& runtime_;
 
   friend class AsyncStreamImpl;
   friend class AsyncRequestSharedImpl;
@@ -92,7 +96,7 @@ class AsyncClientImpl final : public AsyncClient {
 class AsyncStreamImpl : public virtual AsyncClient::Stream,
                         public StreamDecoderFilterCallbacks,
                         public Event::DeferredDeletable,
-                        Logger::Loggable<Logger::Id::http>,
+                        public Logger::Loggable<Logger::Id::http>,
                         public LinkedObject<AsyncStreamImpl>,
                         public ScopeTrackedObject {
 public:
@@ -151,6 +155,7 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
   absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
   // Callback to listen for low/high/overflow watermark events.
   absl::optional<std::reference_wrapper<DecoderFilterWatermarkCallbacks>> watermark_callbacks_;
+  const bool discard_response_body_;
 
 private:
   void cleanup();
@@ -303,6 +308,8 @@ class AsyncRequestSharedImpl : public virtual AsyncClient::Request,
   Tracing::SpanPtr child_span_;
   std::unique_ptr<ResponseMessageImpl> response_;
   bool cancelled_{};
+  bool response_buffer_overlimit_{};
+  const uint64_t response_buffer_limit_;
 };
 
 class AsyncOngoingRequestImpl final : public AsyncClient::OngoingRequest,
diff --git a/source/common/router/router.cc b/source/common/router/router.cc
index 0334aedf5784..5f6fec69aa19 100644
--- a/source/common/router/router.cc
+++ b/source/common/router/router.cc
@@ -769,7 +769,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
               .setBufferAccount(callbacks_->account())
               // A buffer limit of 1 is set in the case that retry_shadow_buffer_limit_ == 0,
               // because a buffer limit of zero on async clients is interpreted as no buffer limit.
-              .setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_);
+              .setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_)
+              .setDiscardResponseBody(true);
       options.setFilterConfig(config_);
       if (end_stream) {
         // This is a header-only request, and can be dispatched immediately to the shadow
diff --git a/source/extensions/common/wasm/context.cc b/source/extensions/common/wasm/context.cc
index e43f296255e9..4a7534905ebe 100644
--- a/source/extensions/common/wasm/context.cc
+++ b/source/extensions/common/wasm/context.cc
@@ -1849,8 +1849,9 @@ void Context::onHttpCallFailure(uint32_t token, Http::AsyncClient::FailureReason
     return;
   }
   status_code_ = static_cast<uint32_t>(WasmResult::BrokenConnection);
-  // This is the only value currently.
-  ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
+  // TODO(botengyao): handle different failure reasons.
+  ASSERT(reason == Http::AsyncClient::FailureReason::Reset ||
+         reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit);
   status_message_ = "reset";
   // Deferred "after VM call" actions are going to be executed upon returning from
   // ContextBase::*, which might include deleting Context object via proxy_done().
diff --git a/source/extensions/config_subscription/rest/rest_api_fetcher.cc b/source/extensions/config_subscription/rest/rest_api_fetcher.cc
index 92c06f023d19..6b0d63fe73ef 100644
--- a/source/extensions/config_subscription/rest/rest_api_fetcher.cc
+++ b/source/extensions/config_subscription/rest/rest_api_fetcher.cc
@@ -50,8 +50,8 @@ void RestApiFetcher::onSuccess(const Http::AsyncClient::Request& request,
 
 void RestApiFetcher::onFailure(const Http::AsyncClient::Request&,
                                Http::AsyncClient::FailureReason reason) {
-  // Currently Http::AsyncClient::FailureReason only has one value: "Reset".
-  ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
+  ASSERT(reason == Http::AsyncClient::FailureReason::Reset ||
+         reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit);
   onFetchFailure(Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr);
   requestComplete();
 }
diff --git a/source/extensions/filters/common/ext_authz/ext_authz_http_impl.cc b/source/extensions/filters/common/ext_authz/ext_authz_http_impl.cc
index 49a1a95e5218..47c8c7efc41a 100644
--- a/source/extensions/filters/common/ext_authz/ext_authz_http_impl.cc
+++ b/source/extensions/filters/common/ext_authz/ext_authz_http_impl.cc
@@ -263,7 +263,9 @@ void RawHttpClientImpl::onSuccess(const Http::AsyncClient::Request&,
 
 void RawHttpClientImpl::onFailure(const Http::AsyncClient::Request&,
                                   Http::AsyncClient::FailureReason reason) {
-  ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
+  // TODO(botengyao): handle different failure reasons.
+  ASSERT(reason == Http::AsyncClient::FailureReason::Reset ||
+         reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit);
   callbacks_->onComplete(std::make_unique<Response>(errorResponse()));
   callbacks_ = nullptr;
 }
diff --git a/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc b/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc
index 95ce7104f366..29bdb92f57e8 100644
--- a/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc
+++ b/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc
@@ -90,8 +90,9 @@ void GcpAuthnClient::onSuccess(const Http::AsyncClient::Request&,
 
 void GcpAuthnClient::onFailure(const Http::AsyncClient::Request&,
                                Http::AsyncClient::FailureReason reason) {
-  // Http::AsyncClient::FailureReason only has one value: "Reset".
-  ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
+  // TODO(botengyao): handle different failure reasons.
+  ASSERT(reason == Http::AsyncClient::FailureReason::Reset ||
+         reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit);
   ENVOY_LOG(error, "Request failed: stream has been reset");
   active_request_ = nullptr;
   onError();
diff --git a/source/extensions/tracers/datadog/agent_http_client.cc b/source/extensions/tracers/datadog/agent_http_client.cc
index 37a9fad328ca..8095aa8a15bc 100644
--- a/source/extensions/tracers/datadog/agent_http_client.cc
+++ b/source/extensions/tracers/datadog/agent_http_client.cc
@@ -126,6 +126,9 @@ void AgentHTTPClient::onFailure(const Http::AsyncClient::Request& request,
   case Http::AsyncClient::FailureReason::Reset:
     message += "The stream has been reset.";
     break;
+  case Http::AsyncClient::FailureReason::ExceedResponseBufferLimit:
+    message += "The stream exceeds the response buffer limit.";
+    break;
   default:
     message += "Unknown error.";
   }
diff --git a/test/common/http/BUILD b/test/common/http/BUILD
index d3fce341694f..b92e9a4ac5ea 100644
--- a/test/common/http/BUILD
+++ b/test/common/http/BUILD
@@ -34,6 +34,7 @@ envoy_cc_test(
         "//test/mocks/runtime:runtime_mocks",
         "//test/mocks/stats:stats_mocks",
         "//test/mocks/upstream:cluster_manager_mocks",
+        "//test/test_common:test_runtime_lib",
         "//test/test_common:test_time_lib",
         "@envoy_api//envoy/config/core/v3:pkg_cc_proto",
         "@envoy_api//envoy/config/route/v3:pkg_cc_proto",
diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc
index 1a56312eff56..4cd1166d95ea 100644
--- a/test/common/http/async_client_impl_test.cc
+++ b/test/common/http/async_client_impl_test.cc
@@ -218,6 +218,129 @@ TEST_F(AsyncClientImplTest, Basic) {
                      .value());
 }
 
+TEST_F(AsyncClientImplTest, NoResponseBodyBuffering) {
+  message_->body().add("test body");
+  Buffer::Instance& data = message_->body();
+
+  EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
+      .WillOnce(Invoke(
+          [&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks,
+              const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
+            callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
+                                  stream_info_, {});
+            response_decoder_ = &decoder;
+            return nullptr;
+          }));
+
+  TestRequestHeaderMapImpl copy(message_->headers());
+  copy.addCopy("x-envoy-internal", "true");
+  copy.addCopy("x-forwarded-for", "127.0.0.1");
+  copy.addCopy(":scheme", "http");
+
+  EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&copy), false));
+  EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data), true));
+
+  auto* request = client_.send(std::move(message_), callbacks_,
+                               AsyncClient::RequestOptions().setDiscardResponseBody(true));
+  EXPECT_NE(request, nullptr);
+
+  EXPECT_CALL(callbacks_, onBeforeFinalizeUpstreamSpan(_, _));
+  EXPECT_CALL(callbacks_, onSuccess_(_, _))
+      .WillOnce(Invoke([](const AsyncClient::Request&, ResponseMessage* response) -> void {
+        // Verify that there is zero response body.
+        EXPECT_EQ(response->body().length(), 0);
+      }));
+  ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
+  response_decoder_->decodeHeaders(std::move(response_headers), false);
+  response_decoder_->decodeData(data, true);
+
+  EXPECT_EQ(
+      1UL,
+      cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_200").value());
+  EXPECT_EQ(1UL, cm_.thread_local_cluster_.cluster_.info_->stats_store_
+                     .counter("internal.upstream_rq_200")
+                     .value());
+}
+
+TEST_F(AsyncClientImplTest, LargeResponseBody) {
+  message_->body().add("test body");
+  Buffer::Instance& data = message_->body();
+
+  EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
+      .WillOnce(Invoke(
+          [&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks,
+              const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
+            callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
+                                  stream_info_, {});
+            response_decoder_ = &decoder;
+            return nullptr;
+          }));
+
+  TestRequestHeaderMapImpl copy(message_->headers());
+  copy.addCopy("x-envoy-internal", "true");
+  copy.addCopy("x-forwarded-for", "127.0.0.1");
+  copy.addCopy(":scheme", "http");
+
+  EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&copy), false));
+  EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data), true));
+  ON_CALL(runtime_.snapshot_,
+          getInteger(AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse))
+      .WillByDefault(Return(100));
+
+  auto* request = client_.send(std::move(message_), callbacks_, AsyncClient::RequestOptions());
+  EXPECT_NE(request, nullptr);
+
+  EXPECT_CALL(callbacks_, onBeforeFinalizeUpstreamSpan(_, _));
+  EXPECT_CALL(callbacks_, onFailure(_, AsyncClient::FailureReason::ExceedResponseBufferLimit));
+
+  Buffer::InstancePtr large_body{new Buffer::OwnedImpl(std::string(100 + 1, 'a'))};
+  ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
+  response_decoder_->decodeHeaders(std::move(response_headers), false);
+  response_decoder_->decodeData(*large_body, true);
+  EXPECT_EQ(large_body->length(), 0);
+}
+
+TEST_F(AsyncClientImplTest, LargeResponseBodyMultipleRead) {
+  message_->body().add("test body");
+  Buffer::Instance& data = message_->body();
+
+  EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
+      .WillOnce(Invoke(
+          [&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks,
+              const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
+            callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
+                                  stream_info_, {});
+            response_decoder_ = &decoder;
+            return nullptr;
+          }));
+
+  TestRequestHeaderMapImpl copy(message_->headers());
+  copy.addCopy("x-envoy-internal", "true");
+  copy.addCopy("x-forwarded-for", "127.0.0.1");
+  copy.addCopy(":scheme", "http");
+
+  EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&copy), false));
+  EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data), true));
+  ON_CALL(runtime_.snapshot_,
+          getInteger(AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse))
+      .WillByDefault(Return(100));
+
+  auto* request = client_.send(std::move(message_), callbacks_, AsyncClient::RequestOptions());
+  EXPECT_NE(request, nullptr);
+
+  EXPECT_CALL(callbacks_, onBeforeFinalizeUpstreamSpan(_, _));
+  EXPECT_CALL(callbacks_, onFailure(_, AsyncClient::FailureReason::ExceedResponseBufferLimit));
+
+  Buffer::InstancePtr large_body{new Buffer::OwnedImpl(std::string(50, 'a'))};
+  Buffer::InstancePtr large_body_second{new Buffer::OwnedImpl(std::string(50, 'a'))};
+  Buffer::InstancePtr large_body_third{new Buffer::OwnedImpl(std::string(2, 'a'))};
+  ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
+  response_decoder_->decodeHeaders(std::move(response_headers), false);
+  response_decoder_->decodeData(*large_body, false);
+  response_decoder_->decodeData(*large_body_second, false);
+  response_decoder_->decodeData(*large_body_third, true);
+}
+
 TEST_F(AsyncClientImplTest, BasicOngoingRequest) {
   auto headers = std::make_unique<TestRequestHeaderMapImpl>();
   HttpTestUtility::addDefaultHeaders(*headers);
diff --git a/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc b/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc
index 5ce46084511f..5ab4e0954c4d 100644
--- a/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc
+++ b/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc
@@ -549,7 +549,9 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest,
                           .get(Http::LowerCaseString(std::string("regex-fool")))[0]
                           ->value()
                           .getStringView());
+  }
 
+  void sendExtAuthzResponse() {
     // Send back authorization response with "baz" and "bat" headers.
     // Also add multiple values "append-foo" and "append-bar" for key "x-append-bat".
     // Also tell Envoy to remove "remove-me" header before sending to upstream.
@@ -574,8 +576,8 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest,
     cleanupUpstreamAndDownstream();
   }
 
-  void initializeConfig(bool legacy_allowed_headers = true) {
-    config_helper_.addConfigModifier([this, legacy_allowed_headers](
+  void initializeConfig(bool legacy_allowed_headers = true, bool failure_mode_allow = true) {
+    config_helper_.addConfigModifier([this, legacy_allowed_headers, failure_mode_allow](
                                          envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
       auto* ext_authz_cluster = bootstrap.mutable_static_resources()->add_clusters();
       ext_authz_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]);
@@ -586,6 +588,8 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest,
       } else {
         TestUtility::loadFromYaml(default_config_, proto_config_);
       }
+      proto_config_.set_failure_mode_allow(failure_mode_allow);
+      proto_config_.set_failure_mode_allow_header_add(failure_mode_allow);
       envoy::config::listener::v3::Filter ext_authz_filter;
       ext_authz_filter.set_name("envoy.filters.http.ext_authz");
       ext_authz_filter.mutable_typed_config()->PackFrom(proto_config_);
@@ -601,6 +605,7 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest,
 
     initiateClientConnection();
     waitForExtAuthzRequest();
+    sendExtAuthzResponse();
 
     AssertionResult result =
         fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_);
@@ -954,6 +959,7 @@ TEST_P(ExtAuthzHttpIntegrationTest, DEPRECATED_FEATURE_TEST(LegacyDirectReponse)
   HttpIntegrationTest::initialize();
   initiateClientConnection();
   waitForExtAuthzRequest();
+  sendExtAuthzResponse();
 
   ASSERT_TRUE(response_->waitForEndStream());
   EXPECT_TRUE(response_->complete());
@@ -975,6 +981,7 @@ TEST_P(ExtAuthzHttpIntegrationTest, DEPRECATED_FEATURE_TEST(LegacyRedirectRespon
   HttpIntegrationTest::initialize();
   initiateClientConnection();
   waitForExtAuthzRequest();
+  sendExtAuthzResponse();
 
   ASSERT_TRUE(response_->waitForEndStream());
   EXPECT_TRUE(response_->complete());
@@ -1042,12 +1049,39 @@ TEST_P(ExtAuthzHttpIntegrationTest, DirectReponse) {
   HttpIntegrationTest::initialize();
   initiateClientConnection();
   waitForExtAuthzRequest();
+  sendExtAuthzResponse();
 
   ASSERT_TRUE(response_->waitForEndStream());
   EXPECT_TRUE(response_->complete());
   EXPECT_EQ("204", response_->headers().Status()->value().getStringView());
 }
 
+// Test exceeding the async client buffer limit.
+TEST_P(ExtAuthzHttpIntegrationTest, ErrorReponseWithDefultBufferLimit) {
+  initializeConfig(false, /*failure_mode_allow=*/false);
+  config_helper_.addRuntimeOverride("http.async_response_buffer_limit", "1024");
+
+  HttpIntegrationTest::initialize();
+  initiateClientConnection();
+  waitForExtAuthzRequest();
+
+  Http::TestResponseHeaderMapImpl response_headers{
+      {":status", "200"},
+      {"baz", "baz"},
+      {"bat", "bar"},
+      {"x-append-bat", "append-foo"},
+      {"x-append-bat", "append-bar"},
+      {"x-envoy-auth-headers-to-remove", "remove-me"},
+  };
+  ext_authz_request_->encodeHeaders(response_headers, false);
+  ext_authz_request_->encodeData(2048, true);
+
+  ASSERT_TRUE(response_->waitForEndStream());
+  EXPECT_TRUE(response_->complete());
+  // A forbidden response since the onFailure is called due to the async client buffer limit.
+  EXPECT_EQ("403", response_->headers().Status()->value().getStringView());
+}
+
 // (uses new config for allowed_headers).
 TEST_P(ExtAuthzHttpIntegrationTest, RedirectResponse) {
   config_helper_.addConfigModifier(
@@ -1063,6 +1097,7 @@ TEST_P(ExtAuthzHttpIntegrationTest, RedirectResponse) {
   HttpIntegrationTest::initialize();
   initiateClientConnection();
   waitForExtAuthzRequest();
+  sendExtAuthzResponse();
 
   ASSERT_TRUE(response_->waitForEndStream());
   EXPECT_TRUE(response_->complete());