Skip to content

Commit

Permalink
Fix a bug. Add more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
wypoon committed Sep 17, 2024
1 parent b368ba4 commit 7e061e7
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.ManifestGroup.TaskContext;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
Expand Down Expand Up @@ -62,11 +63,6 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(

Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);

// map of delete file to the snapshot where the delete file is added
// the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal
Map<String, Integer> deleteFileToSnapshotOrdinal =
computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals);

Iterable<CloseableIterable<ChangelogScanTask>> plans =
FluentIterable.from(changelogSnapshots)
.transform(
Expand All @@ -91,9 +87,10 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
}

long snapshotId = snapshot.snapshotId();
long sequenceNumber = snapshot.sequenceNumber();
int changeOrdinal = snapshotOrdinals.get(snapshotId);
return manifestGroup.plan(
new CreateDataFileChangeTasks(
snapshotId, snapshotOrdinals, deleteFileToSnapshotOrdinal));
new CreateDataFileChangeTasks(snapshotId, sequenceNumber, changeOrdinal));
});

return CloseableIterable.concat(plans);
Expand Down Expand Up @@ -131,21 +128,6 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
return snapshotOrdinals;
}

private Map<String, Integer> computeDeleteFileToSnapshotOrdinal(
Deque<Snapshot> snapshots, Map<Long, Integer> snapshotOrdinals) {
Map<String, Integer> deleteFileToSnapshotOrdinal = Maps.newHashMap();

for (Snapshot snapshot : snapshots) {
Iterable<DeleteFile> deleteFiles = snapshot.addedDeleteFiles(table().io());
for (DeleteFile deleteFile : deleteFiles) {
deleteFileToSnapshotOrdinal.put(
deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId()));
}
}

return deleteFileToSnapshotOrdinal;
}

private static class DummyChangelogScanTask implements ChangelogScanTask {
public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask();

Expand All @@ -169,34 +151,13 @@ public long commitSnapshotId() {

private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
private final long snapshotId;
private final long sequenceNumber;
private final int changeOrdinal;
private final Map<Long, Integer> snapshotOrdinals;
private final Map<String, Integer> deleteFileToSnapshotOrdinal;

CreateDataFileChangeTasks(
long snapshotId,
Map<Long, Integer> snapshotOrdinals,
Map<String, Integer> deleteFileToSnapshotOrdinal) {
CreateDataFileChangeTasks(long snapshotId, long sequenceNumber, int changeOrdinal) {
this.snapshotId = snapshotId;
this.snapshotOrdinals = snapshotOrdinals;
this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
}

private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
return FluentIterable.from(deleteFiles)
.filter(
deleteFile ->
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
.toArray(DeleteFile.class);
}

private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
return FluentIterable.from(deleteFiles)
.filter(
deleteFile ->
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
.toArray(DeleteFile.class);
this.sequenceNumber = sequenceNumber;
this.changeOrdinal = changeOrdinal;
}

@Override
Expand All @@ -210,8 +171,17 @@ public CloseableIterable<ChangelogScanTask> apply(
long entrySnapshotId = entry.snapshotId();
DataFile dataFile = entry.file().copy(context.shouldKeepStats());
DeleteFile[] deleteFiles = context.deletes().forEntry(entry);
DeleteFile[] addedDeleteFiles = filterAdded(deleteFiles);
DeleteFile[] existingDeleteFiles = filterExisting(deleteFiles);
List<DeleteFile> added = Lists.newArrayList();
List<DeleteFile> existing = Lists.newArrayList();
for (DeleteFile deleteFile : deleteFiles) {
if (sequenceNumber == deleteFile.dataSequenceNumber()) {
added.add(deleteFile);
} else {
existing.add(deleteFile);
}
}
DeleteFile[] addedDeleteFiles = added.toArray(new DeleteFile[0]);
DeleteFile[] existingDeleteFiles = existing.toArray(new DeleteFile[0]);

switch (entry.status()) {
case ADDED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,135 +132,171 @@ public void testFileDeletes() {
}

@TestTemplate
public void testPositionDeletes() {
public void testRowDeletes() {
assumeThat(formatVersion).isEqualTo(2);

table.newFastAppend().appendFile(FILE_A).commit();
table
.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_A2)
.appendFile(FILE_B)
.appendFile(FILE_C)
.commit();
Snapshot snap1 = table.currentSnapshot();

table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
// position delete
table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
Snapshot snap2 = table.currentSnapshot();

// equality delete
table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
Snapshot snap3 = table.currentSnapshot();

// mix of position and equality deletes
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
Snapshot snap4 = table.currentSnapshot();

IncrementalChangelogScan scan =
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());

List<ChangelogScanTask> tasks = plan(scan);

assertThat(tasks).as("Must have 1 task").hasSize(1);
assertThat(tasks).as("Must have 4 tasks").hasSize(4);

DeletedRowsScanTask t1 = (DeletedRowsScanTask) Iterables.get(tasks, 0);
assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
assertThat(t1.addedDeletes().get(0).path())
.as("Added delete file must match")
.isEqualTo(FILE_A_DELETES.path());
.isEqualTo(FILE_B_DELETES.path());
assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty();

DeletedRowsScanTask t2 = (DeletedRowsScanTask) Iterables.get(tasks, 1);
assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId());
assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_C.path());
assertThat(t2.addedDeletes().get(0).path())
.as("Added delete file must match")
.isEqualTo(FILE_C2_DELETES.path());
assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty();

DeletedRowsScanTask t3 = (DeletedRowsScanTask) Iterables.get(tasks, 2);
assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
assertThat(t3.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId());
assertThat(t3.file().path()).as("Data file must match").isEqualTo(FILE_A2.path());
assertThat(t3.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
assertThat(t3.addedDeletes().get(0).path())
.as("Added delete file must match")
.isEqualTo(FILE_A2_DELETES.path());
assertThat(t3.addedDeletes().get(1).path())
.as("Added delete file must match")
.isEqualTo(FILE_A_DELETES.path());
assertThat(t3.existingDeletes()).as("Must be no existing deletes").isEmpty();

DeletedRowsScanTask t4 = (DeletedRowsScanTask) Iterables.get(tasks, 3);
assertThat(t4.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
assertThat(t4.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId());
assertThat(t4.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
assertThat(t4.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
assertThat(t4.addedDeletes().get(0).path())
.as("Added delete file must match")
.isEqualTo(FILE_A2_DELETES.path());
assertThat(t4.addedDeletes().get(1).path())
.as("Added delete file must match")
.isEqualTo(FILE_A_DELETES.path());
assertThat(t4.existingDeletes()).as("Must be no existing deletes").isEmpty();
}

@TestTemplate
public void testEqualityDeletes() {
public void testAddingAndDeletingInSameCommit() {
assumeThat(formatVersion).isEqualTo(2);

table.newFastAppend().appendFile(FILE_A).appendFile(FILE_A2).commit();
table.newFastAppend().appendFile(FILE_A).commit();
Snapshot snap1 = table.currentSnapshot();

table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
table.newRowDelta().addRows(FILE_B).addDeletes(FILE_B_DELETES).commit();
Snapshot snap2 = table.currentSnapshot();

IncrementalChangelogScan scan =
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());

List<ChangelogScanTask> tasks = plan(scan);

assertThat(tasks).as("Must have 2 tasks").hasSize(2);
assertThat(tasks).as("Must have 1 tasks").hasSize(1);

DeletedRowsScanTask t1 = (DeletedRowsScanTask) Iterables.get(tasks, 0);
AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.get(tasks, 0);
assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_A2.path());
assertThat(t1.addedDeletes().get(0).path())
.as("Added delete file must match")
.isEqualTo(FILE_A2_DELETES.path());
assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty();

DeletedRowsScanTask t2 = (DeletedRowsScanTask) Iterables.get(tasks, 1);
assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
assertThat(t2.addedDeletes().get(0).path())
.as("Added delete file must match")
.isEqualTo(FILE_A2_DELETES.path());
assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty();
assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
assertThat(t1.deletes().get(0).path())
.as("Delete file must match")
.isEqualTo(FILE_B_DELETES.path());
}

@TestTemplate
public void testMixOfPositionAndEqualityDeletes() {
public void testDeletingRowsInDataFileWithExistingDeletes() {
assumeThat(formatVersion).isEqualTo(2);

table.newFastAppend().appendFile(FILE_A).appendFile(FILE_A2).commit();
table.newFastAppend().appendFile(FILE_A).commit();
Snapshot snap1 = table.currentSnapshot();

table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
Snapshot snap2 = table.currentSnapshot();

table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
Snapshot snap3 = table.currentSnapshot();

IncrementalChangelogScan scan =
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
newScan().fromSnapshotExclusive(snap2.snapshotId()).toSnapshot(snap3.snapshotId());

List<ChangelogScanTask> tasks = plan(scan);

assertThat(tasks).as("Must have 2 tasks").hasSize(2);
assertThat(tasks).as("Must have 1 task").hasSize(1);

DeletedRowsScanTask t1 = (DeletedRowsScanTask) Iterables.get(tasks, 0);
assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_A2.path());
assertThat(t1.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId());
assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
assertThat(t1.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(1);
assertThat(t1.addedDeletes().get(0).path())
.as("Added delete file must match")
.isEqualTo(FILE_A2_DELETES.path());
assertThat(t1.addedDeletes().get(1).path())
.as("Added delete file must match")
assertThat(t1.existingDeletes().size())
.as("Number of existing delete files must match")
.isEqualTo(1);
assertThat(t1.existingDeletes().get(0).path())
.as("Existing delete file must match")
.isEqualTo(FILE_A_DELETES.path());
assertThat(t1.existingDeletes()).as("Must be no existing deletes").isEmpty();

DeletedRowsScanTask t2 = (DeletedRowsScanTask) Iterables.get(tasks, 1);
assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
assertThat(t2.addedDeletes().size()).as("Number of added delete files must match").isEqualTo(2);
assertThat(t2.addedDeletes().get(0).path())
.as("Added delete file must match")
.isEqualTo(FILE_A2_DELETES.path());
assertThat(t2.addedDeletes().get(1).path())
.as("Added delete file must match")
.isEqualTo(FILE_A_DELETES.path());
assertThat(t2.existingDeletes()).as("Must be no existing deletes").isEmpty();
}

@TestTemplate
public void testAddingAndDeletingInSameCommit() {
public void testDeletingDataFileWithExistingDeletes() {
assumeThat(formatVersion).isEqualTo(2);

table.newFastAppend().appendFile(FILE_A).commit();
table.newFastAppend().appendFile(FILE_B).commit();
Snapshot snap1 = table.currentSnapshot();

table.newRowDelta().addRows(FILE_B).addDeletes(FILE_B_DELETES).commit();
table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
Snapshot snap2 = table.currentSnapshot();

table.newDelete().deleteFile(FILE_B).commit();
Snapshot snap3 = table.currentSnapshot();

IncrementalChangelogScan scan =
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
newScan().fromSnapshotExclusive(snap2.snapshotId()).toSnapshot(snap3.snapshotId());

List<ChangelogScanTask> tasks = plan(scan);

assertThat(tasks).as("Must have 1 tasks").hasSize(1);
assertThat(tasks).as("Must have 1 task").hasSize(1);

AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.get(tasks, 0);
DeletedDataFileScanTask t1 = (DeletedDataFileScanTask) Iterables.get(tasks, 0);
assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId());
assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
assertThat(t1.deletes().get(0).path())
.as("Delete file must match")
assertThat(t1.existingDeletes()).hasSize(1);
assertThat(t1.existingDeletes().get(0).path())
.as("Existing delete file must match")
.isEqualTo(FILE_B_DELETES.path());
}

Expand Down

0 comments on commit 7e061e7

Please sign in to comment.