From 5f6200c71916a889ece594974db4343aef62d0e7 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Wed, 14 Aug 2024 12:57:02 +0200 Subject: [PATCH] [FLINK-35886][source] Do not track already finished splits in watermark alignment --- .../source/mocks/MockSourceReader.java | 14 ++++++++ .../api/operators/SourceOperator.java | 5 +++ .../ProgressiveTimestampsAndWatermarks.java | 1 + .../source/TimestampsAndWatermarks.java | 3 ++ .../source/WatermarkToDataOutput.java | 3 ++ ...ceOperatorSplitWatermarkAlignmentTest.java | 33 +++++++++++++++++++ 6 files changed, 59 insertions(+) diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index facfcf346cd02..b85a2e075bde2 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -92,6 +93,7 @@ public void start() { @Override public InputStatus pollNext(ReaderOutput sourceOutput) throws Exception { + releaseFinishedSplits(sourceOutput); if (waitingForSplitsBehaviour == WaitingForSplits.WAIT_FOR_INITIAL && splitsAssignmentState == SplitsAssignmentState.NO_SPLITS_ASSIGNED) { @@ -141,6 +143,18 @@ final int record = sourceSplit.getNext(false)[0]; } } + private void releaseFinishedSplits(ReaderOutput sourceOutput) { + Iterator assignedSplitsIterator = assignedSplits.iterator(); + while (assignedSplitsIterator.hasNext()) { + MockSourceSplit assignedSplit = assignedSplitsIterator.next(); + if (assignedSplit.isFinished()) { + sourceOutput.releaseOutputForSplit(assignedSplit.splitId()); + assignedSplitsIterator.remove(); + pausedSplits.remove(assignedSplit.splitId()); + } + } + } + @Override public List snapshotState(long checkpointId) { return assignedSplits; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index d1257e2caf541..b915ab5ba883c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -664,6 +664,11 @@ public void updateCurrentSplitWatermark(String splitId, long watermark) { } } + @Override + public void splitFinished(String splitId) { + splitCurrentWatermarks.remove(splitId); + } + /** * Finds the splits that are beyond the current max watermark and pauses them. At the same time, * splits that have been paused and where the global watermark caught up are resumed. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java index ca0c5f47b9bb6..5b96c5dc0c969 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java @@ -304,6 +304,7 @@ private PausableRelativeClock createInputActivityClock(String splitId) { } void releaseOutputForSplit(String splitId) { + watermarkUpdateListener.splitFinished(splitId); localOutputs.remove(splitId); watermarkMultiplexer.unregisterOutput(splitId); PausableRelativeClock inputActivityClock = diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java index bdbd3479da51e..4d96e53e2e4b2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java @@ -62,6 +62,9 @@ interface WatermarkUpdateListener { /** Notifies about changes to per split watermarks. */ void updateCurrentSplitWatermark(String splitId, long watermark); + + /** Notifies that split has finished. */ + void splitFinished(String splitId); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java index d4d81b64f45a3..4fcf46ca9f968 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java @@ -53,6 +53,9 @@ public void updateCurrentEffectiveWatermark(long watermark) {} @Override public void updateCurrentSplitWatermark(String splitId, long watermark) {} + + @Override + public void splitFinished(String splitId) {} }); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index e776395d31ad5..3031801b07c69 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -38,6 +38,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; import org.apache.flink.streaming.api.operators.source.TestingSourceOperator; +import org.apache.flink.streaming.runtime.io.DataInputStatus; import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; @@ -222,6 +223,38 @@ void testSplitWatermarkAlignmentAndIdleness() throws Exception { assertThat(dataOutput.getEvents()).doNotHave(new WatermarkAbove(maxEmittedWatermark)); } + @Test + void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception { + long idleTimeout = 100; + MockSourceReader sourceReader = + new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + SourceOperator operator = + createAndOpenSourceOperatorWithIdleness( + sourceReader, processingTimeService, idleTimeout); + + MockSourceSplit split0 = new MockSourceSplit(0, 0, 1); + MockSourceSplit split1 = new MockSourceSplit(1, 10, 20); + int maxAllowedWatermark = 4; + int maxEmittedWatermark = maxAllowedWatermark + 1; + // the intention is that only first record from split0 gets emitted, then split0 gets + // blocked and record (maxEmittedWatermark + 100) is never emitted from split0 + split0.addRecord(maxEmittedWatermark); + split1.addRecord(3); + + operator.handleOperatorEvent( + new AddSplitEvent<>( + Arrays.asList(split0, split1), new MockSourceSplitSerializer())); + CollectingDataOutput dataOutput = new CollectingDataOutput<>(); + + while (operator.emitNext(dataOutput) == DataInputStatus.MORE_AVAILABLE) { + // split0 emits its only record and is finished/released + } + operator.handleOperatorEvent( + new WatermarkAlignmentEvent(maxAllowedWatermark)); // blocks split0 + assertThat(sourceReader.getPausedSplits()).isEmpty(); + } + private SourceOperator createAndOpenSourceOperatorWithIdleness( MockSourceReader sourceReader, TestProcessingTimeService processingTimeService,