
Added support to write metadata files in parquet #5105

Merged
merged 50 commits on Apr 2, 2024

Commits (50)
f8ae4ef
Copied methods from inside parquet hadoop to write metadata files
malhotrashivam Jan 30, 2024
0ef7f79
Calling methods from inside ParquetHadoop for writing metadata files
malhotrashivam Jan 31, 2024
44d031f
Cleaned up the code and added a lot of TODOs for review
malhotrashivam Feb 13, 2024
6394bac
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 13, 2024
6c6b075
Some more changes
malhotrashivam Feb 13, 2024
9263aa0
WIP commit
malhotrashivam Feb 14, 2024
8c887c4
Added a custom metadata file writer
malhotrashivam Feb 16, 2024
05400c5
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 16, 2024
b696649
Minor fix
malhotrashivam Feb 16, 2024
797b3dc
Fixed failing test
malhotrashivam Feb 17, 2024
d987a06
Moved some code around
malhotrashivam Feb 20, 2024
d4da175
Minor change
malhotrashivam Feb 20, 2024
f33d712
Review comments
malhotrashivam Feb 22, 2024
7d66445
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 26, 2024
238f5f5
Read offset index from column chunk on demand
malhotrashivam Feb 26, 2024
7a3652d
Fixed failing test
malhotrashivam Feb 26, 2024
1f41eef
Added support for partitioned parquet writing
malhotrashivam Feb 27, 2024
c8aa764
Added some more tests
malhotrashivam Feb 27, 2024
ff99a36
Added some more tests
malhotrashivam Feb 27, 2024
3e48937
Added a new API for writing a partitioned table directly
malhotrashivam Feb 28, 2024
e584343
Improved the tests
malhotrashivam Feb 29, 2024
f659f3c
Review with Ryan part 1
malhotrashivam Mar 4, 2024
3651e5f
Added more tests
malhotrashivam Mar 4, 2024
51eefe1
Iterating using chunked iterators
malhotrashivam Mar 4, 2024
60ee1f5
Removed some unnecessary includes
malhotrashivam Mar 4, 2024
5c7353f
Added support for {index} and {partition} in file basename
malhotrashivam Mar 4, 2024
c174452
Review comments
malhotrashivam Mar 5, 2024
ab73df0
Minor touchups
malhotrashivam Mar 5, 2024
1cb8b81
Added fix and tests for big decimals
malhotrashivam Mar 6, 2024
20e8204
Updated a comment
malhotrashivam Mar 6, 2024
9892d14
Review with Ryan part 1
malhotrashivam Mar 7, 2024
1d98927
Review with Ryan part 2
malhotrashivam Mar 11, 2024
5018feb
Minor touchups
malhotrashivam Mar 11, 2024
a529fc2
Fixed failing tests
malhotrashivam Mar 15, 2024
48906dd
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 15, 2024
819a1b9
Added python APIs and improved comments
malhotrashivam Mar 16, 2024
10b7e0d
Added more fixes for python
malhotrashivam Mar 19, 2024
9f7c55e
Review with Ryan and Chip
malhotrashivam Mar 21, 2024
b62abb7
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 21, 2024
0d7c62e
Review with Chip part 2
malhotrashivam Mar 22, 2024
5a3de8e
Review with Chip and Jianfeng Part 3
malhotrashivam Mar 22, 2024
f68551f
Review with Chip and Jianfeng continued
malhotrashivam Mar 22, 2024
b11ebd1
Added new APIs for managing indexes
malhotrashivam Mar 25, 2024
3bdc92b
Trigger CI jobs
malhotrashivam Mar 25, 2024
3dd6097
Review with Ryan
malhotrashivam Mar 26, 2024
d8bc0a5
Added python support for writing indexes
malhotrashivam Mar 27, 2024
f700883
Reordered comments
malhotrashivam Mar 27, 2024
6246289
Added more details to python comments
malhotrashivam Mar 27, 2024
fa2b0c5
Moved from list to sequence
malhotrashivam Mar 27, 2024
7909ea0
Added fixes and tests for windows
malhotrashivam Apr 1, 2024
@@ -12,14 +12,11 @@
import org.jetbrains.annotations.Nullable;

import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;

final class RowGroupReaderImpl implements RowGroupReader {
private final RowGroup rowGroup;
private final SeekableChannelsProvider channelsProvider;
@@ -724,8 +724,7 @@ public Builder setGenerateMetadataFiles(final boolean generateMetadataFiles) {
* "PC=partition1/table-0.parquet", "PC=partition1/table-1.parquet", etc., where PC is a partitioning
* column.</li>
* <li>The token {@value #UUID_TOKEN} will be replaced with a random UUID. For example, a base name of
* "table-{uuid}" will result in files named like
* "table-8e8ab6b2-62f2-40d1-8191-1c5b70c5f330.parquet.parquet".</li>
* "table-{uuid}" will result in files named like "table-8e8ab6b2-62f2-40d1-8191-1c5b70c5f330.parquet".</li>
* <li>The token {@value #PARTITIONS_TOKEN} will be replaced with an underscore-delimited, concatenated string
* of partition values. For example, a base name of "{partitions}-table" will result in files like
* "PC1=partition1/PC2=partitionA/PC1=partition1_PC2=partitionA-table.parquet", where "PC1" and "PC2" are
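As a rough illustration of how these file-name tokens might be exercised through the Python `write_partitioned` wrapper touched later in this PR — the table contents and the partitioning column are illustrative assumptions, not taken from the change:

```python
from deephaven import empty_table
from deephaven import parquet

# Hypothetical table with a single partitioning-style column "PC".
source = empty_table(100).update(["PC = ii % 3", "Value = ii"])

# "{partitions}" expands to the underscore-delimited partition values and
# "{uuid}" to a random UUID, so files should land under each "PC=<value>/"
# directory with names like "PC=0-table-<uuid>.parquet".
parquet.write_partitioned(
    source.partition_by("PC"),
    destination_dir="/tmp/partitioned_output",
    base_name="{partitions}-table-{uuid}",
    generate_metadata_files=True,
)
```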
@@ -104,7 +104,7 @@ private static class ParquetFileMetadata {
}

/**
* Add parquet metadata for the provided parquet file the combined metadata file.
* Add parquet metadata for the provided parquet file to the combined metadata file.
*
* @param parquetFilePath The parquet file destination path
* @param metadata The parquet metadata
@@ -255,13 +255,7 @@ private static void mergeBlocksInto(final ParquetFileMetadata parquetFileMetadat

private static String getRelativePath(final String parquetFilePath, final Path metadataRootDirAbsPath) {
final Path parquetFileAbsPath = new File(parquetFilePath).getAbsoluteFile().toPath();
final String relativePath = metadataRootDirAbsPath.relativize(parquetFileAbsPath).toString();
// Remove leading slashes from the relative path
int pos = 0;
while (pos < relativePath.length() && relativePath.charAt(pos) == '/') {
pos++;
}
return relativePath.substring(pos);
return metadataRootDirAbsPath.relativize(parquetFileAbsPath).toString();
}

private void writeMetadataFile(final ParquetMetadata metadataFooter, final String outputPath) throws IOException {
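The `getRelativePath` simplification above leans on the fact that relativizing against the metadata root directory already yields a path with no leading separator when the parquet file sits underneath that root. A tiny Python sketch of the same relationship — only an illustration of the idea, not the Java `Path.relativize` API:

```python
import os

# Hypothetical layout: the combined "_metadata" file is written at the root,
# and each parquet file is recorded relative to that root.
metadata_root = "/data/output"
parquet_file = "/data/output/PC=partition1/table-0.parquet"

# The path stored in the combined metadata file has no leading separator.
print(os.path.relpath(parquet_file, metadata_root))
# -> PC=partition1/table-0.parquet  (on POSIX systems)
```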
@@ -91,8 +91,8 @@ static class IndexWritingInfo {
final File destFile) {
this.indexColumnNames = indexColumnNames;
this.parquetColumnNames = parquetColumnNames;
this.destFileForMetadata = destFileForMetadata;
this.destFile = destFile;
this.destFileForMetadata = destFileForMetadata.getAbsoluteFile();
this.destFile = destFile.getAbsoluteFile();
}
}

@@ -143,7 +143,7 @@ static void write(
try {
if (indexInfoList != null) {
cleanupFiles = new ArrayList<>(indexInfoList.size());
final Path destDirPath = Paths.get(destFilePath).getParent();
final Path destDirPath = new File(destFilePath).getAbsoluteFile().getParentFile().toPath();
for (final ParquetTableWriter.IndexWritingInfo info : indexInfoList) {
try (final SafeCloseable ignored = t.isRefreshing() ? LivenessScopeStack.open() : null) {
// This will retrieve an existing index if one exists, or create a new one if not
@@ -166,7 +166,6 @@ static void write(
.addColumnNameMapping(INDEX_ROW_SET_COLUMN_NAME, dataIndex.rowSetColumnName())
.build();
}
// We don't accumulate metadata from grouping files into the main metadata file
write(indexTable, indexTable.getDefinition(), writeInstructionsToUse,
info.destFile.getAbsolutePath(), info.destFileForMetadata.getAbsolutePath(),
Collections.emptyMap(), TableInfo.builder(), NullParquetMetadataFileWriter.INSTANCE,
@@ -614,7 +614,9 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
* Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
* columns} as "key=value" format in a nested directory structure. To generate the partitioned table, users can call
* {@link Table#partitionBy(String...) partitionBy} on the required columns. The generated parquet files will have
* names of the format provided by {@link ParquetInstructions#baseNameForPartitionedParquetData()}.
* names of the format provided by {@link ParquetInstructions#baseNameForPartitionedParquetData()}. This method does
* not write any indexes as sidecar tables to disk. To write indexes, use
* {@link #writeKeyValuePartitionedTable(PartitionedTable, String, ParquetInstructions, String[][])}.
*
* @param partitionedTable The partitioned table to write
* @param destinationDir The path to destination root directory to store partitioned data in nested format.
@@ -658,7 +660,9 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable
* Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
* columns} as "key=value" format in a nested directory structure. To generate the partitioned table, users can call
* {@link Table#partitionBy(String...) partitionBy} on the required columns. The generated parquet files will have
* names of the format provided by {@link ParquetInstructions#baseNameForPartitionedParquetData()}.
* names of the format provided by {@link ParquetInstructions#baseNameForPartitionedParquetData()}. This method does
* not write any indexes as sidecar tables to disk. To write indexes, use
* {@link #writeKeyValuePartitionedTable(PartitionedTable, TableDefinition, String, ParquetInstructions, String[][])}.
*
* @param partitionedTable The partitioned table to write
* @param definition table definition to use (instead of the one implied by the table itself)
@@ -798,27 +802,38 @@ private static void writeKeyValuePartitionedTableImpl(@NotNull final Partitioned
} else {
partitioningColumnsSchema = null;
}
final Map<String, Map<ParquetCacheTags, Object>> computedCache =
buildComputedCache(() -> sourceTable.orElseGet(partitionedTable::merge), leafDefinition);
final Table[] partitionedDataArray = partitionedData.toArray(Table[]::new);
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
// TODO(deephaven-core#5292): Optimize creating index on constituent tables
addIndexesToTables(partitionedDataArray, indexColumnArr);
final Map<String, Map<ParquetCacheTags, Object>> computedCache =
buildComputedCache(() -> sourceTable.orElseGet(partitionedTable::merge), leafDefinition);
// Store hard reference to prevent indexes from being garbage collected
final List<DataIndex> dataIndexes = addIndexesToTables(partitionedDataArray, indexColumnArr);
writeParquetTablesImpl(partitionedDataArray, leafDefinition, writeInstructions,
destinations.toArray(File[]::new), indexColumnArr, partitioningColumnsSchema,
new File(destinationRoot), computedCache);
if (dataIndexes != null) {
dataIndexes.clear();
}
}
}

private static void addIndexesToTables(@NotNull final Table[] tables,
/**
* Add data indexes to provided tables, if not present, and return a list of hard references to the indexes.
*/
@Nullable
private static List<DataIndex> addIndexesToTables(@NotNull final Table[] tables,
@Nullable final String[][] indexColumnArr) {
if (indexColumnArr != null && indexColumnArr.length != 0) {
for (final Table table : tables) {
for (final String[] indexCols : indexColumnArr) {
DataIndexer.getOrCreateDataIndex(table, indexCols);
}
if (indexColumnArr == null || indexColumnArr.length == 0) {
return null;
}
final List<DataIndex> dataIndexes = new ArrayList<>(indexColumnArr.length * tables.length);
for (final Table table : tables) {
for (final String[] indexCols : indexColumnArr) {
dataIndexes.add(DataIndexer.getOrCreateDataIndex(table, indexCols));
}
}
return dataIndexes;
}

/**
67 changes: 48 additions & 19 deletions py/server/deephaven/parquet.py
@@ -220,6 +220,10 @@ def _j_file_array(paths: List[str]):
return jpy.array("java.io.File", [_JFile(el) for el in paths])


def _j_array_of_array_of_string(data_indexes: List[List[str]]):
return jpy.array("[Ljava.lang.String;", [jpy.array("java.lang.String", index_cols) for index_cols in data_indexes])


def delete(path: str) -> None:
""" Deletes a Parquet table on disk.

@@ -245,6 +249,7 @@ def write(
max_dictionary_size: Optional[int] = None,
target_page_size: Optional[int] = None,
generate_metadata_files: Optional[bool] = None,
index_columns: Optional[List[List[str]]] = None
) -> None:
""" Write a table to a Parquet file.

@@ -268,6 +273,11 @@
defaults to False. Generating these files can help speed up reading of partitioned parquet data because these
files contain metadata (including schema) about the entire dataset, which can be used to skip reading some
files.
index_columns (Optional[List[List[str]]]): Lists containing the column names for indexes to persist. The write
operation will store the index info as sidecar tables. By default, data indexes to write are determined by
those present on the source table. This argument is used to narrow the set of indexes to write, or to be
explicit about the expected set of indexes present on all sources. Indexes that are specified but missing
will be computed on demand.

Raises:
DHError
@@ -282,15 +292,18 @@
for_read=False,
generate_metadata_files=generate_metadata_files,
)

table_definition = None
if col_definitions is not None:
table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions])
else:
table_definition = table._definition

if table_definition:
_JParquetTools.writeTable(table.j_table, path, table_definition, write_instructions)
if index_columns:
table_array = jpy.array("io.deephaven.engine.table.Table", [table.j_table])
index_columns_array = _j_array_of_array_of_string(index_columns)
_JParquetTools.writeParquetTables(table_array, table_definition, write_instructions,
_j_file_array([path]), index_columns_array)
else:
_JParquetTools.writeTable(table.j_table, _JFile(path), write_instructions)
_JParquetTools.writeTable(table.j_table, path, table_definition, write_instructions)
except Exception as e:
raise DHError(e, "failed to write to parquet data.") from e
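A hedged usage sketch of the new `index_columns` argument to `write`; the table, column names, and output path are made up for illustration:

```python
from deephaven import empty_table
from deephaven import parquet

# Hypothetical table with a column worth indexing.
orders = empty_table(1_000).update(["Sym = ii % 5", "Price = ii / 10.0"])

# Persist the table plus a sidecar data index on "Sym". Per the docstring
# above, an index that is requested but missing is computed on demand.
parquet.write(
    orders,
    "/tmp/orders.parquet",
    index_columns=[["Sym"]],
    generate_metadata_files=True,
)
```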

@@ -306,6 +319,7 @@ def write_partitioned(
target_page_size: Optional[int] = None,
base_name: Optional[str] = None,
generate_metadata_files: Optional[bool] = None,
index_columns: Optional[List[List[str]]] = None
) -> None:
""" Write table to disk in parquet format with the partitioning columns written as "key=value" format in a nested
directory structure. For example, for a partitioned column "date", we will have a directory structure like
@@ -344,6 +358,11 @@
defaults to False. Generating these files can help speed up reading of partitioned parquet data because these
files contain metadata (including schema) about the entire dataset, which can be used to skip reading some
files.
index_columns (Optional[List[List[str]]]): Lists containing the column names for indexes to persist. The write
operation will store the index info as sidecar tables. By default, data indexes to write are determined by
those present on the source table. This argument is used to narrow the set of indexes to write, or to be
explicit about the expected set of indexes present on all sources. Indexes that are specified but missing
will be computed on demand.

Raises:
DHError
@@ -364,11 +383,20 @@
if col_definitions is not None:
table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions])

if table_definition:
_JParquetTools.writeKeyValuePartitionedTable(table.j_object, table_definition, destination_dir,
write_instructions)
if index_columns:
index_columns_array = _j_array_of_array_of_string(index_columns)
if table_definition:
_JParquetTools.writeKeyValuePartitionedTable(table.j_object, table_definition, destination_dir,
write_instructions, index_columns_array)
else:
_JParquetTools.writeKeyValuePartitionedTable(table.j_object, destination_dir, write_instructions,
index_columns_array)
else:
_JParquetTools.writeKeyValuePartitionedTable(table.j_object, destination_dir, write_instructions)
if table_definition:
_JParquetTools.writeKeyValuePartitionedTable(table.j_object, table_definition, destination_dir,
write_instructions)
else:
_JParquetTools.writeKeyValuePartitionedTable(table.j_object, destination_dir, write_instructions)
except Exception as e:
raise DHError(e, "failed to write to parquet data.") from e
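Similarly, a sketch of forwarding `index_columns` through `write_partitioned`; the partitioning setup here is an assumption for illustration:

```python
from deephaven import empty_table
from deephaven import parquet

trades = empty_table(1_000).update(["Region = ii % 4", "Sym = ii % 10", "Size = ii"])

# Partition on "Region" and request a data index on "Sym"; per the Java
# changes above, the index is added to each constituent before it is written
# out as a sidecar table alongside that constituent's parquet file.
parquet.write_partitioned(
    trades.partition_by("Region"),
    destination_dir="/tmp/trades",
    index_columns=[["Sym"]],
    generate_metadata_files=True,
)
```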

@@ -382,23 +410,19 @@ def batch_write(
max_dictionary_keys: Optional[int] = None,
max_dictionary_size: Optional[int] = None,
target_page_size: Optional[int] = None,
grouping_cols: Optional[List[str]] = None,
generate_metadata_files: Optional[bool] = None,
index_columns: Optional[List[List[str]]] = None
):
""" Writes tables to disk in parquet format to a supplied set of paths.

If you specify grouping columns, there must already be grouping information for those columns in the sources.
This can be accomplished with .groupBy(<grouping columns>).ungroup() or .sort(<grouping column>).

Note that either all the tables are written out successfully or none is.

Args:
tables (List[Table]): the source tables
paths (List[str]): the destinations paths. Any non existing directories in the paths provided are
paths (List[str]): the destination paths. Any non-existing directories in the paths provided are
created. If there is an error, any intermediate directories previously created are removed; note this makes
this method unsafe for concurrent use
col_definitions (Optional[List[Column]]): the column definitions to use for writing, instead of the definitions
implied by the table. Default is None, which means use the column definitions implied by the table
col_definitions (List[Column]): the column definitions to use for writing.
col_instructions (Optional[List[ColumnInstruction]]): instructions for customizations while writing
compression_codec_name (Optional[str]): the compression codec to use. Allowed values include "UNCOMPRESSED",
"SNAPPY", "GZIP", "LZO", "LZ4", "LZ4_RAW", "ZSTD", etc. If not specified, defaults to "SNAPPY".
@@ -407,11 +431,15 @@
max_dictionary_size (Optional[int]): the maximum number of bytes the writer should add to the dictionary before
switching to non-dictionary encoding, never evaluated for non-String columns, defaults to 2^20 (1,048,576)
target_page_size (Optional[int]): the target page size in bytes, if not specified, defaults to 2^20 bytes (1 MiB)
grouping_cols (Optional[List[str]]): the group column names
generate_metadata_files (Optional[bool]): whether to generate parquet _metadata and _common_metadata files,
defaults to False. Generating these files can help speed up reading of partitioned parquet data because these
files contain metadata (including schema) about the entire dataset, which can be used to skip reading some
files.
index_columns (Optional[List[List[str]]]): Lists containing the column names for indexes to persist. The write
operation will store the index info as sidecar tables. By default, data indexes to write are determined by
those present on the source table. This argument is used to narrow the set of indexes to write, or to be
explicit about the expected set of indexes present on all sources. Indexes that are specified but missing
will be computed on demand.

Raises:
DHError
@@ -429,9 +457,10 @@

table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions])

if grouping_cols:
if index_columns:
index_columns_array = _j_array_of_array_of_string(index_columns)
_JParquetTools.writeParquetTables([t.j_table for t in tables], table_definition, write_instructions,
_j_file_array(paths), grouping_cols)
_j_file_array(paths), index_columns_array)
else:
_JParquetTools.writeTables([t.j_table for t in tables], table_definition,
_j_file_array(paths))
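Finally, a sketch of `batch_write` with `index_columns` taking over from the removed `grouping_cols`; the paths are made up, and obtaining the column definitions from `Table.columns` is an assumption:

```python
from deephaven import empty_table
from deephaven import parquet

t1 = empty_table(100).update(["Sym = ii % 5", "Price = ii / 10.0"])
t2 = empty_table(200).update(["Sym = ii % 5", "Price = ii / 20.0"])

# batch_write takes explicit column definitions; both tables share a schema,
# so reuse the definitions implied by the first one.
parquet.batch_write(
    [t1, t2],
    ["/tmp/batch/t1.parquet", "/tmp/batch/t2.parquet"],
    col_definitions=t1.columns,
    index_columns=[["Sym"]],
    generate_metadata_files=True,
)
```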