From 47dde79fb9f4ed8a909e68b893981e1c8ba23102 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Wed, 18 Dec 2024 17:26:26 +0300 Subject: [PATCH] Add 'DoneBarrier' to syncronize operation parts (#12678) --- .../schemeshard__operation_common.cpp | 12 +++-- .../schemeshard__operation_common.h | 2 + ...hemeshard__operation_create_cdc_stream.cpp | 19 ++++++- .../schemeshard__operation_side_effects.cpp | 1 + .../ut_cdc_stream/ut_cdc_stream.cpp | 42 ++++++++++++++++ .../ut_cdc_stream_reboots.cpp | 50 +++++++++++++++++++ 6 files changed, 120 insertions(+), 6 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 498063a8c722..0068d86fc9bb 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -428,10 +428,7 @@ TDone::TDone(const TOperationId& id) IgnoreMessages(DebugHint(), AllIncomingEvents()); } -bool TDone::ProgressState(TOperationContext& context) { - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "[" << context.SS->SelfTabletId() << "] " << DebugHint() << " ProgressState"); - +bool TDone::Process(TOperationContext& context) { const auto* txState = context.SS->FindTx(OperationId); const auto& pathId = txState->TargetPathId; @@ -480,6 +477,13 @@ bool TDone::ProgressState(TOperationContext& context) { return true; } +bool TDone::ProgressState(TOperationContext& context) { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "[" << context.SS->SelfTabletId() << "] " << DebugHint() << " ProgressState"); + + return Process(context); +} + namespace { template diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 01e9068db9d2..e6182d631d1f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -116,6 +116,8 @@ class TDone: public TSubOperationState { << " opId# " << OperationId; } + bool Process(TOperationContext& context); + public: explicit TDone(const TOperationId& id); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 41a18a4cd89e..05409e951742 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -381,10 +381,25 @@ class TProposeAtTableWithInitialScan: public NCdcStreamState::TProposeAtTable { class TDoneWithInitialScan: public TDone { public: - using TDone::TDone; + explicit TDoneWithInitialScan(const TOperationId& id) + : TDone(id) + { + auto events = AllIncomingEvents(); + events.erase(TEvPrivate::TEvCompleteBarrier::EventType); + IgnoreMessages(DebugHint(), events); + } bool ProgressState(TOperationContext& context) override { - if (!TDone::ProgressState(context)) { + LOG_I(DebugHint() << "ProgressState"); + + context.OnComplete.Barrier(OperationId, "DoneBarrier"); + return false; + } + + bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr&, TOperationContext& context) override { + LOG_I(DebugHint() << "HandleReply TEvCompleteBarrier"); + + if (!TDone::Process(context)) { return false; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp index ef5070e3ace3..a638bf2fec96 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp @@ -165,6 +165,7 @@ void TSideEffects::ApplyOnExecute(TSchemeShard* ss, NTabletFlatExecutor::TTransa DoDoneParts(ss, ctx); DoSetBarriers(ss, ctx); DoCheckBarriers(ss, txc, ctx); + DoDoneParts(ss, ctx); DoWaitShardCreated(ss, ctx); DoActivateShardCreated(ss, ctx); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index f81110939025..2fcde7dc4da2 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -1588,6 +1588,48 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { runtime.Send(blockedAlterStream.Release(), 0, true); } + void PqTransactions(bool enable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions() + .EnableChangefeedInitialScan(true) + .EnablePQConfigTransactionsAtSchemeShard(enable)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + env.TestWaitNotification(runtime, txId); + + NKikimrSchemeOp::ECdcStreamState state; + do { + runtime.SimulateSleep(TDuration::Seconds(1)); + state = DescribePrivatePath(runtime, "/MyRoot/Table/Stream") + .GetPathDescription().GetCdcStreamDescription().GetState(); + } while (state != NKikimrSchemeOp::ECdcStreamStateReady); + } + + Y_UNIT_TEST(WithoutPqTransactions) { + PqTransactions(false); + } + + Y_UNIT_TEST(WithPqTransactions) { + PqTransactions(true); + } + Y_UNIT_TEST(AlterStream) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions() diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 47209679f222..19f1f52d3187 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -819,4 +819,54 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + template + void PqTransactions(bool enable) { + T t; + t.GetTestEnvOptions() + .EnableChangefeedInitialScan(true) + .EnablePQConfigTransactionsAtSchemeShard(enable); + + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + NKikimrSchemeOp::ECdcStreamState state; + do { + runtime.SimulateSleep(TDuration::Seconds(1)); + state = DescribePrivatePath(runtime, "/MyRoot/Table/Stream") + .GetPathDescription().GetCdcStreamDescription().GetState(); + } while (state != NKikimrSchemeOp::ECdcStreamStateReady); + } + }); + } + + Y_UNIT_TEST_WITH_REBOOTS(WithoutPqTransactions) { + PqTransactions(false); + } + + Y_UNIT_TEST_WITH_REBOOTS(WithPqTransactions) { + PqTransactions(true); + } + } // TCdcStreamWithRebootsTests