Skip to content

Commit

Permalink
router: fix ignored metadata in shadowed requests (#38092)
Browse files Browse the repository at this point in the history
The idea here is to fix
#36542. In order to do that,
this PR just passes over the metadata criteria to the new
shadowed/duplicated request via the `StreamOptions` builder, which is
then injected in the `NullRouteImpl` used by the new async client.


---------

Signed-off-by: Gustavo <[email protected]>
  • Loading branch information
grnmeira authored Feb 14, 2025
1 parent a5f5b1c commit e9398d3
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 29 deletions.
1 change: 0 additions & 1 deletion envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ class AsyncClient {
sampled_ = sampled;
return *this;
}

StreamOptions& setSidestreamWatermarkCallbacks(SidestreamWatermarkCallbacks* callbacks) {
sidestream_watermark_callbacks = callbacks;
return *this;
Expand Down
17 changes: 12 additions & 5 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,28 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal
if (!creation_status.ok()) {
return;
}

const Router::MetadataMatchCriteria* metadata_matching_criteria = nullptr;
if (options.parent_context.stream_info != nullptr) {
stream_info_.setParentStreamInfo(*options.parent_context.stream_info);
metadata_matching_criteria =
options.parent_context.stream_info->route()
? options.parent_context.stream_info->route()->routeEntry()->metadataMatchCriteria()
: nullptr;
}

auto route_or_error = NullRouteImpl::create(
parent_.cluster_->name(),
retry_policy_ != nullptr ? *retry_policy_ : *options.parsed_retry_policy,
parent_.factory_context_.regexEngine(), options.timeout, options.hash_policy);
parent_.factory_context_.regexEngine(), options.timeout, options.hash_policy,
metadata_matching_criteria);
SET_AND_RETURN_IF_NOT_OK(route_or_error.status(), creation_status);
route_ = std::move(*route_or_error);
stream_info_.dynamicMetadata().MergeFrom(options.metadata);
stream_info_.setIsShadow(options.is_shadow);
stream_info_.setUpstreamClusterInfo(parent_.cluster_);
stream_info_.route_ = route_;

if (options.parent_context.stream_info != nullptr) {
stream_info_.setParentStreamInfo(*options.parent_context.stream_info);
}

if (options.buffer_body_for_retry) {
buffered_body_ = std::make_unique<Buffer::OwnedImpl>(account_);
}
Expand Down
33 changes: 21 additions & 12 deletions source/common/http/null_route_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,12 @@ struct RouteEntryImpl : public Router::RouteEntry {
create(const std::string& cluster_name, const absl::optional<std::chrono::milliseconds>& timeout,
const Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy>&
hash_policy,
const Router::RetryPolicy& retry_policy, Regex::Engine& regex_engine) {
const Router::RetryPolicy& retry_policy, Regex::Engine& regex_engine,
const Router::MetadataMatchCriteria* metadata_match) {
absl::Status creation_status = absl::OkStatus();
auto ret = std::unique_ptr<RouteEntryImpl>(new RouteEntryImpl(
cluster_name, timeout, hash_policy, retry_policy, regex_engine, creation_status));
auto ret = std::unique_ptr<RouteEntryImpl>(
new RouteEntryImpl(cluster_name, timeout, hash_policy, retry_policy, regex_engine,
creation_status, metadata_match));
RETURN_IF_NOT_OK(creation_status);
return ret;
}
Expand All @@ -109,8 +111,9 @@ struct RouteEntryImpl : public Router::RouteEntry {
const Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy>&
hash_policy,
const Router::RetryPolicy& retry_policy, Regex::Engine& regex_engine,
absl::Status& creation_status)
: retry_policy_(retry_policy), cluster_name_(cluster_name), timeout_(timeout) {
absl::Status& creation_status, const Router::MetadataMatchCriteria* metadata_match)
: metadata_match_(metadata_match), retry_policy_(retry_policy), cluster_name_(cluster_name),
timeout_(timeout) {
if (!hash_policy.empty()) {
auto policy_or_error = HashPolicyImpl::create(hash_policy, regex_engine);
SET_AND_RETURN_IF_NOT_OK(policy_or_error.status(), creation_status);
Expand Down Expand Up @@ -148,7 +151,9 @@ struct RouteEntryImpl : public Router::RouteEntry {
}
const HashPolicy* hashPolicy() const override { return hash_policy_.get(); }
const Router::HedgePolicy& hedgePolicy() const override { return hedge_policy_; }
const Router::MetadataMatchCriteria* metadataMatchCriteria() const override { return nullptr; }
const Router::MetadataMatchCriteria* metadataMatchCriteria() const override {
return metadata_match_;
}
Upstream::ResourcePriority priority() const override {
return Upstream::ResourcePriority::Default;
}
Expand Down Expand Up @@ -207,6 +212,7 @@ struct RouteEntryImpl : public Router::RouteEntry {
const Router::RouteEntry::UpgradeMap& upgradeMap() const override { return upgrade_map_; }
const Router::EarlyDataPolicy& earlyDataPolicy() const override { return *early_data_policy_; }

const Router::MetadataMatchCriteria* metadata_match_;
std::unique_ptr<const HashPolicyImpl> hash_policy_;
const Router::RetryPolicy& retry_policy_;

Expand All @@ -233,10 +239,12 @@ struct NullRouteImpl : public Router::Route {
create(const std::string cluster_name, const Router::RetryPolicy& retry_policy,
Regex::Engine& regex_engine, const absl::optional<std::chrono::milliseconds>& timeout = {},
const Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy>&
hash_policy = {}) {
hash_policy = {},
const Router::MetadataMatchCriteria* metadata_match = nullptr) {
absl::Status creation_status;
auto ret = std::unique_ptr<NullRouteImpl>(new NullRouteImpl(
cluster_name, retry_policy, regex_engine, timeout, hash_policy, creation_status));
auto ret = std::unique_ptr<NullRouteImpl>(new NullRouteImpl(cluster_name, retry_policy,
regex_engine, timeout, hash_policy,
creation_status, metadata_match));
RETURN_IF_NOT_OK(creation_status);
return ret;
}
Expand Down Expand Up @@ -272,9 +280,10 @@ struct NullRouteImpl : public Router::Route {
const absl::optional<std::chrono::milliseconds>& timeout,
const Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy>&
hash_policy,
absl::Status& creation_status) {
auto entry_or_error =
RouteEntryImpl::create(cluster_name, timeout, hash_policy, retry_policy, regex_engine);
absl::Status& creation_status,
const Router::MetadataMatchCriteria* metadata_match) {
auto entry_or_error = RouteEntryImpl::create(cluster_name, timeout, hash_policy, retry_policy,
regex_engine, metadata_match);
SET_AND_RETURN_IF_NOT_OK(entry_or_error.status(), creation_status);
route_entry_ = std::move(*entry_or_error);
}
Expand Down
25 changes: 14 additions & 11 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ Http::FilterHeadersStatus Filter::continueDecodeHeaders(
continue;
}
auto shadow_headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_);
auto options =
const auto options =
Http::AsyncClient::RequestOptions()
.setTimeout(timeout_.global_timeout_)
.setParentSpan(callbacks_->activeSpan())
Expand All @@ -869,8 +869,9 @@ Http::FilterHeadersStatus Filter::continueDecodeHeaders(
// A buffer limit of 1 is set in the case that retry_shadow_buffer_limit_ == 0,
// because a buffer limit of zero on async clients is interpreted as no buffer limit.
.setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_)
.setDiscardResponseBody(true);
options.setFilterConfig(config_);
.setDiscardResponseBody(true)
.setFilterConfig(config_)
.setParentContext(Http::AsyncClient::ParentContext{&callbacks_->streamInfo()});
if (end_stream) {
// This is a header-only request, and can be dispatched immediately to the shadow
// without waiting.
Expand Down Expand Up @@ -1139,14 +1140,16 @@ void Filter::maybeDoShadowing() {
if (shadow_trailers_) {
request->trailers(Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_));
}
auto options = Http::AsyncClient::RequestOptions()
.setTimeout(timeout_.global_timeout_)
.setParentSpan(callbacks_->activeSpan())
.setChildSpanName("mirror")
.setSampled(shadow_policy.traceSampled())
.setIsShadow(true)
.setIsShadowSuffixDisabled(shadow_policy.disableShadowHostSuffixAppend());
options.setFilterConfig(config_);
const auto options =
Http::AsyncClient::RequestOptions()
.setTimeout(timeout_.global_timeout_)
.setParentSpan(callbacks_->activeSpan())
.setChildSpanName("mirror")
.setSampled(shadow_policy.traceSampled())
.setIsShadow(true)
.setIsShadowSuffixDisabled(shadow_policy.disableShadowHostSuffixAppend())
.setFilterConfig(config_)
.setParentContext(Http::AsyncClient::ParentContext{&callbacks_->streamInfo()});
config_->shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request),
options);
}
Expand Down
48 changes: 48 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4959,6 +4959,54 @@ TEST_P(RouterShadowingTest, ShadowCallbacksNotCalledInDestructor) {
EXPECT_CALL(foo_request, cancel()).Times(0);
}

TEST_P(RouterShadowingTest, ShadowRequestCarriesParentContext) {
ShadowPolicyPtr policy = makeShadowPolicy("foo", "", "bar");
callbacks_.route_->route_entry_.shadow_policies_.push_back(policy);
ON_CALL(callbacks_, streamId()).WillByDefault(Return(43));

NiceMock<Http::MockRequestEncoder> encoder;
Http::ResponseDecoder* response_decoder = nullptr;
expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10);

EXPECT_CALL(
runtime_.snapshot_,
featureEnabled("bar", testing::Matcher<const envoy::type::v3::FractionalPercent&>(Percent(0)),
43))
.WillOnce(Return(true));
Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
NiceMock<Http::MockAsyncClient> foo_client;
NiceMock<Http::MockAsyncClientOngoingRequest> foo_request(&foo_client);

if (streaming_shadow_) {
EXPECT_CALL(*shadow_writer_, streamingShadow_("foo", _, _))
.WillOnce(Invoke([&](const std::string&, Http::RequestHeaderMapPtr&,
const Http::AsyncClient::RequestOptions& options) {
EXPECT_NE(options.parent_context.stream_info, nullptr);
EXPECT_EQ(callbacks_.streamInfo().route(), options.parent_context.stream_info->route());
return &foo_request;
}));
} else {
EXPECT_CALL(*shadow_writer_, shadow_("foo", _, _))
.WillOnce(Invoke([&](const std::string&, Http::RequestMessagePtr&,
const Http::AsyncClient::RequestOptions& options) {
EXPECT_NE(options.parent_context.stream_info, nullptr);
EXPECT_EQ(callbacks_.streamInfo().route(), options.parent_context.stream_info->route());
return &foo_request;
}));
}

const auto should_end_stream = !streaming_shadow_;
router_->decodeHeaders(headers, should_end_stream);

if (streaming_shadow_) {
EXPECT_CALL(foo_request, removeWatermarkCallbacks());
EXPECT_CALL(foo_request, cancel());
}

router_->onDestroy();
}

TEST_F(RouterTest, AltStatName) {
// Also test no upstream timeout here.
EXPECT_CALL(callbacks_.route_->route_entry_, timeout())
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ envoy_cc_test(
":http_integration_lib",
":integration_lib",
":socket_interface_swap_lib",
"//source/extensions/load_balancing_policies/subset:config",
"//test/integration/filters:add_header_filter_config_lib",
"//test/integration/filters:encoder_decoder_buffer_filter_lib",
"//test/integration/filters:on_local_reply_filter_config_lib",
Expand Down
42 changes: 42 additions & 0 deletions test/integration/shadow_policy_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -993,5 +993,47 @@ TEST_P(ShadowPolicyIntegrationTest, ShadowedClusterHostHeaderDisabledAppendSuffi
EXPECT_EQ(mirror_headers_->Host()->value().getStringView(), "sni.lyft.com");
}

TEST_P(ShadowPolicyIntegrationTest, ShadowedRequestMetadataLoadbalancing) {
initialConfigSetup("cluster_1", "");
config_helper_.addConfigModifier(
[](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
hcm) -> void {
auto* metadata_match = hcm.mutable_route_config()
->mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->mutable_metadata_match();
TestUtility::loadFromYaml(R"EOF(
filterMetadata:
envoy.lb:
stack: "default"
)EOF",
*metadata_match);
});
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void {
auto clusters = bootstrap.mutable_static_resources()->mutable_clusters();
for (auto& cluster : *clusters) {
TestUtility::loadFromYaml(R"EOF(
subsetSelectors:
- keys:
- "stack"
)EOF",
*cluster.mutable_lb_subset_config());

auto lb_endpoint =
cluster.mutable_load_assignment()->mutable_endpoints(0)->mutable_lb_endpoints(0);

TestUtility::loadFromYaml(R"EOF(
filterMetadata:
envoy.lb:
stack: "default"
)EOF",
*lb_endpoint->mutable_metadata());
}
});
initialize();
sendRequestAndValidateResponse();
}

} // namespace
} // namespace Envoy

0 comments on commit e9398d3

Please sign in to comment.