From 8557ec09835293dbdf535bbd2719873e69239c07 Mon Sep 17 00:00:00 2001 From: nk1506 Date: Sat, 2 Mar 2024 23:35:35 +0530 Subject: [PATCH 01/17] Hive: Add View support for HIVE catalog --- .../NoSuchIcebergViewException.java | 36 +++++ .../java/org/apache/iceberg/CatalogUtil.java | 13 ++ .../org/apache/iceberg/hive/HiveCatalog.java | 136 ++++++++++++++++- .../iceberg/hive/HiveOperationsBase.java | 14 ++ .../iceberg/hive/HiveViewOperations.java | 141 ++++++++++++++++++ .../iceberg/hive/TestHiveViewCatalog.java | 72 +++++++++ 6 files changed, 406 insertions(+), 6 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java create mode 100644 hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java create mode 100644 hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java diff --git a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java new file mode 100644 index 000000000000..bc5da2aee280 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.exceptions; + +import com.google.errorprone.annotations.FormatMethod; + +/** NoSuchIcebergViewException thrown when a view is found, but it is not an Iceberg view. */ +public class NoSuchIcebergViewException extends NoSuchViewException { + @FormatMethod + public NoSuchIcebergViewException(String message, Object... args) { + super(message, args); + } + + @FormatMethod + public static void check(boolean test, String message, Object... args) { + if (!test) { + throw new NoSuchIcebergViewException(message, args); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index d4fcbda0686d..3f17e8dd5f4e 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -47,6 +47,7 @@ import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; +import org.apache.iceberg.view.ViewMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,6 +138,18 @@ public static void dropTableData(FileIO io, TableMetadata metadata) { deleteFile(io, metadata.metadataFileLocation(), "metadata"); } + /** + * Drops view metadata files referenced by ViewMetadata. + * + *

This should be called by dropView implementations + * + * @param io a FileIO to use for deletes + * @param metadata the last valid ViewMetadata instance for a dropped view. + */ + public static void dropViewMetadata(FileIO io, ViewMetadata metadata) { + deleteFile(io, metadata.metadataFileLocation(), "metadata"); + } + @SuppressWarnings("DangerousStringInternUsage") private static void deleteFiles(FileIO io, Set allManifests) { // keep track of deleted files in a map that can be cleaned up when memory runs low diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 5c58222f0c01..7a08eeb3ade6 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; @@ -34,7 +35,6 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -47,6 +47,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.exceptions.NotFoundException; import 
org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.FileIO; @@ -56,13 +57,19 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewOperations; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable { +public class HiveCatalog extends BaseMetastoreViewCatalog + implements SupportsNamespaces, Configurable { public static final String LIST_ALL_TABLES = "list-all-tables"; public static final String LIST_ALL_TABLES_DEFAULT = "false"; @@ -156,6 +163,38 @@ public List listTables(Namespace namespace) { } } + @Override + public List listViews(Namespace namespace) { + Preconditions.checkArgument( + isValidateNamespace(namespace), "Missing database in namespace: %s", namespace); + + try { + String database = namespace.level(0); + List tableNames = + clients.run(client -> client.getTables(database, "*", TableType.VIRTUAL_VIEW)); + + // Retrieving the Table objects from HMS in batches to avoid OOM + List filteredTableIdentifiers = Lists.newArrayList(); + Iterable> tableNameSets = Iterables.partition(tableNames, 100); + + for (List tableNameSet : tableNameSets) { + filteredTableIdentifiers.addAll( + listIcebergTables(tableNameSet, namespace, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE)); + } + + return filteredTableIdentifiers; + } catch (UnknownDBException e) { + 
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + + } catch (TException e) { + throw new RuntimeException("Failed to list all views under namespace " + namespace, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to listViews", e); + } + } + @Override public String name() { return name; @@ -213,11 +252,67 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { } } + @Override + public boolean dropView(TableIdentifier identifier) { + if (!isValidIdentifier(identifier)) { + return false; + } + + try { + String database = identifier.namespace().level(0); + String viewName = identifier.name(); + Table table = clients.run(client -> client.getTable(database, viewName)); + HiveOperationsBase.validateTableIsIcebergView( + table, CatalogUtil.fullTableName(name, identifier)); + + HiveViewOperations ops = (HiveViewOperations) newViewOps(identifier); + ViewMetadata lastViewMetadata = null; + + try { + lastViewMetadata = ops.current(); + } catch (NotFoundException e) { + LOG.warn("Failed to load view metadata for view: {}", identifier, e); + } + + clients.run( + client -> { + client.dropTable(database, viewName); + return null; + }); + + if (lastViewMetadata != null) { + CatalogUtil.dropViewMetadata(ops.io(), lastViewMetadata); + } + + LOG.info("Dropped View: {}", identifier); + return true; + + } catch (NoSuchViewException | NoSuchObjectException e) { + LOG.info("Skipping drop, view does not exist: {}", identifier, e); + return false; + } catch (TException e) { + throw new RuntimeException("Failed to drop " + identifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to dropView", e); + } + } + @Override public void renameTable(TableIdentifier from, TableIdentifier originalTo) { renameTableOrView(from, originalTo, HiveOperationsBase.ContentType.TABLE); } + @Override + 
@SuppressWarnings("FormatStringAnnotation") + public void renameView(TableIdentifier from, TableIdentifier to) { + if (!namespaceExists(to.namespace())) { + throw new NoSuchNamespaceException( + "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace()); + } + renameTableOrView(from, to, HiveOperationsBase.ContentType.VIEW); + } + private List listIcebergTables( List tableNames, Namespace namespace, String tableTypeProp) throws TException, InterruptedException { @@ -268,13 +363,17 @@ private void renameTableOrView( LOG.info("Renamed {} from {}, to {}", contentType.value(), from, to); } catch (NoSuchObjectException e) { - throw new NoSuchTableException("Table does not exist: %s", from); + switch (contentType) { + case TABLE: + throw new NoSuchTableException("Cannot rename %s to %s. Table does not exist", from, to); + case VIEW: + throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to); + } } catch (InvalidOperationException e) { if (e.getMessage() != null && e.getMessage().contains(String.format("new table %s already exists", to))) { - throw new org.apache.iceberg.exceptions.AlreadyExistsException( - "Table already exists: %s", to); + throwErrorForExistedToContent(from, removeCatalogName(from)); } else { throw new RuntimeException("Failed to rename " + from + " to " + to, e); } @@ -295,7 +394,27 @@ private void validateTableIsIcebergTableOrView( HiveOperationsBase.validateTableIsIceberg(table, fullName); break; case VIEW: - throw new UnsupportedOperationException("View is not supported."); + HiveOperationsBase.validateTableIsIcebergView(table, fullName); + } + } + + private void throwErrorForExistedToContent(TableIdentifier from, TableIdentifier to) { + String toDatabase = to.namespace().level(0); + try { + Table table = clients.run(client -> client.getTable(toDatabase, to.name())); + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Cannot rename %s to %s. 
%s already exists", + from, + to, + table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) + ? HiveOperationsBase.ContentType.VIEW.value() + : HiveOperationsBase.ContentType.TABLE.value()); + } catch (TException e) { + throw new RuntimeException("Failed to load content " + to, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to load content", e); } } @@ -522,6 +641,11 @@ public TableOperations newTableOps(TableIdentifier tableIdentifier) { return ops; } + @Override + protected ViewOperations newViewOps(TableIdentifier identifier) { + return new HiveViewOperations(conf, clients, fileIO, name, identifier); + } + @Override protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { // This is a little edgy since we basically duplicate the HMS location generation logic. diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java index 6500e724a4f0..6b39d71d1e04 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java @@ -33,6 +33,7 @@ import org.apache.iceberg.SchemaParser; import org.apache.iceberg.TableProperties; import org.apache.iceberg.exceptions.NoSuchIcebergTableException; +import org.apache.iceberg.exceptions.NoSuchIcebergViewException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -53,6 +54,7 @@ interface HiveOperationsBase { long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672; String NO_LOCK_EXPECTED_KEY = "expected_parameter_key"; String NO_LOCK_EXPECTED_VALUE = "expected_parameter_value"; + String ICEBERG_VIEW_TYPE_VALUE = "iceberg-view"; enum ContentType { TABLE("Table"), @@ -129,6 
+131,18 @@ static void validateTableIsIceberg(Table table, String fullName) { tableType); } + static void validateTableIsIcebergView(Table table, String fullName) { + String tableTypeProp = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP); + NoSuchIcebergViewException.check( + table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) + && tableTypeProp != null + && tableTypeProp.equalsIgnoreCase(ICEBERG_VIEW_TYPE_VALUE), + "Not an iceberg view: %s (type=%s) (tableType=%s)", + fullName, + tableTypeProp, + table.getTableType()); + } + default void persistTable(Table hmsTable, boolean updateHiveTable, String metadataLocation) throws TException, InterruptedException { if (updateHiveTable) { diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java new file mode 100644 index 000000000000..6318ffbbfcb6 --- /dev/null +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchIcebergViewException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.view.BaseViewOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Hive implementation of Iceberg {@link org.apache.iceberg.view.ViewOperations}. */ +final class HiveViewOperations extends BaseViewOperations implements HiveOperationsBase { + private static final Logger LOG = LoggerFactory.getLogger(HiveViewOperations.class); + + private final String fullName; + private final String database; + private final String viewName; + private final FileIO fileIO; + private final ClientPool metaClients; + private final long maxHiveTablePropertySize; + private final Configuration conf; + private final String catalogName; + + HiveViewOperations( + Configuration conf, + ClientPool metaClients, + FileIO fileIO, + String catalogName, + TableIdentifier viewIdentifier) { + String dbName = viewIdentifier.namespace().level(0); + this.conf = conf; + this.catalogName = catalogName; + this.metaClients = metaClients; + this.fileIO = fileIO; + this.fullName = CatalogUtil.fullTableName(catalogName, viewIdentifier); + this.database = dbName; + this.viewName = viewIdentifier.name(); + this.maxHiveTablePropertySize = + conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT); + } 
+ + @Override + public void doRefresh() { + String metadataLocation = null; + Table table; + + try { + table = metaClients.run(client -> client.getTable(database, viewName)); + HiveOperationsBase.validateTableIsIcebergView(table, fullName); + + metadataLocation = + table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + + } catch (NoSuchObjectException | NoSuchIcebergViewException e) { + if (currentMetadataLocation() != null) { + throw new NoSuchViewException("View does not exist: %s.%s", database, viewName); + } + } catch (TException e) { + String errMsg = + String.format("Failed to get view info from metastore %s.%s", database, viewName); + throw new RuntimeException(errMsg, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during refresh", e); + } + + refreshFromMetadataLocation(metadataLocation); + } + + @Override + public void doCommit(ViewMetadata base, ViewMetadata metadata) { + String newMetadataLocation = writeNewMetadataIfRequired(metadata); + String baseMetadataLocation = base != null ? 
base.metadataFileLocation() : null; + + LOG.info( + "Committed to view {} with the new metadata location {}", fullName, newMetadataLocation); + } + + @Override + protected String viewName() { + return fullName; + } + + @Override + public TableType tableType() { + return TableType.VIRTUAL_VIEW; + } + + @Override + public ClientPool metaClients() { + return metaClients; + } + + @Override + public long maxHiveTablePropertySize() { + return maxHiveTablePropertySize; + } + + @Override + public String database() { + return database; + } + + @Override + public String table() { + return viewName; + } + + @Override + public FileIO io() { + return fileIO; + } +} diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java new file mode 100644 index 000000000000..2bed205a58b7 --- /dev/null +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.hive; + +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.view.ViewCatalogTests; +import org.apache.thrift.TException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class TestHiveViewCatalog extends ViewCatalogTests { + + private HiveCatalog catalog; + + @RegisterExtension + private static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION = + HiveMetastoreExtension.builder().build(); + + @BeforeEach + public void before() throws TException { + catalog = + (HiveCatalog) + CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), + CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, + ImmutableMap.of( + CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, + String.valueOf(TimeUnit.SECONDS.toMillis(10))), + HIVE_METASTORE_EXTENSION.hiveConf()); + } + + @AfterEach + public void cleanup() throws Exception { + HIVE_METASTORE_EXTENSION.metastore().reset(); + } + + @Override + protected HiveCatalog catalog() { + return catalog; + } + + @Override + protected Catalog tableCatalog() { + return catalog; + } + + @Override + protected boolean requiresNamespaceCreate() { + return true; + } +} From 2483b7e5cfa85c69c487ff1b804d652fc25987fd Mon Sep 17 00:00:00 2001 From: nk1506 Date: Sun, 17 Mar 2024 11:24:20 +0530 Subject: [PATCH 02/17] Hive: Add test cases for commitLocks with view concurrent commits and exceptions. Add tests with Hive-View and Hive-Iceberg View. failedCommitStatusCheckSupplier for table and view with metadataLocation and versionId respectively. 
--- .../apache/iceberg/view/ViewCatalogTests.java | 2 +- .../iceberg/hive/TestHiveViewCatalog.java | 93 ++++ .../iceberg/hive/TestHiveViewCommits.java | 474 ++++++++++++++++++ 3 files changed, 568 insertions(+), 1 deletion(-) create mode 100644 hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java index b3765bb1eae7..cf1630f92c7f 100644 --- a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java @@ -59,7 +59,7 @@ public abstract class ViewCatalogTests { @@ -69,4 +82,84 @@ protected Catalog tableCatalog() { protected boolean requiresNamespaceCreate() { return true; } + + @Test + public void testListView() throws TException { + String dbName = "hivedb"; + Namespace ns = Namespace.of(dbName); + TableIdentifier identifier = TableIdentifier.of(ns, "tbl"); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(identifier.namespace()); + } + + assertThat(catalog.viewExists(identifier)).isFalse(); + assertThat(catalog.listViews(ns)).isEmpty(); + + String hiveTableName = "test_hive_view"; + // create a hive table + org.apache.hadoop.hive.metastore.api.Table hiveTable = + createHiveView(hiveTableName, dbName, tempDir.toUri().toString()); + HIVE_METASTORE_EXTENSION.metastoreClient().createTable(hiveTable); + + catalog.setListAllTables(true); + List tableIdents1 = catalog.listTables(ns); + assertThat(tableIdents1).as("should have one table with type VIRTUAL_VIEW.").hasSize(1); + + List tableIdents2 = catalog.listViews(ns); + assertThat(tableIdents2).as("should have zero iceberg view.").hasSize(0); + + catalog + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(ns) + .withQuery("hive", "select * from ns.tbl") + .create(); + assertThat(catalog.viewExists(identifier)).isTrue(); + + List tableIdents3 = 
catalog.listViews(ns); + assertThat(tableIdents3).as("should have one iceberg view .").hasSize(1); + } + + private org.apache.hadoop.hive.metastore.api.Table createHiveView( + String hiveViewName, String dbName, String location) { + Map parameters = Maps.newHashMap(); + parameters.put( + serdeConstants.SERIALIZATION_CLASS, "org.apache.hadoop.hive.serde2.thrift.test.IntString"); + parameters.put( + serdeConstants.SERIALIZATION_FORMAT, "org.apache.thrift.protocol.TBinaryProtocol"); + + SerDeInfo serDeInfo = + new SerDeInfo(null, "org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer", parameters); + + // StorageDescriptor has an empty list of fields - SerDe will report them. + StorageDescriptor sd = + new StorageDescriptor( + Lists.newArrayList(), + location, + "org.apache.hadoop.mapred.TextInputFormat", + "org.apache.hadoop.mapred.TextOutputFormat", + false, + -1, + serDeInfo, + Lists.newArrayList(), + Lists.newArrayList(), + Maps.newHashMap()); + + org.apache.hadoop.hive.metastore.api.Table hiveTable = + new org.apache.hadoop.hive.metastore.api.Table( + hiveViewName, + dbName, + "test_owner", + 0, + 0, + 0, + sd, + Lists.newArrayList(), + Maps.newHashMap(), + "viewOriginalText", + "viewExpandedText", + TableType.VIRTUAL_VIEW.name()); + return hiveTable; + } } diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java new file mode 100644 index 000000000000..5fd953312303 --- /dev/null +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hive; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.view.BaseView; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.thrift.TException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; 
+import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Test Hive locks and Hive errors and retry during commits. */ +public class TestHiveViewCommits { + + private static final String VIEW_NAME = "view_name"; + private static final String DB_NAME = "hivedb"; + private static final Namespace ns = Namespace.of(DB_NAME); + private static final Schema SCHEMA = + new Schema( + 5, + required(3, "id", Types.IntegerType.get(), "unique ID"), + required(4, "data", Types.StringType.get())); + private static final TableIdentifier VIEW_IDENTIFIER = TableIdentifier.of(ns, VIEW_NAME); + + @RegisterExtension + protected static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION = + HiveMetastoreExtension.builder().withDatabase(DB_NAME).build(); + + private View view; + + private static HiveCatalog catalog; + + @BeforeAll + public static void initCatalog() { + catalog = + (HiveCatalog) + CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), + CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, + ImmutableMap.of( + CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, + String.valueOf(TimeUnit.SECONDS.toMillis(10))), + HIVE_METASTORE_EXTENSION.hiveConf()); + } + + @BeforeEach + public void createTestView() { + view = + catalog + .buildView(VIEW_IDENTIFIER) + .withSchema(SCHEMA) + .withDefaultNamespace(ns) + .withQuery("hive", "select * from ns.tbl") + .create(); + } + + @AfterEach + public void dropTestView() { + catalog.dropView(VIEW_IDENTIFIER); + } + + @Test + public void testSuppressUnlockExceptions() { + HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); + ViewMetadata metadataV1 = ops.current(); + assertThat(ops.current().properties()).hasSize(0); + + view.updateProperties().set("k1", "v1").commit(); + ops.refresh(); + ViewMetadata metadataV2 = ops.current(); + assertThat(ops.current().properties()).hasSize(1); + + HiveViewOperations spyOps = spy(ops); + + AtomicReference lockRef = new AtomicReference<>(); + 
+ when(spyOps.lockObject(metadataV1.properties(), catalog.getConf(), catalog.name())) + .thenAnswer( + i -> { + HiveLock lock = (HiveLock) i.callRealMethod(); + lockRef.set(lock); + return lock; + }); + + try { + spyOps.commit(metadataV2, metadataV1); + HiveLock spyLock = spy(lockRef.get()); + doThrow(new RuntimeException()).when(spyLock).unlock(); + } finally { + lockRef.get().unlock(); + } + + ops.refresh(); + + // the commit must succeed + assertThat(ops.current().properties()).hasSize(0); + } + + /** + * Pretends we throw an error while persisting, and not found with check state, commit state + * should be treated as unknown, because in reality the persisting may still succeed, just not yet + * by the time of checking. + */ + @Test + public void testThriftExceptionUnknownStateIfNotInHistoryFailureOnCommit() + throws TException, InterruptedException { + HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); + assertThat(ops.current().properties()).hasSize(0); + + view.updateProperties().set("k1", "v1").commit(); + ops.refresh(); + ViewMetadata metadataV2 = ops.current(); + assertThat(ops.current().properties()).hasSize(1); + + HiveViewOperations spyOps = spy(ops); + + failCommitAndThrowException(spyOps); + + ops.refresh(); + assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2); + assertThat(metadataFileExists(metadataV2)).as("Current metadata should still exist").isTrue(); + assertThat(metadataFileCount(ops.current())) + .as( + "New metadata files should still exist, new location not in history but" + + " the commit may still succeed") + .isEqualTo(2); + } + + /** Pretends we throw an error while persisting that actually does commit serverside */ + @Test + public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedException { + HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); + + ViewMetadata metadataV1 = ops.current(); + 
assertThat(ops.current().properties()).hasSize(0); + + view.updateProperties().set("k1", "v1").commit(); + ops.refresh(); + assertThat(ops.current().properties()).hasSize(1); + ViewMetadata metadataV2 = ops.current(); + + HiveViewOperations spyOps = spy(ops); + + // Simulate a communication error after a successful commit + commitAndThrowException(ops, spyOps); + + // Shouldn't throw because the commit actually succeeds even though persistTable throws an + // exception + spyOps.commit(metadataV2, metadataV1); + + ops.refresh(); + assertThat(ops.current()).as("Current metadata should have changed").isNotEqualTo(metadataV2); + assertThat(metadataFileExists(ops.current())) + .as("Current metadata file should still exist") + .isTrue(); + assertThat(metadataFileCount(ops.current())) + .as("Commit should have been successful and new metadata file should be made") + .isEqualTo(2); + } + + /** + * Pretends we throw an exception while persisting and don't know what happened, can't check to + * find out, but in reality the commit failed + */ + @Test + public void testThriftExceptionUnknownFailedCommit() throws TException, InterruptedException { + HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); + ViewMetadata metadataV1 = ops.current(); + assertThat(ops.current().properties()).hasSize(0); + + view.updateProperties().set("k1", "v1").commit(); + ops.refresh(); + ViewMetadata metadataV2 = ops.current(); + assertThat(ops.current().properties()).hasSize(1); + + HiveViewOperations spyOps = spy(ops); + + failCommitAndThrowException(spyOps); + breakFallbackCatalogCommitCheck(spyOps); + + assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) + .isInstanceOf(CommitStateUnknownException.class) + .hasMessageStartingWith("Datacenter on fire"); + + ops.refresh(); + + assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2); + assertThat(metadataFileExists(ops.current())) + .as("Current metadata file should still 
exist") + .isTrue(); + assertThat(metadataFileCount(ops.current())) + .as("Client could not determine outcome so new metadata file should also exist") + .isEqualTo(2); + } + + /** + * Pretends we throw an exception while persisting and don't know what happened, can't check to + * find out, but in reality the commit succeeded + */ + @Test + public void testThriftExceptionsUnknownSuccessCommit() throws TException, InterruptedException { + HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); + ViewMetadata metadataV1 = ops.current(); + assertThat(ops.current().properties()).hasSize(0); + + view.updateProperties().set("k1", "v1").commit(); + ops.refresh(); + ViewMetadata metadataV2 = ops.current(); + assertThat(ops.current().properties()).hasSize(1); + + HiveViewOperations spyOps = spy(ops); + + commitAndThrowException(ops, spyOps); + breakFallbackCatalogCommitCheck(spyOps); + + assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) + .isInstanceOf(CommitStateUnknownException.class) + .hasMessageStartingWith("Datacenter on fire"); + + ops.refresh(); + + assertThat(ops.current()).as("Current metadata should have changed").isNotEqualTo(metadataV2); + assertThat(metadataFileExists(ops.current())) + .as("Current metadata file should still exist") + .isTrue(); + } + + /** + * Pretends we threw an exception while persisting, the commit succeeded, the lock expired, and a + * second committer placed a commit on top of ours before the first committer was able to check if + * their commit succeeded or not + * + *
<p>Timeline:
 + *
 + * <ul>
 + *   <li>Client 1 commits which throws an exception but succeeded
 + *   <li>Client 1's lock expires while waiting to do the recheck for commit success
 + *   <li>Client 2 acquires a lock, commits successfully on top of client 1's commit and releases the lock
 + *   <li>Client 1 checks to see if their commit was successful
 + * </ul>
 + *
 + * <p>
This tests to make sure a disconnected client 1 doesn't think their commit failed just + * because it isn't the current one during the recheck phase. + */ + @Test + public void testThriftExceptionConcurrentCommit() throws TException, InterruptedException { + HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); + ViewMetadata metadataV1 = ops.current(); + assertThat(ops.current().properties()).hasSize(0); + + view.updateProperties().set("k1", "v1").commit(); + ops.refresh(); + ViewMetadata metadataV2 = ops.current(); + assertThat(ops.current().properties()).hasSize(1); + + HiveViewOperations spyOps = spy(ops); + + AtomicReference lock = new AtomicReference<>(); + doAnswer( + l -> { + lock.set(ops.lockObject(metadataV1.properties(), catalog.getConf(), catalog.name())); + return lock.get(); + }) + .when(spyOps) + .lockObject(metadataV1.properties(), catalog.getConf(), catalog.name()); + + concurrentCommitAndThrowException(ops, spyOps, (BaseView) view, lock); + + /* + This commit and our concurrent commit should succeed even though this commit throws an exception + after the persist operation succeeds + */ + spyOps.commit(metadataV2, metadataV1); + + ops.refresh(); + assertThat(ops.current()).as("Current metadata should have changed").isNotEqualTo(metadataV2); + assertThat(metadataFileExists(ops.current())) + .as("Current metadata file should still exist") + .isTrue(); + assertThat(ops.current().properties()) + .as("The properties addition from the concurrent commit should have been successful") + .hasSize(1); + } + + @Test + public void testInvalidObjectException() { + TableIdentifier badTi = TableIdentifier.of(DB_NAME, "`view_name`"); + assertThatThrownBy( + () -> + catalog + .buildView(badTi) + .withSchema(SCHEMA) + .withDefaultNamespace(ns) + .withQuery("hive", "select * from ns.tbl") + .create()) + .isInstanceOf(ValidationException.class) + .hasMessage(String.format("Invalid Hive object for %s.%s", DB_NAME, "`view_name`")); + } + + /** 
Uses NoLock and pretends we throw an error because of a concurrent commit */ + @Test + public void testNoLockThriftExceptionConcurrentCommit() throws TException, InterruptedException { + HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); + assertThat(ops.current().properties()).hasSize(0); + + view.updateProperties().set("k1", "v1").commit(); + ops.refresh(); + ViewMetadata metadataV2 = ops.current(); + assertThat(ops.current().properties()).hasSize(1); + + HiveViewOperations spyOps = spy(ops); + + // Sets NoLock + doReturn(new NoLock()).when(spyOps).lockObject(any(), any(), any()); + + // Simulate a concurrent view modification error + doThrow( + new RuntimeException( + "MetaException(message:The table has been modified. The parameter value for key 'metadata_location' is")) + .when(spyOps) + .persistTable(any(), anyBoolean(), any()); + + ops.refresh(); + assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2); + assertThat(metadataFileExists(metadataV2)).as("Current metadata should still exist").isTrue(); + assertThat(metadataFileCount(ops.current())) + .as("New metadata files should not exist") + .isEqualTo(2); + } + + @Test + public void testLockExceptionUnknownSuccessCommit() throws TException, InterruptedException { + HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); + ViewMetadata metadataV1 = ops.current(); + assertThat(ops.current().properties()).hasSize(0); + + view.updateProperties().set("k1", "v1").commit(); + ops.refresh(); + ViewMetadata metadataV2 = ops.current(); + assertThat(ops.current().properties()).hasSize(1); + + HiveViewOperations spyOps = spy(ops); + + // Simulate a communication error after a successful commit + doAnswer( + i -> { + org.apache.hadoop.hive.metastore.api.Table tbl = + i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class); + String location = i.getArgument(2, String.class); + ops.persistTable(tbl, true, location); + throw 
new LockException("Datacenter on fire"); + }) + .when(spyOps) + .persistTable(any(), anyBoolean(), any()); + + assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) + .hasMessageContaining("Failed to heartbeat for hive lock while") + .isInstanceOf(CommitStateUnknownException.class); + + ops.refresh(); + + assertThat(ops.current().location()) + .as("Current metadata should have changed to metadata V1") + .isEqualTo(metadataV1.location()); + assertThat(metadataFileExists(ops.current())) + .as("Current metadata file should still exist") + .isTrue(); + } + + private void commitAndThrowException( + HiveViewOperations realOperations, HiveViewOperations spyOperations) + throws TException, InterruptedException { + // Simulate a communication error after a successful commit + doAnswer( + i -> { + org.apache.hadoop.hive.metastore.api.Table tbl = + i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class); + String location = i.getArgument(2, String.class); + realOperations.persistTable(tbl, true, location); + throw new TException("Datacenter on fire"); + }) + .when(spyOperations) + .persistTable(any(), anyBoolean(), any()); + } + + private void concurrentCommitAndThrowException( + HiveViewOperations realOperations, + HiveViewOperations spyOperations, + BaseView baseView, + AtomicReference lock) + throws TException, InterruptedException { + // Simulate a communication error after a successful commit + doAnswer( + i -> { + org.apache.hadoop.hive.metastore.api.Table tbl = + i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class); + String location = i.getArgument(2, String.class); + realOperations.persistTable(tbl, true, location); + // Simulate lock expiration or removal + lock.get().unlock(); + baseView.operations().refresh(); + baseView.updateProperties().set("k1", "v1").commit(); + throw new TException("Datacenter on fire"); + }) + .when(spyOperations) + .persistTable(any(), anyBoolean(), any()); + } + + private void 
failCommitAndThrowException(HiveViewOperations spyOperations) + throws TException, InterruptedException { + doThrow(new TException("Datacenter on fire")) + .when(spyOperations) + .persistTable(any(), anyBoolean(), any()); + } + + private void breakFallbackCatalogCommitCheck(HiveViewOperations spyOperations) { + when(spyOperations.refresh()) + .thenThrow(new RuntimeException("Still on fire")); // Failure on commit check + } + + private boolean metadataFileExists(ViewMetadata metadata) { + return new File(metadata.metadataFileLocation().replace("file:", "")).exists(); + } + + private int metadataFileCount(ViewMetadata metadata) { + return new File(metadata.metadataFileLocation().replace("file:", "")) + .getParentFile() + .listFiles(file -> file.getName().endsWith("metadata.json")) + .length; + } +} From 52ca0ad2ee879fa79607ca641d9a9ff767ec0bd3 Mon Sep 17 00:00:00 2001 From: nk1506 Date: Sun, 17 Mar 2024 20:45:25 +0530 Subject: [PATCH 03/17] Hive: Check if content exists with another type before building it. Tests to do validation hive and iceberg view with the same name. 
--- .../org/apache/iceberg/hive/HiveCatalog.java | 20 +++++++++ .../iceberg/hive/HiveViewOperations.java | 3 +- .../iceberg/hive/TestHiveViewCatalog.java | 45 +++++++++++++++++-- .../iceberg/hive/TestHiveViewCommits.java | 10 +++-- 4 files changed, 69 insertions(+), 9 deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 7a08eeb3ade6..fc3416ddaeb2 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -39,6 +39,7 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.ClientPool; +import org.apache.iceberg.Schema; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.Namespace; @@ -62,6 +63,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.ViewBuilder; import org.apache.iceberg.view.ViewMetadata; import org.apache.iceberg.view.ViewOperations; import org.apache.thrift.TException; @@ -124,6 +126,24 @@ public void initialize(String inputName, Map properties) { this.fileIOTracker = new FileIOTracker(); } + @Override + public TableBuilder buildTable(TableIdentifier identifier, Schema schema) { + if (viewExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "View with same name already exists: %s", identifier); + } + return super.buildTable(identifier, schema); + } + + @Override + public ViewBuilder buildView(TableIdentifier identifier) { + if (tableExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Table with same name already exists: %s", identifier); + } + return super.buildView(identifier); + } + 
@Override public List listTables(Namespace namespace) { Preconditions.checkArgument( diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java index 6318ffbbfcb6..6988c830f7c1 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java @@ -27,7 +27,6 @@ import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.ClientPool; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.NoSuchIcebergViewException; import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.view.BaseViewOperations; @@ -79,7 +78,7 @@ public void doRefresh() { metadataLocation = table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); - } catch (NoSuchObjectException | NoSuchIcebergViewException e) { + } catch (NoSuchObjectException e) { if (currentMetadataLocation() != null) { throw new NoSuchViewException("View does not exist: %s.%s", database, viewName); } diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java index 187d9de5f6bd..f4e24e5695a2 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java @@ -19,6 +19,7 @@ package org.apache.iceberg.hive; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; import java.util.Map; @@ -32,6 +33,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import 
org.apache.iceberg.exceptions.NoSuchIcebergViewException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -83,11 +85,46 @@ protected boolean requiresNamespaceCreate() { return true; } + @Test + public void testHiveViewAndIcebergViewWithSameName() throws TException { + String dbName = "hivedb"; + Namespace ns = Namespace.of(dbName); + String viewName = "test_hive_view"; + TableIdentifier identifier = TableIdentifier.of(ns, viewName); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(identifier.namespace()); + } + + assertThat(catalog.listViews(ns)).isEmpty(); + // create a hive table + org.apache.hadoop.hive.metastore.api.Table hiveTable = + createHiveView(viewName, dbName, tempDir.toUri().toString()); + HIVE_METASTORE_EXTENSION.metastoreClient().createTable(hiveTable); + + catalog.setListAllTables(true); + List tableIdents1 = catalog.listTables(ns); + assertThat(tableIdents1).as("should have one table with type VIRTUAL_VIEW.").hasSize(1); + + assertThat(catalog.viewExists(identifier)).isFalse(); + + assertThatThrownBy( + () -> + catalog + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(ns) + .withQuery("hive", "select * from hivedb.tbl") + .create()) + .isInstanceOf(NoSuchIcebergViewException.class) + .hasMessageStartingWith("Not an iceberg view: hive.hivedb.test_hive_view"); + } + @Test public void testListView() throws TException { String dbName = "hivedb"; Namespace ns = Namespace.of(dbName); - TableIdentifier identifier = TableIdentifier.of(ns, "tbl"); + TableIdentifier identifier = TableIdentifier.of(ns, "test_iceberg_view"); if (requiresNamespaceCreate()) { catalog.createNamespace(identifier.namespace()); @@ -96,10 +133,10 @@ public void testListView() throws TException { assertThat(catalog.viewExists(identifier)).isFalse(); 
assertThat(catalog.listViews(ns)).isEmpty(); - String hiveTableName = "test_hive_view"; + String hiveViewName = "test_hive_view"; // create a hive table org.apache.hadoop.hive.metastore.api.Table hiveTable = - createHiveView(hiveTableName, dbName, tempDir.toUri().toString()); + createHiveView(hiveViewName, dbName, tempDir.toUri().toString()); HIVE_METASTORE_EXTENSION.metastoreClient().createTable(hiveTable); catalog.setListAllTables(true); @@ -113,7 +150,7 @@ public void testListView() throws TException { .buildView(identifier) .withSchema(SCHEMA) .withDefaultNamespace(ns) - .withQuery("hive", "select * from ns.tbl") + .withQuery("hive", "select * from hivedb.tbl") .create(); assertThat(catalog.viewExists(identifier)).isTrue(); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java index 5fd953312303..fb1a7de40bb8 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java @@ -54,7 +54,7 @@ /** Test Hive locks and Hive errors and retry during commits. 
*/ public class TestHiveViewCommits { - private static final String VIEW_NAME = "view_name"; + private static final String VIEW_NAME = "test_iceberg_view"; private static final String DB_NAME = "hivedb"; private static final Namespace ns = Namespace.of(DB_NAME); private static final Schema SCHEMA = @@ -159,6 +159,7 @@ public void testThriftExceptionUnknownStateIfNotInHistoryFailureOnCommit() failCommitAndThrowException(spyOps); ops.refresh(); + assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2); assertThat(metadataFileExists(metadataV2)).as("Current metadata should still exist").isTrue(); assertThat(metadataFileCount(ops.current())) @@ -191,6 +192,7 @@ public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedE spyOps.commit(metadataV2, metadataV1); ops.refresh(); + assertThat(ops.current()).as("Current metadata should have changed").isNotEqualTo(metadataV2); assertThat(metadataFileExists(ops.current())) .as("Current metadata file should still exist") @@ -316,6 +318,7 @@ public void testThriftExceptionConcurrentCommit() throws TException, Interrupted spyOps.commit(metadataV2, metadataV1); ops.refresh(); + assertThat(ops.current()).as("Current metadata should have changed").isNotEqualTo(metadataV2); assertThat(metadataFileExists(ops.current())) .as("Current metadata file should still exist") @@ -327,7 +330,7 @@ public void testThriftExceptionConcurrentCommit() throws TException, Interrupted @Test public void testInvalidObjectException() { - TableIdentifier badTi = TableIdentifier.of(DB_NAME, "`view_name`"); + TableIdentifier badTi = TableIdentifier.of(DB_NAME, "`test_iceberg_view`"); assertThatThrownBy( () -> catalog @@ -337,7 +340,7 @@ public void testInvalidObjectException() { .withQuery("hive", "select * from ns.tbl") .create()) .isInstanceOf(ValidationException.class) - .hasMessage(String.format("Invalid Hive object for %s.%s", DB_NAME, "`view_name`")); + .hasMessage(String.format("Invalid Hive 
object for %s.%s", DB_NAME, "`test_iceberg_view`")); } /** Uses NoLock and pretends we throw an error because of a concurrent commit */ @@ -364,6 +367,7 @@ public void testNoLockThriftExceptionConcurrentCommit() throws TException, Inter .persistTable(any(), anyBoolean(), any()); ops.refresh(); + assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2); assertThat(metadataFileExists(metadataV2)).as("Current metadata should still exist").isTrue(); assertThat(metadataFileCount(ops.current())) From 5dc357a40e7dc2d5ec17f4319666bc0b94d16d8b Mon Sep 17 00:00:00 2001 From: nk1506 Date: Tue, 26 Mar 2024 18:36:32 +0530 Subject: [PATCH 04/17] Rebased with upstream. HiveViewOperations has own definition of commits. --- .../org/apache/iceberg/view/ViewMetadata.java | 16 ++ .../org/apache/iceberg/hive/HiveCatalog.java | 84 ++++++-- .../iceberg/hive/HiveTableOperations.java | 6 +- .../iceberg/hive/HiveViewOperations.java | 181 +++++++++++++++++- .../apache/iceberg/hive/TestHiveCatalog.java | 2 +- .../iceberg/hive/TestHiveViewCommits.java | 80 +++----- 6 files changed, 299 insertions(+), 70 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java index ae837ff96882..8425ed44f06a 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java @@ -85,6 +85,22 @@ default Integer currentSchemaId() { @Nullable String metadataFileLocation(); + default String property(String property, String defaultValue) { + return properties().getOrDefault(property, defaultValue); + } + + default boolean propertyAsBoolean(String property, boolean defaultValue) { + return PropertyUtil.propertyAsBoolean(properties(), property, defaultValue); + } + + default int propertyAsInt(String property, int defaultValue) { + return PropertyUtil.propertyAsInt(properties(), property, defaultValue); + } + + default 
long propertyAsLong(String property, long defaultValue) { + return PropertyUtil.propertyAsLong(properties(), property, defaultValue); + } + default ViewVersion version(int versionId) { return versionsById().get(versionId); } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index fc3416ddaeb2..e63a2d4e3c69 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -42,6 +42,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; @@ -63,6 +64,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; import org.apache.iceberg.view.ViewMetadata; import org.apache.iceberg.view.ViewOperations; @@ -128,20 +130,12 @@ public void initialize(String inputName, Map properties) { @Override public TableBuilder buildTable(TableIdentifier identifier, Schema schema) { - if (viewExists(identifier)) { - throw new org.apache.iceberg.exceptions.AlreadyExistsException( - "View with same name already exists: %s", identifier); - } - return super.buildTable(identifier, schema); + return new ViewAwareTableBuilder(identifier, schema); } @Override public ViewBuilder buildView(TableIdentifier identifier) { - if (tableExists(identifier)) { - throw new org.apache.iceberg.exceptions.AlreadyExistsException( - "Table with same name already exists: %s", identifier); - } - return super.buildView(identifier); + return new 
TableAwareViewBuilder(identifier); } @Override @@ -393,7 +387,7 @@ private void renameTableOrView( } catch (InvalidOperationException e) { if (e.getMessage() != null && e.getMessage().contains(String.format("new table %s already exists", to))) { - throwErrorForExistedToContent(from, removeCatalogName(from)); + throwErrorForExistedToContent(from, removeCatalogName(to)); } else { throw new RuntimeException("Failed to rename " + from + " to " + to, e); } @@ -804,4 +798,72 @@ void setListAllTables(boolean listAllTables) { ClientPool clientPool() { return clients; } + + /** + * The purpose of this class is to add view detection only for Hive-Specific tables. Hive catalog + * follows checks at different levels: 1. During refresh, it validates if the table is an iceberg + * table or not. 2. During commit, it validates if there is any concurrent commit with table or + * table-name already exists. This class helps to do the validation on an early basis. + */ + private class ViewAwareTableBuilder extends BaseMetastoreViewCatalogTableBuilder { + + private final TableIdentifier identifier; + + private ViewAwareTableBuilder(TableIdentifier identifier, Schema schema) { + super(identifier, schema); + this.identifier = identifier; + } + + @Override + public Transaction createOrReplaceTransaction() { + if (viewExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "View with same name already exists: %s", identifier); + } + return super.createOrReplaceTransaction(); + } + + @Override + public org.apache.iceberg.Table create() { + if (viewExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "View with same name already exists: %s", identifier); + } + return super.create(); + } + } + + /** + * The purpose of this class is to add table detection only for Hive-Specific view. Hive catalog + * follows checks at different levels: 1. During refresh, it validates if the view is an iceberg + * view or not. 2. 
During commit, it validates if there is any concurrent commit with view or + * view-name already exists. This class helps to do the validation on an early basis. + */ + private class TableAwareViewBuilder extends BaseViewBuilder { + + private final TableIdentifier identifier; + + private TableAwareViewBuilder(TableIdentifier identifier) { + super(identifier); + this.identifier = identifier; + } + + @Override + public View createOrReplace() { + if (tableExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Table with same name already exists: %s", identifier); + } + return super.createOrReplace(); + } + + @Override + public View create() { + if (tableExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Table with same name already exists: %s", identifier); + } + return super.create(); + } + } } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 64f091385297..30cebc2f8c2d 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -167,7 +167,7 @@ protected void doRefresh() { refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries); } - @SuppressWarnings("checkstyle:CyclomaticComplexity") + @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) @Override protected void doCommit(TableMetadata base, TableMetadata metadata) { boolean newTable = base == null; @@ -191,6 +191,10 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { if (newTable && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != null) { + if (tbl.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name())) { + throw new AlreadyExistsException( + "View with same name already exists: %s.%s", database, 
tableName); + } throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java index 6988c830f7c1..5d862883d3fb 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java @@ -18,17 +18,34 @@ */ package org.apache.iceberg.hive; +import static java.util.Collections.emptySet; + +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.ClientPool; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.view.BaseViewOperations; import org.apache.iceberg.view.ViewMetadata; import 
org.apache.thrift.TException; @@ -94,15 +111,177 @@ public void doRefresh() { refreshFromMetadataLocation(metadataLocation); } + @SuppressWarnings("checkstyle:CyclomaticComplexity") @Override public void doCommit(ViewMetadata base, ViewMetadata metadata) { + boolean newView = base == null; String newMetadataLocation = writeNewMetadataIfRequired(metadata); - String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; + boolean hiveEngineEnabled = false; + + CommitStatus commitStatus = CommitStatus.FAILURE; + boolean updateHiveTable = false; + + HiveLock lock = lockObject(); + try { + lock.lock(); + + Table tbl = loadHmsTable(); + + if (tbl != null) { + // If we try to create the view but the metadata location is already set, then we had a + // concurrent commit + if (newView + && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) + != null) { + throw new AlreadyExistsException("View already exists: %s.%s", database, viewName); + } + + updateHiveTable = true; + LOG.debug("Committing existing view: {}", fullName); + } else { + tbl = + newHmsTable( + metadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser())); + LOG.debug("Committing new view: {}", fullName); + } + + tbl.setSd( + HiveOperationsBase.storageDescriptor( + metadata.schema(), + metadata.location(), + hiveEngineEnabled)); // set to pick up any schema changes + + String metadataLocation = + tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + String baseMetadataLocation = base != null ? 
base.metadataFileLocation() : null; + if (!Objects.equals(baseMetadataLocation, metadataLocation)) { + throw new CommitFailedException( + "Cannot commit: Base metadata location '%s' is not same as the current view metadata location '%s' for %s.%s", + baseMetadataLocation, metadataLocation, database, viewName); + } + + // get Iceberg props that have been removed + Set removedProps = emptySet(); + if (base != null) { + removedProps = + base.properties().keySet().stream() + .filter(key -> !metadata.properties().containsKey(key)) + .collect(Collectors.toSet()); + } + + setHmsTableParameters(newMetadataLocation, tbl, metadata, removedProps); + + lock.ensureActive(); + + try { + persistTable(tbl, updateHiveTable, hiveLockEnabled(conf) ? null : baseMetadataLocation); + lock.ensureActive(); + + commitStatus = CommitStatus.SUCCESS; + } catch (LockException le) { + commitStatus = CommitStatus.UNKNOWN; + throw new CommitStateUnknownException( + "Failed to heartbeat for hive lock while " + + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. " + + "Please check the commit history. If you are running into this issue, try reducing " + + "iceberg.hive.lock-heartbeat-interval-ms.", + le); + } catch (AlreadyExistsException e) { + throw new AlreadyExistsException(e, "View already exists: %s.%s", database, viewName); + + } catch (InvalidObjectException e) { + throw new ValidationException(e, "Invalid Hive object for %s.%s", database, viewName); + + } catch (CommitFailedException | CommitStateUnknownException e) { + throw e; + + } catch (Throwable e) { + if (e.getMessage() + .contains( + "The view has been modified. 
The parameter value for key '" + + BaseMetastoreTableOperations.METADATA_LOCATION_PROP + + "' is")) { + throw new CommitFailedException( + e, "The view %s.%s has been modified concurrently", database, viewName); + } + + if (e.getMessage() != null + && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) { + throw new RuntimeException( + "Failed to acquire locks from metastore because the underlying metastore " + + "view 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not " + + "support transactions. To fix this use an alternative metastore.", + e); + } + + LOG.error("Cannot tell if commit to {}.{} succeeded.", database, viewName, e); + throw new CommitStateUnknownException(e); + } + } catch (TException e) { + throw new RuntimeException( + String.format("Metastore operation failed for %s.%s", database, viewName), e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during commit", e); + + } catch (LockException e) { + throw new CommitFailedException(e); + + } finally { + HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock); + } LOG.info( "Committed to view {} with the new metadata location {}", fullName, newMetadataLocation); } + private void setHmsTableParameters( + String newMetadataLocation, Table tbl, ViewMetadata metadata, Set obsoleteProps) { + Map parameters = + Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap); + + // push all Iceberg view properties into HMS + metadata.properties().entrySet().stream() + .filter(entry -> !entry.getKey().equalsIgnoreCase(HiveCatalog.HMS_TABLE_OWNER)) + .forEach(entry -> parameters.put(entry.getKey(), entry.getValue())); + if (metadata.uuid() != null) { + parameters.put("uuid", metadata.uuid()); + } + + // remove any props from HMS that are no longer present in Iceberg view props + obsoleteProps.forEach(parameters::remove); + + parameters.put( + 
BaseMetastoreTableOperations.TABLE_TYPE_PROP, + ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); + parameters.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, newMetadataLocation); + + if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) { + parameters.put( + BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation()); + } + + // If needed set the 'storage_handler' property to enable query from Hive + parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE); + + setSchema(metadata.schema(), parameters); + tbl.setParameters(parameters); + } + + private static boolean hiveLockEnabled(Configuration conf) { + return conf.getBoolean(ConfigProperties.LOCK_HIVE_ENABLED, true); + } + + @VisibleForTesting + HiveLock lockObject() { + if (hiveLockEnabled(conf)) { + return new MetastoreLock(conf, metaClients, catalogName, database, viewName); + } else { + return new NoLock(); + } + } + @Override protected String viewName() { return fullName; diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index 9249deb7598e..030d426c94b8 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -1041,7 +1041,7 @@ public void testNotExposeTableProperties() { .doesNotContainKey(CURRENT_SNAPSHOT_ID) .doesNotContainKey(CURRENT_SNAPSHOT_TIMESTAMP); - ops.setSchema(metadata.schema(), parameters); + ops.setSchema(metadata, parameters); assertThat(parameters).doesNotContainKey(CURRENT_SCHEMA); ops.setPartitionSpec(metadata, parameters); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java index fb1a7de40bb8..8423e45bfc7f 100644 --- 
a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java @@ -38,6 +38,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; @@ -116,7 +117,7 @@ public void testSuppressUnlockExceptions() { AtomicReference lockRef = new AtomicReference<>(); - when(spyOps.lockObject(metadataV1.properties(), catalog.getConf(), catalog.name())) + when(spyOps.lockObject()) .thenAnswer( i -> { HiveLock lock = (HiveLock) i.callRealMethod(); @@ -169,9 +170,12 @@ public void testThriftExceptionUnknownStateIfNotInHistoryFailureOnCommit() .isEqualTo(2); } - /** Pretends we throw an error while persisting that actually does commit serverside */ + /** + * Pretends we throw an error while persisting that actually does commit serverside, Since it + * fails on client side. It will delete the corresponding metadata file. 
+ */ @Test - public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedException { + public void testThriftExceptionFailureOnCommit() throws TException, InterruptedException { HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); ViewMetadata metadataV1 = ops.current(); @@ -187,19 +191,16 @@ public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedE // Simulate a communication error after a successful commit commitAndThrowException(ops, spyOps); - // Shouldn't throw because the commit actually succeeds even though persistTable throws an - // exception - spyOps.commit(metadataV2, metadataV1); + assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) + .isInstanceOf(CommitStateUnknownException.class) + .hasMessageContaining("Datacenter on fire"); - ops.refresh(); + assertThatThrownBy(ops::refresh).isInstanceOf(NotFoundException.class); - assertThat(ops.current()).as("Current metadata should have changed").isNotEqualTo(metadataV2); + assertThat(ops.current()).as("Current metadata should have not changed").isEqualTo(metadataV2); assertThat(metadataFileExists(ops.current())) - .as("Current metadata file should still exist") + .as("Previous(V2) metadata file should still exist") .isTrue(); - assertThat(metadataFileCount(ops.current())) - .as("Commit should have been successful and new metadata file should be made") - .isEqualTo(2); } /** @@ -232,41 +233,6 @@ public void testThriftExceptionUnknownFailedCommit() throws TException, Interrup assertThat(metadataFileExists(ops.current())) .as("Current metadata file should still exist") .isTrue(); - assertThat(metadataFileCount(ops.current())) - .as("Client could not determine outcome so new metadata file should also exist") - .isEqualTo(2); - } - - /** - * Pretends we throw an exception while persisting and don't know what happened, can't check to - * find out, but in reality the commit succeeded - */ - @Test - public void 
testThriftExceptionsUnknownSuccessCommit() throws TException, InterruptedException { - HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); - ViewMetadata metadataV1 = ops.current(); - assertThat(ops.current().properties()).hasSize(0); - - view.updateProperties().set("k1", "v1").commit(); - ops.refresh(); - ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1); - - HiveViewOperations spyOps = spy(ops); - - commitAndThrowException(ops, spyOps); - breakFallbackCatalogCommitCheck(spyOps); - - assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) - .isInstanceOf(CommitStateUnknownException.class) - .hasMessageStartingWith("Datacenter on fire"); - - ops.refresh(); - - assertThat(ops.current()).as("Current metadata should have changed").isNotEqualTo(metadataV2); - assertThat(metadataFileExists(ops.current())) - .as("Current metadata file should still exist") - .isTrue(); } /** @@ -293,7 +259,7 @@ public void testThriftExceptionConcurrentCommit() throws TException, Interrupted ViewMetadata metadataV1 = ops.current(); assertThat(ops.current().properties()).hasSize(0); - view.updateProperties().set("k1", "v1").commit(); + view.updateProperties().set("k0", "v0").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); assertThat(ops.current().properties()).hasSize(1); @@ -303,19 +269,21 @@ public void testThriftExceptionConcurrentCommit() throws TException, Interrupted AtomicReference lock = new AtomicReference<>(); doAnswer( l -> { - lock.set(ops.lockObject(metadataV1.properties(), catalog.getConf(), catalog.name())); + lock.set(ops.lockObject()); return lock.get(); }) .when(spyOps) - .lockObject(metadataV1.properties(), catalog.getConf(), catalog.name()); + .lockObject(); concurrentCommitAndThrowException(ops, spyOps, (BaseView) view, lock); /* - This commit and our concurrent commit should succeed even though this commit throws an exception + This commit should fail and concurrent commit 
should succeed even though this commit throws an exception after the persist operation succeeds */ - spyOps.commit(metadataV2, metadataV1); + assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) + .isInstanceOf(CommitStateUnknownException.class) + .hasMessageContaining("Datacenter on fire"); ops.refresh(); @@ -324,8 +292,8 @@ public void testThriftExceptionConcurrentCommit() throws TException, Interrupted .as("Current metadata file should still exist") .isTrue(); assertThat(ops.current().properties()) - .as("The properties addition from the concurrent commit should have been successful") - .hasSize(1); + .as("The new properties from the concurrent commit should have been successful") + .hasSize(2); } @Test @@ -357,7 +325,7 @@ public void testNoLockThriftExceptionConcurrentCommit() throws TException, Inter HiveViewOperations spyOps = spy(ops); // Sets NoLock - doReturn(new NoLock()).when(spyOps).lockObject(any(), any(), any()); + doReturn(new NoLock()).when(spyOps).lockObject(); // Simulate a concurrent view modification error doThrow( @@ -446,7 +414,7 @@ private void concurrentCommitAndThrowException( // Simulate lock expiration or removal lock.get().unlock(); baseView.operations().refresh(); - baseView.updateProperties().set("k1", "v1").commit(); + baseView.updateProperties().set("k1", "v1").set("k2", "v2").commit(); throw new TException("Datacenter on fire"); }) .when(spyOperations) From 100a9cc5247df47acd4dc487c973b968b571eb0c Mon Sep 17 00:00:00 2001 From: nk1506 Date: Tue, 26 Mar 2024 22:47:33 +0530 Subject: [PATCH 05/17] Addressed comments, removed extra changes --- .../org/apache/iceberg/view/ViewMetadata.java | 16 ----- .../apache/iceberg/view/ViewCatalogTests.java | 2 +- .../org/apache/iceberg/hive/HiveCatalog.java | 15 ++--- .../iceberg/hive/HiveViewOperations.java | 6 +- .../iceberg/hive/TestHiveViewCatalog.java | 63 +++++++++++++------ 5 files changed, 58 insertions(+), 44 deletions(-) diff --git 
a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java index 8425ed44f06a..ae837ff96882 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java @@ -85,22 +85,6 @@ default Integer currentSchemaId() { @Nullable String metadataFileLocation(); - default String property(String property, String defaultValue) { - return properties().getOrDefault(property, defaultValue); - } - - default boolean propertyAsBoolean(String property, boolean defaultValue) { - return PropertyUtil.propertyAsBoolean(properties(), property, defaultValue); - } - - default int propertyAsInt(String property, int defaultValue) { - return PropertyUtil.propertyAsInt(properties(), property, defaultValue); - } - - default long propertyAsLong(String property, long defaultValue) { - return PropertyUtil.propertyAsLong(properties(), property, defaultValue); - } - default ViewVersion version(int versionId) { return versionsById().get(versionId); } diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java index cf1630f92c7f..b3765bb1eae7 100644 --- a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java @@ -59,7 +59,7 @@ public abstract class ViewCatalogTests catalog.renameView(invalidFrom, validTo)) + .isInstanceOf(NoSuchViewException.class) + .hasMessageContaining("Invalid identifier: " + invalidFrom); + assertThatThrownBy(() -> catalog.renameTable(invalidFrom, validTo)) + .isInstanceOf(NoSuchTableException.class) + .hasMessageContaining("Invalid identifier: " + invalidFrom); + + TableIdentifier validFrom = TableIdentifier.of(Namespace.of("l1"), "view"); + TableIdentifier invalidTo = TableIdentifier.of(Namespace.of("l1", "l2"), "view"); + assertThatThrownBy(() -> 
catalog.renameView(validFrom, invalidTo)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid identifier: " + invalidTo); + assertThatThrownBy(() -> catalog.renameTable(validFrom, invalidTo)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid identifier: " + invalidTo); + } + + @Test + public void testHiveViewAndIcebergViewWithSameName() throws TException, IOException { String dbName = "hivedb"; Namespace ns = Namespace.of(dbName); String viewName = "test_hive_view"; @@ -98,13 +123,13 @@ public void testHiveViewAndIcebergViewWithSameName() throws TException { assertThat(catalog.listViews(ns)).isEmpty(); // create a hive table - org.apache.hadoop.hive.metastore.api.Table hiveTable = - createHiveView(viewName, dbName, tempDir.toUri().toString()); + Table hiveTable = + createHiveView( + viewName, dbName, Files.createTempDirectory("hive-view-tests-name").toString()); HIVE_METASTORE_EXTENSION.metastoreClient().createTable(hiveTable); catalog.setListAllTables(true); - List tableIdents1 = catalog.listTables(ns); - assertThat(tableIdents1).as("should have one table with type VIRTUAL_VIEW.").hasSize(1); + assertThat(catalog.listTables(ns)).containsExactly(identifier).hasSize(1); assertThat(catalog.viewExists(identifier)).isFalse(); @@ -121,7 +146,7 @@ public void testHiveViewAndIcebergViewWithSameName() throws TException { } @Test - public void testListView() throws TException { + public void testListViewWithHiveView() throws TException, IOException { String dbName = "hivedb"; Namespace ns = Namespace.of(dbName); TableIdentifier identifier = TableIdentifier.of(ns, "test_iceberg_view"); @@ -135,16 +160,18 @@ public void testListView() throws TException { String hiveViewName = "test_hive_view"; // create a hive table - org.apache.hadoop.hive.metastore.api.Table hiveTable = - createHiveView(hiveViewName, dbName, tempDir.toUri().toString()); + Table hiveTable = + createHiveView( + hiveViewName, dbName, 
Files.createTempDirectory("hive-view-tests-list").toString()); HIVE_METASTORE_EXTENSION.metastoreClient().createTable(hiveTable); catalog.setListAllTables(true); - List tableIdents1 = catalog.listTables(ns); - assertThat(tableIdents1).as("should have one table with type VIRTUAL_VIEW.").hasSize(1); - List tableIdents2 = catalog.listViews(ns); - assertThat(tableIdents2).as("should have zero iceberg view.").hasSize(0); + assertThat(catalog.listTables(ns)) + .containsExactly(TableIdentifier.of(ns, hiveViewName)) + .hasSize(1); + + assertThat(catalog.listViews(ns)).hasSize(0); catalog .buildView(identifier) @@ -154,12 +181,10 @@ public void testListView() throws TException { .create(); assertThat(catalog.viewExists(identifier)).isTrue(); - List tableIdents3 = catalog.listViews(ns); - assertThat(tableIdents3).as("should have one iceberg view .").hasSize(1); + assertThat(catalog.listViews(ns)).containsExactly(identifier).hasSize(1); } - private org.apache.hadoop.hive.metastore.api.Table createHiveView( - String hiveViewName, String dbName, String location) { + private Table createHiveView(String hiveViewName, String dbName, String location) { Map parameters = Maps.newHashMap(); parameters.put( serdeConstants.SERIALIZATION_CLASS, "org.apache.hadoop.hive.serde2.thrift.test.IntString"); @@ -183,8 +208,8 @@ private org.apache.hadoop.hive.metastore.api.Table createHiveView( Lists.newArrayList(), Maps.newHashMap()); - org.apache.hadoop.hive.metastore.api.Table hiveTable = - new org.apache.hadoop.hive.metastore.api.Table( + Table hiveTable = + new Table( hiveViewName, dbName, "test_owner", From 0b81eb2efb13945a41e0a63d510c39f990dcb056 Mon Sep 17 00:00:00 2001 From: nk1506 Date: Wed, 27 Mar 2024 01:10:15 +0530 Subject: [PATCH 06/17] Addressed comments, added commit status check with latest metadata location --- .../iceberg/hive/HiveViewOperations.java | 32 +++++++++++++++++-- .../iceberg/hive/TestHiveViewCommits.java | 17 +++------- 2 files changed, 34 insertions(+), 15 
deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java index cb24e44ffca8..319ce692a0fb 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java @@ -218,8 +218,25 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { e); } - LOG.error("Cannot tell if commit to {}.{} succeeded.", database, viewName, e); - throw new CommitStateUnknownException(e); + LOG.error( + "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", + database, + viewName, + e); + commitStatus = + checkCommitStatus( + viewName, + newMetadataLocation, + metadata.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); + switch (commitStatus) { + case SUCCESS: + break; + case FAILURE: + throw e; + case UNKNOWN: + throw new CommitStateUnknownException(e); + } } } catch (TException e) { throw new RuntimeException( @@ -240,6 +257,17 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { "Committed to view {} with the new metadata location {}", fullName, newMetadataLocation); } + /** + * Validate if the new metadata location is the current metadata location. 
+ * + * @param newMetadataLocation newly written metadata location + * @return true if the new metadata location is the current metadata location + */ + private boolean checkCurrentMetadataLocation(String newMetadataLocation) { + ViewMetadata metadata = refresh(); + return newMetadataLocation.equals(metadata.metadataFileLocation()); + } + private void setHmsTableParameters( String newMetadataLocation, Table tbl, ViewMetadata metadata, Set obsoleteProps) { Map parameters = diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java index 8423e45bfc7f..5d5479e1243e 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java @@ -38,7 +38,6 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.CommitStateUnknownException; -import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; @@ -170,12 +169,9 @@ public void testThriftExceptionUnknownStateIfNotInHistoryFailureOnCommit() .isEqualTo(2); } - /** - * Pretends we throw an error while persisting that actually does commit serverside, Since it - * fails on client side. It will delete the corresponding metadata file. - */ + /** Pretends we throw an error while persisting that actually does commit serverside. 
*/ @Test - public void testThriftExceptionFailureOnCommit() throws TException, InterruptedException { + public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedException { HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); ViewMetadata metadataV1 = ops.current(); @@ -190,16 +186,11 @@ public void testThriftExceptionFailureOnCommit() throws TException, InterruptedE // Simulate a communication error after a successful commit commitAndThrowException(ops, spyOps); - - assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) - .isInstanceOf(CommitStateUnknownException.class) - .hasMessageContaining("Datacenter on fire"); - - assertThatThrownBy(ops::refresh).isInstanceOf(NotFoundException.class); + spyOps.commit(metadataV2, metadataV1); assertThat(ops.current()).as("Current metadata should have not changed").isEqualTo(metadataV2); assertThat(metadataFileExists(ops.current())) - .as("Previous(V2) metadata file should still exist") + .as("Current metadata file should still exist") .isTrue(); } From c3cb1628b1ab461d7ff737d69634fb091a7efcb4 Mon Sep 17 00:00:00 2001 From: nk1506 Date: Thu, 28 Mar 2024 10:39:56 +0530 Subject: [PATCH 07/17] Addressed comments --- .../apache/iceberg/hive/HiveViewOperations.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java index 319ce692a0fb..951b8ccbabf4 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java @@ -120,7 +120,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { boolean hiveEngineEnabled = false; CommitStatus commitStatus = CommitStatus.FAILURE; - boolean updateHiveTable = false; + boolean updateHiveView = false; HiveLock lock = lockObject(); try { @@ 
-134,10 +134,16 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { if (newView && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != null) { - throw new AlreadyExistsException("View already exists: %s.%s", database, viewName); + throw new AlreadyExistsException( + "%s already exists: %s.%s", + tbl.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) + ? ContentType.VIEW.value() + : ContentType.TABLE.value(), + database, + viewName); } - updateHiveTable = true; + updateHiveView = true; LOG.debug("Committing existing view: {}", fullName); } else { tbl = @@ -178,7 +184,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { lock.ensureActive(); try { - persistTable(tbl, updateHiveTable, hiveLockEnabled(conf) ? null : baseMetadataLocation); + persistTable(tbl, updateHiveView, hiveLockEnabled(conf) ? null : baseMetadataLocation); lock.ensureActive(); commitStatus = CommitStatus.SUCCESS; @@ -190,7 +196,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { + "Please check the commit history. 
If you are running into this issue, try reducing " + "iceberg.hive.lock-heartbeat-interval-ms.", le); - } catch (AlreadyExistsException e) { + } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { throw new AlreadyExistsException(e, "View already exists: %s.%s", database, viewName); } catch (InvalidObjectException e) { From 798ceae157b545c05b6c9bcd0bf68f44ae66502a Mon Sep 17 00:00:00 2001 From: nk1506 Date: Thu, 28 Mar 2024 20:45:03 +0530 Subject: [PATCH 08/17] Addressed comments --- .../org/apache/iceberg/hive/HiveCatalog.java | 34 +++++++------------ .../apache/iceberg/hive/TestHiveCatalog.java | 22 ++++++++++++ .../iceberg/hive/TestHiveViewCatalog.java | 23 ------------- 3 files changed, 35 insertions(+), 44 deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 5d8999af2d8d..3e3965ad55ff 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -338,6 +338,7 @@ private List listIcebergTables( .collect(Collectors.toList()); } + @SuppressWarnings("checkstyle:CyclomaticComplexity") private void renameTableOrView( TableIdentifier from, TableIdentifier originalTo, @@ -358,6 +359,16 @@ private void renameTableOrView( "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace()); } + if (tableExists(to)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Cannot rename %s to %s. Table already exists", from, to); + } + + if (viewExists(to)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Cannot rename %s to %s. 
View already exists", from, to); + } + String toDatabase = to.namespace().level(0); String fromDatabase = from.namespace().level(0); String fromName = from.name(); @@ -388,7 +399,8 @@ private void renameTableOrView( } catch (InvalidOperationException e) { if (e.getMessage() != null && e.getMessage().contains(String.format("new table %s already exists", to))) { - throwErrorForExistedToContent(from, removeCatalogName(to)); + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Table already exists: %s", to); } else { throw new RuntimeException("Failed to rename " + from + " to " + to, e); } @@ -413,26 +425,6 @@ private void validateTableIsIcebergTableOrView( } } - private void throwErrorForExistedToContent(TableIdentifier from, TableIdentifier to) { - String toDatabase = to.namespace().level(0); - try { - Table table = clients.run(client -> client.getTable(toDatabase, to.name())); - throw new org.apache.iceberg.exceptions.AlreadyExistsException( - "Cannot rename %s to %s. %s already exists", - from, - to, - table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) - ? 
HiveOperationsBase.ContentType.VIEW.value() - : HiveOperationsBase.ContentType.TABLE.value()); - } catch (TException e) { - throw new RuntimeException("Failed to load content " + to, e); - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted in call to load content", e); - } - } - @Override public void createNamespace(Namespace namespace, Map meta) { Preconditions.checkArgument( diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index 030d426c94b8..4a9eda5bc624 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -73,6 +73,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -155,6 +156,27 @@ private Schema getTestSchema() { required(2, "data", Types.StringType.get())); } + @Test + public void testInvalidIdentifiersWithRename() { + TableIdentifier invalidFrom = TableIdentifier.of(Namespace.of("l1", "l2"), "view"); + TableIdentifier validTo = TableIdentifier.of(Namespace.of("l1"), "renamedView"); + assertThatThrownBy(() -> catalog.renameView(invalidFrom, validTo)) + .isInstanceOf(NoSuchViewException.class) + .hasMessageContaining("Invalid identifier: " + invalidFrom); + assertThatThrownBy(() -> catalog.renameTable(invalidFrom, validTo)) + .isInstanceOf(NoSuchTableException.class) + .hasMessageContaining("Invalid identifier: " + invalidFrom); + + TableIdentifier 
validFrom = TableIdentifier.of(Namespace.of("l1"), "view"); + TableIdentifier invalidTo = TableIdentifier.of(Namespace.of("l1", "l2"), "view"); + assertThatThrownBy(() -> catalog.renameView(validFrom, invalidTo)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid identifier: " + invalidTo); + assertThatThrownBy(() -> catalog.renameTable(validFrom, invalidTo)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid identifier: " + invalidTo); + } + @Test public void testCreateTableBuilder() throws Exception { Schema schema = getTestSchema(); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java index b08c3b24d581..40a4b86ce491 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java @@ -36,8 +36,6 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchIcebergViewException; -import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -89,27 +87,6 @@ protected boolean requiresNamespaceCreate() { return true; } - @Test - public void testInvalidIdentifiersWithRename() { - TableIdentifier invalidFrom = TableIdentifier.of(Namespace.of("l1", "l2"), "view"); - TableIdentifier validTo = TableIdentifier.of(Namespace.of("l1"), "renamedView"); - assertThatThrownBy(() -> catalog.renameView(invalidFrom, validTo)) - .isInstanceOf(NoSuchViewException.class) - .hasMessageContaining("Invalid identifier: " + invalidFrom); - assertThatThrownBy(() 
-> catalog.renameTable(invalidFrom, validTo)) - .isInstanceOf(NoSuchTableException.class) - .hasMessageContaining("Invalid identifier: " + invalidFrom); - - TableIdentifier validFrom = TableIdentifier.of(Namespace.of("l1"), "view"); - TableIdentifier invalidTo = TableIdentifier.of(Namespace.of("l1", "l2"), "view"); - assertThatThrownBy(() -> catalog.renameView(validFrom, invalidTo)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Invalid identifier: " + invalidTo); - assertThatThrownBy(() -> catalog.renameTable(validFrom, invalidTo)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Invalid identifier: " + invalidTo); - } - @Test public void testHiveViewAndIcebergViewWithSameName() throws TException, IOException { String dbName = "hivedb"; From 756c64130b12869f43c24b264d1ff7592a3aa19c Mon Sep 17 00:00:00 2001 From: nk1506 Date: Thu, 11 Apr 2024 10:20:08 +0530 Subject: [PATCH 09/17] Added ViewSQLText to HMS Parameters. --- .../iceberg/hive/HiveViewOperations.java | 50 ++++++++++++++++--- .../iceberg/hive/TestHiveViewCatalog.java | 27 ++++++++++ 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java index 951b8ccbabf4..e9d3374f26e2 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java @@ -20,6 +20,7 @@ import static java.util.Collections.emptySet; +import java.util.Collections; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -45,10 +46,13 @@ import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import 
org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.view.BaseViewOperations; +import org.apache.iceberg.view.SQLViewRepresentation; import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewRepresentation; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,12 +150,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { updateHiveView = true; LOG.debug("Committing existing view: {}", fullName); } else { - tbl = - newHmsTable( - PropertyUtil.propertyAsString( - metadata.properties(), - HiveCatalog.HMS_TABLE_OWNER, - HiveHadoopUtil.currentUser())); + tbl = newHMSView(metadata); LOG.debug("Committing new view: {}", fullName); } @@ -311,6 +310,45 @@ private static boolean hiveLockEnabled(Configuration conf) { return conf.getBoolean(ConfigProperties.LOCK_HIVE_ENABLED, true); } + private Table newHMSView(ViewMetadata metadata) { + final long currentTimeMillis = System.currentTimeMillis(); + String hmsTableOwner = + PropertyUtil.propertyAsString( + metadata.properties(), HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser()); + Preconditions.checkNotNull(hmsTableOwner, "'hmsOwner' parameter can't be null"); + String sqlQuery = sqlFor(metadata); + + return new Table( + table(), + database(), + hmsTableOwner, + (int) currentTimeMillis / 1000, + (int) currentTimeMillis / 1000, + Integer.MAX_VALUE, + null, + Collections.emptyList(), + Maps.newHashMap(), + sqlQuery, + sqlQuery, + tableType().name()); + } + + private String sqlFor(ViewMetadata metadata) { + SQLViewRepresentation closest = null; + for (ViewRepresentation representation : metadata.currentVersion().representations()) { + if (representation instanceof SQLViewRepresentation) { + SQLViewRepresentation sqlViewRepresentation = (SQLViewRepresentation) representation; + if (sqlViewRepresentation.dialect().equalsIgnoreCase("hive")) { + return 
sqlViewRepresentation.sql(); + } else if (closest == null) { + closest = sqlViewRepresentation; + } + } + } + + return closest == null ? null : closest.sql(); + } + @VisibleForTesting HiveLock lockObject() { if (hiveLockEnabled(conf)) { diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java index 40a4b86ce491..de899b63106a 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java @@ -161,6 +161,33 @@ public void testListViewWithHiveView() throws TException, IOException { assertThat(catalog.listViews(ns)).containsExactly(identifier).hasSize(1); } + @Test + public void testViewWithHiveParameters() throws TException, IOException { + String dbName = "hivedb"; + Namespace ns = Namespace.of(dbName); + TableIdentifier identifier = TableIdentifier.of(ns, "test_iceberg_view"); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(identifier.namespace()); + } + + assertThat(catalog.viewExists(identifier)).isFalse(); + String tableQuery = "select * from hivedb.tbl"; + + catalog + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(ns) + .withQuery("hive", tableQuery) + .create(); + assertThat(catalog.viewExists(identifier)).isTrue(); + + Table hiveTable = + HIVE_METASTORE_EXTENSION.metastoreClient().getTable(dbName, identifier.name()); + assertThat(hiveTable.getViewOriginalText()).isEqualTo(tableQuery); + assertThat(hiveTable.getViewExpandedText()).isEqualTo(tableQuery); + } + private Table createHiveView(String hiveViewName, String dbName, String location) { Map parameters = Maps.newHashMap(); parameters.put( From 8abfc30d007b0476d3fa4487d35ec2c6cae902bb Mon Sep 17 00:00:00 2001 From: nk1506 Date: Mon, 15 Apr 2024 23:58:26 +0530 Subject: [PATCH 10/17] Added tests for InvalidIdentifier with Table and View separately --- 
.../org/apache/iceberg/hive/TestHiveCatalog.java | 15 ++++----------- .../apache/iceberg/hive/TestHiveViewCatalog.java | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index 4a9eda5bc624..0569887a964e 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -73,7 +73,6 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -158,20 +157,14 @@ private Schema getTestSchema() { @Test public void testInvalidIdentifiersWithRename() { - TableIdentifier invalidFrom = TableIdentifier.of(Namespace.of("l1", "l2"), "view"); - TableIdentifier validTo = TableIdentifier.of(Namespace.of("l1"), "renamedView"); - assertThatThrownBy(() -> catalog.renameView(invalidFrom, validTo)) - .isInstanceOf(NoSuchViewException.class) - .hasMessageContaining("Invalid identifier: " + invalidFrom); + TableIdentifier invalidFrom = TableIdentifier.of(Namespace.of("l1", "l2"), "table1"); + TableIdentifier validTo = TableIdentifier.of(Namespace.of("l1"), "renamedTable"); assertThatThrownBy(() -> catalog.renameTable(invalidFrom, validTo)) .isInstanceOf(NoSuchTableException.class) .hasMessageContaining("Invalid identifier: " + invalidFrom); - TableIdentifier validFrom = TableIdentifier.of(Namespace.of("l1"), "view"); - TableIdentifier invalidTo = TableIdentifier.of(Namespace.of("l1", "l2"), "view"); - 
assertThatThrownBy(() -> catalog.renameView(validFrom, invalidTo)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Invalid identifier: " + invalidTo); + TableIdentifier validFrom = TableIdentifier.of(Namespace.of("l1"), "table1"); + TableIdentifier invalidTo = TableIdentifier.of(Namespace.of("l1", "l2"), "renamedTable"); assertThatThrownBy(() -> catalog.renameTable(validFrom, invalidTo)) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Invalid identifier: " + invalidTo); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java index de899b63106a..8ce462331547 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java @@ -36,6 +36,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchIcebergViewException; +import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -188,6 +189,21 @@ public void testViewWithHiveParameters() throws TException, IOException { assertThat(hiveTable.getViewExpandedText()).isEqualTo(tableQuery); } + @Test + public void testInvalidIdentifiersWithRename() { + TableIdentifier invalidFrom = TableIdentifier.of(Namespace.of("l1", "l2"), "view"); + TableIdentifier validTo = TableIdentifier.of(Namespace.of("l1"), "renamedView"); + assertThatThrownBy(() -> catalog.renameView(invalidFrom, validTo)) + .isInstanceOf(NoSuchViewException.class) + .hasMessageContaining("Invalid identifier: " + invalidFrom); + + TableIdentifier validFrom = TableIdentifier.of(Namespace.of("l1"), "view"); + 
TableIdentifier invalidTo = TableIdentifier.of(Namespace.of("l1", "l2"), "renamedView"); + assertThatThrownBy(() -> catalog.renameView(validFrom, invalidTo)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid identifier: " + invalidTo); + } + private Table createHiveView(String hiveViewName, String dbName, String location) { Map parameters = Maps.newHashMap(); parameters.put( From 6c03cd2e355b32ca344fc297bf00a8b15f5ae50e Mon Sep 17 00:00:00 2001 From: nk1506 Date: Thu, 25 Apr 2024 15:44:58 +0530 Subject: [PATCH 11/17] Addressed comments --- .../org/apache/iceberg/hive/HiveCatalog.java | 23 ++++++++----------- .../iceberg/hive/HiveOperationsBase.java | 5 ++-- .../iceberg/hive/HiveTableOperations.java | 2 +- .../iceberg/hive/HiveViewOperations.java | 22 +++++++----------- .../iceberg/hive/TestHiveViewCommits.java | 21 ++++++++++------- 5 files changed, 33 insertions(+), 40 deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 3e3965ad55ff..9cdae0f042a3 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -184,16 +184,16 @@ public List listViews(Namespace namespace) { try { String database = namespace.level(0); - List tableNames = + List viewNames = clients.run(client -> client.getTables(database, "*", TableType.VIRTUAL_VIEW)); // Retrieving the Table objects from HMS in batches to avoid OOM List filteredTableIdentifiers = Lists.newArrayList(); - Iterable> tableNameSets = Iterables.partition(tableNames, 100); + Iterable> viewNameSets = Iterables.partition(viewNames, 100); - for (List tableNameSet : tableNameSets) { + for (List viewNameSet : viewNameSets) { filteredTableIdentifiers.addAll( - listIcebergTables(tableNameSet, namespace, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE)); + listIcebergTables(viewNameSet, 
namespace, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE)); } return filteredTableIdentifiers; @@ -275,13 +275,9 @@ public boolean dropView(TableIdentifier identifier) { try { String database = identifier.namespace().level(0); String viewName = identifier.name(); - Table table = clients.run(client -> client.getTable(database, viewName)); - HiveOperationsBase.validateTableIsIcebergView( - table, CatalogUtil.fullTableName(name, identifier)); HiveViewOperations ops = (HiveViewOperations) newViewOps(identifier); ViewMetadata lastViewMetadata = null; - try { lastViewMetadata = ops.current(); } catch (NotFoundException e) { @@ -290,22 +286,22 @@ public boolean dropView(TableIdentifier identifier) { clients.run( client -> { - client.dropTable(database, viewName); + client.dropTable(database, viewName, false, false); return null; }); if (lastViewMetadata != null) { CatalogUtil.dropViewMetadata(ops.io(), lastViewMetadata); + LOG.info("Dropped view: {}", identifier); + return true; } - LOG.info("Dropped view: {}", identifier); - return true; - + return false; } catch (NoSuchObjectException e) { LOG.info("Skipping drop, view does not exist: {}", identifier, e); return false; } catch (TException e) { - throw new RuntimeException("Failed to drop " + identifier, e); + throw new RuntimeException("Failed to drop view " + identifier, e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Interrupted in call to dropView", e); @@ -318,7 +314,6 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) { } @Override - @SuppressWarnings("FormatStringAnnotation") public void renameView(TableIdentifier from, TableIdentifier to) { renameTableOrView(from, to, HiveOperationsBase.ContentType.VIEW); } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java index 6b39d71d1e04..4c78c43096fe 100644 --- 
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java @@ -134,9 +134,8 @@ static void validateTableIsIceberg(Table table, String fullName) { static void validateTableIsIcebergView(Table table, String fullName) { String tableTypeProp = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP); NoSuchIcebergViewException.check( - table.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) - && tableTypeProp != null - && tableTypeProp.equalsIgnoreCase(ICEBERG_VIEW_TYPE_VALUE), + TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(table.getTableType()) + && ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableTypeProp), "Not an iceberg view: %s (type=%s) (tableType=%s)", fullName, tableTypeProp, diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 30cebc2f8c2d..518daaf6acd1 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -191,7 +191,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { if (newTable && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != null) { - if (tbl.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name())) { + if (TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tbl.getTableType())) { throw new AlreadyExistsException( "View with same name already exists: %s.%s", database, tableName); } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java index e9d3374f26e2..4f2a4a20d926 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java +++ 
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.ClientPool; @@ -46,7 +45,6 @@ import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.view.BaseViewOperations; @@ -76,13 +74,12 @@ final class HiveViewOperations extends BaseViewOperations implements HiveOperati FileIO fileIO, String catalogName, TableIdentifier viewIdentifier) { - String dbName = viewIdentifier.namespace().level(0); this.conf = conf; this.catalogName = catalogName; this.metaClients = metaClients; this.fileIO = fileIO; this.fullName = CatalogUtil.fullTableName(catalogName, viewIdentifier); - this.database = dbName; + this.database = viewIdentifier.namespace().level(0); this.viewName = viewIdentifier.name(); this.maxHiveTablePropertySize = conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT); @@ -140,7 +137,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { != null) { throw new AlreadyExistsException( "%s already exists: %s.%s", - tbl.getTableType().equalsIgnoreCase(TableType.VIRTUAL_VIEW.name()) + TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tbl.getTableType()) ? 
ContentType.VIEW.value() : ContentType.TABLE.value(), database, @@ -205,11 +202,12 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { throw e; } catch (Throwable e) { - if (e.getMessage() - .contains( - "The view has been modified. The parameter value for key '" - + BaseMetastoreTableOperations.METADATA_LOCATION_PROP - + "' is")) { + if (e.getMessage() != null + && e.getMessage() + .contains( + "The view has been modified. The parameter value for key '" + + BaseMetastoreTableOperations.METADATA_LOCATION_PROP + + "' is")) { throw new CommitFailedException( e, "The view %s.%s has been modified concurrently", database, viewName); } @@ -299,9 +297,6 @@ private void setHmsTableParameters( BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation()); } - // If needed set the 'storage_handler' property to enable query from Hive - parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE); - setSchema(metadata.schema(), parameters); tbl.setParameters(parameters); } @@ -315,7 +310,6 @@ private Table newHMSView(ViewMetadata metadata) { String hmsTableOwner = PropertyUtil.propertyAsString( metadata.properties(), HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser()); - Preconditions.checkNotNull(hmsTableOwner, "'hmsOwner' parameter can't be null"); String sqlQuery = sqlFor(metadata); return new Table( diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java index 5d5479e1243e..dbc199a10919 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java @@ -30,8 +30,10 @@ import static org.mockito.Mockito.when; import java.io.File; +import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.fs.Path; import 
org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.Schema; @@ -69,6 +71,7 @@ public class TestHiveViewCommits { HiveMetastoreExtension.builder().withDatabase(DB_NAME).build(); private View view; + private Path viewLocation; private static HiveCatalog catalog; @@ -94,10 +97,12 @@ public void createTestView() { .withDefaultNamespace(ns) .withQuery("hive", "select * from ns.tbl") .create(); + viewLocation = new Path(view.location()); } @AfterEach - public void dropTestView() { + public void dropTestView() throws IOException { + viewLocation.getFileSystem(HIVE_METASTORE_EXTENSION.hiveConf()).delete(viewLocation, true); catalog.dropView(VIEW_IDENTIFIER); } @@ -110,7 +115,7 @@ public void testSuppressUnlockExceptions() { view.updateProperties().set("k1", "v1").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1); + assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); @@ -152,7 +157,7 @@ public void testThriftExceptionUnknownStateIfNotInHistoryFailureOnCommit() view.updateProperties().set("k1", "v1").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1); + assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); @@ -179,7 +184,7 @@ public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedE view.updateProperties().set("k1", "v1").commit(); ops.refresh(); - assertThat(ops.current().properties()).hasSize(1); + assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); ViewMetadata metadataV2 = ops.current(); HiveViewOperations spyOps = spy(ops); @@ -207,7 +212,7 @@ public void testThriftExceptionUnknownFailedCommit() throws TException, Interrup view.updateProperties().set("k1", "v1").commit(); ops.refresh(); ViewMetadata metadataV2 = 
ops.current(); - assertThat(ops.current().properties()).hasSize(1); + assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); @@ -253,7 +258,7 @@ public void testThriftExceptionConcurrentCommit() throws TException, Interrupted view.updateProperties().set("k0", "v0").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1); + assertThat(ops.current().properties()).hasSize(1).containsEntry("k0", "v0"); HiveViewOperations spyOps = spy(ops); @@ -311,7 +316,7 @@ public void testNoLockThriftExceptionConcurrentCommit() throws TException, Inter view.updateProperties().set("k1", "v1").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1); + assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); @@ -343,7 +348,7 @@ public void testLockExceptionUnknownSuccessCommit() throws TException, Interrupt view.updateProperties().set("k1", "v1").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1); + assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); From dc5135be52c31f88e7b1ca88354f939dc7cb750c Mon Sep 17 00:00:00 2001 From: nk1506 Date: Thu, 25 Apr 2024 16:58:11 +0530 Subject: [PATCH 12/17] Remove purge with dropView --- .../org/apache/iceberg/hive/HiveCatalog.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 9cdae0f042a3..8bfdeb4fbebb 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -284,19 +284,17 @@ public boolean 
dropView(TableIdentifier identifier) { LOG.warn("Failed to load view metadata for view: {}", identifier, e); } - clients.run( - client -> { - client.dropTable(database, viewName, false, false); - return null; - }); - - if (lastViewMetadata != null) { - CatalogUtil.dropViewMetadata(ops.io(), lastViewMetadata); - LOG.info("Dropped view: {}", identifier); + if (lastViewMetadata == null) { + return false; + } else { + clients.run( + client -> { + client.dropTable(database, viewName, false, false); + return null; + }); return true; } - return false; } catch (NoSuchObjectException e) { LOG.info("Skipping drop, view does not exist: {}", identifier, e); return false; From 2449647b6499804902a3d4666d84072d3155be53 Mon Sep 17 00:00:00 2001 From: nk1506 Date: Thu, 25 Apr 2024 17:13:07 +0530 Subject: [PATCH 13/17] Addressed comments --- .../org/apache/iceberg/hive/HiveCatalog.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 8bfdeb4fbebb..aefcf1307b09 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -284,17 +284,18 @@ public boolean dropView(TableIdentifier identifier) { LOG.warn("Failed to load view metadata for view: {}", identifier, e); } - if (lastViewMetadata == null) { - return false; - } else { - clients.run( - client -> { - client.dropTable(database, viewName, false, false); - return null; - }); - return true; + clients.run( + client -> { + client.dropTable(database, viewName, false, false); + return null; + }); + + if (lastViewMetadata != null) { + CatalogUtil.dropViewMetadata(ops.io(), lastViewMetadata); } + LOG.info("Dropped view: {}", identifier); + return true; } catch (NoSuchObjectException e) { LOG.info("Skipping drop, view does not exist: {}", identifier, e); 
return false; From d85fb13692cb6e5008311a024d9c6e77bcd24f9b Mon Sep 17 00:00:00 2001 From: nk1506 Date: Fri, 13 Sep 2024 12:51:04 +0530 Subject: [PATCH 14/17] Addressed comments: added gc_enabled flag check before dropping view metadata file. --- .../java/org/apache/iceberg/CatalogUtil.java | 7 +- .../org/apache/iceberg/hive/HiveCatalog.java | 9 +-- .../apache/iceberg/hive/TestHiveCatalog.java | 2 +- .../iceberg/hive/TestHiveViewCatalog.java | 80 ++++++++++++++++++- 4 files changed, 86 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index 3f17e8dd5f4e..70b10cbaeb62 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -147,7 +147,12 @@ public static void dropTableData(FileIO io, TableMetadata metadata) { * @param metadata the last valid ViewMetadata instance for a dropped view. */ public static void dropViewMetadata(FileIO io, ViewMetadata metadata) { - deleteFile(io, metadata.metadataFileLocation(), "metadata"); + boolean gcEnabled = + PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT); + + if (gcEnabled) { + deleteFile(io, metadata.metadataFileLocation(), "metadata"); + } } @SuppressWarnings("DangerousStringInternUsage") diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index aefcf1307b09..1cf738d736cb 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -337,14 +337,7 @@ private void renameTableOrView( TableIdentifier from, TableIdentifier originalTo, HiveOperationsBase.ContentType contentType) { - if (!isValidIdentifier(from)) { - switch (contentType) { - case TABLE: - throw new NoSuchTableException("Invalid identifier: %s", from); - 
case VIEW: - throw new NoSuchViewException("Invalid identifier: %s", from); - } - } + Preconditions.checkArgument(isValidIdentifier(from), "Invalid identifier: %s", from); TableIdentifier to = removeCatalogName(originalTo); Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index 0569887a964e..43e2d33317ac 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -160,7 +160,7 @@ public void testInvalidIdentifiersWithRename() { TableIdentifier invalidFrom = TableIdentifier.of(Namespace.of("l1", "l2"), "table1"); TableIdentifier validTo = TableIdentifier.of(Namespace.of("l1"), "renamedTable"); assertThatThrownBy(() -> catalog.renameTable(invalidFrom, validTo)) - .isInstanceOf(NoSuchTableException.class) + .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Invalid identifier: " + invalidFrom); TableIdentifier validFrom = TableIdentifier.of(Namespace.of("l1"), "table1"); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java index 8ce462331547..3c195e256520 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java @@ -25,6 +25,7 @@ import java.nio.file.Files; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -32,14 +33,15 @@ import org.apache.hadoop.hive.serde.serdeConstants; import 
org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchIcebergViewException; -import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.view.BaseView; import org.apache.iceberg.view.ViewCatalogTests; import org.apache.thrift.TException; import org.junit.jupiter.api.AfterEach; @@ -194,7 +196,7 @@ public void testInvalidIdentifiersWithRename() { TableIdentifier invalidFrom = TableIdentifier.of(Namespace.of("l1", "l2"), "view"); TableIdentifier validTo = TableIdentifier.of(Namespace.of("l1"), "renamedView"); assertThatThrownBy(() -> catalog.renameView(invalidFrom, validTo)) - .isInstanceOf(NoSuchViewException.class) + .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Invalid identifier: " + invalidFrom); TableIdentifier validFrom = TableIdentifier.of(Namespace.of("l1"), "view"); @@ -204,6 +206,80 @@ public void testInvalidIdentifiersWithRename() { .hasMessageContaining("Invalid identifier: " + invalidTo); } + @Test + public void dropViewShouldNotDropMetadataFileIfGcNotEnabled() throws IOException { + String dbName = "hivedb"; + Namespace ns = Namespace.of(dbName); + TableIdentifier identifier = TableIdentifier.of(ns, "test_iceberg_drop_view_gc_disabled"); + if (requiresNamespaceCreate()) { + catalog.createNamespace(identifier.namespace()); + } + + BaseView view = + (BaseView) + catalog + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(ns) + .withQuery("hive", "select * from hivedb.tbl") + .withProperty(TableProperties.GC_ENABLED, "false") + 
.create(); + + assertThat(catalog.viewExists(identifier)).isTrue(); + + Path viewLocation = new Path(view.location()); + String currentMetadataLocation = view.operations().current().metadataFileLocation(); + + catalog.dropView(identifier); + + assertThat( + viewLocation + .getFileSystem(HIVE_METASTORE_EXTENSION.hiveConf()) + .exists(new Path(currentMetadataLocation))) + .isTrue(); + assertThat(catalog.viewExists(identifier)).isFalse(); + } + + @Test + public void dropViewShouldDropMetadataFileIfGcEnabled() throws IOException { + String dbName = "hivedb"; + Namespace ns = Namespace.of(dbName); + TableIdentifier identifier = TableIdentifier.of(ns, "test_iceberg_drop_view_gc_enabled"); + if (requiresNamespaceCreate()) { + catalog.createNamespace(identifier.namespace()); + } + + BaseView view = + (BaseView) + catalog + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(ns) + .withQuery("hive", "select * from hivedb.tbl") + .withProperty(TableProperties.GC_ENABLED, "true") + .create(); + + assertThat(catalog.viewExists(identifier)).isTrue(); + + Path viewLocation = new Path(view.location()); + String currentMetadataLocation = view.operations().current().metadataFileLocation(); + + assertThat( + viewLocation + .getFileSystem(HIVE_METASTORE_EXTENSION.hiveConf()) + .exists(new Path(currentMetadataLocation))) + .isTrue(); + + catalog.dropView(identifier); + + assertThat( + viewLocation + .getFileSystem(HIVE_METASTORE_EXTENSION.hiveConf()) + .exists(new Path(currentMetadataLocation))) + .isFalse(); + assertThat(catalog.viewExists(identifier)).isFalse(); + } + private Table createHiveView(String hiveViewName, String dbName, String location) { Map parameters = Maps.newHashMap(); parameters.put( From da764a594cd4fdd8b622b0d49ef8d20b4ec7d10c Mon Sep 17 00:00:00 2001 From: nk1506 Date: Fri, 13 Sep 2024 13:41:05 +0530 Subject: [PATCH 15/17] build fix --- .../java/org/apache/iceberg/hive/TestHiveCatalog.java | 2 +- 
.../java/org/apache/iceberg/hive/TestHiveViewCommits.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index 43e2d33317ac..7d0eb641a385 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -1056,7 +1056,7 @@ public void testNotExposeTableProperties() { .doesNotContainKey(CURRENT_SNAPSHOT_ID) .doesNotContainKey(CURRENT_SNAPSHOT_TIMESTAMP); - ops.setSchema(metadata, parameters); + ops.setSchema(metadata.schema(), parameters); assertThat(parameters).doesNotContainKey(CURRENT_SCHEMA); ops.setPartitionSpec(metadata, parameters); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java index dbc199a10919..6912dea3235e 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java @@ -58,13 +58,13 @@ public class TestHiveViewCommits { private static final String VIEW_NAME = "test_iceberg_view"; private static final String DB_NAME = "hivedb"; - private static final Namespace ns = Namespace.of(DB_NAME); + private static final Namespace NS = Namespace.of(DB_NAME); private static final Schema SCHEMA = new Schema( 5, required(3, "id", Types.IntegerType.get(), "unique ID"), required(4, "data", Types.StringType.get())); - private static final TableIdentifier VIEW_IDENTIFIER = TableIdentifier.of(ns, VIEW_NAME); + private static final TableIdentifier VIEW_IDENTIFIER = TableIdentifier.of(NS, VIEW_NAME); @RegisterExtension protected static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION = @@ -94,7 +94,7 @@ public void createTestView() { catalog 
.buildView(VIEW_IDENTIFIER) .withSchema(SCHEMA) - .withDefaultNamespace(ns) + .withDefaultNamespace(NS) .withQuery("hive", "select * from ns.tbl") .create(); viewLocation = new Path(view.location()); @@ -300,7 +300,7 @@ public void testInvalidObjectException() { catalog .buildView(badTi) .withSchema(SCHEMA) - .withDefaultNamespace(ns) + .withDefaultNamespace(NS) .withQuery("hive", "select * from ns.tbl") .create()) .isInstanceOf(ValidationException.class) From b8c0446604949e4f0f9b288a19591f7151672104 Mon Sep 17 00:00:00 2001 From: nk1506 Date: Fri, 13 Sep 2024 15:41:44 +0530 Subject: [PATCH 16/17] Addressed comments: Fix incorrect tests from TestHiveViewCommits. --- .../iceberg/hive/HiveViewOperations.java | 2 +- .../iceberg/hive/TestHiveViewCommits.java | 90 ++++++++++++------- 2 files changed, 61 insertions(+), 31 deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java index 4f2a4a20d926..4fc71299d457 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java @@ -205,7 +205,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { if (e.getMessage() != null && e.getMessage() .contains( - "The view has been modified. The parameter value for key '" + "The table has been modified. 
The parameter value for key '" + BaseMetastoreTableOperations.METADATA_LOCATION_PROP + "' is")) { throw new CommitFailedException( diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java index 6912dea3235e..d5de06dd5492 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java @@ -39,6 +39,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -110,12 +111,12 @@ public void dropTestView() throws IOException { public void testSuppressUnlockExceptions() { HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); ViewMetadata metadataV1 = ops.current(); - assertThat(ops.current().properties()).hasSize(0); + assertThat(metadataV1.properties()).hasSize(0); view.updateProperties().set("k1", "v1").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); + assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); @@ -140,7 +141,7 @@ public void testSuppressUnlockExceptions() { ops.refresh(); // the commit must succeed - assertThat(ops.current().properties()).hasSize(0); + assertThat(ops.current().properties()).hasSize(0).isEqualTo(metadataV1.properties()); } /** @@ -152,22 +153,27 @@ public void testSuppressUnlockExceptions() { public void testThriftExceptionUnknownStateIfNotInHistoryFailureOnCommit() throws TException, InterruptedException { 
HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); - assertThat(ops.current().properties()).hasSize(0); + ViewMetadata metadataV1 = ops.current(); + assertThat(metadataV1.properties()).hasSize(0); view.updateProperties().set("k1", "v1").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); + assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); failCommitAndThrowException(spyOps); + assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) + .isInstanceOf(CommitStateUnknownException.class) + .hasMessageStartingWith("Datacenter on fire"); + ops.refresh(); assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2); assertThat(metadataFileExists(metadataV2)).as("Current metadata should still exist").isTrue(); - assertThat(metadataFileCount(ops.current())) + assertThat(metadataFileCount(metadataV2)) .as( "New metadata files should still exist, new location not in history but" + " the commit may still succeed") @@ -180,12 +186,12 @@ public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedE HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); ViewMetadata metadataV1 = ops.current(); - assertThat(ops.current().properties()).hasSize(0); + assertThat(metadataV1.properties()).hasSize(0); view.updateProperties().set("k1", "v1").commit(); ops.refresh(); - assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); ViewMetadata metadataV2 = ops.current(); + assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); @@ -194,7 +200,7 @@ public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedE spyOps.commit(metadataV2, metadataV1); assertThat(ops.current()).as("Current metadata should have not changed").isEqualTo(metadataV2); - 
assertThat(metadataFileExists(ops.current())) + assertThat(metadataFileExists(metadataV2)) .as("Current metadata file should still exist") .isTrue(); } @@ -207,12 +213,12 @@ public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedE public void testThriftExceptionUnknownFailedCommit() throws TException, InterruptedException { HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); ViewMetadata metadataV1 = ops.current(); - assertThat(ops.current().properties()).hasSize(0); + assertThat(metadataV1.properties()).hasSize(0); view.updateProperties().set("k1", "v1").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); + assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); @@ -226,7 +232,7 @@ public void testThriftExceptionUnknownFailedCommit() throws TException, Interrup ops.refresh(); assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2); - assertThat(metadataFileExists(ops.current())) + assertThat(metadataFileExists(metadataV2)) .as("Current metadata file should still exist") .isTrue(); } @@ -253,12 +259,12 @@ public void testThriftExceptionUnknownFailedCommit() throws TException, Interrup public void testThriftExceptionConcurrentCommit() throws TException, InterruptedException { HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); ViewMetadata metadataV1 = ops.current(); - assertThat(ops.current().properties()).hasSize(0); + assertThat(metadataV1.properties()).hasSize(0); view.updateProperties().set("k0", "v0").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1).containsEntry("k0", "v0"); + assertThat(metadataV2.properties()).hasSize(1).containsEntry("k0", "v0"); HiveViewOperations spyOps = spy(ops); @@ -273,21 +279,21 @@ public void 
testThriftExceptionConcurrentCommit() throws TException, Interrupted concurrentCommitAndThrowException(ops, spyOps, (BaseView) view, lock); - /* - This commit should fail and concurrent commit should succeed even though this commit throws an exception - after the persist operation succeeds - */ + // This commit should fail and concurrent commit should succeed even though this commit + // throws an exception after the persist operation succeeds + assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) .isInstanceOf(CommitStateUnknownException.class) .hasMessageContaining("Datacenter on fire"); ops.refresh(); + ViewMetadata metadataV3 = ops.current(); - assertThat(ops.current()).as("Current metadata should have changed").isNotEqualTo(metadataV2); - assertThat(metadataFileExists(ops.current())) + assertThat(metadataV3).as("Current metadata should have changed").isNotEqualTo(metadataV2); + assertThat(metadataFileExists(metadataV3)) .as("Current metadata file should still exist") .isTrue(); - assertThat(ops.current().properties()) + assertThat(metadataV3.properties()) .as("The new properties from the concurrent commit should have been successful") .hasSize(2); } @@ -304,19 +310,20 @@ public void testInvalidObjectException() { .withQuery("hive", "select * from ns.tbl") .create()) .isInstanceOf(ValidationException.class) - .hasMessage(String.format("Invalid Hive object for %s.%s", DB_NAME, "`test_iceberg_view`")); + .hasMessage("Invalid Hive object for " + DB_NAME + "." 
+ "`test_iceberg_view`"); } /** Uses NoLock and pretends we throw an error because of a concurrent commit */ @Test public void testNoLockThriftExceptionConcurrentCommit() throws TException, InterruptedException { HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); - assertThat(ops.current().properties()).hasSize(0); + ViewMetadata metadataV1 = ops.current(); + assertThat(metadataV1.properties()).hasSize(0); view.updateProperties().set("k1", "v1").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); + assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); @@ -330,25 +337,30 @@ public void testNoLockThriftExceptionConcurrentCommit() throws TException, Inter .when(spyOps) .persistTable(any(), anyBoolean(), any()); + // Should throw a CommitFailedException so the commit could be retried + assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("The view hivedb.test_iceberg_view has been modified concurrently"); + ops.refresh(); assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2); assertThat(metadataFileExists(metadataV2)).as("Current metadata should still exist").isTrue(); - assertThat(metadataFileCount(ops.current())) + assertThat(metadataFileCount(metadataV2)) .as("New metadata files should not exist") - .isEqualTo(2); + .isEqualTo(1); } @Test public void testLockExceptionUnknownSuccessCommit() throws TException, InterruptedException { HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); ViewMetadata metadataV1 = ops.current(); - assertThat(ops.current().properties()).hasSize(0); + assertThat(metadataV1.properties()).hasSize(0); view.updateProperties().set("k1", "v1").commit(); ops.refresh(); ViewMetadata metadataV2 = ops.current(); - 
assertThat(ops.current().properties()).hasSize(1).containsEntry("k1", "v1"); + assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1"); HiveViewOperations spyOps = spy(ops); @@ -370,14 +382,32 @@ public void testLockExceptionUnknownSuccessCommit() throws TException, Interrupt ops.refresh(); - assertThat(ops.current().location()) + assertThat(metadataV2.location()) .as("Current metadata should have changed to metadata V1") .isEqualTo(metadataV1.location()); - assertThat(metadataFileExists(ops.current())) + assertThat(metadataFileExists(metadataV2)) .as("Current metadata file should still exist") .isTrue(); } + @Test + public void testCommitExceptionWithoutMessage() throws TException, InterruptedException { + HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); + ViewMetadata metadataV1 = ops.current(); + assertThat(metadataV1.properties()).hasSize(0); + + view.updateProperties().set("k1", "v1").commit(); + + ops.refresh(); + + HiveViewOperations spyOps = spy(ops); + doThrow(new RuntimeException()).when(spyOps).persistTable(any(), anyBoolean(), any()); + + assertThatThrownBy(() -> spyOps.commit(ops.current(), metadataV1)) + .isInstanceOf(CommitStateUnknownException.class) + .hasMessageStartingWith("null\nCannot determine whether the commit was successful or not"); + } + private void commitAndThrowException( HiveViewOperations realOperations, HiveViewOperations spyOperations) throws TException, InterruptedException { From 2b88e13e970b503c3d9a6c3f283be9b1dc8ffcc7 Mon Sep 17 00:00:00 2001 From: nk1506 Date: Fri, 13 Sep 2024 18:47:44 +0530 Subject: [PATCH 17/17] Addressed comments: added metadata file counts for tests of TestHiveViewCommits --- .../iceberg/hive/TestHiveViewCommits.java | 46 ++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java 
index d5de06dd5492..47abb51602fa 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java @@ -203,6 +203,9 @@ public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedE assertThat(metadataFileExists(metadataV2)) .as("Current metadata file should still exist") .isTrue(); + assertThat(metadataFileCount(metadataV2)) + .as("Commit should have been successful and new metadata file should be made") + .isEqualTo(2); } /** @@ -235,6 +238,47 @@ public void testThriftExceptionUnknownFailedCommit() throws TException, Interrup assertThat(metadataFileExists(metadataV2)) .as("Current metadata file should still exist") .isTrue(); + assertThat(metadataFileCount(metadataV2)) + .as("Client could not determine outcome so new metadata file should also exist") + .isEqualTo(2); + } + + /** + * Pretends we throw an exception while persisting and don't know what happened, can't check to + * find out, but in reality the commit succeeded + */ + @Test + public void testThriftExceptionsUnknownSuccessCommit() throws TException, InterruptedException { + HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations(); + ViewMetadata metadataV1 = ops.current(); + assertThat(metadataV1.properties()).hasSize(0); + + view.updateProperties().set("k1", "v1").commit(); + ops.refresh(); + + ViewMetadata metadataV2 = ops.current(); + + assertThat(metadataV2.properties()).hasSize(1); + + HiveViewOperations spyOps = spy(ops); + + commitAndThrowException(ops, spyOps); + breakFallbackCatalogCommitCheck(spyOps); + + assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) + .isInstanceOf(CommitStateUnknownException.class) + .hasMessageStartingWith("Datacenter on fire"); + + ops.refresh(); + ViewMetadata metadataV3 = ops.current(); + + assertThat(metadataV3).as("Current metadata should have changed").isNotEqualTo(metadataV2); + 
assertThat(metadataFileExists(metadataV3)) + .as("Current metadata file should still exist") + .isTrue(); + assertThat(metadataFileCount(metadataV3)) + .as("Commit should have been successful with updated properties at metadataV2") + .isEqualTo(2); } /** @@ -281,7 +325,6 @@ public void testThriftExceptionConcurrentCommit() throws TException, Interrupted // This commit should fail and concurrent commit should succeed even though this commit // throws an exception after the persist operation succeeds - assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1)) .isInstanceOf(CommitStateUnknownException.class) .hasMessageContaining("Datacenter on fire"); @@ -388,6 +431,7 @@ public void testLockExceptionUnknownSuccessCommit() throws TException, Interrupt assertThat(metadataFileExists(metadataV2)) .as("Current metadata file should still exist") .isTrue(); + assertThat(metadataFileCount(metadataV2)).as("New metadata file should exist").isEqualTo(2); } @Test