Skip to content

Commit

Permalink
Add Hudi Connector
Browse files Browse the repository at this point in the history
Rebase and resolve conflicts

Use cached thread pool for split generation
  • Loading branch information
codope committed Jan 27, 2022
1 parent 6028e90 commit e440e49
Show file tree
Hide file tree
Showing 77 changed files with 5,993 additions and 28 deletions.
6 changes: 6 additions & 0 deletions core/trino-server/src/main/provisio/presto.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
</artifact>
</artifactSet>

<!-- Hudi connector plugin: the trino-hudi zip artifact is unpacked under plugin/hudi. -->
<artifactSet to="plugin/hudi">
<artifact id="${project.groupId}:trino-hudi:zip:${project.version}">
<unpack />
</artifact>
</artifactSet>

<artifactSet to="plugin/iceberg">
<artifact id="${project.groupId}:trino-iceberg:zip:${project.version}">
<unpack />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.trino.plugin.hive.HiveSplit.BucketConversion;
import io.trino.plugin.hive.HiveSplit.BucketValidation;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion;
Expand Down Expand Up @@ -100,8 +99,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static io.trino.plugin.hive.HivePartitionManager.partitionMatches;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize;
Expand All @@ -114,11 +111,11 @@
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.FAIL;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
import static io.trino.plugin.hive.util.HiveUtil.checkCondition;
import static io.trino.plugin.hive.util.HiveUtil.getFooterCount;
import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount;
import static io.trino.plugin.hive.util.HiveUtil.getInputFormat;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeys;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Integer.parseInt;
import static java.lang.Math.max;
Expand Down Expand Up @@ -942,28 +939,6 @@ private static List<Path> getTargetPathsFromSymlink(FileSystem fileSystem, Path
}
}

/**
 * Pairs each partition column of {@code table} with the corresponding value of {@code partition}.
 *
 * @param table table whose partition columns name the keys
 * @param partition partition supplying the values; when empty, an empty list is returned
 * @return one {@link HivePartitionKey} per partition column, in column order
 * @throws TrinoException with {@code NOT_SUPPORTED} when a partition column has a Hive type
 *         that is not supported for the table's storage format
 */
private static List<HivePartitionKey> getPartitionKeys(Table table, Optional<Partition> partition)
{
    if (partition.isEmpty()) {
        // Nothing to pair up without a partition.
        return ImmutableList.of();
    }

    List<Column> partitionColumns = table.getPartitionColumns();
    List<String> partitionValues = partition.get().getValues();
    // Column count and value count must agree; checkCondition reports HIVE_INVALID_METADATA otherwise.
    checkCondition(
            partitionColumns.size() == partitionValues.size(),
            HIVE_INVALID_METADATA,
            "Expected %s partition key values, but got %s",
            partitionColumns.size(),
            partitionValues.size());

    ImmutableList.Builder<HivePartitionKey> result = ImmutableList.builder();
    for (int index = 0; index < partitionColumns.size(); index++) {
        Column partitionColumn = partitionColumns.get(index);
        String columnName = partitionColumn.getName();
        HiveType columnType = partitionColumn.getType();
        boolean supported = columnType.isSupportedType(table.getStorage().getStorageFormat());
        if (!supported) {
            throw new TrinoException(NOT_SUPPORTED, format("Unsupported Hive type %s found in partition keys of table %s.%s", columnType, table.getDatabaseName(), table.getTableName()));
        }

        String partitionValue = partitionValues.get(index);
        // A null value is rejected with HIVE_INVALID_PARTITION_VALUE before the key is built.
        checkCondition(partitionValue != null, HIVE_INVALID_PARTITION_VALUE, "partition key value cannot be null for field: %s", columnName);
        result.add(new HivePartitionKey(columnName, partitionValue));
    }
    return result.build();
}

private static Properties getPartitionSchema(Table table, Optional<Partition> partition)
{
if (partition.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ public static Optional<org.apache.parquet.schema.Type> getColumnType(HiveColumnH
return Optional.of(new GroupType(baseType.getRepetition(), baseType.getName(), ImmutableList.of(type)));
}

private static Optional<ColumnIndexStore> getColumnIndexStore(
public static Optional<ColumnIndexStore> getColumnIndexStore(
ParquetDataSource dataSource,
BlockMetaData blockMetadata,
Map<List<String>, RichColumnDescriptor> descriptorsByPath,
Expand Down Expand Up @@ -440,7 +440,7 @@ public static TupleDomain<ColumnDescriptor> getParquetTupleDomain(
return TupleDomain.withColumnDomains(predicate.buildOrThrow());
}

private static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
public static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
return getParquetTypeByName(column.getBaseColumnName(), messageType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.avro.TrinoAvroSerDe;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.SortingColumn;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.ErrorCodeSupplier;
Expand Down Expand Up @@ -1128,4 +1129,26 @@ public static boolean isIcebergTable(Table table)
{
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(ICEBERG_TABLE_TYPE_NAME));
}

/**
 * Builds the list of partition keys for {@code partition} by matching the table's partition
 * columns, in order, with the partition's values.
 *
 * @param table table whose partition columns provide the key names and types
 * @param partition partition providing the values; an empty Optional yields an empty list
 * @return one {@link HivePartitionKey} per partition column, in column order
 * @throws TrinoException with {@code NOT_SUPPORTED} when a partition column's Hive type is
 *         not supported for the table's storage format
 */
public static List<HivePartitionKey> getPartitionKeys(Table table, Optional<Partition> partition)
{
    if (partition.isEmpty()) {
        // No partition supplied: there are no keys to report.
        return ImmutableList.of();
    }

    List<Column> partitionColumns = table.getPartitionColumns();
    List<String> partitionValues = partition.get().getValues();
    // The metadata is inconsistent unless every partition column has exactly one value.
    checkCondition(
            partitionColumns.size() == partitionValues.size(),
            HIVE_INVALID_METADATA,
            "Expected %s partition key values, but got %s",
            partitionColumns.size(),
            partitionValues.size());

    ImmutableList.Builder<HivePartitionKey> result = ImmutableList.builder();
    int position = 0;
    for (Column partitionColumn : partitionColumns) {
        String columnName = partitionColumn.getName();
        HiveType columnType = partitionColumn.getType();
        if (!columnType.isSupportedType(table.getStorage().getStorageFormat())) {
            throw new TrinoException(NOT_SUPPORTED, format("Unsupported Hive type %s found in partition keys of table %s.%s", columnType, table.getDatabaseName(), table.getTableName()));
        }

        String partitionValue = partitionValues.get(position);
        // Null values are rejected with HIVE_INVALID_PARTITION_VALUE before the key is created.
        checkCondition(partitionValue != null, HIVE_INVALID_PARTITION_VALUE, "partition key value cannot be null for field: %s", columnName);
        result.add(new HivePartitionKey(columnName, partitionValue));
        position++;
    }
    return result.build();
}
}
Loading

0 comments on commit e440e49

Please sign in to comment.