Skip to content

Commit

Permalink
[FLINK-36125] Fix concurrent duplicated discard of logical file in cp…
Browse files Browse the repository at this point in the history
… file-merging (apache#25236)
  • Loading branch information
Zakelly authored Sep 3, 2024
1 parent c869326 commit 2c16d88
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;

Expand Down Expand Up @@ -59,7 +60,7 @@ public static LogicalFileId generateRandomId() {
private long lastUsedCheckpointID = -1L;

/** Whether this logical file is removed by checkpoint subsumption/abortion. */
boolean discarded = false;
AtomicBoolean discarded = new AtomicBoolean(false);

/** The physical file where this logical file is stored. This should never be null. */
@Nonnull private final PhysicalFile physicalFile;
Expand Down Expand Up @@ -114,10 +115,9 @@ public void advanceLastCheckpointId(long checkpointId) {
* @throws IOException if anything goes wrong with file system.
*/
public void discardWithCheckpointId(long checkpointId) throws IOException {
if (!discarded && checkpointId >= lastUsedCheckpointID) {
if (checkpointId >= lastUsedCheckpointID && discarded.compareAndSet(false, true)) {
physicalFile.decRefCount();
physicalFile.decSize(length);
discarded = true;
}
}

Expand Down Expand Up @@ -145,7 +145,7 @@ public SubtaskKey getSubtaskKey() {

@VisibleForTesting
public boolean isDiscarded() {
return discarded;
return discarded.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ void testRefCountBetweenLogicalAndPhysicalFiles() throws IOException {
assertThat(physicalFile1.isDeleted()).isFalse();
assertThat(physicalFile1.getRefCount()).isZero();

// duplicated discard takes no effect
logicalFile1.discardWithCheckpointId(2);
assertThat(physicalFile1.getRefCount()).isZero();

physicalFile1.close();
assertThat(physicalFile1.isOpen()).isFalse();
assertThat(physicalFile1.isDeleted()).isTrue();
Expand Down

0 comments on commit 2c16d88

Please sign in to comment.