diff --git a/ydb/core/backup/impl/local_partition_reader.cpp b/ydb/core/backup/impl/local_partition_reader.cpp index 34a51620768e..b7a476693ac9 100644 --- a/ydb/core/backup/impl/local_partition_reader.cpp +++ b/ydb/core/backup/impl/local_partition_reader.cpp @@ -37,7 +37,7 @@ class TLocalPartitionReader if (!LogPrefix) { LogPrefix = TStringBuilder() << "[LocalPartitionReader]" - << "[" << PQTablet << "]" + << PQTablet << "[" << Partition << "]" << SelfId() << " "; } @@ -62,14 +62,14 @@ class TLocalPartitionReader void HandleInit(TEvWorker::TEvHandshake::TPtr& ev) { Worker = ev->Sender; - LOG_D("Handshake" + LOG_D("HandleInit TEvWorker::TEvHandshake" << ": worker# " << Worker); Send(PQTablet, CreateGetOffsetRequest().Release()); } void HandleInit(TEvPersQueue::TEvResponse::TPtr& ev) { - LOG_D("Handle " << ev->Get()->ToString()); + LOG_D("HandleInit " << ev->Get()->ToString()); auto& record = ev->Get()->Record; if (record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup); diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 2c02d7a7fe6e..caae4290edab 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -1209,7 +1209,7 @@ class TKqpGatewayProxy : public IKikimrGateway { } NKikimrSchemeOp::TModifyScheme tx; - tx.SetWorkingDir(GetDatabase()); + tx.SetWorkingDir(GetDatabase() ? GetDatabase() : GetDomainName().GetOrElse(TString{})); tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateBackupCollection); auto& op = *tx.MutableCreateBackupCollection(); @@ -1279,7 +1279,7 @@ class TKqpGatewayProxy : public IKikimrGateway { } NKikimrSchemeOp::TModifyScheme tx; - tx.SetWorkingDir(GetDatabase()); + tx.SetWorkingDir(GetDatabase() ? GetDatabase() : GetDomainName().GetOrElse(TString{})); tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterBackupCollection); auto& op = *tx.MutableAlterBackupCollection(); @@ -1326,7 +1326,7 @@ class TKqpGatewayProxy : public IKikimrGateway { } NKikimrSchemeOp::TModifyScheme tx; - tx.SetWorkingDir(GetDatabase()); + tx.SetWorkingDir(GetDatabase() ? GetDatabase() : GetDomainName().GetOrElse(TString{})); if (settings.Cascade) { return MakeFuture(ResultFromError("Unimplemented")); } else { @@ -1374,7 +1374,7 @@ class TKqpGatewayProxy : public IKikimrGateway { } NKikimrSchemeOp::TModifyScheme tx; - tx.SetWorkingDir(GetDatabase()); + tx.SetWorkingDir(GetDatabase() ? GetDatabase() : GetDomainName().GetOrElse(TString{})); tx.SetOperationType(NKikimrSchemeOp::ESchemeOpBackupBackupCollection); auto& op = *tx.MutableBackupBackupCollection(); @@ -1417,7 +1417,7 @@ class TKqpGatewayProxy : public IKikimrGateway { } NKikimrSchemeOp::TModifyScheme tx; - tx.SetWorkingDir(GetDatabase()); + tx.SetWorkingDir(GetDatabase() ? GetDatabase() : GetDomainName().GetOrElse(TString{})); tx.SetOperationType(NKikimrSchemeOp::ESchemeOpBackupIncrementalBackupCollection); auto& op = *tx.MutableBackupIncrementalBackupCollection(); diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 9744a9c0ac5e..943853664e26 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -359,6 +359,77 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { )")); } -} // Cdc + Y_UNIT_TEST_TWIN(SimpleBackupBackupCollection, WithIncremental) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10) + , (2, 20) + , (3, 30) + ; + )"); + + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = ')" + TString(WithIncremental ? "true" : "false") + R"(' + ); + )", false); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + -- TODO: fix with navigate after proper scheme cache handling + SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000001Z_full/Table` + ORDER BY key + )"), + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/Table` + ORDER BY key + )")); + + + if (WithIncremental) { + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (2, 200); + )"); + + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=1;)"); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + SimulateSleep(server, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + -- TODO: fix with navigate after proper scheme cache handling + SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { null_flag_value: NULL_VALUE } }, " + "{ items { uint32_value: 2 } items { uint32_value: 200 } }"); + } + } + +} // Y_UNIT_TEST_SUITE(IncrementalBackup) } // NKikimr diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index d40abbb57100..6a3db21d6377 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -30,8 +30,8 @@ class TIncrementalRestoreScan LogPrefix = TStringBuilder() << "[TIncrementalRestoreScan]" << "[" << TxId << "]" - << "[" << SourcePathId << "]" - << "[" << TargetPathId << "]" + << SourcePathId + << TargetPathId << SelfId() /* contains brackets */ << " "; } diff --git a/ydb/core/tx/replication/service/base_table_writer.cpp b/ydb/core/tx/replication/service/base_table_writer.cpp index fdc7276019dc..159b40584842 100644 --- a/ydb/core/tx/replication/service/base_table_writer.cpp +++ b/ydb/core/tx/replication/service/base_table_writer.cpp @@ -411,6 +411,8 @@ class TLocalTableWriter CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions())); if (!Initialized) { + LOG_D("Send handshake" + << ": worker# " << Worker); Send(Worker, new TEvWorker::TEvHandshake()); Initialized = true; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp index f8ee48da85cb..eead561dc0e5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp @@ -9,7 +9,7 @@ #include -NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, bool omitFollowers, bool isBackup, bool allowUnderSameOp) { +NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, const NKikimrSchemeOp::TCopyTableConfig& descr) { using namespace NKikimr::NSchemeShard; auto scheme = TransactionTemplate(dst.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable); @@ -18,9 +18,13 @@ NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, auto operation = scheme.MutableCreateTable(); operation->SetName(dst.LeafName()); operation->SetCopyFromTable(src.PathString()); - operation->SetOmitFollowers(omitFollowers); - operation->SetIsBackup(isBackup); - operation->SetAllowUnderSameOperation(allowUnderSameOp); + operation->SetOmitFollowers(descr.GetOmitFollowers()); + operation->SetIsBackup(descr.GetIsBackup()); + operation->SetAllowUnderSameOperation(descr.GetAllowUnderSameOperation()); + if (descr.HasCreateSrcCdcStream()) { + auto* coOp = scheme.MutableCreateCdcStream(); + coOp->CopyFrom(descr.GetCreateSrcCdcStream()); + } return scheme; } @@ -144,8 +148,10 @@ bool CreateConsistentCopyTables( sequences.emplace(sequenceName); } - result.push_back(CreateCopyTable(NextPartId(nextId, result), - CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation()), sequences)); + result.push_back(CreateCopyTable( + NextPartId(nextId, result), + CopyTableTask(srcPath, dstPath, descr), + sequences)); TVector sequenceDescriptions; for (const auto& child: srcPath.Base()->GetChildren()) { @@ -190,8 +196,9 @@ bool CreateConsistentCopyTables( Y_ABORT_UNLESS(srcImplTable.Base()->PathId == srcIndexPath.Base()->GetChildren().begin()->second); TPath dstImplTable = dstIndexPath.Child(srcImplTableName); - result.push_back(CreateCopyTable(NextPartId(nextId, result), - CopyTableTask(srcImplTable, dstImplTable, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation()))); + result.push_back(CreateCopyTable( + NextPartId(nextId, result), + CopyTableTask(srcImplTable, dstImplTable, descr))); } for (auto&& sequenceDescription : sequenceDescriptions) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index 125acdfa227f..4445a15470d3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -717,6 +717,18 @@ class TCopyTable: public TSubOperation { dstPath.Base()->IncShardsInside(shardsToCreate); parent.Base()->IncAliveChildren(1, isBackup); + LOG_TRACE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopyTable Propose creating new table" + << " opId# " << OperationId + << " srcPath# " << srcPath.PathString() + << " srcPathId# " << srcPath.Base()->PathId + << " path# " << dstPath.PathString() + << " pathId# " << newTable->PathId + << " withNewCdc# " << (Transaction.HasCreateCdcStream() ? "true" : "false") + << " schemeshard# " << ssId + << " tx# " << Transaction.DebugString() + ); + SetState(NextState()); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp index cc140faf3e8f..adb9564aa885 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp @@ -730,6 +730,15 @@ class TCreateTable: public TSubOperation { dstPath.Base()->IncShardsInside(shardsToCreate); parentPath.Base()->IncAliveChildren(); + LOG_TRACE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCreateTable Propose creating new table" + << " opId# " << OperationId + << " path# " << dstPath.PathString() + << " pathId# " << newTable->PathId + << " schemeshard# " << ssId + << " tx# " << Transaction.DebugString() + ); + SetState(NextState()); return result; }