Skip to content

Commit

Permalink
Modify the condition to skip compaction schdule after insertion compa…
Browse files Browse the repository at this point in the history
…ction task selection (#14644) (#14663)

* Modify the condition to skip compaction schdule after insertion compaction task selection

* add ut

* delay insertion compaction

* fix ut
  • Loading branch information
shuwenwei authored Jan 10, 2025
1 parent ff13ec7 commit a8d21d8
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -2658,31 +2657,32 @@ public int executeCompaction() throws InterruptedException {
if (!isCompactionSelecting.compareAndSet(false, true)) {
return 0;
}
int trySubmitCount = 0;
CompactionScheduleContext context = new CompactionScheduleContext();
try {
List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions());
// Sort the time partition from largest to smallest
timePartitions.sort(Comparator.reverseOrder());

CompactionScheduleContext context = new CompactionScheduleContext();

// schedule insert compaction
trySubmitCount += executeInsertionCompaction(timePartitions, context);
context.incrementSubmitTaskNum(CompactionTaskType.INSERTION, trySubmitCount);
int[] submitCountOfTimePartitions = executeInsertionCompaction(timePartitions, context);

// schedule the other compactions
if (trySubmitCount == 0) {
// the name of this variable is trySubmitCount, because the task submitted to the queue
// could be evicted due to the low priority of the task
for (long timePartition : timePartitions) {
CompactionScheduler.sharedLockCompactionSelection();
try {
trySubmitCount +=
CompactionScheduler.scheduleCompaction(tsFileManager, timePartition, context);
} finally {
context.clearTimePartitionDeviceInfoCache();
CompactionScheduler.sharedUnlockCompactionSelection();
}
for (int i = 0; i < timePartitions.size(); i++) {
boolean skipOtherCompactionSchedule =
submitCountOfTimePartitions[i] > 0
&& !config
.getDataRegionConsensusProtocolClass()
.equals(ConsensusFactory.IOT_CONSENSUS_V2);
if (skipOtherCompactionSchedule) {
continue;
}
long timePartition = timePartitions.get(i);
CompactionScheduler.sharedLockCompactionSelection();
try {
CompactionScheduler.scheduleCompaction(tsFileManager, timePartition, context);
} finally {
context.clearTimePartitionDeviceInfoCache();
CompactionScheduler.sharedUnlockCompactionSelection();
}
}
if (context.hasSubmitTask()) {
Expand All @@ -2695,7 +2695,7 @@ public int executeCompaction() throws InterruptedException {
} finally {
isCompactionSelecting.set(false);
}
return trySubmitCount;
return context.getSubmitCompactionTaskNum();
}

/** Schedule settle compaction for ttl check. */
Expand Down Expand Up @@ -2742,40 +2742,36 @@ public int executeTTLCheck() throws InterruptedException {
return trySubmitCount;
}

protected int executeInsertionCompaction(
protected int[] executeInsertionCompaction(
List<Long> timePartitions, CompactionScheduleContext context) throws InterruptedException {
int trySubmitCount = 0;
int[] trySubmitCountOfTimePartitions = new int[timePartitions.size()];
CompactionScheduler.sharedLockCompactionSelection();
try {
while (true) {
int currentSubmitCount = 0;
for (long timePartition : timePartitions) {
while (true) {
Phaser insertionTaskPhaser = new Phaser(1);
int selectedTaskNum =
CompactionScheduler.scheduleInsertionCompaction(
tsFileManager, timePartition, insertionTaskPhaser, context);
insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive());
currentSubmitCount += selectedTaskNum;
if (selectedTaskNum <= 0) {
break;
}
}
for (int i = 0; i < timePartitions.size(); i++) {
long timePartition = timePartitions.get(i);
int selectedTaskNum =
CompactionScheduler.scheduleInsertionCompaction(
tsFileManager, timePartition, context);
currentSubmitCount += selectedTaskNum;
trySubmitCountOfTimePartitions[i] += selectedTaskNum;
context.clearTimePartitionDeviceInfoCache();
}
if (currentSubmitCount <= 0) {
break;
}
trySubmitCount += currentSubmitCount;
context.incrementSubmitTaskNum(CompactionTaskType.INSERTION, currentSubmitCount);
}
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
logger.error("Meet error in insertion compaction schedule.", e);
} finally {
context.clearTimePartitionDeviceInfoCache();
CompactionScheduler.sharedUnlockCompactionSelection();
}
return trySubmitCount;
return trySubmitCountOfTimePartitions;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CompactionScheduleContext {
private int submitSeqInnerSpaceCompactionTaskNum = 0;
Expand All @@ -46,8 +48,19 @@ public class CompactionScheduleContext {
private final Map<TsFileResource, DeviceTimeIndex> partitionFileDeviceInfoCache;
private long cachedDeviceInfoSize = 0;

private final Set<Long> timePartitionsDelayInsertionSelection;

public CompactionScheduleContext() {
this.partitionFileDeviceInfoCache = new HashMap<>();
this.timePartitionsDelayInsertionSelection = new HashSet<>();
}

public void delayInsertionSelection(long timePartitionId) {
timePartitionsDelayInsertionSelection.add(timePartitionId);
}

public boolean isInsertionSelectionDelayed(long timePartitionId) {
return timePartitionsDelayInsertionSelection.remove(timePartitionId);
}

public void addResourceDeviceTimeIndex(
Expand Down Expand Up @@ -132,6 +145,14 @@ public int getSubmitSettleCompactionTaskNum() {
return submitSettleCompactionTaskNum;
}

public int getSubmitCompactionTaskNum() {
return submitSeqInnerSpaceCompactionTaskNum
+ submitUnseqInnerSpaceCompactionTaskNum
+ submitCrossSpaceCompactionTaskNum
+ submitInsertionCrossSpaceCompactionTaskNum
+ submitSettleCompactionTaskNum;
}

public boolean hasSubmitTask() {
return submitCrossSpaceCompactionTaskNum
+ submitInsertionCrossSpaceCompactionTaskNum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,32 @@ public static void exclusiveUnlockCompactionSelection() {
* @param context the context of compaction schedule
* @return the count of submitted task
*/
public static int scheduleCompaction(
public static void scheduleCompaction(
TsFileManager tsFileManager, long timePartition, CompactionScheduleContext context)
throws InterruptedException {
if (!tsFileManager.isAllowCompaction()) {
return 0;
return;
}
// the name of this variable is trySubmitCount, because the task submitted to the queue could be
// evicted due to the low priority of the task
int trySubmitCount = 0;
try {
trySubmitCount +=
int submitInnerTaskNum = 0;
submitInnerTaskNum +=
tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true, context);
trySubmitCount +=
submitInnerTaskNum +=
tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false, context);
trySubmitCount += tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition, context);
trySubmitCount +=
tryToSubmitSettleCompactionTask(tsFileManager, timePartition, context, false);
boolean executeDelayedInsertionSelection =
submitInnerTaskNum == 0 && context.isInsertionSelectionDelayed(timePartition);
if (executeDelayedInsertionSelection) {
scheduleInsertionCompaction(tsFileManager, timePartition, context);
}
tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition, context);
tryToSubmitSettleCompactionTask(tsFileManager, timePartition, context, false);
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
LOGGER.error("Meet error in compaction schedule.", e);
}
return trySubmitCount;
}

@TestOnly
Expand All @@ -124,22 +127,6 @@ public static void scheduleCompaction(TsFileManager tsFileManager, long timePart
scheduleCompaction(tsFileManager, timePartition, new CompactionScheduleContext());
}

public static int scheduleInsertionCompaction(
TsFileManager tsFileManager,
long timePartition,
Phaser insertionTaskPhaser,
CompactionScheduleContext context)
throws InterruptedException {
if (!tsFileManager.isAllowCompaction()) {
return 0;
}
int trySubmitCount = 0;
trySubmitCount +=
tryToSubmitInsertionCompactionTask(
tsFileManager, timePartition, insertionTaskPhaser, context);
return trySubmitCount;
}

public static int tryToSubmitInnerSpaceCompactionTask(
TsFileManager tsFileManager,
long timePartition,
Expand Down Expand Up @@ -223,13 +210,31 @@ private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task)
return true;
}

private static int tryToSubmitInsertionCompactionTask(
public static int scheduleInsertionCompaction(
TsFileManager tsFileManager, long timePartition, CompactionScheduleContext context)
throws InterruptedException {
int count = 0;
while (true) {
Phaser insertionTaskPhaser = new Phaser(1);
int selectedTaskNum =
tryToSubmitInsertionCompactionTask(
tsFileManager, timePartition, insertionTaskPhaser, context);
insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive());
if (selectedTaskNum <= 0) {
break;
}
count += selectedTaskNum;
}
return count;
}

public static int tryToSubmitInsertionCompactionTask(
TsFileManager tsFileManager,
long timePartition,
Phaser insertionTaskPhaser,
CompactionScheduleContext context)
throws InterruptedException {
if (!config.isEnableCrossSpaceCompaction()) {
if (!tsFileManager.isAllowCompaction() || !config.isEnableCrossSpaceCompaction()) {
return 0;
}
String logicalStorageGroupName = tsFileManager.getStorageGroupName();
Expand Down Expand Up @@ -349,6 +354,7 @@ public static int tryToSubmitSettleCompactionTask(
trySubmitCount++;
}
}
context.incrementSubmitTaskNum(CompactionTaskType.SETTLE, trySubmitCount);
return trySubmitCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
protected TsFileManager tsFileManager;

private static boolean hasPrintedLog = false;
private static int maxDeserializedFileNumToCheckInsertionCandidateValid = 500;
private static int maxDeserializedFileNumToCheckInsertionCandidateValid = 100;
private static int maxFileNumToSelectInsertionTaskInOnePartition = 200;

private final long memoryBudget;
private final int maxCrossCompactionFileNum;
Expand Down Expand Up @@ -166,6 +167,14 @@ public InsertionCrossCompactionTaskResource selectOneInsertionTask(
"Selecting insertion cross compaction task resources from {} seqFile, {} unseqFiles",
candidate.getSeqFiles().size(),
candidate.getUnseqFiles().size());
boolean delaySelection =
candidate.getSeqFiles().size() + candidate.getUnseqFiles().size()
> maxFileNumToSelectInsertionTaskInOnePartition;
if (delaySelection) {
context.delayInsertionSelection(timePartition);
return new InsertionCrossCompactionTaskResource();
}

InsertionCrossCompactionTaskResource result =
insertionCrossSpaceCompactionSelector.executeInsertionCrossSpaceCompactionTaskSelection();
if (result.isValid()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void testInsertionCompactionWithCachedDeviceInfoAndUnclosedResource()

Phaser phaser = new Phaser(1);
int submitTaskNum =
CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0, phaser, context);
CompactionScheduler.tryToSubmitInsertionCompactionTask(tsFileManager, 0, phaser, context);
Assert.assertEquals(1, submitTaskNum);
// perform insertion compaction
phaser.awaitAdvanceInterruptibly(phaser.arrive());
Expand All @@ -170,7 +170,7 @@ public void testInsertionCompactionWithCachedDeviceInfoAndUnclosedResource()
// unseq resource2 d2[10, 20]

submitTaskNum =
CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0, phaser, context);
CompactionScheduler.tryToSubmitInsertionCompactionTask(tsFileManager, 0, phaser, context);
Assert.assertEquals(0, submitTaskNum);
Assert.assertTrue(
TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(tsFileManager.getTsFileList(true)));
Expand Down
Loading

0 comments on commit a8d21d8

Please sign in to comment.