diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 106be74fa3ad..0707dba32195 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -81,6 +81,7 @@ public String partition() { // cache filtered manifests to avoid extra work when commits fail. private final Map filteredManifests = Maps.newConcurrentMap(); + private final Map> manifestDeletedPositions = Maps.newConcurrentMap(); // tracking where files were deleted to validate retries quickly private final Map> filteredManifestToDeletedFiles = @@ -153,6 +154,12 @@ void caseSensitive(boolean newCaseSensitive) { void delete(F file) { Preconditions.checkNotNull(file, "Cannot delete file: null"); invalidateFilteredCache(); + if (file.manifestLocation() != null) { + manifestDeletedPositions + .computeIfAbsent(file.manifestLocation(), key -> Sets.newHashSet()) + .add(file.pos()); + } + deletePaths.add(file.path()); deleteFilePartitions.add(file.specId(), file.partition()); } @@ -168,7 +175,8 @@ void delete(CharSequence path) { boolean containsDeletes() { return !deletePaths.isEmpty() || deleteExpression != Expressions.alwaysFalse() - || !dropPartitions.isEmpty(); + || !dropPartitions.isEmpty() + || !manifestDeletedPositions.isEmpty(); } /** @@ -308,11 +316,15 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) { PartitionSpec spec = reader.spec(); PartitionAndMetricsEvaluator evaluator = new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression); + boolean hasDeletedFiles = manifestDeletedPositions.containsKey(manifest.path()); + if (hasDeletedFiles) { + return filterManifestWithDeletedFiles(evaluator, manifest, reader); + } // this assumes that the manifest doesn't have files to remove and streams through the // manifest without copying data. if a manifest does have a file to remove, this will break // out of the loop and move on to filtering the manifest. - boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader); + hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader); if (!hasDeletedFiles) { filteredManifests.put(manifest, manifest); return manifest; @@ -403,7 +415,7 @@ private boolean manifestHasDeletedFiles( return false; } - @SuppressWarnings({"CollectionUndefinedEquality", "checkstyle:CyclomaticComplexity"}) + @SuppressWarnings("checkstyle:CyclomaticComplexity") private ManifestFile filterManifestWithDeletedFiles( PartitionAndMetricsEvaluator evaluator, ManifestFile manifest, ManifestReader reader) { boolean isDelete = reader.isDeleteManifestReader(); @@ -421,7 +433,7 @@ private ManifestFile filterManifestWithDeletedFiles( entry -> { F file = entry.file(); boolean markedForDelete = - deletePaths.contains(file.path()) + fileIsDeleted(file, manifest) || dropPartitions.contains(file.specId(), file.partition()) || (isDelete && entry.isLive() @@ -481,6 +493,13 @@ private ManifestFile filterManifestWithDeletedFiles( } } + @SuppressWarnings("CollectionUndefinedEquality") + private boolean fileIsDeleted(F file, ManifestFile manifest) { + Set positions = manifestDeletedPositions.get(manifest.path()); + boolean filePositionIsDeleted = positions != null && positions.contains(file.pos()); + return filePositionIsDeleted || deletePaths.contains(file.path()); + } + // an evaluator that checks whether rows in a file may/must match a given expression // this class first partially evaluates the provided expression using the partition tuple // and then checks the remaining part of the expression using metrics evaluators