Skip to content

Commit

Permalink
correct hard verification (ydb-platform#3480)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Apr 5, 2024
1 parent d8cb949 commit 805196e
Show file tree
Hide file tree
Showing 11 changed files with 15 additions and 65 deletions.
9 changes: 0 additions & 9 deletions ydb/core/tx/columnshard/background_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,4 @@ TString TBackgroundController::DebugStringIndexation() const {
return sb;
}

TString TBackgroundActivity::DebugString() const {
return TStringBuilder()
<< "indexation:" << HasIndexation() << ";"
<< "compaction:" << HasCompaction() << ";"
<< "cleanup:" << HasCleanup() << ";"
<< "ttl:" << HasTtl() << ";"
;
}

}
36 changes: 0 additions & 36 deletions ydb/core/tx/columnshard/background_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,6 @@ class TColumnEngineChanges;

namespace NKikimr::NColumnShard {

class TBackgroundActivity {
public:
enum EBackActivity : ui32 {
NONE = 0x00,
INDEX = 0x01,
COMPACT = 0x02,
CLEAN = 0x04,
TTL = 0x08,
ALL = 0xffff
};

static TBackgroundActivity Indexation() { return TBackgroundActivity(INDEX); }
static TBackgroundActivity Compaction() { return TBackgroundActivity(COMPACT); }
static TBackgroundActivity Cleanup() { return TBackgroundActivity(CLEAN); }
static TBackgroundActivity Ttl() { return TBackgroundActivity(TTL); }
static TBackgroundActivity All() { return TBackgroundActivity(ALL); }
static TBackgroundActivity None() { return TBackgroundActivity(NONE); }

TBackgroundActivity() = default;

bool HasIndexation() const { return Activity & INDEX; }
bool HasCompaction() const { return Activity & COMPACT; }
bool HasCleanup() const { return Activity & CLEAN; }
bool HasTtl() const { return Activity & TTL; }
bool HasAll() const { return Activity == ALL; }

TString DebugString() const;

private:
EBackActivity Activity = NONE;

TBackgroundActivity(EBackActivity activity)
: Activity(activity)
{}
};

class TBackgroundController {
private:
THashMap<TString, TMonotonic> ActiveIndexationTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
const ui64 bytesWritten = changes->GetBlobsAction().GetWritingTotalSize();

if (!Ev->Get()->IndexChanges->IsAborted()) {
NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, TriggerActivity, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>());
NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>());
Ev->Get()->IndexChanges->WriteIndexOnComplete(Self, context);
}

Self->EnqueueBackgroundActivities(false, TriggerActivity);
Self->EnqueueBackgroundActivities(false);
changes->MutableBlobsAction().OnCompleteTxAfterAction(*Self, Ev->Get()->GetPutStatus() == NKikimrProto::OK);
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexComplete(*changes, *Self);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class TTxWriteIndex: public TTransactionBase<TColumnShard> {

TEvPrivate::TEvWriteIndex::TPtr Ev;
const ui32 TabletTxNo;
TBackgroundActivity TriggerActivity = TBackgroundActivity::All();
bool CompleteReady = false;

TStringBuilder TxPrefix() const {
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,9 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
}
}

void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivity activity) {
void TColumnShard::EnqueueBackgroundActivities(const bool periodic) {
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID()));
ACFL_DEBUG("event", "EnqueueBackgroundActivities")("periodic", periodic)("activity", activity.DebugString());
ACFL_DEBUG("event", "EnqueueBackgroundActivities")("periodic", periodic);
StoragesManager->GetOperatorVerified(NOlap::IStoragesManager::DefaultStorageId);
StoragesManager->GetSharedBlobsManager()->GetStorageManagerVerified(NOlap::IStoragesManager::DefaultStorageId);
CSCounters.OnStartBackground();
Expand Down Expand Up @@ -596,9 +596,10 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
}
virtual bool DoOnError(const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)("status", status.GetErrorMessage())("status_code", status.GetStatus());
AFL_VERIFY(false)("blob_id", range)("status", status.GetStatus())("error", status.GetErrorMessage())("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier())
AFL_VERIFY(status.GetStatus() != NKikimrProto::EReplyStatus::NODATA)("blob_id", range)("status", status.GetStatus())("error", status.GetErrorMessage())("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier())
("debug", TxEvent->IndexChanges->DebugString());
TxEvent->SetPutStatus(NKikimrProto::ERROR);
Counters.ReadErrors->Add(1);
TActorContext::AsActorContext().Send(ParentActorId, std::move(TxEvent));
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ class TColumnShard
TWriteId BuildNextWriteId(NIceDb::TNiceDb& db);

void EnqueueProgressTx(const TActorContext& ctx);
void EnqueueBackgroundActivities(bool periodic = false, TBackgroundActivity activity = TBackgroundActivity::All());
void EnqueueBackgroundActivities(const bool periodic = false);
virtual void Enqueue(STFUNC_SIG) override;

void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/counters/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ TIndexationCounters::TIndexationCounters(const TString& module)
: TBase(module)
{
ReadBytes = TBase::GetDeriviative("Read/Bytes");
ReadErrors = TBase::GetDeriviative("Read/Errors/Count");
AnalizeInsertedPortions = TBase::GetDeriviative("AnalizeInsertion/Portions");
AnalizeInsertedBytes = TBase::GetDeriviative("AnalizeInsertion/Bytes");
RepackedInsertedPortions = TBase::GetDeriviative("RepackedInsertion/Portions");
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/counters/indexation.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class TIndexationCounters: public TCommonCountersOwner {
public:
NMonitoring::TDynamicCounters::TCounterPtr CompactionInputBytes;

NMonitoring::TDynamicCounters::TCounterPtr ReadErrors;
NMonitoring::TDynamicCounters::TCounterPtr ReadBytes;
NMonitoring::TDynamicCounters::TCounterPtr AnalizeCompactedPortions;
NMonitoring::TDynamicCounters::TCounterPtr AnalizeInsertedPortions;
Expand Down
5 changes: 1 addition & 4 deletions ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class TTransactionContext;
namespace NKikimr::NColumnShard {
class TBlobManagerDb;
class TColumnShard;
class TBackgroundActivity;
}

namespace NKikimr::NOlap {
Expand Down Expand Up @@ -156,15 +155,13 @@ class TWriteIndexCompleteContext: TNonCopyable, public TChangesFinishContext {
const ui32 BlobsWritten;
const ui64 BytesWritten;
const TDuration Duration;
NColumnShard::TBackgroundActivity& TriggerActivity;
TColumnEngineForLogs& EngineLogs;
TWriteIndexCompleteContext(const TActorContext& actorContext, const ui32 blobsWritten, const ui64 bytesWritten
, const TDuration d, NColumnShard::TBackgroundActivity& triggerActivity, TColumnEngineForLogs& engineLogs)
, const TDuration d, TColumnEngineForLogs& engineLogs)
: ActorContext(actorContext)
, BlobsWritten(blobsWritten)
, BytesWritten(bytesWritten)
, Duration(d)
, TriggerActivity(triggerActivity)
, EngineLogs(engineLogs)
{

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/storage/granule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ bool TGranuleMeta::InCompaction() const {
return Activity.contains(EActivity::GeneralCompaction);
}

std::shared_ptr<NKikimr::NOlap::TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(const TPortionInfo& portion) {
std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(const TPortionInfo& portion) {
auto it = Portions.find(portion.GetPortion());
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "UpsertPortionOnLoad")("portion_info", portion.DebugString());
if (it == Portions.end()) {
Expand Down
12 changes: 4 additions & 8 deletions ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,

NOlap::TWriteIndexContext contextExecute(nullptr, db, engine);
changes->WriteIndexOnExecute(nullptr, contextExecute);
NColumnShard::TBackgroundActivity triggered;
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), triggered, engine);
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), engine);
changes->WriteIndexOnComplete(nullptr, contextComplete);
changes->AbortEmergency();
return result;
Expand Down Expand Up @@ -328,8 +327,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, N
const bool result = engine.ApplyChanges(db, changes, snap);
NOlap::TWriteIndexContext contextExecute(nullptr, db, engine);
changes->WriteIndexOnExecute(nullptr, contextExecute);
NColumnShard::TBackgroundActivity triggered;
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), triggered, engine);
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), engine);
changes->WriteIndexOnComplete(nullptr, contextComplete);
if (blobsPool) {
for (auto&& i : changes->AppendedPortions) {
Expand All @@ -356,8 +354,7 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u
const bool result = engine.ApplyChanges(db, changes, snap);
NOlap::TWriteIndexContext contextExecute(nullptr, db, engine);
changes->WriteIndexOnExecute(nullptr, contextExecute);
NColumnShard::TBackgroundActivity triggered;
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), triggered, engine);
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), engine);
changes->WriteIndexOnComplete(nullptr, contextComplete);
changes->AbortEmergency();
return result;
Expand All @@ -375,8 +372,7 @@ bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db,
const bool result = engine.ApplyChanges(db, changes, TSnapshot(1,1));
NOlap::TWriteIndexContext contextExecute(nullptr, db, engine);
changes->WriteIndexOnExecute(nullptr, contextExecute);
NColumnShard::TBackgroundActivity triggered;
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), triggered, engine);
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), engine);
changes->WriteIndexOnComplete(nullptr, contextComplete);
changes->AbortEmergency();
return result;
Expand Down

0 comments on commit 805196e

Please sign in to comment.