From e9398d3edd946fdba9680ec6628fb466762a7e7d Mon Sep 17 00:00:00 2001 From: Gustavo Meira Date: Fri, 14 Feb 2025 15:08:02 +0000 Subject: [PATCH] router: fix ignored metadata in shadowed requests (#38092) The idea here is to fix https://github.com/envoyproxy/envoy/issues/36542. In order to do that, this PR just passes over the metadata criteria to the new shadowed/duplicated request via the `StreamOptions` builder, which is then injected in the `NullRouteImpl` used by the new async client. --------- Signed-off-by: Gustavo --- envoy/http/async_client.h | 1 - source/common/http/async_client_impl.cc | 17 +++++-- source/common/http/null_route_impl.h | 33 ++++++++----- source/common/router/router.cc | 25 +++++----- test/common/router/router_test.cc | 48 +++++++++++++++++++ test/integration/BUILD | 1 + .../shadow_policy_integration_test.cc | 42 ++++++++++++++++ 7 files changed, 138 insertions(+), 29 deletions(-) diff --git a/envoy/http/async_client.h b/envoy/http/async_client.h index d0659f255392..c0b565395ccc 100644 --- a/envoy/http/async_client.h +++ b/envoy/http/async_client.h @@ -379,7 +379,6 @@ class AsyncClient { sampled_ = sampled; return *this; } - StreamOptions& setSidestreamWatermarkCallbacks(SidestreamWatermarkCallbacks* callbacks) { sidestream_watermark_callbacks = callbacks; return *this; diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 762138d6325b..69aca0d5289a 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -131,10 +131,21 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal if (!creation_status.ok()) { return; } + + const Router::MetadataMatchCriteria* metadata_matching_criteria = nullptr; + if (options.parent_context.stream_info != nullptr) { + stream_info_.setParentStreamInfo(*options.parent_context.stream_info); + metadata_matching_criteria = + options.parent_context.stream_info->route() + ? options.parent_context.stream_info->route()->routeEntry()->metadataMatchCriteria() + : nullptr; + } + auto route_or_error = NullRouteImpl::create( parent_.cluster_->name(), retry_policy_ != nullptr ? *retry_policy_ : *options.parsed_retry_policy, - parent_.factory_context_.regexEngine(), options.timeout, options.hash_policy); + parent_.factory_context_.regexEngine(), options.timeout, options.hash_policy, + metadata_matching_criteria); SET_AND_RETURN_IF_NOT_OK(route_or_error.status(), creation_status); route_ = std::move(*route_or_error); stream_info_.dynamicMetadata().MergeFrom(options.metadata); @@ -142,10 +153,6 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal stream_info_.setUpstreamClusterInfo(parent_.cluster_); stream_info_.route_ = route_; - if (options.parent_context.stream_info != nullptr) { - stream_info_.setParentStreamInfo(*options.parent_context.stream_info); - } - if (options.buffer_body_for_retry) { buffered_body_ = std::make_unique(account_); } diff --git a/source/common/http/null_route_impl.h b/source/common/http/null_route_impl.h index e57fb460b5b4..a7d1df732c09 100644 --- a/source/common/http/null_route_impl.h +++ b/source/common/http/null_route_impl.h @@ -95,10 +95,12 @@ struct RouteEntryImpl : public Router::RouteEntry { create(const std::string& cluster_name, const absl::optional& timeout, const Protobuf::RepeatedPtrField& hash_policy, - const Router::RetryPolicy& retry_policy, Regex::Engine& regex_engine) { + const Router::RetryPolicy& retry_policy, Regex::Engine& regex_engine, + const Router::MetadataMatchCriteria* metadata_match) { absl::Status creation_status = absl::OkStatus(); - auto ret = std::unique_ptr(new RouteEntryImpl( - cluster_name, timeout, hash_policy, retry_policy, regex_engine, creation_status)); + auto ret = std::unique_ptr( + new RouteEntryImpl(cluster_name, timeout, hash_policy, retry_policy, regex_engine, + creation_status, metadata_match)); RETURN_IF_NOT_OK(creation_status); return ret; } @@ -109,8 +111,9 @@ struct RouteEntryImpl : public Router::RouteEntry { const Protobuf::RepeatedPtrField& hash_policy, const Router::RetryPolicy& retry_policy, Regex::Engine& regex_engine, - absl::Status& creation_status) - : retry_policy_(retry_policy), cluster_name_(cluster_name), timeout_(timeout) { + absl::Status& creation_status, const Router::MetadataMatchCriteria* metadata_match) + : metadata_match_(metadata_match), retry_policy_(retry_policy), cluster_name_(cluster_name), + timeout_(timeout) { if (!hash_policy.empty()) { auto policy_or_error = HashPolicyImpl::create(hash_policy, regex_engine); SET_AND_RETURN_IF_NOT_OK(policy_or_error.status(), creation_status); @@ -148,7 +151,9 @@ struct RouteEntryImpl : public Router::RouteEntry { } const HashPolicy* hashPolicy() const override { return hash_policy_.get(); } const Router::HedgePolicy& hedgePolicy() const override { return hedge_policy_; } - const Router::MetadataMatchCriteria* metadataMatchCriteria() const override { return nullptr; } + const Router::MetadataMatchCriteria* metadataMatchCriteria() const override { + return metadata_match_; + } Upstream::ResourcePriority priority() const override { return Upstream::ResourcePriority::Default; } @@ -207,6 +212,7 @@ struct RouteEntryImpl : public Router::RouteEntry { const Router::RouteEntry::UpgradeMap& upgradeMap() const override { return upgrade_map_; } const Router::EarlyDataPolicy& earlyDataPolicy() const override { return *early_data_policy_; } + const Router::MetadataMatchCriteria* metadata_match_; std::unique_ptr hash_policy_; const Router::RetryPolicy& retry_policy_; @@ -233,10 +239,12 @@ struct NullRouteImpl : public Router::Route { create(const std::string cluster_name, const Router::RetryPolicy& retry_policy, Regex::Engine& regex_engine, const absl::optional& timeout = {}, const Protobuf::RepeatedPtrField& - hash_policy = {}) { + hash_policy = {}, + const Router::MetadataMatchCriteria* metadata_match = nullptr) { absl::Status creation_status; - auto ret = std::unique_ptr(new NullRouteImpl( - cluster_name, retry_policy, regex_engine, timeout, hash_policy, creation_status)); + auto ret = std::unique_ptr(new NullRouteImpl(cluster_name, retry_policy, + regex_engine, timeout, hash_policy, + creation_status, metadata_match)); RETURN_IF_NOT_OK(creation_status); return ret; } @@ -272,9 +280,10 @@ struct NullRouteImpl : public Router::Route { const absl::optional& timeout, const Protobuf::RepeatedPtrField& hash_policy, - absl::Status& creation_status) { - auto entry_or_error = - RouteEntryImpl::create(cluster_name, timeout, hash_policy, retry_policy, regex_engine); + absl::Status& creation_status, + const Router::MetadataMatchCriteria* metadata_match) { + auto entry_or_error = RouteEntryImpl::create(cluster_name, timeout, hash_policy, retry_policy, + regex_engine, metadata_match); SET_AND_RETURN_IF_NOT_OK(entry_or_error.status(), creation_status); route_entry_ = std::move(*entry_or_error); } diff --git a/source/common/router/router.cc b/source/common/router/router.cc index b301e412a381..1d28229ce5fc 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -857,7 +857,7 @@ Http::FilterHeadersStatus Filter::continueDecodeHeaders( continue; } auto shadow_headers = Http::createHeaderMap(*shadow_headers_); - auto options = + const auto options = Http::AsyncClient::RequestOptions() .setTimeout(timeout_.global_timeout_) .setParentSpan(callbacks_->activeSpan()) @@ -869,8 +869,9 @@ Http::FilterHeadersStatus Filter::continueDecodeHeaders( // A buffer limit of 1 is set in the case that retry_shadow_buffer_limit_ == 0, // because a buffer limit of zero on async clients is interpreted as no buffer limit. .setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_) - .setDiscardResponseBody(true); - options.setFilterConfig(config_); + .setDiscardResponseBody(true) + .setFilterConfig(config_) + .setParentContext(Http::AsyncClient::ParentContext{&callbacks_->streamInfo()}); if (end_stream) { // This is a header-only request, and can be dispatched immediately to the shadow // without waiting. @@ -1139,14 +1140,16 @@ void Filter::maybeDoShadowing() { if (shadow_trailers_) { request->trailers(Http::createHeaderMap(*shadow_trailers_)); } - auto options = Http::AsyncClient::RequestOptions() - .setTimeout(timeout_.global_timeout_) - .setParentSpan(callbacks_->activeSpan()) - .setChildSpanName("mirror") - .setSampled(shadow_policy.traceSampled()) - .setIsShadow(true) - .setIsShadowSuffixDisabled(shadow_policy.disableShadowHostSuffixAppend()); - options.setFilterConfig(config_); + const auto options = + Http::AsyncClient::RequestOptions() + .setTimeout(timeout_.global_timeout_) + .setParentSpan(callbacks_->activeSpan()) + .setChildSpanName("mirror") + .setSampled(shadow_policy.traceSampled()) + .setIsShadow(true) + .setIsShadowSuffixDisabled(shadow_policy.disableShadowHostSuffixAppend()) + .setFilterConfig(config_) + .setParentContext(Http::AsyncClient::ParentContext{&callbacks_->streamInfo()}); config_->shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request), options); } diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index 51ab31e2fb37..95c0d288009a 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -4959,6 +4959,54 @@ TEST_P(RouterShadowingTest, ShadowCallbacksNotCalledInDestructor) { EXPECT_CALL(foo_request, cancel()).Times(0); } +TEST_P(RouterShadowingTest, ShadowRequestCarriesParentContext) { + ShadowPolicyPtr policy = makeShadowPolicy("foo", "", "bar"); + callbacks_.route_->route_entry_.shadow_policies_.push_back(policy); + ON_CALL(callbacks_, streamId()).WillByDefault(Return(43)); + + NiceMock encoder; + Http::ResponseDecoder* response_decoder = nullptr; + expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10); + + EXPECT_CALL( + runtime_.snapshot_, + featureEnabled("bar", testing::Matcher(Percent(0)), + 43)) + .WillOnce(Return(true)); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + NiceMock foo_client; + NiceMock foo_request(&foo_client); + + if (streaming_shadow_) { + EXPECT_CALL(*shadow_writer_, streamingShadow_("foo", _, _)) + .WillOnce(Invoke([&](const std::string&, Http::RequestHeaderMapPtr&, + const Http::AsyncClient::RequestOptions& options) { + EXPECT_NE(options.parent_context.stream_info, nullptr); + EXPECT_EQ(callbacks_.streamInfo().route(), options.parent_context.stream_info->route()); + return &foo_request; + })); + } else { + EXPECT_CALL(*shadow_writer_, shadow_("foo", _, _)) + .WillOnce(Invoke([&](const std::string&, Http::RequestMessagePtr&, + const Http::AsyncClient::RequestOptions& options) { + EXPECT_NE(options.parent_context.stream_info, nullptr); + EXPECT_EQ(callbacks_.streamInfo().route(), options.parent_context.stream_info->route()); + return &foo_request; + })); + } + + const auto should_end_stream = !streaming_shadow_; + router_->decodeHeaders(headers, should_end_stream); + + if (streaming_shadow_) { + EXPECT_CALL(foo_request, removeWatermarkCallbacks()); + EXPECT_CALL(foo_request, cancel()); + } + + router_->onDestroy(); +} + TEST_F(RouterTest, AltStatName) { // Also test no upstream timeout here. EXPECT_CALL(callbacks_.route_->route_entry_, timeout()) diff --git a/test/integration/BUILD b/test/integration/BUILD index 490f951b0b75..d567bacdb1b6 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -602,6 +602,7 @@ envoy_cc_test( ":http_integration_lib", ":integration_lib", ":socket_interface_swap_lib", + "//source/extensions/load_balancing_policies/subset:config", "//test/integration/filters:add_header_filter_config_lib", "//test/integration/filters:encoder_decoder_buffer_filter_lib", "//test/integration/filters:on_local_reply_filter_config_lib", diff --git a/test/integration/shadow_policy_integration_test.cc b/test/integration/shadow_policy_integration_test.cc index 3bfeb0360499..72b52059f31f 100644 --- a/test/integration/shadow_policy_integration_test.cc +++ b/test/integration/shadow_policy_integration_test.cc @@ -993,5 +993,47 @@ TEST_P(ShadowPolicyIntegrationTest, ShadowedClusterHostHeaderDisabledAppendSuffi EXPECT_EQ(mirror_headers_->Host()->value().getStringView(), "sni.lyft.com"); } +TEST_P(ShadowPolicyIntegrationTest, ShadowedRequestMetadataLoadbalancing) { + initialConfigSetup("cluster_1", ""); + config_helper_.addConfigModifier( + [](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { + auto* metadata_match = hcm.mutable_route_config() + ->mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->mutable_metadata_match(); + TestUtility::loadFromYaml(R"EOF( + filterMetadata: + envoy.lb: + stack: "default" + )EOF", + *metadata_match); + }); + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + auto clusters = bootstrap.mutable_static_resources()->mutable_clusters(); + for (auto& cluster : *clusters) { + TestUtility::loadFromYaml(R"EOF( + subsetSelectors: + - keys: + - "stack" + )EOF", + *cluster.mutable_lb_subset_config()); + + auto lb_endpoint = + cluster.mutable_load_assignment()->mutable_endpoints(0)->mutable_lb_endpoints(0); + + TestUtility::loadFromYaml(R"EOF( + filterMetadata: + envoy.lb: + stack: "default" + )EOF", + *lb_endpoint->mutable_metadata()); + } + }); + initialize(); + sendRequestAndValidateResponse(); +} + } // namespace } // namespace Envoy