Spark 3.5, Core: Supplement test case for metadata_log_entries after expire snapshot #11901

Open · wants to merge 1 commit into main
core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java
@@ -110,7 +110,8 @@ private static StaticDataTask.Row metadataLogEntryToRow(
     latestSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, metadataLogEntry.timestampMillis());
     latestSnapshot = table.snapshot(latestSnapshotId);
   } catch (IllegalArgumentException ignored) {
-    // implies this metadata file was created at table creation
+    // implies this metadata file was created at table creation or
+    // its corresponding snapshot, or a subsequent one, has been removed
   }

   return StaticDataTask.Row.of(
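For context, `SnapshotUtil.snapshotIdAsOfTime` throws `IllegalArgumentException` when the snapshot log contains no entry at or before the given timestamp. A minimal sketch of the pattern the catch block above implements; the helper name is illustrative and not part of the PR:

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

// Illustrative helper: resolve the snapshot that was current as of a
// metadata file's timestamp, mapping "no resolvable snapshot" to null.
static Snapshot latestSnapshotAsOf(Table table, long timestampMillis) {
  try {
    long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis);
    return table.snapshot(snapshotId);
  } catch (IllegalArgumentException ignored) {
    // no snapshot log entry exists at or before timestampMillis: either the
    // metadata file predates the first snapshot, or the matching entries
    // were dropped when a snapshot was expired
    return null;
  }
}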
core/src/main/java/org/apache/iceberg/TableMetadata.java (1 addition, 1 deletion)
@@ -1801,7 +1801,7 @@ private static List<HistoryEntry> updateSnapshotLog(
     // any invalid entry causes the history before it to be removed. otherwise, there could be
     // history gaps that cause time-travel queries to produce incorrect results. for example,
     // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
-    // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
+    // [(t1, s1), (t3, s3)] because it appears that s1 was current during the time between t2
     // and t3 when in fact s2 was the current snapshot.
     newSnapshotLog.clear();
   }
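The comment's reasoning follows from how timestamp-based lookups resolve: the match is the latest history entry at or before the query time. A simplified sketch of that lookup, not the actual Iceberg implementation:

import java.util.List;
import org.apache.iceberg.HistoryEntry;

// Simplified lookup over a snapshot log sorted by timestamp. With history
// [(t1, s1), (t3, s3)] and a query at t2 <= t < t3, this returns s1 even
// though s2 was actually current then; hence the full clear above rather
// than leaving a gap.
static long snapshotIdAsOf(List<HistoryEntry> history, long timestampMillis) {
  Long result = null;
  for (HistoryEntry entry : history) {
    if (entry.timestampMillis() <= timestampMillis) {
      result = entry.snapshotId();
    }
  }
  if (result == null) {
    throw new IllegalArgumentException("Cannot find a snapshot older than " + timestampMillis);
  }
  return result;
}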
@@ -855,4 +855,101 @@ public void metadataLogEntriesAfterReplacingTable() throws Exception {
assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
.containsExactly(firstEntry, secondEntry, thirdEntry, fourthEntry, fifthEntry);
}

@TestTemplate
public void metadataLogEntriesAfterExpireSnapshots() throws Exception {
sql(
"CREATE TABLE %s (id bigint, data string) "
+ "USING iceberg "
+ "TBLPROPERTIES "
+ "('format-version'='2')",
tableName);

Table table = Spark3Util.loadIcebergTable(spark, tableName);
TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
assertThat(tableMetadata.snapshots()).isEmpty();
assertThat(tableMetadata.snapshotLog()).isEmpty();
assertThat(tableMetadata.currentSnapshot()).isNull();

Object[] firstEntry =
row(
DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
tableMetadata.metadataFileLocation(),
null,
null,
null);

assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName)).containsExactly(firstEntry);

sql("INSERT INTO %s (id, data) VALUES (1, 'a')", tableName);
tableMetadata = ((HasTableOperations) table).operations().refresh();
assertThat(tableMetadata.snapshots()).hasSize(1);
assertThat(tableMetadata.snapshotLog()).hasSize(1);
Snapshot currentSnapshot = tableMetadata.currentSnapshot();
Object[] secondEntry =
row(
DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
tableMetadata.metadataFileLocation(),
currentSnapshot.snapshotId(),
currentSnapshot.schemaId(),
currentSnapshot.sequenceNumber());

sql("INSERT INTO %s (id, data) VALUES (2, 'b')", tableName);
tableMetadata = ((HasTableOperations) table).operations().refresh();
assertThat(tableMetadata.snapshots()).hasSize(2);
assertThat(tableMetadata.snapshotLog()).hasSize(2);
currentSnapshot = tableMetadata.currentSnapshot();
long toBeExpiredId = currentSnapshot.snapshotId();
Object[] thirdEntry =
row(
DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
tableMetadata.metadataFileLocation(),
toBeExpiredId,
currentSnapshot.schemaId(),
currentSnapshot.sequenceNumber());

sql("INSERT INTO %s (id, data) VALUES (3, 'c')", tableName);
tableMetadata = ((HasTableOperations) table).operations().refresh();
assertThat(tableMetadata.snapshots()).hasSize(3);
assertThat(tableMetadata.snapshotLog()).hasSize(3);
currentSnapshot = tableMetadata.currentSnapshot();
Object[] fourthEntry =
row(
DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
tableMetadata.metadataFileLocation(),
currentSnapshot.snapshotId(),
currentSnapshot.schemaId(),
currentSnapshot.sequenceNumber());

// query metadata log entries before expiring the specified snapshot
assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
.containsExactly(firstEntry, secondEntry, thirdEntry, fourthEntry);

// expire the second snapshot by id
sql(
"CALL %s.system.expire_snapshots(table => '%s', snapshot_ids => ARRAY(%d))",
catalogName, tableIdent, toBeExpiredId);

tableMetadata = ((HasTableOperations) table).operations().refresh();
currentSnapshot = tableMetadata.currentSnapshot();
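// expiring s2 invalidates its snapshot log entry; updateSnapshotLog clears
// all history before an invalid entry, so only the latest snapshot's entry survives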
assertThat(tableMetadata.snapshots()).hasSize(2);
assertThat(tableMetadata.snapshotLog()).hasSize(1);

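// the snapshot log now starts at the latest snapshot, so snapshotIdAsOfTime
// can no longer resolve the timestamps of the second and third metadata
// files; their snapshot columns become null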
secondEntry = row(secondEntry[0], secondEntry[1], null, null, null);

thirdEntry = row(thirdEntry[0], thirdEntry[1], null, null, null);

// a new log entry is created by `call expire_snapshots(...)`
Object[] fifthEntry =
row(
DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
tableMetadata.metadataFileLocation(),
currentSnapshot.snapshotId(),
currentSnapshot.schemaId(),
currentSnapshot.sequenceNumber());

// query metadata log entries after expiring the specified snapshot
assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
.containsExactly(firstEntry, secondEntry, thirdEntry, fourthEntry, fifthEntry);
}
}