From a8d21d8223dda554c11c78d51dd6234e62dbcc12 Mon Sep 17 00:00:00 2001 From: shuwenwei <55970239+shuwenwei@users.noreply.github.com> Date: Fri, 10 Jan 2025 15:34:08 +0800 Subject: [PATCH] Modify the condition to skip compaction schdule after insertion compaction task selection (#14644) (#14663) * Modify the condition to skip compaction schdule after insertion compaction task selection * add ut * delay insertion compaction * fix ut --- .../storageengine/dataregion/DataRegion.java | 66 +++++++++---------- .../schedule/CompactionScheduleContext.java | 21 ++++++ .../schedule/CompactionScheduler.java | 60 +++++++++-------- .../RewriteCrossSpaceCompactionSelector.java | 11 +++- ...rtionCrossSpaceCompactionSelectorTest.java | 4 +- .../InsertionCrossSpaceCompactionTest.java | 64 ++++++++++++++++-- 6 files changed, 157 insertions(+), 69 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 54580bc177c3..a86aea4d7f5a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -158,7 +158,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -2658,31 +2657,32 @@ public int executeCompaction() throws InterruptedException { if (!isCompactionSelecting.compareAndSet(false, true)) { return 0; } - int trySubmitCount = 0; + CompactionScheduleContext context = new CompactionScheduleContext(); try { List timePartitions = new ArrayList<>(tsFileManager.getTimePartitions()); // Sort the time partition from largest to smallest timePartitions.sort(Comparator.reverseOrder()); - CompactionScheduleContext context = new CompactionScheduleContext(); - // schedule insert compaction - trySubmitCount += executeInsertionCompaction(timePartitions, context); - context.incrementSubmitTaskNum(CompactionTaskType.INSERTION, trySubmitCount); + int[] submitCountOfTimePartitions = executeInsertionCompaction(timePartitions, context); // schedule the other compactions - if (trySubmitCount == 0) { - // the name of this variable is trySubmitCount, because the task submitted to the queue - // could be evicted due to the low priority of the task - for (long timePartition : timePartitions) { - CompactionScheduler.sharedLockCompactionSelection(); - try { - trySubmitCount += - CompactionScheduler.scheduleCompaction(tsFileManager, timePartition, context); - } finally { - context.clearTimePartitionDeviceInfoCache(); - CompactionScheduler.sharedUnlockCompactionSelection(); - } + for (int i = 0; i < timePartitions.size(); i++) { + boolean skipOtherCompactionSchedule = + submitCountOfTimePartitions[i] > 0 + && !config + .getDataRegionConsensusProtocolClass() + .equals(ConsensusFactory.IOT_CONSENSUS_V2); + if (skipOtherCompactionSchedule) { + continue; + } + long timePartition = timePartitions.get(i); + CompactionScheduler.sharedLockCompactionSelection(); + try { + CompactionScheduler.scheduleCompaction(tsFileManager, timePartition, context); + } finally { + context.clearTimePartitionDeviceInfoCache(); + CompactionScheduler.sharedUnlockCompactionSelection(); } } if (context.hasSubmitTask()) { @@ -2695,7 +2695,7 @@ public int executeCompaction() throws InterruptedException { } finally { isCompactionSelecting.set(false); } - return trySubmitCount; + return context.getSubmitCompactionTaskNum(); } /** Schedule settle compaction for ttl check. */ @@ -2742,40 +2742,36 @@ public int executeTTLCheck() throws InterruptedException { return trySubmitCount; } - protected int executeInsertionCompaction( + protected int[] executeInsertionCompaction( List timePartitions, CompactionScheduleContext context) throws InterruptedException { - int trySubmitCount = 0; + int[] trySubmitCountOfTimePartitions = new int[timePartitions.size()]; CompactionScheduler.sharedLockCompactionSelection(); try { while (true) { int currentSubmitCount = 0; - for (long timePartition : timePartitions) { - while (true) { - Phaser insertionTaskPhaser = new Phaser(1); - int selectedTaskNum = - CompactionScheduler.scheduleInsertionCompaction( - tsFileManager, timePartition, insertionTaskPhaser, context); - insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive()); - currentSubmitCount += selectedTaskNum; - if (selectedTaskNum <= 0) { - break; - } - } + for (int i = 0; i < timePartitions.size(); i++) { + long timePartition = timePartitions.get(i); + int selectedTaskNum = + CompactionScheduler.scheduleInsertionCompaction( + tsFileManager, timePartition, context); + currentSubmitCount += selectedTaskNum; + trySubmitCountOfTimePartitions[i] += selectedTaskNum; context.clearTimePartitionDeviceInfoCache(); } if (currentSubmitCount <= 0) { break; } - trySubmitCount += currentSubmitCount; + context.incrementSubmitTaskNum(CompactionTaskType.INSERTION, currentSubmitCount); } } catch (InterruptedException e) { throw e; } catch (Throwable e) { logger.error("Meet error in insertion compaction schedule.", e); } finally { + context.clearTimePartitionDeviceInfoCache(); CompactionScheduler.sharedUnlockCompactionSelection(); } - return trySubmitCount; + return trySubmitCountOfTimePartitions; } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java index 42c7e4ab3a36..d1b9f1345278 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java @@ -27,7 +27,9 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; public class CompactionScheduleContext { private int submitSeqInnerSpaceCompactionTaskNum = 0; @@ -46,8 +48,19 @@ public class CompactionScheduleContext { private final Map partitionFileDeviceInfoCache; private long cachedDeviceInfoSize = 0; + private final Set timePartitionsDelayInsertionSelection; + public CompactionScheduleContext() { this.partitionFileDeviceInfoCache = new HashMap<>(); + this.timePartitionsDelayInsertionSelection = new HashSet<>(); + } + + public void delayInsertionSelection(long timePartitionId) { + timePartitionsDelayInsertionSelection.add(timePartitionId); + } + + public boolean isInsertionSelectionDelayed(long timePartitionId) { + return timePartitionsDelayInsertionSelection.remove(timePartitionId); } public void addResourceDeviceTimeIndex( @@ -132,6 +145,14 @@ public int getSubmitSettleCompactionTaskNum() { return submitSettleCompactionTaskNum; } + public int getSubmitCompactionTaskNum() { + return submitSeqInnerSpaceCompactionTaskNum + + submitUnseqInnerSpaceCompactionTaskNum + + submitCrossSpaceCompactionTaskNum + + submitInsertionCrossSpaceCompactionTaskNum + + submitSettleCompactionTaskNum; + } + public boolean hasSubmitTask() { return submitCrossSpaceCompactionTaskNum + submitInsertionCrossSpaceCompactionTaskNum diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java index 100f1cb4bc18..44678a6356a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java @@ -93,29 +93,32 @@ public static void exclusiveUnlockCompactionSelection() { * @param context the context of compaction schedule * @return the count of submitted task */ - public static int scheduleCompaction( + public static void scheduleCompaction( TsFileManager tsFileManager, long timePartition, CompactionScheduleContext context) throws InterruptedException { if (!tsFileManager.isAllowCompaction()) { - return 0; + return; } // the name of this variable is trySubmitCount, because the task submitted to the queue could be // evicted due to the low priority of the task - int trySubmitCount = 0; try { - trySubmitCount += + int submitInnerTaskNum = 0; + submitInnerTaskNum += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true, context); - trySubmitCount += + submitInnerTaskNum += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false, context); - trySubmitCount += tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition, context); - trySubmitCount += - tryToSubmitSettleCompactionTask(tsFileManager, timePartition, context, false); + boolean executeDelayedInsertionSelection = + submitInnerTaskNum == 0 && context.isInsertionSelectionDelayed(timePartition); + if (executeDelayedInsertionSelection) { + scheduleInsertionCompaction(tsFileManager, timePartition, context); + } + tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition, context); + tryToSubmitSettleCompactionTask(tsFileManager, timePartition, context, false); } catch (InterruptedException e) { throw e; } catch (Throwable e) { LOGGER.error("Meet error in compaction schedule.", e); } - return trySubmitCount; } @TestOnly @@ -124,22 +127,6 @@ public static void scheduleCompaction(TsFileManager tsFileManager, long timePart scheduleCompaction(tsFileManager, timePartition, new CompactionScheduleContext()); } - public static int scheduleInsertionCompaction( - TsFileManager tsFileManager, - long timePartition, - Phaser insertionTaskPhaser, - CompactionScheduleContext context) - throws InterruptedException { - if (!tsFileManager.isAllowCompaction()) { - return 0; - } - int trySubmitCount = 0; - trySubmitCount += - tryToSubmitInsertionCompactionTask( - tsFileManager, timePartition, insertionTaskPhaser, context); - return trySubmitCount; - } - public static int tryToSubmitInnerSpaceCompactionTask( TsFileManager tsFileManager, long timePartition, @@ -223,13 +210,31 @@ private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task) return true; } - private static int tryToSubmitInsertionCompactionTask( + public static int scheduleInsertionCompaction( + TsFileManager tsFileManager, long timePartition, CompactionScheduleContext context) + throws InterruptedException { + int count = 0; + while (true) { + Phaser insertionTaskPhaser = new Phaser(1); + int selectedTaskNum = + tryToSubmitInsertionCompactionTask( + tsFileManager, timePartition, insertionTaskPhaser, context); + insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive()); + if (selectedTaskNum <= 0) { + break; + } + count += selectedTaskNum; + } + return count; + } + + public static int tryToSubmitInsertionCompactionTask( TsFileManager tsFileManager, long timePartition, Phaser insertionTaskPhaser, CompactionScheduleContext context) throws InterruptedException { - if (!config.isEnableCrossSpaceCompaction()) { + if (!tsFileManager.isAllowCompaction() || !config.isEnableCrossSpaceCompaction()) { return 0; } String logicalStorageGroupName = tsFileManager.getStorageGroupName(); @@ -349,6 +354,7 @@ public static int tryToSubmitSettleCompactionTask( trySubmitCount++; } } + context.incrementSubmitTaskNum(CompactionTaskType.SETTLE, trySubmitCount); return trySubmitCount; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java index ef1fe4e02083..804d441d2f25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java @@ -69,7 +69,8 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector protected TsFileManager tsFileManager; private static boolean hasPrintedLog = false; - private static int maxDeserializedFileNumToCheckInsertionCandidateValid = 500; + private static int maxDeserializedFileNumToCheckInsertionCandidateValid = 100; + private static int maxFileNumToSelectInsertionTaskInOnePartition = 200; private final long memoryBudget; private final int maxCrossCompactionFileNum; @@ -166,6 +167,14 @@ public InsertionCrossCompactionTaskResource selectOneInsertionTask( "Selecting insertion cross compaction task resources from {} seqFile, {} unseqFiles", candidate.getSeqFiles().size(), candidate.getUnseqFiles().size()); + boolean delaySelection = + candidate.getSeqFiles().size() + candidate.getUnseqFiles().size() + > maxFileNumToSelectInsertionTaskInOnePartition; + if (delaySelection) { + context.delayInsertionSelection(timePartition); + return new InsertionCrossCompactionTaskResource(); + } + InsertionCrossCompactionTaskResource result = insertionCrossSpaceCompactionSelector.executeInsertionCrossSpaceCompactionTaskSelection(); if (result.isValid()) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java index 23e37044fafe..2fae1a0105fc 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java @@ -147,7 +147,7 @@ public void testInsertionCompactionWithCachedDeviceInfoAndUnclosedResource() Phaser phaser = new Phaser(1); int submitTaskNum = - CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0, phaser, context); + CompactionScheduler.tryToSubmitInsertionCompactionTask(tsFileManager, 0, phaser, context); Assert.assertEquals(1, submitTaskNum); // perform insertion compaction phaser.awaitAdvanceInterruptibly(phaser.arrive()); @@ -170,7 +170,7 @@ public void testInsertionCompactionWithCachedDeviceInfoAndUnclosedResource() // unseq resource2 d2[10, 20] submitTaskNum = - CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0, phaser, context); + CompactionScheduler.tryToSubmitInsertionCompactionTask(tsFileManager, 0, phaser, context); Assert.assertEquals(0, submitTaskNum); Assert.assertTrue( TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(tsFileManager.getTsFileList(true))); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java index ddcb90f3ac8a..0e0516a46c06 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.cross; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.service.metrics.FileMetrics; @@ -452,7 +453,7 @@ public void testInsertionCompactionScheduleWithEmptySeqSpace2() } @Test - public void testInsertionCompactionScheduleWithMultiTimePartitions() + public void testInsertionCompactionScheduleWithMultiTimePartitions1() throws IOException, InterruptedException { TsFileResource unseqResource1 = generateSingleNonAlignedSeriesFileWithDevices( @@ -504,6 +505,60 @@ public void testInsertionCompactionScheduleWithMultiTimePartitions() TsFileResourceManager.getInstance().getPriorityQueueSize()); } + @Test + public void testInsertionCompactionScheduleWithMultiTimePartitions2() + throws IOException, InterruptedException { + IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + int innerCompactionCandidateFileNum = config.getInnerCompactionCandidateFileNum(); + config.setInnerCompactionCandidateFileNum(2); + try { + TsFileResource unseqResource1 = + generateSingleNonAlignedSeriesFileWithDevices( + "2-2-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new TimeRange(1, 4)}, false); + unseqResource1.setStatusForTest(TsFileResourceStatus.NORMAL); + + TsFileResource unseqResource2 = + generateSingleNonAlignedSeriesFileWithDevices( + "3-3-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new TimeRange(6, 9)}, false); + unseqResource2.setStatusForTest(TsFileResourceStatus.NORMAL); + createTimePartitionDirIfNotExist(2808L); + TsFileResource unseqResource3 = + generateSingleNonAlignedSeriesFileWithDevicesWithTimePartition( + "4-4-0-0.tsfile", + new String[] {"d1"}, + new TimeRange[] {new TimeRange(1698301490305L, 1698301490405L)}, + 2808L, + true); + TsFileResource unseqResource4 = + generateSingleNonAlignedSeriesFileWithDevicesWithTimePartition( + "5-5-0-0.tsfile", + new String[] {"d1"}, + new TimeRange[] {new TimeRange(1698301490306L, 1698301490406L)}, + 2808L, + true); + unseqResource3.setStatusForTest(TsFileResourceStatus.NORMAL); + unseqResources.add(unseqResource1); + unseqResources.add(unseqResource2); + seqResources.add(unseqResource3); + seqResources.add(unseqResource4); + + DataRegionForCompactionTest dataRegion = createDataRegion(); + TsFileManager tsFileManager = dataRegion.getTsFileManager(); + TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource1); + TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource2); + TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource3); + TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource4); + tsFileManager.getOrCreateUnsequenceListByTimePartition(0).keepOrderInsert(unseqResource1); + tsFileManager.getOrCreateUnsequenceListByTimePartition(0).keepOrderInsert(unseqResource2); + tsFileManager.getOrCreateSequenceListByTimePartition(2808).keepOrderInsert(unseqResource3); + tsFileManager.getOrCreateSequenceListByTimePartition(2808).keepOrderInsert(unseqResource4); + // 2 insertion task + 1 inner task + Assert.assertEquals(3, dataRegion.executeCompaction()); + } finally { + config.setInnerCompactionCandidateFileNum(innerCompactionCandidateFileNum); + } + } + @Test public void testInsertionCompactionUpdateFileMetrics() throws IOException { TsFileResource unseqResource1 = @@ -606,9 +661,10 @@ public DataRegionForCompactionTest(String databaseName, String id) { } public int executeInsertionCompaction() throws InterruptedException { - return super.executeInsertionCompaction( - new ArrayList<>(this.getTsFileManager().getTimePartitions()), - new CompactionScheduleContext()); + CompactionScheduleContext context = new CompactionScheduleContext(); + super.executeInsertionCompaction( + new ArrayList<>(this.getTsFileManager().getTimePartitions()), context); + return context.getSubmitCompactionTaskNum(); } } }