diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
index 2036c16a9..cc3686db9 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
@@ -128,8 +128,9 @@ public void shutdown() {
         }
         streamObjectCompactionScheduler.shutdown();
         try {
-            if (streamObjectCompactionScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+            if (!streamObjectCompactionScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
                 LOGGER.warn("await streamObjectCompactionExecutor timeout 10s");
+                streamObjectCompactionScheduler.shutdownNow();
             }
         } catch (InterruptedException e) {
             streamObjectCompactionScheduler.shutdownNow();
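The original condition was inverted: awaitTermination returns false on timeout, so the old code logged the warning only after a clean termination and never escalated. The fix follows the two-phase shutdown idiom from the ExecutorService Javadoc; a minimal self-contained sketch (class and pool names are hypothetical, not part of this patch):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TwoPhaseShutdownSketch {
    static void shutdownGracefully(ExecutorService pool) {
        pool.shutdown(); // phase 1: stop accepting new tasks, let queued work drain
        try {
            // awaitTermination returns false on timeout, hence the negation
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // phase 2: interrupt tasks that are still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> System.out.println("draining queued task"));
        shutdownGracefully(pool);
    }
}

The later hunks in this patch apply the same pattern to the compaction schedulers and the uploader pools.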
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java
index 80dc72683..d5994b6c4 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java
@@ -44,12 +44,14 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -63,7 +65,7 @@ public class CompactionManager {
     private final StreamManager streamManager;
     private final S3Operator s3Operator;
     private final CompactionAnalyzer compactionAnalyzer;
-    private final ScheduledExecutorService compactScheduledExecutor;
+    private final ScheduledExecutorService compactionScheduledExecutor;
     private final ScheduledExecutorService bucketCallbackScheduledExecutor;
     private final ExecutorService compactThreadPool;
     private final ExecutorService forceSplitThreadPool;
@@ -77,6 +79,9 @@ public class CompactionManager {
     private final long networkBandwidth;
     private final boolean s3ObjectLogEnable;
     private final long compactionCacheSize;
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    private volatile CompletableFuture<Void> forceSplitCf = null;
+    private volatile CompletableFuture<Void> compactionCf = null;
     private Bucket compactionBucket = null;
 
     public CompactionManager(Config config, ObjectManager objectManager, StreamManager streamManager,
@@ -100,12 +105,13 @@ public CompactionManager(Config config, ObjectManager objectManager, StreamManag
         maxStreamObjectNumPerCommit = config.maxStreamObjectNumPerCommit();
         this.compactionAnalyzer = new CompactionAnalyzer(compactionCacheSize, streamSplitSize, maxStreamNumPerStreamSetObject,
             maxStreamObjectNumPerCommit, new LogContext(String.format("[CompactionAnalyzer id=%d] ", config.nodeId())));
-        this.compactScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
-            ThreadUtils.createThreadFactory("schedule-compact-executor-%d", true), logger);
+        this.compactionScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
+            ThreadUtils.createThreadFactory("schedule-compact-executor-%d", true), logger, true, false);
         this.bucketCallbackScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
-            ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", true), logger);
+            ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", true), logger, true, false);
         this.compactThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("object-compaction-manager"));
         this.forceSplitThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("force-split-executor"));
+        this.running.set(true);
         this.logger.info("Compaction manager initialized with config: compactionInterval: {} min, compactionCacheSize: {} bytes, " +
             "streamSplitSize: {} bytes, forceSplitObjectPeriod: {} min, maxObjectNumToCompact: {}, maxStreamNumInStreamSet: {}, maxStreamObjectNum: {}",
             compactionInterval, compactionCacheSize, streamSplitSize, forceSplitObjectPeriod, maxObjectNumToCompact, maxStreamNumPerStreamSetObject, maxStreamObjectNumPerCommit);
@@ -115,9 +121,13 @@ public void start() {
         scheduleNextCompaction((long) this.compactionInterval * 60 * 1000);
     }
 
-    private void scheduleNextCompaction(long delayMillis) {
+    void scheduleNextCompaction(long delayMillis) {
+        if (!running.get()) {
+            logger.info("Compaction manager is shutdown, skip scheduling next compaction");
+            return;
+        }
         logger.info("Next Compaction started in {} ms", delayMillis);
-        this.compactScheduledExecutor.schedule(() -> {
+        this.compactionScheduledExecutor.schedule(() -> {
             TimerUtil timerUtil = new TimerUtil();
             try {
                 logger.info("Compaction started");
@@ -136,9 +146,37 @@ private void scheduleNextCompaction(long delayMillis) {
     }
 
     public void shutdown() {
-        this.compactScheduledExecutor.shutdown();
+        if (!running.compareAndSet(true, false)) {
+            logger.warn("Compaction manager is already shutdown");
+            return;
+        }
+        logger.info("Shutting down compaction manager");
+        synchronized (this) {
+            if (forceSplitCf != null) {
+                // prevent block-waiting for force splitting objects
+                forceSplitCf.cancel(true);
+            }
+            if (compactionCf != null) {
+                // prevent block-waiting for uploading compacted objects
+                compactionCf.cancel(true);
+            }
+        }
+        this.compactionScheduledExecutor.shutdown();
+        try {
+            if (!this.compactionScheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+                this.compactionScheduledExecutor.shutdownNow();
+            }
+        } catch (InterruptedException ignored) {
+        }
         this.bucketCallbackScheduledExecutor.shutdown();
+        try {
+            if (!this.bucketCallbackScheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+                this.bucketCallbackScheduledExecutor.shutdownNow();
+            }
+        } catch (InterruptedException ignored) {
+        }
         this.uploader.shutdown();
+        logger.info("Compaction manager shutdown complete");
     }
 
     public CompletableFuture<Void> compact() {
@@ -152,6 +190,10 @@ private void compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectMetadataList) throws CompletionException {
+        if (!running.get()) {
+            logger.info("Compaction manager is shutdown, skip compaction");
+            return;
+        }
         logger.info("Get {} stream set objects from metadata", objectMetadataList.size());
         if (objectMetadataList.isEmpty()) {
             return;
         }
@@ -188,6 +230,10 @@ void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectMetadataList) {
+        if (!running.get()) {
+            logger.info("Compaction manager is shutdown, skip force splitting objects");
+            return;
+        }
@@ -234,6 +284,10 @@ private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact) throws CompletionException {
+        if (!running.get()) {
+            logger.info("Compaction manager is shutdown, skip compacting objects");
+            return;
+        }
         if (objectsToCompact.isEmpty()) {
             return;
         }
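The lifecycle machinery added above reduces to a small protocol: a CAS on `running` makes `shutdown()` idempotent, and the in-flight futures are cancelled under the monitor so any blocked worker wakes up. A condensed sketch of just that protocol (class, field, and method names are illustrative, not the real CompactionManager API):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class LifecycleSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private volatile CompletableFuture<Void> inflight = null;

    void shutdown() {
        if (!running.compareAndSet(true, false)) {
            return; // second and later callers see the flag already flipped
        }
        synchronized (this) {
            if (inflight != null) {
                // completes the future with CancellationException; the boolean
                // argument is advisory for CompletableFuture (no interrupt happens)
                inflight.cancel(true);
            }
        }
    }

    void doWork() {
        if (!running.get()) {
            return; // work scheduled before shutdown turns into a no-op
        }
        // ... perform one unit of work, publishing its future to `inflight` ...
    }
}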
@@ ... @@ private void logCompactionPlans(List<CompactionPlan> compactionPlans, Set<Long> excludedObjectIds)
     public CompletableFuture<Void> forceSplitAll() {
         CompletableFuture<Void> cf = new CompletableFuture<>();
         //TODO: deal with metadata delay
-        this.compactScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> {
+        this.compactionScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> {
             List<Long> streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream())
                 .map(StreamOffsetRange::streamId).distinct().collect(Collectors.toList());
             this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> {
@@ -408,8 +462,7 @@ Collection<List<StreamDataBlock>> groupAndSplitStreamDataBlocks(S3Obje
     }
 
     CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetadataList,
-        S3ObjectMetadata objectToSplit)
-        throws CompletionException {
+        S3ObjectMetadata objectToSplit) throws CompletionException {
         List<CompletableFuture<StreamObject>> cfs = new ArrayList<>();
         boolean status = splitStreamSetObject(streamMetadataList, objectToSplit, cfs);
         if (!status) {
@@ -421,6 +474,20 @@ CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetada
         request.setObjectId(-1L);
 
         // wait for all force split objects to complete
+        synchronized (this) {
+            if (!running.get()) {
+                logger.info("Compaction manager is shutdown, skip waiting for force splitting objects");
+                return null;
+            }
+            forceSplitCf = CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0]));
+        }
+        try {
+            forceSplitCf.join();
+        } catch (CancellationException exception) {
+            logger.info("Force split objects cancelled");
+            return null;
+        }
+        forceSplitCf = null;
         cfs.stream().map(e -> {
             try {
                 return e.join();
@@ -463,6 +530,12 @@ CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMeta
         logCompactionPlans(compactionPlans, excludedObjectIds);
         objectsToCompact = objectsToCompact.stream().filter(e -> !excludedObjectIds.contains(e.objectId())).collect(Collectors.toList());
         executeCompactionPlans(request, compactionPlans, objectsToCompact);
+
+        if (!running.get()) {
+            logger.info("Compaction manager is shutdown, skip constructing compaction request");
+            return null;
+        }
+
         compactionPlans.forEach(c -> c.streamDataBlocksMap().values().forEach(v -> v.forEach(b -> compactedObjectIds.add(b.getObjectId()))));
 
         // compact out-dated objects directly
@@ -576,6 +649,10 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<CompactionPlan> compactionPlans,
             .collect(Collectors.toMap(S3ObjectMetadata::objectId, e -> e));
         List<StreamDataBlock> sortedStreamDataBlocks = new ArrayList<>();
         for (int i = 0; i < compactionPlans.size(); i++) {
+            if (!running.get()) {
+                logger.info("Compaction manager is shutdown, abort compaction progress");
+                return;
+            }
             // iterate over each compaction plan
             CompactionPlan compactionPlan = compactionPlans.get(i);
             long totalSize = compactionPlan.streamDataBlocksMap().values().stream().flatMap(List::stream)
@@ -602,18 +679,31 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<CompactionPlan> compactionPlans,
             List<CompletableFuture<?>> cfList = new ArrayList<>();
             cfList.add(streamSetObjectChainWriteCf);
             cfList.addAll(streamObjectCfList);
-            // wait for all stream objects and stream set object part to be uploaded
-            CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
-                .thenAccept(v -> uploader.forceUploadStreamSetObject())
-                .exceptionally(ex -> {
-                    logger.error("Error while uploading compaction objects", ex);
-                    uploader.release().thenAccept(v -> {
-                        for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
-                            compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
-                        }
-                    }).join();
-                    throw new IllegalStateException("Error while uploading compaction objects", ex);
-                }).join();
+            synchronized (this) {
+                if (!running.get()) {
+                    logger.info("Compaction manager is shutdown, skip waiting for uploading objects");
+                    return;
+                }
+                // wait for all stream objects and stream set object part to be uploaded
+                compactionCf = CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
+                    .thenAccept(v -> uploader.forceUploadStreamSetObject())
+                    .exceptionally(ex -> {
+                        uploader.release().thenAccept(v -> {
+                            for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
+                                compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
+                            }
+                        }).join();
+                        throw new IllegalStateException("Error while uploading compaction objects", ex);
+                    });
+            }
+            try {
+                compactionCf.join();
+            } catch (CancellationException ex) {
+                logger.warn("Compaction progress {}/{} is cancelled", i + 1, compactionPlans.size());
+                return;
+            }
+            compactionCf = null;
+
             streamObjectCfList.stream().map(CompletableFuture::join).forEach(request::addStreamObject);
         }
         List<ObjectStreamRange> objectStreamRanges = CompactionUtils.buildObjectStreamRangeFromGroup(
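Both buildSplitRequest() and executeCompactionPlans() above use the same publish-then-join protocol: the composite future is published under the monitor that shutdown() takes before cancelling, so a concurrent shutdown either sees running == false or finds the future to cancel. A minimal sketch of that protocol in isolation (names hypothetical):

import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class WaiterSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private volatile CompletableFuture<Void> pendingCf = null;

    boolean awaitAll(List<CompletableFuture<?>> cfs) {
        synchronized (this) {
            if (!running.get()) {
                return false; // shutdown already started: never begin a blocking wait
            }
            // publish under the same monitor shutdown() locks before cancelling
            pendingCf = CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0]));
        }
        try {
            pendingCf.join(); // potentially long wait while uploads finish
        } catch (CancellationException e) {
            return false; // shutdown() cancelled the composite and unblocked us
        }
        pendingCf = null;
        return true;
    }
}

Note that cancelling the allOf composite does not cancel the underlying upload futures; it only releases the thread blocked in join(), which is exactly the unblocking behavior the shutdown path needs here.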
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java
index c702dfe47..dfa76c147 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java
@@ -53,7 +53,20 @@ public CompactionUploader(ObjectManager objectManager, S3Operator s3Operator, Co
     public void shutdown() {
         this.isShutdown = true;
         this.streamSetObjectUploadPool.shutdown();
+        try {
+            if (!this.streamSetObjectUploadPool.awaitTermination(10, TimeUnit.SECONDS)) {
+                this.streamSetObjectUploadPool.shutdownNow();
+            }
+        } catch (InterruptedException ignored) {
+        }
+        this.streamObjectUploadPool.shutdown();
+        try {
+            if (!this.streamObjectUploadPool.awaitTermination(10, TimeUnit.SECONDS)) {
+                this.streamObjectUploadPool.shutdownNow();
+            }
+        } catch (InterruptedException ignored) {
+        }
     }
 
     public CompletableFuture<Void> chainWriteStreamSetObject(CompletableFuture<Void> prev,
@@ -108,10 +121,7 @@ public CompletableFuture<StreamObject> writeStreamObject(CompactedObject compact
             return streamObject;
         }).whenComplete((ret, ex) -> {
             if (ex != null) {
-                if (isShutdown) {
-                    // TODO: remove this when we're able to abort object uploading gracefully
-                    LOGGER.warn("write to stream object {} failed", objectId, ex);
-                } else {
+                if (!isShutdown) {
                     LOGGER.error("write to stream object {} failed", objectId, ex);
                 }
                 dataBlockWriter.release();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
index a972bcb5d..eb383ed27 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
@@ -182,8 +182,11 @@ private void readContinuousBlocks0(List<StreamDataBlock> streamDataBlocks) {
     }
 
     private CompletableFuture<ByteBuf> rangeRead(long start, long end) {
-        return rangeRead0(start, end).whenComplete((ret, ex) ->
-            CompactionStats.getInstance().compactionReadSizeStats.add(MetricsLevel.INFO, ret.readableBytes()));
+        return rangeRead0(start, end).whenComplete((ret, ex) -> {
+            if (ex == null) {
+                CompactionStats.getInstance().compactionReadSizeStats.add(MetricsLevel.INFO, ret.readableBytes());
+            }
+        });
     }
 
     private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
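The DataBlockReader change guards against a subtle part of the whenComplete contract: on failure the callback receives (null, ex), so the old one-liner dereferenced ret and threw a NullPointerException inside the metrics callback whenever a range read failed. A standalone illustration (names hypothetical):

import java.util.concurrent.CompletableFuture;

class WhenCompleteSketch {
    static CompletableFuture<byte[]> instrumented(CompletableFuture<byte[]> upstream) {
        return upstream.whenComplete((ret, ex) -> {
            if (ex == null) {
                // safe: ret is only non-null on the success path
                System.out.println("read " + ret.length + " bytes");
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<byte[]> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("range read failed"));
        // without the ex == null guard the callback would dereference null here
        instrumented(failed);
    }
}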
diff --git a/s3stream/src/main/java/com/automq/stream/utils/Threads.java b/s3stream/src/main/java/com/automq/stream/utils/Threads.java
index d444675e8..25df67a9d 100644
--- a/s3stream/src/main/java/com/automq/stream/utils/Threads.java
+++ b/s3stream/src/main/java/com/automq/stream/utils/Threads.java
@@ -48,16 +48,21 @@ public static ExecutorService newFixedThreadPoolWithMonitor(int nThreads, String
 
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name, boolean daemon,
         Logger logger) {
-        return newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory(name, true), logger, false);
+        return newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory(name, true), logger, false, true);
     }
 
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory,
         Logger logger) {
-        return newSingleThreadScheduledExecutor(threadFactory, logger, false);
+        return newSingleThreadScheduledExecutor(threadFactory, logger, false, true);
     }
 
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory,
         Logger logger, boolean removeOnCancelPolicy) {
+        return newSingleThreadScheduledExecutor(threadFactory, logger, removeOnCancelPolicy, true);
+    }
+
+    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory,
+        Logger logger, boolean removeOnCancelPolicy, boolean executeExistingDelayedTasksAfterShutdownPolicy) {
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory) {
             @Override
             public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
@@ -80,6 +85,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD
             }
         };
         executor.setRemoveOnCancelPolicy(removeOnCancelPolicy);
+        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(executeExistingDelayedTasksAfterShutdownPolicy);
         return executor;
     }
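The new overload exists so callers can opt out of ScheduledThreadPoolExecutor's default behavior of still running already-scheduled delayed tasks after shutdown(). The compaction schedulers now pass false, presumably so a compaction scheduled minutes ahead no longer delays termination. A runnable JDK-only demonstration:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DelayedTaskPolicyDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executor.schedule(() -> System.out.println("never runs"), 1, TimeUnit.HOURS);
        executor.shutdown(); // the still-delayed task is discarded immediately
        // With the default policy (true), the worker would stay alive until the
        // delayed task fired, so awaitTermination would time out here instead.
        System.out.println("terminated: " + executor.awaitTermination(1, TimeUnit.SECONDS));
    }
}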
diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java
index df995e40a..26f76c0a4 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java
@@ -18,6 +18,7 @@
 import com.automq.stream.s3.TestUtils;
 import com.automq.stream.s3.compact.operator.DataBlockReader;
 import com.automq.stream.s3.compact.utils.CompactionUtils;
+import com.automq.stream.s3.memory.MemoryMetadataManager;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import com.automq.stream.s3.metadata.S3ObjectType;
 import com.automq.stream.s3.metadata.S3StreamConstant;
@@ -25,10 +26,13 @@
 import com.automq.stream.s3.metadata.StreamOffsetRange;
 import com.automq.stream.s3.metadata.StreamState;
 import com.automq.stream.s3.model.StreamRecordBatch;
+import com.automq.stream.s3.network.ThrottleStrategy;
 import com.automq.stream.s3.objects.CommitStreamSetObjectRequest;
 import com.automq.stream.s3.objects.ObjectStreamRange;
 import com.automq.stream.s3.objects.StreamObject;
 import com.automq.stream.s3.operator.DefaultS3Operator;
+import com.automq.stream.s3.operator.MemoryS3Operator;
+import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,6 +44,7 @@
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -47,6 +52,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
@@ -54,6 +60,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -481,6 +488,73 @@ public void testCompactWithLimit() {
         Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadata, request));
     }
 
+    @Test
+    public void testCompactionShutdown() throws Throwable {
+        streamManager = Mockito.mock(MemoryMetadataManager.class);
+        when(streamManager.getStreams(Mockito.anyList())).thenReturn(CompletableFuture.completedFuture(
+            List.of(new StreamMetadata(STREAM_0, 0, 0, 200, StreamState.OPENED))));
+
+        objectManager = Mockito.spy(MemoryMetadataManager.class);
+        s3Operator = Mockito.spy(MemoryS3Operator.class);
+        List<Pair<InvocationOnMock, CompletableFuture<ByteBuf>>> invocations = new ArrayList<>();
+        when(s3Operator.rangeRead(Mockito.anyString(), Mockito.anyLong(), Mockito.anyLong(), Mockito.eq(ThrottleStrategy.THROTTLE_2)))
+            .thenAnswer(invocation -> {
+                CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
+                invocations.add(Pair.of(invocation, cf));
+                return cf;
+            });
+
+        List<S3ObjectMetadata> s3ObjectMetadataList = new ArrayList<>();
+        // stream data for object 0
+        objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(objectId -> {
+            assertEquals(OBJECT_0, objectId);
+            ObjectWriter objectWriter = ObjectWriter.writer(objectId, s3Operator, 1024, 1024);
+            StreamRecordBatch r1 = new StreamRecordBatch(STREAM_0, 0, 0, 80, TestUtils.random(80));
+            objectWriter.write(STREAM_0, List.of(r1));
+            objectWriter.close().join();
+            List<StreamOffsetRange> streamsIndices = List.of(
+                new StreamOffsetRange(STREAM_0, 0, 80)
+            );
+            S3ObjectMetadata objectMetadata = new S3ObjectMetadata(OBJECT_0, S3ObjectType.STREAM_SET, streamsIndices, System.currentTimeMillis(),
+                System.currentTimeMillis(), objectWriter.size(), OBJECT_0);
+            s3ObjectMetadataList.add(objectMetadata);
+            r1.release();
+        }).join();
+
+        // stream data for object 1
+        objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(objectId -> {
+            assertEquals(OBJECT_1, objectId);
+            ObjectWriter objectWriter = ObjectWriter.writer(OBJECT_1, s3Operator, 1024, 1024);
+            StreamRecordBatch r2 = new StreamRecordBatch(STREAM_0, 0, 80, 120, TestUtils.random(120));
+            objectWriter.write(STREAM_0, List.of(r2));
+            objectWriter.close().join();
+            List<StreamOffsetRange> streamsIndices = List.of(
+                new StreamOffsetRange(STREAM_0, 80, 120)
+            );
+            S3ObjectMetadata objectMetadata = new S3ObjectMetadata(OBJECT_1, S3ObjectType.STREAM_SET, streamsIndices, System.currentTimeMillis(),
+                System.currentTimeMillis(), objectWriter.size(), OBJECT_1);
+            s3ObjectMetadataList.add(objectMetadata);
+            r2.release();
+        }).join();
+
+        doReturn(CompletableFuture.completedFuture(s3ObjectMetadataList)).when(objectManager).getServerObjects();
+
+        compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
+
+        CompletableFuture<Void> cf = compactionManager.compact();
+        Thread.sleep(2000);
+        compactionManager.shutdown();
+        for (Pair<InvocationOnMock, CompletableFuture<ByteBuf>> pair : invocations) {
+            CompletableFuture<ByteBuf> realCf = (CompletableFuture<ByteBuf>) pair.getLeft().callRealMethod();
+            pair.getRight().complete(realCf.get());
+        }
+        try {
+            cf.join();
+        } catch (Exception e) {
+            fail("Should not throw exception");
+        }
+    }
+
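testCompactionShutdown relies on a Mockito idiom worth calling out: the stub hands back incomplete futures and records each invocation, so the test can call shutdown() while reads are genuinely in flight and only afterwards replay the real reads. Reduced to its essentials (the Storage interface here is a hypothetical stand-in for S3Operator.rangeRead):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class DeferredCompletionSketch {
    interface Storage {
        CompletableFuture<String> fetch(String key);
    }

    static void demo() {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        Storage storage = mock(Storage.class);
        when(storage.fetch(anyString())).thenAnswer(invocation -> {
            CompletableFuture<String> cf = new CompletableFuture<>();
            pending.add(cf); // the caller is now parked on an incomplete future
            return cf;
        });

        CompletableFuture<String> inFlight = storage.fetch("k1");
        // ... invoke shutdown on the component under test here, mid-read ...
        pending.forEach(cf -> cf.complete("data")); // then let the reads finish
        if (!"data".equals(inFlight.join())) {
            throw new AssertionError("read should complete after backfill");
        }
    }
}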
     private boolean checkDataIntegrity(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> s3ObjectMetadata,
         CommitStreamSetObjectRequest request) {
         Map<Long, S3ObjectMetadata> s3WALObjectMetadataMap = s3ObjectMetadata.stream()