Skip to content

Commit

Permalink
Refactor tracing configurator to be available from actor system threa…
Browse files Browse the repository at this point in the history
…ds (#12067)
  • Loading branch information
UgnineSirdis authored Dec 4, 2024
1 parent b2da93a commit 4f81e50
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 36 deletions.
2 changes: 2 additions & 0 deletions ydb/core/base/appdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <ydb/core/control/immediate_control_board_impl.h>
#include <ydb/core/grpc_services/grpc_helper.h>
#include <ydb/core/jaeger_tracing/sampling_throttling_configurator.h>
#include <ydb/core/tablet_flat/shared_cache_pages.h>
#include <ydb/core/protos/auth.pb.h>
#include <ydb/core/protos/bootstrap.pb.h>
Expand Down Expand Up @@ -123,6 +124,7 @@ TAppData::TAppData(
, MemoryControllerConfig(Impl->MemoryControllerConfig)
, ReplicationConfig(Impl->ReplicationConfig)
, KikimrShouldContinue(kikimrShouldContinue)
, TracingConfigurator(MakeIntrusive<NJaegerTracing::TSamplingThrottlingConfigurator>(TimeProvider, RandomProvider))
{}

TAppData::~TAppData()
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/base/appdata_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ namespace NKikimr {
namespace NSharedCache {
class TSharedCachePages;
}
namespace NJaegerTracing {
class TSamplingThrottlingConfigurator;
}
}

namespace NKikimrCms {
Expand Down Expand Up @@ -267,6 +270,9 @@ struct TAppData {

bool YamlConfigEnabled = false;

// Tracing configurator (look for tracing config in ydb/core/jaeger_tracing/actors_tracing_control)
TIntrusivePtr<NKikimr::NJaegerTracing::TSamplingThrottlingConfigurator> TracingConfigurator;

TAppData(
ui32 sysPoolId, ui32 userPoolId, ui32 ioPoolId, ui32 batchPoolId,
TMap<TString, ui32> servicePools,
Expand Down
72 changes: 72 additions & 0 deletions ydb/core/base/wilson_tracing_control.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include "wilson_tracing_control.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/jaeger_tracing/sampling_throttling_configurator.h>
#include <ydb/core/jaeger_tracing/sampling_throttling_control.h>

#include <util/thread/singleton.h>
#include <util/system/compiler.h>
#include <util/system/tls.h>
#include <util/system/yassert.h>

namespace NKikimr::NJaegerTracing {

namespace {

Y_POD_STATIC_THREAD(TSamplingThrottlingControl*) TracingControlRawPtr;

class TSamplingThrottlingControlTlsHolder {
public:
TSamplingThrottlingControlTlsHolder()
: Control(CreateNewTracingControl())
{}

TSamplingThrottlingControl* GetTracingControlPtr() {
if (Y_UNLIKELY(!Control)) {
Control = CreateNewTracingControl();
}
return Control.Get();
}

void ResetTracingControl() {
Control = nullptr;
}

private:
static TIntrusivePtr<TSamplingThrottlingControl> CreateNewTracingControl() {
Y_ASSERT(HasAppData()); // In general we must call this from actor thread
if (Y_UNLIKELY(!HasAppData())) {
return nullptr;
}

return AppData()->TracingConfigurator->GetControl();
}

private:
TIntrusivePtr<TSamplingThrottlingControl> Control;
};

TSamplingThrottlingControl* GetTracingControlTls() {
if (Y_UNLIKELY(!TracingControlRawPtr)) {
TracingControlRawPtr = FastTlsSingleton<TSamplingThrottlingControlTlsHolder>()->GetTracingControlPtr();
}
return TracingControlRawPtr;
}

} // namespace

void HandleTracing(NWilson::TTraceId& traceId, const TRequestDiscriminator& discriminator) {
TSamplingThrottlingControl* control = GetTracingControlTls();
if (Y_LIKELY(control)) {
control->HandleTracing(traceId, discriminator);
}
}

void ClearTracingControl() {
if (TracingControlRawPtr) {
TracingControlRawPtr = nullptr;
FastTlsSingleton<TSamplingThrottlingControlTlsHolder>()->ResetTracingControl();
}
}

} // namespace NKikimr::NJaegerTracing
16 changes: 16 additions & 0 deletions ydb/core/base/wilson_tracing_control.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once
#include <ydb/core/jaeger_tracing/request_discriminator.h>
#include <ydb/library/actors/wilson/wilson_trace.h>

namespace NKikimr::NJaegerTracing {

// Generate a new trace id (or throttle existing one)
// with probability according to current configuration and request type.
// Can be called from actor system threads.
void HandleTracing(NWilson::TTraceId& traceId, const TRequestDiscriminator& discriminator);

// For test purposes
// Clears tracing control TLS variables that depend on AppData
void ClearTracingControl();

} // namespace NKikimr::NJaegerTracing
4 changes: 3 additions & 1 deletion ydb/core/base/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ SRCS(
board_replica.cpp
blobstorage.h
blobstorage.cpp
blobstorage_grouptype.cpp
channel_profiles.h
counters.cpp
counters.h
Expand Down Expand Up @@ -72,7 +73,7 @@ SRCS(
tx_processing.h
tx_processing.cpp
user_registry.h
blobstorage_grouptype.cpp
wilson_tracing_control.cpp
)

PEERDIR(
Expand All @@ -92,6 +93,7 @@ PEERDIR(
ydb/core/debug
ydb/core/erasure
ydb/core/graph/api
ydb/core/jaeger_tracing
ydb/core/protos
ydb/core/protos/out
ydb/library/aclib
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/cms/console/jaeger_tracing_configurator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TJaegerTracingConfigurator : public TActorBootstrapped<TJaegerTracingConfi
return NKikimrServices::TActivity::JAEGER_TRACING_CONFIGURATOR;
}

TJaegerTracingConfigurator(TSamplingThrottlingConfigurator tracingConfigurator,
TJaegerTracingConfigurator(TIntrusivePtr<TSamplingThrottlingConfigurator> tracingConfigurator,
NKikimrConfig::TTracingConfig cfg);

void Bootstrap(const TActorContext& ctx);
Expand All @@ -35,12 +35,12 @@ class TJaegerTracingConfigurator : public TActorBootstrapped<TJaegerTracingConfi
static TMaybe<TString> GetDatabase(const NKikimrConfig::TTracingConfig::TSelectors& selectors);
static TSettings<double, TWithTag<TThrottlingSettings>> GetSettings(const NKikimrConfig::TTracingConfig& cfg);

TSamplingThrottlingConfigurator TracingConfigurator;
TIntrusivePtr<TSamplingThrottlingConfigurator> TracingConfigurator;
NKikimrConfig::TTracingConfig initialConfig;
};

TJaegerTracingConfigurator::TJaegerTracingConfigurator(
TSamplingThrottlingConfigurator tracingConfigurator,
TIntrusivePtr<TSamplingThrottlingConfigurator> tracingConfigurator,
NKikimrConfig::TTracingConfig cfg)
: TracingConfigurator(std::move(tracingConfigurator))
, initialConfig(std::move(cfg))
Expand Down Expand Up @@ -73,7 +73,7 @@ void TJaegerTracingConfigurator::Handle(TEvConsole::TEvConfigNotificationRequest

void TJaegerTracingConfigurator::ApplyConfigs(const NKikimrConfig::TTracingConfig& cfg) {
auto settings = GetSettings(cfg);
return TracingConfigurator.UpdateSettings(std::move(settings));
return TracingConfigurator->UpdateSettings(std::move(settings));
}

TVector<ERequestType> TJaegerTracingConfigurator::GetRequestTypes(const NKikimrConfig::TTracingConfig::TSelectors& selectors) {
Expand Down Expand Up @@ -213,7 +213,7 @@ TSettings<double, TWithTag<TThrottlingSettings>> TJaegerTracingConfigurator::Get
return settings;
}

IActor* CreateJaegerTracingConfigurator(TSamplingThrottlingConfigurator tracingConfigurator,
IActor* CreateJaegerTracingConfigurator(TIntrusivePtr<TSamplingThrottlingConfigurator> tracingConfigurator,
NKikimrConfig::TTracingConfig cfg) {
return new TJaegerTracingConfigurator(std::move(tracingConfigurator), std::move(cfg));
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/cms/console/jaeger_tracing_configurator.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace NKikimr::NConsole {

IActor* CreateJaegerTracingConfigurator(NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator,
IActor* CreateJaegerTracingConfigurator(TIntrusivePtr<NJaegerTracing::TSamplingThrottlingConfigurator> tracingConfigurator,
NKikimrConfig::TTracingConfig cfg);

} // namespace NKikimr::NConsole
10 changes: 5 additions & 5 deletions ydb/core/cms/console/jaeger_tracing_configurator_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ TTenantTestConfig DefaultConsoleTestConfig() {

void InitJaegerTracingConfigurator(
TTenantTestRuntime& runtime,
TSamplingThrottlingConfigurator configurator,
TIntrusivePtr<TSamplingThrottlingConfigurator> configurator,
const NKikimrConfig::TTracingConfig& initCfg
) {
runtime.Register(CreateJaegerTracingConfigurator(std::move(configurator), initCfg));
Expand Down Expand Up @@ -99,7 +99,7 @@ class TTracingControls {

std::pair<ETraceState, ui8> HandleTracing(bool isExternal, TRequestDiscriminator discriminator) {
auto& control = RandomChoice(Controls);

NWilson::TTraceId traceId;
if (isExternal) {
traceId = NWilson::TTraceId::NewTraceId(TComponentTracingLevels::ProductionVerbose, Max<ui32>());
Expand All @@ -125,13 +125,13 @@ class TTracingControls {
TVector<TIntrusivePtr<TSamplingThrottlingControl>> Controls;
};

std::pair<TTracingControls, TSamplingThrottlingConfigurator>
std::pair<TTracingControls, TIntrusivePtr<TSamplingThrottlingConfigurator>>
CreateSamplingThrottlingConfigurator(size_t n, TIntrusivePtr<ITimeProvider> timeProvider) {
auto randomProvider = CreateDefaultRandomProvider();
TSamplingThrottlingConfigurator configurator(timeProvider, randomProvider);
TIntrusivePtr<TSamplingThrottlingConfigurator> configurator = MakeIntrusive<TSamplingThrottlingConfigurator>(timeProvider, randomProvider);
TVector<TIntrusivePtr<TSamplingThrottlingControl>> controls;
for (size_t i = 0; i < n; ++i) {
controls.emplace_back(configurator.GetControl());
controls.emplace_back(configurator->GetControl());
}

return {TTracingControls(std::move(controls)), std::move(configurator)};
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1633,11 +1633,10 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se

if (!IsServiceInitialized(setup, NGRpcService::CreateGRpcRequestProxyId(0))) {
const size_t proxyCount = Config.HasGRpcConfig() ? Config.GetGRpcConfig().GetGRpcProxyCount() : 1UL;
NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator(appData->TimeProvider, appData->RandomProvider);
for (size_t i = 0; i < proxyCount; ++i) {
auto grpcReqProxy = Config.HasGRpcConfig() && Config.GetGRpcConfig().GetSkipSchemeCheck()
? NGRpcService::CreateGRpcRequestProxySimple(Config)
: NGRpcService::CreateGRpcRequestProxy(Config, tracingConfigurator.GetControl());
: NGRpcService::CreateGRpcRequestProxy(Config);
setup->LocalServices.push_back(std::pair<TActorId,
TActorSetupCmd>(NGRpcService::CreateGRpcRequestProxyId(i),
TActorSetupCmd(grpcReqProxy, TMailboxType::ReadAsFilled,
Expand All @@ -1646,7 +1645,7 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(
TActorId(),
TActorSetupCmd(
NConsole::CreateJaegerTracingConfigurator(std::move(tracingConfigurator), Config.GetTracingConfig()),
NConsole::CreateJaegerTracingConfigurator(appData->TracingConfigurator, Config.GetTracingConfig()),
TMailboxType::ReadAsFilled,
appData->UserPoolId)));
}
Expand Down
12 changes: 5 additions & 7 deletions ydb/core/grpc_services/grpc_request_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/path.h>
#include <ydb/core/base/nameservice.h>
#include <ydb/core/base/wilson_tracing_control.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/cms/console/console.h>
#include <ydb/core/grpc_services/counters/proxy_counters.h>
#include <ydb/core/jaeger_tracing/sampling_throttling_control.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/scheme_board/scheme_board.h>
#include <ydb/library/wilson_ids/wilson.h>
Expand Down Expand Up @@ -57,9 +57,8 @@ class TGRpcRequestProxyImpl
{
using TBase = TActorBootstrapped<TGRpcRequestProxyImpl>;
public:
explicit TGRpcRequestProxyImpl(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> tracingControl)
explicit TGRpcRequestProxyImpl(const NKikimrConfig::TAppConfig& appConfig)
: ChannelBufferSize(appConfig.GetTableServiceConfig().GetResourceManager().GetChannelBufferSize())
, TracingControl(std::move(tracingControl))
{ }

void Bootstrap(const TActorContext& ctx);
Expand Down Expand Up @@ -326,7 +325,6 @@ class TGRpcRequestProxyImpl
bool DynamicNode = false;
TString RootDatabase;
IGRpcProxyCounters::TPtr Counters;
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> TracingControl;
};

void TGRpcRequestProxyImpl::Bootstrap(const TActorContext& ctx) {
Expand Down Expand Up @@ -451,7 +449,7 @@ void TGRpcRequestProxyImpl::MaybeStartTracing(IRequestProxyCtx& ctx) {
if (const auto otelHeader = ctx.GetPeerMetaValues(NYdb::OTEL_TRACE_HEADER)) {
traceId = NWilson::TTraceId::FromTraceparentHeader(otelHeader.GetRef(), TComponentTracingLevels::ProductionVerbose);
}
TracingControl->HandleTracing(traceId, ctx.GetRequestDiscriminator());
NJaegerTracing::HandleTracing(traceId, ctx.GetRequestDiscriminator());
if (traceId) {
NWilson::TSpan grpcRequestProxySpan(TWilsonGrpc::RequestProxy, std::move(traceId), "GrpcRequestProxy");
if (auto database = ctx.GetDatabaseName()) {
Expand Down Expand Up @@ -616,8 +614,8 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev) {
}
}

IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> tracingControl) {
return new TGRpcRequestProxyImpl(appConfig, std::move(tracingControl));
IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig) {
return new TGRpcRequestProxyImpl(appConfig);
}

} // namespace NGRpcService
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/grpc_services/grpc_request_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "grpc_request_proxy_handle_methods.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/jaeger_tracing/sampling_throttling_control.h>

#include <ydb/library/actors/core/actor.h>

Expand All @@ -23,7 +22,7 @@ struct TAppData;

namespace NGRpcService {

IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig, TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> tracingControl);
IActor* CreateGRpcRequestProxy(const NKikimrConfig::TAppConfig& appConfig);
IActor* CreateGRpcRequestProxySimple(const NKikimrConfig::TAppConfig& appConfig);

class TGRpcRequestProxy : public TGRpcRequestProxyHandleMethods, public IFacilityProvider {
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/jaeger_tracing/sampling_throttling_configurator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ template<class T>
void PropagateUnspecifiedRequest(TRulesContainer<T>& rules) {
constexpr auto unspecifiedRequestType = static_cast<size_t>(ERequestType::UNSPECIFIED);
const auto& unspecifiedRequestTypeRules = rules[unspecifiedRequestType];

for (size_t requestType = 0; requestType < kRequestTypesCnt; ++requestType) {
if (requestType == unspecifiedRequestType) {
continue;
Expand Down Expand Up @@ -45,7 +45,9 @@ TSamplingThrottlingConfigurator::TSamplingThrottlingConfigurator(TIntrusivePtr<I

TIntrusivePtr<TSamplingThrottlingControl> TSamplingThrottlingConfigurator::GetControl() {
auto control = TIntrusivePtr(new TSamplingThrottlingControl(GenerateSetup()));
IssuedControls.push_back(control);
with_lock (ControlMutex) {
IssuedControls.push_back(control);
}
return control;
}

Expand All @@ -55,8 +57,10 @@ void TSamplingThrottlingConfigurator::UpdateSettings(TSettings<double, TWithTag<
PropagateUnspecifiedRequest(enrichedSettings.ExternalThrottlingRules);
CurrentSettings = std::move(enrichedSettings);

for (auto& control : IssuedControls) {
control->UpdateImpl(GenerateSetup());
with_lock (ControlMutex) {
for (auto& control : IssuedControls) {
control->UpdateImpl(GenerateSetup());
}
}
}

Expand Down
9 changes: 6 additions & 3 deletions ydb/core/jaeger_tracing/sampling_throttling_configurator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
#include <library/cpp/time_provider/time_provider.h>

#include <util/generic/maybe.h>
#include <util/generic/ptr.h>
#include <util/generic/vector.h>
#include <util/system/mutex.h>

namespace NKikimr::NJaegerTracing {

Expand All @@ -22,7 +24,7 @@ struct TWithTag {
size_t Tag;
};

class TSamplingThrottlingConfigurator: private TMoveOnly {
class TSamplingThrottlingConfigurator: public TRefCounted<TSamplingThrottlingConfigurator, TAtomicCounter> {
public:
TSamplingThrottlingConfigurator(TIntrusivePtr<ITimeProvider> timeProvider,
TIntrusivePtr<IRandomProvider>& randomProvider);
Expand All @@ -34,13 +36,14 @@ class TSamplingThrottlingConfigurator: private TMoveOnly {
private:
TSettings<double, TIntrusivePtr<TThrottler>> GenerateThrottlers(
TSettings<double, TWithTag<TThrottlingSettings>> settings);

std::unique_ptr<TSamplingThrottlingControl::TSamplingThrottlingImpl> GenerateSetup();

TVector<TIntrusivePtr<TSamplingThrottlingControl>> IssuedControls;
TIntrusivePtr<ITimeProvider> TimeProvider;
TFastRng64 Rng;
TSettings<double, TIntrusivePtr<TThrottler>> CurrentSettings;
TSettings<double, TIntrusivePtr<TThrottler>> CurrentSettings;
TMutex ControlMutex;
};

} // namespace NKikimr::NJaegerTracing
Loading

0 comments on commit 4f81e50

Please sign in to comment.