Skip to content

Commit

Permalink
[FLINK-35886][source] Do not track already finished splits in waterma…
Browse files Browse the repository at this point in the history
…rk alignment
  • Loading branch information
pnowojski committed Aug 17, 2024
1 parent 6d100ab commit 5f6200c
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -92,6 +93,7 @@ public void start() {

@Override
public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
releaseFinishedSplits(sourceOutput);

if (waitingForSplitsBehaviour == WaitingForSplits.WAIT_FOR_INITIAL
&& splitsAssignmentState == SplitsAssignmentState.NO_SPLITS_ASSIGNED) {
Expand Down Expand Up @@ -141,6 +143,18 @@ final int record = sourceSplit.getNext(false)[0];
}
}

/**
 * Walks over {@code assignedSplits} and cleans up every split that reports itself as finished.
 *
 * <p>For each finished split the per-split output is released on {@code sourceOutput}, the
 * split is dropped from {@code assignedSplits} and its id is dropped from {@code pausedSplits}.
 * Splits that are not finished are left untouched.
 *
 * @param sourceOutput the reader output on which the outputs of finished splits are released
 */
private void releaseFinishedSplits(ReaderOutput<Integer> sourceOutput) {
    for (Iterator<MockSourceSplit> it = assignedSplits.iterator(); it.hasNext(); ) {
        MockSourceSplit candidate = it.next();
        if (!candidate.isFinished()) {
            continue;
        }
        sourceOutput.releaseOutputForSplit(candidate.splitId());
        // Remove through the iterator so the collection is not modified behind its back.
        it.remove();
        pausedSplits.remove(candidate.splitId());
    }
}

@Override
public List<MockSourceSplit> snapshotState(long checkpointId) {
return assignedSplits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,11 @@ public void updateCurrentSplitWatermark(String splitId, long watermark) {
}
}

/**
 * Removes the entry for the given split from {@code splitCurrentWatermarks}.
 *
 * <p>NOTE(review): presumably this is what stops a finished split from being considered by
 * watermark alignment any further - confirm against the users of {@code
 * splitCurrentWatermarks}.
 *
 * @param splitId id of the split that has finished
 */
@Override
public void splitFinished(String splitId) {
splitCurrentWatermarks.remove(splitId);
}

/**
* Finds the splits that are beyond the current max watermark and pauses them. At the same time,
* splits that have been paused and where the global watermark caught up are resumed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ private PausableRelativeClock createInputActivityClock(String splitId) {
}

void releaseOutputForSplit(String splitId) {
watermarkUpdateListener.splitFinished(splitId);
localOutputs.remove(splitId);
watermarkMultiplexer.unregisterOutput(splitId);
PausableRelativeClock inputActivityClock =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ interface WatermarkUpdateListener {

/**
 * Notifies about changes to per-split watermarks.
 *
 * @param splitId id of the split whose watermark changed
 * @param watermark the new watermark of that split (presumably in the same units as the other
 *     watermark callbacks of this listener - confirm)
 */
void updateCurrentSplitWatermark(String splitId, long watermark);

/**
 * Notifies that the given split has finished.
 *
 * @param splitId id of the finished split
 */
void splitFinished(String splitId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public void updateCurrentEffectiveWatermark(long watermark) {}

// No-op: this listener implementation ignores per-split watermark updates.
@Override
public void updateCurrentSplitWatermark(String splitId, long watermark) {}

// No-op: this listener implementation ignores split-finished notifications.
@Override
public void splitFinished(String splitId) {}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
Expand Down Expand Up @@ -222,6 +223,38 @@ void testSplitWatermarkAlignmentAndIdleness() throws Exception {
assertThat(dataOutput.getEvents()).doNotHave(new WatermarkAbove(maxEmittedWatermark));
}

/**
 * Checks that a watermark alignment event arriving after a split has already finished does not
 * pause any split: once the operator has been drained, the reader's paused splits must be empty
 * even though split0's record lies above the max allowed watermark.
 */
@Test
void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception {
long idleTimeout = 100;
MockSourceReader sourceReader =
new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
SourceOperator<Integer, MockSourceSplit> operator =
createAndOpenSourceOperatorWithIdleness(
sourceReader, processingTimeService, idleTimeout);

MockSourceSplit split0 = new MockSourceSplit(0, 0, 1);
MockSourceSplit split1 = new MockSourceSplit(1, 10, 20);
int maxAllowedWatermark = 4;
int maxEmittedWatermark = maxAllowedWatermark + 1;
// split0 gets a single record (maxEmittedWatermark) that lies above maxAllowedWatermark,
// while split1's record (3) stays below it. NOTE(review): presumably the record values
// double as watermarks in this test setup - confirm against the watermark strategy used by
// createAndOpenSourceOperatorWithIdleness.
split0.addRecord(maxEmittedWatermark);
split1.addRecord(3);

operator.handleOperatorEvent(
new AddSplitEvent<>(
Arrays.asList(split0, split1), new MockSourceSplitSerializer()));
CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>();

while (operator.emitNext(dataOutput) == DataInputStatus.MORE_AVAILABLE) {
// split0 emits its only record and is finished/released
}
// The alignment limit is below split0's emitted record, but split0 has already finished, so
// the event must not leave it (or any other split) paused.
operator.handleOperatorEvent(
new WatermarkAlignmentEvent(maxAllowedWatermark)); // must not pause finished split0
assertThat(sourceReader.getPausedSplits()).isEmpty();
}

private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWithIdleness(
MockSourceReader sourceReader,
TestProcessingTimeService processingTimeService,
Expand Down

0 comments on commit 5f6200c

Please sign in to comment.