Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream 24 3 add logs to dsproxy #11269

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 206 additions & 108 deletions ydb/core/blobstorage/dsproxy/dsproxy.h

Large diffs are not rendered by default.

24 changes: 8 additions & 16 deletions ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,12 @@ class TBlobStorageGroupAssimilateRequest : public TBlobStorageGroupRequestActor<
return mon->ActiveAssimilate;
}

TBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
const TIntrusivePtr<TGroupQueues>& state, const TActorId& source,
const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon, TEvBlobStorage::TEvAssimilate *ev, ui64 cookie,
NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie,
NKikimrServices::BS_PROXY_ASSIMILATE, false, {}, now, storagePoolCounters, ev->RestartCounter,
NWilson::TSpan(TWilson::BlobStorage, std::move(traceId), "DSProxy.Assimilate"), std::move(ev->ExecutionRelay))
, SkipBlocksUpTo(ev->SkipBlocksUpTo)
, SkipBarriersUpTo(ev->SkipBarriersUpTo)
, SkipBlobsUpTo(ev->SkipBlobsUpTo)
, PerVDiskInfo(info->GetTotalVDisksNum())
TBlobStorageGroupAssimilateRequest(TBlobStorageGroupAssimilateParameters& params)
: TBlobStorageGroupRequestActor(params, NWilson::TSpan(TWilson::BlobStorage, std::move(params.Common.TraceId), "DSProxy.Assimilate"))
, SkipBlocksUpTo(params.Common.Event->SkipBlocksUpTo)
, SkipBarriersUpTo(params.Common.Event->SkipBarriersUpTo)
, SkipBlobsUpTo(params.Common.Event->SkipBlobsUpTo)
, PerVDiskInfo(Info->GetTotalVDisksNum())
, Result(new TEvBlobStorage::TEvAssimilateResult(NKikimrProto::OK, {}))
{
Heap.reserve(PerVDiskInfo.size());
Expand Down Expand Up @@ -464,11 +459,8 @@ class TBlobStorageGroupAssimilateRequest : public TBlobStorageGroupRequestActor<
}
};

IActor* CreateBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
const TIntrusivePtr<TGroupQueues>& state, const TActorId& source,
const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon, TEvBlobStorage::TEvAssimilate *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters) {
return new TBlobStorageGroupAssimilateRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, storagePoolCounters);
IActor* CreateBlobStorageGroupAssimilateRequest(TBlobStorageGroupAssimilateParameters params) {
return new TBlobStorageGroupAssimilateRequest(params);
}

} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "defs.h"

#include "dsproxy.h"
#include "request_history.h"

#include <ydb/core/blobstorage/base/batched_vec.h>
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
Expand Down
34 changes: 11 additions & 23 deletions ydb/core/blobstorage/dsproxy/dsproxy_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor<TBlob
const ui32 Generation;
const TInstant Deadline;
const ui64 IssuerGuid;
TInstant StartTime;
bool SeenAlready = false;

TGroupQuorumTracker QuorumTracker;
Expand Down Expand Up @@ -100,7 +99,7 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor<TBlob
std::unique_ptr<TEvBlobStorage::TEvBlockResult> result(new TEvBlobStorage::TEvBlockResult(status));
result->ErrorReason = ErrorReason;
A_LOG_LOG_S(true, PriorityForStatusResult(status), "DSPB04", "Result# " << result->Print(false));
Mon->CountBlockResponseTime(TActivationContext::Now() - StartTime);
Mon->CountBlockResponseTime(TActivationContext::Monotonic() - RequestStartTime);
return SendResponseAndDie(std::move(result));
}

Expand Down Expand Up @@ -132,19 +131,12 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor<TBlob
return mon->ActiveBlock;
}

TBlobStorageGroupBlockRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvBlock *ev,
ui64 cookie, NWilson::TSpan&& span, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie,
NKikimrServices::BS_PROXY_BLOCK, false, {}, now, storagePoolCounters, ev->RestartCounter,
std::move(span), std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, Generation(ev->Generation)
, Deadline(ev->Deadline)
, IssuerGuid(ev->IssuerGuid)
, StartTime(now)
TBlobStorageGroupBlockRequest(TBlobStorageGroupBlockParameters& params, NWilson::TSpan&& span)
: TBlobStorageGroupRequestActor(params, std::move(span))
, TabletId(params.Common.Event->TabletId)
, Generation(params.Common.Event->Generation)
, Deadline(params.Common.Event->Deadline)
, IssuerGuid(params.Common.Event->IssuerGuid)
, QuorumTracker(Info.Get())
{}

Expand Down Expand Up @@ -175,16 +167,12 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor<TBlob
}
};

IActor* CreateBlobStorageGroupBlockRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvBlock *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
NWilson::TSpan span(TWilson::BlobStorage, std::move(traceId), "DSProxy.Block");
IActor* CreateBlobStorageGroupBlockRequest(TBlobStorageGroupBlockParameters params) {
NWilson::TSpan span(TWilson::BlobStorage, std::move(params.Common.TraceId), "DSProxy.Block");
if (span) {
span.Attribute("event", ev->ToString());
span.Attribute("event", params.Common.Event->ToString());
}

return new TBlobStorageGroupBlockRequest(info, state, source, mon, ev, cookie, std::move(span), now, storagePoolCounters);
return new TBlobStorageGroupBlockRequest(params, std::move(span));
}

} // NKikimr
43 changes: 16 additions & 27 deletions ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
const bool Decommission;

TGroupQuorumTracker QuorumTracker;
TInstant StartTime;

ui32 RequestsSent = 0;
ui32 ResponsesReceived = 0;
Expand Down Expand Up @@ -141,27 +140,21 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
return mon->ActiveCollectGarbage;
}

TBlobStorageGroupCollectGarbageRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev, ui64 cookie,
NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie,
NKikimrServices::BS_PROXY_COLLECT, false, {}, now, storagePoolCounters, ev->RestartCounter,
NWilson::TSpan(TWilson::BlobStorage, std::move(traceId), "DSProxy.CollectGarbage"), std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, RecordGeneration(ev->RecordGeneration)
, PerGenerationCounter(ev->PerGenerationCounter)
, Channel(ev->Channel)
, Deadline(ev->Deadline)
, Keep(ev->Keep.Release())
, DoNotKeep(ev->DoNotKeep.Release())
, CollectGeneration(ev->CollectGeneration)
, CollectStep(ev->CollectStep)
, Hard(ev->Hard)
, Collect(ev->Collect)
, Decommission(ev->Decommission)
TBlobStorageGroupCollectGarbageRequest(TBlobStorageGroupCollectGarbageParameters& params)
: TBlobStorageGroupRequestActor(params, NWilson::TSpan(TWilson::BlobStorage, std::move(params.Common.TraceId), "DSProxy.CollectGarbage"))
, TabletId(params.Common.Event->TabletId)
, RecordGeneration(params.Common.Event->RecordGeneration)
, PerGenerationCounter(params.Common.Event->PerGenerationCounter)
, Channel(params.Common.Event->Channel)
, Deadline(params.Common.Event->Deadline)
, Keep(params.Common.Event->Keep.Release())
, DoNotKeep(params.Common.Event->DoNotKeep.Release())
, CollectGeneration(params.Common.Event->CollectGeneration)
, CollectStep(params.Common.Event->CollectStep)
, Hard(params.Common.Event->Hard)
, Collect(params.Common.Event->Collect)
, Decommission(params.Common.Event->Decommission)
, QuorumTracker(Info.Get())
, StartTime(now)
{}

void Bootstrap() {
Expand Down Expand Up @@ -205,12 +198,8 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
}
};

IActor* CreateBlobStorageGroupCollectGarbageRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
return new TBlobStorageGroupCollectGarbageRequest(info, state, source, mon, ev, cookie, std::move(traceId), now,
storagePoolCounters);
IActor* CreateBlobStorageGroupCollectGarbageRequest(TBlobStorageGroupCollectGarbageParameters params) {
return new TBlobStorageGroupCollectGarbageRequest(params);
}

} // NKikimr
38 changes: 12 additions & 26 deletions ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
const bool ReadBody;
const bool DiscoverBlockedGeneration;
const TInstant Deadline;
const TInstant StartTime;

TGroupResponseTracker GroupResponseTracker;
std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> PendingResult;
Expand All @@ -295,7 +294,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
template<typename TPtr>
void SendResult(TPtr& result) {
Y_ABORT_UNLESS(result);
const TDuration duration = TActivationContext::Now() - StartTime;
const TDuration duration = TActivationContext::Monotonic() - RequestStartTime;
Mon->CountDiscoverResponseTime(duration);
const bool success = result->Status == NKikimrProto::OK;
LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvDiscover, 0, duration.SecondsFloat() * 1000.0,
Expand Down Expand Up @@ -877,25 +876,17 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
return ERequestType::Discover;
}

TBlobStorageGroupDiscoverRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> mon, TEvBlobStorage::TEvDiscover *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie,
NKikimrServices::BS_PROXY_DISCOVER, true, {}, now, storagePoolCounters, ev->RestartCounter,
NWilson::TSpan(TWilson::BlobStorage, std::move(traceId), "DSProxy.Discover"),
std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, ReadBody(ev->ReadBody)
, DiscoverBlockedGeneration(ev->DiscoverBlockedGeneration)
, Deadline(ev->Deadline)
, StartTime(now)
TBlobStorageGroupDiscoverRequest(TBlobStorageGroupDiscoverParameters& params)
: TBlobStorageGroupRequestActor(params, NWilson::TSpan(TWilson::BlobStorage, std::move(params.Common.TraceId), "DSProxy.Discover"))
, TabletId(params.Common.Event->TabletId)
, MinGeneration(params.Common.Event->MinGeneration)
, ReadBody(params.Common.Event->ReadBody)
, DiscoverBlockedGeneration(params.Common.Event->DiscoverBlockedGeneration)
, Deadline(params.Common.Event->Deadline)
, GroupResponseTracker(Info)
, IsGetBlockDone(!DiscoverBlockedGeneration)
, ForceBlockedGeneration(ev->ForceBlockedGeneration)
, FromLeader(ev->FromLeader)
, ForceBlockedGeneration(params.Common.Event->ForceBlockedGeneration)
, FromLeader(params.Common.Event->FromLeader)
{}

void Bootstrap() {
Expand Down Expand Up @@ -974,13 +965,8 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
}
};

IActor* CreateBlobStorageGroupDiscoverRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvDiscover *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
return new TBlobStorageGroupDiscoverRequest(info, state, source, mon, ev, cookie, std::move(traceId), now,
storagePoolCounters);
IActor* CreateBlobStorageGroupDiscoverRequest(TBlobStorageGroupDiscoverParameters params) {
return new TBlobStorageGroupDiscoverRequest(params);
}

}//NKikimr
40 changes: 13 additions & 27 deletions ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ class TDiscoverWorker {
class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupRequestActor<TBlobStorageGroupMirror3dcDiscoverRequest>{
const ui64 TabletId;
const ui32 MinGeneration;
const TInstant StartTime;
const TInstant Deadline;
const bool ReadBody;
const bool DiscoverBlockedGeneration;
Expand Down Expand Up @@ -457,23 +456,15 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques
return ERequestType::Discover;
}

TBlobStorageGroupMirror3dcDiscoverRequest(TIntrusivePtr<TBlobStorageGroupInfo> info,
TIntrusivePtr<TGroupQueues> state, const TActorId& source,
TIntrusivePtr<TBlobStorageGroupProxyMon> mon, TEvBlobStorage::TEvDiscover *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie,
NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters, ev->RestartCounter,
NWilson::TSpan(TWilson::BlobStorage, std::move(traceId), "DSProxy.Discover(mirror-3-dc)"),
std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, StartTime(now)
, Deadline(ev->Deadline)
, ReadBody(ev->ReadBody)
, DiscoverBlockedGeneration(ev->DiscoverBlockedGeneration)
, ForceBlockedGeneration(ev->ForceBlockedGeneration)
, FromLeader(ev->FromLeader)
TBlobStorageGroupMirror3dcDiscoverRequest(TBlobStorageGroupDiscoverParameters& params)
: TBlobStorageGroupRequestActor(params, NWilson::TSpan(TWilson::BlobStorage, std::move(params.Common.TraceId), "DSProxy.Discover(mirror-3-dc)"))
, TabletId(params.Common.Event->TabletId)
, MinGeneration(params.Common.Event->MinGeneration)
, Deadline(params.Common.Event->Deadline)
, ReadBody(params.Common.Event->ReadBody)
, DiscoverBlockedGeneration(params.Common.Event->DiscoverBlockedGeneration)
, ForceBlockedGeneration(params.Common.Event->ForceBlockedGeneration)
, FromLeader(params.Common.Event->FromLeader)
, GetBlockTracker(Info.Get())
{}

Expand Down Expand Up @@ -660,7 +651,7 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques
R_LOG_DEBUG_S("DSPDM03", "Response# " << response->ToString());

Y_ABORT_UNLESS(!Responded);
const TDuration duration = TActivationContext::Now() - StartTime;
const TDuration duration = TActivationContext::Monotonic() - RequestStartTime;
LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvDiscover, 0, duration.SecondsFloat() * 1000.0,
TabletId, Info->GroupID.GetRawId(), TLogoBlobID::MaxChannel, "", true);
SendResponseAndDie(std::move(response));
Expand All @@ -673,7 +664,7 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques

Y_ABORT_UNLESS(!Responded);
Y_ABORT_UNLESS(status != NKikimrProto::OK);
const TDuration duration = TActivationContext::Now() - StartTime;
const TDuration duration = TActivationContext::Monotonic() - RequestStartTime;
LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvDiscover, 0, duration.SecondsFloat() * 1000.0,
TabletId, Info->GroupID.GetRawId(), TLogoBlobID::MaxChannel, "", false);
std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> response(new TEvBlobStorage::TEvDiscoverResult(
Expand Down Expand Up @@ -738,13 +729,8 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques
}
};

IActor* CreateBlobStorageGroupMirror3dcDiscoverRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvDiscover *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
return new TBlobStorageGroupMirror3dcDiscoverRequest(info, state, source, mon, ev, cookie, std::move(traceId), now,
storagePoolCounters);
IActor* CreateBlobStorageGroupMirror3dcDiscoverRequest(TBlobStorageGroupDiscoverParameters params) {
return new TBlobStorageGroupMirror3dcDiscoverRequest(params);
}

}//NKikimr
Loading
Loading