Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
Reland x2: Make GetDefaultEventEngine return a shared_ptr (grpc#30619)
Browse files Browse the repository at this point in the history
* Reland x2: Make GetDefaultEventEngine return a shared_ptr

* remove thread leak from NativeDNSResolver

This is not going to work for resolvers that support cancellation.

* give resolvers bounded lifetimes

Some resolver own EventEngines. EventEngines cannot run off the end of
the process since they have unjoined threads (problematic in a small set
of environments). This gives resolvers bounded lifetimes, and allows
replacement of resolvers without ASAN issues of deleting resolvers in
active use (occurs in tests).

* fix

* fix windows

* fix surface init test

* fix

* sanitize

* use after move

* the test must wait for the callback to be destroyed

* windows fix: delete the resolver on iomgr shutdown, not before

* Make TimerManager threads non-joinable

On gRPC shutdown, any unjoined TimerManager threads will cause TSAN to
detect thread leaks. This fix resolves issues I saw in end2end test
shutdown in another PR, where a single timer manager thread was always
alive after the test ended.

The long-term solution is to integrate the new ThreadPool here, but this
unblocks me for now.

* backport fix

* fix

* shared_ptr<EventEngine> in EventEngine benchmarks
  • Loading branch information
drfloob authored Sep 28, 2022
1 parent ecd7e14 commit 2ee2c91
Show file tree
Hide file tree
Showing 56 changed files with 680 additions and 246 deletions.
8 changes: 8 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,7 @@ grpc_cc_library(
external_deps = ["absl/status"],
deps = [
"activity",
"context",
"default_event_engine",
"event_engine_base_hdrs",
"exec_ctx",
Expand Down Expand Up @@ -1665,6 +1666,7 @@ grpc_cc_library(
"absl/status",
"absl/types:optional",
"absl/types:variant",
"absl/utility",
],
language = "c++",
public_hdrs = [
Expand Down Expand Up @@ -2985,9 +2987,13 @@ grpc_cc_library(
],
external_deps = ["absl/functional:any_invocable"],
deps = [
"context",
"default_event_engine_factory",
"event_engine_base_hdrs",
"event_engine_trace",
"gpr",
"grpc_trace",
"no_destruct",
],
)

Expand Down Expand Up @@ -3416,6 +3422,7 @@ grpc_cc_library(
"json",
"latch",
"memory_quota",
"no_destruct",
"notification",
"orphanable",
"packed_table",
Expand Down Expand Up @@ -4066,6 +4073,7 @@ grpc_cc_library(
"closure",
"config",
"debug_location",
"default_event_engine",
"exec_ctx",
"exec_ctx_wakeup_scheduler",
"gpr",
Expand Down
37 changes: 37 additions & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions include/grpc/event_engine/event_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,16 +427,24 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {

/// Replace gRPC's default EventEngine factory.
///
/// Applications may call \a SetDefaultEventEngineFactory at any time to replace
/// the default factory used within gRPC. EventEngines will be created when
/// necessary, when they are otherwise not provided by the application.
/// Applications may call \a SetEventEngineFactory time to replace the default
/// factory used within gRPC. EventEngines will be created when necessary, when
/// they are otherwise not provided by the application.
///
/// To be certain that none of the gRPC-provided built-in EventEngines are
/// created, applications must set a custom EventEngine factory method *before*
/// grpc is initialized.
void SetDefaultEventEngineFactory(
void SetEventEngineFactory(
absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory);

/// Revert to using gRPC's default EventEngine factory.
///
/// Applications that have called \a SetEventEngineFactory can unregister their
/// custom factory, reverting to use gRPC's built-in default EventEngines. This
/// has no effect on any EventEngines that were already created using the custom
/// factory.
void RevertToDefaultEventEngineFactory();

/// Create an EventEngine using the default factory.
std::unique_ptr<EventEngine> CreateEventEngine();

Expand Down
42 changes: 29 additions & 13 deletions src/core/ext/filters/channel_idle/channel_idle_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "absl/types/optional.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>

Expand All @@ -34,6 +35,7 @@
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h"
Expand All @@ -52,6 +54,9 @@
namespace grpc_core {

namespace {

using ::grpc_event_engine::experimental::EventEngine;

// TODO(ctiller): The idle filter was disabled in client channel by default
// due to b/143502997. Now the bug is fixed enable the filter by default.
const auto kDefaultIdleTimeout = Duration::Infinity();
Expand Down Expand Up @@ -119,15 +124,19 @@ struct MaxAgeFilter::Config {

absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) {
ClientIdleFilter filter(filter_args.channel_stack(),
GetClientIdleTimeout(args));
// TODO(hork): pull EventEngine from args
ClientIdleFilter filter(
filter_args.channel_stack(), GetClientIdleTimeout(args),
grpc_event_engine::experimental::GetDefaultEventEngine());
return absl::StatusOr<ClientIdleFilter>(std::move(filter));
}

absl::StatusOr<MaxAgeFilter> MaxAgeFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) {
// TODO(hork): pull EventEngine from args
MaxAgeFilter filter(filter_args.channel_stack(),
Config::FromChannelArgs(args));
Config::FromChannelArgs(args),
grpc_event_engine::experimental::GetDefaultEventEngine());
return absl::StatusOr<MaxAgeFilter>(std::move(filter));
}

Expand Down Expand Up @@ -194,12 +203,14 @@ void MaxAgeFilter::PostInit() {
[this] {
return Sleep(Timestamp::Now() + max_connection_age_grace_);
}),
ExecCtxWakeupScheduler(), [channel_stack, this](absl::Status status) {
ExecCtxWakeupScheduler(),
[channel_stack, this](absl::Status status) {
// OnDone -- close the connection if the promise completed
// successfully.
// (if it did not, it was cancelled)
if (status.ok()) CloseChannel();
}));
},
engine_.get()));
}
}

Expand Down Expand Up @@ -255,10 +266,12 @@ void ChannelIdleFilter::StartIdleTimer() {
}
});
});
activity_.Set(MakeActivity(std::move(promise), ExecCtxWakeupScheduler{},
[channel_stack, this](absl::Status status) {
if (status.ok()) CloseChannel();
}));
activity_.Set(MakeActivity(
std::move(promise), ExecCtxWakeupScheduler{},
[channel_stack, this](absl::Status status) {
if (status.ok()) CloseChannel();
},
engine_.get()));
}

void ChannelIdleFilter::CloseChannel() {
Expand Down Expand Up @@ -300,10 +313,13 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
});
}

MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack,
const Config& max_age_config)
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle),
MaxAgeFilter::MaxAgeFilter(
grpc_channel_stack* channel_stack, const Config& max_age_config,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine)
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle,
engine),
max_connection_age_(max_age_config.max_connection_age),
max_connection_age_grace_(max_age_config.max_connection_age_grace) {}
max_connection_age_grace_(max_age_config.max_connection_age_grace),
engine_(engine) {}

} // namespace grpc_core
15 changes: 11 additions & 4 deletions src/core/ext/filters/channel_idle/channel_idle_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>

#include "src/core/ext/filters/channel_idle/idle_filter_state.h"
Expand Down Expand Up @@ -58,10 +59,12 @@ class ChannelIdleFilter : public ChannelFilter {
using SingleSetActivityPtr =
SingleSetPtr<Activity, typename ActivityPtr::deleter_type>;

ChannelIdleFilter(grpc_channel_stack* channel_stack,
Duration client_idle_timeout)
ChannelIdleFilter(
grpc_channel_stack* channel_stack, Duration client_idle_timeout,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine)
: channel_stack_(channel_stack),
client_idle_timeout_(client_idle_timeout) {}
client_idle_timeout_(client_idle_timeout),
engine_(engine) {}

grpc_channel_stack* channel_stack() { return channel_stack_; };

Expand All @@ -87,6 +90,7 @@ class ChannelIdleFilter : public ChannelFilter {
std::make_shared<IdleFilterState>(false)};

SingleSetActivityPtr activity_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};

class ClientIdleFilter final : public ChannelIdleFilter {
Expand Down Expand Up @@ -127,13 +131,16 @@ class MaxAgeFilter final : public ChannelIdleFilter {
MaxAgeFilter* filter_;
};

MaxAgeFilter(grpc_channel_stack* channel_stack, const Config& max_age_config);
MaxAgeFilter(
grpc_channel_stack* channel_stack, const Config& max_age_config,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine);

void Shutdown() override;

SingleSetActivityPtr max_age_activity_;
Duration max_connection_age_;
Duration max_connection_age_grace_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};

} // namespace grpc_core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ const char kGrpcLbAddressAttributeKey[] = "grpclb";
namespace {

using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;

constexpr absl::string_view kGrpclb = "grpclb";

Expand Down Expand Up @@ -304,6 +303,7 @@ class GrpcLb : public LoadBalancingPolicy {
bool client_load_report_is_due_ = false;
// The closure used for the completion of sending the load report.
grpc_closure client_load_report_done_closure_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};

class SubchannelWrapper : public DelegatingSubchannel {
Expand Down Expand Up @@ -887,7 +887,8 @@ GrpcLb::BalancerCallState::BalancerCallState(
: InternallyRefCounted<BalancerCallState>(
GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState"
: nullptr),
grpclb_policy_(std::move(parent_grpclb_policy)) {
grpclb_policy_(std::move(parent_grpclb_policy)),
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
GPR_ASSERT(grpclb_policy_ != nullptr);
GPR_ASSERT(!grpclb_policy()->shutting_down_);
// Init the LB call. Note that the LB call will progress every time there's
Expand Down Expand Up @@ -945,7 +946,7 @@ void GrpcLb::BalancerCallState::Orphan() {
// call, then the following cancellation will be a no-op.
grpc_call_cancel_internal(lb_call_);
if (client_load_report_handle_.has_value() &&
GetDefaultEventEngine()->Cancel(client_load_report_handle_.value())) {
engine_->Cancel(client_load_report_handle_.value())) {
Unref(DEBUG_LOCATION, "client_load_report cancelled");
}
// Note that the initial ref is hold by lb_on_balancer_status_received_
Expand Down Expand Up @@ -1031,7 +1032,7 @@ void GrpcLb::BalancerCallState::StartQuery() {

void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
client_load_report_handle_ =
GetDefaultEventEngine()->RunAfter(client_stats_report_interval_, [this] {
engine_->RunAfter(client_stats_report_interval_, [this] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
grpclb_policy()->work_serializer()->Run(
Expand Down
Loading

0 comments on commit 2ee2c91

Please sign in to comment.