Skip to content

Commit

Permalink
DS proxy mirror-3-dc restoration strategy test (ydb-platform#1238)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Jan 24, 2024
1 parent 899f3dd commit 526ec53
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 4 deletions.
8 changes: 4 additions & 4 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ struct TBlobState {
};

struct TDiskGetRequest {
const TLogoBlobID Id;
const ui32 Shift;
const ui32 Size;
TLogoBlobID Id;
ui32 Shift;
ui32 Size;
ssize_t PartMapIndex = -1;

TDiskGetRequest(const TLogoBlobID &id, const ui32 shift, const ui32 size)
Expand All @@ -127,7 +127,7 @@ struct TDiskPutRequest {
ReasonInitial,
ReasonAccelerate
};
const TLogoBlobID Id;
TLogoBlobID Id;
TRope Buffer;
EPutReason Reason;
bool IsHandoff;
Expand Down
174 changes: 174 additions & 0 deletions ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include <ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h>
#include <ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h>
#include <ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_m3dc_restore.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/stream/null.h>
#include <util/generic/overloaded.h>

using namespace NActors;
using namespace NKikimr;
Expand Down Expand Up @@ -193,10 +195,182 @@ void RunStrategyTest(TBlobStorageGroupType type) {
}
}

struct TGetQuery {
ui32 OrderNumber;
TLogoBlobID Id;
ui32 Shift;
ui32 Size;

auto AsTuple() const { return std::make_tuple(OrderNumber, Id, Shift, Size); }
friend bool operator ==(const TGetQuery& x, const TGetQuery& y) { return x.AsTuple() == y.AsTuple(); }
friend bool operator <(const TGetQuery& x, const TGetQuery& y) { return x.AsTuple() < y.AsTuple(); }
};

struct TPutQuery {
ui32 OrderNumber;
TLogoBlobID Id;

auto AsTuple() const { return std::make_tuple(OrderNumber, Id); }
friend bool operator ==(const TPutQuery& x, const TPutQuery& y) { return x.AsTuple() == y.AsTuple(); }
friend bool operator <(const TPutQuery& x, const TPutQuery& y) { return x.AsTuple() < y.AsTuple(); }
};

using TOperation = std::variant<TGetQuery, TPutQuery>;

void RunTestLevel(const TBlobStorageGroupInfo& info, TBlackboard& blackboard,
const std::function<EStrategyOutcome(TBlackboard&)>& runStrategies, const TLogoBlobID& id,
std::vector<TOperation>& stock, TSubgroupPartLayout presenceMask, bool nonWorkingDomain,
std::set<TOperation>& context, ui32& terminals) {
// see which operations we can add to the stock
const size_t stockSizeOnEntry = stock.size();
auto& requests = blackboard.GroupDiskRequests.DiskRequestsForOrderNumber;
for (ui32 i = 0; i < info.GetTotalVDisksNum(); ++i) {
for (auto& j = requests[i].FirstUnsentRequestIdx; j < requests[i].GetsToSend.size(); ++j) {
auto& get = requests[i].GetsToSend[j];
stock.push_back(TGetQuery{i, get.Id, get.Shift, get.Size});
const bool inserted = context.insert(stock.back()).second;
UNIT_ASSERT(inserted);
}
for (auto& j = requests[i].FirstUnsentPutIdx; j < requests[i].PutsToSend.size(); ++j) {
auto& put = requests[i].PutsToSend[j];
stock.push_back(TPutQuery{i, put.Id});
const bool inserted = context.insert(stock.back()).second;
UNIT_ASSERT(inserted);
}
}
UNIT_ASSERT(!stock.empty());

bool canIssuePuts = true;
for (size_t i = 0; i < stock.size(); ++i) {
if (std::holds_alternative<TGetQuery>(stock[i])) {
canIssuePuts = false;
break;
}
}

// try every single operation in stock
for (size_t i = 0; i < stock.size(); ++i) {
if (!canIssuePuts && std::holds_alternative<TPutQuery>(stock[i])) {
continue;
}
if (auto *get = std::get_if<TGetQuery>(&stock[i]); get && context.contains(TPutQuery{get->OrderNumber, get->Id})) {
continue;
}

std::swap(stock[i], stock.back());
TOperation operation = std::move(stock.back());
stock.pop_back();

TBlackboard branch(blackboard);
TSubgroupPartLayout myPresenceMask(presenceMask);

std::visit(TOverloaded{
[&](const TGetQuery& op) {
const ui32 idxInSubgroup = info.GetTopology().GetIdxInSubgroup(info.GetVDiskId(op.OrderNumber), id.Hash());
if (nonWorkingDomain && idxInSubgroup % 3 == 2) {
branch.AddErrorResponse(op.Id, op.OrderNumber);
} else if (myPresenceMask.GetDisksWithPart(op.Id.PartId() - 1) >> idxInSubgroup & 1) {
const ui32 blobSize = op.Id.BlobSize();
const ui32 shift = Min(op.Shift, blobSize);
const ui32 size = Min(op.Size ? op.Size : Max<ui32>(), blobSize - shift);
branch.AddResponseData(op.Id, op.OrderNumber, shift, TRope(TString(size, 'X')));
} else {
branch.AddNoDataResponse(op.Id, op.OrderNumber);
}
},
[&](const TPutQuery& op) {
const ui32 idxInSubgroup = info.GetTopology().GetIdxInSubgroup(info.GetVDiskId(op.OrderNumber), id.Hash());
if (nonWorkingDomain && idxInSubgroup % 3 == 2) {
branch.AddErrorResponse(op.Id, op.OrderNumber);
} else {
myPresenceMask.AddItem(idxInSubgroup, op.Id.PartId() - 1, info.Type);
branch.AddPutOkResponse(op.Id, op.OrderNumber);
}
}
}, operation);

auto outcome = runStrategies(branch);
UNIT_ASSERT(outcome != EStrategyOutcome::ERROR);
if (outcome == EStrategyOutcome::DONE) {
TBlobStorageGroupInfo::TOrderNums nums;
info.GetTopology().PickSubgroup(id.Hash(), nums);
UNIT_ASSERT(info.GetQuorumChecker().GetBlobState(myPresenceMask, {&info.GetTopology()}) == TBlobStorageGroupInfo::EBS_FULL);
++terminals;
} else {
RunTestLevel(info, branch, runStrategies, id, stock, myPresenceMask, nonWorkingDomain, context, terminals);
}

stock.push_back(std::move(operation));
std::swap(stock[i], stock.back());
}

// revert stock
for (size_t i = stockSizeOnEntry; i < stock.size(); ++i) {
const size_t n = context.erase(stock[i]);
UNIT_ASSERT(n);
}
stock.resize(stockSizeOnEntry);
}

Y_UNIT_TEST_SUITE(DSProxyStrategyTest) {

Y_UNIT_TEST(Restore_block42) {
RunStrategyTest<TRestoreStrategy>(TBlobStorageGroupType::Erasure4Plus2Block);
}

Y_UNIT_TEST(Restore_mirror3dc) {
THPTimer timer;
const TBlobStorageGroupType type(TBlobStorageGroupType::ErasureMirror3dc);

TBlobStorageGroupInfo info(type, 1, 3, 3);
info.Ref();
TGroupQueues groupQueues(info.GetTopology());
groupQueues.Ref();

std::vector<TOperation> stock;

TLogContext logCtx(NKikimrServices::BS_PROXY, false);
logCtx.SuppressLog = true;

auto runStrategies = [&](TBlackboard& blackboard) {
return blackboard.RunStrategy(logCtx, TMirror3dcGetWithRestoreStrategy());
};

const ui32 base = RandomNumber(512u);
for (ui32 i = 0; i < 512; ++i) {
const ui32 diskMask = (base + i) % 512;
for (bool nonWorkingDomain : {false, true}) {
TBlackboard blackboard(&info, &groupQueues, NKikimrBlobStorage::UserData, NKikimrBlobStorage::FastRead);

const TLogoBlobID id(1'000'000'000, 1, 1, 0, 1000, 0);
TSubgroupPartLayout presenceMask;
blackboard.AddNeeded(id, 0, id.BlobSize());
bool partsAvailable = false;
for (ui32 idxInSubgroup = 0; idxInSubgroup < 9; ++idxInSubgroup) {
if (diskMask >> idxInSubgroup & 1 && (!nonWorkingDomain || idxInSubgroup % 3 != 2)) {
presenceMask.AddItem(idxInSubgroup, idxInSubgroup % 3, info.Type);
partsAvailable = true;
}
}
if (!partsAvailable) {
continue;
}

Cerr << "diskMask# " << diskMask << " nonWorkingDomain# " << nonWorkingDomain;

auto outcome = runStrategies(blackboard);
UNIT_ASSERT(outcome == EStrategyOutcome::IN_PROGRESS);

std::set<TOperation> context;
ui32 terminals = 0;
RunTestLevel(info, blackboard, runStrategies, id, stock, presenceMask, nonWorkingDomain, context, terminals);
Cerr << " " << terminals << Endl;

if (TDuration::Seconds(timer.Passed()) >= TDuration::Minutes(5)) {
return;
}
}
}
}

}
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/dsproxy/ut_strategy/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
UNITTEST()

FORK_SUBTESTS()

TIMEOUT(600)
SIZE(MEDIUM)

Expand Down

0 comments on commit 526ec53

Please sign in to comment.