Skip to content

Commit

Permalink
API: Support removeUnusedSpecs in ExpireSnapshots (#10755)
Browse files Browse the repository at this point in the history
Implement an API in ExpireSnapshots that removes partition specs that are no longer in use. Unused specs are found with a reachability analysis over the retained snapshots, so that table metadata does not keep growing.

Co-authored-by: Russell_Spitzer <[email protected]>
Co-authored-by: Amogh Jahagirdar <[email protected]>
  • Loading branch information
3 people authored Jan 7, 2025
1 parent 1cbc163 commit 67e084c
Show file tree
Hide file tree
Showing 11 changed files with 394 additions and 6 deletions.
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,15 @@ public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> {
* @return this for method chaining
*/
ExpireSnapshots cleanExpiredFiles(boolean clean);

/**
 * Controls whether expiring snapshots also removes metadata that is no longer used, such as
 * partition specs or schemas.
 *
 * <p>This default implementation does not support the option and always throws.
 *
 * @param clean when true, remove unused partition specs, schemas, or other metadata
 * @return this for method chaining
 * @throws UnsupportedOperationException if the implementation does not override this method
 */
default ExpireSnapshots cleanExpiredMetadata(boolean clean) {
  // name the concrete implementation so the caller can tell which class lacks support
  String implementation = this.getClass().getName();
  throw new UnsupportedOperationException(
      implementation + " doesn't implement cleanExpiredMetadata");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -256,7 +257,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
});

Set<String> filesToDelete =
findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
findFilesToDelete(
manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById());

deleteFiles(filesToDelete, "data");
deleteFiles(manifestsToDelete, "manifest");
Expand All @@ -273,7 +275,7 @@ private Set<String> findFilesToDelete(
Set<ManifestFile> manifestsToScan,
Set<ManifestFile> manifestsToRevert,
Set<Long> validIds,
TableMetadata current) {
Map<Integer, PartitionSpec> specsById) {
Set<String> filesToDelete = ConcurrentHashMap.newKeySet();
Tasks.foreach(manifestsToScan)
.retry(3)
Expand All @@ -285,8 +287,7 @@ private Set<String> findFilesToDelete(
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
try (ManifestReader<?> reader =
ManifestFiles.open(manifest, fileIO, current.specsById())) {
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be
// deleted
Expand All @@ -311,8 +312,7 @@ private Set<String> findFilesToDelete(
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
try (ManifestReader<?> reader =
ManifestFiles.open(manifest, fileIO, current.specsById())) {
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
// delete any ADDED file from manifests that were reverted
if (entry.status() == ManifestEntry.Status.ADDED) {
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

/** A metadata update that removes the partition specs with the given IDs. */
class RemovePartitionSpecs implements MetadataUpdate {
  // kept by reference; the set passed to the constructor is not copied
  private final Set<Integer> idsToRemove;

  /**
   * @param specIds IDs of the partition specs to remove
   */
  public RemovePartitionSpecs(Set<Integer> specIds) {
    this.idsToRemove = specIds;
  }

  /** Returns the IDs of the partition specs that this update removes. */
  public Set<Integer> specIds() {
    return idsToRemove;
  }

  /** Applies this update by asking the builder to remove the specs. */
  @Override
  public void applyTo(TableMetadata.Builder metadataBuilder) {
    metadataBuilder.removeSpecs(idsToRemove);
  }
}

class AddSortOrder implements MetadataUpdate {
private final UnboundSortOrder sortOrder;

Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ private MetadataUpdateParser() {}
static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version";
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";

// AssignUUID
private static final String UUID = "uuid";
Expand Down Expand Up @@ -126,6 +127,9 @@ private MetadataUpdateParser() {}
// SetCurrentViewVersion
private static final String VIEW_VERSION_ID = "view-version-id";

// RemovePartitionSpecs
private static final String SPEC_IDS = "spec-ids";

private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
Expand All @@ -149,6 +153,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.SetLocation.class, SET_LOCATION)
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
Expand Down Expand Up @@ -241,6 +246,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
writeSetCurrentViewVersionId(
(MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator);
break;
case REMOVE_PARTITION_SPECS:
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -312,6 +320,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readAddViewVersion(jsonNode);
case SET_CURRENT_VIEW_VERSION:
return readCurrentViewVersionId(jsonNode);
case REMOVE_PARTITION_SPECS:
return readRemovePartitionSpecs(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down Expand Up @@ -447,6 +457,11 @@ private static void writeSetCurrentViewVersionId(
gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId());
}

// Writes the body of a remove-partition-specs update: the IDs of the specs to remove, as an
// integer array under the "spec-ids" field.
private static void writeRemovePartitionSpecs(
MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen) throws IOException {
JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen);
}

private static MetadataUpdate readAssignUUID(JsonNode node) {
String uuid = JsonUtil.getString(UUID, node);
return new MetadataUpdate.AssignUUID(uuid);
Expand Down Expand Up @@ -596,4 +611,8 @@ private static MetadataUpdate readAddViewVersion(JsonNode node) {
private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
return new MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node));
}

// Builds a RemovePartitionSpecs update from the integer set stored under the "spec-ids" field
// of the given JSON node.
private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) {
return new MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node));
}
}
28 changes: 28 additions & 0 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -85,6 +86,7 @@ public void accept(String file) {
private ExecutorService planExecutorService = ThreadPools.getWorkerPool();
private Boolean incrementalCleanup;
private boolean specifiedSnapshotId = false;
private boolean cleanExpiredMetadata = false;

RemoveSnapshots(TableOperations ops) {
this.ops = ops;
Expand Down Expand Up @@ -159,6 +161,12 @@ public ExpireSnapshots planWith(ExecutorService executorService) {
return this;
}

// Records whether unused metadata should be removed when snapshots are expired. The flag is
// only stored here; it is read later when the updated table metadata is produced.
@Override
public ExpireSnapshots cleanExpiredMetadata(boolean clean) {
this.cleanExpiredMetadata = clean;
return this;
}

@Override
public List<Snapshot> apply() {
TableMetadata updated = internalApply();
Expand Down Expand Up @@ -209,6 +217,26 @@ private TableMetadata internalApply() {
.forEach(idsToRemove::add);
updatedMetaBuilder.removeSnapshots(idsToRemove);

// Optionally drop partition specs that nothing retained still refers to.
if (cleanExpiredMetadata) {
// TODO: Support cleaning expired schema as well.
// concurrent set: it is filled from tasks run on planExecutorService
Set<Integer> reachableSpecs = Sets.newConcurrentHashSet();
// the default spec is always treated as reachable
reachableSpecs.add(base.defaultSpecId());
// collect the spec ID of every manifest of each snapshot looked up from idsToRetain
// NOTE(review): idsToRetain is defined outside this view; presumably the IDs of the snapshots
// kept after expiration -- confirm
Tasks.foreach(idsToRetain)
.executeWith(planExecutorService)
.run(
snapshot ->
base.snapshot(snapshot).allManifests(ops.io()).stream()
.map(ManifestFile::partitionSpecId)
.forEach(reachableSpecs::add));

// every spec of the base metadata that was not reached above is removed
Set<Integer> specsToRemove =
base.specs().stream()
.map(PartitionSpec::specId)
.filter(specId -> !reachableSpecs.contains(specId))
.collect(Collectors.toSet());
updatedMetaBuilder.removeSpecs(specsToRemove);
}

return updatedMetaBuilder.build();
}

Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,19 @@ public Builder setDefaultPartitionSpec(int specId) {
return this;
}

/**
 * Removes the partition specs with the given IDs from this builder's list of specs.
 *
 * <p>When at least one ID is given, a {@link MetadataUpdate.RemovePartitionSpecs} change is
 * recorded. An empty input is a no-op: the spec list is left untouched and no change is
 * recorded.
 *
 * @param specIds IDs of the partition specs to remove
 * @return this builder for method chaining
 * @throws IllegalArgumentException if the default partition spec is among the IDs to remove
 */
Builder removeSpecs(Iterable<Integer> specIds) {
  Set<Integer> specIdsToRemove = Sets.newHashSet(specIds);
  Preconditions.checkArgument(
      !specIdsToRemove.contains(defaultSpecId), "Cannot remove the default partition spec");

  // nothing to remove: do not record an empty update, which would otherwise be serialized and
  // produce commit requirements for a change that does nothing
  if (specIdsToRemove.isEmpty()) {
    return this;
  }

  this.specs =
      specs.stream()
          .filter(s -> !specIdsToRemove.contains(s.specId()))
          .collect(Collectors.toList());
  changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
  return this;
}

public Builder addPartitionSpec(UnboundPartitionSpec spec) {
addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId)));
return this;
Expand Down
23 changes: 23 additions & 0 deletions core/src/main/java/org/apache/iceberg/UpdateRequirements.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ private Builder update(MetadataUpdate update) {
update((MetadataUpdate.SetDefaultPartitionSpec) update);
} else if (update instanceof MetadataUpdate.SetDefaultSortOrder) {
update((MetadataUpdate.SetDefaultSortOrder) update);
} else if (update instanceof MetadataUpdate.RemovePartitionSpecs) {
update((MetadataUpdate.RemovePartitionSpecs) update);
}

return this;
Expand Down Expand Up @@ -173,6 +175,27 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) {
}
}

/**
 * Adds the requirements for removing partition specs: the default spec ID must be unchanged,
 * and every branch other than main must still point at the same snapshot.
 */
private void update(MetadataUpdate.RemovePartitionSpecs unused) {
  // requirements are only derived when there is a base table that is not being replaced
  boolean validateAgainstBase = base != null && !isReplace;

  // require that the default partition spec has not changed; handled at most once
  if (!setSpecId) {
    if (validateAgainstBase) {
      require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
    }
    this.setSpecId = true;
  }

  if (!validateAgainstBase) {
    return;
  }

  // require that no non-main branch has changed, so that old specs won't be written
  base.refs()
      .forEach(
          (name, ref) -> {
            if (!ref.isBranch() || name.equals(SnapshotRef.MAIN_BRANCH)) {
              return;
            }
            require(new UpdateRequirement.AssertRefSnapshotID(name, ref.snapshotId()));
          });
}

private List<UpdateRequirement> build() {
return requirements.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,17 @@ public void testRemovePartitionStatistics() {
.isEqualTo(json);
}

/** Checks that a remove-partition-specs update round-trips through the JSON parser. */
@Test
public void testRemovePartitionSpec() {
  String expectedJson = "{\"action\":\"remove-partition-specs\",\"spec-ids\":[1,2,3]}";
  MetadataUpdate expectedUpdate =
      new MetadataUpdate.RemovePartitionSpecs(ImmutableSet.of(1, 2, 3));

  // parsing the JSON must yield an equivalent update
  MetadataUpdate parsedUpdate = MetadataUpdateParser.fromJson(expectedJson);
  assertEquals(MetadataUpdateParser.REMOVE_PARTITION_SPECS, expectedUpdate, parsedUpdate);

  // serializing the update must reproduce the JSON exactly
  String serialized = MetadataUpdateParser.toJson(expectedUpdate);
  assertThat(serialized)
      .as("Remove partition specs should convert to the correct JSON value")
      .isEqualTo(expectedJson);
}

public void assertEquals(
String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) {
switch (action) {
Expand Down Expand Up @@ -1016,6 +1027,11 @@ public void assertEquals(
(MetadataUpdate.SetCurrentViewVersion) expectedUpdate,
(MetadataUpdate.SetCurrentViewVersion) actualUpdate);
break;
case MetadataUpdateParser.REMOVE_PARTITION_SPECS:
assertEqualsRemovePartitionSpecs(
(MetadataUpdate.RemovePartitionSpecs) expectedUpdate,
(MetadataUpdate.RemovePartitionSpecs) actualUpdate);
break;
default:
fail("Unrecognized metadata update action: " + action);
}
Expand Down Expand Up @@ -1237,6 +1253,11 @@ private static void assertEqualsSetCurrentViewVersion(
assertThat(actual.versionId()).isEqualTo(expected.versionId());
}

// Two RemovePartitionSpecs updates are considered equal when they carry the same spec IDs;
// the order of the IDs is not significant.
private static void assertEqualsRemovePartitionSpecs(
MetadataUpdate.RemovePartitionSpecs expected, MetadataUpdate.RemovePartitionSpecs actual) {
assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds());
}

private String createManifestListWithManifestFiles(long snapshotId, Long parentSnapshotId)
throws IOException {
File manifestList = File.createTempFile("manifests", null, temp.toFile());
Expand Down
Loading

0 comments on commit 67e084c

Please sign in to comment.