Skip to content

Commit

Permalink
ReadOnly pdisk
Browse files Browse the repository at this point in the history
  • Loading branch information
SammyVimes committed Dec 3, 2024
1 parent 5c8e3e6 commit 7c6d62e
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 34 deletions.
3 changes: 3 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk.h
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ struct TEvReadLogResult : public TEventLocal<TEvReadLogResult, TEvBlobStorage::E
TLogPosition Position;
TLogPosition NextPosition;
bool IsEndOfLog;
ui32 LastGoodChunkIdx = 0;
ui64 LastGoodSectorIdx = 0;

TStatusFlags StatusFlags;
TString ErrorReason;
TOwner Owner;
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ class TPDisk;
IBlockDevice* CreateRealBlockDevice(const TString &path, TPDiskMon &mon,
ui64 reorderingCycles, ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags,
ui32 maxQueuedCompletionActions, ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap,
TPDisk * const pdisk = nullptr);
TPDisk * const pdisk = nullptr, bool readOnly = false);
IBlockDevice* CreateRealBlockDeviceWithDefaults(const TString &path, TPDiskMon &mon, TDeviceMode::TFlags flags,
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk = nullptr);
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk = nullptr, bool readOnly = false);

} // NPDisk
} // NKikimr
20 changes: 12 additions & 8 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ class TRealBlockDevice : public IBlockDevice {
TFlightControl FlightControl;
TAtomicBlockCounter QuitCounter;
TString LastWarning;
bool ReadOnly;
TDeque<IAsyncIoOperation*> Trash;
TMutex TrashMutex;

Expand All @@ -843,7 +844,7 @@ class TRealBlockDevice : public IBlockDevice {
public:
TRealBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles,
ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions,
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap)
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, bool readOnly)
: Mon(mon)
, Path(path)
, CompletionThreads(nullptr)
Expand All @@ -864,6 +865,7 @@ class TRealBlockDevice : public IBlockDevice {
, DeviceInFlight(FastClp2(deviceInFlight))
, FlightControl(CountTrailingZeroBits(DeviceInFlight))
, LastWarning(IsPowerOf2(deviceInFlight) ? "" : "Device inflight must be a power of 2")
, ReadOnly(readOnly)
{
if (sectorMap) {
DriveData = TDriveData();
Expand Down Expand Up @@ -1038,6 +1040,7 @@ class TRealBlockDevice : public IBlockDevice {
}

void TrimSync(ui32 size, ui64 offset) override {
Y_ABORT_UNLESS(!ReadOnly);
IAsyncIoOperation* op = IoContext->CreateAsyncIoOperation(nullptr, {}, nullptr);
IoContext->PreparePTrim(op, size, offset);
IsTrimEnabled = IoContext->DoTrim(op);
Expand All @@ -1064,9 +1067,9 @@ class TRealBlockDevice : public IBlockDevice {
void PwriteAsync(const void *data, ui64 size, ui64 offset, TCompletionAction *completionAction, TReqId reqId,
NWilson::TTraceId *traceId) override {
Y_ABORT_UNLESS(completionAction);
Y_ABORT_UNLESS(!ReadOnly);
if (!IsInitialized) {
completionAction->Release(PCtx->ActorSystem);
return;
}
if (data && size) {
Y_ABORT_UNLESS(intptr_t(data) % 512 == 0);
Expand All @@ -1080,6 +1083,7 @@ class TRealBlockDevice : public IBlockDevice {

void FlushAsync(TCompletionAction *completionAction, TReqId reqId) override {
Y_ABORT_UNLESS(completionAction);
Y_ABORT_UNLESS(!ReadOnly);
if (!IsInitialized) {
completionAction->Release(PCtx->ActorSystem);
return;
Expand Down Expand Up @@ -1348,9 +1352,9 @@ class TCachedBlockDevice : public TRealBlockDevice {
public:
TCachedBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles,
ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions,
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk)
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk, bool readOnly)
: TRealBlockDevice(path, mon, reorderingCycles, seekCostNs, deviceInFlight, flags,
maxQueuedCompletionActions, completionThreadsCount, sectorMap)
maxQueuedCompletionActions, completionThreadsCount, sectorMap, readOnly)
, ReadsInFly(0)
, PDisk(pdisk)
{}
Expand Down Expand Up @@ -1486,14 +1490,14 @@ class TCachedBlockDevice : public TRealBlockDevice {

// Factory for the block device used by the PDisk.
// Allocates a TCachedBlockDevice with `new` and returns it as an IBlockDevice*; every argument is
// forwarded to the TCachedBlockDevice constructor unchanged and in the same order. The function
// does not call Initialize() on the created device.
// NOTE(review): the result comes from a bare `new`; ownership presumably passes to the caller — confirm.
IBlockDevice* CreateRealBlockDevice(const TString &path, TPDiskMon &mon, ui64 reorderingCycles,
ui64 seekCostNs, ui64 deviceInFlight, TDeviceMode::TFlags flags, ui32 maxQueuedCompletionActions,
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk) {
ui32 completionThreadsCount, TIntrusivePtr<TSectorMap> sectorMap, TPDisk * const pdisk, bool readOnly) {
return new TCachedBlockDevice(path, mon, reorderingCycles, seekCostNs, deviceInFlight, flags,
maxQueuedCompletionActions, completionThreadsCount, sectorMap, pdisk);
maxQueuedCompletionActions, completionThreadsCount, sectorMap, pdisk, readOnly);
}

// Convenience wrapper around CreateRealBlockDevice.
// Passes fixed values for the tuning parameters (reorderingCycles = 0, seekCostNs = 0,
// deviceInFlight = 4, maxQueuedCompletionActions = 8, completionThreadsCount = 1), forwards the
// remaining arguments as given, and then calls Initialize() on the new device with a freshly
// created TPDiskCtx built from actorSystem. Returns the initialized device.
IBlockDevice* CreateRealBlockDeviceWithDefaults(const TString &path, TPDiskMon &mon, TDeviceMode::TFlags flags,
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk) {
IBlockDevice *device = CreateRealBlockDevice(path, mon, 0, 0, 4, flags, 8, 1, sectorMap, pdisk);
TIntrusivePtr<TSectorMap> sectorMap, TActorSystem *actorSystem, TPDisk * const pdisk, bool readOnly) {
IBlockDevice *device = CreateRealBlockDevice(path, mon, 0, 0, 4, flags, 8, 1, sectorMap, pdisk, readOnly);
device->Initialize(std::make_shared<TPDiskCtx>(actorSystem));
return device;
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ struct TPDiskConfig : public TThrRefBase {

bool MetadataOnly = false;

bool ReadOnly = false;

TPDiskConfig(ui64 pDiskGuid, ui32 pdiskId, ui64 pDiskCategory)
: TPDiskConfig({}, pDiskGuid, pdiskId, pDiskCategory)
{}
Expand Down Expand Up @@ -400,6 +402,10 @@ struct TPDiskConfig : public TThrRefBase {
if (cfg->HasCompletionThreadsCount()) {
CompletionThreadsCount = cfg->GetCompletionThreadsCount();
}

if (cfg->HasReadOnly()) {
ReadOnly = cfg->GetReadOnly();
}
}
};

Expand Down
89 changes: 86 additions & 3 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TPDisk::TPDisk(std::shared_ptr<TPDiskCtx> pCtx, const TIntrusivePtr<TPDiskConfig
, BlockDevice(CreateRealBlockDevice(cfg->GetDevicePath(), Mon,
HPCyclesMs(ReorderingMs), DriveModel.SeekTimeNs(), cfg->DeviceInFlight,
TDeviceMode::LockFile | (cfg->UseSpdkNvmeDriver ? TDeviceMode::UseSpdk : 0),
cfg->MaxQueuedCompletionActions, cfg->CompletionThreadsCount, cfg->SectorMap, this))
cfg->MaxQueuedCompletionActions, cfg->CompletionThreadsCount, cfg->SectorMap, this, cfg->ReadOnly))
, Cfg(cfg)
, CreationTime(TInstant::Now())
, ExpectedSlotCount(cfg->ExpectedSlotCount)
Expand Down Expand Up @@ -1767,8 +1767,12 @@ bool TPDisk::YardInitForKnownVDisk(TYardInit &evYardInit, TOwner owner) {
ADD_RECORD_WITH_TIMESTAMP_TO_OPERATION_LOG(ownerData.OperationLog, "YardInitForKnownVDisk, OwnerId# " << owner
<< ", evYardInit# " << evYardInit.ToString());

TFirstUncommitted firstUncommitted = CommonLogger->FirstUncommitted.load();
ownerData.LogEndPosition = TOwnerData::TLogEndPosition(firstUncommitted.ChunkIdx, firstUncommitted.SectorIdx);
if (Cfg->ReadOnly) {
ownerData.LogEndPosition = TOwnerData::TLogEndPosition(LastInitialChunkIdx, LastInitialSectorIdx);
} else {
TFirstUncommitted firstUncommitted = CommonLogger->FirstUncommitted.load();
ownerData.LogEndPosition = TOwnerData::TLogEndPosition(firstUncommitted.ChunkIdx, firstUncommitted.SectorIdx);
}

ownerData.OwnerRound = evYardInit.OwnerRound;
TOwnerRound ownerRound = evYardInit.OwnerRound;
Expand Down Expand Up @@ -3648,6 +3652,77 @@ void TPDisk::UpdateMinLogCostNs() {
}
}

// Request filter for read-only mode.
//
// Looks at the request type and decides whether the request has to be rejected because it would
// modify the disk:
//   * returns false for reads and the other request types listed in the first group below, and
//     for any request type that is not listed in the switch at all; nothing is sent and the
//     request is left untouched;
//   * returns true for modifying requests. Log write, chunk write, chunk reserve, chunk lock,
//     chunk unlock and chunk forget are answered with NKikimrProto::CORRUPTED and the
//     "PDisk is in read-only mode" error reason before returning; the request types in the last
//     group are rejected without any reply.
//
// This function neither checks Cfg->ReadOnly nor deletes the request: the caller (InputRequest)
// calls it only when Cfg->ReadOnly is set and deletes the request when true is returned.
bool TPDisk::HandleReadOnlyIfWrite(TRequestBase *request) {
    const TActorId& sender = request->Sender;
    TString errorReason = "PDisk is in read-only mode";

    // Deliberately no `default:` label, so request types missing from the switch end up at the
    // final `return false`.
    switch (request->GetType()) {
        // Reads and other operations that can be processed in read-only mode.
        case ERequestType::RequestLogRead:
        case ERequestType::RequestLogReadContinue:
        case ERequestType::RequestLogReadResultProcess:
        case ERequestType::RequestLogSectorRestore:
        case ERequestType::RequestChunkRead:
        case ERequestType::RequestChunkReadPiece:
        case ERequestType::RequestYardInit:
        case ERequestType::RequestCheckSpace:
        case ERequestType::RequestHarakiri:
        case ERequestType::RequestYardSlay:
        case ERequestType::RequestYardControl:
        case ERequestType::RequestWhiteboartReport:
        case ERequestType::RequestHttpInfo:
        case ERequestType::RequestStopDevice:
        case ERequestType::RequestReadMetadata:
        case ERequestType::RequestInitialReadMetadataResult:
        case ERequestType::RequestUndelivered:
        case ERequestType::RequestNop:
        case ERequestType::RequestConfigureScheduler:
        case ERequestType::RequestPushUnformattedMetadataSector:
        case ERequestType::RequestContinueReadMetadata:
            return false;

        // Can't be processed in read-only mode; the sender gets an error reply.
        case ERequestType::RequestLogWrite: {
            TLogWrite &ev = *static_cast<TLogWrite*>(request);
            NPDisk::TEvLogResult* result = new NPDisk::TEvLogResult(NKikimrProto::CORRUPTED, 0, errorReason);
            // Echo the Lsn/Cookie of the rejected record back in the result.
            result->Results.push_back(NPDisk::TEvLogResult::TRecord(ev.Lsn, ev.Cookie));
            PCtx->ActorSystem->Send(sender, result);
            return true;
        }
        case ERequestType::RequestChunkWrite: {
            TChunkWrite &ev = *static_cast<TChunkWrite*>(request);
            SendChunkWriteError(ev, errorReason, NKikimrProto::CORRUPTED);
            return true;
        }
        case ERequestType::RequestChunkReserve:
            PCtx->ActorSystem->Send(sender, new NPDisk::TEvChunkReserveResult(NKikimrProto::CORRUPTED, 0, errorReason));
            return true;
        case ERequestType::RequestChunkLock:
            PCtx->ActorSystem->Send(sender, new NPDisk::TEvChunkLockResult(NKikimrProto::CORRUPTED, {}, 0, errorReason));
            return true;
        case ERequestType::RequestChunkUnlock:
            PCtx->ActorSystem->Send(sender, new NPDisk::TEvChunkUnlockResult(NKikimrProto::CORRUPTED, 0, errorReason));
            return true;
        case ERequestType::RequestChunkForget:
            PCtx->ActorSystem->Send(sender, new NPDisk::TEvChunkForgetResult(NKikimrProto::CORRUPTED, 0, errorReason));
            return true;

        // Can't be processed in read-only mode either, but these requests don't require response.
        case ERequestType::RequestWriteMetadata:
        case ERequestType::RequestWriteMetadataResult:
        case ERequestType::RequestTryTrimChunk:
        case ERequestType::RequestReleaseChunks:
        case ERequestType::RequestChunkWritePiece:
        case ERequestType::RequestChunkTrim:
        case ERequestType::RequestAskForCutLog:
        case ERequestType::RequestCommitLogChunks:
        case ERequestType::RequestLogCommitDone:
            return true;
    }
    // Request type not covered by the switch: let it through.
    return false;
}

void TPDisk::AddCbs(ui32 ownerId, EGate gate, const char *gateName, ui64 minBudget) {
if (!ForsetiScheduler.GetCbs(ownerId, gate)) {
NSchLab::TCbs cbs;
Expand Down Expand Up @@ -3682,6 +3757,14 @@ void TPDisk::Wakeup() {

// Pushes request to the InputQueue; almost thread-safe
void TPDisk::InputRequest(TRequestBase* request) {
if (Cfg->ReadOnly && HandleReadOnlyIfWrite(request)) {
LOG_DEBUG(*PCtx->ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " ReqId# %" PRIu64
" got write request in ReadOnly mode type# %" PRIu64,
(ui32)PCtx->PDiskId, (ui64)request->ReqId.Id, (ui32)request->GetType());

delete request;
return;
}
bool isTrim = request->PriorityClass == NPriInternal::Trim;
if (request->GateId != GateFastOperation) {
ui64 timeout = 0;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ class TPDisk : public IPDisk {

TIntrusivePtr<TPDiskConfig> Cfg;
TInstant CreationTime;
// Last chunk and sector indexes we have seen on initial log read.
// Used to limit log reading in read-only mode.
ui32 LastInitialChunkIdx;
ui64 LastInitialSectorIdx;

ui64 ExpectedSlotCount = 0; // Number of slots to use for space limit calculation.

Expand Down Expand Up @@ -422,6 +426,7 @@ class TPDisk : public IPDisk {
void AddCbs(ui32 ownerId, EGate gate, const char *gateName, ui64 minBudget);
void AddCbsSet(ui32 ownerId);
void UpdateMinLogCostNs();
bool HandleReadOnlyIfWrite(TRequestBase *request);
};

void ParsePayloadFromSectorOffset(const TDiskFormat& format, ui64 firstSector, ui64 lastSector, ui64 currentSector,
Expand Down
57 changes: 37 additions & 20 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ bool TPDisk::InitCommonLogger() {
ui64 sectorIdx = (InitialLogPosition.OffsetInChunk + Format.SectorSize - 1) / Format.SectorSize;

TLogChunkInfo *info = &*std::find_if(LogChunks.begin(), LogChunks.end(), [=](const TLogChunkInfo& i) {
return i.ChunkIdx == chunkIdx;
});
return i.ChunkIdx == chunkIdx;
});

if (sectorIdx >= UsableSectorsPerLogChunk() && InitialTailBuffer) {
InitialTailBuffer->Release(PCtx->ActorSystem);
Expand All @@ -84,7 +84,7 @@ bool TPDisk::InitCommonLogger() {
}
CommonLogger->SwitchToNewChunk(TReqId(TReqId::InitCommonLoggerSwitchToNewChunk, 0), nullptr);

// Log chunk can be collected as soon as noone needs it
// Log chunk can be collected as soon as no one needs it
ChunkState[chunkIdx].CommitState = TChunkState::DATA_COMMITTED;
}
bool isOk = LogNonceJump(InitialPreviousNonce);
Expand Down Expand Up @@ -570,14 +570,19 @@ void TPDisk::ProcessLogReadQueue() {
ui32 endLogChunkIdx;
ui64 endLogSectorIdx;

TOwnerData::TLogEndPosition &logEndPos = ownerData.LogEndPosition;
if (logEndPos.ChunkIdx == 0 && logEndPos.SectorIdx == 0) {
TFirstUncommitted firstUncommitted = CommonLogger->FirstUncommitted.load();
endLogChunkIdx = firstUncommitted.ChunkIdx;
endLogSectorIdx = firstUncommitted.SectorIdx;
if (Cfg->ReadOnly) {
endLogChunkIdx = LastInitialChunkIdx;
endLogSectorIdx = LastInitialSectorIdx;
} else {
endLogChunkIdx = logEndPos.ChunkIdx;
endLogSectorIdx = logEndPos.SectorIdx;
TOwnerData::TLogEndPosition &logEndPos = ownerData.LogEndPosition;
if (logEndPos.ChunkIdx == 0 && logEndPos.SectorIdx == 0) {
TFirstUncommitted firstUncommitted = CommonLogger->FirstUncommitted.load();
endLogChunkIdx = firstUncommitted.ChunkIdx;
endLogSectorIdx = firstUncommitted.SectorIdx;
} else {
endLogChunkIdx = logEndPos.ChunkIdx;
endLogSectorIdx = logEndPos.SectorIdx;
}
}

ownerData.LogReader = new TLogReader(false,
Expand Down Expand Up @@ -1385,6 +1390,10 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
"Error while parsing common log at booting state"));
return;
}

LastInitialChunkIdx = evReadLogResult.LastGoodChunkIdx;
LastInitialSectorIdx = evReadLogResult.LastGoodSectorIdx;

// Initialize metadata.
InitFormattedMetadata();
// Prepare the FreeChunks list
Expand Down Expand Up @@ -1481,23 +1490,31 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
InitSysLogger();

InitPhase = EInitPhase::Initialized;
if (!InitCommonLogger()) {
// TODO: report red zone
*Mon.PDiskState = NKikimrBlobStorage::TPDiskState::CommonLoggerInitError;
*Mon.PDiskBriefState = TPDiskMon::TPDisk::Error;
*Mon.PDiskDetailedState = TPDiskMon::TPDisk::ErrorCommonLoggerInit;
PCtx->ActorSystem->Send(pDiskActor, new TEvLogInitResult(false, "Error in common logger init"));
return;

if (!Cfg->ReadOnly) {
// We don't need logger in ReadOnly mode.
if (!InitCommonLogger()) {
// TODO: report red zone
*Mon.PDiskState = NKikimrBlobStorage::TPDiskState::CommonLoggerInitError;
*Mon.PDiskBriefState = TPDiskMon::TPDisk::Error;
*Mon.PDiskDetailedState = TPDiskMon::TPDisk::ErrorCommonLoggerInit;
PCtx->ActorSystem->Send(pDiskActor, new TEvLogInitResult(false, "Error in common logger init"));
return;
}
}

// Initialization is complete. Unless the PDisk is in ReadOnly mode, it's now ok to write both logs and data.
*Mon.PDiskState = NKikimrBlobStorage::TPDiskState::Normal;
*Mon.PDiskBriefState = TPDiskMon::TPDisk::OK;
*Mon.PDiskDetailedState = TPDiskMon::TPDisk::EverythingIsOk;

auto completion = MakeHolder<TCompletionEventSender>(this, pDiskActor, new TEvLogInitResult(true, "OK"));
ReleaseUnusedLogChunks(completion.Get());
WriteSysLogRestorePoint(completion.Release(), TReqId(TReqId::AfterInitCommonLoggerSysLog, 0), {});
if (Cfg->ReadOnly) {
PCtx->ActorSystem->Send(pDiskActor, new TEvLogInitResult(true, "OK"));
} else {
auto completion = MakeHolder<TCompletionEventSender>(this, pDiskActor, new TEvLogInitResult(true, "OK"));
ReleaseUnusedLogChunks(completion.Get());
WriteSysLogRestorePoint(completion.Release(), TReqId(TReqId::AfterInitCommonLoggerSysLog, 0), {});
}

// Start reading metadata.
ReadFormattedMetadataIfNeeded();
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,10 @@ void TLogReader::ReplyOk() {
Result->Status = NKikimrProto::OK;
Result->NextPosition = IsInitial ? LastGoodToWriteLogPosition : TLogPosition::Invalid();
Result->IsEndOfLog = true;
if (IsInitial) {
Result->LastGoodChunkIdx = ChunkIdx;
Result->LastGoodSectorIdx = SectorIdx;
}
Reply();
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ void TAllPDisks::ActorSetupCmd(NActors::TActorSystemSetup *setup, ui32 node,
TPDiskCategory(deviceType, 0).GetRaw()));
pDiskConfig->GetDriveDataSwitch = NKikimrBlobStorage::TPDiskConfig::DoNotTouch;
pDiskConfig->WriteCacheSwitch = NKikimrBlobStorage::TPDiskConfig::DoNotTouch;
pDiskConfig->ReadOnly = inst.ReadOnly;
const NPDisk::TMainKey mainKey{ .Keys = { NPDisk::YdbDefaultPDiskSequence }, .IsInitialized = true };
TActorSetupCmd pDiskSetup(CreatePDisk(pDiskConfig.Get(),
mainKey, counters), TMailboxType::Revolving, 0);
Expand Down Expand Up @@ -250,7 +251,7 @@ bool TDefaultVDiskSetup::SetUp(TAllVDisks::TVDiskInstance &vdisk, TAllPDisks *pd

NKikimr::TVDiskConfig::TBaseInfo baseInfo(vdisk.VDiskID, pdisk.PDiskActorID, pdisk.PDiskGuid,
pdisk.PDiskID, NKikimr::NPDisk::DEVICE_TYPE_ROT, slotId,
NKikimrBlobStorage::TVDiskKind::Default, initOwnerRound, {});
NKikimrBlobStorage::TVDiskKind::Default, initOwnerRound, {}, false, {}, 0, 0, pdisk.ReadOnly);
vdisk.Cfg = MakeIntrusive<NKikimr::TVDiskConfig>(baseInfo);

for (auto &modifier : ConfigModifiers) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_vdisk/lib/prepare.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct TOnePDisk {
const TString Filename;
const ui32 ChunkSize;
const ui64 DiskSize;
bool ReadOnly = false;

TOnePDisk(ui32 pDiskId, ui64 pDiskGuid, const TString &filename,
ui32 chunkSize, ui64 diskSize);
Expand Down
Loading

0 comments on commit 7c6d62e

Please sign in to comment.