From 3352d84a8e4180cbc88881cca5e3fa73313a8d91 Mon Sep 17 00:00:00 2001 From: walter Date: Thu, 21 Nov 2024 17:44:12 +0800 Subject: [PATCH] [feat](binlog) Add replace table binlog (#44263) close #43436 Related PR: https://github.com/selectdb/ccr-syncer/pull/245 --- .../java/org/apache/doris/alter/Alter.java | 2 +- .../apache/doris/binlog/BinlogManager.java | 18 +++++++ .../org/apache/doris/binlog/DBBinlog.java | 53 ++++++++++++++----- .../org/apache/doris/persist/BarrierLog.java | 4 ++ .../org/apache/doris/persist/EditLog.java | 11 ++-- .../persist/ReplaceTableOperationLog.java | 28 +++++++++- .../persist/ReplaceTableOperationLogTest.java | 4 +- gensrc/thrift/FrontendService.thrift | 4 +- 8 files changed, 101 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 15c8df9195ddf1..ebb194ed6a6262 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -603,7 +603,7 @@ public void processReplaceTable(Database db, OlapTable origTable, String newTblN replaceTableInternal(db, origTable, olapNewTbl, swapTable, false, isForce); // write edit log ReplaceTableOperationLog log = new ReplaceTableOperationLog(db.getId(), - origTable.getId(), olapNewTbl.getId(), swapTable, isForce); + origTable.getId(), oldTblName, olapNewTbl.getId(), newTblName, swapTable, isForce); Env.getCurrentEnv().getEditLog().logReplaceTable(log); LOG.info("finish replacing table {} with table {}, is swap: {}", oldTblName, newTblName, swapTable); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 6d483a41314111..1f785713666437 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -33,6 +33,7 @@ import org.apache.doris.persist.ModifyCommentOperationLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; import org.apache.doris.persist.ReplacePartitionOperationLog; +import org.apache.doris.persist.ReplaceTableOperationLog; import org.apache.doris.persist.TableAddOrDropColumnsInfo; import org.apache.doris.persist.TableInfo; import org.apache.doris.persist.TableRenameColumnInfo; @@ -45,6 +46,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -367,6 +369,22 @@ public void addModifyViewDef(AlterViewInfo alterViewInfo, long commitSeq) { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterViewInfo); } + public void addReplaceTable(ReplaceTableOperationLog info, long commitSeq) { + if (StringUtils.isEmpty(info.getOrigTblName()) || StringUtils.isEmpty(info.getNewTblName())) { + LOG.warn("skip replace table binlog, because origTblName or newTblName is empty. info: {}", info); + return; + } + + long dbId = info.getDbId(); + List tableIds = Lists.newArrayList(); + tableIds.add(info.getOrigTblId()); + long timestamp = -1; + TBinlogType type = TBinlogType.REPLACE_TABLE; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); + } + // get binlog by dbId, return first binlog.version > version public Pair getBinlog(long dbId, long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index e2eef7966be0d4..c96e994be91c3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -22,7 +22,9 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Pair; import org.apache.doris.common.proc.BaseProcResult; +import org.apache.doris.persist.BarrierLog; import org.apache.doris.persist.DropPartitionInfo; +import org.apache.doris.persist.ReplaceTableOperationLog; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; @@ -626,19 +628,29 @@ private void recordDroppedResources(TBinlog binlog) { // A method to record the dropped tables, indexes, and partitions. private void recordDroppedResources(TBinlog binlog, Object raw) { + recordDroppedResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw); + } + + private void recordDroppedResources(TBinlogType binlogType, long commitSeq, String data, Object raw) { if (raw == null) { - switch (binlog.getType()) { + switch (binlogType) { case DROP_PARTITION: - raw = DropPartitionInfo.fromJson(binlog.data); + raw = DropPartitionInfo.fromJson(data); break; case DROP_TABLE: - raw = DropTableRecord.fromJson(binlog.data); + raw = DropTableRecord.fromJson(data); break; case ALTER_JOB: - raw = AlterJobRecord.fromJson(binlog.data); + raw = AlterJobRecord.fromJson(data); break; case TRUNCATE_TABLE: - raw = TruncateTableRecord.fromJson(binlog.data); + raw = TruncateTableRecord.fromJson(data); + break; + case REPLACE_TABLE: + raw = ReplaceTableOperationLog.fromJson(data); + break; + case BARRIER: + raw = BarrierLog.fromJson(data); break; default: break; @@ -648,29 +660,44 @@ private void recordDroppedResources(TBinlog binlog, Object raw) { } } - if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) { + recordDroppedResources(binlogType, commitSeq, raw); + } + + private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Object raw) { + if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) { long partitionId = ((DropPartitionInfo) raw).getPartitionId(); if (partitionId > 0) { - droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); + droppedPartitions.add(Pair.of(partitionId, commitSeq)); } - } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) { + } else if (binlogType == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) { long tableId = ((DropTableRecord) raw).getTableId(); if (tableId > 0) { - droppedTables.add(Pair.of(tableId, binlog.getCommitSeq())); + droppedTables.add(Pair.of(tableId, commitSeq)); } - } else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) { + } else if (binlogType == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) { AlterJobRecord alterJobRecord = (AlterJobRecord) raw; if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) { for (Long indexId : alterJobRecord.getOriginIndexIdList()) { if (indexId != null && indexId > 0) { - droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq())); + droppedIndexes.add(Pair.of(indexId, commitSeq)); } } } - } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) { + } else if (binlogType == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) { TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw; for (long partitionId : truncateTableRecord.getOldPartitionIds()) { - droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); + droppedPartitions.add(Pair.of(partitionId, commitSeq)); + } + } else if (binlogType == TBinlogType.REPLACE_TABLE && raw instanceof ReplaceTableOperationLog) { + ReplaceTableOperationLog record = (ReplaceTableOperationLog) raw; + if (!record.isSwapTable()) { + droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq)); + } + } else if (binlogType == TBinlogType.BARRIER && raw instanceof BarrierLog) { + BarrierLog log = (BarrierLog) raw; + // keep compatible with doris 2.0/2.1 + if (log.hasBinlog()) { + recordDroppedResources(log.getBinlogType(), commitSeq, log.getBinlog(), null); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java index 4a9ce13e03b3ed..86d56fb4a6487e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java @@ -109,6 +109,10 @@ public String toJson() { return GsonUtils.GSON.toJson(this); } + public static BarrierLog fromJson(String json) { + return GsonUtils.GSON.fromJson(json, BarrierLog.class); + } + @Override public String toString() { return toJson(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index df0cdb092a8b53..5ae6f62ebb20e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -306,7 +306,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { case OperationType.OP_RENAME_TABLE: { TableInfo info = (TableInfo) journal.getData(); env.replayRenameTable(info); - Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId); + env.getBinlogManager().addTableRename(info, logId); break; } case OperationType.OP_MODIFY_VIEW_DEF: { @@ -318,7 +318,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { case OperationType.OP_RENAME_PARTITION: { TableInfo info = (TableInfo) journal.getData(); env.replayRenamePartition(info); - Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId); + env.getBinlogManager().addTableRename(info, logId); break; } case OperationType.OP_RENAME_COLUMN: { @@ -366,7 +366,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { case OperationType.OP_RENAME_ROLLUP: { TableInfo info = (TableInfo) journal.getData(); env.replayRenameRollup(info); - Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId); + env.getCurrentEnv().getBinlogManager().addTableRename(info, logId); break; } case OperationType.OP_LOAD_START: @@ -898,6 +898,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { case OperationType.OP_REPLACE_TABLE: { ReplaceTableOperationLog log = (ReplaceTableOperationLog) journal.getData(); env.getAlterInstance().replayReplaceTable(log); + env.getBinlogManager().addReplaceTable(log, logId); break; } case OperationType.OP_CREATE_SQL_BLOCK_RULE: { @@ -1950,7 +1951,9 @@ public void logGlobalVariableV2(GlobalVarPersistInfo info) { } public void logReplaceTable(ReplaceTableOperationLog log) { - logEdit(OperationType.OP_REPLACE_TABLE, log); + long logId = logEdit(OperationType.OP_REPLACE_TABLE, log); + LOG.info("add replace table binlog, logId: {}, infos: {}", logId, log); + Env.getCurrentEnv().getBinlogManager().addReplaceTable(log, logId); } public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 op) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java index 7a685f3741f51d..6a2b09336e1eae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java @@ -32,17 +32,25 @@ public class ReplaceTableOperationLog implements Writable { private long dbId; @SerializedName(value = "origTblId") private long origTblId; + @SerializedName(value = "origTblName") + private String origTblName; @SerializedName(value = "newTblName") private long newTblId; + @SerializedName(value = "actualNewTblName") + private String newTblName; @SerializedName(value = "swapTable") private boolean swapTable; @SerializedName(value = "isForce") private boolean isForce = true; // older version it was force. so keep same. - public ReplaceTableOperationLog(long dbId, long origTblId, long newTblId, boolean swapTable, boolean isForce) { + public ReplaceTableOperationLog(long dbId, long origTblId, + String origTblName, long newTblId, String newTblName, + boolean swapTable, boolean isForce) { this.dbId = dbId; this.origTblId = origTblId; + this.origTblName = origTblName; this.newTblId = newTblId; + this.newTblName = newTblName; this.swapTable = swapTable; this.isForce = isForce; } @@ -55,10 +63,18 @@ public long getOrigTblId() { return origTblId; } + public String getOrigTblName() { + return origTblName; + } + public long getNewTblId() { return newTblId; } + public String getNewTblName() { + return newTblName; + } + public boolean isSwapTable() { return swapTable; } @@ -67,13 +83,21 @@ public boolean isForce() { return isForce; } + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + public static ReplaceTableOperationLog fromJson(String json) { + return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class); + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); Text.writeString(out, json); } - public static ReplaceTableOperationLog read(DataInput in) throws IOException { + public static ReplaceTableOperationLog read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java index e05d16141ced72..ed56e4c7941342 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java @@ -34,7 +34,7 @@ public void testSerialization() throws Exception { file.createNewFile(); DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); - ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, 3, true, true); + ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, "old", 3, "new", true, true); log.write(dos); dos.flush(); @@ -48,6 +48,8 @@ public void testSerialization() throws Exception { Assert.assertTrue(readLog.getNewTblId() == log.getNewTblId()); Assert.assertTrue(readLog.getOrigTblId() == log.getOrigTblId()); Assert.assertTrue(readLog.isSwapTable() == log.isSwapTable()); + Assert.assertTrue(readLog.getOrigTblName().equals(log.getOrigTblName())); + Assert.assertTrue(readLog.getNewTblName().equals(log.getNewTblName())); // 3. delete files dis.close(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index ec2a685098ce69..47b88552862254 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1191,6 +1191,7 @@ enum TBinlogType { RENAME_COLUMN = 15, MODIFY_COMMENT = 16, MODIFY_VIEW_DEF = 17, + REPLACE_TABLE = 18, // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking @@ -1207,8 +1208,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 18, - UNKNOWN_3 = 19, + MIN_UNKNOWN = 19, UNKNOWN_4 = 20, UNKNOWN_5 = 21, UNKNOWN_6 = 22,