Skip to content

Commit

Permalink
Core: Optimize MergingSnapshotProducer to use referenced manifests to…
Browse files Browse the repository at this point in the history
… determine if manifest needs to be rewritten (#11131)
  • Loading branch information
amogh-jahagirdar authored Nov 21, 2024
1 parent c1f1f8b commit 90be5d7
Show file tree
Hide file tree
Showing 7 changed files with 390 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -70,14 +73,17 @@ public class ReplaceDeleteFilesBenchmark {
private static final HadoopTables TABLES = new HadoopTables();

private Table table;
private List<DeleteFile> deleteFiles;
private List<DeleteFile> deleteFilesToReplace;
private List<DeleteFile> pendingDeleteFiles;

@Param({"50000", "100000", "500000", "1000000", "2500000"})
@Param({"50000", "100000", "500000", "1000000", "2000000"})
private int numFiles;

@Param({"5", "25", "50", "100"})
private int percentDeleteFilesReplaced;

@Setup
public void setupBenchmark() {
public void setupBenchmark() throws IOException {
initTable();
initFiles();
}
Expand All @@ -90,10 +96,13 @@ public void tearDownBenchmark() {
@Benchmark
@Threads(1)
public void replaceDeleteFiles() {
Snapshot currentSnapshot = table.currentSnapshot();
RowDelta rowDelta = table.newRowDelta();
deleteFiles.forEach(rowDelta::removeDeletes);
rowDelta.validateFromSnapshot(currentSnapshot.snapshotId());
deleteFilesToReplace.forEach(rowDelta::removeDeletes);
pendingDeleteFiles.forEach(rowDelta::addDeletes);
rowDelta.commit();
table.manageSnapshots().rollbackTo(currentSnapshot.snapshotId()).commit();
}

private void initTable() {
Expand All @@ -104,27 +113,44 @@ private void dropTable() {
TABLES.dropTable(TABLE_IDENT);
}

private void initFiles() {
List<DeleteFile> generatedDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
private void initFiles() throws IOException {
List<DeleteFile> generatedPendingDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);

int numDeleteFilesToReplace = (int) Math.ceil(numFiles * (percentDeleteFilesReplaced / 100.0));
Map<String, DeleteFile> filesToReplace =
Maps.newHashMapWithExpectedSize(numDeleteFilesToReplace);
RowDelta rowDelta = table.newRowDelta();

for (int ordinal = 0; ordinal < numFiles; ordinal++) {
DataFile dataFile = FileGenerationUtil.generateDataFile(table, null);
rowDelta.addRows(dataFile);

DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
rowDelta.addDeletes(deleteFile);
generatedDeleteFiles.add(deleteFile);

DeleteFile pendingDeleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
generatedPendingDeleteFiles.add(pendingDeleteFile);
if (numDeleteFilesToReplace > 0) {
filesToReplace.put(deleteFile.location(), deleteFile);
DeleteFile pendingDeleteFile =
FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
generatedPendingDeleteFiles.add(pendingDeleteFile);
numDeleteFilesToReplace--;
}
}

rowDelta.commit();

this.deleteFiles = generatedDeleteFiles;
List<DeleteFile> deleteFilesReadFromManifests = Lists.newArrayList();
for (ManifestFile deleteManifest : table.currentSnapshot().deleteManifests(table.io())) {
try (ManifestReader<DeleteFile> manifestReader =
ManifestFiles.readDeleteManifest(deleteManifest, table.io(), table.specs())) {
manifestReader
.iterator()
.forEachRemaining(
file -> {
if (filesToReplace.containsKey(file.location())) {
deleteFilesReadFromManifests.add(file);
}
});
}
}

this.pendingDeleteFiles = generatedPendingDeleteFiles;
this.deleteFilesToReplace = deleteFilesReadFromManifests;
}
}
105 changes: 69 additions & 36 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.ManifestFileUtil;
import org.apache.iceberg.util.Pair;
Expand Down Expand Up @@ -69,6 +70,7 @@ public String partition() {
private final Map<Integer, PartitionSpec> specsById;
private final PartitionSet deleteFilePartitions;
private final Set<F> deleteFiles = newFileSet();
private final Set<String> manifestsWithDeletes = Sets.newHashSet();
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private Expression deleteExpression = Expressions.alwaysFalse();
Expand All @@ -77,6 +79,7 @@ public String partition() {
private boolean failMissingDeletePaths = false;
private int duplicateDeleteCount = 0;
private boolean caseSensitive = true;
private boolean allDeletesReferenceManifests = true;

// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
Expand Down Expand Up @@ -121,13 +124,15 @@ protected void deleteByRowFilter(Expression expr) {
Preconditions.checkNotNull(expr, "Cannot delete files using filter: null");
invalidateFilteredCache();
this.deleteExpression = Expressions.or(deleteExpression, expr);
this.allDeletesReferenceManifests = false;
}

/** Add a partition tuple to drop from the table during the delete phase. */
protected void dropPartition(int specId, StructLike partition) {
Preconditions.checkNotNull(partition, "Cannot delete files in invalid partition: null");
invalidateFilteredCache();
dropPartitions.add(specId, partition);
this.allDeletesReferenceManifests = false;
}

/**
Expand All @@ -154,6 +159,13 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();

if (file.manifestLocation() == null) {
this.allDeletesReferenceManifests = false;
} else {
manifestsWithDeletes.add(file.manifestLocation());
}

deleteFiles.add(file);
deleteFilePartitions.add(file.specId(), file.partition());
}
Expand All @@ -162,6 +174,7 @@ void delete(F file) {
void delete(CharSequence path) {
Preconditions.checkNotNull(path, "Cannot delete file path: null");
invalidateFilteredCache();
this.allDeletesReferenceManifests = false;
deletePaths.add(path);
}

Expand All @@ -185,6 +198,7 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife
return ImmutableList.of();
}

boolean trustManifestReferences = canTrustManifestReferences(manifests);
ManifestFile[] filtered = new ManifestFile[manifests.size()];
// open all of the manifest files in parallel, use index to avoid reordering
Tasks.range(filtered.length)
Expand All @@ -193,7 +207,8 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife
.executeWith(workerPoolSupplier.get())
.run(
index -> {
ManifestFile manifest = filterManifest(tableSchema, manifests.get(index));
ManifestFile manifest =
filterManifest(tableSchema, manifests.get(index), trustManifestReferences);
filtered[index] = manifest;
});

Expand All @@ -202,6 +217,16 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife
return Arrays.asList(filtered);
}

// Use the current set of referenced manifests as a source of truth when it's a subset of all
// manifests and all removals which were performed reference manifests.
// If a manifest without live files is not in the trusted referenced set, this means that the
// manifest has no deleted entries and does not need to be rewritten.
private boolean canTrustManifestReferences(List<ManifestFile> manifests) {
Set<String> manifestLocations =
manifests.stream().map(ManifestFile::path).collect(Collectors.toSet());
return allDeletesReferenceManifests && manifestLocations.containsAll(manifestsWithDeletes);
}

/**
* Creates a snapshot summary builder with the files deleted from the set of filtered manifests.
*
Expand Down Expand Up @@ -307,14 +332,14 @@ private void invalidateFilteredCache() {
/**
* @return a ManifestReader that is a filtered version of the input manifest.
*/
private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
private ManifestFile filterManifest(
Schema tableSchema, ManifestFile manifest, boolean trustManifestReferences) {
ManifestFile cached = filteredManifests.get(manifest);
if (cached != null) {
return cached;
}

boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
if (!canContainDeletedFiles(manifest, trustManifestReferences)) {
filteredManifests.put(manifest, manifest);
return manifest;
}
Expand All @@ -323,66 +348,74 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
PartitionSpec spec = reader.spec();
PartitionAndMetricsEvaluator evaluator =
new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression);

// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
if (!hasDeletedFiles) {
if (manifestHasDeletedFiles(evaluator, manifest, reader)) {
return filterManifestWithDeletedFiles(evaluator, manifest, reader);
} else {
filteredManifests.put(manifest, manifest);
return manifest;
}

return filterManifestWithDeletedFiles(evaluator, manifest, reader);

} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
}
}

private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainExpressionDeletes;
private boolean canContainDeletedFiles(ManifestFile manifest, boolean trustManifestReferences) {
if (hasNoLiveFiles(manifest)) {
return false;
}

if (trustManifestReferences) {
return manifestsWithDeletes.contains(manifest.path());
}

return canContainDroppedFiles(manifest)
|| canContainExpressionDeletes(manifest)
|| canContainDroppedPartitions(manifest);
}

private boolean hasNoLiveFiles(ManifestFile manifest) {
return !manifest.hasAddedFiles() && !manifest.hasExistingFiles();
}

private boolean canContainExpressionDeletes(ManifestFile manifest) {
if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
ManifestEvaluator manifestEvaluator =
ManifestEvaluator.forRowFilter(
deleteExpression, specsById.get(manifest.partitionSpecId()), caseSensitive);
canContainExpressionDeletes = manifestEvaluator.eval(manifest);
} else {
canContainExpressionDeletes = false;
return manifestEvaluator.eval(manifest);
}

boolean canContainDroppedPartitions;
return false;
}

private boolean canContainDroppedPartitions(ManifestFile manifest) {
if (!dropPartitions.isEmpty()) {
canContainDroppedPartitions =
ManifestFileUtil.canContainAny(manifest, dropPartitions, specsById);
} else {
canContainDroppedPartitions = false;
return ManifestFileUtil.canContainAny(manifest, dropPartitions, specsById);
}

boolean canContainDroppedFiles;
return false;
}

private boolean canContainDroppedFiles(ManifestFile manifest) {
if (!deletePaths.isEmpty()) {
canContainDroppedFiles = true;
return true;
} else if (!deleteFiles.isEmpty()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles =
ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
} else {
canContainDroppedFiles = false;
return ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
}

boolean canContainDropBySeq =
manifest.content() == ManifestContent.DELETES
&& manifest.minSequenceNumber() < minSequenceNumber;

return canContainExpressionDeletes
|| canContainDroppedPartitions
|| canContainDroppedFiles
|| canContainDropBySeq;
return false;
}

@SuppressWarnings({"CollectionUndefinedEquality", "checkstyle:CyclomaticComplexity"})
private boolean manifestHasDeletedFiles(
PartitionAndMetricsEvaluator evaluator, ManifestReader<F> reader) {
PartitionAndMetricsEvaluator evaluator, ManifestFile manifest, ManifestReader<F> reader) {
if (manifestsWithDeletes.contains(manifest.path())) {
return true;
}

boolean isDelete = reader.isDeleteManifestReader();

for (ManifestEntry<F> entry : reader.liveEntries()) {
Expand Down
Loading

0 comments on commit 90be5d7

Please sign in to comment.