Skip to content

Commit

Permalink
Fix put impl class (ydb-platform#2829)
Browse files — browse the repository at this point in the history
  • Loading branch information
alexvru authored Mar 15, 2024
1 parent e1b557d commit 9228c95
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 188 deletions.
71 changes: 23 additions & 48 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
Original file line number | Diff line number | Diff line change
Expand Up @@ -48,12 +48,6 @@ void TBlobState::AddPartToPut(ui32 partIdx, TRope&& partData) {
IsChanged = true;
}

void TBlobState::MarkBlobReadyToPut(ui8 blobIdx) {
Y_ABORT_UNLESS(WholeSituation == ESituation::Unknown || WholeSituation == ESituation::Present);
BlobIdx = blobIdx;
IsChanged = true;
}

bool TBlobState::Restore(const TBlobStorageGroupInfo &info) {
const TIntervalVec<i32> fullBlobInterval(0, Id.BlobSize());
const TIntervalSet<i32> here = Whole.Here();
Expand Down Expand Up @@ -227,7 +221,7 @@ TString TBlobState::ToString() const {
for (ui32 i = 0; i < Disks.size(); ++i) {
str << Endl << " Disks[" << i << "]# " << Disks[i].ToString() << Endl;
}
str << " BlobIdx# " << (ui32)BlobIdx << Endl;
str << " BlobIdx# " << BlobIdx << Endl;
str << "}";
return str.Str();
}
Expand Down Expand Up @@ -304,7 +298,7 @@ void TGroupDiskRequests::AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui3
}

void TGroupDiskRequests::AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx) {
TDiskPutRequest::EPutReason putReason, bool isHandoff, size_t blobIdx) {
PutsPending.emplace_back(diskOrderNumber, id, buffer, putReason, isHandoff, blobIdx);
}

Expand Down Expand Up @@ -340,20 +334,6 @@ void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& part
(*this)[id].AddPartToPut(partIdx, std::move(partData));
}

void TBlackboard::MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() == 0);
Y_ABORT_UNLESS(id.BlobSize() != 0);
(*this)[id].MarkBlobReadyToPut(blobIdx);
}

void TBlackboard::MoveBlobStateToDone(const TLogoBlobID &id) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() == 0);
Y_ABORT_UNLESS(id.BlobSize() != 0);
DoneBlobStates.insert(BlobStates.extract(id));
}

void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() != 0);
Expand Down Expand Up @@ -390,8 +370,7 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
}

EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec<IStrategy*, 1>& s,
TBatchedVec<TBlobStates::value_type*> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
TString errorReason;
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
for (auto it = BlobStates.begin(); it != BlobStates.end(); ) {
auto& blob = it->second;
if (!std::exchange(blob.IsChanged, false)) {
Expand All @@ -401,23 +380,19 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec

// recalculate blob outcome if it is not yet determined
NKikimrProto::EReplyStatus status = NKikimrProto::OK;
TString errorReason;
for (IStrategy *strategy : s) {
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests)) {
case EStrategyOutcome::IN_PROGRESS:
status = NKikimrProto::UNKNOWN;
break;

case EStrategyOutcome::ERROR:
if (IsAllRequestsTogether) {
if (!finished) {
return res;
}
if (errorReason) {
errorReason += " && ";
errorReason += res.ErrorReason;
} else {
errorReason = res.ErrorReason;
}
status = NKikimrProto::ERROR;
errorReason = std::move(res.ErrorReason);
break;

case EStrategyOutcome::DONE:
Expand All @@ -431,26 +406,25 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
status = NKikimrProto::UNKNOWN;
}
if (status != NKikimrProto::UNKNOWN) {
if (finished) { // we are operating on independent blobs
finished->push_back(TFinishedBlob{
blob.BlobIdx,
status,
std::move(errorReason),
});
}
const auto [doneIt, inserted, node] = DoneBlobStates.insert(BlobStates.extract(it++));
Y_ABORT_UNLESS(inserted);
if (!IsAllRequestsTogether) {
blob.Status = status;
if (finished) {
finished->push_back(&*doneIt);
}
}
} else {
++it;
}
}

EStrategyOutcome outcome(BlobStates.empty() ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS);
outcome.ErrorReason = std::move(errorReason);
return outcome;
return BlobStates.empty() ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS;
}

EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s,
TBatchedVec<TBlobStates::value_type*> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, finished, expired);
}

Expand All @@ -464,8 +438,7 @@ TBlobState& TBlackboard::GetState(const TLogoBlobID &id) {
<< " blobId# " << fullId
<< " BlackBoard# " << ToString());
}
TBlobState &state = it->second;
return state;
return it->second;
}

ssize_t TBlackboard::AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex) {
Expand Down Expand Up @@ -512,8 +485,12 @@ void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, T
}
}

void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id) {
(*this)[id];
void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {
const auto [it, inserted] = BlobStates.try_emplace(id);
Y_ABORT_UNLESS(inserted);
TBlobState& state = it->second;
state.Init(id, *Info);
state.BlobIdx = blobIdx;
}

TBlobState& TBlackboard::operator [](const TLogoBlobID& id) {
Expand Down Expand Up @@ -559,9 +536,7 @@ void TBlackboard::InvalidatePartStates(ui32 orderNumber) {
const TVDiskID vdiskId = Info->GetVDiskId(orderNumber);
for (auto& [id, state] : BlobStates) {
if (const ui32 diskIdx = Info->GetIdxInSubgroup(vdiskId, id.Hash()); diskIdx != Info->Type.BlobSubgroupSize()) {
TBlobState::TDisk& disk = state.Disks[diskIdx];
for (ui32 partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) {
TBlobState::TDiskPart& part = disk.DiskParts[partIdx];
for (TBlobState::TDiskPart& part : state.Disks[diskIdx].DiskParts) {
if (part.Situation == TBlobState::ESituation::Present) {
part.Situation = TBlobState::ESituation::Unknown;
if (state.WholeSituation == TBlobState::ESituation::Present) {
Expand Down
30 changes: 15 additions & 15 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number | Diff line number | Diff line change
Expand Up @@ -81,14 +81,12 @@ struct TBlobState {
TStackVec<TState, TypicalPartsInBlob> Parts;
TStackVec<TDisk, TypicalDisksInSubring> Disks;
TVector<TEvBlobStorage::TEvGetResult::TPartMapItem> PartMap;
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
ui8 BlobIdx;
size_t BlobIdx;
bool IsChanged = false;

void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info);
void AddNeeded(ui64 begin, ui64 size);
void AddPartToPut(ui32 partIdx, TRope&& partData);
void MarkBlobReadyToPut(ui8 blobIdx = 0);
bool Restore(const TBlobStorageGroupInfo &info);
void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
ui32 shift, TRope&& data);
Expand Down Expand Up @@ -133,9 +131,9 @@ struct TDiskPutRequest {
TRope Buffer;
EPutReason Reason;
bool IsHandoff;
ui8 BlobIdx;
size_t BlobIdx;

TDiskPutRequest(ui32 orderNumber, const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, ui8 blobIdx)
TDiskPutRequest(ui32 orderNumber, const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, size_t blobIdx)
: OrderNumber(orderNumber)
, Id(id)
, Buffer(std::move(buffer))
Expand All @@ -152,7 +150,7 @@ struct TGroupDiskRequests {
void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet);
void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui32 shift, ui32 size);
void AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx);
TDiskPutRequest::EPutReason putReason, bool isHandoff, size_t blobIdx);
};

struct TBlackboard;
Expand All @@ -170,6 +168,12 @@ struct TBlackboard {
AccelerationModeSkipMarked
};

struct TFinishedBlob {
size_t BlobIdx;
NKikimrProto::EReplyStatus Status;
TString ErrorReason;
};

using TBlobStates = TMap<TLogoBlobID, TBlobState>;
TBlobStates BlobStates;
TBlobStates DoneBlobStates;
Expand All @@ -179,31 +183,27 @@ struct TBlackboard {
EAccelerationMode AccelerationMode;
const NKikimrBlobStorage::EPutHandleClass PutHandleClass;
const NKikimrBlobStorage::EGetHandleClass GetHandleClass;
const bool IsAllRequestsTogether;

TBlackboard(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &groupQueues,
NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass,
bool isAllRequestsTogether = true)
NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass)
: Info(info)
, GroupQueues(groupQueues)
, AccelerationMode(AccelerationModeSkipOneSlowest)
, PutHandleClass(putHandleClass)
, GetHandleClass(getHandleClass)
, IsAllRequestsTogether(isAllRequestsTogether)
{}

void AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize);
void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& partData);
void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0);
void MoveBlobStateToDone(const TLogoBlobID &id);
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data);
void AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber);

EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec<IStrategy*, 1>& strategies,
TBatchedVec<TBlobStates::value_type*> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished = nullptr,
TBatchedVec<TFinishedBlob> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TFinishedBlob> *finished = nullptr,
const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
TBlobState& GetState(const TLogoBlobID &id);
ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex);
Expand All @@ -221,7 +221,7 @@ struct TBlackboard {

void InvalidatePartStates(ui32 orderNumber);

void RegisterBlobForPut(const TLogoBlobID& id);
void RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx);

TBlobState& operator [](const TLogoBlobID& id);
};
Expand Down
35 changes: 15 additions & 20 deletions ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
Original file line number | Diff line number | Diff line change
Expand Up @@ -103,11 +103,11 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
SanityCheck(); // May Die
}

bool Action() {
bool Action(bool accelerate = false) {
UpdateExpiredVDiskSet();

TPutImpl::TPutResultVec putResults;
PutImpl.Step(LogCtx, putResults, ExpiredVDiskSet);
PutImpl.Step(LogCtx, putResults, ExpiredVDiskSet, accelerate);
if (ReplyAndDieWithLastResponse(putResults)) {
return true;
}
Expand All @@ -133,9 +133,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
return;
}
IsAccelerated = true;

PutImpl.Accelerate(LogCtx);
Action();
Action(true);
// *(IsMultiPutMode ? Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size();
}

Expand Down Expand Up @@ -210,9 +208,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
HandleIncarnation(issue, orderNumber, record.GetIncarnationGuid());
}

if (Action()) {
return;
}
Action();
}

void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) {
Expand Down Expand Up @@ -265,7 +261,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
if (status == NKikimrProto::BLOCKED || status == NKikimrProto::DEADLINE) {
TString error = TStringBuilder() << "Got VPutResult status# " << status << " from VDiskId# " << vdiskId;
TPutImpl::TPutResultVec putResults;
PutImpl.PrepareOneReply(status, blobId.FullID(), blobIdx, LogCtx, std::move(error), putResults);
PutImpl.PrepareOneReply(status, blobIdx, LogCtx, std::move(error), putResults);
ReplyAndDieWithLastResponse(putResults);
} else {
PutImpl.ProcessResponse(*ev->Get());
Expand Down Expand Up @@ -351,7 +347,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
Y_ABORT_UNLESS(itemStatus != NKikimrProto::RACE); // we should get RACE for the whole request and handle it in CheckForTermErrors
if (itemStatus == NKikimrProto::BLOCKED || itemStatus == NKikimrProto::DEADLINE) {
ErrorReason = TStringBuilder() << "Got VMultiPutResult itemStatus# " << itemStatus << " from VDiskId# " << vdiskId;
PutImpl.PrepareOneReply(itemStatus, blobId.FullID(), blobIdx, LogCtx, ErrorReason, putResults);
PutImpl.PrepareOneReply(itemStatus, blobIdx, LogCtx, ErrorReason, putResults);
}
}
if (ReplyAndDieWithLastResponse(putResults)) {
Expand Down Expand Up @@ -405,7 +401,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
return false;
}

void SendReply(std::unique_ptr<TEvBlobStorage::TEvPutResult> putResult, ui64 blobIdx) {
void SendReply(std::unique_ptr<TEvBlobStorage::TEvPutResult> putResult, size_t blobIdx) {
NKikimrProto::EReplyStatus status = putResult->Status;
A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPP21",
"SendReply putResult# " << putResult->ToString() << " ResponsesSent# " << ResponsesSent
Expand Down Expand Up @@ -449,7 +445,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
TString BlobIdSequenceToString() const {
TStringBuilder blobIdsStr;
blobIdsStr << '[';
for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
if (blobIdx) {
blobIdsStr << ' ';
}
Expand Down Expand Up @@ -603,7 +599,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt

StartTime = TActivationContext::Monotonic();

for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
}

Expand Down Expand Up @@ -703,12 +699,11 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
s << ' ';
}
s << i;
auto& record = IncarnationRecords[i];
s << '{';
s << "IncarnationGuid# " << record.IncarnationGuid;
s << " ExpirationTimestamp# " << record.ExpirationTimestamp;
s << " StatusIssueTimestamp# " << record.StatusIssueTimestamp;
s << '}';
auto& r = IncarnationRecords[i];
s << '{' << r.IncarnationGuid
<< ' ' << (r.ExpirationTimestamp != TMonotonic::Max() ? TStringBuilder() << r.ExpirationTimestamp : "-"_sb)
<< ' ' << (r.StatusIssueTimestamp != TMonotonic::Zero() ? TStringBuilder() << r.StatusIssueTimestamp : "-"_sb)
<< '}';
}
s << '}';
return s.Str();
Expand All @@ -735,7 +730,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

STATEFN(StateWait) {
if (ProcessEvent(ev, IsManyPuts)) {
if (ProcessEvent(ev, true)) {
return;
}
const ui32 type = ev->GetTypeRewrite();
Expand Down
Loading

0 comments on commit 9228c95

Please sign in to comment.