diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index d903c11b52fc..f250d2e12289 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -126,6 +126,7 @@ public static TableMetadata replacePaths( metadata.snapshotLog(), metadataLogEntries, metadata.refs(), + // TODO: update statistic file paths metadata.statisticsFiles(), metadata.partitionStatisticsFiles(), metadata.changes()); @@ -276,7 +277,7 @@ private static List manifestFilesInSnapshot(FileIO io, Snapshot sn * @param targetPrefix target prefix that will replace it * @return a copy plan of content files in the manifest that was rewritten */ - public static List> rewriteManifest( + public static RewriteResult rewriteDataManifest( ManifestFile manifestFile, OutputFile outputFile, FileIO io, @@ -292,7 +293,7 @@ public static List> rewriteManifest( ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) { return StreamSupport.stream(reader.entries().spliterator(), false) .map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, targetPrefix, writer)) - .collect(Collectors.toList()); + .reduce(new RewriteResult<>(), RewriteResult::append); } } @@ -335,12 +336,13 @@ public static RewriteResult rewriteDeleteManifest( } } - private static Pair writeDataFileEntry( + private static RewriteResult writeDataFileEntry( ManifestEntry entry, PartitionSpec spec, String sourcePrefix, String targetPrefix, ManifestWriter writer) { + RewriteResult result = new RewriteResult<>(); DataFile dataFile = entry.file(); String sourceDataFilePath = dataFile.location(); Preconditions.checkArgument( @@ -352,7 +354,8 @@ private static Pair writeDataFileEntry( DataFile newDataFile = DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build(); appendEntryWithFile(entry, writer, newDataFile); - return Pair.of(sourceDataFilePath, newDataFile.location()); + result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location())); + return result; } private static RewriteResult writeDeleteFileEntry( @@ -386,7 +389,7 @@ private static RewriteResult writeDeleteFileEntry( case EQUALITY_DELETES: DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix); appendEntryWithFile(entry, writer, eqDeleteFile); - // we do not need to recursively rewrite the equality delete, just move it + // No need to rewrite equality delete files as they do not contain absolute file paths. result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location())); return result; @@ -469,10 +472,7 @@ public static void rewritePositionDeleteFile( String path = deleteFile.location(); if (!path.startsWith(sourcePrefix)) { throw new UnsupportedOperationException( - "Expected delete file to be under the source prefix: " - + sourcePrefix - + " but was " - + path); + String.format("Expected delete file %s to start with prefix: %s", path, sourcePrefix)); } InputFile sourceFile = io.newInputFile(path); try (CloseableIterable reader = diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataUtil.java b/core/src/main/java/org/apache/iceberg/TableMetadataUtil.java deleted file mode 100644 index 5fc80b9f968a..000000000000 --- a/core/src/main/java/org/apache/iceberg/TableMetadataUtil.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -public class TableMetadataUtil { - private TableMetadataUtil() {} -} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index 199664dc0bdd..8f8c1078d531 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -20,12 +20,13 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; @@ -207,7 +208,16 @@ private void validateAndSetStartVersion() { } private String validateVersion(TableMetadata tableMetadata, String versionFileName) { - String versionFile = versionFile(tableMetadata, versionFileName); + String versionFile = null; + if (versionInFilePath(tableMetadata.metadataFileLocation(), versionFileName)) { + versionFile = tableMetadata.metadataFileLocation(); + } + + for (MetadataLogEntry log : tableMetadata.previousFiles()) { + if (versionInFilePath(log.file(), versionFileName)) { + versionFile = log.file(); + } + } Preconditions.checkNotNull( versionFile, "Version file %s does not exist in metadata log.", versionFile); @@ -216,19 +226,6 @@ private String validateVersion(TableMetadata tableMetadata, String versionFileNa return versionFile; } - private String versionFile(TableMetadata metadata, String versionFileName) { - if (versionInFilePath(metadata.metadataFileLocation(), versionFileName)) { - return metadata.metadataFileLocation(); - } - - for (MetadataLogEntry log : metadata.previousFiles()) { - if (versionInFilePath(log.file(), versionFileName)) { - return log.file(); - } - } - return null; - } - private boolean versionInFilePath(String path, String version) { return RewriteTablePathUtil.fileName(path).equals(version); } @@ -273,10 +270,9 @@ private String rebuildMetadata() { // rebuild version files RewriteResult rewriteVersionResult = rewriteVersionFiles(endMetadata); - Set diffSnapshots = - getDiffSnapshotIds(startMetadata, rewriteVersionResult.toRewrite()); + Set deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite()); - Set manifestsToRewrite = manifestsToRewrite(diffSnapshots, startMetadata); + Set manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata); Set validSnapshots = Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata)); @@ -287,11 +283,16 @@ private String rebuildMetadata() { .reduce(new RewriteResult<>(), RewriteResult::append); // rebuild manifest files - RewriteResult rewriteManifestResult = + RewriteContentFileResult rewriteManifestResult = rewriteManifests(endMetadata, rewriteManifestListResult.toRewrite()); // rebuild position delete files - rewritePositionDeletes(endMetadata, rewriteManifestResult.toRewrite()); + Set deleteFiles = + rewriteManifestResult.toRewrite().stream() + .filter(e -> e instanceof DeleteFile) + .map(e -> (DeleteFile) e) + .collect(Collectors.toSet()); + rewritePositionDeletes(endMetadata, deleteFiles); Set> copyPlan = Sets.newHashSet(); copyPlan.addAll(rewriteVersionResult.copyPlan()); @@ -318,8 +319,7 @@ private String saveFileList(Set> filesToMove) { return fileListPath; } - private Set getDiffSnapshotIds( - TableMetadata startMetadata, Set allSnapshots) { + private Set deltaSnapshots(TableMetadata startMetadata, Set allSnapshots) { if (startMetadata == null) { return allSnapshots; } else { @@ -396,33 +396,48 @@ private RewriteResult rewriteManifestList( return result; } - private Set manifestsToRewrite(Set diffSnapshots, TableMetadata startMetadata) { + private Set manifestsToRewrite( + Set deltaSnapshots, TableMetadata startMetadata) { try { Table endStaticTable = newStaticTable(endVersionName, table.io()); Dataset lastVersionFiles = manifestDS(endStaticTable).select("path"); if (startMetadata == null) { return Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList()); } else { - Set diffSnapshotIds = - diffSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + Set deltaSnapshotIds = + deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); return Sets.newHashSet( lastVersionFiles .distinct() - .filter(functions.column("added_snapshot_id").isInCollection(diffSnapshotIds)) + .filter( + functions + .column(ManifestFile.SNAPSHOT_ID.name()) + .isInCollection(deltaSnapshotIds)) .as(Encoders.STRING()) .collectAsList()); } } catch (Exception e) { throw new UnsupportedOperationException( - "Failed to build the manifest files dataframe, the end version you are " - + "trying to copy may contain invalid snapshots, please a younger version that doesn't have invalid " - + "snapshots", + "Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. " + + "Please choose an earlier version without invalid snapshots.", e); } } - public static class RewriteDeleteFileResult extends RewriteResult { - public RewriteDeleteFileResult append(RewriteDeleteFileResult r1) { + public static class RewriteContentFileResult extends RewriteResult> { + public RewriteContentFileResult append(RewriteResult> r1) { + this.copyPlan().addAll(r1.copyPlan()); + this.toRewrite().addAll(r1.toRewrite()); + return this; + } + + public RewriteContentFileResult appendDataFile(RewriteResult r1) { + this.copyPlan().addAll(r1.copyPlan()); + this.toRewrite().addAll(r1.toRewrite()); + return this; + } + + public RewriteContentFileResult appendDeleteFile(RewriteResult r1) { this.copyPlan().addAll(r1.copyPlan()); this.toRewrite().addAll(r1.toRewrite()); return this; @@ -430,10 +445,10 @@ public RewriteDeleteFileResult append(RewriteDeleteFileResult r1) { } /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */ - private RewriteResult rewriteManifests( + private RewriteContentFileResult rewriteManifests( TableMetadata tableMetadata, Set toRewrite) { if (toRewrite.isEmpty()) { - return new RewriteResult<>(); + return new RewriteContentFileResult(); } Encoder manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class); @@ -454,13 +469,13 @@ private RewriteResult rewriteManifests( specsById, sourcePrefix, targetPrefix), - Encoders.bean(RewriteDeleteFileResult.class)) + Encoders.bean(RewriteContentFileResult.class)) // duplicates are expected here as the same data file can have different statuses // (e.g. added and deleted) - .reduce((ReduceFunction) RewriteDeleteFileResult::append); + .reduce((ReduceFunction) RewriteContentFileResult::append); } - private static MapFunction toManifests( + private static MapFunction toManifests( Broadcast tableBroadcast, String stagingLocation, int format, @@ -469,23 +484,21 @@ private static MapFunction toManifests( String targetPrefix) { return manifestFile -> { - RewriteDeleteFileResult result = new RewriteDeleteFileResult(); + RewriteContentFileResult result = new RewriteContentFileResult(); switch (manifestFile.content()) { case DATA: - result - .copyPlan() - .addAll( - writeDataManifest( - manifestFile, - tableBroadcast, - stagingLocation, - format, - specsById, - sourcePrefix, - targetPrefix)); + result.appendDataFile( + writeDataManifest( + manifestFile, + tableBroadcast, + stagingLocation, + format, + specsById, + sourcePrefix, + targetPrefix)); break; case DELETES: - result.append( + result.appendDeleteFile( writeDeleteManifest( manifestFile, tableBroadcast, @@ -503,7 +516,7 @@ private static MapFunction toManifests( }; } - private static List> writeDataManifest( + private static RewriteResult writeDataManifest( ManifestFile manifestFile, Broadcast
tableBroadcast, String stagingLocation, @@ -516,10 +529,8 @@ private static List> writeDataManifest( FileIO io = tableBroadcast.getValue().io(); OutputFile outputFile = io.newOutputFile(stagingPath); Map specsById = specsByIdBroadcast.getValue(); - - return new ArrayList<>( - RewriteTablePathUtil.rewriteManifest( - manifestFile, outputFile, io, format, specsById, sourcePrefix, targetPrefix)); + return RewriteTablePathUtil.rewriteDataManifest( + manifestFile, outputFile, io, format, specsById, sourcePrefix, targetPrefix); } catch (IOException e) { throw new RuntimeIOException(e); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java index 2cab2df975ff..2ea034b073a9 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -181,25 +181,18 @@ public void testRewritePath() throws Exception { .select("file_path") .as(Encoders.STRING()) .collectAsList(); - assertThat(validDataFilesAfterRebuilt.size()).isEqualTo(2); - for (String item : validDataFilesAfterRebuilt) { - assertThat(item).startsWith(targetTableLocation); - } + assertThat(validDataFilesAfterRebuilt) + .hasSize(2) + .allMatch(item -> item.startsWith(targetTableLocation)); // verify data rows - Dataset resultDF = spark.read().format("iceberg").load(targetTableLocation); - List actualRecords = - resultDF.sort("c1", "c2", "c3").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList(); - - List expectedRecords = Lists.newArrayList(); - expectedRecords.add(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); - expectedRecords.add(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); - - assertThat(expectedRecords).isEqualTo(actualRecords); + List actual = rows(targetTableLocation); + List expected = rows(tableLocation); + assertEquals("Rows should match after copy", expected, actual); } @Test - public void testSameLocations() throws Exception { + public void testSameLocations() { assertThatThrownBy( () -> actions() @@ -546,9 +539,8 @@ public void testMoveVersionWithInvalidSnapshots() throws Exception { .execute()) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining( - "Failed to build the manifest files dataframe, " - + "the end version you are trying to copy may contain invalid snapshots, " - + "please a younger version that doesn't have invalid snapshots"); + "Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. " + + "Please choose an earlier version without invalid snapshots."); } @Test @@ -1000,4 +992,8 @@ private static String fileName(String path) { private TableMetadata currentMetadata(Table tbl) { return ((HasTableOperations) tbl).operations().current(); } + + private List rows(String location) { + return rowsToJava(spark.read().format("iceberg").load(location).collectAsList()); + } }