Add parquet read TableDefinition support #4831

Merged
Changes from 1 commit
Review response. Add Dict support
devinrsmith committed Nov 15, 2023
commit 2b3017c86817d889d82f916e145131ddc53eb6c6
@@ -783,11 +783,9 @@ public static Table readPartitionedTableInferSchema(
return readPartitionedTable(locationKeyFinder, readInstructions);
}

/**
* TODO(deephaven-core#877): Support schema merge when discovering multiple parquet files
*/
private static Pair<TableDefinition, ParquetInstructions> infer(
KnownLocationKeyFinder<ParquetTableLocationKey> inferenceKeys, ParquetInstructions readInstructions) {
// TODO(deephaven-core#877): Support schema merge when discovering multiple parquet files
final ParquetTableLocationKey lastKey = inferenceKeys.getLastKey().orElse(null);
if (lastKey == null) {
throw new IllegalArgumentException(
36 changes: 27 additions & 9 deletions py/server/deephaven/parquet.py
@@ -6,12 +6,13 @@
Parquet files. """
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional
from typing import List, Optional, Union, Dict

import jpy

from deephaven import DHError
from deephaven.column import Column
from deephaven.dtypes import DType
from deephaven.table import Table

_JParquetTools = jpy.get_type("io.deephaven.parquet.table.ParquetTools")
@@ -90,6 +91,23 @@ def _build_parquet_instructions(

return builder.build()

def _j_table_definition(table_definition: Union[Dict[str, DType], List[Column], None]):
Reviewer comment: @jmao-denver Should this go in table.py or some other location where it could be reused?

if table_definition is None:
return None
elif isinstance(table_definition, Dict):
return _JTableDefinition.of(
[
Column(name=name, data_type=dtype).j_column_definition
for name, dtype in table_definition.items()
]
)
elif isinstance(table_definition, List):
return _JTableDefinition.of(
[col.j_column_definition for col in table_definition]
)
else:
raise DHError(f"unexpected table_definition type: {type(table_definition)}")
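As an aside on the helper above, here is a minimal sketch (illustrative only, not part of the diff) of how `_j_table_definition` handles the two accepted forms; the column names and dtypes are made up for the example:

```python
from deephaven import dtypes
from deephaven.column import Column

# Dict form: maps column names to dtypes.
j_def_from_dict = _j_table_definition({"x": dtypes.int32, "y": dtypes.double})

# List form: explicit Column objects; equivalent to the dict form above.
j_def_from_list = _j_table_definition(
    [Column("x", dtypes.int32), Column("y", dtypes.double)]
)

# None propagates, so read() falls back to inferring the definition.
assert _j_table_definition(None) is None
```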

class ParquetType(Enum):
""" The parquet file layout type. """

@@ -112,7 +130,7 @@ def read(
is_legacy_parquet: bool = False,
is_refreshing: bool = False,
type: Optional[ParquetType] = None,
table_definition: Optional[List[Column]] = None,
table_definition: Union[Dict[str, DType], List[Column], None] = None,
) -> Table:
""" Reads in a table from a single parquet, metadata file, or directory with recognized layout.

@@ -123,11 +141,11 @@ def read(
is_legacy_parquet (bool): if the parquet data is legacy
is_refreshing (bool): if the parquet data represents a refreshing source
type (Optional[ParquetType]): the parquet type, by default None. When None, the type is inferred.
table_definition (Optional[List[Column]]): the table definition, by default None. When None, the definition is
inferred from the parquet file(s). Setting a definition guarantees the returned table will have that
definition. This is useful for bootstrapping purposes when the initial partitioned directory is empty and
is_refreshing=True. It is also useful for specifying a subset of the parquet definition. When set, type must
also be set.
table_definition (Union[Dict[str, DType], List[Column], None]): the table definition, by default None. When None,
the definition is inferred from the parquet file(s). Setting a definition guarantees the returned table will
have that definition. This is useful for bootstrapping purposes when the initial partitioned directory is
empty and is_refreshing=True. It is also useful for specifying a subset of the parquet definition. When set,
type must also be set.
Returns:
a table

@@ -143,10 +161,10 @@ def read(
for_read=True,
force_build=True,
)
if table_definition is not None:
j_table_definition = _j_table_definition(table_definition)
if j_table_definition is not None:
if not type:
raise DHError("Must provide type when table_definition is set")
j_table_definition = _JTableDefinition.of([col.j_column_definition for col in table_definition])
if type == ParquetType.SINGLE:
j_table = _JParquetTools.readSingleTable(_JFile(path), read_instructions, j_table_definition)
elif type == ParquetType.FLAT_PARTITIONED:
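To illustrate the updated read() signature documented above, a minimal usage sketch; the file path is hypothetical, and type is set explicitly because a definition is provided:

```python
from deephaven import dtypes
from deephaven.column import Column
from deephaven.parquet import read, ParquetType

# Dict form: column name -> dtype.
t1 = read(
    "/data/example.parquet",  # hypothetical path
    table_definition={"x": dtypes.int32, "y": dtypes.double},
    type=ParquetType.SINGLE,
)

# List form: explicit Column objects; produces the same definition as t1.
t2 = read(
    "/data/example.parquet",
    table_definition=[Column("x", dtypes.int32), Column("y", dtypes.double)],
    type=ParquetType.SINGLE,
)
```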
30 changes: 15 additions & 15 deletions py/server/tests/test_parquet.py
@@ -452,11 +452,11 @@ def test_read_single(self):
with self.subTest(msg="read single"):
actual = read(
single_parquet,
table_definition=[
Column("x", dtypes.int32),
Column("y", dtypes.double),
Column("z", dtypes.double),
],
table_definition={
"x": dtypes.int32,
"y": dtypes.double,
"z": dtypes.double,
},
type=ParquetType.SINGLE,
)
self.assert_table_equals(actual, table)
@@ -483,11 +483,11 @@ def test_read_flat_partitioned(self):
with self.subTest(msg="read flat"):
actual = read(
flat_dir,
table_definition=[
Column("x", dtypes.int32),
Column("y", dtypes.double),
Column("z", dtypes.double),
],
table_definition={
"x": dtypes.int32,
"y": dtypes.double,
"z": dtypes.double,
},
type=ParquetType.FLAT_PARTITIONED,
)
self.assert_table_equals(actual, table)
@@ -542,11 +542,11 @@ def test_read_with_table_definition_no_type(self):
with self.assertRaises(DHError) as cm:
read(
fake_parquet,
table_definition=[
Column("x", dtypes.int32),
Column("y", dtypes.double),
Column("z", dtypes.double),
],
table_definition={
"x": dtypes.int32,
"y": dtypes.double,
"z": dtypes.double,
},
)
self.assertIn(
"Must provide type when table_definition is set", str(cm.exception)