diff --git a/core/trino-server/src/main/provisio/presto.xml b/core/trino-server/src/main/provisio/presto.xml
index 48190e7b21a7..02575a105ae4 100644
--- a/core/trino-server/src/main/provisio/presto.xml
+++ b/core/trino-server/src/main/provisio/presto.xml
@@ -74,6 +74,12 @@
+
+
+
+
+
+
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
index 5546808b2bdb..37f33bc6cfaa 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
@@ -28,7 +28,6 @@
import io.trino.plugin.hive.HiveSplit.BucketConversion;
import io.trino.plugin.hive.HiveSplit.BucketValidation;
import io.trino.plugin.hive.acid.AcidTransaction;
-import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion;
@@ -100,8 +99,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
-import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
-import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static io.trino.plugin.hive.HivePartitionManager.partitionMatches;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize;
@@ -114,11 +111,11 @@
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.FAIL;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
-import static io.trino.plugin.hive.util.HiveUtil.checkCondition;
import static io.trino.plugin.hive.util.HiveUtil.getFooterCount;
import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount;
import static io.trino.plugin.hive.util.HiveUtil.getInputFormat;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
+import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeys;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Integer.parseInt;
import static java.lang.Math.max;
@@ -942,28 +939,6 @@ private static List getTargetPathsFromSymlink(FileSystem fileSystem, Path
}
}
- private static List getPartitionKeys(Table table, Optional partition)
- {
- if (partition.isEmpty()) {
- return ImmutableList.of();
- }
- ImmutableList.Builder partitionKeys = ImmutableList.builder();
- List keys = table.getPartitionColumns();
- List values = partition.get().getValues();
- checkCondition(keys.size() == values.size(), HIVE_INVALID_METADATA, "Expected %s partition key values, but got %s", keys.size(), values.size());
- for (int i = 0; i < keys.size(); i++) {
- String name = keys.get(i).getName();
- HiveType hiveType = keys.get(i).getType();
- if (!hiveType.isSupportedType(table.getStorage().getStorageFormat())) {
- throw new TrinoException(NOT_SUPPORTED, format("Unsupported Hive type %s found in partition keys of table %s.%s", hiveType, table.getDatabaseName(), table.getTableName()));
- }
- String value = values.get(i);
- checkCondition(value != null, HIVE_INVALID_PARTITION_VALUE, "partition key value cannot be null for field: %s", name);
- partitionKeys.add(new HivePartitionKey(name, value));
- }
- return partitionKeys.build();
- }
-
private static Properties getPartitionSchema(Table table, Optional partition)
{
if (partition.isEmpty()) {
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
index d72571d27363..d57d4a3909cf 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -290,9 +290,9 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
Optional readerProjections = projectBaseColumns(columns);
List baseColumns = readerProjections.map(projection ->
- projection.get().stream()
- .map(HiveColumnHandle.class::cast)
- .collect(toUnmodifiableList()))
+ projection.get().stream()
+ .map(HiveColumnHandle.class::cast)
+ .collect(toUnmodifiableList()))
.orElse(columns);
for (HiveColumnHandle column : baseColumns) {
@@ -365,7 +365,7 @@ public static Optional getColumnType(HiveColumnH
return Optional.of(new GroupType(baseType.getRepetition(), baseType.getName(), ImmutableList.of(type)));
}
- private static Optional getColumnIndexStore(
+ public static Optional getColumnIndexStore(
ParquetDataSource dataSource,
BlockMetaData blockMetadata,
Map, RichColumnDescriptor> descriptorsByPath,
@@ -440,7 +440,7 @@ public static TupleDomain getParquetTupleDomain(
return TupleDomain.withColumnDomains(predicate.build());
}
- private static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
+ public static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
return getParquetTypeByName(column.getBaseColumnName(), messageType);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
index c6d398402f0b..100809bdef90 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
@@ -32,6 +32,7 @@
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.avro.TrinoAvroSerDe;
import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.SortingColumn;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.ErrorCodeSupplier;
@@ -1143,4 +1144,26 @@ public static boolean isIcebergTable(Table table)
{
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(ICEBERG_TABLE_TYPE_NAME));
}
+
+ public static List getPartitionKeys(Table table, Optional partition)
+ {
+ if (partition.isEmpty()) {
+ return ImmutableList.of();
+ }
+ ImmutableList.Builder partitionKeys = ImmutableList.builder();
+ List keys = table.getPartitionColumns();
+ List values = partition.get().getValues();
+ checkCondition(keys.size() == values.size(), HIVE_INVALID_METADATA, "Expected %s partition key values, but got %s", keys.size(), values.size());
+ for (int i = 0; i < keys.size(); i++) {
+ String name = keys.get(i).getName();
+ HiveType hiveType = keys.get(i).getType();
+ if (!hiveType.isSupportedType(table.getStorage().getStorageFormat())) {
+ throw new TrinoException(NOT_SUPPORTED, format("Unsupported Hive type %s found in partition keys of table %s.%s", hiveType, table.getDatabaseName(), table.getTableName()));
+ }
+ String value = values.get(i);
+ checkCondition(value != null, HIVE_INVALID_PARTITION_VALUE, "partition key value cannot be null for field: %s", name);
+ partitionKeys.add(new HivePartitionKey(name, value));
+ }
+ return partitionKeys.build();
+ }
}
diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml
new file mode 100644
index 000000000000..3e55fcdf2de8
--- /dev/null
+++ b/plugin/trino-hudi/pom.xml
@@ -0,0 +1,425 @@
+
+
+ 4.0.0
+
+
+ trino-root
+ io.trino
+ 368-SNAPSHOT
+ ../../pom.xml
+
+
+ trino-hudi
+ Trino - Hudi Connector
+ trino-plugin
+
+
+ ${project.parent.basedir}
+ 0.11.0-SNAPSHOT
+
+
+
+
+ io.trino
+ trino-hive
+
+
+ io.trino
+ trino-memory-context
+
+
+ io.trino
+ trino-parquet
+
+
+ io.trino
+ trino-plugin-toolkit
+
+
+ io.trino.hadoop
+ hadoop-apache
+
+
+ io.trino.hive
+ hive-apache
+
+
+ io.airlift
+ bootstrap
+
+
+ io.airlift
+ configuration
+
+
+ io.airlift
+ event
+
+
+ io.airlift
+ json
+
+
+ io.airlift
+ log
+
+
+ io.airlift
+ units
+
+
+ com.google.guava
+ guava
+
+
+ com.google.inject
+ guice
+
+
+ javax.inject
+ javax.inject
+
+
+ javax.validation
+ validation-api
+
+
+ joda-time
+ joda-time
+
+
+ org.apache.hbase
+ hbase-common
+ 2.4.9
+
+
+ commons-codec
+ commons-codec
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+ org.javassist
+ javassist
+
+
+
+
+ org.apache.hbase
+ hbase-server
+ 2.4.9
+
+
+ commons-codec
+ commons-codec
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ io.dropwizard.metrics
+ metrics-core
+
+
+ jakarta.ws.rs
+ jakarta.ws.rs-api
+
+
+ jakarta.annotation
+ jakarta.annotation-api
+
+
+ jakarta.validation
+ jakarta.validation-api
+
+
+ org.apache.hbase.thirdparty
+ hbase-shaded-jetty
+
+
+ org.apache.hadoop
+ hadoop-annotations
+
+
+ org.apache.hadoop
+ hadoop-auth
+
+
+ org.apache.hadoop
+ hadoop-distcp
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-common
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-core
+
+
+ org.apache.hbase
+ hbase-protocol
+
+
+ org.apache.hbase.thirdparty
+ hbase-shaded-protobuf
+
+
+ org.glassfish.hk2.external
+ jakarta.inject
+
+
+ org.javassist
+ javassist
+
+
+ org.jruby.jcodings
+ jcodings
+
+
+
+
+ org.apache.hudi
+ hudi-common
+ ${dep.hudi.version}
+
+
+ org.apache.hbase
+ hbase-server
+
+
+ org.apache.orc
+ orc-core
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+ org.apache.httpcomponents
+ fluent-hc
+
+
+ org.rocksdb
+ rocksdbjni
+
+
+ com.esotericsoftware
+ kryo-shaded
+
+
+ org.apache.hadoop
+ hadoop-client
+
+
+ org.apache.httpcomponents
+ httpcore
+
+
+ org.apache.hive
+ hive-exec
+
+
+ org.apache.hive
+ hive-jdbc
+
+
+
+
+ org.apache.hudi
+ hudi-hadoop-mr
+ ${dep.hudi.version}
+
+
+ org.apache.hbase
+ hbase-server
+
+
+ org.apache.orc
+ orc-core
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+ org.apache.httpcomponents
+ fluent-hc
+
+
+ org.rocksdb
+ rocksdbjni
+
+
+ com.esotericsoftware
+ kryo-shaded
+
+
+ org.apache.hadoop
+ hadoop-client
+
+
+ org.apache.httpcomponents
+ httpcore
+
+
+ org.apache.hive
+ hive-exec
+
+
+ org.apache.hive
+ hive-jdbc
+
+
+
+
+ org.slf4j
+ log4j-over-slf4j
+ 1.7.32
+
+
+ org.weakref
+ jmxutils
+
+
+ io.airlift
+ log-manager
+ runtime
+
+
+ org.apache.hbase
+ hbase-hadoop-compat
+ 2.4.9
+ runtime
+
+
+
+ io.trino
+ trino-spi
+ provided
+
+
+ io.airlift
+ slice
+ provided
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+ org.openjdk.jol
+ jol-core
+ provided
+
+
+
+
+ io.trino
+ trino-hive
+ test-jar
+ test
+
+
+ io.trino
+ trino-hive-hadoop2
+ test
+
+
+ io.trino
+ trino-main
+ test
+
+
+ io.trino
+ trino-main
+ test-jar
+ test
+
+
+ io.trino
+ trino-parser
+ test
+
+
+ io.trino
+ trino-spi
+ test-jar
+ test
+
+
+ io.trino
+ trino-testing
+ test
+
+
+ io.trino
+ trino-testing-services
+ test
+
+
+ io.trino
+ trino-tpch
+ test
+
+
+ io.trino.tpch
+ tpch
+ test
+
+
+ io.airlift
+ testing
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.jetbrains
+ annotations
+ test
+
+
+ org.testng
+ testng
+ test
+
+
+
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
new file mode 100644
index 000000000000..7b10e20d7fa2
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.units.DataSize;
+import org.apache.hudi.common.model.HoodieFileFormat;
+
+import javax.validation.constraints.NotNull;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+
+public class HudiConfig
+{
+ private HoodieFileFormat fileFormat = PARQUET;
+ private boolean metadataEnabled;
+ private boolean shouldSkipMetaStoreForPartition = true;
+ private DataSize maxSplitSize = DataSize.ofBytes(128 * 1024 * 1024);
+
+ @NotNull
+ public HoodieFileFormat getFileFormat()
+ {
+ return HoodieFileFormat.valueOf(fileFormat.name());
+ }
+
+ @Config("hudi.file-format")
+ public HudiConfig setFileFormat(HoodieFileFormat fileFormat)
+ {
+ this.fileFormat = fileFormat;
+ return this;
+ }
+
+ @Config("hudi.metadata-enabled")
+ @ConfigDescription("Fetch the list of file names and sizes from metadata rather than storage")
+ public HudiConfig setMetadataEnabled(boolean metadataEnabled)
+ {
+ this.metadataEnabled = metadataEnabled;
+ return this;
+ }
+
+ @NotNull
+ public boolean isMetadataEnabled()
+ {
+ return this.metadataEnabled;
+ }
+
+ @Config("hudi.max-split-size")
+ public HudiConfig setMaxSplitSize(DataSize size)
+ {
+ this.maxSplitSize = size;
+ return this;
+ }
+
+ @NotNull
+ public DataSize getMaxSplitSize()
+ {
+ return this.maxSplitSize;
+ }
+
+ @Config("hudi.skip-metastore-for-partition")
+ @ConfigDescription("Whether to skip metastore for partition")
+ public HudiConfig setSkipMetaStoreForPartition(boolean shouldSkipMetaStoreForPartition)
+ {
+ this.shouldSkipMetaStoreForPartition = shouldSkipMetaStoreForPartition;
+ return this;
+ }
+
+ @NotNull
+ public boolean getSkipMetaStoreForPartition()
+ {
+ return this.shouldSkipMetaStoreForPartition;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java
new file mode 100644
index 000000000000..401bdf44619c
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
+import io.trino.plugin.base.session.SessionPropertiesProvider;
+import io.trino.plugin.hive.HiveTransactionHandle;
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorAccessControl;
+import io.trino.spi.connector.ConnectorHandleResolver;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorNodePartitioningProvider;
+import io.trino.spi.connector.ConnectorPageSinkProvider;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SystemTable;
+import io.trino.spi.session.PropertyMetadata;
+import io.trino.spi.transaction.IsolationLevel;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.spi.transaction.IsolationLevel.SERIALIZABLE;
+import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+public class HudiConnector
+ implements Connector
+{
+ private static final Logger log = Logger.get(HudiConnector.class);
+
+ private final LifeCycleManager lifeCycleManager;
+ private final HudiTransactionManager transactionManager;
+ private final HudiMetadataFactory metadataFactory;
+ private final ConnectorSplitManager splitManager;
+ private final ConnectorPageSourceProvider pageSourceProvider;
+ private final ConnectorPageSinkProvider pageSinkProvider;
+ private final ConnectorNodePartitioningProvider nodePartitioningProvider;
+ private final Set systemTables;
+ private final List> sessionProperties;
+ private final List> tableProperties;
+ private final Optional accessControl;
+
+ public HudiConnector(
+ LifeCycleManager lifeCycleManager,
+ HudiTransactionManager transactionManager,
+ HudiMetadataFactory metadataFactory,
+ ConnectorSplitManager splitManager,
+ ConnectorPageSourceProvider pageSourceProvider,
+ ConnectorPageSinkProvider pageSinkProvider,
+ ConnectorNodePartitioningProvider nodePartitioningProvider,
+ Set systemTables,
+ Set sessionPropertiesProviders,
+ List> tableProperties,
+ Optional accessControl)
+ {
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
+ this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+ this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
+ this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
+ this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
+ this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
+ .flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
+ .collect(toImmutableList());
+ this.tableProperties = ImmutableList.copyOf(requireNonNull(tableProperties, "tableProperties is null"));
+ this.accessControl = requireNonNull(accessControl, "accessControl is null");
+ }
+
+ @Override
+ public Optional getHandleResolver()
+ {
+ return Optional.of(new HudiHandleResolver());
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+ {
+ ConnectorMetadata metadata = transactionManager.get(transactionHandle);
+ return new ClassLoaderSafeConnectorMetadata(metadata, getClass().getClassLoader());
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider()
+ {
+ return pageSourceProvider;
+ }
+
+ @Override
+ public ConnectorPageSinkProvider getPageSinkProvider()
+ {
+ return pageSinkProvider;
+ }
+
+ @Override
+ public ConnectorNodePartitioningProvider getNodePartitioningProvider()
+ {
+ return nodePartitioningProvider;
+ }
+
+ @Override
+ public Set getSystemTables()
+ {
+ return systemTables;
+ }
+
+ @Override
+ public List> getSessionProperties()
+ {
+ return sessionProperties;
+ }
+
+ @Override
+ public List> getTableProperties()
+ {
+ return tableProperties;
+ }
+
+ @Override
+ public ConnectorAccessControl getAccessControl()
+ {
+ return accessControl.orElseThrow(UnsupportedOperationException::new);
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
+ {
+ checkConnectorSupports(SERIALIZABLE, isolationLevel);
+ ConnectorTransactionHandle transaction = new HiveTransactionHandle();
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+ transactionManager.put(transaction, metadataFactory.create());
+ }
+ return transaction;
+ }
+
+ @Override
+ public void commit(ConnectorTransactionHandle transaction)
+ {
+ transactionManager.remove(transaction);
+ }
+
+ @Override
+ public void rollback(ConnectorTransactionHandle transaction)
+ {
+ HudiMetadata metadata = transactionManager.remove(transaction);
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+ metadata.rollback();
+ }
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ try {
+ lifeCycleManager.stop();
+ }
+ catch (Exception e) {
+ log.error(e, "Error shutting down connector");
+ }
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java
new file mode 100644
index 000000000000..34d077f756a5
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static java.util.Objects.requireNonNull;
+
+public class HudiConnectorFactory
+ implements ConnectorFactory
+{
+ private final String name;
+ private final Class extends Module> module;
+
+ public HudiConnectorFactory(String name)
+ {
+ this(name, EmptyModule.class);
+ }
+
+ public HudiConnectorFactory(String name, Class extends Module> module)
+ {
+ checkArgument(!isNullOrEmpty(name), "name is null or empty");
+ this.name = name;
+ this.module = requireNonNull(module, "module is null");
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public Connector create(String catalogName, Map config, ConnectorContext context)
+ {
+ ClassLoader classLoader = context.duplicatePluginClassLoader();
+ try {
+ Object moduleInstance = classLoader.loadClass(module.getName()).getConstructor().newInstance();
+ Class> moduleClass = classLoader.loadClass(Module.class.getName());
+ return (Connector) classLoader.loadClass(InternalHudiConnectorFactory.class.getName())
+ .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, moduleClass)
+ .invoke(null, catalogName, config, context, moduleInstance);
+ }
+ catch (InvocationTargetException e) {
+ Throwable targetException = e.getTargetException();
+ throwIfUnchecked(targetException);
+ throw new RuntimeException(targetException);
+ }
+ catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class EmptyModule
+ implements Module
+ {
+ @Override
+ public void configure(Binder binder) {}
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
new file mode 100644
index 000000000000..d38724451c22
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import io.trino.spi.ErrorCode;
+import io.trino.spi.ErrorCodeSupplier;
+import io.trino.spi.ErrorType;
+
+import static io.trino.spi.ErrorType.EXTERNAL;
+import static io.trino.spi.ErrorType.INTERNAL_ERROR;
+import static io.trino.spi.ErrorType.USER_ERROR;
+
+public enum HudiErrorCode
+ implements ErrorCodeSupplier
+{
+ HUDI_UNKNOWN_TABLE_TYPE(0, EXTERNAL),
+ HUDI_INVALID_METADATA(1, EXTERNAL),
+ HUDI_TOO_MANY_OPEN_PARTITIONS(2, USER_ERROR),
+ HUDI_INVALID_PARTITION_VALUE(3, EXTERNAL),
+ HUDI_BAD_DATA(4, EXTERNAL),
+ HUDI_MISSING_DATA(5, EXTERNAL),
+ HUDI_CANNOT_OPEN_SPLIT(6, EXTERNAL),
+ HUDI_WRITER_OPEN_ERROR(7, EXTERNAL),
+ HUDI_FILESYSTEM_ERROR(8, EXTERNAL),
+ HUDI_CURSOR_ERROR(9, EXTERNAL),
+ HUDI_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
+ HUDI_INVALID_SNAPSHOT_ID(11, USER_ERROR);
+
+ private final ErrorCode errorCode;
+
+ HudiErrorCode(int code, ErrorType type)
+ {
+ errorCode = new ErrorCode(code + 0x0100_0000, name(), type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHFileBootstrapIndex.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHFileBootstrapIndex.java
new file mode 100644
index 000000000000..0815338f9009
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHFileBootstrapIndex.java
@@ -0,0 +1,643 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hudi.avro.model.HoodieBootstrapFilePartitionInfo;
+import org.apache.hudi.avro.model.HoodieBootstrapIndexInfo;
+import org.apache.hudi.avro.model.HoodieBootstrapPartitionMetadata;
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BootstrapFileMapping;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Maintains mapping from skeleton file id to external bootstrap file.
+ * It maintains 2 physical indices.
+ * (a) At partition granularity to lookup all indices for each partition.
+ * (b) At file-group granularity to lookup bootstrap mapping for an individual file-group.
+ *
+ * This implementation uses HFile as physical storage of index. FOr the initial run, bootstrap
+ * mapping for the entire dataset resides in a single file but care has been taken in naming
+ * the index files in the same way as Hudi data files so that we can reuse file-system abstraction
+ * on these index files to manage multiple file-groups.
+ */
+
+public class HudiHFileBootstrapIndex
+ extends BootstrapIndex
+{
+ protected static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LogManager.getLogger(HudiHFileBootstrapIndex.class);
+
+ public static final String BOOTSTRAP_INDEX_FILE_ID = "00000000-0000-0000-0000-000000000000-0";
+
+ private static final String PARTITION_KEY_PREFIX = "part";
+ private static final String FILE_ID_KEY_PREFIX = "fileid";
+ private static final String KEY_VALUE_SEPARATOR = "=";
+ private static final String KEY_PARTS_SEPARATOR = ";";
+ // This is part of the suffix that HFIle appends to every key
+ private static final String HFILE_CELL_KEY_SUFFIX_PART = "//LATEST_TIMESTAMP/Put/vlen";
+
+ // Additional Metadata written to HFiles.
+ public static final byte[] INDEX_INFO_KEY = Bytes.toBytes("INDEX_INFO");
+
+ private final boolean isPresent;
+
+ public HudiHFileBootstrapIndex(HoodieTableMetaClient metaClient)
+ {
+ super(metaClient);
+ Path indexByPartitionPath = partitionIndexPath(metaClient);
+ Path indexByFilePath = fileIdIndexPath(metaClient);
+ try {
+ FileSystem fs = metaClient.getFs();
+ isPresent = fs.exists(indexByPartitionPath) && fs.exists(indexByFilePath);
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ /**
+ * Returns partition-key to be used in HFile.
+ * @param partition Partition-Path
+ * @return
+ */
+ private static String getPartitionKey(String partition)
+ {
+ return getKeyValueString(PARTITION_KEY_PREFIX, partition);
+ }
+
+ /**
+ * Returns file group key to be used in HFile.
+ * @param fileGroupId File Group Id.
+ * @return
+ */
+ private static String getFileGroupKey(HoodieFileGroupId fileGroupId)
+ {
+ return getPartitionKey(fileGroupId.getPartitionPath()) + KEY_PARTS_SEPARATOR
+ + getKeyValueString(FILE_ID_KEY_PREFIX, fileGroupId.getFileId());
+ }
+
+ private static String getPartitionFromKey(String key)
+ {
+ String[] parts = key.split("=", 2);
+ ValidationUtils.checkArgument(parts[0].equals(PARTITION_KEY_PREFIX));
+ return parts[1];
+ }
+
+ private static String getFileIdFromKey(String key)
+ {
+ String[] parts = key.split("=", 2);
+ ValidationUtils.checkArgument(parts[0].equals(FILE_ID_KEY_PREFIX));
+ return parts[1];
+ }
+
+ private static HoodieFileGroupId getFileGroupFromKey(String key)
+ {
+ String[] parts = key.split(KEY_PARTS_SEPARATOR, 2);
+ return new HoodieFileGroupId(getPartitionFromKey(parts[0]), getFileIdFromKey(parts[1]));
+ }
+
+ private static String getKeyValueString(String key, String value)
+ {
+ return key + KEY_VALUE_SEPARATOR + value;
+ }
+
+ private static Path partitionIndexPath(HoodieTableMetaClient metaClient)
+ {
+ return new Path(metaClient.getBootstrapIndexByPartitionFolderPath(),
+ FSUtils.makeBootstrapIndexFileName(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, BOOTSTRAP_INDEX_FILE_ID,
+ HoodieFileFormat.HFILE.getFileExtension()));
+ }
+
+ private static Path fileIdIndexPath(HoodieTableMetaClient metaClient)
+ {
+ return new Path(metaClient.getBootstrapIndexByFileIdFolderNameFolderPath(),
+ FSUtils.makeBootstrapIndexFileName(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, BOOTSTRAP_INDEX_FILE_ID,
+ HoodieFileFormat.HFILE.getFileExtension()));
+ }
+
+ /**
+ * HFile stores cell key in the format example : "2020/03/18//LATEST_TIMESTAMP/Put/vlen=3692/seqid=0".
+ * This API returns only the user key part from it.
+ * @param cellKey HFIle Cell Key
+ * @return
+ */
+ private static String getUserKeyFromCellKey(String cellKey)
+ {
+ int hfileSuffixBeginIndex = cellKey.lastIndexOf(HFILE_CELL_KEY_SUFFIX_PART);
+ return cellKey.substring(0, hfileSuffixBeginIndex);
+ }
+
+ /**
+ * Helper method to create HFile Reader.
+ *
+ * @param hFilePath File Path
+ * @param conf Configuration
+ * @param fileSystem File System
+ */
+ private static HFile.Reader createReader(String hFilePath, Configuration conf, FileSystem fileSystem)
+ {
+ try {
+ LOG.info("Opening HFile for reading :" + hFilePath);
+ HFile.Reader reader = HFile.createReader(fileSystem, new HFilePathForReader(hFilePath),
+ new CacheConfig(conf), true, conf);
+ return reader;
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ @Override
+ public BootstrapIndex.IndexReader createReader()
+ {
+ return new HFileBootstrapIndexReader(metaClient);
+ }
+
+ @Override
+ public BootstrapIndex.IndexWriter createWriter(String bootstrapBasePath)
+ {
+ return new HFileBootstrapIndexWriter(bootstrapBasePath, metaClient);
+ }
+
+ @Override
+ public void dropIndex()
+ {
+ try {
+ Path[] indexPaths = new Path[]{partitionIndexPath(metaClient), fileIdIndexPath(metaClient)};
+ for (Path indexPath : indexPaths) {
+ if (metaClient.getFs().exists(indexPath)) {
+ LOG.info("Dropping bootstrap index. Deleting file : " + indexPath);
+ metaClient.getFs().delete(indexPath);
+ }
+ }
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ @Override
+ public boolean isPresent()
+ {
+ return isPresent;
+ }
+
+ /**
+ * HFile Based Index Reader.
+ */
+ public static class HFileBootstrapIndexReader
+ extends BootstrapIndex.IndexReader
+ {
+ // Base Path of external files.
+ private final String bootstrapBasePath;
+ // Well Known Paths for indices
+ private final String indexByPartitionPath;
+ private final String indexByFileIdPath;
+
+ // Index Readers
+ private transient HFile.Reader indexByPartitionReader;
+ private transient HFile.Reader indexByFileIdReader;
+
+ // Bootstrap Index Info
+ private transient HoodieBootstrapIndexInfo bootstrapIndexInfo;
+
+ public HFileBootstrapIndexReader(HoodieTableMetaClient metaClient)
+ {
+ super(metaClient);
+ Path indexByPartitionPath = partitionIndexPath(metaClient);
+ Path indexByFilePath = fileIdIndexPath(metaClient);
+ this.indexByPartitionPath = indexByPartitionPath.toString();
+ this.indexByFileIdPath = indexByFilePath.toString();
+ initIndexInfo();
+ this.bootstrapBasePath = bootstrapIndexInfo.getBootstrapBasePath();
+ LOG.info("Loaded HFileBasedBootstrapIndex with source base path :" + bootstrapBasePath);
+ }
+
+ private void initIndexInfo()
+ {
+ synchronized (this) {
+ if (null == bootstrapIndexInfo) {
+ try {
+ bootstrapIndexInfo = fetchBootstrapIndexInfo();
+ }
+ catch (IOException ioe) {
+ throw new HoodieException(ioe.getMessage(), ioe);
+ }
+ }
+ }
+ }
+
+ private HoodieBootstrapIndexInfo fetchBootstrapIndexInfo() throws IOException
+ {
+ return TimelineMetadataUtils.deserializeAvroMetadata(
+ partitionIndexReader().getHFileInfo().get(INDEX_INFO_KEY),
+ HoodieBootstrapIndexInfo.class);
+ }
+
+ private HFile.Reader partitionIndexReader()
+ {
+ if (null == indexByPartitionReader) {
+ synchronized (this) {
+ if (null == indexByPartitionReader) {
+ LOG.info("Opening partition index :" + indexByPartitionPath);
+ this.indexByPartitionReader =
+ createReader(indexByPartitionPath, metaClient.getHadoopConf(), metaClient.getFs());
+ }
+ }
+ }
+ return indexByPartitionReader;
+ }
+
+ private HFile.Reader fileIdIndexReader()
+ {
+ if (null == indexByFileIdReader) {
+ synchronized (this) {
+ if (null == indexByFileIdReader) {
+ LOG.info("Opening fileId index :" + indexByFileIdPath);
+ this.indexByFileIdReader =
+ createReader(indexByFileIdPath, metaClient.getHadoopConf(), metaClient.getFs());
+ }
+ }
+ }
+ return indexByFileIdReader;
+ }
+
+ @Override
+ public List getIndexedPartitionPaths()
+ {
+ HFileScanner scanner = partitionIndexReader().getScanner(true, false);
+ return getAllKeys(scanner, HudiHFileBootstrapIndex::getPartitionFromKey);
+ }
+
+ @Override
+ public List getIndexedFileGroupIds()
+ {
+ HFileScanner scanner = fileIdIndexReader().getScanner(true, false);
+ return getAllKeys(scanner, HudiHFileBootstrapIndex::getFileGroupFromKey);
+ }
+
+ private List getAllKeys(HFileScanner scanner, Function converter)
+ {
+ List keys = new ArrayList<>();
+ try {
+ boolean available = scanner.seekTo();
+ while (available) {
+ keys.add(converter.apply(getUserKeyFromCellKey(CellUtil.getCellKeyAsString(scanner.getCell()))));
+ available = scanner.next();
+ }
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+
+ return keys;
+ }
+
+ @Override
+ public List getSourceFileMappingForPartition(String partition)
+ {
+ try {
+ HFileScanner scanner = partitionIndexReader().getScanner(true, false);
+ KeyValue keyValue = new KeyValue(Bytes.toBytes(getPartitionKey(partition)), new byte[0], new byte[0],
+ HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, new byte[0]);
+ if (scanner.seekTo(keyValue) == 0) {
+ ByteBuffer readValue = scanner.getValue();
+ byte[] valBytes = Bytes.toBytes(readValue);
+ HoodieBootstrapPartitionMetadata metadata =
+ TimelineMetadataUtils.deserializeAvroMetadata(valBytes, HoodieBootstrapPartitionMetadata.class);
+ return metadata.getFileIdToBootstrapFile().entrySet().stream()
+ .map(e -> new BootstrapFileMapping(bootstrapBasePath, metadata.getBootstrapPartitionPath(),
+ partition, e.getValue(), e.getKey())).collect(Collectors.toList());
+ }
+ else {
+ LOG.warn("No value found for partition key (" + partition + ")");
+ return new ArrayList<>();
+ }
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ @Override
+ public String getBootstrapBasePath()
+ {
+ return bootstrapBasePath;
+ }
+
+ @Override
+ public Map getSourceFileMappingForFileIds(
+ List ids)
+ {
+ Map result = new HashMap<>();
+ // Arrange input Keys in sorted order for 1 pass scan
+ List fileGroupIds = new ArrayList<>(ids);
+ Collections.sort(fileGroupIds);
+ try {
+ HFileScanner scanner = fileIdIndexReader().getScanner(true, false);
+ for (HoodieFileGroupId fileGroupId : fileGroupIds) {
+ KeyValue keyValue = new KeyValue(Bytes.toBytes(getFileGroupKey(fileGroupId)), new byte[0], new byte[0],
+ HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, new byte[0]);
+ if (scanner.seekTo(keyValue) == 0) {
+ ByteBuffer readValue = scanner.getValue();
+ byte[] valBytes = Bytes.toBytes(readValue);
+ HoodieBootstrapFilePartitionInfo fileInfo = TimelineMetadataUtils.deserializeAvroMetadata(valBytes,
+ HoodieBootstrapFilePartitionInfo.class);
+ BootstrapFileMapping mapping = new BootstrapFileMapping(bootstrapBasePath,
+ fileInfo.getBootstrapPartitionPath(), fileInfo.getPartitionPath(), fileInfo.getBootstrapFileStatus(),
+ fileGroupId.getFileId());
+ result.put(fileGroupId, mapping);
+ }
+ }
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ return result;
+ }
+
+ @Override
+ public void close()
+ {
+ try {
+ if (indexByPartitionReader != null) {
+ indexByPartitionReader.close(true);
+ indexByPartitionReader = null;
+ }
+ if (indexByFileIdReader != null) {
+ indexByFileIdReader.close(true);
+ indexByFileIdReader = null;
+ }
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+ }
+
+ /**
+ * Bootstrap Index Writer to build bootstrap index.
+ */
+ public static class HFileBootstrapIndexWriter
+ extends BootstrapIndex.IndexWriter
+ {
+ private final String bootstrapBasePath;
+ private final Path indexByPartitionPath;
+ private final Path indexByFileIdPath;
+ private HFile.Writer indexByPartitionWriter;
+ private HFile.Writer indexByFileIdWriter;
+
+ private boolean closed;
+ private int numPartitionKeysAdded;
+ private int numFileIdKeysAdded;
+
+ private final Map> sourceFileMappings = new HashMap<>();
+
+ private HFileBootstrapIndexWriter(String bootstrapBasePath, HoodieTableMetaClient metaClient)
+ {
+ super(metaClient);
+ try {
+ metaClient.initializeBootstrapDirsIfNotExists();
+ this.bootstrapBasePath = bootstrapBasePath;
+ this.indexByPartitionPath = partitionIndexPath(metaClient);
+ this.indexByFileIdPath = fileIdIndexPath(metaClient);
+
+ if (metaClient.getFs().exists(indexByPartitionPath) || metaClient.getFs().exists(indexByFileIdPath)) {
+ String errMsg = "Previous version of bootstrap index exists. Partition Index Path :" + indexByPartitionPath
+ + ", FileId index Path :" + indexByFileIdPath;
+ LOG.info(errMsg);
+ throw new HoodieException(errMsg);
+ }
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ /**
+ * Append bootstrap index entries for next partitions in sorted order.
+ * @param partitionPath Hudi Partition Path
+ * @param bootstrapPartitionPath Source Partition Path
+ * @param bootstrapFileMappings Bootstrap Source File to Hudi File Id mapping
+ */
+ private void writeNextPartition(String partitionPath, String bootstrapPartitionPath,
+ List bootstrapFileMappings)
+ {
+ try {
+ LOG.info("Adding bootstrap partition Index entry for partition :" + partitionPath
+ + ", bootstrap Partition :" + bootstrapPartitionPath + ", Num Entries :" + bootstrapFileMappings.size());
+ LOG.info("ADDING entries :" + bootstrapFileMappings);
+ HoodieBootstrapPartitionMetadata bootstrapPartitionMetadata = new HoodieBootstrapPartitionMetadata();
+ bootstrapPartitionMetadata.setBootstrapPartitionPath(bootstrapPartitionPath);
+ bootstrapPartitionMetadata.setPartitionPath(partitionPath);
+ bootstrapPartitionMetadata.setFileIdToBootstrapFile(
+ bootstrapFileMappings.stream().map(m -> Pair.of(m.getFileId(),
+ m.getBootstrapFileStatus())).collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
+ Option bytes = TimelineMetadataUtils.serializeAvroMetadata(bootstrapPartitionMetadata, HoodieBootstrapPartitionMetadata.class);
+ if (bytes.isPresent()) {
+ indexByPartitionWriter
+ .append(new KeyValue(Bytes.toBytes(getPartitionKey(partitionPath)), new byte[0], new byte[0],
+ HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, bytes.get()));
+ numPartitionKeysAdded++;
+ }
+ }
+ catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Write next source file to hudi file-id. Entries are expected to be appended in hudi file-group id
+ * order.
+ * @param mapping bootstrap source file mapping.
+ */
+ private void writeNextSourceFileMapping(BootstrapFileMapping mapping)
+ {
+ try {
+ HoodieBootstrapFilePartitionInfo srcFilePartitionInfo = new HoodieBootstrapFilePartitionInfo();
+ srcFilePartitionInfo.setPartitionPath(mapping.getPartitionPath());
+ srcFilePartitionInfo.setBootstrapPartitionPath(mapping.getBootstrapPartitionPath());
+ srcFilePartitionInfo.setBootstrapFileStatus(mapping.getBootstrapFileStatus());
+ KeyValue kv = new KeyValue(getFileGroupKey(mapping.getFileGroupId()).getBytes(StandardCharsets.UTF_8),
+ new byte[0], new byte[0],
+ HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put,
+ TimelineMetadataUtils.serializeAvroMetadata(srcFilePartitionInfo,
+ HoodieBootstrapFilePartitionInfo.class).get());
+ indexByFileIdWriter.append(kv);
+ numFileIdKeysAdded++;
+ }
+ catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Commit bootstrap index entries. Appends Metadata and closes write handles.
+ */
+ private void commit()
+ {
+ try {
+ if (!closed) {
+ HoodieBootstrapIndexInfo partitionIndexInfo = HoodieBootstrapIndexInfo.newBuilder()
+ .setCreatedTimestamp(new Date().getTime())
+ .setNumKeys(numPartitionKeysAdded)
+ .setBootstrapBasePath(bootstrapBasePath)
+ .build();
+ LOG.info("Adding Partition FileInfo :" + partitionIndexInfo);
+
+ HoodieBootstrapIndexInfo fileIdIndexInfo = HoodieBootstrapIndexInfo.newBuilder()
+ .setCreatedTimestamp(new Date().getTime())
+ .setNumKeys(numFileIdKeysAdded)
+ .setBootstrapBasePath(bootstrapBasePath)
+ .build();
+ LOG.info("Appending FileId FileInfo :" + fileIdIndexInfo);
+
+ indexByPartitionWriter.appendFileInfo(INDEX_INFO_KEY,
+ TimelineMetadataUtils.serializeAvroMetadata(partitionIndexInfo, HoodieBootstrapIndexInfo.class).get());
+ indexByFileIdWriter.appendFileInfo(INDEX_INFO_KEY,
+ TimelineMetadataUtils.serializeAvroMetadata(fileIdIndexInfo, HoodieBootstrapIndexInfo.class).get());
+
+ close();
+ }
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ /**
+ * Close Writer Handles.
+ */
+ public void close()
+ {
+ try {
+ if (!closed) {
+ indexByPartitionWriter.close();
+ indexByFileIdWriter.close();
+ closed = true;
+ }
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ @Override
+ public void begin()
+ {
+ try {
+ HFileContext meta = new HFileContextBuilder().withCellComparator(new HoodieKVComparator()).build();
+ this.indexByPartitionWriter = HFile.getWriterFactory(metaClient.getHadoopConf(),
+ new CacheConfig(metaClient.getHadoopConf())).withPath(metaClient.getFs(), indexByPartitionPath)
+ .withFileContext(meta).create();
+ this.indexByFileIdWriter = HFile.getWriterFactory(metaClient.getHadoopConf(),
+ new CacheConfig(metaClient.getHadoopConf())).withPath(metaClient.getFs(), indexByFileIdPath)
+ .withFileContext(meta).create();
+ }
+ catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ @Override
+ public void appendNextPartition(String partitionPath, List bootstrapFileMappings)
+ {
+ sourceFileMappings.put(partitionPath, bootstrapFileMappings);
+ }
+
+ @Override
+ public void finish()
+ {
+ // Sort and write
+ List partitions = sourceFileMappings.keySet().stream().sorted().collect(Collectors.toList());
+ partitions.forEach(p -> writeNextPartition(p, sourceFileMappings.get(p).get(0).getBootstrapPartitionPath(),
+ sourceFileMappings.get(p)));
+ sourceFileMappings.values().stream().flatMap(Collection::stream).sorted()
+ .forEach(this::writeNextSourceFileMapping);
+ commit();
+ }
+ }
+
+ /**
+ * IMPORTANT :
+ * HFile Readers use HFile name (instead of path) as cache key. This could be fine as long
+ * as file names are UUIDs. For bootstrap, we are using well-known index names.
+ * Hence, this hacky workaround to return full path string from Path subclass and pass it to reader.
+ * The other option is to disable block cache for Bootstrap which again involves some custom code
+ * as there is no API to disable cache.
+ */
+ private static class HFilePathForReader
+ extends Path
+ {
+ public HFilePathForReader(String pathString) throws IllegalArgumentException
+ {
+ super(pathString);
+ }
+
+ @Override
+ public String getName()
+ {
+ return toString();
+ }
+ }
+
+ /**
+ * This class is explicitly used as Key Comparator to workaround hard coded
+ * legacy format class names inside HBase. Otherwise we will face issues with shading.
+ */
+ public static class HoodieKVComparator
+ extends CellComparatorImpl
+ {
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHandleResolver.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHandleResolver.java
new file mode 100644
index 000000000000..fd72934f825a
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHandleResolver.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.HiveInsertTableHandle;
+import io.trino.plugin.hive.HiveOutputTableHandle;
+import io.trino.plugin.hive.HiveTransactionHandle;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorHandleResolver;
+import io.trino.spi.connector.ConnectorInsertTableHandle;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+
+public class HudiHandleResolver
+ implements ConnectorHandleResolver
+{
+ @Override
+ public Class extends ConnectorTableHandle> getTableHandleClass()
+ {
+ return HudiTableHandle.class;
+ }
+
+ @Override
+ public Class extends ColumnHandle> getColumnHandleClass()
+ {
+ return HiveColumnHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorSplit> getSplitClass()
+ {
+ return HudiSplit.class;
+ }
+
+ @Override
+ public Class extends ConnectorOutputTableHandle> getOutputTableHandleClass()
+ {
+ return HiveOutputTableHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorInsertTableHandle> getInsertTableHandleClass()
+ {
+ return HiveInsertTableHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorTransactionHandle> getTransactionHandleClass()
+ {
+ return HiveTransactionHandle.class;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java
new file mode 100644
index 000000000000..36843d75540f
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+public class HudiInputInfo
+{
+ private final List partitionIds;
+ // Code that serialize HudiInputInfo into log would often need the ability to limit the length of log entries.
+ // This boolean field allows such code to mark the log entry as length limited.
+ private final boolean truncated;
+
+ @JsonCreator
+ public HudiInputInfo(
+ @JsonProperty("partitionIds") List partitionIds,
+ @JsonProperty("truncated") boolean truncated)
+ {
+ this.partitionIds = partitionIds;
+ this.truncated = truncated;
+ }
+
+ @JsonProperty
+ public List getPartitionIds()
+ {
+ return partitionIds;
+ }
+
+ @JsonProperty
+ public boolean isTruncated()
+ {
+ return truncated;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
new file mode 100644
index 000000000000..92caa6dbda3b
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import io.airlift.log.Logger;
+import io.trino.plugin.hive.HdfsEnvironment;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.HivePartition;
+import io.trino.plugin.hive.acid.AcidSchema;
+import io.trino.plugin.hive.authentication.HiveIdentity;
+import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.Table;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.ConnectorTableProperties;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.TableNotFoundException;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.TypeManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static com.google.common.collect.Iterables.concat;
+import static io.trino.plugin.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.FILE_SIZE_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.PARTITION_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.PATH_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
+import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
+import static io.trino.plugin.hive.HiveTableProperties.EXTERNAL_LOCATION_PROPERTY;
+import static io.trino.plugin.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY;
+import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
+import static io.trino.plugin.hive.util.HiveUtil.columnExtraInfo;
+import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
+import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
+import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNKNOWN_TABLE_TYPE;
+import static io.trino.plugin.hudi.HudiUtil.splitPredicate;
+import static java.lang.String.format;
+import static java.util.Collections.singletonList;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
+import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable;
+import static org.apache.hudi.common.fs.FSUtils.getFs;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static org.apache.hudi.exception.TableNotFoundException.checkTableValidity;
+
+public class HudiMetadata
+ implements ConnectorMetadata
+{
+ private static final Logger LOG = Logger.get(HudiMetadata.class);
+ private final HiveMetastore metastore;
+ private final HdfsEnvironment hdfsEnvironment;
+ private final TypeManager typeManager;
+
+ public HudiMetadata(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager)
+ {
+ this.metastore = requireNonNull(metastore, "metastore is null");
+ this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ @Override
+ public List listSchemaNames(ConnectorSession session)
+ {
+ return metastore.getAllDatabases();
+ }
+
+ @Override
+ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
+ {
+ requireNonNull(tableName, "tableName is null");
+ if (isHiveSystemSchema(tableName.getSchemaName())) {
+ return null;
+ }
+ Optional table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName());
+ if (table.isEmpty()) {
+ return null;
+ }
+ if (!isHudiTable(session, table.get())) {
+ throw new TrinoException(HUDI_UNKNOWN_TABLE_TYPE, format("Not a Hudi table: %s", tableName));
+ }
+ return new HudiTableHandle(
+ tableName.getSchemaName(),
+ tableName.getTableName(),
+ table.get().getStorage().getLocation(),
+ HoodieTableType.COPY_ON_WRITE,
+ TupleDomain.all(),
+ Optional.of(getTableMetaClient(session, table.get())));
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
+ {
+ HudiTableHandle hudiTableHandle = (HudiTableHandle) table;
+ return getTableMetadata(session, hudiTableHandle.getSchemaTableName());
+ }
+
+ @Override
+ public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint)
+ {
+ HudiTableHandle handle = (HudiTableHandle) tableHandle;
+ if (constraint.getSummary().equals(handle.getPredicate())) {
+ return Optional.empty();
+ }
+
+ List> predicate = splitPredicate(constraint.getSummary());
+ TupleDomain unenforcedPredicate = predicate.get(1);
+ HudiTableHandle newHudiTableHandle = handle.withPredicate(constraint.getSummary().transformKeys(HiveColumnHandle.class::cast));
+
+ return Optional.of(new ConstraintApplicationResult<>(
+ newHudiTableHandle,
+ unenforcedPredicate,
+ false));
+ }
+
+ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName tableName)
+ {
+ Table table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName())
+ .orElseThrow(() -> new TableNotFoundException(tableName));
+ Function metadataGetter = columnMetadataGetter(table);
+ ImmutableList.Builder columns = ImmutableList.builder();
+ for (HiveColumnHandle columnHandle : hiveColumnHandles(table, typeManager, NANOSECONDS)) {
+ columns.add(metadataGetter.apply(columnHandle));
+ }
+
+ // External location property
+ ImmutableMap.Builder properties = ImmutableMap.builder();
+ if (table.getTableType().equals(EXTERNAL_TABLE.name())) {
+ properties.put(EXTERNAL_LOCATION_PROPERTY, table.getStorage().getLocation());
+ }
+
+ // Partitioning property
+ List partitionedBy = table.getPartitionColumns().stream()
+ .map(Column::getName)
+ .collect(toImmutableList());
+ if (!partitionedBy.isEmpty()) {
+ properties.put(PARTITIONED_BY_PROPERTY, partitionedBy);
+ }
+
+ Optional comment = Optional.ofNullable(table.getParameters().get(TABLE_COMMENT));
+ return new ConnectorTableMetadata(tableName, columns.build(), properties.build(), comment);
+ }
+
+ @Override
+ public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ return new ConnectorTableProperties();
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ SchemaTableName tableName = ((HudiTableHandle) tableHandle).getSchemaTableName();
+ Table table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName())
+ .orElseThrow(() -> new TableNotFoundException(tableName));
+ return hiveColumnHandles(table, typeManager, NANOSECONDS).stream()
+ .collect(toImmutableMap(HiveColumnHandle::getName, identity()));
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ return ((HiveColumnHandle) columnHandle).getColumnMetadata();
+ }
+
+ @Override
+ public Optional