From 7e061e7fa0b2220e31f85f1a6dc823f3dec075f4 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Tue, 17 Sep 2024 13:22:37 -0700 Subject: [PATCH] Fix a bug. Add more tests. --- .../iceberg/BaseIncrementalChangelogScan.java | 68 +++----- .../TestBaseIncrementalChangelogScan.java | 154 +++++++++++------- 2 files changed, 114 insertions(+), 108 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java index 617c0069d39e..743b9ae86050 100644 --- a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java @@ -26,6 +26,7 @@ import org.apache.iceberg.ManifestGroup.TaskContext; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; @@ -62,11 +63,6 @@ protected CloseableIterable doPlanFiles( Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); - // map of delete file to the snapshot where the delete file is added - // the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal - Map deleteFileToSnapshotOrdinal = - computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); - Iterable> plans = FluentIterable.from(changelogSnapshots) .transform( @@ -91,9 +87,10 @@ protected CloseableIterable doPlanFiles( } long snapshotId = snapshot.snapshotId(); + long sequenceNumber = snapshot.sequenceNumber(); + int changeOrdinal = snapshotOrdinals.get(snapshotId); return manifestGroup.plan( - new CreateDataFileChangeTasks( - snapshotId, snapshotOrdinals, deleteFileToSnapshotOrdinal)); + new CreateDataFileChangeTasks(snapshotId, sequenceNumber, changeOrdinal)); }); return CloseableIterable.concat(plans); @@ -131,21 +128,6 @@ private static Map computeSnapshotOrdinals(Deque snapsh return snapshotOrdinals; } - private Map computeDeleteFileToSnapshotOrdinal( - Deque snapshots, Map snapshotOrdinals) { - Map deleteFileToSnapshotOrdinal = Maps.newHashMap(); - - for (Snapshot snapshot : snapshots) { - Iterable deleteFiles = snapshot.addedDeleteFiles(table().io()); - for (DeleteFile deleteFile : deleteFiles) { - deleteFileToSnapshotOrdinal.put( - deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); - } - } - - return deleteFileToSnapshotOrdinal; - } - private static class DummyChangelogScanTask implements ChangelogScanTask { public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); @@ -169,34 +151,13 @@ public long commitSnapshotId() { private static class CreateDataFileChangeTasks implements CreateTasksFunction { private final long snapshotId; + private final long sequenceNumber; private final int changeOrdinal; - private final Map snapshotOrdinals; - private final Map deleteFileToSnapshotOrdinal; - CreateDataFileChangeTasks( - long snapshotId, - Map snapshotOrdinals, - Map deleteFileToSnapshotOrdinal) { + CreateDataFileChangeTasks(long snapshotId, long sequenceNumber, int changeOrdinal) { this.snapshotId = snapshotId; - this.snapshotOrdinals = snapshotOrdinals; - this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; - this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); - } - - private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { - return FluentIterable.from(deleteFiles) - .filter( - deleteFile -> - deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) - .toArray(DeleteFile.class); - } - - private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { - return FluentIterable.from(deleteFiles) - .filter( - deleteFile -> - deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) - .toArray(DeleteFile.class); + this.sequenceNumber = sequenceNumber; + this.changeOrdinal = changeOrdinal; } @Override @@ -210,8 +171,17 @@ public CloseableIterable apply( long entrySnapshotId = entry.snapshotId(); DataFile dataFile = entry.file().copy(context.shouldKeepStats()); DeleteFile[] deleteFiles = context.deletes().forEntry(entry); - DeleteFile[] addedDeleteFiles = filterAdded(deleteFiles); - DeleteFile[] existingDeleteFiles = filterExisting(deleteFiles); + List added = Lists.newArrayList(); + List existing = Lists.newArrayList(); + for (DeleteFile deleteFile : deleteFiles) { + if (sequenceNumber == deleteFile.dataSequenceNumber()) { + added.add(deleteFile); + } else { + existing.add(deleteFile); + } + } + DeleteFile[] addedDeleteFiles = added.toArray(new DeleteFile[0]); + DeleteFile[] existingDeleteFiles = existing.toArray(new DeleteFile[0]); switch (entry.status()) { case ADDED: diff --git a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java index d5f7c1f2697b..ada4cee6c6e4 100644 --- a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java +++ b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java @@ -132,40 +132,90 @@ public void testFileDeletes() { } @TestTemplate - public void testPositionDeletes() { + public void testRowDeletes() { assumeThat(formatVersion).isEqualTo(2); - table.newFastAppend().appendFile(FILE_A).commit(); + table + .newFastAppend() + .appendFile(FILE_A) + .appendFile(FILE_A2) + .appendFile(FILE_B) + .appendFile(FILE_C) + .commit(); Snapshot snap1 = table.currentSnapshot(); - table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); + // position delete + table.newRowDelta().addDeletes(FILE_B_DELETES).commit(); Snapshot snap2 = table.currentSnapshot(); + // equality delete + table.newRowDelta().addDeletes(FILE_C2_DELETES).commit(); + Snapshot snap3 = table.currentSnapshot(); + + // mix of position and equality deletes + table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); + Snapshot snap4 = table.currentSnapshot(); + IncrementalChangelogScan scan = - newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId()); + newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId()); List tasks = plan(scan); - assertThat(tasks).as("Must have 1 task").hasSize(1); + assertThat(tasks).as("Must have 4 tasks").hasSize(4); DeletedRowsScanTask t1 = (DeletedRowsScanTask) Iterables.get(tasks, 0); assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0); assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId()); - assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_A.path()); + assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path()); assertThat(t1.addedDeletes().get(0).path()) .as("Added delete file must match") - .isEqualTo(FILE_A_DELETES.path()); + .isEqualTo(FILE_B_DELETES.path()); assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty(); + + DeletedRowsScanTask t2 = (DeletedRowsScanTask) Iterables.get(tasks, 1); + assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1); + assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId()); + assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_C.path()); + assertThat(t2.addedDeletes().get(0).path()) + .as("Added delete file must match") + .isEqualTo(FILE_C2_DELETES.path()); + assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty(); + + DeletedRowsScanTask t3 = (DeletedRowsScanTask) Iterables.get(tasks, 2); + assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2); + assertThat(t3.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId()); + assertThat(t3.file().path()).as("Data file must match").isEqualTo(FILE_A2.path()); + assertThat(t3.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2); + assertThat(t3.addedDeletes().get(0).path()) + .as("Added delete file must match") + .isEqualTo(FILE_A2_DELETES.path()); + assertThat(t3.addedDeletes().get(1).path()) + .as("Added delete file must match") + .isEqualTo(FILE_A_DELETES.path()); + assertThat(t3.existingDeletes()).as("Must be no existing deletes").isEmpty(); + + DeletedRowsScanTask t4 = (DeletedRowsScanTask) Iterables.get(tasks, 3); + assertThat(t4.changeOrdinal()).as("Ordinal must match").isEqualTo(2); + assertThat(t4.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId()); + assertThat(t4.file().path()).as("Data file must match").isEqualTo(FILE_A.path()); + assertThat(t4.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2); + assertThat(t4.addedDeletes().get(0).path()) + .as("Added delete file must match") + .isEqualTo(FILE_A2_DELETES.path()); + assertThat(t4.addedDeletes().get(1).path()) + .as("Added delete file must match") + .isEqualTo(FILE_A_DELETES.path()); + assertThat(t4.existingDeletes()).as("Must be no existing deletes").isEmpty(); } @TestTemplate - public void testEqualityDeletes() { + public void testAddingAndDeletingInSameCommit() { assumeThat(formatVersion).isEqualTo(2); - table.newFastAppend().appendFile(FILE_A).appendFile(FILE_A2).commit(); + table.newFastAppend().appendFile(FILE_A).commit(); Snapshot snap1 = table.currentSnapshot(); - table.newRowDelta().addDeletes(FILE_A2_DELETES).commit(); + table.newRowDelta().addRows(FILE_B).addDeletes(FILE_B_DELETES).commit(); Snapshot snap2 = table.currentSnapshot(); IncrementalChangelogScan scan = @@ -173,94 +223,80 @@ public void testEqualityDeletes() { List tasks = plan(scan); - assertThat(tasks).as("Must have 2 tasks").hasSize(2); + assertThat(tasks).as("Must have 1 tasks").hasSize(1); - DeletedRowsScanTask t1 = (DeletedRowsScanTask) Iterables.get(tasks, 0); + AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.get(tasks, 0); assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0); assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId()); - assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_A2.path()); - assertThat(t1.addedDeletes().get(0).path()) - .as("Added delete file must match") - .isEqualTo(FILE_A2_DELETES.path()); - assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty(); - - DeletedRowsScanTask t2 = (DeletedRowsScanTask) Iterables.get(tasks, 1); - assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(0); - assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId()); - assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_A.path()); - assertThat(t2.addedDeletes().get(0).path()) - .as("Added delete file must match") - .isEqualTo(FILE_A2_DELETES.path()); - assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty(); + assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path()); + assertThat(t1.deletes().get(0).path()) + .as("Delete file must match") + .isEqualTo(FILE_B_DELETES.path()); } @TestTemplate - public void testMixOfPositionAndEqualityDeletes() { + public void testDeletingRowsInDataFileWithExistingDeletes() { assumeThat(formatVersion).isEqualTo(2); - table.newFastAppend().appendFile(FILE_A).appendFile(FILE_A2).commit(); + table.newFastAppend().appendFile(FILE_A).commit(); Snapshot snap1 = table.currentSnapshot(); - table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit(); + table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); Snapshot snap2 = table.currentSnapshot(); + table.newRowDelta().addDeletes(FILE_A2_DELETES).commit(); + Snapshot snap3 = table.currentSnapshot(); + IncrementalChangelogScan scan = - newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId()); + newScan().fromSnapshotExclusive(snap2.snapshotId()).toSnapshot(snap3.snapshotId()); List tasks = plan(scan); - assertThat(tasks).as("Must have 2 tasks").hasSize(2); + assertThat(tasks).as("Must have 1 task").hasSize(1); DeletedRowsScanTask t1 = (DeletedRowsScanTask) Iterables.get(tasks, 0); assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0); - assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId()); - assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_A2.path()); - assertThat(t1.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2); + assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId()); + assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_A.path()); + assertThat(t1.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(1); assertThat(t1.addedDeletes().get(0).path()) .as("Added delete file must match") .isEqualTo(FILE_A2_DELETES.path()); - assertThat(t1.addedDeletes().get(1).path()) - .as("Added delete file must match") + assertThat(t1.existingDeletes().size()) + .as("Number of existing delete files must match") + .isEqualTo(1); + assertThat(t1.existingDeletes().get(0).path()) + .as("Existing delete file must match") .isEqualTo(FILE_A_DELETES.path()); - assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty(); - - DeletedRowsScanTask t2 = (DeletedRowsScanTask) Iterables.get(tasks, 1); - assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(0); - assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId()); - assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_A.path()); - assertThat(t2.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2); - assertThat(t2.addedDeletes().get(0).path()) - .as("Added delete file must match") - .isEqualTo(FILE_A2_DELETES.path()); - assertThat(t2.addedDeletes().get(1).path()) - .as("Added delete file must match") - .isEqualTo(FILE_A_DELETES.path()); - assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty(); } @TestTemplate - public void testAddingAndDeletingInSameCommit() { + public void testDeletingDataFileWithExistingDeletes() { assumeThat(formatVersion).isEqualTo(2); - table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); Snapshot snap1 = table.currentSnapshot(); - table.newRowDelta().addRows(FILE_B).addDeletes(FILE_B_DELETES).commit(); + table.newRowDelta().addDeletes(FILE_B_DELETES).commit(); Snapshot snap2 = table.currentSnapshot(); + table.newDelete().deleteFile(FILE_B).commit(); + Snapshot snap3 = table.currentSnapshot(); + IncrementalChangelogScan scan = - newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId()); + newScan().fromSnapshotExclusive(snap2.snapshotId()).toSnapshot(snap3.snapshotId()); List tasks = plan(scan); - assertThat(tasks).as("Must have 1 tasks").hasSize(1); + assertThat(tasks).as("Must have 1 task").hasSize(1); - AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.get(tasks, 0); + DeletedDataFileScanTask t1 = (DeletedDataFileScanTask) Iterables.get(tasks, 0); assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0); - assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId()); + assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId()); assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path()); - assertThat(t1.deletes().get(0).path()) - .as("Delete file must match") + assertThat(t1.existingDeletes()).hasSize(1); + assertThat(t1.existingDeletes().get(0).path()) + .as("Existing delete file must match") .isEqualTo(FILE_B_DELETES.path()); }