Skip to content

Commit

Permalink
Optimize merging snapshot producer
Browse files (browse the repository at this point in the history)
  • Loading branch information
amogh-jahagirdar committed Sep 16, 2024
1 parent 5582b0c commit 4099608
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public String partition() {

// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
// positions of files marked for deletion, keyed by the location of the manifest that contains
// them; populated by delete(F) only when the file reports a non-null manifest location
private final Map<String, Set<Long>> deletedManifestPositions = Maps.newConcurrentMap();

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Expand Down Expand Up @@ -153,6 +154,12 @@ void caseSensitive(boolean newCaseSensitive) {
// Marks a file for deletion. The file's path and partition are always tracked; when the file also
// reports the manifest it belongs to, its position in that manifest is recorded under the
// manifest's location.
void delete(F file) {
  Preconditions.checkNotNull(file, "Cannot delete file: null");
  invalidateFilteredCache();

  String manifestLocation = file.manifestLocation();
  if (manifestLocation != null) {
    // lazily create the per-manifest position set the first time the manifest is seen
    Set<Long> positions =
        deletedManifestPositions.computeIfAbsent(manifestLocation, ignored -> Sets.newHashSet());
    positions.add(file.pos());
  }

  // path-based tracking is kept even when a position was recorded
  deletePaths.add(file.path());
  deleteFilePartitions.add(file.specId(), file.partition());
}
Expand All @@ -168,7 +175,8 @@ void delete(CharSequence path) {
// Reports whether any delete has been registered: a path in deletePaths, a delete expression
// other than alwaysFalse(), a dropped partition, or a recorded manifest position.
// NOTE(review): this text comes from a rendered diff; the line ending in "isEmpty();" that is
// followed by further "||" clauses appears to be the removed pre-change line — confirm against
// the actual file.
boolean containsDeletes() {
return !deletePaths.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
|| !dropPartitions.isEmpty()
|| !deletedManifestPositions.isEmpty();
}

/**
Expand Down Expand Up @@ -308,11 +316,15 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
PartitionSpec spec = reader.spec();
PartitionAndMetricsEvaluator evaluator =
new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression);
boolean hasDeletedFiles = deletedManifestPositions.containsKey(manifest.path());
if (hasDeletedFiles) {
return filterManifestWithDeletedFiles(evaluator, manifest, reader);
}

// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
if (!hasDeletedFiles) {
filteredManifests.put(manifest, manifest);
return manifest;
Expand Down Expand Up @@ -403,7 +415,7 @@ private boolean manifestHasDeletedFiles(
return false;
}

@SuppressWarnings({"CollectionUndefinedEquality", "checkstyle:CyclomaticComplexity"})
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private ManifestFile filterManifestWithDeletedFiles(
PartitionAndMetricsEvaluator evaluator, ManifestFile manifest, ManifestReader<F> reader) {
boolean isDelete = reader.isDeleteManifestReader();
Expand All @@ -421,7 +433,7 @@ private ManifestFile filterManifestWithDeletedFiles(
entry -> {
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
fileIsDeleted(file, manifest)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand Down Expand Up @@ -481,6 +493,13 @@ private ManifestFile filterManifestWithDeletedFiles(
}
}

// Decides whether a file read from the given manifest has been marked for deletion, either by
// its recorded position within that manifest or by its path.
@SuppressWarnings("CollectionUndefinedEquality")
private boolean fileIsDeleted(F file, ManifestFile manifest) {
  // check the position recorded for this manifest first
  Set<Long> deletedPositions = deletedManifestPositions.get(manifest.path());
  if (deletedPositions != null && deletedPositions.contains(file.pos())) {
    return true;
  }

  // otherwise fall back to the path-based delete set
  return deletePaths.contains(file.path());
}

// an evaluator that checks whether rows in a file may/must match a given expression
// this class first partially evaluates the provided expression using the partition tuple
// and then checks the remaining part of the expression using metrics evaluators
Expand Down

0 comments on commit 4099608

Please sign in to comment.