Skip to content

Commit

Permalink
issue-2854: add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vladstepanyuk committed Jan 29, 2025
1 parent 3e54196 commit 9686d14
Show file tree
Hide file tree
Showing 18 changed files with 1,093 additions and 84 deletions.
17 changes: 17 additions & 0 deletions cloud/blockstore/libs/storage/partition_nonrepl/my_ut/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
UNITTEST_FOR(cloud/blockstore/libs/storage/partition_nonrepl)

INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/medium.inc)

SRCS(
part_mirror_split_request_helpers_ut.cpp
)

PEERDIR(
cloud/blockstore/config
cloud/blockstore/libs/diagnostics
cloud/blockstore/libs/rdma_test
cloud/blockstore/libs/storage/testlib
cloud/blockstore/libs/storage/disk_agent/actors
)

END()
Original file line number Diff line number Diff line change
Expand Up @@ -275,27 +275,6 @@ void TMirrorPartitionActor::StartResyncRange(
BlockDigestGenerator);
}

TResultOrError<TSet<NActors::TActorId>>
TMirrorPartitionActor::GetActorsForBlockRange(const TBlockRange64 blockRange)
{
TSet<TActorId> replicaActorIds;
const ui32 readReplicaCount = Min<ui32>(
Max<ui32>(1, Config->GetMirrorReadReplicaCount()),
State.GetReplicaInfos().size());
for (ui32 i = 0; i < readReplicaCount; ++i) {
TActorId replicaActorId;
const auto error = State.NextReadReplica(blockRange, &replicaActorId);
if (HasError(error)) {
return error;
}

if (!replicaActorIds.insert(replicaActorId).second) {
break;
}
}
return replicaActorIds;
}

void TMirrorPartitionActor::ReplyAndDie(const TActorContext& ctx)
{
NCloud::Reply(ctx, *Poisoner, std::make_unique<TEvents::TEvPoisonTaken>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class TMirrorPartitionActor final
ui64 scrubbingRangeId);
void StartResyncRange(const NActors::TActorContext& ctx);

TResultOrError<TSet<NActors::TActorId>> GetActorsForBlockRange(const TBlockRange64 blockRange);
TResultOrError<THashSet<NActors::TActorId>> GetPartitionsToReadBlockRange(
const TBlockRange64 blockRange);

private:
STFUNC(StateWork);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
#include "part_mirror_actor.h"

#include "mirror_request_actor.h"
#include "part_mirror_split_requests_utils.h"
#include "part_mirror_split_request_helpers.h"

#include <cloud/blockstore/libs/common/block_checksum.h>
#include <cloud/blockstore/libs/storage/api/undelivered.h>
#include <cloud/blockstore/libs/storage/core/config.h>

#include <ranges>

#include <cloud/storage/core/libs/common/verify.h>

namespace NCloud::NBlockStore::NStorage {

using namespace NActors;

using namespace NSplitRequest;

namespace {

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -135,8 +139,8 @@ TRequestActor<TMethod>::TRequestActor(
, DiskId(std::move(diskId))
, ParentActorId(parentActorId)
, RequestIdentityKey(requestIdentityKey)
, ResponseChecksums(Partitions.size(), 0)
, SendResponseToParent(sendResponseToParent)
, ResponseChecksums(Partitions.size(), 0)
{}

template <typename TMethod>
Expand Down Expand Up @@ -400,22 +404,25 @@ STFUNC(TRequestActor<TMethod>::StateWork)

template <typename TMethod>
class TSplittedRequestActor final
: public TActorBootstrapped<TRequestActor<TMethod>>
: public TActorBootstrapped<TSplittedRequestActor<TMethod>>
{
private:
using TRequestRecordType = TMethod::TRequest::ProtoRecordType;
using TResponseRecordType = TMethod::TResponse::ProtoRecordType;

class TChildActorsInfo {
class TChildActorsInfo
{
private:
THashMap<TActorId, size_t> ActorId2Index;
TVector<TResponseRecordType> Responses;
TVector<TUnifyResponsesContext<TMethod>> Responses;
TVector<bool> ReceivedResponseFromActor;

public:
TChildActorsInfo() = default;

TChildActorsInfo(
THashMap<TActorId, size_t> actorIds,
TVector<TResponseRecordType> responses)
TVector<TUnifyResponsesContext<TMethod>> responses)
: ActorId2Index(std::move(actorIds))
, Responses(std::move(responses))
, ReceivedResponseFromActor(ActorId2Index.size(), false)
Expand Down Expand Up @@ -450,11 +457,16 @@ class TSplittedRequestActor final
return;
}

Responses[it->second] = std::move(response);
Responses[it->second].Response = std::move(response);
ReceivedResponseFromActor[it->second] = true;
}

static TVector<TResponseRecordType> ExtractResponses(
auto GetActorIds()
{
return ActorId2Index | std::views::transform([](const auto& el)
{ return el.first; });
}
static TVector<TUnifyResponsesContext<TMethod>> ExtractResponses(
TChildActorsInfo childActorsInfo)
{
return std::move(childActorsInfo.Responses);
Expand All @@ -463,19 +475,18 @@ class TSplittedRequestActor final

private:
const TRequestInfoPtr RequestInfo;
NSplitRequest::TSplittedRequest<TMethod> Requests;
TSplittedRequest<TMethod> Requests;
const TString DiskId;
const NActors::TActorId ParentActorId;
const ui64 RequestIdentityKey;
const ui64 BlockSize;

ui32 PendingRequests = 0;



TChildActorsInfo ChildActors;

using TResponseProto = typename TMethod::TResponse::ProtoRecordType;
using TBase = TActorBootstrapped<TRequestActor<TMethod>>;
using TBase = TActorBootstrapped<TSplittedRequestActor<TMethod>>;

TVector<ui32> ResponseChecksums;
ui32 ResponseCount = 0;
Expand All @@ -485,10 +496,11 @@ class TSplittedRequestActor final
public:
TSplittedRequestActor(
TRequestInfoPtr requestInfo,
NSplitRequest::TSplittedRequest<TMethod> requests,
TSplittedRequest<TMethod> requests,
TString diskId,
TActorId parentActorId,
ui64 requestIdentityKey);
ui64 requestIdentityKey,
ui64 blockSize);

void Bootstrap(const TActorContext& ctx);

Expand Down Expand Up @@ -516,30 +528,32 @@ class TSplittedRequestActor final
template <typename TMethod>
TSplittedRequestActor<TMethod>::TSplittedRequestActor(
TRequestInfoPtr requestInfo,
NSplitRequest::TSplittedRequest<TMethod> requests,
TSplittedRequest<TMethod> requests,
TString diskId,
TActorId parentActorId,
ui64 requestIdentityKey)
ui64 requestIdentityKey,
ui64 blockSize)
: RequestInfo(std::move(requestInfo))
, Requests(std::move(requests))
, DiskId(std::move(diskId))
, ParentActorId(parentActorId)
, RequestIdentityKey(requestIdentityKey)
, BlockSize(blockSize)
{}

template <typename TMethod>
void TSplittedRequestActor<TMethod>::Bootstrap(const TActorContext& ctx)
{
THashMap<TActorId, size_t> actorIds;
actorIds.reserve(Requests.size());
TVector<std::optional<TResponseRecordType>> responses;
TVector<TUnifyResponsesContext<TMethod>> responses;
responses.reserve(Requests.size());
for (auto& [request, partitions, blockSubRange]: Requests) {
auto actorId = NCloud::Register<TRequestActor<TMethod>>(
ctx,
RequestInfo,
std::move(partitions),
std::move(request),
request,
blockSubRange,
DiskId,
ctx.SelfID,
Expand All @@ -548,7 +562,7 @@ void TSplittedRequestActor<TMethod>::Bootstrap(const TActorContext& ctx)
);
++PendingRequests;
actorIds[actorId] = responses.size();
responses.push_back(std::nullopt);
responses.push_back({TResponseRecordType(), blockSubRange.Size()});
}

ChildActors = TChildActorsInfo(std::move(actorIds), std::move(responses));
Expand All @@ -570,7 +584,7 @@ void TSplittedRequestActor<TMethod>::ReplyAndDie(
auto allResponses =
TChildActorsInfo::ExtractResponses(std::move(ChildActors));
auto responseToReply =
NSplitRequest::UnifyResponses<TMethod>(allResponses);
UnifyResponses<TMethod>(allResponses, BlockSize);

return std::make_unique<typename TMethod::TResponse>(
std::move(responseToReply));
Expand All @@ -588,12 +602,12 @@ void TSplittedRequestActor<TMethod>::OnActorResponse(
{
auto* msg = ev->Get();
if (HasError(msg->GetError())) {
for (const auto& [actorId, _]: ChildActors.ActorId2Index) {
for (const auto& actorId: ChildActors.GetActorIds()) {
if (ChildActors.ReceivedResponseFrom(actorId)) {
continue;
}

NCloud::Send(ctx, actorId, new TEvents::TEvPoisonPill());
NCloud::Send<TEvents::TEvPoisonPill>(ctx, actorId);
}

ReplyAndDie(ctx, std::move(msg->GetError()));
Expand Down Expand Up @@ -649,6 +663,30 @@ void TSplittedRequestActor<TMethod>::HandlePoisonPill(

////////////////////////////////////////////////////////////////////////////////

TResultOrError<THashSet<NActors::TActorId>>
TMirrorPartitionActor::GetPartitionsToReadBlockRange(
const TBlockRange64 blockRange)
{
THashSet<TActorId> replicaActorIds;
const ui32 readReplicaCount = Min<ui32>(
Max<ui32>(1, Config->GetMirrorReadReplicaCount()),
State.GetReplicaInfos().size());
for (ui32 i = 0; i < readReplicaCount; ++i) {
TActorId replicaActorId;
const auto error = State.NextReadReplica(blockRange, &replicaActorId);
if (HasError(error)) {
return error;
}

if (!replicaActorIds.insert(replicaActorId).second) {
break;
}
}
return replicaActorIds;
}

////////////////////////////////////////////////////////////////////////////////

template <typename TMethod>
void TMirrorPartitionActor::ReadBlocks(
const typename TMethod::TRequest::TPtr& ev,
Expand Down Expand Up @@ -685,8 +723,8 @@ void TMirrorPartitionActor::ReadBlocks(
return;
}

auto actorIdsOrError = GetActorsForBlockRange(blockRange);
if (!actorIdsOrError.HasError()) {
auto actorIdsOrError = GetPartitionsToReadBlockRange(blockRange);
if (!HasError(actorIdsOrError)) {
auto replicaActorIds = actorIdsOrError.ExtractResult();

LOG_DEBUG(
Expand All @@ -709,17 +747,16 @@ void TMirrorPartitionActor::ReadBlocks(
State.GetReplicaInfos()[0].Config->GetName(),
SelfId(),
requestIdentityKey,
false // sendResponseToParent
);
false);

return;
}

auto blockRangeSplittedByDeviceBorders = State.SplitRangeByDeviceBorders(blockRange);
TVector<THashSet<TActorId>> actorIdsForRequests;
for (auto blockSubRange: blockRangeSplittedByDeviceBorders) {
auto actorIdsOrError = GetActorsForBlockRange(blockSubRange);
if (actorIdsOrError.HasError()) {
auto actorIdsOrError = GetPartitionsToReadBlockRange(blockSubRange);
if (HasError(actorIdsOrError)) {
NCloud::Reply(
ctx,
*ev,
Expand All @@ -731,7 +768,7 @@ void TMirrorPartitionActor::ReadBlocks(
actorIdsForRequests.emplace_back(actorIdsOrError.ExtractResult());
}

auto splittedRequest = NSplitRequest::SplitRequest<TMethod>(
auto splittedRequest = SplitRequest<TMethod>(
record,
blockRangeSplittedByDeviceBorders,
actorIdsForRequests);
Expand All @@ -751,10 +788,11 @@ void TMirrorPartitionActor::ReadBlocks(
NCloud::Register<TSplittedRequestActor<TMethod>>(
ctx,
std::move(requestInfo),
std::move(splittedRequest),
std::move(splittedRequest.value()),
State.GetReplicaInfos()[0].Config->GetName(),
SelfId(),
requestIdentityKey);
requestIdentityKey,
State.GetBlockSize());
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 9686d14

Please sign in to comment.