Skip to content

Commit

Permalink
Fixed failing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Mar 15, 2024
1 parent 5018feb commit a529fc2
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ public static boolean fileNameMatches(final Path path) {
}

/**
* @return the key value for storing each file's metadata in the common metadata file.
* @return the key value derived from the file name, used for storing each file's metadata in the metadata files.
*/
public static String getKeyForFilePath(final Path path) {
final String fileName = path.getFileName().toString();
public static String getKeyForFile(final String fileName) {
return "deephaven_per_file_" + fileName;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.deephaven.parquet.table;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.base.verify.Assert;
import io.deephaven.parquet.base.ParquetFileWriter;
import io.deephaven.parquet.base.ParquetMetadataFileWriter;
import io.deephaven.parquet.base.ParquetUtils;
Expand Down Expand Up @@ -29,7 +28,7 @@

import static io.deephaven.parquet.base.ParquetUtils.MAGIC;
import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY;
import static io.deephaven.parquet.base.ParquetUtils.getKeyForFilePath;
import static io.deephaven.parquet.base.ParquetUtils.getKeyForFile;
import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI;

/**
Expand Down Expand Up @@ -119,7 +118,10 @@ private static class ParquetFileMetadata {
}

/**
* Added parquet metadata for provided parquet file.
* Add parquet metadata for the provided parquet file to the combined metadata file. We store deephaven-specific
* metadata for each file individually inside the key-value metadata of the combined metadata file, with keys being
* derived from the file names and values being the metadata for the file. Therefore, the provided parquet files
* must have unique names.
*
* @param parquetFilePath The parquet file destination path
* @param metadata The parquet metadata
Expand Down Expand Up @@ -169,6 +171,11 @@ private void mergeMetadata() throws IOException {
mergeBlocksInto(parquetFileMetadata, metadataRootDirAbsPath, mergedBlocks);
mergedCreatedBy.add(fileMetaData.getCreatedBy());
}
if (mergedKeyValueMetaData.size() != parquetFileMetadataList.size()) {
throw new IllegalStateException("We should have one entry for each file in the merged key-value metadata, "
+ "but we have " + mergedKeyValueMetaData.size() + " entries for " + parquetFileMetadataList.size()
+ " files.");
}
// Add table info to the merged key-value metadata
final TableInfo.Builder tableInfoBuilder = TableInfo.builder().addAllColumnTypes(mergedColumnTypes);
if (mergedVersion != null) {
Expand Down Expand Up @@ -222,9 +229,13 @@ private void mergeKeyValueMetaData(@NotNull final ParquetFileMetadata parquetFil
});
} else {
// Add a separate entry for each file
final String fileKey = getKeyForFilePath(Path.of(parquetFileMetadata.filePath));
final String fileKey = getKeyForFile(new File(parquetFileMetadata.filePath).getName());
// Assuming the keys are unique for each file because file names are unique, verified in the constructor
Assert.eqNull(mergedKeyValueMetaData.get(fileKey), "mergedKeyValueMetaData.get(fileKey)");
if (mergedKeyValueMetaData.containsKey(fileKey)) {
throw new UncheckedDeephavenException("Could not merge metadata for for file " +
parquetFileMetadata.filePath + " because has conflicting file name with another file. For "
+ " generating metadata files, file names should be unique");
}
mergedKeyValueMetaData.put(fileKey, entry.getValue());

// Also, process and accumulate the relevant fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,25 @@ private static Map<String, ParquetTableWriter.GroupingColumnWritingInfo> groupin
return gcwim;
}

/**
 * Write {@code sourceTable} to disk in parquet format, partitioned by its
 * {@link TableDefinition#getPartitioningColumns() partitioning columns} using a "key=value" nested directory
 * layout. Internally, {@link Table#partitionBy(String...) partitionBy} is invoked on all partitioning columns of
 * the provided table to produce the individual partitions. Each generated parquet file is named
 * {@code "uuid.parquet"} where "uuid" is a randomly generated UUID; callers needing control over file names should
 * use {@link #writeKeyValuePartitionedTable(Table, File, String, ParquetInstructions)} instead.
 *
 * @param sourceTable The table to partition and write
 * @param destinationDir The destination root directory to store partitioned data in nested format. Non-existing
 *        directories are created.
 * @param writeInstructions Write instructions for customizations while writing
 */
public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable,
        @NotNull final File destinationDir,
        @NotNull final ParquetInstructions writeInstructions) {
    // Delegate with the "{uuid}" token so every constituent file receives a unique random name.
    writeKeyValuePartitionedTable(sourceTable, destinationDir, "{uuid}", writeInstructions);
}

/**
* Write table to disk in parquet format with {@link TableDefinition#getPartitioningColumns() partitioning columns}
* written as "key=value" format in a nested directory structure. To generate these individual partitions, this
Expand Down Expand Up @@ -501,6 +520,27 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
writeInstructions);
}

/**
 * Write {@code sourceTable} to disk in parquet format, partitioned by the
 * {@link TableDefinition#getPartitioningColumns() partitioning columns} of the supplied definition using a
 * "key=value" nested directory layout. Internally, {@link Table#partitionBy(String...) partitionBy} is invoked on
 * all partitioning columns from the provided table definition to produce the individual partitions. Each generated
 * parquet file is named {@code "uuid.parquet"} where "uuid" is a randomly generated UUID; callers needing control
 * over file names should use
 * {@link #writeKeyValuePartitionedTable(Table, TableDefinition, File, String, ParquetInstructions)} instead.
 *
 * @param sourceTable The table to partition and write
 * @param definition table definition to use (instead of the one implied by the table itself)
 * @param destinationDir The destination root directory to store partitioned data in nested format. Non-existing
 *        directories are created.
 * @param writeInstructions Write instructions for customizations while writing
 */
public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable,
        @NotNull final TableDefinition definition,
        @NotNull final File destinationDir,
        @NotNull final ParquetInstructions writeInstructions) {
    // Delegate with the "{uuid}" token so every constituent file receives a unique random name.
    writeKeyValuePartitionedTable(sourceTable, definition, destinationDir, "{uuid}", writeInstructions);
}

/**
* Write table to disk in parquet format with {@link TableDefinition#getPartitioningColumns() partitioning columns}
* written as "key=value" format in a nested directory structure. To generate these individual partitions, this
Expand Down Expand Up @@ -545,6 +585,24 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
baseName, writeInstructions);
}

/**
 * Write a partitioned table to disk in parquet format, with all of its
 * {@link PartitionedTable#keyColumnNames() key columns} laid out as "key=value" nested directories. Users can
 * obtain such a partitioned table by calling {@link Table#partitionBy(String...) partitionBy} on the desired
 * columns. Each generated parquet file is named {@code "uuid.parquet"} where "uuid" is a randomly generated UUID;
 * callers needing control over file names should use
 * {@link #writeKeyValuePartitionedTable(PartitionedTable, File, String, ParquetInstructions)} instead.
 *
 * @param partitionedTable The partitioned table to write
 * @param destinationDir The destination root directory to store partitioned data in nested format. Non-existing
 *        directories are created.
 * @param writeInstructions Write instructions for customizations while writing
 */
public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
        @NotNull final File destinationDir,
        @NotNull final ParquetInstructions writeInstructions) {
    // Delegate with the "{uuid}" token so every constituent file receives a unique random name.
    writeKeyValuePartitionedTable(partitionedTable, destinationDir, "{uuid}", writeInstructions);
}

/**
* Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
* columns} as "key=value" format in a nested directory structure. To generate the partitioned table, users can call
Expand Down Expand Up @@ -582,6 +640,27 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable
baseName, writeInstructions);
}

/**
 * Write a partitioned table to disk in parquet format using the supplied table definition, with all of its
 * {@link PartitionedTable#keyColumnNames() key columns} laid out as "key=value" nested directories. Users can
 * obtain such a partitioned table by calling {@link Table#partitionBy(String...) partitionBy} on the desired
 * columns. Each generated parquet file is named {@code "uuid.parquet"} where "uuid" is a randomly generated UUID;
 * callers needing control over file names should use
 * {@link #writeKeyValuePartitionedTable(PartitionedTable, TableDefinition, File, String, ParquetInstructions)}
 * instead.
 *
 * @param partitionedTable The partitioned table to write
 * @param definition table definition to use (instead of the one implied by the table itself)
 * @param destinationDir The destination root directory to store partitioned data in nested format. Non-existing
 *        directories are created.
 * @param writeInstructions Write instructions for customizations while writing
 */
public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
        @NotNull final TableDefinition definition,
        @NotNull final File destinationDir,
        @NotNull final ParquetInstructions writeInstructions) {
    // Delegate with the "{uuid}" token so every constituent file receives a unique random name.
    writeKeyValuePartitionedTable(partitionedTable, definition, destinationDir, "{uuid}", writeInstructions);
}

/**
* Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
* columns} as "key=value" format in a nested directory structure. To generate the partitioned table, users can call
Expand Down Expand Up @@ -647,6 +726,12 @@ private static void writeKeyValuePartitionedTableImpl(@NotNull final Partitioned
"Cannot write a partitioned parquet table with non-unique keys without " +
"{i} or {uuid} in the base name because there can be multiple partitions with the same key values");
}
if (writeInstructions.generateMetadataFiles()) {
if (!hasUUIDInName && !(hasIndexInName && hasPartitionInName)) {
throw new IllegalArgumentException("For generating metadata files, file names for generated parquet " +
"files should be unique. Try passing tokens like {uuid} in the file base name");
}
}
// Note that there can be multiple constituents with the same key values, so cannot directly use the
// partitionedTable.constituentFor(keyValues) method, and we need to group them together
final String[] partitioningColumnNames = partitionedTable.keyColumnNames().toArray(String[]::new);
Expand Down Expand Up @@ -796,7 +881,7 @@ public static void writeParquetTables(@NotNull final Table[] sources,
final File metadataRootDir;
if (writeInstructions.generateMetadataFiles()) {
// We insist on writing the metadata file in the same directory as the destination files, thus all
// destination files are in the same directory.
// destination files should be in the same directory. This also ensures that the file names would be unique.
final String firstDestinationDir = destinations[0].getParentFile().getAbsolutePath();
for (int i = 1; i < destinations.length; i++) {
if (!firstDestinationDir.equals(destinations[i].getParentFile().getAbsolutePath())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import static io.deephaven.parquet.base.ParquetUtils.COMMON_METADATA_FILE_NAME;
import static io.deephaven.parquet.base.ParquetUtils.METADATA_FILE_NAME;
import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY;
import static io.deephaven.parquet.base.ParquetUtils.getKeyForFilePath;
import static io.deephaven.parquet.base.ParquetUtils.getKeyForFile;
import static java.util.stream.Collectors.toMap;

/**
Expand Down Expand Up @@ -220,7 +220,7 @@ public ParquetMetadataFileLayout(
private static ParquetMetadata getParquetMetadataForFile(@NotNull final File parquetFile,
@NotNull final ParquetMetadata metadataFileMetadata) {
final String fileMetadataString = metadataFileMetadata.getFileMetaData().getKeyValueMetaData()
.get(getKeyForFilePath(parquetFile.toPath()));
.get(getKeyForFile(parquetFile.getName()));
final ParquetMetadata fileMetadata;
if (fileMetadataString != null) {
// Create a new file metadata object using the key-value metadata for that file
Expand Down
Loading

0 comments on commit a529fc2

Please sign in to comment.