Skip to content

Commit

Permalink
Optimize merging snapshot producer
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed Sep 13, 2024
1 parent 5582b0c commit 0beb808
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public String partition() {

// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
// manifest locations reported by files passed to delete(F); a manifest whose path is in this set
// is filtered directly, without first scanning it to check whether it contains deleted files.
private final Set<String> referencedManifestLocations = Sets.newHashSet();

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Expand Down Expand Up @@ -153,6 +154,10 @@ void caseSensitive(boolean newCaseSensitive) {
/**
 * Registers a file to be removed.
 *
 * <p>Invalidates the filtered-manifest cache, records the file's path and partition for
 * deletion, and, when the file reports the manifest it came from, remembers that manifest
 * location as well.
 *
 * @param file the file to delete; must not be null
 */
void delete(F file) {
  Preconditions.checkNotNull(file, "Cannot delete file: null");
  invalidateFilteredCache();

  // only files that know which manifest they were read from contribute a location
  String manifestLocation = file.manifestLocation();
  if (manifestLocation != null) {
    referencedManifestLocations.add(manifestLocation);
  }

  deletePaths.add(file.path());
  deleteFilePartitions.add(file.specId(), file.partition());
}
Expand Down Expand Up @@ -304,6 +309,11 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
return manifest;
}

boolean hasDeletedFiles = referencedManifestLocations.contains(manifest.path());
if (hasDeletedFiles) {
return filterManifestWithDeletedFiles(manifest, tableSchema);
}

try (ManifestReader<F> reader = newManifestReader(manifest)) {
PartitionSpec spec = reader.spec();
PartitionAndMetricsEvaluator evaluator =
Expand All @@ -312,7 +322,7 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
if (!hasDeletedFiles) {
filteredManifests.put(manifest, manifest);
return manifest;
Expand All @@ -325,6 +335,17 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
}
}

/**
 * Opens the given manifest and filters it through the reader-based overload of this method.
 *
 * <p>An evaluator is built from the supplied schema, the spec reported by the manifest reader,
 * and the current delete expression, then passed on together with the manifest and the reader.
 *
 * @param manifest the manifest to filter
 * @param schema the schema used to build the partition and metrics evaluator
 * @return the manifest returned by the reader-based overload
 * @throws RuntimeIOException if an {@link IOException} is raised while the reader is in use or
 *     being closed
 */
private ManifestFile filterManifestWithDeletedFiles(ManifestFile manifest, Schema schema) {
  try (ManifestReader<F> manifestReader = newManifestReader(manifest)) {
    PartitionAndMetricsEvaluator deleteEvaluator =
        new PartitionAndMetricsEvaluator(schema, manifestReader.spec(), deleteExpression);
    return filterManifestWithDeletedFiles(deleteEvaluator, manifest, manifestReader);
  } catch (IOException closeFailure) {
    throw new RuntimeIOException(closeFailure, "Failed to close manifest: %s", manifest);
  }
}

private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainExpressionDeletes;
if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
Expand Down

0 comments on commit 0beb808

Please sign in to comment.