
Commit

Review with Ryan part 2
Tests are failing
malhotrashivam committed Mar 11, 2024
1 parent 9892d14 commit 1d98927
Showing 12 changed files with 233 additions and 110 deletions.
NullParquetMetadataFileWriter.java
@@ -2,8 +2,6 @@

import org.apache.parquet.hadoop.metadata.ParquetMetadata;

-import java.io.File;

/**
* A no-op implementation of ParquetMetadataFileWriter, used when we don't want to write metadata files for Parquet files.
*/
@@ -12,10 +10,10 @@ public enum NullParquetMetadataFileWriter implements ParquetMetadataFileWriter {
INSTANCE;

@Override
-public void addParquetFileMetadata(final File parquetFile, final ParquetMetadata metadata) {}
+public void addParquetFileMetadata(final String parquetFilePath, final ParquetMetadata metadata) {}

@Override
-public void writeMetadataFiles(final File metadataFile, final File commonMetadataFile) {}
+public void writeMetadataFiles(final String metadataFilePath, final String commonMetadataFilePath) {}

@Override
public void clear() {}
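
A minimal sketch of the opt-out path this enum serves (call site and paths invented; null stands in for a real ParquetMetadata because the no-op ignores its arguments):

    // Every call is a deliberate no-op, so nothing is ever written or retained.
    final NullParquetMetadataFileWriter writer = NullParquetMetadataFileWriter.INSTANCE;
    writer.addParquetFileMetadata("/data/table/part0.parquet", null); // ignored
    writer.writeMetadataFiles("/data/table/_metadata", "/data/table/_common_metadata"); // writes nothing
    writer.clear(); // nothing to clear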
ParquetFileWriter.java
@@ -41,12 +41,12 @@ public final class ParquetFileWriter {
private final Map<String, String> extraMetaData;
private final List<BlockMetaData> blocks = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
-private final File metadataFilePath;
+private final String destFilePathForMetadata;
private final ParquetMetadataFileWriter metadataFileWriter;

public ParquetFileWriter(
-final String filePath,
-final File metadataFilePath,
+final String destFilePath,
+final String destFilePathForMetadata,
final SeekableChannelsProvider channelsProvider,
final int targetPageSize,
final ByteBufferAllocator allocator,
@@ -57,12 +57,12 @@ public ParquetFileWriter(
this.targetPageSize = targetPageSize;
this.allocator = allocator;
this.extraMetaData = new HashMap<>(extraMetaData);
-bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(filePath, false),
+bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(destFilePath, false),
PARQUET_OUTPUT_BUFFER_SIZE);
bufferedOutput.write(MAGIC);
this.type = type;
this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName);
-this.metadataFilePath = metadataFilePath;
+this.destFilePathForMetadata = destFilePathForMetadata;
this.metadataFileWriter = metadataFileWriter;
}

@@ -80,7 +80,7 @@ public void close() throws IOException {
final ParquetMetadata footer =
new ParquetMetadata(new FileMetaData(type, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer, bufferedOutput);
-metadataFileWriter.addParquetFileMetadata(metadataFilePath, footer);
+metadataFileWriter.addParquetFileMetadata(destFilePathForMetadata, footer);
// Flush any buffered data and close the channel
bufferedOutput.close();
compressorAdapter.close();
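
For orientation on the ordering in close() above, this is the standard Parquet file tail (general format knowledge, not code from this repository): the footer must be serialized after all row groups are flushed, and only then can the same footer object be handed to the metadata-file writer.

    // Minimal sketch of how a Parquet file ends, per the format spec:
    //   data pages ... | footer bytes | 4-byte little-endian footer length | "PAR1"
    static void writeTail(java.io.OutputStream out, byte[] serializedFooter) throws java.io.IOException {
        out.write(serializedFooter);
        final int len = serializedFooter.length;
        // footer length, little-endian
        out.write(new byte[] {(byte) len, (byte) (len >> 8), (byte) (len >> 16), (byte) (len >> 24)});
        out.write(new byte[] {'P', 'A', 'R', '1'}); // trailing magic, mirroring the MAGIC written up front
    }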
ParquetMetadataFileWriter.java
@@ -15,18 +15,18 @@ public interface ParquetMetadataFileWriter {
* Add the parquet metadata for the provided parquet file to the list of metadata to be written to combined metadata
* files.
*
-* @param parquetFile The parquet file destination path
+* @param parquetFilePath The parquet file destination path
* @param metadata The parquet metadata corresponding to the parquet file
*/
-void addParquetFileMetadata(File parquetFile, ParquetMetadata metadata);
+void addParquetFileMetadata(String parquetFilePath, ParquetMetadata metadata);

/**
* Write the combined metadata files for all metadata accumulated so far and clear the list.
*
-* @param metadataFile The destination for the {@value ParquetUtils#METADATA_FILE_NAME} file
-* @param commonMetadataFile The destination for the {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file
+* @param metadataFilePath The destination path for the {@value ParquetUtils#METADATA_FILE_NAME} file
+* @param commonMetadataFilePath The destination path for the {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file
*/
-void writeMetadataFiles(File metadataFile, File commonMetadataFile) throws IOException;
+void writeMetadataFiles(String metadataFilePath, String commonMetadataFilePath) throws IOException;

/**
* Clear the list of metadata accumulated so far.
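Reading the interface as a whole, the intended lifecycle appears to be accumulate-then-flush. A minimal sketch, assuming a metadata root of /data/table and the conventional _metadata/_common_metadata file names (the helper and its arguments are illustrative, not from this commit):

    // Assumed imports: java.io.IOException, java.util.Map,
    // org.apache.parquet.hadoop.metadata.ParquetMetadata
    static void writeCombined(ParquetMetadataFileWriter metadataWriter,
            Map<String, ParquetMetadata> footersByPath) throws IOException {
        // Accumulate the footer of every parquet file written under the root...
        for (Map.Entry<String, ParquetMetadata> e : footersByPath.entrySet()) {
            metadataWriter.addParquetFileMetadata(e.getKey(), e.getValue());
        }
        // ...then emit both combined files in one shot; per the javadoc this also
        // clears the accumulated list, so the writer can be reused afterwards.
        metadataWriter.writeMetadataFiles("/data/table/_metadata", "/data/table/_common_metadata");
    }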
ParquetUtils.java
@@ -16,11 +16,24 @@ public final class ParquetUtils {
*/
public static final int PARQUET_OUTPUT_BUFFER_SIZE = 1 << 18;

+/**
+ * Used as a key for storing deephaven specific metadata in the key-value metadata of parquet files.
+ */
+public static final String METADATA_KEY = "deephaven";

/**
* Used as a filter to select relevant parquet files while reading all files in a directory.
*/
public static boolean fileNameMatches(final Path path) {
final String fileName = path.getFileName().toString();
return fileName.endsWith(PARQUET_FILE_EXTENSION) && fileName.charAt(0) != '.';
}

+/**
+ * @return the key value for storing each file's metadata in the common metadata file.
+ */
+public static String getKeyForFilePath(final Path path) {
+final String fileName = path.getFileName().toString();
+return "deephaven_per_file_" + fileName;
+}
}
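
A quick sketch of how the two helpers behave (paths invented; this assumes PARQUET_FILE_EXTENSION is ".parquet", which this hunk does not show):

    // Assumed imports: java.nio.file.Path, java.nio.file.Paths
    final Path visible = Paths.get("/data/table/part0.parquet");
    ParquetUtils.fileNameMatches(visible);                           // true: ".parquet" suffix, no leading '.'
    ParquetUtils.fileNameMatches(Paths.get("/data/.part0.parquet")); // false: hidden file
    ParquetUtils.getKeyForFilePath(visible);                         // "deephaven_per_file_part0.parquet"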
ParquetMetadataFileWriterImpl.java
@@ -1,6 +1,7 @@
package io.deephaven.parquet.table;

import io.deephaven.UncheckedDeephavenException;
+import io.deephaven.base.verify.Assert;
import io.deephaven.parquet.base.ParquetFileWriter;
import io.deephaven.parquet.base.ParquetMetadataFileWriter;
import io.deephaven.parquet.base.ParquetUtils;
@@ -19,7 +20,6 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -28,6 +28,8 @@
import java.util.Map;

import static io.deephaven.parquet.base.ParquetUtils.MAGIC;
+import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY;
+import static io.deephaven.parquet.base.ParquetUtils.getKeyForFilePath;
import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI;

/**
@@ -40,11 +42,11 @@ final class ParquetMetadataFileWriterImpl implements ParquetMetadataFileWriter {
* A class to hold the parquet file and its metadata.
*/
private static class ParquetFileMetadata {
-final File file;
+final String filePath;
final ParquetMetadata metadata;

-ParquetFileMetadata(final File file, final ParquetMetadata metadata) {
-this.file = file;
+ParquetFileMetadata(final String filePath, final ParquetMetadata metadata) {
+this.filePath = filePath;
this.metadata = metadata;
}
}
@@ -73,8 +75,29 @@ private static class ParquetFileMetadata {
*/
ParquetMetadataFileWriterImpl(@NotNull final File metadataRootDir, @NotNull final File[] destinations,
@Nullable final MessageType partitioningColumnsSchema) {
+if (destinations.length == 0) {
+throw new IllegalArgumentException("No destinations provided");
+}
this.metadataRootDirAbsPath = metadataRootDir.getAbsoluteFile().toPath();
final String metadataRootDirAbsPathString = metadataRootDirAbsPath.toString();
+final File firstDestination = destinations[0];
+for (int i = 0; i < destinations.length; i++) {
+final File destination = destinations[i];
+if (!destination.getAbsolutePath().startsWith(metadataRootDirAbsPathString)) {
+throw new UncheckedDeephavenException("All destinations must be nested under the provided metadata root"
++ " directory, provided destination " + destination.getAbsolutePath() + " is not under " +
+metadataRootDirAbsPathString);
+}
+// TODO How should I change the basename in the API for writeKeyValuePartitioned data for this check?
+if (i > 0) {
+// We use the file name to generate the key for each file's metadata in the common metadata file, therefore
+// all files must have unique names.
+if (destination.getName().equals(firstDestination.getName())) {
+throw new UncheckedDeephavenException("When generating common metadata for multiple parquet files, "
++ "all files must have unique names, but " + destination.getName() + " is repeated.");
+}
+}
+}
-for (final File destination : destinations) {
-if (!destination.getAbsolutePath().startsWith(metadataRootDirAbsPathString)) {
-throw new UncheckedDeephavenException("All destinations must be nested under the provided metadata root"
@@ -98,27 +121,28 @@ private static class ParquetFileMetadata {
/**
* Add parquet metadata for the provided parquet file.
*
-* @param parquetFile The parquet file destination path
+* @param parquetFilePath The parquet file destination path
* @param metadata The parquet metadata
*/
-public void addParquetFileMetadata(final File parquetFile, final ParquetMetadata metadata) {
-parquetFileMetadataList.add(new ParquetFileMetadata(parquetFile, metadata));
+public void addParquetFileMetadata(final String parquetFilePath, final ParquetMetadata metadata) {
+parquetFileMetadataList.add(new ParquetFileMetadata(parquetFilePath, metadata));
}

/**
* Write the accumulated metadata to the provided files and clear the metadata accumulated so far.
*
-* @param metadataFile The destination for the {@value ParquetUtils#METADATA_FILE_NAME} file
-* @param commonMetadataFile The destination for the {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file
+* @param metadataFilePath The destination path for the {@value ParquetUtils#METADATA_FILE_NAME} file
+* @param commonMetadataFilePath The destination path for the {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file
*/
-public void writeMetadataFiles(final File metadataFile, final File commonMetadataFile) throws IOException {
+public void writeMetadataFiles(final String metadataFilePath, final String commonMetadataFilePath)
+throws IOException {
if (parquetFileMetadataList.isEmpty()) {
throw new UncheckedDeephavenException("No parquet files to write metadata for");
}
mergeMetadata();
final ParquetMetadata metadataFooter = new ParquetMetadata(new FileMetaData(mergedSchema,
mergedKeyValueMetaData, mergedCreatedByString), mergedBlocks);
-writeMetadataFile(metadataFooter, metadataFile.getAbsolutePath());
+writeMetadataFile(metadataFooter, metadataFilePath);

// Skip the blocks data and merge schema with partitioning columns' schema to write the common metadata file.
// The ordering of arguments in method call is important because we want to keep partitioning columns in the
@@ -127,7 +151,7 @@ public void writeMetadataFiles(final File metadataFile, final File commonMetadat
final ParquetMetadata commonMetadataFooter =
new ParquetMetadata(new FileMetaData(mergedSchema, mergedKeyValueMetaData, mergedCreatedByString),
new ArrayList<>());
-writeMetadataFile(commonMetadataFooter, commonMetadataFile.toString());
+writeMetadataFile(commonMetadataFooter, commonMetadataFilePath);

// Clear the accumulated metadata
clear();
@@ -141,16 +165,16 @@ private void mergeMetadata() throws IOException {
for (final ParquetFileMetadata parquetFileMetadata : parquetFileMetadataList) {
final FileMetaData fileMetaData = parquetFileMetadata.metadata.getFileMetaData();
mergedSchema = mergeSchemaInto(fileMetaData.getSchema(), mergedSchema);
-mergeKeyValueMetaData(fileMetaData.getKeyValueMetaData());
+mergeKeyValueMetaData(parquetFileMetadata);
mergeBlocksInto(parquetFileMetadata, metadataRootDirAbsPath, mergedBlocks);
mergedCreatedBy.add(fileMetaData.getCreatedBy());
}
-// Generate the merged deephaven-specific metadata
+// Add table info to the merged key-value metadata
final TableInfo.Builder tableInfoBuilder = TableInfo.builder().addAllColumnTypes(mergedColumnTypes);
if (mergedVersion != null) {
tableInfoBuilder.version(mergedVersion);
}
-mergedKeyValueMetaData.put(ParquetTableWriter.METADATA_KEY, tableInfoBuilder.build().serializeToJSON());
+mergedKeyValueMetaData.put(METADATA_KEY, tableInfoBuilder.build().serializeToJSON());
mergedCreatedByString =
mergedCreatedBy.size() == 1 ? mergedCreatedBy.iterator().next() : mergedCreatedBy.toString();
}
@@ -170,13 +194,20 @@ private static MessageType mergeSchemaInto(final MessageType schema, final Messa
}

/**
-* Merge the non-deephaven-specific key-value metadata for a file into the merged metadata. For deephaven specific
-* fields, we filter the relevant fields here and merge them later once all the files have been processed.
+* This method processes both deephaven specific and non-deephaven key-value metadata for each file.
+* <ul>
+* <li>For non-deephaven specific key-value metadata, we accumulate it directly and enforce that there is only one
+* value for each key</li>
+* <li>For deephaven specific key-value metadata, we copy each file's metadata directly into the merged metadata as
+* well as accumulate the required fields to generate a common table info later once all files are processed.</li>
+* </ul>
*/
-private void mergeKeyValueMetaData(final Map<String, String> keyValueMetaData) throws IOException {
+private void mergeKeyValueMetaData(@NotNull final ParquetFileMetadata parquetFileMetadata) throws IOException {
+final Map<String, String> keyValueMetaData =
+parquetFileMetadata.metadata.getFileMetaData().getKeyValueMetaData();
for (final Map.Entry<String, String> entry : keyValueMetaData.entrySet()) {
-if (!entry.getKey().equals(ParquetTableWriter.METADATA_KEY)) {
-// We should only have one value for each key
+if (!entry.getKey().equals(METADATA_KEY)) {
+// Make sure we have a unique value for each key.
mergedKeyValueMetaData.compute(entry.getKey(), (k, v) -> {
if (v == null) {
// No existing value for this key, so put the new value
@@ -190,20 +221,26 @@ private void mergeKeyValueMetaData(final Map<String, String> keyValueMetaData) t
return v;
});
} else {
-// For merging deephaven-specific metadata,
+// Add a separate entry for each file
+final String fileKey = getKeyForFilePath(Path.of(parquetFileMetadata.filePath));
+// Assuming the keys are unique for each file because file names are unique, as verified in the constructor
+Assert.eqNull(mergedKeyValueMetaData.get(fileKey), "mergedKeyValueMetaData.get(fileKey)");
+mergedKeyValueMetaData.put(fileKey, entry.getValue());
+
+// Also, process and accumulate the relevant fields:
// - groupingColumns, dataIndexes are skipped
// - columnTypes must be the same for all partitions
// - version is set as non-null if all the files have the same version
final TableInfo tableInfo = TableInfo.deserializeFromJSON(entry.getValue());
if (mergedColumnTypes == null) {
-// First time we've seen deephaven specific metadata, so just copy the relevant fields
+// The first file for which we've seen deephaven specific metadata, so just copy the relevant fields
mergedColumnTypes = tableInfo.columnTypes();
mergedVersion = tableInfo.version();
} else {
if (!mergedColumnTypes.equals(tableInfo.columnTypes())) {
throw new UncheckedDeephavenException("Could not merge metadata for key " +
ParquetTableWriter.METADATA_KEY + ", has conflicting values for columnTypes: " +
tableInfo.columnTypes() + " and " + mergedColumnTypes);
throw new UncheckedDeephavenException("Could not merge metadata for key " + METADATA_KEY +
", has conflicting values for columnTypes: " + tableInfo.columnTypes() + " and "
+ mergedColumnTypes);
}
if (!tableInfo.version().equals(mergedVersion)) {
mergedVersion = null;
Expand All @@ -215,7 +252,7 @@ private void mergeKeyValueMetaData(final Map<String, String> keyValueMetaData) t

private static void mergeBlocksInto(final ParquetFileMetadata parquetFileMetadata,
final Path metadataRootDirAbsPath, final Collection<BlockMetaData> mergedBlocks) {
-final Path parquetFileAbsPath = Paths.get(parquetFileMetadata.file.getAbsolutePath());
+final Path parquetFileAbsPath = new File(parquetFileMetadata.filePath).getAbsoluteFile().toPath();
String fileRelativePathString = metadataRootDirAbsPath.relativize(parquetFileAbsPath).toString();
// Remove leading slashes from the relative path
int pos = 0;
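A small worked example of the relativization step above (directory names invented): mergeBlocksInto rewrites each block's file path so the combined _metadata refers to data files relative to the metadata root.

    // Assumed imports: java.nio.file.Path, java.nio.file.Paths
    final Path root = Paths.get("/data/table");
    final Path file = Paths.get("/data/table/date=2024-03-11/part0.parquet");
    final String rel = root.relativize(file).toString(); // "date=2024-03-11/part0.parquet"
    // The loop above then strips any leading '/' characters, so the stored
    // path never looks absolute to readers of the combined metadata.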
ParquetSchemaReader.java
@@ -32,6 +32,8 @@
import java.util.function.BiFunction;
import java.util.function.Supplier;

+import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY;

public class ParquetSchemaReader {
@FunctionalInterface
public interface ColumnDefinitionConsumer {
@@ -107,14 +109,14 @@ public static ParquetInstructions readParquetSchema(
}

public static Optional<TableInfo> parseMetadata(@NotNull final Map<String, String> keyValueMetadata) {
-final String tableInfoRaw = keyValueMetadata.get(ParquetTableWriter.METADATA_KEY);
+final String tableInfoRaw = keyValueMetadata.get(METADATA_KEY);
if (tableInfoRaw == null) {
return Optional.empty();
}
try {
return Optional.of(TableInfo.deserializeFromJSON(tableInfoRaw));
} catch (IOException e) {
-throw new TableDataException("Failed to parse " + ParquetTableWriter.METADATA_KEY + " metadata", e);
+throw new TableDataException("Failed to parse " + METADATA_KEY + " metadata", e);
}
}

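A sketch of consuming parseMetadata from reader code (the parquetFooter variable is assumed to be a ParquetMetadata obtained elsewhere; only the "deephaven" key is considered):

    // Assumed imports: java.util.Map, java.util.Optional
    final Map<String, String> keyValueMetadata =
            parquetFooter.getFileMetaData().getKeyValueMetaData();
    final Optional<TableInfo> tableInfo = ParquetSchemaReader.parseMetadata(keyValueMetadata);
    // Present only when the file carried Deephaven-specific key-value metadata
    tableInfo.ifPresent(info -> System.out.println("columnTypes: " + info.columnTypes()));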
