From 9228c958177e747a44ab86c9006022e604e60e28 Mon Sep 17 00:00:00 2001 From: Alexander Rutkovsky Date: Fri, 15 Mar 2024 23:49:48 +0300 Subject: [PATCH] Fix put impl class (#2829) --- .../dsproxy/dsproxy_blackboard.cpp | 71 ++++-------- .../blobstorage/dsproxy/dsproxy_blackboard.h | 30 ++--- ydb/core/blobstorage/dsproxy/dsproxy_put.cpp | 35 +++--- .../blobstorage/dsproxy/dsproxy_put_impl.cpp | 104 +++++++----------- .../blobstorage/dsproxy/dsproxy_put_impl.h | 44 ++------ .../blobstorage/dsproxy/ut/dsproxy_put_ut.cpp | 10 +- .../dsproxy/ut_strategy/strategy_ut.cpp | 2 +- 7 files changed, 108 insertions(+), 188 deletions(-) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index 97706a1b6f13..6df5e4ce8698 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -48,12 +48,6 @@ void TBlobState::AddPartToPut(ui32 partIdx, TRope&& partData) { IsChanged = true; } -void TBlobState::MarkBlobReadyToPut(ui8 blobIdx) { - Y_ABORT_UNLESS(WholeSituation == ESituation::Unknown || WholeSituation == ESituation::Present); - BlobIdx = blobIdx; - IsChanged = true; -} - bool TBlobState::Restore(const TBlobStorageGroupInfo &info) { const TIntervalVec fullBlobInterval(0, Id.BlobSize()); const TIntervalSet here = Whole.Here(); @@ -227,7 +221,7 @@ TString TBlobState::ToString() const { for (ui32 i = 0; i < Disks.size(); ++i) { str << Endl << " Disks[" << i << "]# " << Disks[i].ToString() << Endl; } - str << " BlobIdx# " << (ui32)BlobIdx << Endl; + str << " BlobIdx# " << BlobIdx << Endl; str << "}"; return str.Str(); } @@ -304,7 +298,7 @@ void TGroupDiskRequests::AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui3 } void TGroupDiskRequests::AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer, - TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx) { + TDiskPutRequest::EPutReason putReason, bool isHandoff, size_t blobIdx) { PutsPending.emplace_back(diskOrderNumber, id, buffer, putReason, isHandoff, blobIdx); } @@ -340,20 +334,6 @@ void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& part (*this)[id].AddPartToPut(partIdx, std::move(partData)); } -void TBlackboard::MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx) { - Y_ABORT_UNLESS(bool(id)); - Y_ABORT_UNLESS(id.PartId() == 0); - Y_ABORT_UNLESS(id.BlobSize() != 0); - (*this)[id].MarkBlobReadyToPut(blobIdx); -} - -void TBlackboard::MoveBlobStateToDone(const TLogoBlobID &id) { - Y_ABORT_UNLESS(bool(id)); - Y_ABORT_UNLESS(id.PartId() == 0); - Y_ABORT_UNLESS(id.BlobSize() != 0); - DoneBlobStates.insert(BlobStates.extract(id)); -} - void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) { Y_ABORT_UNLESS(bool(id)); Y_ABORT_UNLESS(id.PartId() != 0); @@ -390,8 +370,7 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) { } EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec& s, - TBatchedVec *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) { - TString errorReason; + TBatchedVec *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) { for (auto it = BlobStates.begin(); it != BlobStates.end(); ) { auto& blob = it->second; if (!std::exchange(blob.IsChanged, false)) { @@ -401,6 +380,7 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec // recalculate blob outcome if it is not yet determined NKikimrProto::EReplyStatus status = NKikimrProto::OK; + TString errorReason; for (IStrategy *strategy : s) { switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests)) { case EStrategyOutcome::IN_PROGRESS: @@ -408,16 +388,11 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec break; case EStrategyOutcome::ERROR: - if (IsAllRequestsTogether) { + if (!finished) { return res; } - if (errorReason) { - errorReason += " && "; - errorReason += res.ErrorReason; - } else { - errorReason = res.ErrorReason; - } status = NKikimrProto::ERROR; + errorReason = std::move(res.ErrorReason); break; case EStrategyOutcome::DONE: @@ -431,26 +406,25 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec status = NKikimrProto::UNKNOWN; } if (status != NKikimrProto::UNKNOWN) { + if (finished) { // we are operating on independent blobs + finished->push_back(TFinishedBlob{ + blob.BlobIdx, + status, + std::move(errorReason), + }); + } const auto [doneIt, inserted, node] = DoneBlobStates.insert(BlobStates.extract(it++)); Y_ABORT_UNLESS(inserted); - if (!IsAllRequestsTogether) { - blob.Status = status; - if (finished) { - finished->push_back(&*doneIt); - } - } } else { ++it; } } - EStrategyOutcome outcome(BlobStates.empty() ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS); - outcome.ErrorReason = std::move(errorReason); - return outcome; + return BlobStates.empty() ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS; } EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s, - TBatchedVec *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) { + TBatchedVec *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) { return RunStrategies(logCtx, {const_cast(&s)}, finished, expired); } @@ -464,8 +438,7 @@ TBlobState& TBlackboard::GetState(const TLogoBlobID &id) { << " blobId# " << fullId << " BlackBoard# " << ToString()); } - TBlobState &state = it->second; - return state; + return it->second; } ssize_t TBlackboard::AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex) { @@ -512,8 +485,12 @@ void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, T } } -void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id) { - (*this)[id]; +void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) { + const auto [it, inserted] = BlobStates.try_emplace(id); + Y_ABORT_UNLESS(inserted); + TBlobState& state = it->second; + state.Init(id, *Info); + state.BlobIdx = blobIdx; } TBlobState& TBlackboard::operator [](const TLogoBlobID& id) { @@ -559,9 +536,7 @@ void TBlackboard::InvalidatePartStates(ui32 orderNumber) { const TVDiskID vdiskId = Info->GetVDiskId(orderNumber); for (auto& [id, state] : BlobStates) { if (const ui32 diskIdx = Info->GetIdxInSubgroup(vdiskId, id.Hash()); diskIdx != Info->Type.BlobSubgroupSize()) { - TBlobState::TDisk& disk = state.Disks[diskIdx]; - for (ui32 partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) { - TBlobState::TDiskPart& part = disk.DiskParts[partIdx]; + for (TBlobState::TDiskPart& part : state.Disks[diskIdx].DiskParts) { if (part.Situation == TBlobState::ESituation::Present) { part.Situation = TBlobState::ESituation::Unknown; if (state.WholeSituation == TBlobState::ESituation::Present) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index c128a013e507..84dc4780c9a5 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -81,14 +81,12 @@ struct TBlobState { TStackVec Parts; TStackVec Disks; TVector PartMap; - NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; - ui8 BlobIdx; + size_t BlobIdx; bool IsChanged = false; void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info); void AddNeeded(ui64 begin, ui64 size); void AddPartToPut(ui32 partIdx, TRope&& partData); - void MarkBlobReadyToPut(ui8 blobIdx = 0); bool Restore(const TBlobStorageGroupInfo &info); void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring, ui32 shift, TRope&& data); @@ -133,9 +131,9 @@ struct TDiskPutRequest { TRope Buffer; EPutReason Reason; bool IsHandoff; - ui8 BlobIdx; + size_t BlobIdx; - TDiskPutRequest(ui32 orderNumber, const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, ui8 blobIdx) + TDiskPutRequest(ui32 orderNumber, const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, size_t blobIdx) : OrderNumber(orderNumber) , Id(id) , Buffer(std::move(buffer)) @@ -152,7 +150,7 @@ struct TGroupDiskRequests { void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet &intervalSet); void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui32 shift, ui32 size); void AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer, - TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx); + TDiskPutRequest::EPutReason putReason, bool isHandoff, size_t blobIdx); }; struct TBlackboard; @@ -170,6 +168,12 @@ struct TBlackboard { AccelerationModeSkipMarked }; + struct TFinishedBlob { + size_t BlobIdx; + NKikimrProto::EReplyStatus Status; + TString ErrorReason; + }; + using TBlobStates = TMap; TBlobStates BlobStates; TBlobStates DoneBlobStates; @@ -179,31 +183,27 @@ struct TBlackboard { EAccelerationMode AccelerationMode; const NKikimrBlobStorage::EPutHandleClass PutHandleClass; const NKikimrBlobStorage::EGetHandleClass GetHandleClass; - const bool IsAllRequestsTogether; TBlackboard(const TIntrusivePtr &info, const TIntrusivePtr &groupQueues, - NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass, - bool isAllRequestsTogether = true) + NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass) : Info(info) , GroupQueues(groupQueues) , AccelerationMode(AccelerationModeSkipOneSlowest) , PutHandleClass(putHandleClass) , GetHandleClass(getHandleClass) - , IsAllRequestsTogether(isAllRequestsTogether) {} void AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize); void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& partData); - void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0); - void MoveBlobStateToDone(const TLogoBlobID &id); void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data); void AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber); void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber); void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber); void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber); + EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec& strategies, - TBatchedVec *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr); - EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec *finished = nullptr, + TBatchedVec *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr); + EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr); TBlobState& GetState(const TLogoBlobID &id); ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex); @@ -221,7 +221,7 @@ struct TBlackboard { void InvalidatePartStates(ui32 orderNumber); - void RegisterBlobForPut(const TLogoBlobID& id); + void RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx); TBlobState& operator [](const TLogoBlobID& id); }; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index a092e45872a7..4f815af5a8d9 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -103,11 +103,11 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorNodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size(); } @@ -210,9 +208,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorGet()); @@ -351,7 +347,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor putResult, ui64 blobIdx) { + void SendReply(std::unique_ptr putResult, size_t blobIdx) { NKikimrProto::EReplyStatus status = putResult->Status; A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPP21", "SendReply putResult# " << putResult->ToString() << " ResponsesSent# " << ResponsesSent @@ -449,7 +445,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorGetTypeRewrite(); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp index 81f56bed5363..3049ab1a039b 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp @@ -11,22 +11,44 @@ namespace NKikimr { using TPutResultVec = TPutImpl::TPutResultVec; -void TPutImpl::RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired) { +void TPutImpl::RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, + const TBlobStorageGroupInfo::TGroupVDisks& expired, bool accelerate) { + if (accelerate) { + ChangeAll(); + } + switch (Info->Type.GetErasure()) { case TBlobStorageGroupType::ErasureMirror3dc: - return RunStrategy(logCtx, TPut3dcStrategy(Tactic, EnableRequestMod3x3ForMinLatecy), outPutResults, expired); + return accelerate + ? RunStrategy(logCtx, TAcceleratePut3dcStrategy(Tactic, EnableRequestMod3x3ForMinLatecy), outPutResults, expired) + : RunStrategy(logCtx, TPut3dcStrategy(Tactic, EnableRequestMod3x3ForMinLatecy), outPutResults, expired); case TBlobStorageGroupType::ErasureMirror3of4: - return RunStrategy(logCtx, TPut3of4Strategy(Tactic), outPutResults, expired); + return accelerate + ? RunStrategy(logCtx, TPut3of4Strategy(Tactic, true), outPutResults, expired) + : RunStrategy(logCtx, TPut3of4Strategy(Tactic), outPutResults, expired); default: - return RunStrategy(logCtx, TRestoreStrategy(), outPutResults, expired); + return accelerate + ? RunStrategy(logCtx, TAcceleratePutStrategy(), outPutResults, expired) + : RunStrategy(logCtx, TRestoreStrategy(), outPutResults, expired); } } void TPutImpl::RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired) { - TBatchedVec finished; + Y_VERIFY_S(Blackboard.BlobStates.size(), "State# " << DumpFullState()); + TBatchedVec finished; const EStrategyOutcome outcome = Blackboard.RunStrategy(logCtx, strategy, &finished, &expired); - PrepareReply(logCtx, outcome.ErrorReason, finished, outPutResults); + for (const TBlackboard::TFinishedBlob& item : finished) { + Y_ABORT_UNLESS(item.BlobIdx < Blobs.size()); + Y_ABORT_UNLESS(!IsDone[item.BlobIdx]); + PrepareOneReply(item.Status, item.BlobIdx, logCtx, item.ErrorReason, outPutResults); + Y_VERIFY_S(IsDone[item.BlobIdx], "State# " << DumpFullState()); + } + if (outcome == EStrategyOutcome::DONE) { + for (const auto& done : IsDone) { + Y_VERIFY_S(done, "finished.size# " << finished.size() << " State# " << DumpFullState()); + } + } } NLog::EPriority GetPriorityForReply(TAtomicLogPriorityMuteChecker &checker, @@ -40,62 +62,23 @@ NLog::EPriority GetPriorityForReply(TAtomicLogPriorityMuteCheckerGroupID, - ApproximateFreeSpaceShare)); - outPutResults.back().second->ErrorReason = errorReason; - NLog::EPriority priority = GetPriorityForReply(Info->PutErrorMuteChecker, status); - A_LOG_LOG_SX(logCtx, true, priority, "BPP12", "Result# " << outPutResults.back().second->Print(false)); - MarkBlobAsSent(blobIdx); + if (!std::exchange(IsDone[blobIdx], true)) { + auto ev = std::make_unique(status, Blobs[blobIdx].BlobId, StatusFlags, + Info->GroupID, ApproximateFreeSpaceShare); + ev->ErrorReason = std::move(errorReason); + const NLog::EPriority priority = GetPriorityForReply(Info->PutErrorMuteChecker, status); + A_LOG_LOG_SX(logCtx, true, priority, "BPP12", "Result# " << ev->Print(false)); + outPutResults.emplace_back(blobIdx, std::move(ev)); } } void TPutImpl::PrepareReply(NKikimrProto::EReplyStatus status, TLogContext &logCtx, TString errorReason, TPutResultVec &outPutResults) { A_LOG_DEBUG_SX(logCtx, "BPP34", "PrepareReply status# " << status << " errorReason# " << errorReason); - for (ui64 idx = 0; idx < Blobs.size(); ++idx) { - if (IsDone[idx]) { - A_LOG_DEBUG_SX(logCtx, "BPP35", "blob# " << Blobs[idx].ToString() << - " idx# " << idx << " is sent, skipped"); - continue; - } - - outPutResults.emplace_back(idx, new TEvBlobStorage::TEvPutResult(status, Blobs[idx].BlobId, StatusFlags, - Info->GroupID, ApproximateFreeSpaceShare)); - outPutResults.back().second->ErrorReason = errorReason; - - NLog::EPriority priority = GetPriorityForReply(Info->PutErrorMuteChecker, status); - A_LOG_LOG_SX(logCtx, true, priority, "BPP38", - "PrepareReply Result# " << outPutResults.back().second->Print(false)); - - if (IsInitialized) { - MarkBlobAsSent(idx); - } - } -} - -void TPutImpl::PrepareReply(TLogContext &logCtx, TString errorReason, - TBatchedVec& finished, TPutResultVec &outPutResults) { - A_LOG_DEBUG_SX(logCtx, "BPP36", "PrepareReply errorReason# " << errorReason); - Y_ABORT_UNLESS(IsInitialized); - for (auto item : finished) { - auto &[blobId, state] = *item; - const ui64 idx = state.BlobIdx; - Y_ABORT_UNLESS(blobId == Blobs[idx].BlobId, "BlobIdx# %" PRIu64 " BlobState# %s Blackboard# %s", - idx, state.ToString().c_str(), Blackboard.ToString().c_str()); - Y_ABORT_UNLESS(!IsDone[idx]); - Y_ABORT_UNLESS(state.Status != NKikimrProto::UNKNOWN); - outPutResults.emplace_back(idx, new TEvBlobStorage::TEvPutResult(state.Status, blobId, StatusFlags, - Info->GroupID, ApproximateFreeSpaceShare)); - outPutResults.back().second->ErrorReason = errorReason; - - NLog::EPriority priority = GetPriorityForReply(Info->PutErrorMuteChecker, state.Status); - A_LOG_LOG_SX(logCtx, true, priority, "BPP37", - "PrepareReply Result# " << outPutResults.back().second->Print(false)); - MarkBlobAsSent(idx); + for (size_t blobIdx = 0; blobIdx < Blobs.size(); ++blobIdx) { + PrepareOneReply(status, blobIdx, logCtx, errorReason, outPutResults); } } @@ -125,7 +108,7 @@ TString TPutImpl::DumpFullState() const { str << Endl; str << " Blobs# " << Blobs.ToString(); str << Endl; - str << "IsDone# " << IsDone.ToString(); + str << " IsDone# " << IsDone.ToString(); str << Endl; str << " HandoffPartsSent# " << HandoffPartsSent; str << Endl; @@ -143,15 +126,6 @@ TString TPutImpl::DumpFullState() const { return str.Str(); } -bool TPutImpl::MarkBlobAsSent(ui64 idx) { - Y_ABORT_UNLESS(idx < Blobs.size()); - Y_ABORT_UNLESS(!IsDone[idx]); - Blackboard.MoveBlobStateToDone(Blobs[idx].BlobId); - IsDone[idx] = true; - DoneBlobs++; - return true; -} - }//NKikimr Y_DECLARE_OUT_SPEC(, NKikimr::TPutImpl::TBlobInfo, stream, value) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index 5c499eda22ab..b15b864837aa 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -43,8 +43,6 @@ class TPutImpl { bool AtLeastOneResponseWasNotOk = false; bool EnableRequestMod3x3ForMinLatecy = false; - ui64 DoneBlobs = 0; - const TEvBlobStorage::TEvPut::ETactic Tactic; struct TBlobInfo { @@ -99,8 +97,6 @@ class TPutImpl { friend class TBlobStorageGroupPutRequest; - bool IsInitialized = false; - friend void ::Out(IOutputStream&, const TBlobInfo&); public: @@ -109,7 +105,7 @@ class TPutImpl { bool enableRequestMod3x3ForMinLatecy, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId) : Deadline(ev->Deadline) , Info(info) - , Blackboard(info, state, ev->HandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead, false) + , Blackboard(info, state, ev->HandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead) , IsDone(1) , WrittenBeyondBarrier(1) , StatusFlags(0) @@ -133,7 +129,7 @@ class TPutImpl { bool enableRequestMod3x3ForMinLatecy) : Deadline(TInstant::Zero()) , Info(info) - , Blackboard(info, state, putHandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead, false) + , Blackboard(info, state, putHandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead) , IsDone(events.size()) , WrittenBeyondBarrier(events.size()) , StatusFlags(0) @@ -175,47 +171,26 @@ class TPutImpl { Y_VERIFY_S(partSets.size() == Blobs.size(), "partSets.size# " << partSets.size() << " Blobs.size# " << Blobs.size()); const ui32 totalParts = Info->Type.TotalPartCount(); - for (ui64 blobIdx = 0; blobIdx < Blobs.size(); ++blobIdx) { + for (size_t blobIdx = 0; blobIdx < Blobs.size(); ++blobIdx) { TBlobInfo& blob = Blobs[blobIdx]; - Blackboard.RegisterBlobForPut(blob.BlobId); + Blackboard.RegisterBlobForPut(blob.BlobId, blobIdx); for (ui32 i = 0; i < totalParts; ++i) { if (Info->Type.PartSize(TLogoBlobID(blob.BlobId, i + 1))) { Blackboard.AddPartToPut(blob.BlobId, i, TRope(partSets[blobIdx][i])); } } - Blackboard.MarkBlobReadyToPut(blob.BlobId, blobIdx); } - IsInitialized = true; } void PrepareReply(NKikimrProto::EReplyStatus status, TLogContext &logCtx, TString errorReason, TPutResultVec &outPutResults); - void PrepareReply(TLogContext &logCtx, TString errorReason, TBatchedVec& finished, - TPutResultVec &outPutResults); - void PrepareOneReply(NKikimrProto::EReplyStatus status, TLogoBlobID blobId, ui64 blobIdx, TLogContext &logCtx, + void PrepareOneReply(NKikimrProto::EReplyStatus status, size_t blobIdx, TLogContext &logCtx, TString errorReason, TPutResultVec &outPutResults); ui64 GetTimeToAccelerateNs(TLogContext &logCtx); - void Accelerate(TLogContext &logCtx) { - Blackboard.ChangeAll(); - switch (Info->Type.GetErasure()) { - case TBlobStorageGroupType::ErasureMirror3dc: - Blackboard.RunStrategy(logCtx, TAcceleratePut3dcStrategy(Tactic, EnableRequestMod3x3ForMinLatecy)); - break; - case TBlobStorageGroupType::ErasureMirror3of4: - Blackboard.RunStrategy(logCtx, TPut3of4Strategy(Tactic, true)); - break; - default: - Blackboard.RunStrategy(logCtx, TAcceleratePutStrategy()); - break; - } - } - TString DumpFullState() const; - bool MarkBlobAsSent(ui64 blobIdx); - TString ToString() const; void InvalidatePartStates(ui32 orderNumber) { @@ -226,8 +201,8 @@ class TPutImpl { Blackboard.ChangeAll(); } - void Step(TLogContext &logCtx, TPutResultVec& putResults, const TBlobStorageGroupInfo::TGroupVDisks& expired) { - RunStrategies(logCtx, putResults, expired); + void Step(TLogContext &logCtx, TPutResultVec& putResults, const TBlobStorageGroupInfo::TGroupVDisks& expired, bool accelerate) { + RunStrategies(logCtx, putResults, expired, accelerate); } TDeque GeneratePutRequests() { @@ -299,7 +274,8 @@ class TPutImpl { } protected: - void RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired); + void RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired, + bool accelerate); void RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired); @@ -339,7 +315,7 @@ class TPutImpl { void ProcessResponseCommonPart(TProtobuf& record) { Y_ABORT_UNLESS(record.HasStatus()); const NKikimrProto::EReplyStatus status = record.GetStatus(); - Y_ABORT_UNLESS(status != NKikimrProto::BLOCKED && status != NKikimrProto::RACE && status != NKikimrProto::DEADLINE); + Y_ABORT_UNLESS(status != NKikimrProto::RACE); if (record.HasStatusFlags()) { StatusFlags.Merge(record.GetStatusFlags()); } diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp index 09dca39bb5fd..2af3baccace5 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp @@ -76,7 +76,7 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies) TPutImpl::TPutResultVec putResults; putImpl.GenerateInitialRequests(logCtx, partSetSingleton); - putImpl.Step(logCtx, putResults, {&group.GetInfo()->GetTopology()}); + putImpl.Step(logCtx, putResults, {&group.GetInfo()->GetTopology()}, false); auto vPuts = putImpl.GeneratePutRequests(); group.SetError(0, NKikimrProto::ERROR); @@ -119,7 +119,7 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies) vPutResult.MakeError(status, TString(), vPut.Record); putImpl.ProcessResponse(vPutResult); - putImpl.Step(logCtx, putResults, {&group.GetInfo()->GetTopology()}); + putImpl.Step(logCtx, putResults, {&group.GetInfo()->GetTopology()}, false); auto nextVPuts = putImpl.GeneratePutRequests(); if (putResults.size()) { @@ -273,7 +273,7 @@ struct TTestPutAllOk { } std::visit([&](auto &ev) { putImpl.ProcessResponse(*ev); }, vPutResults[resIdx]); - putImpl.Step(LogCtx, putResults, &Group.GetInfo()->GetTopology()); + putImpl.Step(LogCtx, putResults, &Group.GetInfo()->GetTopology(), false); auto vPuts = putImpl.GeneratePutRequests(); if (putResults.size() == BlobCount) { break; @@ -309,7 +309,7 @@ struct TTestPutAllOk { } putImpl->GenerateInitialRequests(LogCtx, PartSets); - putImpl->Step(LogCtx, putResults, &Group.GetInfo()->GetTopology()); + putImpl->Step(LogCtx, putResults, &Group.GetInfo()->GetTopology(), false); auto vPuts = putImpl->GeneratePutRequests(); UNIT_ASSERT(vPuts.size() == 6 || !IsVPut); TDeque vPutResults; @@ -367,7 +367,7 @@ Y_UNIT_TEST(TestMirror3dcWith3x3MinLatencyMod) { ErasureSplit((TErasureType::ECrcMode)blobId.CrcMode(), env.Info->Type, TRope(encryptedData), partSetSingleton[0]); putImpl.GenerateInitialRequests(logCtx, partSetSingleton); TPutImpl::TPutResultVec putResults; - putImpl.Step(logCtx, putResults, &env.Info->GetTopology()); + putImpl.Step(logCtx, putResults, &env.Info->GetTopology(), false); auto vPuts = putImpl.GeneratePutRequests(); UNIT_ASSERT_VALUES_EQUAL(vPuts.size(), 9); diff --git a/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp b/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp index 8b77cc4ee6b8..64ace509dd63 100644 --- a/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp @@ -105,7 +105,7 @@ void RunStrategyTest(TBlobStorageGroupType type) { TLogoBlobID id(1'000'000'000, 1, 1, 0, data.size(), 0); std::vector parts(type.TotalPartCount()); ErasureSplit(TBlobStorageGroupType::CrcModeNone, type, TRope(data), parts); - blackboard.RegisterBlobForPut(id); + blackboard.RegisterBlobForPut(id, 0); for (ui32 i = 0; i < parts.size(); ++i) { blackboard.AddPartToPut(id, i, TRope(parts[i])); }