Added support to write metadata files in parquet #5105

Merged
merged 50 commits into from
Apr 2, 2024
Changes from 5 commits
Commits
f8ae4ef
Copied methods from inside parquet hadoop to write metadata files
malhotrashivam Jan 30, 2024
0ef7f79
Calling methods from inside ParquetHadoop for writing metadata files
malhotrashivam Jan 31, 2024
44d031f
Cleaned up the code and added a lot of TODOs for review
malhotrashivam Feb 13, 2024
6394bac
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 13, 2024
6c6b075
Some more changes
malhotrashivam Feb 13, 2024
9263aa0
WIP commit
malhotrashivam Feb 14, 2024
8c887c4
Added a custom metadata file writer
malhotrashivam Feb 16, 2024
05400c5
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 16, 2024
b696649
Minor fix
malhotrashivam Feb 16, 2024
797b3dc
Fixed failing test
malhotrashivam Feb 17, 2024
d987a06
Moved some code around
malhotrashivam Feb 20, 2024
d4da175
Minor change
malhotrashivam Feb 20, 2024
f33d712
Review comments
malhotrashivam Feb 22, 2024
7d66445
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 26, 2024
238f5f5
Read offset index from column chunk on demand
malhotrashivam Feb 26, 2024
7a3652d
Fixed failing test
malhotrashivam Feb 26, 2024
1f41eef
Added support for partitioned parquet writing
malhotrashivam Feb 27, 2024
c8aa764
Added some more tests
malhotrashivam Feb 27, 2024
ff99a36
Added some more tests
malhotrashivam Feb 27, 2024
3e48937
Added a new API for writing a partitioned table directly
malhotrashivam Feb 28, 2024
e584343
Improved the tests
malhotrashivam Feb 29, 2024
f659f3c
Review with Ryan part 1
malhotrashivam Mar 4, 2024
3651e5f
Added more tests
malhotrashivam Mar 4, 2024
51eefe1
Iterating using chunked iterators
malhotrashivam Mar 4, 2024
60ee1f5
Removed some unnecessary includes
malhotrashivam Mar 4, 2024
5c7353f
Added support for {index} and {partition} in file basename
malhotrashivam Mar 4, 2024
c174452
Review comments
malhotrashivam Mar 5, 2024
ab73df0
Minor touchups
malhotrashivam Mar 5, 2024
1cb8b81
Added fix and tests for big decimals
malhotrashivam Mar 6, 2024
20e8204
Updated a comment
malhotrashivam Mar 6, 2024
9892d14
Review with Ryan part 1
malhotrashivam Mar 7, 2024
1d98927
Review with Ryan part 2
malhotrashivam Mar 11, 2024
5018feb
Minor touchups
malhotrashivam Mar 11, 2024
a529fc2
Fixed failing tests
malhotrashivam Mar 15, 2024
48906dd
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 15, 2024
819a1b9
Added python APIs and improved comments
malhotrashivam Mar 16, 2024
10b7e0d
Added more fixes for python
malhotrashivam Mar 19, 2024
9f7c55e
Review with Ryan and Chip
malhotrashivam Mar 21, 2024
b62abb7
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 21, 2024
0d7c62e
Review with Chip part 2
malhotrashivam Mar 22, 2024
5a3de8e
Review with Chip and Jianfeng Part 3
malhotrashivam Mar 22, 2024
f68551f
Review with Chip and Jianfeng continued
malhotrashivam Mar 22, 2024
b11ebd1
Added new APIs for managing indexes
malhotrashivam Mar 25, 2024
3bdc92b
Trigger CI jobs
malhotrashivam Mar 25, 2024
3dd6097
Review with Ryan
malhotrashivam Mar 26, 2024
d8bc0a5
Added python support for writing indexes
malhotrashivam Mar 27, 2024
f700883
Reordered comments
malhotrashivam Mar 27, 2024
6246289
Added more details to python comments
malhotrashivam Mar 27, 2024
fa2b0c5
Moved from list to sequence
malhotrashivam Mar 27, 2024
7909ea0
Added fixes and tests for windows
malhotrashivam Apr 1, 2024
6 changes: 6 additions & 0 deletions extensions/parquet/base/build.gradle
@@ -9,6 +9,12 @@ dependencies {
api project(':util-channel')

Classpaths.inheritParquetHadoop(project)
Classpaths.inheritParquetHadoopConfiguration(project)

// TODO These additional dependencies are required if "fs.file.impl.disable.cache" is not set; see the
// MetadataFileWriter code for more details.
// implementation group: 'org.apache.commons', name: 'commons-configuration2', version: '2.0'
// implementation group: 'org.apache.hadoop', name: 'hadoop-auth', version: '3.3.3'

implementation project(':extensions-parquet-compression')
implementation project(':Base')
@@ -41,18 +41,13 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {

private final ColumnChunk columnChunk;
private final SeekableChannelsProvider channelsProvider;
/**
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
*/
private final URI rootURI;
private final CompressorAdapter decompressor;
private final ColumnDescriptor path;
private final OffsetIndex offsetIndex;
private final List<Type> fieldTypes;
private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
private final PageMaterializer.Factory nullMaterializerFactory;

private URI uri;
private final URI columnChunkURI;
/**
* Number of rows in the row group of this column chunk.
*/
@@ -62,12 +57,12 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {
*/
private final String version;

ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, URI rootURI,
ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, URI columnChunkURI,
MessageType type, OffsetIndex offsetIndex, List<Type> fieldTypes, final long numRows,
final String version) {
this.channelsProvider = channelsProvider;
this.columnChunk = columnChunk;
this.rootURI = rootURI;
this.columnChunkURI = columnChunkURI;
this.path = type
.getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
if (columnChunk.getMeta_data().isSetCodec()) {
@@ -122,15 +117,7 @@ public final ColumnPageDirectAccessor getPageAccessor() {
}

private URI getURI() {
if (uri != null) {
return uri;
}
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
return uri = Path.of(rootURI).resolve(columnChunk.getFile_path()).toUri();
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
return uri = rootURI;
}
return columnChunkURI;
}

@Override
@@ -0,0 +1,80 @@
package io.deephaven.parquet.base;

import io.deephaven.UncheckedDeephavenException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.Footer;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile;

/**
* Uses the hadoop implementation of {@link org.apache.parquet.hadoop.ParquetFileWriter} to generate metadata files for
* provided Parquet files.
*/
public final class MetadataFileWriter implements MetadataFileWriterBase {
// TODO Please suggest a better name for this class and the interface

private final String metadataRootDir;
private final List<Footer> footers;

public MetadataFileWriter(final String metadataRootDir, final File[] destinations) {
// TODO Discuss with Ryan about this check
// This check also exists inside the hadoop code, but there it is performed only when the metadata files are
// written. It is duplicated here so that errors are caught upfront.
for (final File destination : destinations) {
if (!destination.getAbsolutePath().startsWith(metadataRootDir)) {
throw new UncheckedDeephavenException("All destinations must be contained in the provided metadata root"
+ " directory, provided destination " + destination.getAbsolutePath() + " is not in " +
metadataRootDir);
}
}
this.metadataRootDir = metadataRootDir;
this.footers = new ArrayList<>(destinations.length);
}

/**
* Add the parquet metadata for the provided parquet file.
*
* @param parquetFile The parquet file destination path
* @param parquetMetadata The parquet metadata
*/
public void addFooter(final File parquetFile, final ParquetMetadata parquetMetadata) {
footers.add(new Footer(new Path(parquetFile.getAbsolutePath()), parquetMetadata));
}

public void writeMetadataFiles() {
final Configuration conf = new Configuration();
conf.setBoolean("fs.file.impl.disable.cache", true);

// TODO Discuss with Ryan about this setting
// Not setting it requires adding dependencies on hadoop-auth and commons-configuration2.
// Setting it means we will not cache the filesystem instance, which leads to a small performance hit, but that's
// okay since this is only used when writing the metadata files.
// But this leads to additional warnings like
// Jan 31, 2024 6:31:23 PM org.apache.hadoop.fs.FileSystem loadFileSystems
// WARNING: Cannot load filesystem: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem:
// Provider org.apache.hadoop.fs.viewfs.ViewFileSystem could not be instantiated
// Jan 31, 2024 6:31:23 PM org.apache.hadoop.fs.FileSystem loadFileSystems
// WARNING: java.lang.NoClassDefFoundError: org/apache/commons/configuration2/Configuration
// Jan 31, 2024 6:31:23 PM org.apache.hadoop.fs.FileSystem loadFileSystems
// WARNING: java.lang.ClassNotFoundException: org.apache.commons.configuration2.Configuration
//
// Another option is to move the metadata file writing code out of parquet-hadoop and then remove this caching
// and filesystem handling. Note that the parquet-hadoop method is deprecated, so we don't expect its code to
// change in the future.

try {
writeMetadataFile(conf, new Path(metadataRootDir), footers);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Failed to write metadata files at " + metadataRootDir, e);
}
footers.clear();
}
}
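For review context, a minimal usage sketch of the writer above (not part of this diff). It assumes the io.deephaven.parquet.base package; the helper name and the way footers are supplied are placeholders — in this PR the footers are registered by ParquetFileWriter when each parquet file is closed.

import java.io.File;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Hypothetical helper: register each destination's footer, then write the combined
// _metadata and _common_metadata files under metadataRootDir.
static void writeCombinedMetadata(final String metadataRootDir, final File[] destinations,
        final ParquetMetadata[] footers) {
    final MetadataFileWriterBase metadataFileWriter = new MetadataFileWriter(metadataRootDir, destinations);
    for (int i = 0; i < destinations.length; i++) {
        metadataFileWriter.addFooter(destinations[i], footers[i]);
    }
    metadataFileWriter.writeMetadataFiles();
}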
@@ -0,0 +1,25 @@
package io.deephaven.parquet.base;

import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.File;

/**
* Used to write _metadata and _common_metadata files for Parquet.
*/
public interface MetadataFileWriterBase {

/**
* Add the parquet metadata for the provided parquet file to the list of metadata to be written to combined metadata
* files.
*
* @param parquetFile The parquet file destination path
* @param parquetMetadata The parquet metadata corresponding to the parquet file
*/
void addFooter(File parquetFile, ParquetMetadata parquetMetadata);

/**
* Write the combined metadata files for all metadata accumulated so far and clear the list.
*/
void writeMetadataFiles();
}
@@ -0,0 +1,22 @@
package io.deephaven.parquet.base;

import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.File;

/**
* A no-op implementation of {@link MetadataFileWriterBase}, used when we don't want to write metadata files for Parquet files.
*/
public final class NullMetadataFileWriter implements MetadataFileWriterBase {

public static final NullMetadataFileWriter INSTANCE = new NullMetadataFileWriter();

private NullMetadataFileWriter() {}

@Override
public void addFooter(final File parquetFile, final ParquetMetadata parquetMetadata) {}

@Override
public void writeMetadataFiles() {}
}
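The singleton lets callers hold a MetadataFileWriterBase unconditionally. A sketch of how the choice between the no-op writer and the real one might be made, assuming an empty root directory means metadata files are disabled; the helper and its wiring are illustrative and not part of this diff.

import java.io.File;

// Illustrative only: fall back to the no-op writer when no metadata root directory is configured.
static MetadataFileWriterBase metadataFileWriterFor(final String metadataRootDir, final File[] destinations) {
    if (metadataRootDir == null || metadataRootDir.isEmpty()) {
        return NullMetadataFileWriter.INSTANCE;
    }
    return new MetadataFileWriter(metadataRootDir, destinations);
}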

@@ -16,7 +16,9 @@
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.schema.MessageType;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,23 +40,29 @@ public final class ParquetFileWriter {
private final Map<String, String> extraMetaData;
private final List<BlockMetaData> blocks = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
private final File metadataFilePath;
private final MetadataFileWriterBase metadataFileWriter;

public ParquetFileWriter(
final String filePath,
final File destFile,
final File metadataFilePath,
final SeekableChannelsProvider channelsProvider,
final int targetPageSize,
final ByteBufferAllocator allocator,
final MessageType type,
final String codecName,
final Map<String, String> extraMetaData) throws IOException {
final Map<String, String> extraMetaData,
@NotNull final MetadataFileWriterBase metadataFileWriter) throws IOException {
this.targetPageSize = targetPageSize;
this.allocator = allocator;
this.extraMetaData = new HashMap<>(extraMetaData);
bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(filePath, false),
bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(destFile.getPath(), false),
OUTPUT_BUFFER_SIZE);
bufferedOutput.write(ParquetFileReader.MAGIC);
this.type = type;
this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName);
this.metadataFilePath = metadataFilePath;
this.metadataFileWriter = metadataFileWriter;
}

public RowGroupWriter addRowGroup(final long size) {
@@ -71,6 +79,7 @@ public void close() throws IOException {
final ParquetMetadata footer =
new ParquetMetadata(new FileMetaData(type, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer);
metadataFileWriter.addFooter(metadataFilePath, footer);
// Flush any buffered data and close the channel
bufferedOutput.close();
compressorAdapter.close();
@@ -21,11 +21,14 @@
import java.net.URI;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;

public class RowGroupReaderImpl implements RowGroupReader {

private static final int BUFFER_SIZE = 65536;
@@ -79,17 +82,26 @@ public ColumnChunkReaderImpl getColumnChunk(@NotNull final List<String> path,
return null;
}

final URI columnChunkURI;
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
columnChunkURI = Path.of(rootURI).resolve(columnChunk.getFile_path()).toUri();
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
columnChunkURI = rootURI;
}

OffsetIndex offsetIndex = null;
if (columnChunk.isSetOffset_index_offset()) {
try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(channelContext, rootURI)) {
try (final SeekableByteChannel readChannel =
channelsProvider.getReadChannel(channelContext, columnChunkURI)) {
readChannel.position(columnChunk.getOffset_index_offset());
offsetIndex = ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(
new BufferedInputStream(Channels.newInputStream(readChannel), BUFFER_SIZE)));
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}
return new ColumnChunkReaderImpl(columnChunk, channelsProvider, rootURI, type, offsetIndex, fieldTypes,
return new ColumnChunkReaderImpl(columnChunk, channelsProvider, columnChunkURI, type, offsetIndex, fieldTypes,
numRows(), version);
}

@@ -115,6 +115,8 @@ public static int getDefaultTargetPageSize() {
return defaultTargetPageSize;
}

static final String DEFAULT_METADATA_ROOT_DIR = ""; // Empty = No metadata files written

public ParquetInstructions() {}

public final String getColumnNameFromParquetColumnNameOrDefault(final String parquetColumnName) {
@@ -164,6 +166,11 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract boolean isRefreshing();

/**
* @return the root directory in which metadata files should be stored; an empty string means no metadata files are written.
*/
public abstract String getMetadataRootDir();

@VisibleForTesting
public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions i1, final ParquetInstructions i2) {
if (i1 == EMPTY) {
@@ -238,6 +245,11 @@ public int getTargetPageSize() {
public boolean isRefreshing() {
return DEFAULT_IS_REFRESHING;
}

@Override
public String getMetadataRootDir() {
return DEFAULT_METADATA_ROOT_DIR;
}
};

private static class ColumnInstructions {
@@ -306,6 +318,7 @@ private static final class ReadOnly extends ParquetInstructions {
private final int targetPageSize;
private final boolean isRefreshing;
private final Object specialInstructions;
private final String metadataRootDir;

private ReadOnly(
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructions,
@@ -316,7 +329,8 @@ private ReadOnly(
final boolean isLegacyParquet,
final int targetPageSize,
final boolean isRefreshing,
final Object specialInstructions) {
final Object specialInstructions,
final String metadataRootDir) {
this.columnNameToInstructions = columnNameToInstructions;
this.parquetColumnNameToInstructions = parquetColumnNameToColumnName;
this.compressionCodecName = compressionCodecName;
Expand All @@ -326,6 +340,7 @@ private ReadOnly(
this.targetPageSize = targetPageSize;
this.isRefreshing = isRefreshing;
this.specialInstructions = specialInstructions;
this.metadataRootDir = metadataRootDir;
}

private String getOrDefault(final String columnName, final String defaultValue,
@@ -419,6 +434,10 @@ public boolean isRefreshing() {
return specialInstructions;
}

@Override
public String getMetadataRootDir() {
return metadataRootDir;
}

KeyedObjectHashMap<String, ColumnInstructions> copyColumnNameToInstructions() {
// noinspection unchecked
@@ -471,6 +490,7 @@ public static class Builder {
private int targetPageSize = defaultTargetPageSize;
private boolean isRefreshing = DEFAULT_IS_REFRESHING;
private Object specialInstructions;
private String metadataRootDir = DEFAULT_METADATA_ROOT_DIR;

public Builder() {}

Expand Down Expand Up @@ -647,6 +667,17 @@ public Builder setSpecialInstructions(final Object specialInstructions) {
return this;
}

/**
* Set the root directory in which combined metadata files should be written.
*
* @param metadataRootDir the root directory to store metadata files in. All parquet destinations must be
* inside this directory.
*/
public Builder setMetadataRootDir(final String metadataRootDir) {
this.metadataRootDir = metadataRootDir;
return this;
}

public ParquetInstructions build() {
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructionsOut = columnNameToInstructions;
columnNameToInstructions = null;
@@ -655,7 +686,7 @@ public ParquetInstructions build() {
parquetColumnNameToInstructions = null;
return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName,
maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing,
specialInstructions);
specialInstructions, metadataRootDir);
}
}
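A short sketch of configuring the new option from calling code; only the builder calls shown here appear in this diff, and the output path is a placeholder.

final ParquetInstructions instructions = new ParquetInstructions.Builder()
        .setMetadataRootDir("/data/output") // all parquet destinations must be inside this directory
        .build();

Leaving the option at its default (the empty string) keeps the existing behavior: no metadata files are written.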
