From 4b639f6b75ff716f8f2903678c6bf89e0b7a1690 Mon Sep 17 00:00:00 2001
From: Vivek Rai <43493515+Blazer-007@users.noreply.github.com>
Date: Thu, 24 Oct 2024 00:17:01 +0530
Subject: [PATCH] [GOBBLIN-2159] Support partition level copy in Iceberg-Distcp
 (#4058)

---
 .../copy/iceberg/BaseIcebergCatalog.java      |  14 +-
 .../copy/iceberg/IcebergCatalog.java          |   4 +-
 .../copy/iceberg/IcebergDatasetFinder.java    |   9 +-
 .../copy/iceberg/IcebergHiveCatalog.java      |   6 +
 .../IcebergOverwritePartitionsStep.java       | 164 ++++++++++
 .../copy/iceberg/IcebergPartitionDataset.java | 233 ++++++++++++++
 .../IcebergPartitionDatasetFinder.java        |  64 ++++
 .../management/copy/iceberg/IcebergTable.java |  85 ++++-
 ...esAnyPropNamePartitionFilterPredicate.java |  67 ++++
 .../IcebergPartitionFilterPredicateUtil.java  |  73 +++++
 .../copy/iceberg/IcebergDatasetTest.java      |  45 ++-
 .../IcebergOverwritePartitionsStepTest.java   | 156 ++++++++++
 .../iceberg/IcebergPartitionDatasetTest.java  | 292 ++++++++++++++++++
 .../copy/iceberg/IcebergTableTest.java        | 131 +++++++-
 ...yPropNamePartitionFilterPredicateTest.java |  60 ++++
 ...ebergPartitionFilterPredicateUtilTest.java | 106 +++++++
 16 files changed, 1475 insertions(+), 34 deletions(-)
 create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
 create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
 create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
 create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java
 create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java
 create mode 100644 gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
 create mode 100644 gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java
 create mode 100644 gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
 create mode 100644 gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
index 9e2ae53b99c..b16e1aaa72e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -21,10 +21,12 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 
 import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 
 /**
  * Base implementation of {@link IcebergCatalog} to access {@link IcebergTable} and the
@@ -41,9 +43,15 @@ protected BaseIcebergCatalog(String catalogName, Class<? extends Catalog> companionCatalogClass) {
   }
 
   @Override
-  public IcebergTable openTable(String dbName, String tableName) {
+  public IcebergTable openTable(String dbName, String tableName) throws IcebergTable.TableNotFoundException {
     TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
-    return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(), createTableOperations(tableId), this.getCatalogUri());
+    try {
+      return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(),
+          createTableOperations(tableId), this.getCatalogUri(), loadTableInstance(tableId));
+    } catch (NoSuchTableException ex) {
+      // defend against `org.apache.iceberg.catalog.Catalog::loadTable` throwing inside some `@Override` of `loadTableInstance`
+      throw new IcebergTable.TableNotFoundException(tableId);
+    }
   }
 
   protected Catalog createCompanionCatalog(Map<String, String> properties, Configuration configuration) {
@@ -67,4 +75,6 @@ protected String getDatasetDescriptorPlatform() {
   }
 
   protected abstract TableOperations createTableOperations(TableIdentifier tableId);
+
+  protected abstract Table loadTableInstance(TableIdentifier tableId);
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index 68e9bb31c65..05ddaf9c52a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -29,10 +29,10 @@ public interface IcebergCatalog {
 
   /** @return table identified by `dbName` and `tableName` */
-  IcebergTable openTable(String dbName, String tableName);
+  IcebergTable openTable(String dbName, String tableName) throws IcebergTable.TableNotFoundException;
 
   /** @return table identified by `tableId` */
-  default IcebergTable openTable(TableIdentifier tableId) {
+  default IcebergTable openTable(TableIdentifier tableId) throws IcebergTable.TableNotFoundException {
     // CHALLENGE: clearly better to implement in the reverse direction - `openTable(String, String)` in terms of `openTable(TableIdentifier)` -
     // but challenging to do at this point, with multiple derived classes already "in the wild" that implement `openTable(String, String)`
     return openTable(tableId.namespace().toString(), tableId.name());
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index f6668f5d189..e6afe37877b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -85,7 +85,7 @@ public String getConfigPrefix() {
   }
 
   protected final FileSystem sourceFs;
-  private final Properties properties;
+  protected final Properties properties;
 
   /**
    * Finds all {@link IcebergDataset}s in the file system using the Iceberg Catalog.
@@ -153,7 +153,7 @@ protected IcebergDataset createIcebergDataset(IcebergCatalog sourceIcebergCatalog,
     IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(destDbName, destTableName);
     // TODO: Rethink strategy to enforce dest iceberg table
     Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, destTableName));
-    return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties));
+    return createSpecificDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties));
   }
 
   protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
@@ -165,6 +165,11 @@ protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
     return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
   }
 
+  protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath)
+      throws IOException {
+    return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, fs, shouldIncludeMetadataPath);
+  }
+
   protected static boolean getConfigShouldCopyMetadataPath(Properties properties) {
     return Boolean.valueOf(properties.getProperty(ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH, DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH));
   }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
index af541a79a56..27ea723df56 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hive.HiveCatalog;
@@ -61,4 +62,9 @@ protected TableOperations createTableOperations(TableIdentifier tableId) {
   public boolean tableAlreadyExists(IcebergTable icebergTable) {
     return hc.tableExists(icebergTable.getTableId());
   }
+
+  @Override
+  protected Table loadTableInstance(TableIdentifier tableId) {
+    return hc.loadTable(tableId);
+  }
 }
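With this change, `openTable` surfaces a checked `IcebergTable.TableNotFoundException` rather than Iceberg's unchecked `NoSuchTableException`. A minimal sketch of a call site, assuming `catalog` is any `IcebergCatalog` implementation and the table names are illustrative:

    // hypothetical call site, for illustration only
    try {
      IcebergTable table = catalog.openTable(TableIdentifier.of("db", "tbl"));
      // ... use table ...
    } catch (IcebergTable.TableNotFoundException e) {
      // raised when loadTableInstance(...) inside openTable reports NoSuchTableException
      log.warn("table missing; skipping dataset", e);
    }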
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
new file mode 100644
index 00000000000..dffcbccb273
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.util.retry.RetryerFactory;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;
+
+/**
+ * Commit step for overwriting partitions in an Iceberg table.
+ * <p>
+ * This class implements the {@link CommitStep} interface and provides functionality to overwrite
+ * partitions in the destination Iceberg table using serialized data files.
+ * </p>
+ */
+@Slf4j
+public class IcebergOverwritePartitionsStep implements CommitStep {
+  private final String destTableIdStr;
+  private final Properties properties;
+  private final byte[] serializedDataFiles;
+  private final String partitionColName;
+  private final String partitionValue;
+  public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX
+      + ".catalog.overwrite.partitions.retries";
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
+      RETRY_TIMES, 3,
+      RETRY_TYPE, RetryType.FIXED_ATTEMPT.name()));
+
+  /**
+   * Constructs an {@code IcebergOverwritePartitionsStep} with the specified parameters.
+   *
+   * @param destTableIdStr the identifier of the destination table as a string
+   * @param partitionColName the name of the partition column to overwrite
+   * @param partitionValue the value of the partition column to overwrite
+   * @param serializedDataFiles [from List<DataFile>] the serialized data files to be used for replacing partitions
+   * @param properties the properties containing configuration
+   */
+  public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, byte[] serializedDataFiles, Properties properties) {
+    this.destTableIdStr = destTableIdStr;
+    this.partitionColName = partitionColName;
+    this.partitionValue = partitionValue;
+    this.serializedDataFiles = serializedDataFiles;
+    this.properties = properties;
+  }
+
+  @Override
+  public boolean isCompleted() {
+    return false;
+  }
+
+  /**
+   * Executes the partition replacement in the destination Iceberg table.
+   * It also retries on failure, as done in {@link IcebergRegisterStep#execute()}.
+   *
+   * @throws IOException if an I/O error occurs during execution
+   */
+  @Override
+  public void execute() throws IOException {
+    // Unlike IcebergRegisterStep::execute, which validates dest table metadata has not changed between copy entity
+    // generation and the post-copy commit, do no such validation here, so dest table writes may continue throughout
+    // our copying. Any new data written in the meanwhile to THE SAME partitions we are about to overwrite will be
+    // clobbered and replaced by the copy entities from our execution.
+    IcebergTable destTable = createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
+    List<DataFile> dataFiles = SerializationUtil.deserializeFromBytes(this.serializedDataFiles);
+    try {
+      log.info("~{}~ Starting partition overwrite - partition: {}; value: {}; numDataFiles: {}; path[0]: {}",
+          this.destTableIdStr,
+          this.partitionColName,
+          this.partitionValue,
+          dataFiles.size(),
+          dataFiles.get(0).path()
+      );
+      Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer();
+      overwritePartitionsRetryer.call(() -> {
+        destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue);
+        return null;
+      });
+      log.info("~{}~ Successful partition overwrite - partition: {}; value: {}",
+          this.destTableIdStr,
+          this.partitionColName,
+          this.partitionValue
+      );
+    } catch (ExecutionException executionException) {
+      String msg = String.format("~%s~ Failed to overwrite partitions", this.destTableIdStr);
+      log.error(msg, executionException);
+      throw new RuntimeException(msg, executionException.getCause());
+    } catch (RetryException retryException) {
+      String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
+      String msg = String.format("~%s~ Failure attempting to overwrite partition [num failures: %d] %s",
+          this.destTableIdStr,
+          retryException.getNumberOfFailedAttempts(),
+          interruptedNote);
+      Throwable informativeException = retryException.getLastFailedAttempt().hasException()
+          ? retryException.getLastFailedAttempt().getExceptionCause()
+          : retryException;
+      log.error(msg, informativeException);
+      throw new RuntimeException(msg, informativeException);
+    }
+  }
+
+  protected IcebergCatalog createDestinationCatalog() throws IOException {
+    return IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION);
+  }
+
+  private Retryer<Void> createOverwritePartitionsRetryer() {
+    Config config = ConfigFactory.parseProperties(this.properties);
+    Config retryerOverridesConfig = config.hasPath(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
+        ? config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
+        : ConfigFactory.empty();
+
+    return RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() {
+      @Override
+      public <V> void onRetry(Attempt<V> attempt) {
+        if (attempt.hasException()) {
+          String msg = String.format("~%s~ Exception while overwriting partitions [attempt: %d; elapsed: %s]",
+              destTableIdStr,
+              attempt.getAttemptNumber(),
+              Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+          log.warn(msg, attempt.getExceptionCause());
+        }
+      }
+    }));
+  }
+}
\ No newline at end of file
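The retry policy above can be tuned per job through properties under `OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX`; a minimal sketch, mirroring `testExecuteWithCustomRetryConfig` in the test file below (the count is illustrative):

    Properties props = new Properties();
    // raise the FIXED_ATTEMPT retry count from the fallback of 3
    props.setProperty(
        IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RetryerFactory.RETRY_TIMES,
        "7");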
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
new file mode 100644
index 00000000000..42582f09e3a
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.util.function.CheckedExceptionFunction;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate;
+import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
+
+/**
+ * Iceberg partition dataset implementing {@link CopyableDataset}.
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+  // These transforms are currently hardcoded here, but eventually they will depend on the filter predicate
+  // implementation and can be moved to a common place or inside each filter predicate.
+  private static final List<String> supportedTransforms = ImmutableList.of("identity", "truncate");
+  private final Predicate<StructLike> partitionFilterPredicate;
+  private final String partitionColumnName;
+  private final String partitionColValue;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath, String partitionColumnName, String partitionColValue)
+      throws IOException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+    this.partitionColumnName = partitionColumnName;
+    this.partitionColValue = partitionColValue;
+    this.partitionFilterPredicate = createPartitionFilterPredicate();
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds the files specific to the partition and creates destination data files based on the source data files.
+   * It also updates the destination data files with the destination table's write data location and adds a UUID to
+   * each file path to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    // TODO: Refactor IcebergDataset::generateCopyEntities to avoid code duplication
+    // Differences are getting data files, copying ancestor permission and adding post publish steps
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    Map<Path, DataFile> destDataFileBySrcPath = calcDestDataFileBySrcPath(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (Map.Entry<Path, FileStatus> entry : calcSrcFileStatusByDestFilePath(destDataFileBySrcPath).entrySet()) {
+      Path destPath = entry.getKey();
+      FileStatus srcFileStatus = entry.getValue();
+      // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();
+
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+
+    // Only add the post-publish step when there are files to copy.
+    List<DataFile> destDataFiles = new ArrayList<>(destDataFileBySrcPath.values());
+    if (CollectionUtils.isNotEmpty(destDataFiles)) {
+      copyEntities.add(createOverwritePostPublishStep(destDataFiles));
+    }
+
+    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
+  private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile> srcDataFiles)
+      throws IcebergTable.TableNotFoundException {
+    String fileSet = this.getFileSetId();
+    Map<Path, DataFile> destDataFileBySrcPath = Maps.newHashMap();
+    if (srcDataFiles.isEmpty()) {
+      log.warn("~{}~ found no data files for partition col : {} with partition value : {} to copy", fileSet,
+          this.partitionColumnName, this.partitionColValue);
+      return destDataFileBySrcPath;
+    }
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
+    PartitionSpec partitionSpec = destTableMetadata.spec();
+    // tableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "") returns null if the property is not set and
+    // doesn't respect the passed default value, so to avoid an NPE in .replace() we set it to the empty string.
+    String srcWriteDataLocation = Optional.ofNullable(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION,
+        "")).orElse("");
+    String destWriteDataLocation = Optional.ofNullable(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION,
+        "")).orElse("");
+    if (StringUtils.isEmpty(srcWriteDataLocation) || StringUtils.isEmpty(destWriteDataLocation)) {
+      log.warn(
+          "~{}~ Either source or destination table does not have write data location : source table write data location : {} , destination table write data location : {}",
+          fileSet,
+          srcWriteDataLocation,
+          destWriteDataLocation
+      );
+    }
+    srcDataFiles.forEach(dataFile -> {
+      String srcFilePath = dataFile.path().toString();
+      Path updatedDestFilePath = relocateDestPath(srcFilePath, srcWriteDataLocation, destWriteDataLocation);
+      log.debug("~{}~ Path changed from Src : {} to Dest : {}", fileSet, srcFilePath, updatedDestFilePath);
+      destDataFileBySrcPath.put(new Path(srcFilePath), DataFiles.builder(partitionSpec)
+          .copy(dataFile)
+          .withPath(updatedDestFilePath.toString())
+          .build());
+    });
+    log.info("~{}~ created {} destination data files", fileSet, destDataFileBySrcPath.size());
+    return destDataFileBySrcPath;
+  }
+
+  private Path relocateDestPath(String curPathStr, String prefixToBeReplaced, String prefixToReplaceWith) {
+    String updPathStr = curPathStr.replace(prefixToBeReplaced, prefixToReplaceWith);
+    return addUUIDToPath(updPathStr);
+  }
+
+  private Path addUUIDToPath(String filePathStr) {
+    Path filePath = new Path(filePathStr);
+    String fileDir = filePath.getParent().toString();
+    String fileName = filePath.getName();
+    String newFileName = String.join("-", UUID.randomUUID().toString(), fileName);
+    return new Path(fileDir, newFileName);
+  }
+
+  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath)
+      throws IOException {
+    Function<Path, FileStatus> getFileStatus = CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
+    Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
+    try {
+      srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
+          .stream()
+          .collect(Collectors.toMap(entry -> new Path(entry.getValue().path().toString()),
+              entry -> getFileStatus.apply(entry.getKey())));
+    } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
+      wrapper.rethrowWrapped();
+    }
+    return srcFileStatusByDestFilePath;
+  }
+
+  private PostPublishStep createOverwritePostPublishStep(List<DataFile> destDataFiles) {
+    byte[] serializedDataFiles = SerializationUtil.serializeToBytes(destDataFiles);
+
+    IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
+        this.getDestIcebergTable().getTableId().toString(),
+        this.partitionColumnName,
+        this.partitionColValue,
+        serializedDataFiles,
+        this.properties
+    );
+
+    return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), icebergOverwritePartitionStep, 0);
+  }
+
+  private Predicate<StructLike> createPartitionFilterPredicate() throws IOException {
+    // TODO: Refactor this later to use a factory or another mechanism to support different types of filter predicate
+    // Also take into consideration creation of an Expression filter to be used in the overwrite API
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    Optional<Integer> partitionColumnIndexOpt = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(
+        this.partitionColumnName,
+        srcTableMetadata,
+        supportedTransforms
+    );
+    Preconditions.checkArgument(partitionColumnIndexOpt.isPresent(), String.format(
+        "Partition column %s not found in table %s",
+        this.partitionColumnName, this.getFileSetId()));
+    int partitionColumnIndex = partitionColumnIndexOpt.get();
+    return new IcebergMatchesAnyPropNamePartitionFilterPredicate(partitionColumnIndex, this.partitionColValue);
+  }
+
+}
\ No newline at end of file
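As a worked example of what `relocateDestPath`/`addUUIDToPath` above produce (paths and UUID value are illustrative):

    // assuming srcWriteDataLocation = "/src/db/tbl/data", destWriteDataLocation = "/dest/db/tbl/data":
    //   "/src/db/tbl/data/part=a/00000.orc"
    //     -> "/dest/db/tbl/data/part=a/1c5bfe43-9f3a-4d2e-8c11-0a7b6e5d4f21-00000.orc"
    // the write-data-location prefix is swapped, then a random UUID is prepended to the file name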

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
new file mode 100644
index 00000000000..581a265e383
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Finder class for locating and creating partitioned Iceberg datasets.
+ * <p>
+ * This class extends {@link IcebergDatasetFinder} and provides functionality to create
+ * {@link IcebergPartitionDataset} instances based on the specified source and destination Iceberg catalogs.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder {
+  public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value";
+
+  public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) {
+    super(sourceFs, properties);
+  }
+
+  @Override
+  protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable,
+      Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException {
+    // TODO: Add Validator for source and destination tables later
+
+    String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+        ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    String partitionColumnValue = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+        ICEBERG_PARTITION_VALUE_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnValue),
+        "Partition value cannot be empty");
+
+    return new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, fs,
+        getConfigShouldCopyMetadataPath(properties), partitionColumnName, partitionColumnValue);
+  }
+}
\ No newline at end of file
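A sketch of the job properties the finder reads; the fully-qualified key names are an assumption about how `getLocationQualifiedProperty` resolves `ICEBERG_PARTITION_NAME_KEY`/`ICEBERG_PARTITION_VALUE_KEY` against the source-catalog prefix, and the column/value are illustrative:

    Properties props = new Properties();
    // assumed resolution: "<iceberg dataset config prefix>.source." + ICEBERG_PARTITION_NAME_KEY
    props.setProperty("iceberg.dataset.source.partition.name", "datepartition");
    props.setProperty("iceberg.dataset.source.partition.value", "2024-10-01-00");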
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index e802e102974..5221007cdc4 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -20,20 +20,29 @@
 import java.io.IOException;
 import java.net.URI;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -47,6 +56,7 @@
 
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
 
 import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;
 
@@ -77,10 +87,11 @@ public TableNotFoundException(TableIdentifier tableId) {
   private final String datasetDescriptorPlatform;
   private final TableOperations tableOps;
   private final String catalogUri;
+  private final Table table;
 
   @VisibleForTesting
-  IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri) {
-    this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, tableOps, catalogUri);
+  IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri, Table table) {
+    this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, tableOps, catalogUri, table);
   }
 
   /** @return metadata info limited to the most recent (current) snapshot */
@@ -217,4 +228,74 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate.
+   *
+   * @param icebergPartitionFilterPredicate the predicate to filter partitions
+   * @return a list of data files that match the partition filter predicate
+   * @throws TableNotFoundException if an error occurs while accessing the table metadata
+   * @throws IOException if an error occurs while reading a manifest file
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate)
+      throws IOException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    long currentSnapshotId = currentSnapshot.snapshotId();
+    List<DataFile> knownDataFiles = new ArrayList<>();
+    GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker();
+    // TODO: Add support for deleteManifests as well later
+    // Currently supporting dataManifests only
+    List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
+    for (ManifestFile manifestFile : dataManifestFiles) {
+      if (growthMilestoneTracker.isAnotherMilestone(knownDataFiles.size())) {
+        log.info("~{}~ for snapshot '{}' - before manifest-file '{}' '{}' total known iceberg datafiles", tableId,
+            currentSnapshotId,
+            manifestFile.path(),
+            knownDataFiles.size()
+        );
+      }
+      try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
+          CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
+        dataFiles.forEachRemaining(dataFile -> {
+          if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+            knownDataFiles.add(dataFile.copy());
+          }
+        });
+      } catch (IOException e) {
+        String errMsg = String.format("~%s~ for snapshot '%d' - Failed to read manifest file: %s", tableId,
+            currentSnapshotId, manifestFile.path());
+        log.error(errMsg, e);
+        throw new IOException(errMsg, e);
+      }
+    }
+    return knownDataFiles;
+  }
+
+  /**
+   * Overwrites the data files of the partition identified by the specified partition column name and value.
+   * <p>
+   * The overwrite replaces the partition's existing data files using the expression filter provided.
+   * </p>
+   * @param dataFiles the list of data files to replace partitions with
+   * @param partitionColName the partition column name whose data files are to be replaced
+   * @param partitionValue the partition column value on which data files will be replaced
+   */
+  protected void overwritePartition(List<DataFile> dataFiles, String partitionColName, String partitionValue)
+      throws TableNotFoundException {
+    if (dataFiles.isEmpty()) {
+      return;
+    }
+    log.info("~{}~ SnapshotId before overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId());
+    OverwriteFiles overwriteFiles = this.table.newOverwrite();
+    overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue));
+    dataFiles.forEach(overwriteFiles::addFile);
+    overwriteFiles.commit();
+    this.tableOps.refresh();
+    // Note: this would only arise in a high-frequency commit scenario, but there's no guarantee that the current
+    // snapshot is necessarily the one from the commit just before; another writer could have raced to commit
+    // in between.
+    log.info("~{}~ SnapshotId after overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId());
+  }
 }
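Taken together, the two new methods support a read-then-replace flow; a compact sketch (the predicate index, column name, and value are illustrative, and an identity transform is assumed):

    Predicate<StructLike> predicate =
        new IcebergMatchesAnyPropNamePartitionFilterPredicate(0, "2024-10-01-00");
    List<DataFile> srcFiles = srcTable.getPartitionSpecificDataFiles(predicate);
    // ... copy the physical files, then commit the overwrite on the destination:
    destTable.overwritePartition(destFiles, "datepartition", "2024-10-01-00");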

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java
new file mode 100644
index 00000000000..ee5d6acb281
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import org.apache.iceberg.StructLike;
+
+/**
+ * Predicate implementation for filtering Iceberg partitions based on a specified partition value.
+ * <p>
+ * This class filters partitions by checking whether the partition's value matches the specified partition value.
+ * </p>
+ */
+public class IcebergMatchesAnyPropNamePartitionFilterPredicate implements Predicate<StructLike> {
+  private final int partitionColumnIndex;
+  private final String partitionValue;
+
+  /**
+   * Constructs an {@code IcebergMatchesAnyPropNamePartitionFilterPredicate} with the specified parameters.
+   *
+   * @param partitionColumnIndex the index of the partition column in the partition spec
+   * @param partitionValue the partition value used to match
+   */
+  public IcebergMatchesAnyPropNamePartitionFilterPredicate(int partitionColumnIndex, String partitionValue) {
+    this.partitionColumnIndex = partitionColumnIndex;
+    this.partitionValue = partitionValue;
+  }
+
+  /**
+   * Checks whether the partition value matches the specified partition value.
+   *
+   * @param partition the partition to check
+   * @return {@code true} if the partition value matches the specified partition value, otherwise {@code false}
+   */
+  @Override
+  public boolean test(StructLike partition) {
+    // Just a cautious check to avoid NPE; ideally partition shouldn't be null if the table is partitioned
+    if (Objects.isNull(partition)) {
+      return false;
+    }
+
+    Object partitionVal = partition.get(this.partitionColumnIndex, Object.class);
+    // Need this check to avoid NPE on partitionVal.toString()
+    if (Objects.isNull(partitionVal)) {
+      return false;
+    }
+
+    return this.partitionValue.equals(partitionVal.toString());
+  }
+}
\ No newline at end of file
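The matching above is purely string-based on one slot of the partition struct; for instance (index and values illustrative):

    // partition spec: [ datepartition (identity) ] -> column index 0
    Predicate<StructLike> pred =
        new IcebergMatchesAnyPropNamePartitionFilterPredicate(0, "2024-10-01-00");
    // true iff partition.get(0, Object.class).toString() equals "2024-10-01-00";
    // a null partition struct or a null slot value yields false
    boolean matches = pred.test(partitionStruct);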

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java
new file mode 100644
index 00000000000..358fc9de1e0
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.TableMetadata;
+
+/**
+ * Utility class for creating and managing partition filter predicates for Iceberg tables.
+ * <p>
+ * This class provides methods to retrieve the index of a partition column in the table metadata
+ * and ensures that the partition transform is supported.
+ * </p>
+ * <p>
+ * Note: This class is not meant to be instantiated.
+ * </p>
+ */
+public class IcebergPartitionFilterPredicateUtil {
+  private IcebergPartitionFilterPredicateUtil() {
+  }
+
+  /**
+   * Retrieves the index of the partition column from the partition spec in the table metadata.
+   *
+   * @param partitionColumnName the name of the partition column to find
+   * @param tableMetadata the metadata of the Iceberg table
+   * @param supportedTransforms a list of supported partition transforms
+   * @return an {@link Optional} containing the index of the partition column if found, otherwise {@code Optional.empty()}
+   * @throws IOException if the partition transform is not supported
+   */
+  public static Optional<Integer> getPartitionColumnIndex(
+      String partitionColumnName,
+      TableMetadata tableMetadata,
+      List<String> supportedTransforms
+  ) throws IOException {
+    List<PartitionField> partitionFields = tableMetadata.spec().fields();
+    for (int idx = 0; idx < partitionFields.size(); idx++) {
+      PartitionField partitionField = partitionFields.get(idx);
+      if (partitionField.name().equals(partitionColumnName)) {
+        String transform = partitionField.transform().toString().toLowerCase();
+        if (!supportedTransforms.contains(transform)) {
+          throw new IOException(
+              String.format("For ~{%s:%d}~ Partition transform %s is not supported. Supported transforms are %s",
+                  partitionColumnName,
+                  idx,
+                  transform,
+                  supportedTransforms));
+        }
+        return Optional.of(idx);
+      }
+    }
+    return Optional.empty();
+  }
+}
\ No newline at end of file
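Usage sketch for the lookup above (column name and transform list illustrative):

    Optional<Integer> idxOpt = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(
        "datepartition", tableMetadata, ImmutableList.of("identity", "truncate"));
    // Optional.empty() => the column is absent from the partition spec;
    // IOException => the column's transform (e.g. "bucket") is unsupported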
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index b9babbc8888..a446fbb1a72 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -92,6 +93,7 @@ public class IcebergDatasetTest {
   private static final String MANIFEST_PATH_0 = ROOT_PATH + "metadata/manifest.a";
   private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
   private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
+  private static final String REGISTER_COMMIT_STEP = IcebergRegisterStep.class.getName();
   private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 =
       new MockIcebergTable.SnapshotPaths(Optional.of(METADATA_PATH), MANIFEST_LIST_PATH_0, Arrays.asList(
           new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_0,
@@ -120,7 +122,10 @@ public void testGetDatasetDescriptor() throws URISyntaxException {
     TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName);
     String qualifiedTableName = "foo_prefix." + tableId.toString();
     String platformName = "Floe";
-    IcebergTable table = new IcebergTable(tableId, qualifiedTableName, platformName, Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
+    IcebergTable table = new IcebergTable(tableId, qualifiedTableName, platformName,
+        Mockito.mock(TableOperations.class),
+        SRC_CATALOG_URI,
+        Mockito.mock(Table.class));
     FileSystem mockFs = Mockito.mock(FileSystem.class);
     Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI);
     DatasetDescriptor expected = new DatasetDescriptor(platformName, URI.create(SRC_CATALOG_URI), qualifiedTableName);
@@ -428,17 +433,17 @@ private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) {
     for (CopyEntity copyEntity : copyEntities) {
       String json = copyEntity.toString();
       if (isCopyableFile(json)) {
-        String filepath = CopyEntityDeserializer.getFilePathAsStringFromJson(json);
+        String filepath = CopyEntityDeserializer.getOriginFilePathAsStringFromJson(json);
         actual.add(filepath);
       } else{
-        verifyPostPublishStep(json);
+        verifyPostPublishStep(json, REGISTER_COMMIT_STEP);
       }
     }
     Assert.assertEquals(actual.size(), expected.size(), "Set" + actual.toString() + " vs Set" + expected.toString());
     Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
   }
 
-  private static boolean isCopyableFile(String json) {
+  public static boolean isCopyableFile(String json) {
     String objectType = new Gson().fromJson(json, JsonObject.class)
         .getAsJsonPrimitive("object-type")
         .getAsString();
@@ -452,14 +457,14 @@ private static void verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities, Map<Path, FileStatus> expectedPathsAndFileStatuses) {
       List<CopyEntityDeserializer.FileOwnerAndPermissions> ancestorFileOwnerAndPermissionsList = CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
       CopyEntityDeserializer.FileOwnerAndPermissions destinationFileOwnerAndPermissions = CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
-      Path filePath = new Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
+      Path filePath = new Path(CopyEntityDeserializer.getOriginFilePathAsStringFromJson(copyEntityJson));
       FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
       verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
       // providing path's parent to verify ancestor owner and permissions
       verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, filePath.getParent(),
          expectedPathsAndFileStatuses);
     } else {
-      verifyPostPublishStep(copyEntityJson);
+      verifyPostPublishStep(copyEntityJson, REGISTER_COMMIT_STEP);
     }
   }
 }
@@ -481,8 +486,7 @@ private static void verifyAncestorPermissions(List Stream transformWithIndex(Stream inputs, BiFunction getAncestorOwnerAndPermissions(String json) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
new file mode 100644
index 00000000000..6e273ca2d64
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.util.SerializationUtil;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+
+/** Tests for {@link IcebergOverwritePartitionsStep} */
+public class IcebergOverwritePartitionsStepTest {
+  private final String destTableIdStr = "db.foo";
+  private final String testPartitionColName = "testPartition";
+  private final String testPartitionColValue = "testValue";
+  private IcebergTable mockIcebergTable;
+  private IcebergCatalog mockIcebergCatalog;
+  private Properties mockProperties;
+  private byte[] serializedDummyDataFiles;
+  private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
+
+  @BeforeMethod
+  public void setUp() throws IOException {
+    mockIcebergTable = Mockito.mock(IcebergTable.class);
+    mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
+    mockProperties = new Properties();
+
+    List<DataFile> dummyDataFiles = createDummyDataFiles();
+    serializedDummyDataFiles = SerializationUtil.serializeToBytes(dummyDataFiles);
+
+    spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
+        testPartitionColName, testPartitionColValue, serializedDummyDataFiles, mockProperties));
+
+    Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
+    Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
+  }
+
+  @Test
+  public void testNeverIsCompleted() {
+    Assert.assertFalse(spyIcebergOverwritePartitionsStep.isCompleted());
+  }
+
+  @Test
+  public void testExecute() {
+    try {
+      Mockito.doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(), Mockito.anyString(),
+          Mockito.anyString());
+      spyIcebergOverwritePartitionsStep.execute();
+      Mockito.verify(mockIcebergTable, Mockito.times(1)).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+    } catch (IOException e) {
+      Assert.fail(String.format("Unexpected IOException : %s", e));
+    }
+  }
+
+  @Test
+  public void testExecuteWithRetry() {
+    try {
+      // the first call throws an exception (and is retried); the second call succeeds
+      Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+      spyIcebergOverwritePartitionsStep.execute();
+      Mockito.verify(mockIcebergTable, Mockito.times(2)).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+    } catch (IOException e) {
+      Assert.fail(String.format("Unexpected IOException : %s", e));
+    }
+  }
+
+  @Test
+  public void testExecuteWithDefaultRetry() throws IcebergTable.TableNotFoundException {
+    try {
+      // always throw an exception
+      Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+      spyIcebergOverwritePartitionsStep.execute();
+    } catch (RuntimeException e) {
+      Mockito.verify(mockIcebergTable, Mockito.times(3)).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+      assertRetryTimes(e, 3);
+    } catch (IOException e) {
+      Assert.fail(String.format("Unexpected IOException : %s", e));
+    }
+  }
+
+  @Test
+  public void testExecuteWithCustomRetryConfig() throws IOException {
+    int retryCount = 7;
+    mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES,
+        Integer.toString(retryCount));
+    spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
+        testPartitionColName, testPartitionColValue, serializedDummyDataFiles, mockProperties));
+    Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
+    Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
+    try {
+      // always throw an exception
+      Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+      spyIcebergOverwritePartitionsStep.execute();
+    } catch (RuntimeException e) {
+      Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+      assertRetryTimes(e, retryCount);
+    } catch (IOException e) {
+      Assert.fail(String.format("Unexpected IOException : %s", e));
+    }
+  }
+
+  private List<DataFile> createDummyDataFiles() {
+    DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath("/path/to/db/foo/data/datafile1.orc")
+        .withFileSizeInBytes(1234)
+        .withRecordCount(100)
+        .build();
+
+    DataFile dataFile2 = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath("/path/to/db/foo/data/datafile2.orc")
+        .withFileSizeInBytes(9876)
+        .withRecordCount(50)
+        .build();
+
+    return ImmutableList.of(dataFile1, dataFile2);
+  }
+
+  private void assertRetryTimes(RuntimeException re, Integer retryTimes) {
+    String msg = String.format("~%s~ Failure attempting to overwrite partition [num failures: %d]", destTableIdStr, retryTimes);
+    Assert.assertTrue(re.getMessage().startsWith(msg), re.getMessage());
+  }
+}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; + +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyContext; +import org.apache.gobblin.data.management.copy.PreserveAttributes; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil; +import org.apache.gobblin.dataset.DatasetDescriptor; + +import static org.mockito.ArgumentMatchers.any; + + +/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionDataset} */ +public class IcebergPartitionDatasetTest { + private IcebergTable srcIcebergTable; + private IcebergTable destIcebergTable; + private TableMetadata srcTableMetadata; + private TableMetadata destTableMetadata; + private static FileSystem sourceFs; + private static FileSystem targetFs; + private IcebergPartitionDataset icebergPartitionDataset; + private MockedStatic icebergPartitionFilterPredicateUtil; + private static final String SRC_TEST_DB = "srcTestDB"; + private static final String SRC_TEST_TABLE = "srcTestTable"; + private static final String SRC_WRITE_LOCATION = SRC_TEST_DB + "/" + SRC_TEST_TABLE + "/data"; + private static final String DEST_TEST_DB = "destTestDB"; + private static final String DEST_TEST_TABLE = "destTestTable"; + private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" + DEST_TEST_TABLE + "/data"; + private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME = "testPartition"; + private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE = "testValue"; + private static final String OVERWRITE_COMMIT_STEP = IcebergOverwritePartitionsStep.class.getName(); + private final Properties copyConfigProperties = new Properties(); + private final Properties properties = new Properties(); + private static final URI SRC_FS_URI; + private static final URI DEST_FS_URI; + + static { + try { + SRC_FS_URI = new URI("abc", "the.source.org", "/", null); + DEST_FS_URI = new URI("xyz", "the.dest.org", "/", null); + } catch (URISyntaxException e) { + throw new 
RuntimeException("should not occur!", e); + } + } + + @BeforeMethod + public void setUp() throws Exception { + setupSrcFileSystem(); + setupDestFileSystem(); + + TableIdentifier tableIdentifier = TableIdentifier.of(SRC_TEST_DB, SRC_TEST_TABLE); + + srcIcebergTable = Mockito.mock(IcebergTable.class); + destIcebergTable = Mockito.mock(IcebergTable.class); + + srcTableMetadata = Mockito.mock(TableMetadata.class); + destTableMetadata = Mockito.mock(TableMetadata.class); + Mockito.when(destTableMetadata.spec()).thenReturn(Mockito.mock(PartitionSpec.class)); + + Mockito.when(srcIcebergTable.getTableId()).thenReturn(tableIdentifier); + Mockito.when(destIcebergTable.getTableId()).thenReturn(tableIdentifier); + Mockito.when(srcIcebergTable.accessTableMetadata()).thenReturn(srcTableMetadata); + Mockito.when(destIcebergTable.accessTableMetadata()).thenReturn(destTableMetadata); + Mockito.when(srcIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class)); + Mockito.when(destIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class)); + + icebergPartitionFilterPredicateUtil = Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class); + icebergPartitionFilterPredicateUtil + .when(() -> IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(), Mockito.any(), Mockito.any())) + .thenReturn(Optional.of(0)); + + copyConfigProperties.setProperty("data.publisher.final.dir", "/test"); + } + + @AfterMethod + public void cleanUp() { + icebergPartitionFilterPredicateUtil.close(); + } + + @Test + public void testGenerateCopyEntities() throws IOException { + List srcFilePaths = new ArrayList<>(); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc"); + Map mockDataFilesBySrcPath = createDataFileMocksBySrcPath(srcFilePaths); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn( + new ArrayList<>(mockDataFilesBySrcPath.values())); + + icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, + true); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(targetFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); + + Collection copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration); + + verifyCopyEntities(copyEntities, new ArrayList<>(mockDataFilesBySrcPath.keySet()), true); + } + + @Test + public void testGenerateCopyEntitiesWithEmptyDataFiles() throws IOException { + List srcDataFiles = Lists.newArrayList(); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles); + + icebergPartitionDataset = new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, + true, TEST_ICEBERG_PARTITION_COLUMN_NAME, TEST_ICEBERG_PARTITION_COLUMN_VALUE); + Collection copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, + Mockito.mock(CopyConfiguration.class)); + + // Since No data files are present, no copy entities should be generated + verifyCopyEntities(copyEntities, Collections.emptyList(), true); + } + + @Test + public void testMultipleCopyEntitiesGenerated() throws IOException { + List srcFilePaths = new ArrayList<>(); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file2.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file3.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file4.orc"); 
+
+  @Test
+  public void testGenerateCopyEntities() throws IOException {
+    List<String> srcFilePaths = new ArrayList<>();
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc");
+    Map<String, DataFile> mockDataFilesBySrcPath = createDataFileMocksBySrcPath(srcFilePaths);
+    Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
+        new ArrayList<>(mockDataFilesBySrcPath.values()));
+
+    icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs,
+        true);
+
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(targetFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+            .copyContext(new CopyContext()).build();
+
+    Collection<CopyEntity> copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+    verifyCopyEntities(copyEntities, new ArrayList<>(mockDataFilesBySrcPath.keySet()), true);
+  }
+
+  @Test
+  public void testGenerateCopyEntitiesWithEmptyDataFiles() throws IOException {
+    List<DataFile> srcDataFiles = Lists.newArrayList();
+    Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles);
+
+    icebergPartitionDataset = new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs,
+        true, TEST_ICEBERG_PARTITION_COLUMN_NAME, TEST_ICEBERG_PARTITION_COLUMN_VALUE);
+    Collection<CopyEntity> copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs,
+        Mockito.mock(CopyConfiguration.class));
+
+    // Since no data files are present, no copy entities should be generated
+    verifyCopyEntities(copyEntities, Collections.emptyList(), true);
+  }
+
+  @Test
+  public void testMultipleCopyEntitiesGenerated() throws IOException {
+    List<String> srcFilePaths = new ArrayList<>();
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc");
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file2.orc");
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file3.orc");
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file4.orc");
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file5.orc");
+
+    Map<String, DataFile> mockDataFilesBySrcPath = createDataFileMocksBySrcPath(srcFilePaths);
+    Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
+        new ArrayList<>(mockDataFilesBySrcPath.values()));
+
+    icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs,
+        true);
+
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(targetFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+            .copyContext(new CopyContext()).build();
+
+    Collection<CopyEntity> copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+    verifyCopyEntities(copyEntities, new ArrayList<>(mockDataFilesBySrcPath.keySet()), true);
+  }
+
+  @Test
+  public void testWithDifferentSrcAndDestTableWriteLocation() throws IOException {
+    List<String> srcFilePaths = new ArrayList<>();
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/randomFile--Name.orc");
+    Mockito.when(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")).thenReturn(SRC_WRITE_LOCATION);
+    Mockito.when(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")).thenReturn(DEST_WRITE_LOCATION);
+
+    Map<String, DataFile> mockDataFilesBySrcPath = createDataFileMocksBySrcPath(srcFilePaths);
+    Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
+        new ArrayList<>(mockDataFilesBySrcPath.values()));
+
+    icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs,
+        true);
+
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(targetFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+            .copyContext(new CopyContext()).build();
+
+    List<CopyEntity> copyEntities =
+        (List<CopyEntity>) icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+    verifyCopyEntities(copyEntities, new ArrayList<>(mockDataFilesBySrcPath.keySet()), false);
+  }
+
+  private static void setupSrcFileSystem() throws IOException {
+    sourceFs = Mockito.mock(FileSystem.class);
+    Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
+    Mockito.when(sourceFs.makeQualified(any(Path.class)))
+        .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(SRC_FS_URI, new Path("/")));
+    Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenAnswer(invocation -> {
+      Path path = invocation.getArgument(0, Path.class);
+      Path qualifiedPath = sourceFs.makeQualified(path);
+      return IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath.toString());
+    });
+  }
+
+  private static void setupDestFileSystem() throws IOException {
+    targetFs = Mockito.mock(FileSystem.class);
+    Mockito.when(targetFs.getUri()).thenReturn(DEST_FS_URI);
+    Mockito.when(targetFs.makeQualified(any(Path.class)))
+        .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(DEST_FS_URI, new Path("/")));
+    // Since a UUID is appended to every file name when building the destination path,
+    // report FileNotFoundException for any file status lookup on the destination file system
+    Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException());
+  }
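
[Annotation] setupDestFileSystem encodes the naming scheme these tests rely on: destination file names carry a UUID, so a status lookup on the destination should always miss. A hypothetical sketch of such a scheme (illustrative only, not the patch's implementation) that satisfies the endsWith/length assertions in verifyCopyEntities below:

    import java.util.UUID;
    import org.apache.hadoop.fs.Path;

    // Destination file name ends with the source name but is strictly longer.
    static Path toDestinationPath(String destWriteLocation, Path srcFile) {
      return new Path(destWriteLocation, UUID.randomUUID() + "-" + srcFile.getName());
    }
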
+
+  private static Map<String, DataFile> createDataFileMocksBySrcPath(List<String> srcFilePaths) throws IOException {
+    Map<String, DataFile> dataFileMocksBySrcPath = new HashMap<>();
+    for (String srcFilePath : srcFilePaths) {
+      DataFile dataFile = Mockito.mock(DataFile.class);
+      Path dataFilePath = new Path(srcFilePath);
+      String qualifiedPath = sourceFs.makeQualified(dataFilePath).toString();
+      Mockito.when(dataFile.path()).thenReturn(dataFilePath.toString());
+      Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(
+          IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath));
+      dataFileMocksBySrcPath.put(qualifiedPath, dataFile);
+    }
+    return dataFileMocksBySrcPath;
+  }
+
+  private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expectedSrcFilePaths,
+      boolean sameSrcAndDestWriteLocation) {
+    List<String> actualSrcFilePaths = new ArrayList<>();
+    String srcWriteLocationStart = SRC_FS_URI + SRC_WRITE_LOCATION;
+    String destWriteLocationStart = DEST_FS_URI + (sameSrcAndDestWriteLocation ? SRC_WRITE_LOCATION : DEST_WRITE_LOCATION);
+    String srcErrorMsg = String.format("Source Location should start with %s", srcWriteLocationStart);
+    String destErrorMsg = String.format("Destination Location should start with %s", destWriteLocationStart);
+    for (CopyEntity copyEntity : copyEntities) {
+      String json = copyEntity.toString();
+      if (IcebergDatasetTest.isCopyableFile(json)) {
+        String originFilepath = IcebergDatasetTest.CopyEntityDeserializer.getOriginFilePathAsStringFromJson(json);
+        actualSrcFilePaths.add(originFilepath);
+        String destFilepath = IcebergDatasetTest.CopyEntityDeserializer.getDestinationFilePathAsStringFromJson(json);
+        Assert.assertTrue(originFilepath.startsWith(srcWriteLocationStart), srcErrorMsg);
+        Assert.assertTrue(destFilepath.startsWith(destWriteLocationStart), destErrorMsg);
+        String originFileName = originFilepath.substring(srcWriteLocationStart.length() + 1);
+        String destFileName = destFilepath.substring(destWriteLocationStart.length() + 1);
+        Assert.assertTrue(destFileName.endsWith(originFileName), "Incorrect file name in destination path");
+        Assert.assertTrue(destFileName.length() > originFileName.length() + 1,
+            "Destination file name should be longer than source file name as UUID is appended");
+      } else {
+        IcebergDatasetTest.verifyPostPublishStep(json, OVERWRITE_COMMIT_STEP);
+      }
+    }
+    Assert.assertEquals(actualSrcFilePaths.size(), expectedSrcFilePaths.size(),
+        "Set" + actualSrcFilePaths + " vs Set" + expectedSrcFilePaths);
+    Assert.assertEqualsNoOrder(actualSrcFilePaths.toArray(), expectedSrcFilePaths.toArray());
+  }
+
+  /**
+   * See {@link org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetTest.TrickIcebergDataset}
+   */
+  protected static class TestIcebergPartitionDataset extends IcebergPartitionDataset {
+
+    public TestIcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable,
+        Properties properties, FileSystem sourceFs, boolean shouldIncludeMetadataPath) throws IOException {
+      super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath,
+          TEST_ICEBERG_PARTITION_COLUMN_NAME, TEST_ICEBERG_PARTITION_COLUMN_VALUE);
+    }
+
+    @Override
+    protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Configuration hadoopConfig) {
+      return this.sourceFs;
+    }
+  }
+}
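
[Annotation] For reference, the entry point this test class drives. A hedged construction sketch mirroring the constructor used in testGenerateCopyEntitiesWithEmptyDataFiles; "datepartition" and "2024-10-23" are illustrative values, not patch constants, and the constructor may throw IOException:

    IcebergPartitionDataset dataset = new IcebergPartitionDataset(
        srcIcebergTable,   // source IcebergTable
        destIcebergTable,  // destination IcebergTable
        new Properties(),  // job properties
        sourceFs,          // source FileSystem
        true,              // shouldIncludeMetadataPath
        "datepartition",   // partition column to match
        "2024-10-23");     // partition value to copy
    Collection<CopyEntity> entities = dataset.generateCopyEntities(targetFs, copyConfiguration);
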
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index a1a29444ed0..63aa27221b7 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -21,11 +21,13 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -33,8 +35,11 @@
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -61,7 +66,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
       .fields()
       .name("id")
       .type()
-      .longType()
+      .stringType()
       .noDefault()
       .endRecord();
   protected static final Schema icebergSchema = AvroSchemaUtil.toIceberg(avroDataSchema);
@@ -71,6 +76,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
 
   private final String dbName = "myicebergdb";
   private final String tableName = "justtesting";
+  private final String destTableName = "destTable";
   private TableIdentifier tableId;
   private Table table;
   private String catalogUri;
@@ -85,7 +91,7 @@ public void setUp() throws Exception {
   @BeforeMethod
   public void setUpEachTest() {
     tableId = TableIdentifier.of(dbName, tableName);
-    table = catalog.createTable(tableId, icebergSchema);
+    table = catalog.createTable(tableId, icebergSchema, icebergPartitionSpec);
     catalogUri = catalog.getConf().get(CatalogProperties.URI);
     metadataBasePath = calcMetadataBasePath(tableId);
   }
@@ -106,7 +112,8 @@ public void testGetCurrentSnapshotInfo() throws IOException {
     );
     initializeSnapshots(table, perSnapshotFilesets);
-    IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getCurrentSnapshotInfo();
+    IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri,
+        catalog.loadTable(tableId)).getCurrentSnapshotInfo();
     verifySnapshotInfo(snapshotInfo, perSnapshotFilesets, perSnapshotFilesets.size());
   }
 
@@ -114,7 +121,8 @@
   @Test(expectedExceptions = IcebergTable.TableNotFoundException.class)
   public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
     TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS");
-    IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId), catalogUri).getCurrentSnapshotInfo();
+    IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId), catalogUri,
+        null).getCurrentSnapshotInfo();
     Assert.fail("expected an exception when using table ID '" + bogusTableId + "'");
   }
 
@@ -129,7 +137,8 @@ public void testGetAllSnapshotInfosIterator() throws IOException {
     );
     initializeSnapshots(table, perSnapshotFilesets);
-    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getAllSnapshotInfosIterator());
+    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
+        catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator());
     Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
 
     for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -149,7 +158,8 @@ public void testGetIncrementalSnapshotInfosIterator() throws IOException {
     );
     initializeSnapshots(table, perSnapshotFilesets);
-    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getIncrementalSnapshotInfosIterator());
+    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
+        catalogUri, catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
     Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
 
     for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -169,7 +179,8 @@ public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles() throws IOExce
     );
     initializeSnapshots(table, perSnapshotFilesets);
-    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getIncrementalSnapshotInfosIterator());
+    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
+        catalogUri, catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
     Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
 
     for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -197,10 +208,11 @@ public void testNewTablePropertiesAreRegistered() throws Exception {
     // Expect existing property values to be deleted if it does not exist on the source
     destTableProperties.put("deletedTableProperty", "deletedTablePropertyValue");
 
-    TableIdentifier destTableId = TableIdentifier.of(dbName, "destTable");
+    TableIdentifier destTableId = TableIdentifier.of(dbName, destTableName);
     catalog.createTable(destTableId, icebergSchema, null, destTableProperties);
-    IcebergTable destIcebergTable = new IcebergTable(destTableId, catalog.newTableOps(destTableId), catalogUri);
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, catalog.newTableOps(destTableId), catalogUri,
+        catalog.loadTable(destTableId));
 
     // Mock a source table with the same table UUID copying new properties
     TableMetadata newSourceTableProperties = destIcebergTable.accessTableMetadata().replaceProperties(srcTableProperties);
@@ -209,6 +221,86 @@
     Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("newKey"), "newValue");
     Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("testKey"), "testValueNew");
     Assert.assertNull(destIcebergTable.accessTableMetadata().properties().get("deletedTableProperty"));
+
+    catalog.dropTable(destTableId);
+  }
+
+  /** Verify that getPartitionSpecificDataFiles returns the data files belonging to the partition defined by the predicate */
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    // Note - no particular file path format is required for a file to belong to a specific partition
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file4.orc",
+        "/path/tableName/data/id=3/file2.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+
+    addPartitionDataFiles(table, createDataFiles(paths.stream().collect(Collectors.toMap(Function.identity(), v -> partitionData))));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of the predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5);
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
+  }
+
+  /** Verify that overwritePartition replaces the data files belonging to the given partition column and value */
+  @Test
+  public void testOverwritePartition() throws IOException {
+    // Note - no particular file path format is required for a file to belong to a specific partition
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file2.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partition1Data = new PartitionData(icebergPartitionSpec.partitionType());
+    partition1Data.set(0, "1");
+
+    addPartitionDataFiles(table, createDataFiles(paths.stream().collect(Collectors.toMap(Function.identity(), v -> partition1Data))));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+
+    verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partition2Data = new PartitionData(icebergPartitionSpec.partitionType());
+    partition2Data.set(0, "2");
+
+    List<DataFile> partition2DataFiles = createDataFiles(paths2.stream().collect(Collectors.toMap(Function.identity(), v -> partition2Data)));
+    // here, since partition data with value 2 doesn't exist yet,
+    // we expect it to get added to the table, w/o changing or deleting any other partitions
+    icebergTable.overwritePartition(partition2DataFiles, "id", "2");
+    List<String> expectedPaths2 = new ArrayList<>(paths);
+    expectedPaths2.addAll(paths2);
+    verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths3 = Arrays.asList(
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file6.orc"
+    );
+    // Reusing the same partition data to create data files with different paths
+    List<DataFile> partition1NewDataFiles = createDataFiles(paths3.stream().collect(Collectors.toMap(Function.identity(), v -> partition1Data)));
+    // here, since partition data with value 1 already exists, we expect it to get updated in the table with the newer paths
+    icebergTable.overwritePartition(partition1NewDataFiles, "id", "1");
+    List<String> expectedPaths3 = new ArrayList<>(paths2);
+    expectedPaths3.addAll(paths3);
+    verifyAnyOrder(expectedPaths3, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
   }
 
   /** full validation for a particular {@link IcebergSnapshotInfo} */
@@ -333,4 +425,25 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  private static void addPartitionDataFiles(Table table, List<DataFile> dataFiles) {
+    dataFiles.forEach(dataFile -> table.newAppend().appendFile(dataFile).commit());
+  }
+
+  private static List<DataFile> createDataFiles(Map<String, PartitionData> pathWithPartitionData) {
+    return pathWithPartitionData.entrySet().stream()
+        .map(e -> createDataFileWithPartition(e.getKey(), e.getValue()))
+        .collect(Collectors.toList());
+  }
+
+  private static DataFile createDataFileWithPartition(String path, PartitionData partitionData) {
+    return DataFiles.builder(icebergPartitionSpec)
+        .withPath(path)
+        .withFileSizeInBytes(8)
+        .withRecordCount(1)
+        .withPartition(partitionData)
+        .withFormat(FileFormat.ORC)
+        .build();
+  }
+
 }
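
[Annotation] testOverwritePartition documents the intended semantics: files for the named partition value are replaced, other partitions are untouched, and a previously absent partition is simply added. Against the Iceberg API, this kind of commit is typically expressed with OverwriteFiles; a hedged sketch of what such a step might do, not necessarily the patch's exact implementation (the "id"/"1" filter is an assumed example):

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.OverwriteFiles;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expressions;

    // Replace the files of partition id=1 with `replacementFiles` in one atomic commit.
    static void overwriteIdPartition(Table table, Iterable<DataFile> replacementFiles) {
      OverwriteFiles overwrite = table.newOverwrite()
          .overwriteByRowFilter(Expressions.equal("id", "1")); // column/value assumed
      replacementFiles.forEach(overwrite::addFile);
      overwrite.commit();
    }
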
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
new file mode 100644
index 00000000000..4eb16500e64
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import org.apache.iceberg.StructLike;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate} */
+public class IcebergMatchesAnyPropNamePartitionFilterPredicateTest {
+  private static final String TEST_PARTITION_VALUE_1 = "value1";
+  private IcebergMatchesAnyPropNamePartitionFilterPredicate predicate;
+
+  @BeforeMethod
+  public void setup() {
+    predicate = new IcebergMatchesAnyPropNamePartitionFilterPredicate(0, TEST_PARTITION_VALUE_1);
+  }
+
+  @Test
+  public void testPartitionValueNULL() {
+    // A bare mock returns NULL for the partition value
+    Assert.assertFalse(predicate.test(Mockito.mock(StructLike.class)));
+  }
+
+  @Test
+  public void testWhenPartitionIsNull() {
+    Assert.assertFalse(predicate.test(null));
+  }
+
+  @Test
+  public void testPartitionValueMatch() {
+    StructLike mockPartition = Mockito.mock(StructLike.class);
+    Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn("value1");
+    Assert.assertTrue(predicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValueDoesNotMatch() {
+    StructLike mockPartition = Mockito.mock(StructLike.class);
+    Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn("");
+    Assert.assertFalse(predicate.test(mockPartition));
+  }
+}
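
[Annotation] These four cases fix the predicate's contract: a null partition or null value never matches, an equal value matches, anything else does not. An illustrative use, not from this patch (index 0 assumes the matched column is the first field of the partition spec; "value1" mirrors the test's expectation):

    Predicate<StructLike> filter =
        new IcebergMatchesAnyPropNamePartitionFilterPredicate(0, "value1");
    // An org.apache.iceberg.DataFile exposes its partition tuple via partition()
    boolean inPartition = filter.test(dataFile.partition());
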
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java
new file mode 100644
index 00000000000..9743b5ab624
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.transforms.Transform;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil} */
+public class IcebergPartitionFilterPredicateUtilTest {
+  private static TableMetadata mockTableMetadata;
+  private final List<String> supportedTransforms = ImmutableList.of("supported1", "supported2");
+
+  @Test
+  public void testPartitionTransformNotSupported() {
+    setupMockData("col1", "unsupported");
+    IOException exception = Assert.expectThrows(IOException.class, () -> {
+      IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col1", mockTableMetadata, supportedTransforms);
+    });
+    Assert.assertTrue(exception.getMessage().contains("Partition transform unsupported is not supported. Supported transforms are [supported1, supported2]"));
+  }
+
+  @Test
+  public void testPartitionTransformSupported() throws IOException {
+    setupMockData("col1", "supported1");
+    int result =
+        IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col1", mockTableMetadata, supportedTransforms)
+            .get();
+    Assert.assertEquals(result, 0);
+  }
+
+  @Test
+  public void testPartitionColumnNotFound() throws IOException {
+    setupMockData("col", "supported1");
+    Optional<Integer> result = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col2",
+        mockTableMetadata, supportedTransforms);
+    Assert.assertFalse(result.isPresent());
+  }
+
+  @Test
+  public void testPartitionColumnFoundIndex1() throws IOException {
+    mockTableMetadata = Mockito.mock(TableMetadata.class);
+    PartitionSpec mockPartitionSpec = Mockito.mock(PartitionSpec.class);
+    PartitionField mockPartitionField1 = Mockito.mock(PartitionField.class);
+    PartitionField mockPartitionField2 = Mockito.mock(PartitionField.class);
+    Transform mockTransform1 = Mockito.mock(Transform.class);
+    Transform mockTransform2 = Mockito.mock(Transform.class);
+
+    List<PartitionField> partitionFields = ImmutableList.of(mockPartitionField1, mockPartitionField2);
+
+    Mockito.when(mockTableMetadata.spec()).thenReturn(mockPartitionSpec);
+    Mockito.when(mockPartitionSpec.fields()).thenReturn(partitionFields);
+    Mockito.when(mockPartitionField1.name()).thenReturn("col1");
+    Mockito.when(mockPartitionField1.transform()).thenReturn(mockTransform1);
+    Mockito.when(mockTransform1.toString()).thenReturn("supported1");
+    Mockito.when(mockPartitionField2.name()).thenReturn("col2");
+    Mockito.when(mockPartitionField2.transform()).thenReturn(mockTransform2);
+    Mockito.when(mockTransform2.toString()).thenReturn("supported2");
+
+    int result =
+        IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col2", mockTableMetadata, supportedTransforms)
+            .get();
+    Assert.assertEquals(result, 1);
+  }
+
+  private static void setupMockData(String name, String transform) {
+    mockTableMetadata = Mockito.mock(TableMetadata.class);
+
+    PartitionSpec mockPartitionSpec = Mockito.mock(PartitionSpec.class);
+    PartitionField mockPartitionField = Mockito.mock(PartitionField.class);
+    Transform mockTransform = Mockito.mock(Transform.class);
+
+    List<PartitionField> partitionFields = ImmutableList.of(mockPartitionField);
+
+    Mockito.when(mockTableMetadata.spec()).thenReturn(mockPartitionSpec);
+    Mockito.when(mockPartitionSpec.fields()).thenReturn(partitionFields);
+    Mockito.when(mockPartitionField.name()).thenReturn(name);
+    Mockito.when(mockPartitionField.transform()).thenReturn(mockTransform);
+    Mockito.when(mockTransform.toString()).thenReturn(transform);
+  }
+}
\ No newline at end of file
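
[Annotation] Taken together, these tests pin down the util's contract: getPartitionColumnIndex(columnName, tableMetadata, supportedTransforms) returns the named column's position within the table's partition spec, an empty Optional when the column is absent, and throws IOException for an unsupported transform. A hedged end-to-end sketch tying the two predicate classes together ("identity", "truncate", "datepartition", and "2024-10-23" are illustrative assumptions, not patch constants):

    List<String> supportedTransforms = ImmutableList.of("identity", "truncate");
    Optional<Integer> idx = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(
        "datepartition", srcIcebergTable.accessTableMetadata(), supportedTransforms);
    if (idx.isPresent()) {
      Predicate<StructLike> filter =
          new IcebergMatchesAnyPropNamePartitionFilterPredicate(idx.get(), "2024-10-23");
      // data files of the matching partition, as exercised by IcebergTableTest above
      List<DataFile> toCopy = srcIcebergTable.getPartitionSpecificDataFiles(filter);
    }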