Skip to content

Commit

Permalink
Revert "Fix deadlines in DSProxy for Status, MultiPut, Patch requests…
Browse files Browse the repository at this point in the history
…, add UTs for deadlines (ydb-platform#11780)" (ydb-platform#12875)
  • Loading branch information
alexvru authored Dec 23, 2024
1 parent 3c2e65a commit 1e172ec
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 386 deletions.
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const ui64 UnconfiguredBufferSizeLimit = 32 << 20;

const TDuration ProxyEstablishSessionsTimeout = TDuration::Seconds(100);

const TDuration DsMinimumDelayBetweenPutWakeups = TDuration::MilliSeconds(1);
const ui64 DsPutWakeupMs = 60000;

const ui64 BufferSizeThreshold = 1 << 20;

Expand Down
41 changes: 4 additions & 37 deletions ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
TErasureSplitContext ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce);
TBatchedVec<TStackVec<TRope, TypicalPartsInBlob>> PartSets;

using TDeadlineMask = std::bitset<MaxBatchedPutRequests>;
std::map<TInstant, TDeadlineMask> PutDeadlineMasks;
TDeadlineMask DeadlineMask;

TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount;
ui64 WaitingVDiskCount = 0;

Expand Down Expand Up @@ -651,7 +647,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
<< " RestartCounter# " << RestartCounter);

for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
PutDeadlineMasks[PutImpl.Blobs[blobIdx].Deadline].set(blobIdx);
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
}

Expand All @@ -672,8 +667,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
getTotalSize()
);

Become(&TBlobStorageGroupPutRequest::StateWait);
ScheduleWakeup(TInstant::Zero());
Become(&TBlobStorageGroupPutRequest::StateWait, TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);

PartSets.resize(PutImpl.Blobs.size());
for (auto& partSet : PartSets) {
Expand Down Expand Up @@ -724,27 +718,15 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
<< " BlobIDs# " << BlobIdSequenceToString()
<< " Not answered in "
<< (TActivationContext::Monotonic() - RequestStartTime) << " seconds");

const TInstant now = TActivationContext::Now();
while (!PutDeadlineMasks.empty()) {
auto [deadline, mask] = *PutDeadlineMasks.begin();
if (deadline <= now) {
DeadlineMask |= mask;
PutDeadlineMasks.erase(PutDeadlineMasks.begin());
} else {
break;
}
}

TPutImpl::TPutResultVec putResults;
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
if (!PutImpl.Blobs[blobIdx].Replied && DeadlineMask[blobIdx]) {
if (!PutImpl.Blobs[blobIdx].Replied && now > PutImpl.Blobs[blobIdx].Deadline) {
PutImpl.PrepareOneReply(NKikimrProto::DEADLINE, blobIdx, LogCtx, "Deadline timer hit", putResults);
}
}
if (!ReplyAndDieWithLastResponse(putResults)) {
ScheduleWakeup(now);
}
ReplyAndDieWithLastResponse(putResults);
Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
}

void UpdatePengingVDiskResponseCount(const TDeque<TPutImpl::TPutEvent>& putEvents) {
Expand Down Expand Up @@ -811,21 +793,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
<< " State# " << PutImpl.DumpFullState());
}

void ScheduleWakeup(TInstant lastWakeup) {
TInstant now = TActivationContext::Now();
TInstant deadline = lastWakeup + DsMinimumDelayBetweenPutWakeups;

// find first deadline after now
for (auto it = PutDeadlineMasks.begin(); it != PutDeadlineMasks.end(); ++it) {
deadline = std::max(deadline, it->first);
if (it->first > now) {
break;
}
}

Schedule(deadline, new TKikimrEvents::TEvWakeup);
}

STATEFN(StateWait) {
if (ProcessEvent(ev, true)) {
return;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ namespace NKikimr {
.ExecutionRelay = ev->Get()->ExecutionRelay
}
}),
ev->Get()->Deadline
TInstant::Max()
);
}

Expand Down
Loading

0 comments on commit 1e172ec

Please sign in to comment.