Skip to content

Commit

Permalink
Add 'DoneBarrier' to syncronize operation parts (ydb-platform#12678)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Dec 18, 2024
1 parent 323ce49 commit 47dde79
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 6 deletions.
12 changes: 8 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,7 @@ TDone::TDone(const TOperationId& id)
IgnoreMessages(DebugHint(), AllIncomingEvents());
}

bool TDone::ProgressState(TOperationContext& context) {
LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"[" << context.SS->SelfTabletId() << "] " << DebugHint() << " ProgressState");

bool TDone::Process(TOperationContext& context) {
const auto* txState = context.SS->FindTx(OperationId);

const auto& pathId = txState->TargetPathId;
Expand Down Expand Up @@ -480,6 +477,13 @@ bool TDone::ProgressState(TOperationContext& context) {
return true;
}

bool TDone::ProgressState(TOperationContext& context) {
LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"[" << context.SS->SelfTabletId() << "] " << DebugHint() << " ProgressState");

return Process(context);
}

namespace {

template <typename T, typename TFuncCheck, typename TFuncToString>
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class TDone: public TSubOperationState {
<< " opId# " << OperationId;
}

bool Process(TOperationContext& context);

public:
explicit TDone(const TOperationId& id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,25 @@ class TProposeAtTableWithInitialScan: public NCdcStreamState::TProposeAtTable {

class TDoneWithInitialScan: public TDone {
public:
using TDone::TDone;
explicit TDoneWithInitialScan(const TOperationId& id)
: TDone(id)
{
auto events = AllIncomingEvents();
events.erase(TEvPrivate::TEvCompleteBarrier::EventType);
IgnoreMessages(DebugHint(), events);
}

bool ProgressState(TOperationContext& context) override {
if (!TDone::ProgressState(context)) {
LOG_I(DebugHint() << "ProgressState");

context.OnComplete.Barrier(OperationId, "DoneBarrier");
return false;
}

bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr&, TOperationContext& context) override {
LOG_I(DebugHint() << "HandleReply TEvCompleteBarrier");

if (!TDone::Process(context)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ void TSideEffects::ApplyOnExecute(TSchemeShard* ss, NTabletFlatExecutor::TTransa
DoDoneParts(ss, ctx);
DoSetBarriers(ss, ctx);
DoCheckBarriers(ss, txc, ctx);
DoDoneParts(ss, ctx);

DoWaitShardCreated(ss, ctx);
DoActivateShardCreated(ss, ctx);
Expand Down
42 changes: 42 additions & 0 deletions ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,48 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
runtime.Send(blockedAlterStream.Release(), 0, true);
}

void PqTransactions(bool enable) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions()
.EnableChangefeedInitialScan(true)
.EnablePQConfigTransactionsAtSchemeShard(enable));
ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
State: ECdcStreamStateScan
}
)");
env.TestWaitNotification(runtime, txId);

NKikimrSchemeOp::ECdcStreamState state;
do {
runtime.SimulateSleep(TDuration::Seconds(1));
state = DescribePrivatePath(runtime, "/MyRoot/Table/Stream")
.GetPathDescription().GetCdcStreamDescription().GetState();
} while (state != NKikimrSchemeOp::ECdcStreamStateReady);
}

Y_UNIT_TEST(WithoutPqTransactions) {
PqTransactions(false);
}

Y_UNIT_TEST(WithPqTransactions) {
PqTransactions(true);
}

Y_UNIT_TEST(AlterStream) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,4 +819,54 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}

template <typename T>
void PqTransactions(bool enable) {
T t;
t.GetTestEnvOptions()
.EnableChangefeedInitialScan(true)
.EnablePQConfigTransactionsAtSchemeShard(enable);

t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
{
TInactiveZone inactive(activeZone);
TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
}

TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
State: ECdcStreamStateScan
}
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);

{
TInactiveZone inactive(activeZone);
NKikimrSchemeOp::ECdcStreamState state;
do {
runtime.SimulateSleep(TDuration::Seconds(1));
state = DescribePrivatePath(runtime, "/MyRoot/Table/Stream")
.GetPathDescription().GetCdcStreamDescription().GetState();
} while (state != NKikimrSchemeOp::ECdcStreamStateReady);
}
});
}

Y_UNIT_TEST_WITH_REBOOTS(WithoutPqTransactions) {
PqTransactions<T>(false);
}

Y_UNIT_TEST_WITH_REBOOTS(WithPqTransactions) {
PqTransactions<T>(true);
}

} // TCdcStreamWithRebootsTests

0 comments on commit 47dde79

Please sign in to comment.