Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Mar 5, 2024
1 parent 5c7353f commit c174452
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import java.io.InputStream;
import java.nio.channels.ReadableByteChannel;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public final class Channels {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ private static void write(
static MessageType getSchemaForTable(@NotNull final Table table,
@NotNull final TableDefinition definition,
@NotNull final ParquetInstructions instructions) {
if (definition.numColumns() == 0) {
throw new IllegalArgumentException("Table definition must have at least one column");
}
final Table pretransformTable = pretransformTable(table, definition);
return MappedSchema.create(new HashMap<>(), definition, pretransformTable.getRowSet(),
pretransformTable.getColumnSourceMap(), instructions).getParquetSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,16 +476,16 @@ private static Map<String, ParquetTableWriter.GroupingColumnWritingInfo> groupin
* directories are created.
* @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will
* result in files named "PC=partition1/table.parquet", "PC=partition2/table.parquet", etc., where "PC" is a
* partitioning column.
* <p>
* The token {@code {i}} in base name will be replaced with an automatically incremented integer for files in
* a directory. For example, a base name of "table-{i}" will result in files named
* "PC=partition1/table-0.parquet", "PC=partition1/table-1.parquet", etc.
* <p>
* Also, token {@code {partitions}} will be replaced with a concatenated string of partition values. For
* example, a base name of "{partitions}-table" will result in files
* partitioning column. Users can also provide the following tokens to be replaced in the base name:
* <ul>
* <li>The token {@code {uuid}} will be replaced with a random UUID. For example, a base name of
* "table-{uuid}" will result in files named like
* "table-8e8ab6b2-62f2-40d1-8191-1c5b70c5f330.parquet".</li>
* <li>The token {@code {partitions}} will be replaced with an underscore-delimited, concatenated string of
* partition values. For example, a base name of "{partitions}-table" will result in files like
* "PC1=partition1/PC2=partitionA/PC1=partition1_PC2=partitionA-table.parquet", where "PC1" and "PC2" are
* partitioning columns.
* partitioning columns.</li>
* </ul>
* @param writeInstructions Write instructions for customizations while writing
*/
public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable,
Expand All @@ -508,16 +508,16 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
* directories are created.
* @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will
* result in files named "PC=partition1/table.parquet", "PC=partition2/table.parquet", etc., where "PC" is a
* partitioning column.
* <p>
* The token {@code {i}} in base name will be replaced with an automatically incremented integer for files in
* a directory. For example, a base name of "table-{i}" will result in files named
* "PC=partition1/table-0.parquet", "PC=partition1/table-1.parquet", etc.
* <p>
* Also, token {@code {partitions}} will be replaced with a concatenated string of partition values. For
* example, a base name of "{partitions}-table" will result in files
* partitioning column. Users can also provide the following tokens to be replaced in the base name:
* <ul>
* <li>The token {@code {uuid}} will be replaced with a random UUID. For example, a base name of
* "table-{uuid}" will result in files named like
* "table-8e8ab6b2-62f2-40d1-8191-1c5b70c5f330.parquet".</li>
* <li>The token {@code {partitions}} will be replaced with an underscore-delimited, concatenated string of
* partition values. For example, a base name of "{partitions}-table" will result in files like
* "PC1=partition1/PC2=partitionA/PC1=partition1_PC2=partitionA-table.parquet", where "PC1" and "PC2" are
* partitioning columns.
* partitioning columns.</li>
* </ul>
* @param writeInstructions Write instructions for customizations while writing
*/
public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable,
Expand Down Expand Up @@ -546,34 +546,31 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
* directories are created.
* @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will
* result in files named "PC=partition1/table.parquet", "PC=partition2/table.parquet", etc., where "PC" is a
* partitioning column.
* <p>
* The token {@code {i}} in base name will be replaced with an automatically incremented integer for files in
* a directory. For example, a base name of "table-{i}" will result in files named
* "PC=partition1/table-0.parquet", "PC=partition1/table-1.parquet", etc.
* <p>
* Also, token {@code {partitions}} will be replaced with a concatenated string of partition values. For
* example, a base name of "{partitions}-table" will result in files
* partitioning column. Users can also provide the following tokens to be replaced in the base name:
* <ul>
* <li>The token {@code {i}} will be replaced with an automatically incremented integer for files in a
* directory. For example, a base name of "table-{i}" will result in files named like
* "PC=partition1/table-0.parquet", "PC=partition1/table-1.parquet", etc.</li>
* <li>The token {@code {uuid}} will be replaced with a random UUID. For example, a base name of
* "table-{uuid}" will result in files named like
* "table-8e8ab6b2-62f2-40d1-8191-1c5b70c5f330.parquet".</li>
* <li>The token {@code {partitions}} will be replaced with an underscore-delimited, concatenated string of
* partition values. For example, a base name of "{partitions}-table" will result in files like
* "PC1=partition1/PC2=partitionA/PC1=partition1_PC2=partitionA-table.parquet", where "PC1" and "PC2" are
* partitioning columns.
* partitioning columns.</li>
* </ul>
* @param writeInstructions Write instructions for customizations while writing
*/
public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
@NotNull final File destinationDir,
@NotNull final String baseName,
@NotNull final ParquetInstructions writeInstructions) {
// Get key column definitions from the partitioned table definition and non-key column definitions from the
// constituent table definition, and combine them to build the overall table definition to be written
final Set<String> keyColumnNames = partitionedTable.keyColumnNames();
final Collection<ColumnDefinition<?>> columnDefinitions = new ArrayList<>(keyColumnNames.size() +
partitionedTable.constituentDefinition().numColumns());
keyColumnNames.stream().map(keyColName -> partitionedTable.table().getDefinition().getColumn(keyColName))
.forEach(columnDefinitions::add);
partitionedTable.constituentDefinition().getColumns().stream()
.filter(columnDefinition -> !keyColumnNames.contains(columnDefinition.getName()))
.forEach(columnDefinitions::add);
final TableDefinition definition = TableDefinition.of(columnDefinitions);
writeKeyValuePartitionedTable(partitionedTable, definition, destinationDir, baseName, writeInstructions);
final TableDefinition keyTableDefinition = getKeyTableDefinition(partitionedTable.keyColumnNames(),
partitionedTable.table().getDefinition());
final TableDefinition leafDefinition = getNonKeyTableDefinition(partitionedTable.keyColumnNames(),
partitionedTable.constituentDefinition());
writeKeyValuePartitionedTableImpl(partitionedTable, keyTableDefinition, leafDefinition, destinationDir,
baseName, writeInstructions);
}

/**
Expand All @@ -587,35 +584,49 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable
* directories are created.
* @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will
* result in files named "PC=partition1/table.parquet", "PC=partition2/table.parquet", etc., where "PC" is a
* partitioning column.
* <p>
* The token {@code {i}} in base name will be replaced with an automatically incremented integer for files in
* a directory. For example, a base name of "table-{i}" will result in files named
* "PC=partition1/table-0.parquet", "PC=partition1/table-1.parquet", etc.
* <p>
* Also, token {@code {partitions}} will be replaced with a concatenated string of partition values. For
* example, a base name of "{partitions}-table" will result in files
* partitioning column. Users can also provide the following tokens to be replaced in the base name:
* <ul>
* <li>The token {@code {i}} will be replaced with an automatically incremented integer for files in a
* directory. For example, a base name of "table-{i}" will result in files named like
* "PC=partition1/table-0.parquet", "PC=partition1/table-1.parquet", etc.</li>
* <li>The token {@code {uuid}} will be replaced with a random UUID. For example, a base name of
* "table-{uuid}" will result in files named like
* "table-8e8ab6b2-62f2-40d1-8191-1c5b70c5f330.parquet".</li>
* <li>The token {@code {partitions}} will be replaced with an underscore-delimited, concatenated string of
* partition values. For example, a base name of "{partitions}-table" will result in files like
* "PC1=partition1/PC2=partitionA/PC1=partition1_PC2=partitionA-table.parquet", where "PC1" and "PC2" are
* partitioning columns.
* partitioning columns.</li>
* </ul>
* @param writeInstructions Write instructions for customizations while writing
*/
public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
@NotNull final TableDefinition definition,
@NotNull final File destinationDir,
@NotNull final String baseName,
@NotNull final ParquetInstructions writeInstructions) {
final TableDefinition partitionedTableDefinition = getNonKeyTableDefiniton(partitionedTable.keyColumnNames(),
definition);
if (partitionedTableDefinition.numColumns() == 0) {
final TableDefinition keyTableDefinition = getKeyTableDefinition(partitionedTable.keyColumnNames(), definition);
final TableDefinition leafDefinition = getNonKeyTableDefinition(partitionedTable.keyColumnNames(), definition);
writeKeyValuePartitionedTableImpl(partitionedTable, keyTableDefinition, leafDefinition, destinationDir,
baseName, writeInstructions);
}

private static void writeKeyValuePartitionedTableImpl(@NotNull final PartitionedTable partitionedTable,
@NotNull final TableDefinition keyTableDefinition,
@NotNull final TableDefinition leafDefinition,
@NotNull final File destinationDir,
@NotNull final String baseName,
@NotNull final ParquetInstructions writeInstructions) {
if (leafDefinition.numColumns() == 0) {
throw new IllegalArgumentException("Cannot write a partitioned parquet table without any non-partitioning "
+ "columns");
}
final boolean hasPartitionInName = baseName.contains("{partitions}");
final boolean hasIndexInName = baseName.contains("{i}");
if (!partitionedTable.uniqueKeys() && !hasIndexInName) {
final boolean hasUUIDInName = baseName.contains("{uuid}");
if (!partitionedTable.uniqueKeys() && !hasIndexInName && !hasUUIDInName) {
throw new IllegalArgumentException(
"Cannot write a partitioned parquet table with non-unique keys without " +
"{i} in the base name because there can be multiple partitions with the same key values");
"{i} or {uuid} in the base name because there can be multiple partitions with the same key values");
}
// Note that there can be multiple constituents with the same key values, so cannot directly use the
// partitionedTable.constituentFor(keyValues) method, and we need to group them together
Expand Down Expand Up @@ -656,6 +667,9 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable
if (hasIndexInName) {
filename = filename.replace("{i}", Integer.toString(count));
}
if (hasUUIDInName) {
filename = filename.replace("{uuid}", UUID.randomUUID().toString());
}
filename += PARQUET_FILE_EXTENSION;
destinations.add(new File(relativePath, filename));
partitionedData.add(constituent);
Expand All @@ -664,33 +678,48 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable
row++;
}
}
// If needed, generate schema for _common_metadata file from key table
final ParquetInstructions updatedWriteInstructions;
if (!ParquetInstructions.DEFAULT_METADATA_ROOT_DIR.equals(writeInstructions.getMetadataRootDir())) {
final MessageType commonSchema = getSchemaForTable(partitionedTable.table(), definition, writeInstructions);
// Generate initial schema for _common_metadata file from key columns. The schema for remaining columns will
// be inferred at the time of writing the parquet files and merged with the common schema.
final MessageType commonSchema =
getSchemaForTable(partitionedTable.table(), keyTableDefinition, writeInstructions);
updatedWriteInstructions = ParquetInstructions.createWithCommonSchema(writeInstructions, commonSchema);
} else {
updatedWriteInstructions = writeInstructions;
}
ParquetTools.writeParquetTables(
partitionedData.toArray(Table[]::new),
partitionedTableDefinition,
leafDefinition,
updatedWriteInstructions,
destinations.toArray(File[]::new),
partitionedTableDefinition.getGroupingColumnNamesArray());
leafDefinition.getGroupingColumnNamesArray());
}

/**
 * Build a sub table definition holding only the key columns, visited in the iteration order of
 * {@code keyColumnNames}. Any key column name for which {@code definition.getColumn} returns {@code null} is
 * skipped, so the result contains only the key columns that are present in the provided definition.
 *
 * @param keyColumnNames The names of the key columns to look up
 * @param definition The table definition to look the key columns up in
 * @return A table definition made of the key column definitions that were found
 */
private static TableDefinition getKeyTableDefinition(@NotNull final Collection<String> keyColumnNames,
        @NotNull final TableDefinition definition) {
    final Collection<ColumnDefinition<?>> presentKeyColumns = keyColumnNames.stream()
            // Explicit type witness keeps the element type as ColumnDefinition<?> rather than a captured type
            .<ColumnDefinition<?>>map(keyColumnName -> definition.getColumn(keyColumnName))
            // Drop key columns that the definition does not know about
            .filter(keyColumnDef -> keyColumnDef != null)
            .collect(Collectors.toList());
    return TableDefinition.of(presentKeyColumns);
}

/**
* Using the provided definition and key column names, create a sub table definition for the non-key columns.
*/
private static TableDefinition getNonKeyTableDefiniton(@NotNull final Collection<String> keyColumnNames,
private static TableDefinition getNonKeyTableDefinition(@NotNull final Collection<String> keyColumnNames,
@NotNull final TableDefinition definition) {
final List<ColumnDefinition<?>> nonKeyColumnDefinition =
definition.getColumns().stream()
.filter(columnDefinition -> {
final String columnName = columnDefinition.getName();
return !keyColumnNames.contains(columnName);
}).collect(Collectors.toList());
final Collection<ColumnDefinition<?>> nonKeyColumnDefinition = definition.getColumns().stream()
.filter(columnDefinition -> !keyColumnNames.contains(columnDefinition.getName()))
.collect(Collectors.toList());
return TableDefinition.of(nonKeyColumnDefinition);
}

Expand Down
Loading

0 comments on commit c174452

Please sign in to comment.