From 2ee2c91c92bcd7ca2f79d727de68f59350b1b4ad Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Wed, 28 Sep 2022 08:39:56 -0700 Subject: [PATCH] Reland x2: Make GetDefaultEventEngine return a shared_ptr (#30619) * Reland x2: Make GetDefaultEventEngine return a shared_ptr * remove thread leak from NativeDNSResolver This is not going to work for resolvers that support cancellation. * give resolvers bounded lifetimes Some resolver own EventEngines. EventEngines cannot run off the end of the process since they have unjoined threads (problematic in a small set of environments). This gives resolvers bounded lifetimes, and allows replacement of resolvers without ASAN issues of deleting resolvers in active use (occurs in tests). * fix * fix windows * fix surface init test * fix * sanitize * use after move * the test must wait for the callback to be destroyed * windows fix: delete the resolver on iomgr shutdown, not before * Make TimerManager threads non-joinable On gRPC shutdown, any unjoined TimerManager threads will cause TSAN to detect thread leaks. This fix resolves issues I saw in end2end test shutdown in another PR, where a single timer manager thread was always alive after the test ended. The long-term solution is to integrate the new ThreadPool here, but this unblocks me for now. * backport fix * fix * shared_ptr in EventEngine benchmarks --- BUILD | 8 ++ CMakeLists.txt | 37 +++++ build_autogenerated.yaml | 10 ++ include/grpc/event_engine/event_engine.h | 16 ++- .../channel_idle/channel_idle_filter.cc | 42 ++++-- .../channel_idle/channel_idle_filter.h | 15 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 9 +- .../weighted_target/weighted_target.cc | 15 +- .../resolver/dns/c_ares/dns_resolver_ares.cc | 11 +- .../ext/filters/client_channel/subchannel.cc | 8 +- .../ext/filters/client_channel/subchannel.h | 2 + src/core/ext/xds/xds_client.cc | 16 +-- src/core/ext/xds/xds_client.h | 7 + src/core/lib/channel/promise_based_filter.cc | 4 +- src/core/lib/channel/promise_based_filter.h | 13 +- .../lib/event_engine/default_event_engine.cc | 39 ++++-- .../lib/event_engine/default_event_engine.h | 11 +- src/core/lib/http/httpcli.cc | 7 +- src/core/lib/http/httpcli.h | 2 + src/core/lib/iomgr/iomgr_posix.cc | 5 +- src/core/lib/iomgr/iomgr_posix_cfstream.cc | 4 +- src/core/lib/iomgr/iomgr_windows.cc | 4 +- src/core/lib/iomgr/resolve_address.cc | 11 +- src/core/lib/iomgr/resolve_address.h | 7 +- src/core/lib/iomgr/resolve_address_posix.cc | 11 +- src/core/lib/iomgr/resolve_address_posix.h | 3 +- src/core/lib/iomgr/resolve_address_windows.cc | 12 +- src/core/lib/iomgr/resolve_address_windows.h | 6 +- src/core/lib/promise/activity.h | 5 +- src/core/lib/promise/sleep.cc | 17 ++- src/core/lib/promise/sleep.h | 3 +- src/cpp/server/orca/orca_service.cc | 12 +- src/php/bin/run_tests.sh | 4 + .../tests/MemoryLeakTest/ignore_leaks.supp | 13 ++ test/core/channel/channel_args_test.cc | 1 + .../resolvers/dns_resolver_cooldown_test.cc | 25 ++-- test/core/end2end/dualstack_socket_test.cc | 1 + .../end2end/fixtures/http_proxy_fixture.cc | 1 + test/core/end2end/fuzzers/api_fuzzer.cc | 39 +++--- test/core/end2end/goaway_server_test.cc | 29 ++-- test/core/event_engine/BUILD | 13 ++ .../default_engine_methods_test.cc | 129 ++++++++++++++++++ test/core/event_engine/factory_test.cc | 79 +++++++++++ .../event_engine/fuzzing_event_engine/BUILD | 1 + .../fuzzing_event_engine.cc | 46 ++++--- .../fuzzing_event_engine.h | 8 +- .../posix/lock_free_event_test.cc | 13 +- test/core/event_engine/smoke_test.cc | 4 +- test/core/filters/filter_fuzzer.cc | 21 ++- test/core/iomgr/resolve_address_test.cc | 20 +-- test/core/promise/sleep_test.cc | 51 ++++--- test/core/surface/init_test.cc | 13 +- test/core/transport/bdp_estimator_test.cc | 13 +- test/core/util/resolve_localhost_ip46.cc | 1 + .../microbenchmarks/bm_event_engine_run.cc | 5 +- tools/run_tests/generated/tests.json | 24 ++++ 56 files changed, 680 insertions(+), 246 deletions(-) create mode 100644 src/php/tests/MemoryLeakTest/ignore_leaks.supp create mode 100644 test/core/event_engine/default_engine_methods_test.cc create mode 100644 test/core/event_engine/factory_test.cc diff --git a/BUILD b/BUILD index fd19737dcc675..d842a3ff1850d 100644 --- a/BUILD +++ b/BUILD @@ -1423,6 +1423,7 @@ grpc_cc_library( external_deps = ["absl/status"], deps = [ "activity", + "context", "default_event_engine", "event_engine_base_hdrs", "exec_ctx", @@ -1665,6 +1666,7 @@ grpc_cc_library( "absl/status", "absl/types:optional", "absl/types:variant", + "absl/utility", ], language = "c++", public_hdrs = [ @@ -2985,9 +2987,13 @@ grpc_cc_library( ], external_deps = ["absl/functional:any_invocable"], deps = [ + "context", "default_event_engine_factory", "event_engine_base_hdrs", + "event_engine_trace", "gpr", + "grpc_trace", + "no_destruct", ], ) @@ -3416,6 +3422,7 @@ grpc_cc_library( "json", "latch", "memory_quota", + "no_destruct", "notification", "orphanable", "packed_table", @@ -4066,6 +4073,7 @@ grpc_cc_library( "closure", "config", "debug_location", + "default_event_engine", "exec_ctx", "exec_ctx_wakeup_scheduler", "gpr", diff --git a/CMakeLists.txt b/CMakeLists.txt index 30b4d211fdd83..ae053afc655dd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -921,6 +921,7 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx crl_ssl_transport_security_test) endif() + add_dependencies(buildtests_cxx default_engine_methods_test) add_dependencies(buildtests_cxx delegating_channel_test) add_dependencies(buildtests_cxx destroy_grpclb_channel_with_active_connect_stress_test) add_dependencies(buildtests_cxx dns_resolver_cooldown_test) @@ -8666,6 +8667,41 @@ endif() endif() if(gRPC_BUILD_TESTS) +add_executable(default_engine_methods_test + test/core/event_engine/default_engine_methods_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(default_engine_methods_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(default_engine_methods_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + +endif() +if(gRPC_BUILD_TESTS) + add_executable(delegating_channel_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc @@ -9601,6 +9637,7 @@ target_link_libraries(exec_ctx_wakeup_scheduler_test absl::any_invocable absl::type_traits absl::statusor + absl::utility gpr upb ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 869c9eba8e16a..71d60c38fe1d5 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5330,6 +5330,15 @@ targets: - linux - posix - mac +- name: default_engine_methods_test + gtest: true + build: test + language: c++ + headers: [] + src: + - test/core/event_engine/default_engine_methods_test.cc + deps: + - grpc_test_util - name: delegating_channel_test gtest: true build: test @@ -5840,6 +5849,7 @@ targets: - absl/functional:any_invocable - absl/meta:type_traits - absl/status:statusor + - absl/utility:utility - gpr - upb uses_polling: false diff --git a/include/grpc/event_engine/event_engine.h b/include/grpc/event_engine/event_engine.h index 3af75bb212b08..304f2cdfa7126 100644 --- a/include/grpc/event_engine/event_engine.h +++ b/include/grpc/event_engine/event_engine.h @@ -427,16 +427,24 @@ class EventEngine : public std::enable_shared_from_this { /// Replace gRPC's default EventEngine factory. /// -/// Applications may call \a SetDefaultEventEngineFactory at any time to replace -/// the default factory used within gRPC. EventEngines will be created when -/// necessary, when they are otherwise not provided by the application. +/// Applications may call \a SetEventEngineFactory time to replace the default +/// factory used within gRPC. EventEngines will be created when necessary, when +/// they are otherwise not provided by the application. /// /// To be certain that none of the gRPC-provided built-in EventEngines are /// created, applications must set a custom EventEngine factory method *before* /// grpc is initialized. -void SetDefaultEventEngineFactory( +void SetEventEngineFactory( absl::AnyInvocable()> factory); +/// Revert to using gRPC's default EventEngine factory. +/// +/// Applications that have called \a SetEventEngineFactory can unregister their +/// custom factory, reverting to use gRPC's built-in default EventEngines. This +/// has no effect on any EventEngines that were already created using the custom +/// factory. +void RevertToDefaultEventEngineFactory(); + /// Create an EventEngine using the default factory. std::unique_ptr CreateEventEngine(); diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.cc b/src/core/ext/filters/channel_idle/channel_idle_filter.cc index 8faa2e99961ac..7c46c3a9a9030 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.cc +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.cc @@ -26,6 +26,7 @@ #include "absl/types/optional.h" +#include #include #include @@ -34,6 +35,7 @@ #include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/closure.h" @@ -52,6 +54,9 @@ namespace grpc_core { namespace { + +using ::grpc_event_engine::experimental::EventEngine; + // TODO(ctiller): The idle filter was disabled in client channel by default // due to b/143502997. Now the bug is fixed enable the filter by default. const auto kDefaultIdleTimeout = Duration::Infinity(); @@ -119,15 +124,19 @@ struct MaxAgeFilter::Config { absl::StatusOr ClientIdleFilter::Create( const ChannelArgs& args, ChannelFilter::Args filter_args) { - ClientIdleFilter filter(filter_args.channel_stack(), - GetClientIdleTimeout(args)); + // TODO(hork): pull EventEngine from args + ClientIdleFilter filter( + filter_args.channel_stack(), GetClientIdleTimeout(args), + grpc_event_engine::experimental::GetDefaultEventEngine()); return absl::StatusOr(std::move(filter)); } absl::StatusOr MaxAgeFilter::Create( const ChannelArgs& args, ChannelFilter::Args filter_args) { + // TODO(hork): pull EventEngine from args MaxAgeFilter filter(filter_args.channel_stack(), - Config::FromChannelArgs(args)); + Config::FromChannelArgs(args), + grpc_event_engine::experimental::GetDefaultEventEngine()); return absl::StatusOr(std::move(filter)); } @@ -194,12 +203,14 @@ void MaxAgeFilter::PostInit() { [this] { return Sleep(Timestamp::Now() + max_connection_age_grace_); }), - ExecCtxWakeupScheduler(), [channel_stack, this](absl::Status status) { + ExecCtxWakeupScheduler(), + [channel_stack, this](absl::Status status) { // OnDone -- close the connection if the promise completed // successfully. // (if it did not, it was cancelled) if (status.ok()) CloseChannel(); - })); + }, + engine_.get())); } } @@ -255,10 +266,12 @@ void ChannelIdleFilter::StartIdleTimer() { } }); }); - activity_.Set(MakeActivity(std::move(promise), ExecCtxWakeupScheduler{}, - [channel_stack, this](absl::Status status) { - if (status.ok()) CloseChannel(); - })); + activity_.Set(MakeActivity( + std::move(promise), ExecCtxWakeupScheduler{}, + [channel_stack, this](absl::Status status) { + if (status.ok()) CloseChannel(); + }, + engine_.get())); } void ChannelIdleFilter::CloseChannel() { @@ -300,10 +313,13 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) { }); } -MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack, - const Config& max_age_config) - : ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle), +MaxAgeFilter::MaxAgeFilter( + grpc_channel_stack* channel_stack, const Config& max_age_config, + std::shared_ptr engine) + : ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle, + engine), max_connection_age_(max_age_config.max_connection_age), - max_connection_age_grace_(max_age_config.max_connection_age_grace) {} + max_connection_age_grace_(max_age_config.max_connection_age_grace), + engine_(engine) {} } // namespace grpc_core diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.h b/src/core/ext/filters/channel_idle/channel_idle_filter.h index d926d09f71bcc..bc49a2a0174e0 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.h +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.h @@ -22,6 +22,7 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" +#include #include #include "src/core/ext/filters/channel_idle/idle_filter_state.h" @@ -58,10 +59,12 @@ class ChannelIdleFilter : public ChannelFilter { using SingleSetActivityPtr = SingleSetPtr; - ChannelIdleFilter(grpc_channel_stack* channel_stack, - Duration client_idle_timeout) + ChannelIdleFilter( + grpc_channel_stack* channel_stack, Duration client_idle_timeout, + std::shared_ptr engine) : channel_stack_(channel_stack), - client_idle_timeout_(client_idle_timeout) {} + client_idle_timeout_(client_idle_timeout), + engine_(engine) {} grpc_channel_stack* channel_stack() { return channel_stack_; }; @@ -87,6 +90,7 @@ class ChannelIdleFilter : public ChannelFilter { std::make_shared(false)}; SingleSetActivityPtr activity_; + std::shared_ptr engine_; }; class ClientIdleFilter final : public ChannelIdleFilter { @@ -127,13 +131,16 @@ class MaxAgeFilter final : public ChannelIdleFilter { MaxAgeFilter* filter_; }; - MaxAgeFilter(grpc_channel_stack* channel_stack, const Config& max_age_config); + MaxAgeFilter( + grpc_channel_stack* channel_stack, const Config& max_age_config, + std::shared_ptr engine); void Shutdown() override; SingleSetActivityPtr max_age_activity_; Duration max_connection_age_; Duration max_connection_age_grace_; + std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 770e3396d2c23..2bf8aa88803e8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -160,7 +160,6 @@ const char kGrpcLbAddressAttributeKey[] = "grpclb"; namespace { using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; constexpr absl::string_view kGrpclb = "grpclb"; @@ -304,6 +303,7 @@ class GrpcLb : public LoadBalancingPolicy { bool client_load_report_is_due_ = false; // The closure used for the completion of sending the load report. grpc_closure client_load_report_done_closure_; + std::shared_ptr engine_; }; class SubchannelWrapper : public DelegatingSubchannel { @@ -887,7 +887,8 @@ GrpcLb::BalancerCallState::BalancerCallState( : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState" : nullptr), - grpclb_policy_(std::move(parent_grpclb_policy)) { + grpclb_policy_(std::move(parent_grpclb_policy)), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { GPR_ASSERT(grpclb_policy_ != nullptr); GPR_ASSERT(!grpclb_policy()->shutting_down_); // Init the LB call. Note that the LB call will progress every time there's @@ -945,7 +946,7 @@ void GrpcLb::BalancerCallState::Orphan() { // call, then the following cancellation will be a no-op. grpc_call_cancel_internal(lb_call_); if (client_load_report_handle_.has_value() && - GetDefaultEventEngine()->Cancel(client_load_report_handle_.value())) { + engine_->Cancel(client_load_report_handle_.value())) { Unref(DEBUG_LOCATION, "client_load_report cancelled"); } // Note that the initial ref is hold by lb_on_balancer_status_received_ @@ -1031,7 +1032,7 @@ void GrpcLb::BalancerCallState::StartQuery() { void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { client_load_report_handle_ = - GetDefaultEventEngine()->RunAfter(client_stats_report_interval_, [this] { + engine_->RunAfter(client_stats_report_interval_, [this] { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; grpclb_policy()->work_serializer()->Run( diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc index 7f35acc476e43..39546cd02e855 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc @@ -72,7 +72,6 @@ TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb"); namespace { using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; constexpr absl::string_view kWeightedTarget = "weighted_target_experimental"; @@ -212,6 +211,7 @@ class WeightedTargetLb : public LoadBalancingPolicy { RefCountedPtr weighted_child_; absl::optional timer_handle_; + std::shared_ptr engine_; }; // Methods for dealing with the child policy. @@ -479,9 +479,10 @@ void WeightedTargetLb::UpdateStateLocked() { WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer( RefCountedPtr weighted_child) - : weighted_child_(std::move(weighted_child)) { - timer_handle_ = GetDefaultEventEngine()->RunAfter( - kChildRetentionInterval, [self = Ref()]() mutable { + : weighted_child_(std::move(weighted_child)), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { + timer_handle_ = + engine_->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable { ApplicationCallbackExecCtx app_exec_ctx; ExecCtx exec_ctx; self->weighted_child_->weighted_target_policy_->work_serializer()->Run( @@ -499,7 +500,7 @@ void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() { weighted_child_->weighted_target_policy_.get(), weighted_child_.get(), weighted_child_->name_.c_str()); } - GetDefaultEventEngine()->Cancel(*timer_handle_); + engine_->Cancel(*timer_handle_); } Unref(); } @@ -746,7 +747,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { return MakeOrphanable(std::move(args)); - } + } // namespace absl::string_view name() const override { return kWeightedTarget; } @@ -763,7 +764,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { return LoadRefCountedFromJson( json, JsonArgs(), "errors validating weighted_target LB policy config"); } -}; +}; // namespace grpc_core } // namespace diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 57f1577706096..458d92fd57bf2 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -728,12 +728,6 @@ class AresDNSResolver : public DNSResolver { const std::function)> on_resolved_; }; - // gets the singleton instance, possibly creating it first - static AresDNSResolver* GetOrCreate() { - static AresDNSResolver* instance = new AresDNSResolver(); - return instance; - } - TaskHandle LookupHostname( std::function>)> on_resolved, @@ -811,7 +805,7 @@ class AresDNSResolver : public DNSResolver { } // the previous default DNS resolver, used to delegate blocking DNS calls to - DNSResolver* default_resolver_ = GetDNSResolver(); + std::shared_ptr default_resolver_ = GetDNSResolver(); Mutex mu_; grpc_event_engine::experimental::LookupTaskHandleSet open_requests_ ABSL_GUARDED_BY(mu_); @@ -852,7 +846,8 @@ void grpc_resolver_dns_ares_init() { GRPC_LOG_IF_ERROR("grpc_ares_init() failed", error); return; } - grpc_core::SetDNSResolver(grpc_core::AresDNSResolver::GetOrCreate()); + grpc_core::ResetDNSResolver( + absl::make_unique()); } } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 0962dfc95551f..cff10fa686aeb 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -81,7 +81,6 @@ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) namespace grpc_core { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; TraceFlag grpc_trace_subchannel(false, "subchannel"); DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); @@ -624,7 +623,8 @@ Subchannel::Subchannel(SubchannelKey key, args_(args), pollset_set_(grpc_pollset_set_create()), connector_(std::move(connector)), - backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)) { + backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { // A grpc_init is added here to ensure that grpc_shutdown does not happen // until the subchannel is destroyed. Subchannels can persist longer than // channels because they maybe reused/shared among multiple channels. As a @@ -761,7 +761,7 @@ void Subchannel::ResetBackoff() { MutexLock lock(&mu_); backoff_.Reset(); if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && - GetDefaultEventEngine()->Cancel(retry_timer_handle_)) { + engine_->Cancel(retry_timer_handle_)) { OnRetryTimerLocked(); } else if (state_ == GRPC_CHANNEL_CONNECTING) { next_attempt_time_ = Timestamp::Now(); @@ -910,7 +910,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { time_until_next_attempt.millis()); SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error)); - retry_timer_handle_ = GetDefaultEventEngine()->RunAfter( + retry_timer_handle_ = engine_->RunAfter( time_until_next_attempt, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { { diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index ebe803b675db8..f16511dfa8b41 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include "absl/base/thread_annotations.h" @@ -428,6 +429,7 @@ class Subchannel : public DualRefCounted { // Data producer map. std::map data_producer_map_ ABSL_GUARDED_BY(mu_); + std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 16244388c08e7..8436464202643 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -58,7 +58,6 @@ namespace grpc_core { using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; TraceFlag grpc_xds_client_trace(false, "xds_client"); TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount"); @@ -195,7 +194,7 @@ class XdsClient::ChannelState::AdsCallState if (state.resource != nullptr) return; // Start timer. ads_calld_ = std::move(ads_calld); - timer_handle_ = GetDefaultEventEngine()->RunAfter( + timer_handle_ = ads_calld_->xds_client()->engine()->RunAfter( ads_calld_->xds_client()->request_timeout_, [self = Ref(DEBUG_LOCATION, "timer")]() { ApplicationCallbackExecCtx callback_exec_ctx; @@ -218,7 +217,7 @@ class XdsClient::ChannelState::AdsCallState // TODO(roth): Find a way to write a test for this case. timer_start_needed_ = false; if (timer_handle_.has_value()) { - GetDefaultEventEngine()->Cancel(*timer_handle_); + ads_calld_->xds_client()->engine()->Cancel(*timer_handle_); timer_handle_.reset(); } } @@ -592,7 +591,7 @@ void XdsClient::ChannelState::RetryableCall::Orphan() { shutting_down_ = true; calld_.reset(); if (timer_handle_.has_value()) { - GetDefaultEventEngine()->Cancel(*timer_handle_); + chand()->xds_client()->engine()->Cancel(*timer_handle_); timer_handle_.reset(); } this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned"); @@ -635,7 +634,7 @@ void XdsClient::ChannelState::RetryableCall::StartRetryTimerLocked() { chand()->xds_client(), chand()->server_.server_uri().c_str(), timeout.millis()); } - timer_handle_ = GetDefaultEventEngine()->RunAfter( + timer_handle_ = chand()->xds_client()->engine()->RunAfter( timeout, [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() { ApplicationCallbackExecCtx callback_exec_ctx; @@ -1157,7 +1156,7 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { if (timer_handle_.has_value() && - GetDefaultEventEngine()->Cancel(*timer_handle_)) { + xds_client()->engine()->Cancel(*timer_handle_)) { timer_handle_.reset(); Unref(DEBUG_LOCATION, "Orphan"); } @@ -1165,7 +1164,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { void XdsClient::ChannelState::LrsCallState::Reporter:: ScheduleNextReportLocked() { - timer_handle_ = GetDefaultEventEngine()->RunAfter(report_interval_, [this]() { + timer_handle_ = xds_client()->engine()->RunAfter(report_interval_, [this]() { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; if (OnNextReportTimer()) { @@ -1436,7 +1435,8 @@ XdsClient::XdsClient(std::unique_ptr bootstrap, transport_factory_(std::move(transport_factory)), request_timeout_(resource_request_timeout), xds_federation_enabled_(XdsFederationEnabled()), - api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_) { + api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); } diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 67229be1fca5b..48f178199e8cb 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -32,6 +32,8 @@ #include "absl/strings/string_view.h" #include "upb/def.hpp" +#include + #include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client_stats.h" @@ -143,6 +145,10 @@ class XdsClient : public DualRefCounted { // implementation. std::string DumpClientConfigBinary(); + grpc_event_engine::experimental::EventEngine* engine() { + return engine_.get(); + } + private: struct XdsResourceKey { std::string id; @@ -301,6 +307,7 @@ class XdsClient : public DualRefCounted { const bool xds_federation_enabled_; XdsApi api_; WorkSerializer work_serializer_; + std::shared_ptr engine_; Mutex mu_; diff --git a/src/core/lib/channel/promise_based_filter.cc b/src/core/lib/channel/promise_based_filter.cc index f9da611c26355..70dc6cccb6d58 100644 --- a/src/core/lib/channel/promise_based_filter.cc +++ b/src/core/lib/channel/promise_based_filter.cc @@ -26,6 +26,7 @@ #include #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/slice/slice.h" @@ -43,7 +44,8 @@ BaseCallData::BaseCallData(grpc_call_element* elem, arena_(args->arena), call_combiner_(args->call_combiner), deadline_(args->deadline), - context_(args->context) { + context_(args->context), + event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { if (flags & kFilterExaminesServerInitialMetadata) { server_initial_metadata_latch_ = arena_->New>(); } diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index 1c1ceb2f5d652..28beb9179c8bf 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -25,6 +25,7 @@ #include #include +#include #include #include @@ -32,6 +33,7 @@ #include "absl/meta/type_traits.h" #include "absl/status/status.h" +#include #include #include @@ -40,6 +42,7 @@ #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" +#include "src/core/lib/event_engine/default_event_engine.h" // IWYU pragma: keep #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/call_combiner.h" @@ -152,7 +155,9 @@ class BaseCallData : public Activity, private Wakeable { : public promise_detail::Context, public promise_detail::Context, public promise_detail::Context, - public promise_detail::Context { + public promise_detail::Context, + public promise_detail::Context< + grpc_event_engine::experimental::EventEngine> { public: explicit ScopedContext(BaseCallData* call_data) : promise_detail::Context(call_data->arena_), @@ -160,8 +165,9 @@ class BaseCallData : public Activity, private Wakeable { call_data->context_), promise_detail::Context( call_data->pollent_.load(std::memory_order_acquire)), - promise_detail::Context(&call_data->finalization_) { - } + promise_detail::Context(&call_data->finalization_), + promise_detail::Context( + call_data->event_engine_.get()) {} }; class Flusher { @@ -268,6 +274,7 @@ class BaseCallData : public Activity, private Wakeable { grpc_call_context_element* const context_; std::atomic pollent_{nullptr}; Latch* server_initial_metadata_latch_ = nullptr; + std::shared_ptr event_engine_; }; class ClientCallData : public BaseCallData { diff --git a/src/core/lib/event_engine/default_event_engine.cc b/src/core/lib/event_engine/default_event_engine.cc index 70c4eb1717526..a69a255fa6f73 100644 --- a/src/core/lib/event_engine/default_event_engine.cc +++ b/src/core/lib/event_engine/default_event_engine.cc @@ -23,7 +23,11 @@ #include +#include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/default_event_engine_factory.h" +#include "src/core/lib/event_engine/trace.h" +#include "src/core/lib/gprpp/no_destruct.h" +#include "src/core/lib/gprpp/sync.h" namespace grpc_event_engine { namespace experimental { @@ -31,14 +35,22 @@ namespace experimental { namespace { std::atomic()>*> g_event_engine_factory{nullptr}; -std::atomic g_event_engine{nullptr}; +grpc_core::NoDestruct g_mu; +grpc_core::NoDestruct> g_event_engine; } // namespace -void SetDefaultEventEngineFactory( +void SetEventEngineFactory( absl::AnyInvocable()> factory) { delete g_event_engine_factory.exchange( new absl::AnyInvocable()>( std::move(factory))); + // Forget any previous EventEngines + grpc_core::MutexLock lock(&*g_mu); + g_event_engine->reset(); +} + +void RevertToDefaultEventEngineFactory() { + delete g_event_engine_factory.exchange(nullptr); } std::unique_ptr CreateEventEngine() { @@ -48,23 +60,22 @@ std::unique_ptr CreateEventEngine() { return DefaultEventEngineFactory(); } -EventEngine* GetDefaultEventEngine() { - EventEngine* engine = g_event_engine.load(std::memory_order_acquire); - if (engine == nullptr) { - auto* created = CreateEventEngine().release(); - if (g_event_engine.compare_exchange_strong(engine, created, - std::memory_order_acq_rel, - std::memory_order_acquire)) { - engine = created; - } else { - delete created; - } +std::shared_ptr GetDefaultEventEngine() { + grpc_core::MutexLock lock(&*g_mu); + if (std::shared_ptr engine = g_event_engine->lock()) { + GRPC_EVENT_ENGINE_TRACE("DefaultEventEngine::%p use_count:%ld", + engine.get(), engine.use_count()); + return engine; } + std::shared_ptr engine{CreateEventEngine()}; + GRPC_EVENT_ENGINE_TRACE("Created DefaultEventEngine::%p", engine.get()); + *g_event_engine = engine; return engine; } void ResetDefaultEventEngine() { - delete g_event_engine.exchange(nullptr, std::memory_order_acq_rel); + grpc_core::MutexLock lock(&*g_mu); + g_event_engine->reset(); } } // namespace experimental diff --git a/src/core/lib/event_engine/default_event_engine.h b/src/core/lib/event_engine/default_event_engine.h index 1a182bd73f3e8..88181ad423e07 100644 --- a/src/core/lib/event_engine/default_event_engine.h +++ b/src/core/lib/event_engine/default_event_engine.h @@ -17,8 +17,17 @@ #include +#include + #include +#include "src/core/lib/promise/context.h" + +namespace grpc_core { +template <> +struct ContextType {}; +} // namespace grpc_core + namespace grpc_event_engine { namespace experimental { @@ -26,7 +35,7 @@ namespace experimental { /// /// The concept of a global EventEngine may go away in a post-iomgr world. /// Strongly consider whether you could use \a CreateEventEngine instead. -EventEngine* GetDefaultEventEngine(); +std::shared_ptr GetDefaultEventEngine(); /// Reset the default event engine void ResetDefaultEventEngine(); diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc index a13f8afa922e3..3b844f6e0056f 100644 --- a/src/core/lib/http/httpcli.cc +++ b/src/core/lib/http/httpcli.cc @@ -169,7 +169,8 @@ HttpRequest::HttpRequest( resource_quota_(ResourceQuotaFromChannelArgs(channel_args_)), pollent_(pollent), pollset_set_(grpc_pollset_set_create()), - test_only_generate_response_(std::move(test_only_generate_response)) { + test_only_generate_response_(std::move(test_only_generate_response)), + resolver_(GetDNSResolver()) { grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response); grpc_slice_buffer_init(&incoming_); grpc_slice_buffer_init(&outgoing_); @@ -206,7 +207,7 @@ void HttpRequest::Start() { return; } Ref().release(); // ref held by pending DNS resolution - dns_request_handle_ = GetDNSResolver()->LookupHostname( + dns_request_handle_ = resolver_->LookupHostname( absl::bind_front(&HttpRequest::OnResolved, this), uri_.authority(), uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_, /*name_server=*/""); @@ -219,7 +220,7 @@ void HttpRequest::Orphan() { cancelled_ = true; // cancel potentially pending DNS resolution. if (dns_request_handle_.has_value() && - GetDNSResolver()->Cancel(dns_request_handle_.value())) { + resolver_->Cancel(dns_request_handle_.value())) { Finish(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "cancelled during DNS resolution")); Unref(); diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h index c5a4fbe275bb3..34320851ace4b 100644 --- a/src/core/lib/http/httpcli.h +++ b/src/core/lib/http/httpcli.h @@ -24,6 +24,7 @@ #include #include +#include #include #include "absl/base/thread_annotations.h" @@ -261,6 +262,7 @@ class HttpRequest : public InternallyRefCounted { grpc_slice_buffer incoming_ ABSL_GUARDED_BY(mu_); grpc_slice_buffer outgoing_ ABSL_GUARDED_BY(mu_); grpc_error_handle overall_error_ ABSL_GUARDED_BY(mu_) = absl::OkStatus(); + std::shared_ptr resolver_; absl::optional dns_request_handle_ ABSL_GUARDED_BY(mu_) = DNSResolver::kNullHandle; }; diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index 0a862ccad6421..82453649e52cd 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -22,7 +22,6 @@ #ifdef GRPC_POSIX_SOCKET_IOMGR -#include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -39,6 +38,8 @@ extern grpc_pollset_vtable grpc_posix_pollset_vtable; extern grpc_pollset_set_vtable grpc_posix_pollset_set_vtable; static void iomgr_platform_init(void) { + grpc_core::ResetDNSResolver( + absl::make_unique()); grpc_wakeup_fd_global_init(); grpc_event_engine_init(); grpc_tcp_posix_init(); @@ -50,6 +51,7 @@ static void iomgr_platform_shutdown(void) { grpc_tcp_posix_shutdown(); grpc_event_engine_shutdown(); grpc_wakeup_fd_global_destroy(); + grpc_core::ResetDNSResolver(nullptr); // delete the resolver } static void iomgr_platform_shutdown_background_closure(void) { @@ -79,7 +81,6 @@ void grpc_set_default_iomgr_platform() { grpc_set_timer_impl(&grpc_generic_timer_vtable); grpc_set_pollset_vtable(&grpc_posix_pollset_vtable); grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable); - grpc_core::SetDNSResolver(grpc_core::NativeDNSResolver::GetOrCreate()); grpc_tcp_client_global_init(); grpc_set_iomgr_platform_vtable(&vtable); } diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc index 90677e516a167..518a268cc8a6b 100644 --- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc +++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc @@ -63,6 +63,7 @@ static void apple_iomgr_platform_flush(void) {} static void apple_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); + grpc_core::ResetDNSResolver(nullptr); // delete the resolver } static void apple_iomgr_platform_shutdown_background_closure(void) {} @@ -179,7 +180,8 @@ void grpc_set_default_iomgr_platform() { } grpc_tcp_client_global_init(); grpc_set_timer_impl(&grpc_generic_timer_vtable); - grpc_core::SetDNSResolver(grpc_core::NativeDNSResolver::GetOrCreate()); + grpc_core::ResetDNSResolver( + absl::make_unique()); } bool grpc_iomgr_run_in_background() { diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc index 87509d8a039bf..d45f2de2812d6 100644 --- a/src/core/lib/iomgr/iomgr_windows.cc +++ b/src/core/lib/iomgr/iomgr_windows.cc @@ -61,6 +61,8 @@ static void iomgr_platform_init(void) { grpc_iocp_init(); grpc_pollset_global_init(); grpc_wsa_socket_flags_init(); + grpc_core::ResetDNSResolver( + absl::make_unique()); } static void iomgr_platform_flush(void) { grpc_iocp_flush(); } @@ -69,6 +71,7 @@ static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); grpc_iocp_shutdown(); winsock_shutdown(); + grpc_core::ResetDNSResolver(nullptr); // delete the resolver } static void iomgr_platform_shutdown_background_closure(void) {} @@ -96,7 +99,6 @@ void grpc_set_default_iomgr_platform() { grpc_set_timer_impl(&grpc_generic_timer_vtable); grpc_set_pollset_vtable(&grpc_windows_pollset_vtable); grpc_set_pollset_set_vtable(&grpc_windows_pollset_set_vtable); - grpc_core::SetDNSResolver(grpc_core::NativeDNSResolver::GetOrCreate()); grpc_set_iomgr_platform_vtable(&vtable); } diff --git a/src/core/lib/iomgr/resolve_address.cc b/src/core/lib/iomgr/resolve_address.cc index 363146607e3ee..3c87223eb01df 100644 --- a/src/core/lib/iomgr/resolve_address.cc +++ b/src/core/lib/iomgr/resolve_address.cc @@ -23,19 +23,24 @@ #include #include +#include + +#include "src/core/lib/gprpp/no_destruct.h" namespace grpc_core { const char* kDefaultSecurePort = "https"; namespace { -DNSResolver* g_dns_resolver; +NoDestruct> g_dns_resolver; } constexpr DNSResolver::TaskHandle DNSResolver::kNullHandle; -void SetDNSResolver(DNSResolver* resolver) { g_dns_resolver = resolver; } +void ResetDNSResolver(std::shared_ptr resolver) { + *g_dns_resolver = std::move(resolver); +} -DNSResolver* GetDNSResolver() { return g_dns_resolver; } +std::shared_ptr GetDNSResolver() { return *g_dns_resolver; } std::string DNSResolver::HandleToString(TaskHandle handle) { return absl::StrCat("{", handle.keys[0], ",", handle.keys[1], "}"); diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index 201f4aa94cfb0..8654f27702df3 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -104,13 +104,12 @@ class DNSResolver { }; // Override the active DNS resolver which should be used for all DNS -// resolution in gRPC. Note this should only be used during library -// initialization or within tests. -void SetDNSResolver(DNSResolver* resolver); +// resolution in gRPC. +void ResetDNSResolver(std::shared_ptr resolver); // Get the singleton DNS resolver instance which should be used for all // DNS resolution in gRPC. -DNSResolver* GetDNSResolver(); +std::shared_ptr GetDNSResolver(); } // namespace grpc_core diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 84210e6d3754b..15e4285d4a534 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -47,8 +47,6 @@ namespace grpc_core { namespace { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; - class NativeDNSRequest { public: NativeDNSRequest( @@ -81,11 +79,6 @@ class NativeDNSRequest { } // namespace -NativeDNSResolver* NativeDNSResolver::GetOrCreate() { - static NativeDNSResolver* instance = new NativeDNSResolver(); - return instance; -} - DNSResolver::TaskHandle NativeDNSResolver::LookupHostname( std::function>)> on_done, @@ -183,7 +176,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupSRV( absl::string_view /* name */, Duration /* timeout */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) { - GetDefaultEventEngine()->Run([on_resolved] { + grpc_event_engine::experimental::GetDefaultEventEngine()->Run([on_resolved] { ApplicationCallbackExecCtx app_exec_ctx; ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( @@ -198,7 +191,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupTXT( grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) { // Not supported - GetDefaultEventEngine()->Run([on_resolved] { + grpc_event_engine::experimental::GetDefaultEventEngine()->Run([on_resolved] { ApplicationCallbackExecCtx app_exec_ctx; ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( diff --git a/src/core/lib/iomgr/resolve_address_posix.h b/src/core/lib/iomgr/resolve_address_posix.h index a07163d144eb8..690915c98540b 100644 --- a/src/core/lib/iomgr/resolve_address_posix.h +++ b/src/core/lib/iomgr/resolve_address_posix.h @@ -29,8 +29,7 @@ namespace grpc_core { // A DNS resolver which uses the native platform's getaddrinfo API. class NativeDNSResolver : public DNSResolver { public: - // Gets the singleton instance, creating it first if it doesn't exist - static NativeDNSResolver* GetOrCreate(); + NativeDNSResolver() = default; TaskHandle LookupHostname( std::function>)> diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index f03d79c77102e..25ec37ee14965 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -50,8 +50,6 @@ namespace grpc_core { namespace { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; - class NativeDNSRequest { public: NativeDNSRequest( @@ -84,10 +82,8 @@ class NativeDNSRequest { } // namespace -NativeDNSResolver* NativeDNSResolver::GetOrCreate() { - static NativeDNSResolver* instance = new NativeDNSResolver(); - return instance; -} +NativeDNSResolver::NativeDNSResolver() + : engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {} DNSResolver::TaskHandle NativeDNSResolver::LookupHostname( std::function>)> @@ -167,7 +163,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupSRV( absl::string_view /* name */, Duration /* deadline */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) { - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { ApplicationCallbackExecCtx app_exec_ctx; ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( @@ -182,7 +178,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupTXT( grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) { // Not supported - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { ApplicationCallbackExecCtx app_exec_ctx; ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( diff --git a/src/core/lib/iomgr/resolve_address_windows.h b/src/core/lib/iomgr/resolve_address_windows.h index 901d47331d3d6..6469046483010 100644 --- a/src/core/lib/iomgr/resolve_address_windows.h +++ b/src/core/lib/iomgr/resolve_address_windows.h @@ -29,8 +29,7 @@ namespace grpc_core { // A DNS resolver which uses the native platform's getaddrinfo API. class NativeDNSResolver : public DNSResolver { public: - // Gets the singleton instance, creating it first if it doesn't exist - static NativeDNSResolver* GetOrCreate(); + NativeDNSResolver(); TaskHandle LookupHostname( std::function>)> @@ -57,6 +56,9 @@ class NativeDNSResolver : public DNSResolver { // NativeDNSResolver does not support cancellation. bool Cancel(TaskHandle handle) override; + + private: + std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index ec274a4309561..d89d514419454 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -28,6 +28,7 @@ #include "absl/status/status.h" #include "absl/types/optional.h" #include "absl/types/variant.h" +#include "absl/utility/utility.h" #include @@ -478,8 +479,8 @@ class PromiseActivity final : public FreestandingActivity, // Notification that we're no longer executing - it's ok to destruct the // promise. void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { - GPR_ASSERT(!done_); - done_ = true; + GPR_ASSERT(!absl::exchange(done_, true)); + ScopedContext contexts(this); Destruct(&promise_holder_.promise); } diff --git a/src/core/lib/promise/sleep.cc b/src/core/lib/promise/sleep.cc index 44d5651d781e9..44071a452d985 100644 --- a/src/core/lib/promise/sleep.cc +++ b/src/core/lib/promise/sleep.cc @@ -19,16 +19,18 @@ #include #include +#include -#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/event_engine/default_event_engine.h" // IWYU pragma: keep #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/context.h" #include "src/core/lib/promise/poll.h" namespace grpc_core { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; +using ::grpc_event_engine::experimental::EventEngine; Sleep::Sleep(Timestamp deadline) : deadline_(deadline) {} @@ -52,9 +54,12 @@ Poll Sleep::operator()() { } Sleep::ActiveClosure::ActiveClosure(Timestamp deadline) - : waker_(Activity::current()->MakeOwningWaker()), - timer_handle_(GetDefaultEventEngine()->RunAfter( - deadline - Timestamp::Now(), this)) {} + : waker_(Activity::current()->MakeOwningWaker()) { + auto engine = GetContext(); + GPR_ASSERT(engine != nullptr && + "An EventEngine context is required for Promise Sleep"); + timer_handle_ = engine->RunAfter(deadline - Timestamp::Now(), this); +} void Sleep::ActiveClosure::Run() { ApplicationCallbackExecCtx callback_exec_ctx; @@ -71,7 +76,7 @@ void Sleep::ActiveClosure::Cancel() { // If we cancel correctly then we must own both refs still and can simply // delete without unreffing twice, otherwise try unreffing since this may be // the last owned ref. - if (GetDefaultEventEngine()->Cancel(timer_handle_) || Unref()) { + if (GetContext()->Cancel(timer_handle_) || Unref()) { delete this; } } diff --git a/src/core/lib/promise/sleep.h b/src/core/lib/promise/sleep.h index 892270fcb5b9b..3c74e7face25e 100644 --- a/src/core/lib/promise/sleep.h +++ b/src/core/lib/promise/sleep.h @@ -72,8 +72,7 @@ class Sleep final { Waker waker_; // One ref dropped by Run(), the other by Cancel(). std::atomic refs_{2}; - const grpc_event_engine::experimental::EventEngine::TaskHandle - timer_handle_; + grpc_event_engine::experimental::EventEngine::TaskHandle timer_handle_; }; Timestamp deadline_; diff --git a/src/cpp/server/orca/orca_service.cc b/src/cpp/server/orca/orca_service.cc index 65b99295c987b..82efc04311484 100644 --- a/src/cpp/server/orca/orca_service.cc +++ b/src/cpp/server/orca/orca_service.cc @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -54,7 +55,6 @@ namespace grpc { namespace experimental { using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetDefaultEventEngine; // // OrcaService::Reactor @@ -64,7 +64,9 @@ class OrcaService::Reactor : public ServerWriteReactor, public grpc_core::RefCounted { public: explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer) - : RefCounted("OrcaService::Reactor"), service_(service) { + : RefCounted("OrcaService::Reactor"), + service_(service), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) { // Get slice from request. Slice slice; GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok()); @@ -125,7 +127,7 @@ class OrcaService::Reactor : public ServerWriteReactor, bool MaybeScheduleTimer() { grpc::internal::MutexLock lock(&timer_mu_); if (cancelled_) return false; - timer_handle_ = GetDefaultEventEngine()->RunAfter( + timer_handle_ = engine_->RunAfter( report_interval_, [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); }); return true; @@ -134,8 +136,7 @@ class OrcaService::Reactor : public ServerWriteReactor, bool MaybeCancelTimer() { grpc::internal::MutexLock lock(&timer_mu_); cancelled_ = true; - if (timer_handle_.has_value() && - GetDefaultEventEngine()->Cancel(*timer_handle_)) { + if (timer_handle_.has_value() && engine_->Cancel(*timer_handle_)) { timer_handle_.reset(); return true; } @@ -159,6 +160,7 @@ class OrcaService::Reactor : public ServerWriteReactor, grpc_core::Duration report_interval_; ByteBuffer response_; + std::shared_ptr engine_; }; // diff --git a/src/php/bin/run_tests.sh b/src/php/bin/run_tests.sh index 6dd66b9fcb191..6b204bc7b612b 100755 --- a/src/php/bin/run_tests.sh +++ b/src/php/bin/run_tests.sh @@ -47,8 +47,12 @@ if [ -x "$(command -v valgrind)" ]; then # TODO(jtattermusch): reenable the test once https://github.com/grpc/grpc/issues/29098 is fixed. if [ "$(uname -m)" != "aarch64" ]; then $(which valgrind) --error-exitcode=10 --leak-check=yes \ + -v \ + --num-callers=30 \ + --suppressions=../tests/MemoryLeakTest/ignore_leaks.supp \ $VALGRIND_UNDEF_VALUE_ERRORS \ $(which php) $extension_dir -d max_execution_time=300 \ ../tests/MemoryLeakTest/MemoryLeakTest.php fi fi + diff --git a/src/php/tests/MemoryLeakTest/ignore_leaks.supp b/src/php/tests/MemoryLeakTest/ignore_leaks.supp new file mode 100644 index 0000000000000..dccd088d90500 --- /dev/null +++ b/src/php/tests/MemoryLeakTest/ignore_leaks.supp @@ -0,0 +1,13 @@ +{ + static Posix NativeDNSResolver + Memcheck:Leak + match-leak-kinds: possible + ... + fun:pthread_create@@GLIBC_2.2.5 + ... + fun:_ZN17grpc_event_engine12experimental21GetDefaultEventEngineEv + fun:_ZN9grpc_core17NativeDNSResolverC1Ev + fun:_ZN9grpc_core17NativeDNSResolver11GetOrCreateEv + ... +} + diff --git a/test/core/channel/channel_args_test.cc b/test/core/channel/channel_args_test.cc index 6110d706513c8..178a72f3c96d8 100644 --- a/test/core/channel/channel_args_test.cc +++ b/test/core/channel/channel_args_test.cc @@ -28,6 +28,7 @@ #include #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index e578f97d9c8d8..d4fc5084320e0 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -18,6 +18,7 @@ #include #include +#include #include @@ -63,12 +64,12 @@ static struct iomgr_args { namespace { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; - -grpc_core::DNSResolver* g_default_dns_resolver; - class TestDNSResolver : public grpc_core::DNSResolver { public: + explicit TestDNSResolver( + std::shared_ptr default_resolver) + : default_resolver_(std::move(default_resolver)), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {} // Wrapper around default resolve_address in order to count the number of // times we incur in a system-level name resolution. TaskHandle LookupHostname( @@ -77,7 +78,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { absl::string_view name, absl::string_view default_port, grpc_core::Duration timeout, grpc_pollset_set* interested_parties, absl::string_view name_server) override { - auto result = g_default_dns_resolver->LookupHostname( + auto result = default_resolver_->LookupHostname( std::move(on_resolved), name, default_port, timeout, interested_parties, name_server); ++g_resolution_count; @@ -105,7 +106,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { absl::StatusOr> LookupHostnameBlocking( absl::string_view name, absl::string_view default_port) override { - return g_default_dns_resolver->LookupHostnameBlocking(name, default_port); + return default_resolver_->LookupHostnameBlocking(name, default_port); } TaskHandle LookupSRV( @@ -114,7 +115,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { absl::string_view /* name */, grpc_core::Duration /* timeout */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { grpc_core::ApplicationCallbackExecCtx app_exec_ctx; grpc_core::ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( @@ -129,7 +130,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { // Not supported - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { grpc_core::ApplicationCallbackExecCtx app_exec_ctx; grpc_core::ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( @@ -140,6 +141,10 @@ class TestDNSResolver : public grpc_core::DNSResolver { // Not cancellable bool Cancel(TaskHandle /*handle*/) override { return false; } + + private: + std::shared_ptr default_resolver_; + std::shared_ptr engine_; }; } // namespace @@ -385,8 +390,8 @@ TEST(DnsResolverCooldownTest, MainTest) { g_default_dns_lookup_ares = grpc_dns_lookup_hostname_ares; grpc_dns_lookup_hostname_ares = test_dns_lookup_ares; - g_default_dns_resolver = grpc_core::GetDNSResolver(); - grpc_core::SetDNSResolver(new TestDNSResolver()); + grpc_core::ResetDNSResolver( + absl::make_unique(grpc_core::GetDNSResolver())); test_cooldown(); diff --git a/test/core/end2end/dualstack_socket_test.cc b/test/core/end2end/dualstack_socket_test.cc index 03d48b56d09e1..fea4a7faed0c3 100644 --- a/test/core/end2end/dualstack_socket_test.cc +++ b/test/core/end2end/dualstack_socket_test.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include "absl/status/statusor.h" diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index 712c66df20bca..ce2f89d4e0daa 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 5f3bd068bbc84..3c5deda61a66a 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -138,6 +138,7 @@ static void finish_resolve(void* arg, grpc_error_handle error) { namespace { +using ::grpc_event_engine::experimental::FuzzingEventEngine; using ::grpc_event_engine::experimental::GetDefaultEventEngine; class FuzzerDNSResolver : public grpc_core::DNSResolver { @@ -177,11 +178,7 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver { grpc_timer timer_; }; - // Gets the singleton instance, possibly creating it first - static FuzzerDNSResolver* GetOrCreate() { - static FuzzerDNSResolver* instance = new FuzzerDNSResolver(); - return instance; - } + explicit FuzzerDNSResolver(FuzzingEventEngine* engine) : engine_(engine) {} TaskHandle LookupHostname( std::function>)> @@ -206,7 +203,7 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver { absl::string_view /* name */, grpc_core::Duration /* timeout */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { grpc_core::ApplicationCallbackExecCtx app_exec_ctx; grpc_core::ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( @@ -221,7 +218,7 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver { grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { // Not supported - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { grpc_core::ApplicationCallbackExecCtx app_exec_ctx; grpc_core::ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( @@ -232,6 +229,9 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver { // FuzzerDNSResolver does not support cancellation. bool Cancel(TaskHandle /*handle*/) override { return false; } + + private: + FuzzingEventEngine* engine_; }; } // namespace @@ -820,21 +820,23 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { gpr_set_log_function(dont_log); } grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable); - grpc_event_engine::experimental::SetDefaultEventEngineFactory( + + grpc_event_engine::experimental::SetEventEngineFactory( [actions = msg.event_engine_actions()]() { - return absl::make_unique< - grpc_event_engine::experimental::FuzzingEventEngine>( - grpc_event_engine::experimental::FuzzingEventEngine::Options(), - actions); + return absl::make_unique( + FuzzingEventEngine::Options(), actions); }); - grpc_event_engine::experimental::GetDefaultEventEngine(); + auto engine = + std::dynamic_pointer_cast(GetDefaultEventEngine()); + FuzzingEventEngine::SetGlobalNowImplEngine(engine.get()); grpc_init(); grpc_timer_manager_set_threading(false); { grpc_core::ExecCtx exec_ctx; grpc_core::Executor::SetThreadingAll(false); } - grpc_core::SetDNSResolver(FuzzerDNSResolver::GetOrCreate()); + grpc_core::ResetDNSResolver( + absl::make_unique(engine.get())); grpc_dns_lookup_hostname_ares = my_dns_lookup_ares; grpc_cancel_ares_request = my_cancel_ares_request; @@ -871,14 +873,10 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { while (action_index < msg.actions_size() || g_channel != nullptr || g_server != nullptr || pending_channel_watches > 0 || pending_pings > 0 || ActiveCall() != nullptr) { - static_cast( - grpc_event_engine::experimental::GetDefaultEventEngine()) - ->Tick(); + engine->Tick(); if (action_index == msg.actions_size()) { - static_cast( - grpc_event_engine::experimental::GetDefaultEventEngine()) - ->FuzzingDone(); + engine->FuzzingDone(); if (g_channel != nullptr) { grpc_channel_destroy(g_channel); g_channel = nullptr; @@ -1225,4 +1223,5 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { grpc_resource_quota_unref(g_resource_quota); grpc_shutdown_blocking(); + FuzzingEventEngine::UnsetGlobalNowImplEngine(engine.get()); } diff --git a/test/core/end2end/goaway_server_test.cc b/test/core/end2end/goaway_server_test.cc index 996e132bfb0a3..6f3c4bb770ba7 100644 --- a/test/core/end2end/goaway_server_test.cc +++ b/test/core/end2end/goaway_server_test.cc @@ -82,12 +82,12 @@ static void set_resolve_port(int port) { namespace { -using ::grpc_event_engine::experimental::GetDefaultEventEngine; - -grpc_core::DNSResolver* g_default_dns_resolver; - class TestDNSResolver : public grpc_core::DNSResolver { public: + explicit TestDNSResolver( + std::shared_ptr default_resolver) + : default_resolver_(std::move(default_resolver)), + engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {} TaskHandle LookupHostname( std::function>)> on_resolved, @@ -95,9 +95,9 @@ class TestDNSResolver : public grpc_core::DNSResolver { grpc_core::Duration timeout, grpc_pollset_set* interested_parties, absl::string_view name_server) override { if (name != "test") { - return g_default_dns_resolver->LookupHostname( - std::move(on_resolved), name, default_port, timeout, - interested_parties, name_server); + return default_resolver_->LookupHostname(std::move(on_resolved), name, + default_port, timeout, + interested_parties, name_server); } MakeDNSRequest(std::move(on_resolved)); return kNullHandle; @@ -105,7 +105,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { absl::StatusOr> LookupHostnameBlocking( absl::string_view name, absl::string_view default_port) override { - return g_default_dns_resolver->LookupHostnameBlocking(name, default_port); + return default_resolver_->LookupHostnameBlocking(name, default_port); } TaskHandle LookupSRV( @@ -114,7 +114,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { absl::string_view /* name */, grpc_core::Duration /* timeout */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { grpc_core::ApplicationCallbackExecCtx app_exec_ctx; grpc_core::ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( @@ -129,7 +129,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) override { // Not supported - GetDefaultEventEngine()->Run([on_resolved] { + engine_->Run([on_resolved] { grpc_core::ApplicationCallbackExecCtx app_exec_ctx; grpc_core::ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( @@ -163,6 +163,8 @@ class TestDNSResolver : public grpc_core::DNSResolver { std::move(addrs)); } } + std::shared_ptr default_resolver_; + std::shared_ptr engine_; }; } // namespace @@ -213,9 +215,8 @@ int main(int argc, char** argv) { gpr_mu_init(&g_mu); grpc_init(); - g_default_dns_resolver = grpc_core::GetDNSResolver(); - auto* resolver = new TestDNSResolver(); - grpc_core::SetDNSResolver(resolver); + grpc_core::ResetDNSResolver( + absl::make_unique(grpc_core::GetDNSResolver())); iomgr_dns_lookup_ares = grpc_dns_lookup_hostname_ares; iomgr_cancel_ares_request = grpc_cancel_ares_request; grpc_dns_lookup_hostname_ares = my_dns_lookup_ares; @@ -446,8 +447,6 @@ int main(int argc, char** argv) { grpc_completion_queue_destroy(cq); - grpc_core::SetDNSResolver(g_default_dns_resolver); - delete resolver; grpc_shutdown(); gpr_mu_destroy(&g_mu); diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD index ff680bcbe3a5b..89ad17517434a 100644 --- a/test/core/event_engine/BUILD +++ b/test/core/event_engine/BUILD @@ -74,6 +74,19 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "default_engine_methods_test", + srcs = ["default_engine_methods_test.cc"], + external_deps = ["gtest"], + deps = [ + "//:default_event_engine", + "//:event_engine_base_hdrs", + "//:gpr", + "//:grpc", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_test( name = "smoke_test", srcs = ["smoke_test.cc"], diff --git a/test/core/event_engine/default_engine_methods_test.cc b/test/core/event_engine/default_engine_methods_test.cc new file mode 100644 index 0000000000000..94e6fbf8be383 --- /dev/null +++ b/test/core/event_engine/default_engine_methods_test.cc @@ -0,0 +1,129 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include + +#include +#include + +#include +#include + +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/gprpp/sync.h" +#include "test/core/util/test_config.h" + +namespace { + +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; + +class DefaultEngineTest : public testing::Test { + protected: + // Does nothing, fills space that a nullptr could not + class FakeEventEngine : public EventEngine { + public: + FakeEventEngine() = default; + ~FakeEventEngine() override = default; + absl::StatusOr> CreateListener( + Listener::AcceptCallback /* on_accept */, + absl::AnyInvocable /* on_shutdown */, + const grpc_event_engine::experimental::EndpointConfig& /* config */, + std::unique_ptr< + grpc_event_engine::experimental:: + MemoryAllocatorFactory> /* memory_allocator_factory */) + override { + return absl::UnimplementedError("test"); + }; + ConnectionHandle Connect( + OnConnectCallback /* on_connect */, const ResolvedAddress& /* addr */, + const grpc_event_engine::experimental::EndpointConfig& /* args */, + grpc_event_engine::experimental::MemoryAllocator /* memory_allocator */, + Duration /* timeout */) override { + return {-1, -1}; + }; + bool CancelConnect(ConnectionHandle /* handle */) override { + return false; + }; + bool IsWorkerThread() override { return false; }; + std::unique_ptr GetDNSResolver( + const DNSResolver::ResolverOptions& /* options */) override { + return nullptr; + }; + void Run(Closure* /* closure */) override{}; + void Run(absl::AnyInvocable /* closure */) override{}; + TaskHandle RunAfter(Duration /* when */, Closure* /* closure */) override { + return {-1, -1}; + } + TaskHandle RunAfter(Duration /* when */, + absl::AnyInvocable /* closure */) override { + return {-1, -1}; + } + bool Cancel(TaskHandle /* handle */) override { return false; }; + }; +}; + +TEST_F(DefaultEngineTest, SharedPtrGlobalEventEngineLifetimesAreValid) { + int create_count = 0; + grpc_event_engine::experimental::SetEventEngineFactory([&create_count] { + ++create_count; + return absl::make_unique(); + }); + std::shared_ptr ee2; + { + std::shared_ptr ee1 = GetDefaultEventEngine(); + ASSERT_EQ(1, create_count); + ee2 = GetDefaultEventEngine(); + ASSERT_EQ(1, create_count); + ASSERT_EQ(ee2.use_count(), 2); + } + // Ensure the first shared_ptr did not delete the global + ASSERT_TRUE(ee2.unique()); + ASSERT_FALSE(ee2->IsWorkerThread()); // useful for ASAN + // destroy the global engine via the last shared_ptr, and create a new one. + ee2.reset(); + ee2 = GetDefaultEventEngine(); + ASSERT_EQ(2, create_count); + ASSERT_TRUE(ee2.unique()); + grpc_event_engine::experimental::RevertToDefaultEventEngineFactory(); +} + +TEST_F(DefaultEngineTest, StressTestSharedPtr) { + constexpr int thread_count = 13; + constexpr absl::Duration spin_time = absl::Seconds(3); + std::vector threads; + threads.reserve(thread_count); + for (int i = 0; i < thread_count; i++) { + threads.emplace_back([&spin_time] { + auto timeout = absl::Now() + spin_time; + do { + GetDefaultEventEngine().reset(); + } while (timeout > absl::Now()); + }); + } + for (auto& thd : threads) { + thd.join(); + } +} +} // namespace + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/core/event_engine/factory_test.cc b/test/core/event_engine/factory_test.cc new file mode 100644 index 0000000000000..a4daabe9977e9 --- /dev/null +++ b/test/core/event_engine/factory_test.cc @@ -0,0 +1,79 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include + +#include "src/core/lib/event_engine/default_event_engine.h" +#include "test/core/event_engine/util/aborting_event_engine.h" +#include "test/core/util/test_config.h" + +namespace { +using ::grpc_event_engine::experimental::AbortingEventEngine; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::EventEngineFactoryReset; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; +using ::grpc_event_engine::experimental::SetEventEngineFactory; + +class EventEngineFactoryTest : public testing::Test { + public: + EventEngineFactoryTest() = default; + ~EventEngineFactoryTest() { EventEngineFactoryReset(); } +}; + +TEST_F(EventEngineFactoryTest, CustomFactoryIsUsed) { + int counter{0}; + SetEventEngineFactory([&counter] { + ++counter; + return absl::make_unique(); + }); + auto ee1 = GetDefaultEventEngine(); + ASSERT_EQ(counter, 1); + auto ee2 = GetDefaultEventEngine(); + ASSERT_EQ(counter, 1); + ASSERT_EQ(ee1, ee2); +} + +TEST_F(EventEngineFactoryTest, FactoryResetWorks) { + // eliminate a global default if one has been created already. + EventEngineFactoryReset(); + int counter{0}; + SetEventEngineFactory([&counter]() -> std::unique_ptr { + // called at most twice; + EXPECT_LE(++counter, 2); + return absl::make_unique(); + }); + auto custom_ee = GetDefaultEventEngine(); + ASSERT_EQ(counter, 1); + auto same_ee = GetDefaultEventEngine(); + ASSERT_EQ(custom_ee, same_ee); + ASSERT_EQ(counter, 1); + EventEngineFactoryReset(); + auto default_ee = GetDefaultEventEngine(); + ASSERT_NE(custom_ee, default_ee); +} +} // namespace + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/core/event_engine/fuzzing_event_engine/BUILD b/test/core/event_engine/fuzzing_event_engine/BUILD index 3766178ff34b4..ec67a7c2675e0 100644 --- a/test/core/event_engine/fuzzing_event_engine/BUILD +++ b/test/core/event_engine/fuzzing_event_engine/BUILD @@ -27,6 +27,7 @@ grpc_cc_library( hdrs = ["fuzzing_event_engine.h"], deps = [ ":fuzzing_event_engine_proto", + "//:default_event_engine", "//:event_engine_base_hdrs", "//:time", ], diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc index 18f20247b27e9..41dabe544ea12 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc @@ -31,16 +31,12 @@ namespace experimental { namespace { const intptr_t kTaskHandleSalt = 12345; FuzzingEventEngine* g_fuzzing_event_engine = nullptr; +gpr_timespec (*g_orig_gpr_now_impl)(gpr_clock_type clock_type); } // namespace FuzzingEventEngine::FuzzingEventEngine( Options options, const fuzzing_event_engine::Actions& actions) : final_tick_length_(options.final_tick_length) { - GPR_ASSERT(g_fuzzing_event_engine == nullptr); - g_fuzzing_event_engine = this; - - gpr_now_impl = GlobalNowImpl; - tick_increments_.clear(); task_delays_.clear(); tasks_by_id_.clear(); @@ -57,7 +53,8 @@ FuzzingEventEngine::FuzzingEventEngine( grpc_core::TestOnlySetProcessEpoch(NowAsTimespec(GPR_CLOCK_MONOTONIC)); auto update_delay = [](std::map* map, - fuzzing_event_engine::Delay delay, Duration max) { + const fuzzing_event_engine::Delay& delay, + Duration max) { auto& value = (*map)[delay.id()]; if (delay.delay_us() > static_cast(max.count() / GPR_NS_PER_US)) { value = max; @@ -84,11 +81,6 @@ void FuzzingEventEngine::FuzzingDone() { tick_increments_.clear(); } -FuzzingEventEngine::~FuzzingEventEngine() { - GPR_ASSERT(g_fuzzing_event_engine == this); - g_fuzzing_event_engine = nullptr; -} - gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) { // TODO(ctiller): add a facility to track realtime and monotonic clocks // separately to simulate divergence. @@ -98,15 +90,6 @@ gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) { return {secs.count(), static_cast((d - secs).count()), clock_type}; } -gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) { - if (g_fuzzing_event_engine == nullptr) { - return gpr_inf_future(clock_type); - } - GPR_ASSERT(g_fuzzing_event_engine != nullptr); - grpc_core::MutexLock lock(&g_fuzzing_event_engine->mu_); - return g_fuzzing_event_engine->NowAsTimespec(clock_type); -} - void FuzzingEventEngine::Tick() { std::vector> to_run; { @@ -210,5 +193,28 @@ bool FuzzingEventEngine::Cancel(TaskHandle handle) { return true; } +gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) { + if (g_fuzzing_event_engine == nullptr) { + return gpr_inf_future(clock_type); + } + GPR_ASSERT(g_fuzzing_event_engine != nullptr); + grpc_core::MutexLock lock(&g_fuzzing_event_engine->mu_); + return g_fuzzing_event_engine->NowAsTimespec(clock_type); +} + +void FuzzingEventEngine::SetGlobalNowImplEngine(FuzzingEventEngine* engine) { + GPR_ASSERT(g_fuzzing_event_engine == nullptr); + g_fuzzing_event_engine = engine; + g_orig_gpr_now_impl = gpr_now_impl; + gpr_now_impl = GlobalNowImpl; +} + +void FuzzingEventEngine::UnsetGlobalNowImplEngine(FuzzingEventEngine* engine) { + GPR_ASSERT(g_fuzzing_event_engine == engine); + g_fuzzing_event_engine = nullptr; + gpr_now_impl = g_orig_gpr_now_impl; + g_orig_gpr_now_impl = nullptr; +} + } // namespace experimental } // namespace grpc_event_engine diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h index 97e8295a57567..a18bc613e62dd 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h @@ -40,7 +40,7 @@ class FuzzingEventEngine : public EventEngine { }; explicit FuzzingEventEngine(Options options, const fuzzing_event_engine::Actions& actions); - ~FuzzingEventEngine() override; + ~FuzzingEventEngine() override = default; void FuzzingDone(); void Tick(); @@ -76,6 +76,11 @@ class FuzzingEventEngine : public EventEngine { Time Now() ABSL_LOCKS_EXCLUDED(mu_); + static void SetGlobalNowImplEngine(FuzzingEventEngine* engine) + ABSL_LOCKS_EXCLUDED(mu_); + static void UnsetGlobalNowImplEngine(FuzzingEventEngine* engine) + ABSL_LOCKS_EXCLUDED(mu_); + private: struct Task { Task(intptr_t id, absl::AnyInvocable closure) @@ -88,7 +93,6 @@ class FuzzingEventEngine : public EventEngine { ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); static gpr_timespec GlobalNowImpl(gpr_clock_type clock_type) ABSL_LOCKS_EXCLUDED(mu_); - const Duration final_tick_length_; grpc_core::Mutex mu_; diff --git a/test/core/event_engine/posix/lock_free_event_test.cc b/test/core/event_engine/posix/lock_free_event_test.cc index b07eb87af00d2..4a31ff2f3f058 100644 --- a/test/core/event_engine/posix/lock_free_event_test.cc +++ b/test/core/event_engine/posix/lock_free_event_test.cc @@ -26,13 +26,14 @@ #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/gprpp/sync.h" +using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::posix_engine::Scheduler; namespace { class TestScheduler : public Scheduler { public: - explicit TestScheduler(grpc_event_engine::experimental::EventEngine* engine) - : engine_(engine) {} + explicit TestScheduler(std::shared_ptr engine) + : engine_(std::move(engine)) {} void Run( grpc_event_engine::experimental::EventEngine::Closure* closure) override { engine_->Run(closure); @@ -43,7 +44,7 @@ class TestScheduler : public Scheduler { } private: - grpc_event_engine::experimental::EventEngine* engine_; + std::shared_ptr engine_; }; TestScheduler* g_scheduler; @@ -149,9 +150,7 @@ TEST(LockFreeEventTest, MultiThreadedTest) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_event_engine::experimental::EventEngine* engine = - grpc_event_engine::experimental::GetDefaultEventEngine(); - EXPECT_NE(engine, nullptr); - g_scheduler = new TestScheduler(engine); + g_scheduler = new TestScheduler( + grpc_event_engine::experimental::GetDefaultEventEngine()); return RUN_ALL_TESTS(); } diff --git a/test/core/event_engine/smoke_test.cc b/test/core/event_engine/smoke_test.cc index a4a99e21a7692..3c5d0ce79b4c7 100644 --- a/test/core/event_engine/smoke_test.cc +++ b/test/core/event_engine/smoke_test.cc @@ -27,13 +27,13 @@ using ::testing::MockFunction; class EventEngineSmokeTest : public testing::Test {}; -TEST_F(EventEngineSmokeTest, SetDefaultEventEngineFactoryLinks) { +TEST_F(EventEngineSmokeTest, SetEventEngineFactoryLinks) { // See https://github.com/grpc/grpc/pull/28707 testing::MockFunction< std::unique_ptr()> factory; EXPECT_CALL(factory, Call()).Times(1); - grpc_event_engine::experimental::SetDefaultEventEngineFactory( + grpc_event_engine::experimental::SetEventEngineFactory( factory.AsStdFunction()); EXPECT_EQ(nullptr, grpc_event_engine::experimental::CreateEventEngine()); } diff --git a/test/core/filters/filter_fuzzer.cc b/test/core/filters/filter_fuzzer.cc index 9f4e33f1a6433..0b7648fa50590 100644 --- a/test/core/filters/filter_fuzzer.cc +++ b/test/core/filters/filter_fuzzer.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include "absl/memory/memory.h" @@ -36,14 +37,15 @@ bool squelch = true; static void dont_log(gpr_log_func_args* /*args*/) {} -static gpr_timespec g_now; +static grpc_core::Mutex g_now_mu; +static gpr_timespec g_now ABSL_GUARDED_BY(g_now_mu); extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type); static gpr_timespec now_impl(gpr_clock_type clock_type) { GPR_ASSERT(clock_type != GPR_TIMESPAN); - gpr_timespec ts = g_now; - ts.clock_type = clock_type; - return ts; + grpc_core::MutexLock lock(&g_now_mu); + g_now.clock_type = clock_type; + return g_now; } namespace grpc_core { @@ -280,11 +282,13 @@ class MainLoop { switch (action.type_case()) { case filter_fuzzer::Action::TYPE_NOT_SET: break; - case filter_fuzzer::Action::kAdvanceTimeMicroseconds: + case filter_fuzzer::Action::kAdvanceTimeMicroseconds: { + MutexLock lock(&g_now_mu); g_now = gpr_time_add( g_now, gpr_time_from_micros(action.advance_time_microseconds(), GPR_TIMESPAN)); break; + } case filter_fuzzer::Action::kCancel: calls_.erase(action.call()); break; @@ -586,8 +590,11 @@ DEFINE_PROTO_FUZZER(const filter_fuzzer::Msg& msg) { if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) { gpr_set_log_function(dont_log); } - g_now = {1, 0, GPR_CLOCK_MONOTONIC}; - grpc_core::TestOnlySetProcessEpoch(g_now); + { + grpc_core::MutexLock lock(&g_now_mu); + g_now = {1, 0, GPR_CLOCK_MONOTONIC}; + grpc_core::TestOnlySetProcessEpoch(g_now); + } gpr_now_impl = now_impl; grpc_init(); grpc_timer_manager_set_threading(false); diff --git a/test/core/iomgr/resolve_address_test.cc b/test/core/iomgr/resolve_address_test.cc index 7c258901f5167..0caa1d2c11a8b 100644 --- a/test/core/iomgr/resolve_address_test.cc +++ b/test/core/iomgr/resolve_address_test.cc @@ -362,10 +362,11 @@ TEST_F(ResolveAddressTest, UnparseableHostPortsBadLocalhostWithPort) { // test doesn't care what the result is, just that we don't crash etc. TEST_F(ResolveAddressTest, ImmediateCancel) { grpc_core::ExecCtx exec_ctx; - auto request_handle = grpc_core::GetDNSResolver()->LookupHostname( + auto resolver = grpc_core::GetDNSResolver(); + auto request_handle = resolver->LookupHostname( absl::bind_front(&ResolveAddressTest::DontCare, this), "localhost:1", "1", grpc_core::kDefaultDNSRequestTimeout, pollset_set(), ""); - if (grpc_core::GetDNSResolver()->Cancel(request_handle)) { + if (resolver->Cancel(request_handle)) { Finish(); } grpc_core::ExecCtx::Get()->Flush(); @@ -375,12 +376,13 @@ TEST_F(ResolveAddressTest, ImmediateCancel) { // Attempt to cancel a request after it has completed. TEST_F(ResolveAddressTest, CancelDoesNotSucceed) { grpc_core::ExecCtx exec_ctx; - auto request_handle = grpc_core::GetDNSResolver()->LookupHostname( + auto resolver = grpc_core::GetDNSResolver(); + auto request_handle = resolver->LookupHostname( absl::bind_front(&ResolveAddressTest::MustSucceed, this), "localhost:1", "1", grpc_core::kDefaultDNSRequestTimeout, pollset_set(), ""); grpc_core::ExecCtx::Get()->Flush(); PollPollsetUntilRequestDone(); - ASSERT_FALSE(grpc_core::GetDNSResolver()->Cancel(request_handle)); + ASSERT_FALSE(resolver->Cancel(request_handle)); } namespace { @@ -419,12 +421,13 @@ TEST_F(ResolveAddressTest, CancelWithNonResponsiveDNSServer) { grpc_ares_test_only_inject_config = InjectNonResponsiveDNSServer; // Run the test grpc_core::ExecCtx exec_ctx; - auto request_handle = grpc_core::GetDNSResolver()->LookupHostname( + auto resolver = grpc_core::GetDNSResolver(); + auto request_handle = resolver->LookupHostname( absl::bind_front(&ResolveAddressTest::MustNotBeCalled, this), "foo.bar.com:1", "1", grpc_core::kDefaultDNSRequestTimeout, pollset_set(), ""); grpc_core::ExecCtx::Get()->Flush(); // initiate DNS requests - ASSERT_TRUE(grpc_core::GetDNSResolver()->Cancel(request_handle)); + ASSERT_TRUE(resolver->Cancel(request_handle)); Finish(); // let cancellation work finish to ensure the callback is not called grpc_core::ExecCtx::Get()->Flush(); @@ -483,12 +486,13 @@ TEST_F(ResolveAddressTest, DeleteInterestedPartiesAfterCancellation) { // Create a pollset_set, destroyed immediately after cancellation std::unique_ptr pss = PollsetSetWrapper::Create(); // Run the test - auto request_handle = grpc_core::GetDNSResolver()->LookupHostname( + auto resolver = grpc_core::GetDNSResolver(); + auto request_handle = resolver->LookupHostname( absl::bind_front(&ResolveAddressTest::MustNotBeCalled, this), "foo.bar.com:1", "1", grpc_core::kDefaultDNSRequestTimeout, pss->pollset_set(), ""); grpc_core::ExecCtx::Get()->Flush(); // initiate DNS requests - ASSERT_TRUE(grpc_core::GetDNSResolver()->Cancel(request_handle)); + ASSERT_TRUE(resolver->Cancel(request_handle)); } { // let cancellation work finish to ensure the callback is not called diff --git a/test/core/promise/sleep_test.cc b/test/core/promise/sleep_test.cc index 1babe8f93dc2a..4cff30568eeeb 100644 --- a/test/core/promise/sleep_test.cc +++ b/test/core/promise/sleep_test.cc @@ -24,6 +24,7 @@ #include +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/notification.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" @@ -37,12 +38,15 @@ TEST(Sleep, Zzzz) { ExecCtx exec_ctx; Notification done; Timestamp done_time = Timestamp::Now() + Duration::Seconds(1); + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); // Sleep for one second then set done to true. - auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(), - [&done](absl::Status r) { - EXPECT_EQ(r, absl::OkStatus()); - done.Notify(); - }); + auto activity = MakeActivity( + Sleep(done_time), InlineWakeupScheduler(), + [&done](absl::Status r) { + EXPECT_EQ(r, absl::OkStatus()); + done.Notify(); + }, + engine.get()); done.WaitForNotification(); exec_ctx.InvalidateNow(); EXPECT_GE(Timestamp::Now(), done_time); @@ -52,12 +56,15 @@ TEST(Sleep, AlreadyDone) { ExecCtx exec_ctx; Notification done; Timestamp done_time = Timestamp::Now() - Duration::Seconds(1); + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); // Sleep for no time at all then set done to true. - auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(), - [&done](absl::Status r) { - EXPECT_EQ(r, absl::OkStatus()); - done.Notify(); - }); + auto activity = MakeActivity( + Sleep(done_time), InlineWakeupScheduler(), + [&done](absl::Status r) { + EXPECT_EQ(r, absl::OkStatus()); + done.Notify(); + }, + engine.get()); done.WaitForNotification(); } @@ -65,13 +72,16 @@ TEST(Sleep, Cancel) { ExecCtx exec_ctx; Notification done; Timestamp done_time = Timestamp::Now() + Duration::Seconds(1); + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); // Sleep for one second but race it to complete immediately auto activity = MakeActivity( Race(Sleep(done_time), [] { return absl::CancelledError(); }), - InlineWakeupScheduler(), [&done](absl::Status r) { + InlineWakeupScheduler(), + [&done](absl::Status r) { EXPECT_EQ(r, absl::CancelledError()); done.Notify(); - }); + }, + engine.get()); done.WaitForNotification(); exec_ctx.InvalidateNow(); EXPECT_LT(Timestamp::Now(), done_time); @@ -84,11 +94,14 @@ TEST(Sleep, MoveSemantics) { Timestamp done_time = Timestamp::Now() + Duration::Milliseconds(111); Sleep donor(done_time); Sleep sleeper = std::move(donor); - auto activity = MakeActivity(std::move(sleeper), InlineWakeupScheduler(), - [&done](absl::Status r) { - EXPECT_EQ(r, absl::OkStatus()); - done.Notify(); - }); + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); + auto activity = MakeActivity( + std::move(sleeper), InlineWakeupScheduler(), + [&done](absl::Status r) { + EXPECT_EQ(r, absl::OkStatus()); + done.Notify(); + }, + engine.get()); done.WaitForNotification(); exec_ctx.InvalidateNow(); EXPECT_GE(Timestamp::Now(), done_time); @@ -100,13 +113,15 @@ TEST(Sleep, StressTest) { ExecCtx exec_ctx; std::vector> notifications; std::vector activities; + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); gpr_log(GPR_INFO, "Starting %d sleeps for 1sec", kNumActivities); for (int i = 0; i < kNumActivities; i++) { auto notification = std::make_shared(); auto activity = MakeActivity( Sleep(Timestamp::Now() + Duration::Seconds(1)), ExecCtxWakeupScheduler(), - [notification](absl::Status /*r*/) { notification->Notify(); }); + [notification](absl::Status /*r*/) { notification->Notify(); }, + engine.get()); notifications.push_back(std::move(notification)); activities.push_back(std::move(activity)); } diff --git a/test/core/surface/init_test.cc b/test/core/surface/init_test.cc index 359b874e9f2a9..173d01bbad85c 100644 --- a/test/core/surface/init_test.cc +++ b/test/core/surface/init_test.cc @@ -126,15 +126,18 @@ TEST(Init, repeatedly_blocking) { TEST(Init, TimerManagerHoldsLastInit) { grpc_init(); - grpc_core::Notification n; - grpc_event_engine::experimental::GetDefaultEventEngine()->RunAfter( - std::chrono::seconds(1), [&n] { + // the temporary engine is deleted immediately, and the callback owns a copy. + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); + engine->RunAfter( + std::chrono::seconds(1), + [engine = grpc_event_engine::experimental::GetDefaultEventEngine()] { grpc_core::ApplicationCallbackExecCtx app_exec_ctx; grpc_core::ExecCtx exec_ctx; grpc_shutdown(); - n.Notify(); }); - n.WaitForNotification(); + while (engine.use_count() != 1) { + absl::SleepFor(absl::Microseconds(15)); + } } int main(int argc, char** argv) { diff --git a/test/core/transport/bdp_estimator_test.cc b/test/core/transport/bdp_estimator_test.cc index 93dbe1d7d4da6..0832874766b06 100644 --- a/test/core/transport/bdp_estimator_test.cc +++ b/test/core/transport/bdp_estimator_test.cc @@ -21,12 +21,12 @@ #include #include +#include #include "gtest/gtest.h" #include -#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/timer_manager.h" #include "test/core/util/test_config.h" @@ -36,22 +36,17 @@ extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type); namespace grpc_core { namespace testing { namespace { -int g_clock = 123; -Mutex mu_; +std::atomic g_clock{123}; gpr_timespec fake_gpr_now(gpr_clock_type clock_type) { - MutexLock lock(&mu_); gpr_timespec ts; - ts.tv_sec = g_clock; + ts.tv_sec = g_clock.load(); ts.tv_nsec = 0; ts.clock_type = clock_type; return ts; } -void inc_time(void) { - MutexLock lock(&mu_); - g_clock += 30; -} +void inc_time(void) { g_clock.fetch_add(30); } } // namespace TEST(BdpEstimatorTest, NoOp) { BdpEstimator est("test"); } diff --git a/test/core/util/resolve_localhost_ip46.cc b/test/core/util/resolve_localhost_ip46.cc index 70cec0cb247e5..4d069a7f2540d 100644 --- a/test/core/util/resolve_localhost_ip46.cc +++ b/test/core/util/resolve_localhost_ip46.cc @@ -18,6 +18,7 @@ #include "test/core/util/resolve_localhost_ip46.h" +#include #include #include "absl/status/statusor.h" diff --git a/test/cpp/microbenchmarks/bm_event_engine_run.cc b/test/cpp/microbenchmarks/bm_event_engine_run.cc index 93bb46897188e..4db0e21f51d7a 100644 --- a/test/cpp/microbenchmarks/bm_event_engine_run.cc +++ b/test/cpp/microbenchmarks/bm_event_engine_run.cc @@ -164,7 +164,8 @@ FanoutParameters GetFanoutParameters(benchmark::State& state) { // parameter will become invalid and crash some callbacks, and 2) in my RBE // tests, copies are slightly faster than a shared_ptr // alternative. -void FanOutCallback(EventEngine* engine, const FanoutParameters params, +void FanOutCallback(std::shared_ptr engine, + const FanoutParameters params, grpc_core::Notification& signal, std::atomic_int& count, int processing_layer) { int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1; @@ -197,7 +198,7 @@ void BM_EventEngine_Lambda_FanOut(benchmark::State& state) { BENCHMARK(BM_EventEngine_Lambda_FanOut)->Apply(FanoutTestArguments); void ClosureFanOutCallback(EventEngine::Closure* child_closure, - EventEngine* engine, + std::shared_ptr engine, grpc_core::Notification** signal_holder, std::atomic_int& count, const FanoutParameters params) { diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 84bad94acd23a..b6fbefd6bb9ea 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2419,6 +2419,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "default_engine_methods_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,