From aafd0fa83dba93c7dd6466d0999d9b1fac0c4464 Mon Sep 17 00:00:00 2001
From: Gabor Kaszab
Date: Thu, 5 Dec 2024 23:08:19 +0100
Subject: [PATCH] Core: Bulk deletion in RemoveSnapshots

The current implementation uses the deleteFile() of the FileIO even if
it supports bulk operations. Even though the user of the RemoveSnapshots
API can provide a custom Consumer to perform bulk deletion, Iceberg can
be clever enough itself to find out if bulk deletion is possible on the
FileIO.
---
 .../apache/iceberg/BulkDeleteConsumer.java    |  68 +++++
 .../apache/iceberg/FileCleanupStrategy.java   |  48 +++-
 .../org/apache/iceberg/RemoveSnapshots.java   |   5 +
 .../apache/iceberg/TestRemoveSnapshots.java   | 265 ++++++++++++------
 .../java/org/apache/iceberg/TestTables.java   |  21 ++
 5 files changed, 314 insertions(+), 93 deletions(-)
 create mode 100644 core/src/main/java/org/apache/iceberg/BulkDeleteConsumer.java

diff --git a/core/src/main/java/org/apache/iceberg/BulkDeleteConsumer.java b/core/src/main/java/org/apache/iceberg/BulkDeleteConsumer.java
new file mode 100644
index 000000000000..977ed06a829e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BulkDeleteConsumer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Consumer that buffers the file paths it is given and deletes them with a single bulk call.
+ *
+ * <p>Paths are collected one by one through {@link #accept(String)}; nothing is deleted until
+ * {@link #consumeAll()} is called. Not thread safe.
+ */
+public class BulkDeleteConsumer implements Consumer<String> {
+  private final List<String> files = Lists.newArrayList();
+
+  private final SupportsBulkOperations ops;
+
+  /**
+   * @param ops the bulk-capable file IO that {@link #consumeAll()} deletes through
+   */
+  public BulkDeleteConsumer(SupportsBulkOperations ops) {
+    this.ops = ops;
+  }
+
+  /** Buffers {@code file} for the next {@link #consumeAll()} call; does not delete anything. */
+  @Override
+  public void accept(String file) {
+    files.add(file);
+  }
+
+  /**
+   * Deletes every buffered path with one {@code deleteFiles} call. Does nothing if the buffer is
+   * empty.
+   *
+   * <p>The buffer is emptied even when the bulk call throws, so a caller that catches the failure
+   * and carries on does not resubmit the failed batch together with the next one.
+   */
+  public void consumeAll() {
+    if (files.isEmpty()) {
+      return;
+    }
+
+    try {
+      ops.deleteFiles(files);
+    } finally {
+      files.clear();
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
index dae99c572c78..c592834978ea 100644
--- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
@@ -23,6 +23,7 @@
 import java.util.function.Consumer;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -75,14 +76,45 @@ protected CloseableIterable readManifests(Snapshot snapshot) {
   }
 
+  /**
+   * Deletes the given paths on a best-effort basis: failures are logged instead of rethrown.
+   *
+   * <p>When the delete function is a {@link BulkDeleteConsumer}, all paths are handed to it and
+   * removed with one bulk call; otherwise each path is passed to the delete function on the
+   * delete executor service, retrying failed deletes.
+   *
+   * @param pathsToDelete locations of the files to delete
+   * @param fileType kind of file being deleted, used only in log messages
+   */
   protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
-    Tasks.foreach(pathsToDelete)
-        .executeWith(deleteExecutorService)
-        .retry(3)
-        .stopRetryOn(NotFoundException.class)
-        .suppressFailureWhenFinished()
-        .onFailure(
-            (file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
-        .run(deleteFunc::accept);
+    if (deleteFunc instanceof BulkDeleteConsumer) {
+      BulkDeleteConsumer bulkDelete = (BulkDeleteConsumer) deleteFunc;
+      pathsToDelete.forEach(bulkDelete);
+
+      try {
+        bulkDelete.consumeAll();
+      } catch (BulkDeletionFailureException e) {
+        LOG.warn(
+            "Bulk deletion failed for {} of {} {} file(s)",
+            e.numberFailedObjects(),
+            pathsToDelete.size(),
+            fileType,
+            e);
+      } catch (RuntimeException e) {
+        // Cleanup is best effort, as in the per-file branch below: any other failure of the bulk
+        // call is logged as well rather than aborting the caller.
+        LOG.warn("Bulk deletion failed for {} {} file(s)", pathsToDelete.size(), fileType, e);
+      }
+    } else {
+      Tasks.foreach(pathsToDelete)
+          .executeWith(deleteExecutorService)
+          .retry(3)
+          .stopRetryOn(NotFoundException.class)
+          .stopOnFailure()
+          .suppressFailureWhenFinished()
+          .onFailure(
+              (file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
+          .run(deleteFunc::accept);
+    }
   }
 
   protected boolean hasAnyStatisticsFiles(TableMetadata tableMetadata) {
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index 7558ea7d8a3e..7d498ddf7e17 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -43,6 +43,7 @@
 import java.util.function.Consumer;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -105,6 +106,10 @@ public void accept(String file) {
 
     this.defaultMaxRefAgeMs =
         PropertyUtil.propertyAsLong(base.properties(), MAX_REF_AGE_MS, MAX_REF_AGE_MS_DEFAULT);
+
+    if (ops.io() instanceof SupportsBulkOperations) {
+      this.deleteFunc = new BulkDeleteConsumer((SupportsBulkOperations) ops.io());
+    }
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index 44bbd069e27d..2f744681c6e2 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -34,11 +34,16 @@
 import
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.puffin.Blob; import org.apache.iceberg.puffin.Puffin; import org.apache.iceberg.puffin.PuffinWriter; @@ -55,13 +60,16 @@ public class TestRemoveSnapshots extends TestBase { @Parameter(index = 1) private boolean incrementalCleanup; - @Parameters(name = "formatVersion = {0}, incrementalCleanup = {1}") + @Parameter(index = 2) + private boolean bulkDelete; + + @Parameters(name = "formatVersion = {0}, incrementalCleanup = {1}, bulkDelete = {2}") protected static List parameters() { return Arrays.asList( - new Object[] {1, true}, - new Object[] {2, true}, - new Object[] {1, false}, - new Object[] {2, false}); + new Object[] {1, true, false}, + new Object[] {2, true, true}, + new Object[] {1, false, true}, + new Object[] {2, false, false}); } private long waitUntilAfter(long timestampMillis) { @@ -86,13 +94,15 @@ public void testExpireOlderThan() { long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); - Set deletedFiles = Sets.newHashSet(); - - removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) removeSnapshots(table).expireOlderThan(tAfterCommits)); assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId); assertThat(table.snapshot(firstSnapshot.snapshotId())).isNull(); - 
assertThat(deletedFiles).containsExactly(firstSnapshot.manifestListLocation()); + assertThat(deletedFiles) + .containsExactlyInAnyOrderElementsOf( + Lists.newArrayList(firstSnapshot.manifestListLocation())); } @TestTemplate @@ -119,9 +129,9 @@ public void testExpireOlderThanWithDelete() { long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); - Set deletedFiles = Sets.newHashSet(); - - removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) removeSnapshots(table).expireOlderThan(tAfterCommits)); assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId); assertThat(table.snapshot(firstSnapshot.snapshotId())).isNull(); @@ -129,7 +139,7 @@ public void testExpireOlderThanWithDelete() { assertThat(deletedFiles) .as("Should remove expired manifest lists and deleted data file") - .isEqualTo( + .containsExactlyInAnyOrderElementsOf( Sets.newHashSet( firstSnapshot.manifestListLocation(), // snapshot expired firstSnapshot @@ -178,9 +188,9 @@ public void testExpireOlderThanWithDeleteInMergedManifests() { long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); - Set deletedFiles = Sets.newHashSet(); - - removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) removeSnapshots(table).expireOlderThan(tAfterCommits)); assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId); assertThat(table.snapshot(firstSnapshot.snapshotId())).isNull(); @@ -188,7 +198,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() { assertThat(deletedFiles) .as("Should remove expired manifest lists and deleted data file") - .isEqualTo( + .containsExactlyInAnyOrderElementsOf( Sets.newHashSet( firstSnapshot.manifestListLocation(), // snapshot expired firstSnapshot @@ -226,9 +236,9 @@ public void 
testExpireOlderThanWithRollback() { long snapshotId = table.currentSnapshot().snapshotId(); - Set deletedFiles = Sets.newHashSet(); - - removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) removeSnapshots(table).expireOlderThan(tAfterCommits)); assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId); assertThat(table.snapshot(firstSnapshot.snapshotId())) @@ -240,7 +250,7 @@ public void testExpireOlderThanWithRollback() { assertThat(deletedFiles) .as("Should remove expired manifest lists and reverted appended data file") - .isEqualTo( + .containsExactlyInAnyOrderElementsOf( Sets.newHashSet( secondSnapshot.manifestListLocation(), // snapshot expired Iterables.getOnlyElement(secondSnapshotManifests) @@ -271,9 +281,9 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() { long snapshotId = table.currentSnapshot().snapshotId(); - Set deletedFiles = Sets.newHashSet(); - - removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) removeSnapshots(table).expireOlderThan(tAfterCommits)); assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId); assertThat(table.snapshot(firstSnapshot.snapshotId())) @@ -285,7 +295,7 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() { assertThat(deletedFiles) .as("Should remove expired manifest lists and reverted appended data file") - .isEqualTo( + .containsExactlyInAnyOrderElementsOf( Sets.newHashSet( secondSnapshot.manifestListLocation(), // snapshot expired secondSnapshotManifests.stream() @@ -648,9 +658,8 @@ public void testScanExpiredManifestInValidSnapshotAppend() { t3 = System.currentTimeMillis(); } - Set deletedFiles = Sets.newHashSet(); - - removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); + List deletedFiles = + 
executeRemoveSnapshots((RemoveSnapshots) removeSnapshots(table).expireOlderThan(t3)); assertThat(deletedFiles).contains(FILE_A.location().toString()); } @@ -674,9 +683,8 @@ public void testScanExpiredManifestInValidSnapshotFastAppend() { t3 = System.currentTimeMillis(); } - Set deletedFiles = Sets.newHashSet(); - - removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); + List deletedFiles = + executeRemoveSnapshots((RemoveSnapshots) removeSnapshots(table).expireOlderThan(t3)); assertThat(deletedFiles).contains(FILE_A.location().toString()); } @@ -711,9 +719,8 @@ public void dataFilesCleanup() throws IOException { rewriteManifests.addManifest(newManifest); rewriteManifests.commit(); - Set deletedFiles = Sets.newHashSet(); - - removeSnapshots(table).expireOlderThan(t4).deleteWith(deletedFiles::add).commit(); + List deletedFiles = + executeRemoveSnapshots((RemoveSnapshots) removeSnapshots(table).expireOlderThan(t4)); assertThat(deletedFiles).contains(FILE_A.location().toString()); assertThat(deletedFiles).contains(FILE_B.location().toString()); @@ -811,13 +818,9 @@ public void noDataFileCleanup() throws IOException { t4 = System.currentTimeMillis(); } - Set deletedFiles = Sets.newHashSet(); - - removeSnapshots(table) - .cleanExpiredFiles(false) - .expireOlderThan(t4) - .deleteWith(deletedFiles::add) - .commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) removeSnapshots(table).cleanExpiredFiles(false).expireOlderThan(t4)); assertThat(deletedFiles).isEmpty(); } @@ -841,13 +844,11 @@ public void testWithExpiringDanglingStageCommit() { // `C` commit table.newAppend().appendFile(FILE_C).commit(); - Set deletedFiles = Sets.newHashSet(); - // Expire all commits including dangling staged snapshot. 
- removeSnapshots(table) - .deleteWith(deletedFiles::add) - .expireOlderThan(snapshotB.timestampMillis() + 1) - .commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) + removeSnapshots(table).expireOlderThan(snapshotB.timestampMillis() + 1)); Set expectedDeletes = Sets.newHashSet(); expectedDeletes.add(snapshotA.manifestListLocation()); @@ -871,7 +872,7 @@ public void testWithExpiringDanglingStageCommit() { expectedDeletes.add(file.path()); } }); - assertThat(deletedFiles).isEqualTo(expectedDeletes); + assertThat(deletedFiles).containsExactlyInAnyOrderElementsOf(expectedDeletes); // Take the diff expectedDeletes.removeAll(deletedFiles); assertThat(expectedDeletes).isEmpty(); @@ -908,13 +909,12 @@ public void testWithCherryPickTableSnapshot() { // Move the table back to `C` table.manageSnapshots().setCurrentSnapshot(snapshotC.snapshotId()).commit(); - List deletedFiles = Lists.newArrayList(); // Expire `C` - removeSnapshots(table) - .deleteWith(deletedFiles::add) - .expireOlderThan(snapshotC.timestampMillis() + 1) - .commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) + removeSnapshots(table).expireOlderThan(snapshotC.timestampMillis() + 1)); // Make sure no dataFiles are deleted for the B, C, D snapshot Lists.newArrayList(snapshotB, snapshotC, snapshotD) @@ -953,14 +953,10 @@ public void testWithExpiringStagedThenCherrypick() { base = readMetadata(); Snapshot snapshotD = base.snapshots().get(3); - List deletedFiles = Lists.newArrayList(); - // Expire `B` commit. 
- table - .expireSnapshots() - .deleteWith(deletedFiles::add) - .expireSnapshotId(snapshotB.snapshotId()) - .commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) table.expireSnapshots().expireSnapshotId(snapshotB.snapshotId())); // Make sure no dataFiles are deleted for the staged snapshot Lists.newArrayList(snapshotB) @@ -1014,14 +1010,11 @@ public void testExpireWithDefaultRetainLast() { table.updateProperties().set(TableProperties.MIN_SNAPSHOTS_TO_KEEP, "3").commit(); - Set deletedFiles = Sets.newHashSet(); - Snapshot snapshotBeforeExpiration = table.currentSnapshot(); - removeSnapshots(table) - .expireOlderThan(System.currentTimeMillis()) - .deleteWith(deletedFiles::add) - .commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) removeSnapshots(table).expireOlderThan(System.currentTimeMillis())); assertThat(table.currentSnapshot()).isEqualTo(snapshotBeforeExpiration); assertThat(table.snapshots()).hasSize(3); @@ -1049,10 +1042,8 @@ public void testExpireWithDefaultSnapshotAge() { table.updateProperties().set(TableProperties.MAX_SNAPSHOT_AGE_MS, "1").commit(); - Set deletedFiles = Sets.newHashSet(); - // rely solely on default configs - removeSnapshots(table).deleteWith(deletedFiles::add).commit(); + List deletedFiles = executeRemoveSnapshots(removeSnapshots(table)); assertThat(table.currentSnapshot()).isEqualTo(thirdSnapshot); assertThat(table.snapshots()).hasSize(1); @@ -1098,13 +1089,16 @@ public void testExpireWithDeleteFiles() { Snapshot fourthSnapshot = table.currentSnapshot(); long fourthSnapshotTs = waitUntilAfter(fourthSnapshot.timestampMillis()); - Set deletedFiles = Sets.newHashSet(); - removeSnapshots(table).expireOlderThan(fourthSnapshotTs).deleteWith(deletedFiles::add).commit(); + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) removeSnapshots(table).expireOlderThan(fourthSnapshotTs)); + + ImmutableSet.Builder expectedDeletedFiles = ImmutableSet.builder(); assertThat(deletedFiles) 
.as("Should remove old delete files and delete file manifests") - .isEqualTo( - ImmutableSet.builder() + .containsExactlyInAnyOrderElementsOf( + expectedDeletedFiles .add(FILE_A.location()) .add(FILE_A_DELETES.location()) .add(firstSnapshot.manifestListLocation()) @@ -1581,16 +1575,17 @@ public void testRetainFilesOnRetainedBranches() { table.newAppend().appendFile(FILE_B).commit(); long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); - Set deletedFiles = Sets.newHashSet(); - Set expectedDeletes = Sets.newHashSet(); - // Only deletionA's manifest list and manifests should be removed + Set expectedDeletes = Sets.newHashSet(); expectedDeletes.add(deletionA.manifestListLocation()); expectedDeletes.addAll(manifestPaths(deletionA, table.io())); - table.expireSnapshots().expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + + List deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) table.expireSnapshots().expireOlderThan(tAfterCommits)); assertThat(table.snapshots()).hasSize(2); - assertThat(deletedFiles).isEqualTo(expectedDeletes); + assertThat(deletedFiles).containsExactlyInAnyOrderElementsOf(expectedDeletes); // Delete A on test branch table.newDelete().deleteFile(FILE_A).toBranch(testBranch).commit(); @@ -1600,15 +1595,14 @@ public void testRetainFilesOnRetainedBranches() { table.newAppend().appendFile(FILE_C).toBranch(testBranch).commit(); Snapshot testBranchHead = table.snapshot(testBranch); - deletedFiles = Sets.newHashSet(); expectedDeletes = Sets.newHashSet(); waitUntilAfter(testBranchHead.timestampMillis()); - table - .expireSnapshots() - .expireOlderThan(testBranchHead.timestampMillis()) - .deleteWith(deletedFiles::add) - .commit(); + + deletedFiles = + executeRemoveSnapshots( + (RemoveSnapshots) + table.expireSnapshots().expireOlderThan(testBranchHead.timestampMillis())); expectedDeletes.add(appendA.manifestListLocation()); expectedDeletes.addAll(manifestPaths(appendA, table.io())); @@ -1617,7 +1611,88 @@ 
public void testRetainFilesOnRetainedBranches() { expectedDeletes.add(FILE_A.location().toString()); assertThat(table.snapshots()).hasSize(2); - assertThat(deletedFiles).isEqualTo(expectedDeletes); + assertThat(deletedFiles).containsExactlyInAnyOrderElementsOf(expectedDeletes); + } + + @TestTemplate + public void testRemoveFromTableWithBulkIO() { + List deletedFiles = Lists.newArrayList(); + + Table tableWithBulkOps = + TestTables.create( + tableDir, + "tableWithBulkIO", + SCHEMA, + PartitionSpec.unpartitioned(), + SortOrder.unsorted(), + formatVersion, + new TestBulkLocalFileIO(deletedFiles::add)); + + tableWithBulkOps.newAppend().appendFile(FILE_A).commit(); + + Snapshot firstSnapshot = tableWithBulkOps.currentSnapshot(); + + waitUntilAfter(tableWithBulkOps.currentSnapshot().timestampMillis()); + + tableWithBulkOps.newAppend().appendFile(FILE_B).commit(); + + long snapshotId = tableWithBulkOps.currentSnapshot().snapshotId(); + + long tAfterCommits = waitUntilAfter(tableWithBulkOps.currentSnapshot().timestampMillis()); + + removeSnapshots(tableWithBulkOps).expireOlderThan(tAfterCommits).commit(); + + assertThat(tableWithBulkOps.currentSnapshot().snapshotId()).isEqualTo(snapshotId); + assertThat(tableWithBulkOps.snapshot(firstSnapshot.snapshotId())).isNull(); + assertThat(deletedFiles).containsExactly(firstSnapshot.manifestListLocation()); + } + + @TestTemplate + public void testDeleteExceptionSuppressed() { + table.newAppend().appendFile(FILE_A).commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + waitUntilAfter(table.currentSnapshot().timestampMillis()); + + table.newAppend().appendFile(FILE_B).commit(); + + long snapshotId = table.currentSnapshot().snapshotId(); + + long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); + + Consumer failure = + new Consumer() { + @Override + public void accept(String s) { + if (bulkDelete) { + throw new BulkDeletionFailureException(1); + } + + throw new NotFoundException("Test exception"); 
+          }
+        };
+    if (bulkDelete) {
+      failure = new BulkDeleteConsumer(new TestBulkLocalFileIO(failure));
+    }
+
+    removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(failure).commit();
+
+    assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId);
+    assertThat(table.snapshot(firstSnapshot.snapshotId())).isNull();
+  }
+
+  private List<String> executeRemoveSnapshots(RemoveSnapshots removeSnapshots) {
+    List<String> filesDeleted = Lists.newArrayList();
+
+    Consumer<String> deletedFiles = filesDeleted::add;
+    if (bulkDelete) {
+      deletedFiles = new BulkDeleteConsumer(new TestBulkLocalFileIO(filesDeleted::add));
+    }
+
+    removeSnapshots.deleteWith(deletedFiles).commit();
+
+    return filesDeleted;
   }
 
   private Set<String> manifestPaths(Snapshot snapshot, FileIO io) {
@@ -1700,4 +1775,29 @@ private static PartitionStatisticsFile reusePartitionStatsFile(
   private static void commitPartitionStats(Table table, PartitionStatisticsFile statisticsFile) {
     table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
   }
+
+  /**
+   * Local file IO for tests that supports bulk deletes only: {@link #deleteFiles(Iterable)} just
+   * forwards each path to the supplied consumer, while {@link #deleteFile(String)} always throws
+   * to flag any use of the per-file delete path.
+   */
+  private static class TestBulkLocalFileIO extends TestTables.LocalFileIO
+      implements SupportsBulkOperations {
+
+    private final Consumer<String> filesDeleted;
+
+    TestBulkLocalFileIO(Consumer<String> deletedFiles) {
+      this.filesDeleted = deletedFiles;
+    }
+
+    @Override
+    public void deleteFile(String path) {
+      throw new RuntimeIOException("Expected to call the bulk delete interface.");
+    }
+
+    @Override
+    public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+      pathsToDelete.forEach(filesDeleted);
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index eeff5db8e5a6..88ff13146bbf 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -72,6 +72,32 @@ public static TestTable create(
     return new TestTable(ops, name);
   }
 
+  /**
+   * Creates a test table whose table operations are constructed with the given {@link FileIO}.
+   *
+   * @throws AlreadyExistsException if the table operations already report current metadata
+   */
+  public static TestTable create(
+      File temp,
+      String name,
+      Schema schema,
+      PartitionSpec spec,
+      SortOrder sortOrder,
+      int formatVersion,
+      FileIO fileIO) {
+    TestTableOperations ops = new TestTableOperations(name, temp, fileIO);
+    if (ops.current() != null) {
+      throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
+    }
+
+    ops.commit(
+        null,
+        newTableMetadata(
+            schema, spec, sortOrder, temp.toString(), ImmutableMap.of(), formatVersion));
+
+    return new TestTable(ops, name);
+  }
+
   public static TestTable create(
       File temp,
       String name,