Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
weighted_target and xds_cluster_manager: don't update picker while up…
Browse files Browse the repository at this point in the history
…date is in flight (grpc#29313)

* weighted_target and xds_cluster_manager: don't update picker while update is in flight

* include deactivation and new child creation

* add test
  • Loading branch information
markdroth authored Apr 6, 2022
1 parent 8174203 commit 96c19e8
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {

// Internal state.
bool shutting_down_ = false;
bool update_in_progress_ = false;

// Children.
std::map<std::string, OrphanablePtr<WeightedChild>> targets_;
Expand Down Expand Up @@ -281,6 +282,7 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this);
}
update_in_progress_ = true;
// Update config.
config_ = std::move(args.config);
// Deactivate the targets not in the new config.
Expand All @@ -291,38 +293,37 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
child->DeactivateLocked();
}
}
// Create any children that don't already exist.
// Note that we add all children before updating any of them, because
// an update may trigger a child to immediately update its
// connectivity state (e.g., reporting TRANSIENT_FAILURE immediately when
// receiving an empty address list), and we don't want to return an
// overall state with incomplete data.
for (const auto& p : config_->target_map()) {
const std::string& name = p.first;
auto it = targets_.find(name);
if (it == targets_.end()) {
targets_.emplace(name, MakeOrphanable<WeightedChild>(
Ref(DEBUG_LOCATION, "WeightedChild"), name));
}
}
// Update all children.
absl::StatusOr<HierarchicalAddressMap> address_map =
MakeHierarchicalAddressMap(args.addresses);
for (const auto& p : config_->target_map()) {
const std::string& name = p.first;
const WeightedTargetLbConfig::ChildConfig& config = p.second;
auto& target = targets_[name];
// Create child if it does not already exist.
if (target == nullptr) {
target = MakeOrphanable<WeightedChild>(
Ref(DEBUG_LOCATION, "WeightedChild"), name);
}
absl::StatusOr<ServerAddressList> addresses;
if (address_map.ok()) {
addresses = std::move((*address_map)[name]);
} else {
addresses = address_map.status();
}
targets_[name]->UpdateLocked(config, std::move(addresses), args.args);
target->UpdateLocked(config, std::move(addresses), args.args);
}
update_in_progress_ = false;
UpdateStateLocked();
}

void WeightedTargetLb::UpdateStateLocked() {
// If we're in the process of propagating an update from our parent to
// our children, ignore any updates that come from the children. We
// will instead return a new picker once the update has been seen by
// all children. This avoids unnecessary picker churn while an update
// is being propagated to our children.
if (update_in_progress_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] scanning children to determine "
Expand Down Expand Up @@ -356,6 +357,7 @@ void WeightedTargetLb::UpdateStateLocked() {
}
switch (child->connectivity_state()) {
case GRPC_CHANNEL_READY: {
GPR_ASSERT(child->weight() > 0);
end += child->weight();
picker_list.push_back(std::make_pair(end, child->picker_wrapper()));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {

// Internal state.
bool shutting_down_ = false;
bool update_in_progress_ = false;

// Children.
std::map<std::string, OrphanablePtr<ClusterChild>> children_;
Expand Down Expand Up @@ -254,6 +255,7 @@ void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
}
update_in_progress_ = true;
// Update config.
config_ = std::move(args.config);
// Deactivate the children not in the new config.
Expand All @@ -268,19 +270,24 @@ void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
for (const auto& p : config_->cluster_map()) {
const std::string& name = p.first;
const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
auto it = children_.find(name);
if (it == children_.end()) {
it = children_
.emplace(name, MakeOrphanable<ClusterChild>(
Ref(DEBUG_LOCATION, "ClusterChild"), name))
.first;
auto& child = children_[name];
if (child == nullptr) {
child = MakeOrphanable<ClusterChild>(Ref(DEBUG_LOCATION, "ClusterChild"),
name);
}
it->second->UpdateLocked(config, args.addresses, args.args);
child->UpdateLocked(config, args.addresses, args.args);
}
update_in_progress_ = false;
UpdateStateLocked();
}

void XdsClusterManagerLb::UpdateStateLocked() {
// If we're in the process of propagating an update from our parent to
// our children, ignore any updates that come from the children. We
// will instead return a new picker once the update has been seen by
// all children. This avoids unnecessary picker churn while an update
// is being propagated to our children.
if (update_in_progress_) return;
// Also count the number of children in each state, to determine the
// overall state.
size_t num_ready = 0;
Expand Down
28 changes: 28 additions & 0 deletions test/cpp/end2end/xds/xds_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11860,6 +11860,34 @@ TEST_P(LocalityMapTest, ReplaceAllLocalitiesInPriority) {
delayed_resource_setter.join();
}

TEST_P(LocalityMapTest, ConsistentWeightedTargetUpdates) {
CreateAndStartBackends(4);
// Initial update has two localities.
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(1, 2)},
{"locality1", CreateEndpointsForBackends(2, 3)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForAllBackends(1, 3);
// Next update removes locality1.
// Also add backend 0 to locality0, so that we can tell when the
// update has been seen.
args = EdsResourceArgs({
{"locality0", CreateEndpointsForBackends(0, 2)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(0);
// Next update re-adds locality1.
// Also add backend 3 to locality1, so that we can tell when the
// update has been seen.
args = EdsResourceArgs({
{"locality0", CreateEndpointsForBackends(0, 2)},
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(3);
}

class FailoverTest : public XdsEnd2endTest {
public:
void SetUp() override {
Expand Down

0 comments on commit 96c19e8

Please sign in to comment.