From bb2efa48edc2860e1412fd352b3e404299dcfcd2 Mon Sep 17 00:00:00 2001 From: nk1506 Date: Sat, 2 Mar 2024 23:35:35 +0530 Subject: [PATCH] Hive: Add View support for HIVE catalog --- .palantir/revapi.yml | 13 + .../NoSuchIcebergViewException.java | 36 ++ .../java/org/apache/iceberg/BaseMetadata.java | 53 +++ .../apache/iceberg/BaseMetastoreCatalog.java | 30 +- .../iceberg/BaseMetastoreOperations.java | 119 ++++++ .../iceberg/BaseMetastoreTableOperations.java | 96 +---- .../java/org/apache/iceberg/CatalogUtil.java | 37 ++ .../apache/iceberg/MetadataTableUtils.java | 5 +- .../org/apache/iceberg/TableMetadata.java | 23 +- .../iceberg/inmemory/InMemoryCatalog.java | 2 +- .../iceberg/view/BaseViewOperations.java | 3 +- .../org/apache/iceberg/view/ViewMetadata.java | 8 +- .../org/apache/iceberg/hive/HiveCatalog.java | 200 +++++++++- .../iceberg/hive/HiveOperationsBase.java | 344 +++++++++++++++++- .../iceberg/hive/HiveTableOperations.java | 280 ++------------ .../iceberg/hive/HiveViewOperations.java | 185 ++++++++++ .../apache/iceberg/hive/TestHiveCatalog.java | 2 +- .../apache/iceberg/hive/TestHiveCommits.java | 8 +- .../iceberg/hive/TestHiveViewCatalog.java | 72 ++++ 19 files changed, 1116 insertions(+), 400 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java create mode 100644 core/src/main/java/org/apache/iceberg/BaseMetadata.java create mode 100644 core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java create mode 100644 hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java create mode 100644 hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index a41d3ddfb8df..368988798028 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -968,6 +968,9 @@ acceptedBreaks: - code: "java.class.removed" old: "class org.apache.iceberg.rest.requests.UpdateTableRequest.Builder" justification: 
"Removing deprecated code" + - code: "java.class.removed" + old: "enum org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus" + justification: "Moved CommitStatus to BaseMetastoreOperations for better reach" - code: "java.class.removed" old: "interface org.apache.iceberg.actions.RewriteStrategy" justification: "Removing deprecated code" @@ -977,6 +980,10 @@ acceptedBreaks: - code: "java.field.serialVersionUIDChanged" new: "field org.apache.iceberg.util.SerializableMap.serialVersionUID" justification: "Serialization is not be used" + - code: "java.method.removed" + old: "method java.lang.String org.apache.iceberg.BaseMetastoreCatalog::fullTableName(java.lang.String,\ + \ org.apache.iceberg.catalog.TableIdentifier)" + justification: "Moved to CatalogUtil for better usability" - code: "java.method.removed" old: "method org.apache.iceberg.TableOperations org.apache.iceberg.BaseMetadataTable::operations()" justification: "Removing deprecated code" @@ -1018,6 +1025,12 @@ acceptedBreaks: old: "method void org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan::(org.apache.iceberg.Table,\ \ org.apache.iceberg.Schema, org.apache.iceberg.TableScanContext)" justification: "Removing deprecated code" + - code: "java.method.returnTypeChanged" + old: "method org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus org.apache.iceberg.BaseMetastoreTableOperations::checkCommitStatus(java.lang.String,\ + \ org.apache.iceberg.TableMetadata)" + new: "method org.apache.iceberg.BaseMetastoreOperations.CommitStatus org.apache.iceberg.BaseMetastoreTableOperations::checkCommitStatus(java.lang.String,\ + \ org.apache.iceberg.TableMetadata)" + justification: "Moved CommitStatus to BaseMetastoreOperations for better reach" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java 
b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java new file mode 100644 index 000000000000..bc5da2aee280 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.exceptions; + +import com.google.errorprone.annotations.FormatMethod; + +/** NoSuchIcebergViewException thrown when a view is found, but it is not an Iceberg view. */ +public class NoSuchIcebergViewException extends NoSuchViewException { + @FormatMethod + public NoSuchIcebergViewException(String message, Object... args) { + super(message, args); + } + + @FormatMethod + public static void check(boolean test, String message, Object... 
args) { + if (!test) { + throw new NoSuchIcebergViewException(message, args); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadata.java b/core/src/main/java/org/apache/iceberg/BaseMetadata.java new file mode 100644 index 000000000000..033349df99b2 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseMetadata.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.iceberg;

import java.io.Serializable;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.iceberg.util.PropertyUtil;

/**
 * A base interface shared by {@link TableMetadata} and {@link
 * org.apache.iceberg.view.ViewMetadata}, exposing the accessors common to both.
 */
public interface BaseMetadata extends Serializable {

  /** Returns the base location of the table or view. */
  String location();

  // NOTE: generic type parameters restored here; the transported source showed a raw Map,
  // which the surrounding usages (getOrDefault with String default, PropertyUtil helpers)
  // show must be Map<String, String>.
  /** Returns the string properties of this metadata. */
  Map<String, String> properties();

  /** Returns the file location this metadata was loaded from, or null if not yet persisted. */
  @Nullable
  String metadataFileLocation();

  /** Returns the current schema. */
  Schema schema();

  /** Returns the value of the given property, or {@code defaultValue} if it is unset. */
  default String property(String property, String defaultValue) {
    return properties().getOrDefault(property, defaultValue);
  }

  /** Returns the given property parsed as a boolean, or {@code defaultValue} if unset. */
  default boolean propertyAsBoolean(String property, boolean defaultValue) {
    return PropertyUtil.propertyAsBoolean(properties(), property, defaultValue);
  }

  /** Returns the given property parsed as an int, or {@code defaultValue} if unset. */
  default int propertyAsInt(String property, int defaultValue) {
    return PropertyUtil.propertyAsInt(properties(), property, defaultValue);
  }

  /** Returns the given property parsed as a long, or {@code defaultValue} if unset. */
  default long propertyAsLong(String property, long defaultValue) {
    return PropertyUtil.propertyAsLong(properties(), property, defaultValue);
  }
}
CatalogUtil.fullTableName(name(), identifier), metricsReporter()); } @Override @@ -203,7 +204,7 @@ public Table create() { throw new AlreadyExistsException("Table was created concurrently: %s", identifier); } - return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter()); + return new BaseTable(ops, CatalogUtil.fullTableName(name(), identifier), metricsReporter()); } @Override @@ -284,29 +285,6 @@ private Map tableOverrideProperties() { } } - protected static String fullTableName(String catalogName, TableIdentifier identifier) { - StringBuilder sb = new StringBuilder(); - - if (catalogName.contains("/") || catalogName.contains(":")) { - // use / for URI-like names: thrift://host:port/db.table - sb.append(catalogName); - if (!catalogName.endsWith("/")) { - sb.append("/"); - } - } else { - // use . for non-URI named catalogs: prod.db.table - sb.append(catalogName).append("."); - } - - for (String level : identifier.namespace().levels()) { - sb.append(level).append("."); - } - - sb.append(identifier.name()); - - return sb.toString(); - } - protected MetricsReporter metricsReporter() { if (metricsReporter == null) { metricsReporter = CatalogUtil.loadMetricsReporter(properties()); diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java new file mode 100644 index 000000000000..caae2a1fab0f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Shared commit-status checking logic for metastore-backed table and view operations. */
public class BaseMetastoreOperations {
  private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreOperations.class);

  public enum CommitStatus {
    FAILURE,
    SUCCESS,
    UNKNOWN
  }

  /**
   * Attempt to load the entity and see if any current or past metadata location matches the one we
   * were attempting to set. This is used as a last resort when we are dealing with exceptions that
   * may indicate the commit has failed but don't have proof that this is the case. Note that all
   * the previous locations must also be searched on the chance that a second committer was able to
   * successfully commit on top of our commit.
   *
   * <p>NOTE(review): this implementation only ever reports SUCCESS or UNKNOWN — {@link
   * CommitStatus#FAILURE} is never set here; absence from the location list is treated as UNKNOWN.
   *
   * @param entityName full name of the entity
   * @param newMetadataLocation the path of the new commit file
   * @param properties properties used to configure retry behavior (attempt count and backoff)
   * @param loadMetadataLocations supplies the current and all previous metadata locations
   * @return Commit Status of Success, Failure or Unknown
   */
  protected CommitStatus checkCommitStatus(
      String entityName,
      String newMetadataLocation,
      Map<String, String> properties,
      Supplier<List<String>> loadMetadataLocations) {
    int maxAttempts =
        PropertyUtil.propertyAsInt(
            properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
    long minWaitMs =
        PropertyUtil.propertyAsLong(
            properties, COMMIT_STATUS_CHECKS_MIN_WAIT_MS, COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
    long maxWaitMs =
        PropertyUtil.propertyAsLong(
            properties, COMMIT_STATUS_CHECKS_MAX_WAIT_MS, COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
    long totalRetryMs =
        PropertyUtil.propertyAsLong(
            properties,
            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

    // generic parameter restored: the transported source showed a raw AtomicReference
    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

    Tasks.foreach(newMetadataLocation)
        .retry(maxAttempts)
        .suppressFailureWhenFinished()
        .exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
        .onFailure(
            (location, checkException) ->
                LOG.error("Cannot check if commit to {} exists.", entityName, checkException))
        .run(
            location -> {
              List<String> allMetadataLocations = loadMetadataLocations.get();
              boolean commitSuccess = allMetadataLocations.contains(newMetadataLocation);

              if (commitSuccess) {
                LOG.info(
                    "Commit status check: Commit to {} of {} succeeded",
                    entityName,
                    newMetadataLocation);
                status.set(CommitStatus.SUCCESS);
              } else {
                LOG.warn(
                    "Commit status check: Commit to {} of {} unknown, new metadata location is not current "
                        + "or in history",
                    entityName,
                    newMetadataLocation);
              }
            });

    if (status.get() == CommitStatus.UNKNOWN) {
      LOG.error(
          "Cannot determine commit state to {}. Failed during checking {} times. "
              + "Treating commit state as unknown.",
          entityName,
          maxAttempts);
    }
    return status.get();
  }
}
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.LocationUtil; -import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class BaseMetastoreTableOperations implements TableOperations { +public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperations + implements TableOperations { private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class); public static final String TABLE_TYPE_PROP = "table_type"; @@ -291,12 +285,6 @@ public long newSnapshotId() { }; } - protected enum CommitStatus { - FAILURE, - SUCCESS, - UNKNOWN - } - /** * Attempt to load the table and see if any current or past metadata location matches the one we * were attempting to set. This is used as a last resort when we are dealing with exceptions that @@ -309,65 +297,21 @@ protected enum CommitStatus { * @return Commit Status of Success, Failure or Unknown */ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { - int maxAttempts = - PropertyUtil.propertyAsInt( - config.properties(), COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT); - long minWaitMs = - PropertyUtil.propertyAsLong( - config.properties(), - COMMIT_STATUS_CHECKS_MIN_WAIT_MS, - COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT); - long maxWaitMs = - PropertyUtil.propertyAsLong( - config.properties(), - COMMIT_STATUS_CHECKS_MAX_WAIT_MS, - COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT); - long totalRetryMs = - PropertyUtil.propertyAsLong( - config.properties(), - COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS, - COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT); - - AtomicReference status = new AtomicReference<>(CommitStatus.UNKNOWN); - - Tasks.foreach(newMetadataLocation) - .retry(maxAttempts) - .suppressFailureWhenFinished() - 
.exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0) - .onFailure( - (location, checkException) -> - LOG.error("Cannot check if commit to {} exists.", tableName(), checkException)) - .run( - location -> { - TableMetadata metadata = refresh(); - String currentMetadataFileLocation = metadata.metadataFileLocation(); - boolean commitSuccess = - currentMetadataFileLocation.equals(newMetadataLocation) - || metadata.previousFiles().stream() - .anyMatch(log -> log.file().equals(newMetadataLocation)); - if (commitSuccess) { - LOG.info( - "Commit status check: Commit to {} of {} succeeded", - tableName(), - newMetadataLocation); - status.set(CommitStatus.SUCCESS); - } else { - LOG.warn( - "Commit status check: Commit to {} of {} unknown, new metadata location is not current " - + "or in history", - tableName(), - newMetadataLocation); - } - }); - - if (status.get() == CommitStatus.UNKNOWN) { - LOG.error( - "Cannot determine commit state to {}. Failed during checking {} times. " - + "Treating commit state as unknown.", - tableName(), - maxAttempts); - } - return status.get(); + return checkCommitStatus( + tableName(), newMetadataLocation, config.properties(), this::loadMetadataLocations); + } + + protected List loadMetadataLocations() { + TableMetadata metadata = refresh(); + Preconditions.checkNotNull(metadata, "Unexpected null table metadata"); + ImmutableList.Builder builder = ImmutableList.builder(); + return builder + .add(metadata.metadataFileLocation()) + .addAll( + metadata.previousFiles().stream() + .map(TableMetadata.MetadataLogEntry::file) + .collect(Collectors.toList())) + .build(); } private String newTableMetadataFilePath(TableMetadata meta, int newVersion) { diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index e09be748f2ee..ebb35dd6cd22 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -27,6 
+27,7 @@ import java.util.Map; import java.util.Set; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.common.DynMethods; @@ -46,6 +47,7 @@ import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; +import org.apache.iceberg.view.ViewMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,6 +138,18 @@ public static void dropTableData(FileIO io, TableMetadata metadata) { deleteFile(io, metadata.metadataFileLocation(), "metadata"); } + /** + * Drops view metadata files referenced by ViewMetadata. + * + *

This should be called by dropView implementations + * + * @param io a FileIO to use for deletes + * @param metadata the last valid ViewMetadata instance for a dropped view. + */ + public static void dropViewMetaData(FileIO io, ViewMetadata metadata) { + deleteFile(io, metadata.metadataFileLocation(), "metadata"); + } + @SuppressWarnings("DangerousStringInternUsage") private static void deleteFiles(FileIO io, Set allManifests) { // keep track of deleted files in a map that can be cleaned up when memory runs low @@ -473,4 +487,27 @@ public static MetricsReporter loadMetricsReporter(Map properties return reporter; } + + public static String fullTableName(String catalogName, TableIdentifier identifier) { + StringBuilder sb = new StringBuilder(); + + if (catalogName.contains("/") || catalogName.contains(":")) { + // use / for URI-like names: thrift://host:port/db.table + sb.append(catalogName); + if (!catalogName.endsWith("/")) { + sb.append("/"); + } + } else { + // use . for non-URI named catalogs: prod.db.table + sb.append(catalogName).append("."); + } + + for (String level : identifier.namespace().levels()) { + sb.append(level).append("."); + } + + sb.append(identifier.name()); + + return sb.toString(); + } } diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java index adb0f18ba1ad..4dbff2d94664 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java +++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java @@ -100,9 +100,8 @@ public static Table createMetadataTableInstance( TableIdentifier baseTableIdentifier, TableIdentifier metadataTableIdentifier, MetadataTableType type) { - String baseTableName = BaseMetastoreCatalog.fullTableName(catalogName, baseTableIdentifier); - String metadataTableName = - BaseMetastoreCatalog.fullTableName(catalogName, metadataTableIdentifier); + String baseTableName = CatalogUtil.fullTableName(catalogName, 
baseTableIdentifier); + String metadataTableName = CatalogUtil.fullTableName(catalogName, metadataTableIdentifier); return createMetadataTableInstance(ops, baseTableName, metadataTableName, type); } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 9587c57a0fd2..ac4f8a524577 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg; -import java.io.Serializable; import java.util.Collection; import java.util.List; import java.util.Map; @@ -47,7 +46,7 @@ import org.apache.iceberg.util.SerializableSupplier; /** Metadata for a table. */ -public class TableMetadata implements Serializable { +public class TableMetadata implements BaseMetadata { static final long INITIAL_SEQUENCE_NUMBER = 0; static final long INVALID_SEQUENCE_NUMBER = -1; static final int DEFAULT_TABLE_FORMAT_VERSION = 2; @@ -390,6 +389,7 @@ public int formatVersion() { return formatVersion; } + @Override public String metadataFileLocation() { return metadataFileLocation; } @@ -414,6 +414,7 @@ public int lastColumnId() { return lastColumnId; } + @Override public Schema schema() { return schemasById.get(currentSchemaId); } @@ -470,30 +471,16 @@ public Map sortOrdersById() { return sortOrdersById; } + @Override public String location() { return location; } + @Override public Map properties() { return properties; } - public String property(String property, String defaultValue) { - return properties.getOrDefault(property, defaultValue); - } - - public boolean propertyAsBoolean(String property, boolean defaultValue) { - return PropertyUtil.propertyAsBoolean(properties, property, defaultValue); - } - - public int propertyAsInt(String property, int defaultValue) { - return PropertyUtil.propertyAsInt(properties, property, defaultValue); - } - - public long propertyAsLong(String property, long defaultValue) { 
- return PropertyUtil.propertyAsLong(properties, property, defaultValue); - } - public Snapshot snapshot(long snapshotId) { if (!snapshotsById.containsKey(snapshotId)) { ensureSnapshotsLoaded(); diff --git a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java index a880f94f4385..c348408824aa 100644 --- a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java +++ b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java @@ -376,7 +376,7 @@ private class InMemoryTableOperations extends BaseMetastoreTableOperations { InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) { this.fileIO = fileIO; this.tableIdentifier = tableIdentifier; - this.fullTableName = fullTableName(catalogName, tableIdentifier); + this.fullTableName = CatalogUtil.fullTableName(catalogName, tableIdentifier); } @Override diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java index 766d217346e0..7a4f546b8860 100644 --- a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java +++ b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import org.apache.iceberg.BaseMetastoreOperations; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -35,7 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class BaseViewOperations implements ViewOperations { +public abstract class BaseViewOperations extends BaseMetastoreOperations implements ViewOperations { private static final Logger LOG = LoggerFactory.getLogger(BaseViewOperations.class); private static final String 
METADATA_FOLDER_NAME = "metadata"; diff --git a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java index ae837ff96882..5148c79a763a 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.view; -import java.io.Serializable; import java.util.Comparator; import java.util.List; import java.util.Locale; @@ -29,6 +28,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; +import org.apache.iceberg.BaseMetadata; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.ValidationException; @@ -46,7 +46,7 @@ @SuppressWarnings("ImmutablesStyle") @Value.Immutable(builder = false) @Value.Style(allParameters = true, visibilityString = "PACKAGE") -public interface ViewMetadata extends Serializable { +public interface ViewMetadata extends BaseMetadata { Logger LOG = LoggerFactory.getLogger(ViewMetadata.class); int SUPPORTED_VIEW_FORMAT_VERSION = 1; int DEFAULT_VIEW_FORMAT_VERSION = 1; @@ -55,6 +55,7 @@ public interface ViewMetadata extends Serializable { int formatVersion(); + @Override String location(); default Integer currentSchemaId() { @@ -78,10 +79,12 @@ default Integer currentSchemaId() { List history(); + @Override Map properties(); List changes(); + @Override @Nullable String metadataFileLocation(); @@ -121,6 +124,7 @@ default Map schemasById() { return builder.build(); } + @Override default Schema schema() { return schemasById().get(currentSchemaId()); } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 63e4aad5d817..8ee01722ceb7 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ 
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; @@ -33,7 +34,6 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -46,6 +46,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.FileIO; @@ -54,17 +55,25 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewOperations; import org.apache.thrift.TException; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable { +public class HiveCatalog extends BaseMetastoreViewCatalog + implements SupportsNamespaces, Configurable { public static final String LIST_ALL_TABLES = "list-all-tables"; public static final String LIST_ALL_TABLES_DEFAULT = "false"; public static final String HMS_TABLE_OWNER = "hive.metastore.table.owner"; + public static final String VIEW_ORIGINAL_TEXT = "hive.view.original.text"; + public static final String VIEW_EXPANDED_TEXT = "hive.view.expanded.text"; public static final String HMS_DB_OWNER = "hive.metastore.database.owner"; public static final String HMS_DB_OWNER_TYPE = "hive.metastore.database.owner-type"; @@ -222,46 +231,176 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { @Override public void renameTable(TableIdentifier from, TableIdentifier originalTo) { - if (!isValidIdentifier(from)) { - throw new NoSuchTableException("Invalid identifier: %s", from); + renameEntity(from, originalTo, "Table"); + } + + @Override + public boolean dropView(TableIdentifier identifier) { + if (!isValidIdentifier(identifier)) { + return false; + } + + try { + String database = identifier.namespace().level(0); + String viewName = identifier.name(); + Table table = clients.run(client -> client.getTable(database, viewName)); + HiveOperationsBase.validateTableIsIcebergView( + table, CatalogUtil.fullTableName(name, identifier)); + + HiveViewOperations ops = (HiveViewOperations) newViewOps(identifier); + ViewMetadata lastViewMetadata = null; + + try { + lastViewMetadata = ops.current(); + } catch (NotFoundException e) { + LOG.warn( + "Failed to load table metadata for table: {}, continuing drop without purge", + identifier, + e); + } + + clients.run( + client -> { + client.dropTable(database, viewName); + return null; + }); + + if (lastViewMetadata != null) { + CatalogUtil.dropViewMetaData(ops.io(), lastViewMetadata); + } + + 
LOG.info("Dropped View: {}", identifier); + return true; + + } catch (NoSuchViewException | NoSuchObjectException e) { + LOG.info("Skipping drop, View does not exist: {}", identifier, e); + return false; + } catch (TException e) { + throw new RuntimeException("Failed to drop " + identifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to dropView", e); + } + } + + @Override + public List listViews(Namespace namespace) { + Preconditions.checkArgument( + isValidateNamespace(namespace), "Missing database in namespace: %s", namespace); + + try { + return listTablesByType( + namespace, TableType.VIRTUAL_VIEW, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE); + } catch (UnknownDBException e) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + + } catch (TException e) { + throw new RuntimeException("Failed to list all views under namespace " + namespace, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to listViews", e); } + } + + private List listTablesByType( + Namespace namespace, TableType tableType, String tableTypeProp) + throws TException, InterruptedException { + String database = namespace.level(0); + List tableNames = clients.run(client -> client.getTables(database, "*", tableType)); + + // Retrieving the Table objects from HMS in batches to avoid OOM + List filteredTableIdentifiers = Lists.newArrayList(); + Iterable> tableNameSets = Iterables.partition(tableNames, 100); + + for (List tableNameSet : tableNameSets) { + filteredTableIdentifiers.addAll(filterIcebergTables(tableNameSet, namespace, tableTypeProp)); + } + + return filteredTableIdentifiers; + } + + private List filterIcebergTables( + List tableNames, Namespace namespace, String tableTypeProp) + throws TException, InterruptedException { + List tableObjects = + clients.run(client -> 
client.getTableObjectsByName(namespace.level(0), tableNames)); + return tableObjects.stream() + .filter( + table -> + table.getParameters() != null + && tableTypeProp.equalsIgnoreCase( + table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP))) + .map(table -> TableIdentifier.of(namespace, table.getTableName())) + .collect(Collectors.toList()); + } + + @Override + @SuppressWarnings("FormatStringAnnotation") + public void renameView(TableIdentifier from, TableIdentifier originalTo) { + if (!namespaceExists(originalTo.namespace())) { + throw new NoSuchNamespaceException( + "Cannot rename %s to %s. Namespace does not exist: %s", + from, originalTo, originalTo.namespace()); + } + renameEntity(from, originalTo, "View"); + } - TableIdentifier to = removeCatalogName(originalTo); + private void renameEntity( + TableIdentifier fromIdentifierEntity, TableIdentifier toIdentifierEntity, String entityType) { + if (!isValidIdentifier(fromIdentifierEntity)) { + throw new NoSuchViewException("Invalid identifier: %s", fromIdentifierEntity); + } + + TableIdentifier to = removeCatalogName(toIdentifierEntity); Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to); String toDatabase = to.namespace().level(0); - String fromDatabase = from.namespace().level(0); - String fromName = from.name(); + String fromDatabase = fromIdentifierEntity.namespace().level(0); + String fromName = fromIdentifierEntity.name(); try { - Table table = clients.run(client -> client.getTable(fromDatabase, fromName)); - HiveOperationsBase.validateTableIsIceberg(table, fullTableName(name, from)); + Table fromEntity = clients.run(client -> client.getTable(fromDatabase, fromName)); + if (entityType.equalsIgnoreCase("Table")) { + HiveOperationsBase.validateTableIsIceberg( + fromEntity, CatalogUtil.fullTableName(name, fromIdentifierEntity)); + } else { + HiveOperationsBase.validateTableIsIcebergView( + fromEntity, CatalogUtil.fullTableName(name, fromIdentifierEntity)); + } + + 
validateToTableForRename(fromIdentifierEntity, to); - table.setDbName(toDatabase); - table.setTableName(to.name()); + fromEntity.setDbName(toDatabase); + fromEntity.setTableName(to.name()); clients.run( client -> { - MetastoreUtil.alterTable(client, fromDatabase, fromName, table); + MetastoreUtil.alterTable(client, fromDatabase, fromName, fromEntity); return null; }); - LOG.info("Renamed table from {}, to {}", from, to); + LOG.info("Renamed {} from {}, to {}", entityType, fromIdentifierEntity, to); - } catch (NoSuchObjectException e) { - throw new NoSuchTableException("Table does not exist: %s", from); + } catch (NoSuchObjectException | NoSuchViewException e) { + if (entityType.equalsIgnoreCase("Table")) { + throw new NoSuchTableException("Table does not exist: %s", fromIdentifierEntity); + } else { + throw new NoSuchViewException( + "Cannot rename %s to %s. View does not exist", fromIdentifierEntity, to); + } } catch (InvalidOperationException e) { if (e.getMessage() != null && e.getMessage().contains(String.format("new table %s already exists", to))) { throw new org.apache.iceberg.exceptions.AlreadyExistsException( - "Table already exists: %s", to); + "%s already exists: %s", entityType, to); } else { - throw new RuntimeException("Failed to rename " + from + " to " + to, e); + throw new RuntimeException("Failed to rename " + fromIdentifierEntity + " to " + to, e); } } catch (TException e) { - throw new RuntimeException("Failed to rename " + from + " to " + to, e); + throw new RuntimeException("Failed to rename " + fromIdentifierEntity + " to " + to, e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -269,6 +408,26 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) { } } + private void validateToTableForRename(TableIdentifier from, TableIdentifier to) + throws TException, InterruptedException { + Table table = null; + + String toDatabase = to.namespace().level(0); + try { + table = clients.run(client -> 
client.getTable(toDatabase, to.name())); + } catch (NoSuchObjectException nte) { + LOG.trace("Table not found {}.{}", toDatabase, to.name(), nte); + } + + if (table != null) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Cannot rename %s to %s. %s already exists", + from, + to, + table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) ? "View" : "Table"); + } + } + @Override public void createNamespace(Namespace namespace, Map meta) { Preconditions.checkArgument( @@ -489,6 +648,11 @@ public TableOperations newTableOps(TableIdentifier tableIdentifier) { return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName); } + @Override + protected ViewOperations newViewOps(TableIdentifier tableIdentifier) { + return new HiveViewOperations(conf, clients, fileIO, name, tableIdentifier); + } + @Override protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { // This is a little edgy since we basically duplicate the HMS location generation logic. 
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java index ea24fe4e1133..a9962043dfa4 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java @@ -20,21 +20,39 @@ import java.util.Collections; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.iceberg.BaseMetadata; +import org.apache.iceberg.BaseMetastoreOperations; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.ClientPool; +import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchIcebergTableException; +import org.apache.iceberg.exceptions.NoSuchIcebergViewException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.io.FileIO; import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +69,7 @@ interface HiveOperationsBase { long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672; String NO_LOCK_EXPECTED_KEY = "expected_parameter_key"; String NO_LOCK_EXPECTED_VALUE = "expected_parameter_value"; + String ICEBERG_VIEW_TYPE_VALUE = "iceberg-view"; TableType tableType(); @@ -62,6 +81,57 @@ interface HiveOperationsBase { String table(); + String catalogName(); + + String entityType(); + + BaseMetastoreOperations.CommitStatus validateNewLocationAndReturnCommitStatus( + BaseMetadata metadata, String newMetadataLocation); + + default Table loadHmsTable() throws TException, InterruptedException { + try { + return metaClients().run(client -> client.getTable(database(), table())); + } catch (NoSuchObjectException nte) { + LOG.trace("{} not found {}", entityType(), fullName(), nte); + return null; + } + } + + void setHmsParameters( + BaseMetadata metadata, + Table tbl, + String newMetadataLocation, + Set obsoleteProps, + boolean hiveEngineEnabled); + + default void setCommonHmsParameters( + Table tbl, + String tableTypeProp, + String newMetadataLocation, + Schema schema, + String uuid, + Set obsoleteProps, + Supplier previousLocationSupplier) { + Map parameters = + Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap); + + if (!obsoleteProps.contains(TableProperties.UUID) && uuid != null) { + parameters.put(TableProperties.UUID, uuid); + } + + parameters.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, newMetadataLocation); + parameters.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, tableTypeProp); + + if (previousLocationSupplier.get() != null && 
!previousLocationSupplier.get().isEmpty()) { + parameters.put( + BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP, + previousLocationSupplier.get()); + } + + setSchema(schema, parameters); + tbl.setParameters(parameters); + } + default Map hmsEnvContext(String metadataLocation) { return metadataLocation == null ? ImmutableMap.of() @@ -76,10 +146,10 @@ default boolean exposeInHmsProperties() { return maxHiveTablePropertySize() > 0; } - default void setSchema(TableMetadata metadata, Map parameters) { + default void setSchema(Schema tableSchema, Map parameters) { parameters.remove(TableProperties.CURRENT_SCHEMA); - if (exposeInHmsProperties() && metadata.schema() != null) { - String schema = SchemaParser.toJson(metadata.schema()); + if (exposeInHmsProperties() && tableSchema != null) { + String schema = SchemaParser.toJson(tableSchema); setField(parameters, TableProperties.CURRENT_SCHEMA, schema); } } @@ -103,6 +173,18 @@ static void validateTableIsIceberg(Table table, String fullName) { tableType); } + static void validateTableIsIcebergView(Table table, String fullName) { + String tableTypeProp = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP); + NoSuchIcebergViewException.check( + table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) + && tableTypeProp != null + && tableTypeProp.equalsIgnoreCase(ICEBERG_VIEW_TYPE_VALUE), + "Not an iceberg view: %s (type=%s) (tableType=%s)", + fullName, + tableTypeProp, + table.getTableType()); + } + default void persistTable(Table hmsTable, boolean updateHiveTable, String metadataLocation) throws TException, InterruptedException { if (updateHiveTable) { @@ -123,13 +205,14 @@ default void persistTable(Table hmsTable, boolean updateHiveTable, String metada } } - static StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) { - + static StorageDescriptor storageDescriptor( + Schema schema, String location, boolean hiveEngineEnabled) { final 
StorageDescriptor storageDescriptor = new StorageDescriptor(); - storageDescriptor.setCols(HiveSchemaUtil.convert(metadata.schema())); - storageDescriptor.setLocation(metadata.location()); + storageDescriptor.setCols(HiveSchemaUtil.convert(schema)); + storageDescriptor.setLocation(location); SerDeInfo serDeInfo = new SerDeInfo(); serDeInfo.setParameters(Maps.newHashMap()); + if (hiveEngineEnabled) { storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat"); storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat"); @@ -139,18 +222,33 @@ static StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveE storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat"); serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"); } + storageDescriptor.setSerdeInfo(serDeInfo); return storageDescriptor; } - static void cleanupMetadata(FileIO io, String commitStatus, String metadataLocation) { + default void cleanupMetadataAndUnlock( + HiveLock lock, + FileIO io, + BaseMetastoreOperations.CommitStatus commitStatus, + String metadataLocation) { try { - if (commitStatus.equalsIgnoreCase("FAILURE")) { + if (commitStatus.name().equalsIgnoreCase("FAILURE")) { // If we are sure the commit failed, clean up the uncommitted metadata file io.deleteFile(metadataLocation); } } catch (RuntimeException e) { LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e); + } finally { + lock.unlock(); + } + } + + default HiveLock lockObject(BaseMetadata metadata, Configuration conf, String catalogName) { + if (hiveLockEnabled(conf, metadata)) { + return new MetastoreLock(conf, metaClients(), catalogName, database(), table()); + } else { + return new NoLock(); } } @@ -181,4 +279,230 @@ default Table newHmsTable(String hmsTableOwner) { return newTable; } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + default void commitWithLocking( + Configuration conf, + 
BaseMetadata base, + BaseMetadata metadata, + String baseMetadataLocation, + String newMetadataLocation, + FileIO io) { + boolean newTable = base == null; + boolean hiveEngineEnabled = hiveEngineEnabled(conf, metadata); + boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false); + + BaseMetastoreOperations.CommitStatus commitStatus = + BaseMetastoreOperations.CommitStatus.FAILURE; + boolean updateHiveTable = false; + HiveLock lock = lockObject(metadata, conf, catalogName()); + try { + lock.lock(); + Table tbl = loadHmsTable(); + + if (tbl != null) { + String tableType = tbl.getTableType(); + if (!tableType.equalsIgnoreCase(tableType().name())) { + throw new AlreadyExistsException( + "%s with same name already exists: %s.%s", + tableType.equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) ? "View" : "Table", + tbl.getDbName(), + tbl.getTableName()); + } + + // If we try to create the table but the metadata location is already set, then we had a + // concurrent commit + if (newTable + && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) + != null) { + throw new AlreadyExistsException( + "%s already exists: %s.%s", entityType(), database(), table()); + } + + updateHiveTable = true; + LOG.debug("Committing existing {}: {}", entityType().toLowerCase(), fullName()); + } else { + tbl = + newHmsTable( + metadata + .properties() + .getOrDefault(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser())); + LOG.debug("Committing new {}: {}", entityType().toLowerCase(), fullName()); + } + + tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes + + String metadataLocation = + tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + + if (!Objects.equals(baseMetadataLocation, metadataLocation)) { + throw new CommitFailedException( + "Cannot commit: Base metadata location '%s' is not same as the current %s metadata location '%s' for %s.%s", + baseMetadataLocation, + 
entityType().toLowerCase(), + metadataLocation, + database(), + table()); + } + + setHmsParameters( + metadata, + tbl, + newMetadataLocation, + obsoleteProps(conf, base, metadata), + hiveEngineEnabled); + + if (!keepHiveStats) { + tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE); + } + + lock.ensureActive(); + + try { + persistTable( + tbl, updateHiveTable, hiveLockEnabled(conf, metadata) ? null : baseMetadataLocation); + lock.ensureActive(); + + commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS; + } catch (LockException le) { + commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; + throw new CommitStateUnknownException( + "Failed to heartbeat for hive lock while " + + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. " + + "Please check the commit history. If you are running into this issue, try reducing " + + "iceberg.hive.lock-heartbeat-interval-ms.", + le); + } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { + throw new AlreadyExistsException( + "%s already exists: %s.%s", entityType(), tbl.getDbName(), tbl.getTableName()); + } catch (InvalidObjectException e) { + throw new ValidationException(e, "Invalid Hive object for %s.%s", database(), table()); + } catch (CommitFailedException | CommitStateUnknownException e) { + throw e; + } catch (Throwable e) { + if (e.getMessage() + .contains( + "The table has been modified. The parameter value for key '" + + BaseMetastoreTableOperations.METADATA_LOCATION_PROP + + "' is")) { + throw new CommitFailedException( + e, "The table %s.%s has been modified concurrently", database(), table()); + } + + if (e.getMessage() != null + && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) { + throw new RuntimeException( + "Failed to acquire locks from metastore because the underlying metastore " + + "table 'HIVE_LOCKS' does not exist. 
This can occur when using an embedded metastore which does not " + + "support transactions. To fix this use an alternative metastore.", + e); + } + + LOG.error( + "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", + database(), + table(), + e); + commitStatus = validateNewLocationAndReturnCommitStatus(metadata, newMetadataLocation); + + switch (commitStatus) { + case SUCCESS: + break; + case FAILURE: + throw e; + case UNKNOWN: + throw new CommitStateUnknownException(e); + } + } + } catch (TException e) { + throw new RuntimeException( + String.format("Metastore operation failed for %s.%s", database(), table()), e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during commit", e); + + } catch (LockException e) { + throw new CommitFailedException(e); + } finally { + cleanupMetadataAndUnlock(lock, io, commitStatus, newMetadataLocation); + } + } + + default String fullName() { + return catalogName() + "." + database() + "." + table(); + } + + default StorageDescriptor storageDescriptor(BaseMetadata metadata, boolean hiveEngineEnabled) { + return HiveOperationsBase.storageDescriptor( + metadata.schema(), metadata.location(), hiveEngineEnabled); + } + + default Set obsoleteProps(Configuration conf, BaseMetadata base, BaseMetadata metadata) { + Set obsoleteProps = Sets.newHashSet(); + if (base != null) { + obsoleteProps = + base.properties().keySet().stream() + .filter(key -> !metadata.properties().containsKey(key)) + .collect(Collectors.toSet()); + } + + if (!conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false)) { + obsoleteProps.add(StatsSetupConst.COLUMN_STATS_ACCURATE); + } + + return obsoleteProps; + } + + /** + * Returns if the hive engine related values should be enabled on the table, or not. + * + *

The decision is made like this: + * + *

    + *
  1. Table property value {@link TableProperties#ENGINE_HIVE_ENABLED} + *
  2. If the table property is not set then check the hive-site.xml property value {@link + * ConfigProperties#ENGINE_HIVE_ENABLED} + *
  3. If none of the above is enabled then use the default value {@link + * TableProperties#ENGINE_HIVE_ENABLED_DEFAULT} + *
+ * + * @param metadata Table metadata to use + * @return if the hive engine related values should be enabled or not + */ + default boolean hiveEngineEnabled(Configuration conf, BaseMetadata metadata) { + if (metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED) != null) { + // We know that the property is set, so default value will not be used, + return metadata.propertyAsBoolean(TableProperties.ENGINE_HIVE_ENABLED, false); + } + + return conf.getBoolean( + ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT); + } + + /** + * Returns if the hive locking should be enabled on the table, or not. + * + *

The decision is made like this: + * + *

    + *
  1. Table property value {@link TableProperties#HIVE_LOCK_ENABLED} + *
  2. If the table property is not set then check the hive-site.xml property value {@link + * ConfigProperties#LOCK_HIVE_ENABLED} + *
  3. If none of the above is enabled then use the default value {@link + * TableProperties#HIVE_LOCK_ENABLED_DEFAULT} + *
+ * + * @param metadata Table metadata to use + * @return if the hive engine related values should be enabled or not + */ + default boolean hiveLockEnabled(Configuration conf, BaseMetadata metadata) { + if (metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null) { + // We know that the property is set, so default value will not be used, + return metadata.propertyAsBoolean(TableProperties.HIVE_LOCK_ENABLED, false); + } + + return conf.getBoolean( + ConfigProperties.LOCK_HIVE_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT); + } } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index a3750b9f3101..48bc633111f7 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -21,21 +21,18 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED; import com.fasterxml.jackson.core.JsonProcessingException; -import java.util.Collections; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.iceberg.BaseMetadata; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.ClientPool; import org.apache.iceberg.PartitionSpecParser; @@ -44,12 +41,8 @@ import org.apache.iceberg.SortOrderParser; import 
org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.CommitFailedException; -import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NoSuchIcebergTableException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.BiMap; @@ -148,7 +141,7 @@ protected void doRefresh() { metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP); - } catch (NoSuchObjectException e) { + } catch (NoSuchObjectException | NoSuchIcebergTableException e) { if (currentMetadataLocation() != null) { throw new NoSuchTableException("No such table: %s.%s", database, tableName); } @@ -166,168 +159,33 @@ protected void doRefresh() { refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries); } - @SuppressWarnings("checkstyle:CyclomaticComplexity") @Override protected void doCommit(TableMetadata base, TableMetadata metadata) { boolean newTable = base == null; String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); - boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf); - boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false); - - CommitStatus commitStatus = CommitStatus.FAILURE; - boolean updateHiveTable = false; - - HiveLock lock = lockObject(metadata); - try { - lock.lock(); - - Table tbl = loadHmsTable(); - - if (tbl != null) { - // If we try to create the table but the metadata location is already set, then we had a - // concurrent commit - if (newTable - && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) - != null) { - 
throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); - } - - updateHiveTable = true; - LOG.debug("Committing existing table: {}", fullName); - } else { - tbl = - newHmsTable( - metadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser())); - LOG.debug("Committing new table: {}", fullName); - } - - tbl.setSd( - HiveOperationsBase.storageDescriptor( - metadata, hiveEngineEnabled)); // set to pickup any schema changes - - String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP); - String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; - if (!Objects.equals(baseMetadataLocation, metadataLocation)) { - throw new CommitFailedException( - "Cannot commit: Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s", - baseMetadataLocation, metadataLocation, database, tableName); - } - - // get Iceberg props that have been removed - Set removedProps = Collections.emptySet(); - if (base != null) { - removedProps = - base.properties().keySet().stream() - .filter(key -> !metadata.properties().containsKey(key)) - .collect(Collectors.toSet()); - } - - Map summary = - Optional.ofNullable(metadata.currentSnapshot()) - .map(Snapshot::summary) - .orElseGet(ImmutableMap::of); - setHmsTableParameters( - newMetadataLocation, tbl, metadata, removedProps, hiveEngineEnabled, summary); - - if (!keepHiveStats) { - tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE); - } - - lock.ensureActive(); - - try { - persistTable( - tbl, updateHiveTable, hiveLockEnabled(metadata, conf) ? null : baseMetadataLocation); - lock.ensureActive(); - - commitStatus = CommitStatus.SUCCESS; - } catch (LockException le) { - commitStatus = CommitStatus.UNKNOWN; - throw new CommitStateUnknownException( - "Failed to heartbeat for hive lock while " - + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. 
" - + "Please check the commit history. If you are running into this issue, try reducing " - + "iceberg.hive.lock-heartbeat-interval-ms.", - le); - } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { - throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName); - - } catch (InvalidObjectException e) { - throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName); - - } catch (CommitFailedException | CommitStateUnknownException e) { - throw e; - - } catch (Throwable e) { - if (e.getMessage() - .contains( - "The table has been modified. The parameter value for key '" - + HiveTableOperations.METADATA_LOCATION_PROP - + "' is")) { - throw new CommitFailedException( - e, "The table %s.%s has been modified concurrently", database, tableName); - } - - if (e.getMessage() != null - && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) { - throw new RuntimeException( - "Failed to acquire locks from metastore because the underlying metastore " - + "table 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not " - + "support transactions. 
To fix this use an alternative metastore.", - e); - } - - LOG.error( - "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", - database, - tableName, - e); - commitStatus = checkCommitStatus(newMetadataLocation, metadata); - switch (commitStatus) { - case SUCCESS: - break; - case FAILURE: - throw e; - case UNKNOWN: - throw new CommitStateUnknownException(e); - } - } - } catch (TException e) { - throw new RuntimeException( - String.format("Metastore operation failed for %s.%s", database, tableName), e); - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted during commit", e); - - } catch (LockException e) { - throw new CommitFailedException(e); - - } finally { - cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lock); - } + String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; + commitWithLocking(conf, base, metadata, baseMetadataLocation, newMetadataLocation, fileIO); LOG.info( "Committed to table {} with the new metadata location {}", fullName, newMetadataLocation); } - @VisibleForTesting - Table loadHmsTable() throws TException, InterruptedException { - try { - return metaClients.run(client -> client.getTable(database, tableName)); - } catch (NoSuchObjectException nte) { - LOG.trace("Table not found {}", fullName, nte); - return null; - } + @Override + public CommitStatus validateNewLocationAndReturnCommitStatus( + BaseMetadata metadata, String newMetadataLocation) { + return checkCommitStatus( + fullName, newMetadataLocation, metadata.properties(), this::loadMetadataLocations); } - private void setHmsTableParameters( - String newMetadataLocation, + @Override + public void setHmsParameters( + BaseMetadata baseMetadata, Table tbl, - TableMetadata metadata, + String newMetadataLocation, Set obsoleteProps, - boolean hiveEngineEnabled, - Map summary) { + boolean hiveEngineEnabled) { + TableMetadata metadata = (TableMetadata) baseMetadata; + 
Map parameters = Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap); @@ -341,19 +199,20 @@ private void setHmsTableParameters( String hmsKey = ICEBERG_TO_HMS_TRANSLATION.getOrDefault(key, key); parameters.put(hmsKey, entry.getValue()); }); - if (metadata.uuid() != null) { - parameters.put(TableProperties.UUID, metadata.uuid()); - } // remove any props from HMS that are no longer present in Iceberg table props obsoleteProps.forEach(parameters::remove); - parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); - parameters.put(METADATA_LOCATION_PROP, newMetadataLocation); + setCommonHmsParameters( + tbl, + BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH), + newMetadataLocation, + metadata.schema(), + metadata.uuid(), + obsoleteProps, + this::currentMetadataLocation); - if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) { - parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation()); - } + parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); // If needed set the 'storage_handler' property to enable query from Hive if (hiveEngineEnabled) { @@ -365,6 +224,10 @@ private void setHmsTableParameters( } // Set the basic statistics + Map summary = + Optional.ofNullable(metadata.currentSnapshot()) + .map(Snapshot::summary) + .orElseGet(ImmutableMap::of); if (summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP) != null) { parameters.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP)); } @@ -376,7 +239,6 @@ private void setHmsTableParameters( } setSnapshotStats(metadata, parameters); - setSchema(metadata, parameters); setPartitionSpec(metadata, parameters); setSortOrder(metadata, parameters); @@ -458,84 +320,22 @@ public String table() { } @Override - public TableType tableType() { - return TableType.EXTERNAL_TABLE; + public String catalogName() { + return catalogName; } @Override - public 
ClientPool metaClients() { - return metaClients; - } - - private void cleanupMetadataAndUnlock( - CommitStatus commitStatus, String metadataLocation, HiveLock lock) { - try { - HiveOperationsBase.cleanupMetadata(io(), commitStatus.name(), metadataLocation); - } finally { - lock.unlock(); - } + public String entityType() { + return "Table"; } - /** - * Returns if the hive engine related values should be enabled on the table, or not. - * - *

<p>The decision is made like this:
- *
- * <ol>
- *   <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
- *   <li>If the table property is not set then check the hive-site.xml property value {@link
- *       ConfigProperties#ENGINE_HIVE_ENABLED}
- *   <li>If none of the above is enabled then use the default value {@link
- *       TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
- * </ol>
- * - * @param metadata Table metadata to use - * @param conf The hive configuration to use - * @return if the hive engine related values should be enabled or not - */ - private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration conf) { - if (metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED) != null) { - // We know that the property is set, so default value will not be used, - return metadata.propertyAsBoolean(TableProperties.ENGINE_HIVE_ENABLED, false); - } - - return conf.getBoolean( - ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT); - } - - /** - * Returns if the hive locking should be enabled on the table, or not. - * - *

<p>The decision is made like this:
- *
- * <ol>
- *   <li>Table property value {@link TableProperties#HIVE_LOCK_ENABLED}
- *   <li>If the table property is not set then check the hive-site.xml property value {@link
- *       ConfigProperties#LOCK_HIVE_ENABLED}
- *   <li>If none of the above is enabled then use the default value {@link
- *       TableProperties#HIVE_LOCK_ENABLED_DEFAULT}
- * </ol>
- * - * @param metadata Table metadata to use - * @param conf The hive configuration to use - * @return if the hive engine related values should be enabled or not - */ - private static boolean hiveLockEnabled(TableMetadata metadata, Configuration conf) { - if (metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null) { - // We know that the property is set, so default value will not be used, - return metadata.propertyAsBoolean(TableProperties.HIVE_LOCK_ENABLED, false); - } - - return conf.getBoolean( - ConfigProperties.LOCK_HIVE_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT); + @Override + public TableType tableType() { + return TableType.EXTERNAL_TABLE; } - @VisibleForTesting - HiveLock lockObject(TableMetadata metadata) { - if (hiveLockEnabled(metadata, conf)) { - return new MetastoreLock(conf, metaClients, catalogName, database, tableName); - } else { - return new NoLock(); - } + @Override + public ClientPool metaClients() { + return metaClients; } } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java new file mode 100644 index 000000000000..807f9ad60561 --- /dev/null +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hive; + +import java.util.Locale; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.iceberg.BaseMetadata; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchIcebergViewException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.view.BaseViewOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Hive implementation of Iceberg {@link org.apache.iceberg.view.ViewOperations}. 
*/ +final class HiveViewOperations extends BaseViewOperations implements HiveOperationsBase { + private static final Logger LOG = LoggerFactory.getLogger(HiveViewOperations.class); + + private final String fullName; + private final String database; + private final String viewName; + private final FileIO fileIO; + private final ClientPool metaClients; + private final long maxHiveTablePropertySize; + private final Configuration conf; + private final String catalogName; + + HiveViewOperations( + Configuration conf, + ClientPool metaClients, + FileIO fileIO, + String catalogName, + TableIdentifier viewIdentifier) { + String dbName = viewIdentifier.namespace().level(0); + this.conf = conf; + this.catalogName = catalogName; + this.metaClients = metaClients; + this.fileIO = fileIO; + this.fullName = CatalogUtil.fullTableName(catalogName, viewIdentifier); + this.database = dbName; + this.viewName = viewIdentifier.name(); + this.maxHiveTablePropertySize = + conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT); + } + + @Override + public void doRefresh() { + String metadataLocation = null; + Table table; + + try { + table = metaClients.run(client -> client.getTable(database, viewName)); + HiveOperationsBase.validateTableIsIcebergView(table, fullName); + + metadataLocation = + table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + + } catch (NoSuchObjectException | NoSuchIcebergViewException e) { + if (currentMetadataLocation() != null) { + throw new NoSuchViewException("View does not exist: %s.%s", database, viewName); + } + } catch (TException e) { + String errMsg = + String.format("Failed to get view info from metastore %s.%s", database, viewName); + throw new RuntimeException(errMsg, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during refresh", e); + } + + refreshFromMetadataLocation(metadataLocation); + } + + @Override + public void 
doCommit(ViewMetadata base, ViewMetadata metadata) { + String newMetadataLocation = writeNewMetadataIfRequired(metadata); + String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; + commitWithLocking(conf, base, metadata, baseMetadataLocation, newMetadataLocation, fileIO); + + LOG.info( + "Committed to view {} with the new metadata location {}", fullName, newMetadataLocation); + } + + @Override + protected String viewName() { + return fullName; + } + + @Override + public TableType tableType() { + return TableType.VIRTUAL_VIEW; + } + + @Override + public ClientPool metaClients() { + return metaClients; + } + + @Override + public long maxHiveTablePropertySize() { + return maxHiveTablePropertySize; + } + + @Override + public String database() { + return database; + } + + @Override + public String table() { + return viewName; + } + + @Override + public String catalogName() { + return catalogName; + } + + @Override + public String entityType() { + return "View"; + } + + @Override + public CommitStatus validateNewLocationAndReturnCommitStatus( + BaseMetadata metadata, String newMetadataLocation) { + Preconditions.checkNotNull(metadata, "Unexpected null view metadata"); + return checkCommitStatus( + fullName, + newMetadataLocation, + metadata.properties(), + () -> ImmutableList.of(metadata.metadataFileLocation())); + } + + @Override + public void setHmsParameters( + BaseMetadata metadata, + Table tbl, + String newMetadataLocation, + Set obsoleteProps, + boolean hiveEngineEnabled) { + setCommonHmsParameters( + tbl, + HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH), + newMetadataLocation, + metadata.schema(), + ((ViewMetadata) metadata).uuid(), + obsoleteProps, + this::currentMetadataLocation); + } + + @Override + public FileIO io() { + return fileIO; + } +} diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index 
369ad46c8e49..95bf6c697c32 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -1041,7 +1041,7 @@ public void testNotExposeTableProperties() { .doesNotContainKey(CURRENT_SNAPSHOT_ID) .doesNotContainKey(CURRENT_SNAPSHOT_TIMESTAMP); - ops.setSchema(metadata, parameters); + ops.setSchema(metadata.schema(), parameters); assertThat(parameters).doesNotContainKey(CURRENT_SCHEMA); ops.setPartitionSpec(metadata, parameters); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java index aaa659042118..0fe3caa04426 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java @@ -64,7 +64,7 @@ public void testSuppressUnlockExceptions() { AtomicReference lockRef = new AtomicReference<>(); - when(spyOps.lockObject(metadataV1)) + when(spyOps.lockObject(metadataV1, catalog.getConf(), catalog.name())) .thenAnswer( i -> { HiveLock lock = (HiveLock) i.callRealMethod(); @@ -273,11 +273,11 @@ public void testThriftExceptionConcurrentCommit() throws TException, Interrupted AtomicReference lock = new AtomicReference<>(); doAnswer( l -> { - lock.set(ops.lockObject(metadataV1)); + lock.set(ops.lockObject(metadataV1, catalog.getConf(), catalog.name())); return lock.get(); }) .when(spyOps) - .lockObject(metadataV1); + .lockObject(metadataV1, catalog.getConf(), catalog.name()); concurrentCommitAndThrowException(ops, spyOps, table, lock); @@ -332,7 +332,7 @@ public void testNoLockThriftExceptionConcurrentCommit() throws TException, Inter HiveTableOperations spyOps = spy(ops); // Sets NoLock - doReturn(new NoLock()).when(spyOps).lockObject(any()); + doReturn(new NoLock()).when(spyOps).lockObject(any(), any(), any()); // Simulate a concurrent table modification error 
doThrow( diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java new file mode 100644 index 000000000000..2bed205a58b7 --- /dev/null +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.hive; + +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.view.ViewCatalogTests; +import org.apache.thrift.TException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class TestHiveViewCatalog extends ViewCatalogTests { + + private HiveCatalog catalog; + + @RegisterExtension + private static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION = + HiveMetastoreExtension.builder().build(); + + @BeforeEach + public void before() throws TException { + catalog = + (HiveCatalog) + CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), + CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, + ImmutableMap.of( + CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, + String.valueOf(TimeUnit.SECONDS.toMillis(10))), + HIVE_METASTORE_EXTENSION.hiveConf()); + } + + @AfterEach + public void cleanup() throws Exception { + HIVE_METASTORE_EXTENSION.metastore().reset(); + } + + @Override + protected HiveCatalog catalog() { + return catalog; + } + + @Override + protected Catalog tableCatalog() { + return catalog; + } + + @Override + protected boolean requiresNamespaceCreate() { + return true; + } +}