[backport] Spark 3.2: Add prefix mismatch mode for deleting orphan files (apache#4652) (apache#31)

(cherry picked from commit 679c3d4)
puchengy authored Feb 13, 2023
1 parent 58330d0 commit b214e7f
Showing 6 changed files with 584 additions and 21 deletions.
@@ -18,8 +18,11 @@
*/
package org.apache.iceberg.actions;

import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* An action that deletes orphan metadata, data and delete files in a table.
@@ -80,9 +83,78 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrphanFiles.Result>
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);

/**
* Passes a prefix mismatch mode that determines how this action should handle situations when the
* metadata references files that match listed/provided files except for authority/scheme.
*
* <p>Possible values are "ERROR", "IGNORE", "DELETE". The default mismatch mode is "ERROR", which
* means an exception is thrown whenever there is a mismatch in authority/scheme. It's the
recommended mismatch mode and should be changed only in rare circumstances. If there is a
* mismatch, use {@link #equalSchemes(Map)} and {@link #equalAuthorities(Map)} to resolve
* conflicts by providing equivalent schemes and authorities. If it is impossible to determine
* whether the conflicting authorities/schemes are equal, set the prefix mismatch mode to "IGNORE"
* to skip files with mismatches. If you have manually inspected all conflicting
* authorities/schemes, provided equivalent schemes/authorities and are absolutely confident the
* remaining ones are different, set the prefix mismatch mode to "DELETE" to consider files with
mismatches as orphans. It will be impossible to recover files after deletion, so the "DELETE"
* prefix mismatch mode must be used with extreme caution.
*
* @param newPrefixMismatchMode mode for handling prefix mismatches
* @return this for method chaining
*/
default DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement prefixMismatchMode");
}

/**
* Passes schemes that should be considered equal.
*
* <p>The key may include a comma-separated list of schemes. For instance, Map("s3a,s3,s3n",
* "s3").
*
* @param newEqualSchemes list of equal schemes
* @return this for method chaining
*/
default DeleteOrphanFiles equalSchemes(Map<String, String> newEqualSchemes) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement equalSchemes");
}

/**
* Passes authorities that should be considered equal.
*
* <p>The key may include a comma-separated list of authorities. For instance, Map("s1name,s2name",
* "servicename").
*
* @param newEqualAuthorities list of equal authorities
* @return this for method chaining
*/
default DeleteOrphanFiles equalAuthorities(Map<String, String> newEqualAuthorities) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement equalAuthorities");
}

/** The action result that contains a summary of the execution. */
interface Result {
/** Returns locations of orphan files. */
Iterable<String> orphanFileLocations();
}

/**
* Defines the action behavior when location prefixes (scheme/authority) mismatch.
*
* <p>{@link #ERROR} - throw an exception. {@link #IGNORE} - no action. {@link #DELETE} - delete
* files.
*/
enum PrefixMismatchMode {
ERROR,
IGNORE,
DELETE;

public static PrefixMismatchMode fromString(String modeAsString) {
Preconditions.checkArgument(modeAsString != null, "Mode should not be null");
return PrefixMismatchMode.valueOf(modeAsString.toUpperCase(Locale.ENGLISH));
}
}
}
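
For orientation, a minimal usage sketch of how the new options compose on the action API. This is not part of the diff: SparkActions.get() from the iceberg-spark module and an already-loaded Table are assumed to be available in the caller.

// A hedged sketch, not part of this commit: compose the new options on the action.
// SparkActions.get() and the loaded `table` are assumptions, not shown in this diff.
PrefixMismatchMode mode = PrefixMismatchMode.fromString("ignore"); // case-insensitive parse, yields IGNORE
DeleteOrphanFiles.Result result =
    SparkActions.get()
        .deleteOrphanFiles(table)
        .equalSchemes(ImmutableMap.of("s3a,s3n", "s3")) // treat s3a/s3n as equal to s3
        .equalAuthorities(ImmutableMap.of("s1name,s2name", "servicename"))
        .prefixMismatchMode(mode) // skip files whose prefixes still cannot be matched
        .execute();
result.orphanFileLocations().forEach(System.out::println);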
@@ -22,6 +22,7 @@
import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;

import java.io.IOException;
import java.net.URI;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
@@ -30,18 +31,26 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
@@ -437,4 +446,88 @@ public void testRemoveOrphanFilesWithDeleteFiles() throws Exception {
resultDF.as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Rows must match", records, actualRecords);
}

@Test
public void testRemoveOrphanFilesProcedureWithPrefixMode()
throws NoSuchTableException, ParseException, IOException {
if (catalogName.equals("testhadoop")) {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
} else {
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg LOCATION '%s'",
tableName, temp.newFolder().toURI().toString());
}
Table table = Spark3Util.loadIcebergTable(spark, tableName);
String location = table.location();
Path originalPath = new Path(location);

URI uri = originalPath.toUri();
Path newParentPath = new Path("file1", uri.getAuthority(), uri.getPath());

DataFile dataFile1 =
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath(new Path(newParentPath, "path/to/data-a.parquet").toString())
.withFileSizeInBytes(10)
.withRecordCount(1)
.build();
DataFile dataFile2 =
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath(new Path(newParentPath, "path/to/data-b.parquet").toString())
.withFileSizeInBytes(10)
.withRecordCount(1)
.build();

table.newFastAppend().appendFile(dataFile1).appendFile(dataFile2).commit();

Timestamp lastModifiedTimestamp = new Timestamp(10000);

List<FilePathLastModifiedRecord> allFiles =
Lists.newArrayList(
new FilePathLastModifiedRecord(
new Path(originalPath, "path/to/data-a.parquet").toString(), lastModifiedTimestamp),
new FilePathLastModifiedRecord(
new Path(originalPath, "path/to/data-b.parquet").toString(), lastModifiedTimestamp),
new FilePathLastModifiedRecord(
ReachableFileUtil.versionHintLocation(table), lastModifiedTimestamp));

for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
allFiles.add(new FilePathLastModifiedRecord(file, lastModifiedTimestamp));
}

for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
allFiles.add(new FilePathLastModifiedRecord(manifest.path(), lastModifiedTimestamp));
}

Dataset<Row> compareToFileList =
spark
.createDataFrame(allFiles, FilePathLastModifiedRecord.class)
.withColumnRenamed("filePath", "file_path")
.withColumnRenamed("lastModified", "last_modified");
String fileListViewName = "files_view";
compareToFileList.createOrReplaceTempView(fileListViewName);
List<Object[]> orphanFiles =
sql(
"CALL %s.system.remove_orphan_files("
+ "table => '%s',"
+ "equal_schemes => map('file1', 'file'),"
+ "file_list_view => '%s')",
catalogName, tableIdent, fileListViewName);
Assert.assertEquals(0, orphanFiles.size());

// Test with no equal schemes
AssertHelpers.assertThrows(
"Should complain about removing orphan files",
ValidationException.class,
"Conflicting authorities/schemes: [(file1, file)]",
() ->
sql(
"CALL %s.system.remove_orphan_files("
+ "table => '%s',"
+ "file_list_view => '%s')",
catalogName, tableIdent, fileListViewName));

// The DROP TABLE in the afterEach cleanup uses PURGE and fails due to the invalid
// scheme "file1" used in this test, so drop the table here instead
sql("DROP TABLE %s", tableName);
}
}
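
The test above exercises the default "ERROR" mode and the equal_schemes mapping. For completeness, a hedged sketch of opting into deletion instead, assuming the procedure exposes the mode through a prefix_mismatch_mode argument mirroring PrefixMismatchMode (the argument name is not shown in this diff):

// Hypothetical call shape, not part of this commit: treat unresolved mismatches as orphans.
// Use with extreme caution; files cannot be recovered after deletion.
sql(
    "CALL %s.system.remove_orphan_files("
        + "table => '%s',"
        + "prefix_mismatch_mode => 'DELETE',"
        + "file_list_view => '%s')",
    catalogName, tableIdent, fileListViewName);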