Skip to content

Commit

Permalink
Add tests in TestBaseIncrementalChangelogScan
Browse files Browse the repository at this point in the history
... and adopt some suggestions from review feedback.
  • Loading branch information
wypoon committed Sep 13, 2024
1 parent 5a79bb4 commit b368ba4
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ public CloseableIterable<ChangelogScanTask> apply(
entry -> {
long entrySnapshotId = entry.snapshotId();
DataFile dataFile = entry.file().copy(context.shouldKeepStats());
DeleteFile[] addedDeleteFiles = filterAdded(context.deletes().forEntry(entry));
DeleteFile[] deleteFiles = context.deletes().forEntry(entry);
DeleteFile[] addedDeleteFiles = filterAdded(deleteFiles);
DeleteFile[] existingDeleteFiles = filterExisting(deleteFiles);

switch (entry.status()) {
case ADDED:
Expand All @@ -232,7 +234,7 @@ public CloseableIterable<ChangelogScanTask> apply(
snapshotId,
dataFile,
addedDeleteFiles,
filterExisting(context.deletes().forEntry(entry)),
existingDeleteFiles,
context.schemaAsString(),
context.specAsString(),
context.residuals());
Expand All @@ -245,7 +247,7 @@ public CloseableIterable<ChangelogScanTask> apply(
changeOrdinal,
snapshotId,
dataFile,
filterExisting(context.deletes().forEntry(entry)),
existingDeleteFiles,
context.schemaAsString(),
context.specAsString(),
context.residuals());
Expand All @@ -262,7 +264,7 @@ public CloseableIterable<ChangelogScanTask> apply(
snapshotId,
dataFile,
addedDeleteFiles,
filterExisting(context.deletes().forEntry(entry)),
existingDeleteFiles,
context.schemaAsString(),
context.specAsString(),
context.residuals());
Expand All @@ -273,7 +275,7 @@ public CloseableIterable<ChangelogScanTask> apply(
"Unexpected entry status: " + entry.status());
}
});
return CloseableIterable.filter(tasks, task -> !(task instanceof DummyChangelogScanTask));
return CloseableIterable.filter(tasks, task -> (task != DummyChangelogScanTask.INSTANCE));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.IOException;
import java.util.Comparator;
Expand Down Expand Up @@ -130,6 +131,139 @@ public void testFileDeletes() {
assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
}

@TestTemplate
public void testPositionDeletes() {
  // Position deletes require format v2.
  assumeThat(formatVersion).isEqualTo(2);

  // First commit: one data file. Second commit: a position delete file against it.
  table.newFastAppend().appendFile(FILE_A).commit();
  Snapshot appendSnapshot = table.currentSnapshot();

  table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
  Snapshot deleteSnapshot = table.currentSnapshot();

  IncrementalChangelogScan scan =
      newScan()
          .fromSnapshotExclusive(appendSnapshot.snapshotId())
          .toSnapshot(deleteSnapshot.snapshotId());

  List<ChangelogScanTask> tasks = plan(scan);

  assertThat(tasks).as("Must have 1 task").hasSize(1);

  // The delete commit must surface as a DeletedRowsScanTask over FILE_A,
  // carrying the new delete file and no pre-existing deletes.
  DeletedRowsScanTask task = (DeletedRowsScanTask) tasks.get(0);
  assertThat(task.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
  assertThat(task.commitSnapshotId())
      .as("Snapshot must match")
      .isEqualTo(deleteSnapshot.snapshotId());
  assertThat(task.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
  assertThat(task.addedDeletes().get(0).path())
      .as("Added delete file must match")
      .isEqualTo(FILE_A_DELETES.path());
  assertThat(task.existingDeletes()).as("Must be no existing deletes").isEmpty();
}

@TestTemplate
public void testEqualityDeletes() {
  // Equality deletes require format v2.
  assumeThat(formatVersion).isEqualTo(2);

  // First commit: two data files in the same partition.
  // Second commit: an equality delete file, which may delete rows from both.
  table.newFastAppend().appendFile(FILE_A).appendFile(FILE_A2).commit();
  Snapshot appendSnapshot = table.currentSnapshot();

  table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
  Snapshot deleteSnapshot = table.currentSnapshot();

  IncrementalChangelogScan scan =
      newScan()
          .fromSnapshotExclusive(appendSnapshot.snapshotId())
          .toSnapshot(deleteSnapshot.snapshotId());

  List<ChangelogScanTask> tasks = plan(scan);

  // One DeletedRowsScanTask per data file the equality delete applies to.
  assertThat(tasks).as("Must have 2 tasks").hasSize(2);

  DeletedRowsScanTask firstTask = (DeletedRowsScanTask) tasks.get(0);
  assertThat(firstTask.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
  assertThat(firstTask.commitSnapshotId())
      .as("Snapshot must match")
      .isEqualTo(deleteSnapshot.snapshotId());
  assertThat(firstTask.file().path()).as("Data file must match").isEqualTo(FILE_A2.path());
  assertThat(firstTask.addedDeletes().get(0).path())
      .as("Added delete file must match")
      .isEqualTo(FILE_A2_DELETES.path());
  assertThat(firstTask.existingDeletes()).as("Must be no existing deletes").isEmpty();

  DeletedRowsScanTask secondTask = (DeletedRowsScanTask) tasks.get(1);
  assertThat(secondTask.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
  assertThat(secondTask.commitSnapshotId())
      .as("Snapshot must match")
      .isEqualTo(deleteSnapshot.snapshotId());
  assertThat(secondTask.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
  assertThat(secondTask.addedDeletes().get(0).path())
      .as("Added delete file must match")
      .isEqualTo(FILE_A2_DELETES.path());
  assertThat(secondTask.existingDeletes()).as("Must be no existing deletes").isEmpty();
}

@TestTemplate
public void testMixOfPositionAndEqualityDeletes() {
  // Row-level deletes require format v2.
  assumeThat(formatVersion).isEqualTo(2);

  // First commit: two data files. Second commit: a position delete file and
  // an equality delete file in the same row delta.
  table.newFastAppend().appendFile(FILE_A).appendFile(FILE_A2).commit();
  Snapshot appendSnapshot = table.currentSnapshot();

  table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
  Snapshot deleteSnapshot = table.currentSnapshot();

  IncrementalChangelogScan scan =
      newScan()
          .fromSnapshotExclusive(appendSnapshot.snapshotId())
          .toSnapshot(deleteSnapshot.snapshotId());

  List<ChangelogScanTask> tasks = plan(scan);

  assertThat(tasks).as("Must have 2 tasks").hasSize(2);

  // Both delete files are attached to each task; the data file differs per task.
  DeletedRowsScanTask firstTask = (DeletedRowsScanTask) tasks.get(0);
  assertThat(firstTask.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
  assertThat(firstTask.commitSnapshotId())
      .as("Snapshot must match")
      .isEqualTo(deleteSnapshot.snapshotId());
  assertThat(firstTask.file().path()).as("Data file must match").isEqualTo(FILE_A2.path());
  assertThat(firstTask.addedDeletes().size())
      .as("Number of added delete files must match")
      .isEqualTo(2);
  assertThat(firstTask.addedDeletes().get(0).path())
      .as("Added delete file must match")
      .isEqualTo(FILE_A2_DELETES.path());
  assertThat(firstTask.addedDeletes().get(1).path())
      .as("Added delete file must match")
      .isEqualTo(FILE_A_DELETES.path());
  assertThat(firstTask.existingDeletes()).as("Must be no existing deletes").isEmpty();

  DeletedRowsScanTask secondTask = (DeletedRowsScanTask) tasks.get(1);
  assertThat(secondTask.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
  assertThat(secondTask.commitSnapshotId())
      .as("Snapshot must match")
      .isEqualTo(deleteSnapshot.snapshotId());
  assertThat(secondTask.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
  assertThat(secondTask.addedDeletes().size())
      .as("Number of added delete files must match")
      .isEqualTo(2);
  assertThat(secondTask.addedDeletes().get(0).path())
      .as("Added delete file must match")
      .isEqualTo(FILE_A2_DELETES.path());
  assertThat(secondTask.addedDeletes().get(1).path())
      .as("Added delete file must match")
      .isEqualTo(FILE_A_DELETES.path());
  assertThat(secondTask.existingDeletes()).as("Must be no existing deletes").isEmpty();
}

@TestTemplate
public void testAddingAndDeletingInSameCommit() {
  // Row-level deletes require format v2.
  assumeThat(formatVersion).isEqualTo(2);

  table.newFastAppend().appendFile(FILE_A).commit();
  Snapshot snap1 = table.currentSnapshot();

  // Add FILE_B and a delete file against it in the same commit: the scan
  // should produce a single AddedRowsScanTask that carries the delete file,
  // rather than a separate DeletedRowsScanTask.
  table.newRowDelta().addRows(FILE_B).addDeletes(FILE_B_DELETES).commit();
  Snapshot snap2 = table.currentSnapshot();

  IncrementalChangelogScan scan =
      newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());

  List<ChangelogScanTask> tasks = plan(scan);

  // Fixed message: was "Must have 1 tasks", inconsistent with sibling tests.
  assertThat(tasks).as("Must have 1 task").hasSize(1);

  AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.get(tasks, 0);
  assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
  assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
  assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
  assertThat(t1.deletes().get(0).path())
      .as("Delete file must match")
      .isEqualTo(FILE_B_DELETES.path());
}

@TestTemplate
public void testExistingEntriesInNewDataManifestsAreIgnored() {
table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,22 +146,23 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t
}
}

CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
private InternalRow projectRow(InternalRow row, int[] indexes) {
InternalRow expectedRow = new GenericInternalRow(columns.length);

for (int i = 0; i < columns.length; i++) {
expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
}

return expectedRow;
}

private CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
String filePath = task.file().path().toString();
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter());
int[] indexes = indexesInRow(deletes.requiredSchema());

return CloseableIterable.transform(
deletes.filter(rows(task, deletes.requiredSchema())),
row -> {
InternalRow expectedRow = new GenericInternalRow(columns.length);

for (int i = 0; i < columns.length; i++) {
expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
}

return expectedRow;
});
deletes.filter(rows(task, deletes.requiredSchema())), row -> projectRow(row, indexes));
}

private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
Expand All @@ -170,41 +171,23 @@ private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFi
int[] indexes = indexesInRow(deletes.requiredSchema());

return CloseableIterable.transform(
deletes.filter(rows(task, deletes.requiredSchema())),
row -> {
InternalRow expectedRow = new GenericInternalRow(columns.length);

for (int i = 0; i < columns.length; i++) {
expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
}

return expectedRow;
});
deletes.filter(rows(task, deletes.requiredSchema())), row -> projectRow(row, indexes));
}

private CloseableIterable<InternalRow> openDeletedRowsScanTask(DeletedRowsScanTask task) {
String filePath = task.file().path().toString();
SparkDeleteFilter existingDeletes =
new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
SparkDeleteFilter newDeletes = new SparkDeleteFilter(filePath, task.addedDeletes(), counter());
Schema schema1 = existingDeletes.requiredSchema();
Schema schema2 = newDeletes.requiredSchema();
Schema requiredSchema = TypeUtil.join(schema1, schema2);
Schema requiredSchema =
TypeUtil.join(existingDeletes.requiredSchema(), newDeletes.requiredSchema());
int[] indexes = indexesInRow(requiredSchema);

return CloseableIterable.transform(
// first, apply the existing deletes and get the rows remaining
// then, see what rows are deleted by applying the new deletes
newDeletes.filterDeleted(existingDeletes.filter(rows(task, requiredSchema))),
row -> {
InternalRow expectedRow = new GenericInternalRow(columns.length);

for (int i = 0; i < columns.length; i++) {
expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
}

return expectedRow;
});
row -> projectRow(row, indexes));
}

private CloseableIterable<InternalRow> rows(ContentScanTask<DataFile> task, Schema readSchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.ChangelogOperation;
Expand Down Expand Up @@ -554,17 +555,10 @@ private List<InternalRow> getChangelogRows(Table tbl) throws IOException {

// order by change ordinal, change type, data, id
rows.sort(
(r1, r2) -> {
if (r1.getInt(3) != r2.getInt(3)) {
return r1.getInt(3) - r2.getInt(3);
} else if (!r1.getUTF8String(2).equals(r2.getUTF8String(2))) {
return r1.getUTF8String(2).compareTo(r2.getUTF8String(2));
} else if (!r1.getUTF8String(1).equals(r2.getUTF8String(1))) {
return r1.getUTF8String(1).compareTo(r2.getUTF8String(1));
} else {
return r1.getInt(0) - r2.getInt(0);
}
});
Comparator.comparingInt((InternalRow r) -> r.getInt(3))
.thenComparing((InternalRow r) -> r.getUTF8String(2))
.thenComparing((InternalRow r) -> r.getUTF8String(1))
.thenComparingInt((InternalRow r) -> r.getInt(0)));

return rows;
}
Expand Down

0 comments on commit b368ba4

Please sign in to comment.