Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

auto convert Java values(arrays/scalar) to Numpy ones and convert DH nulls based on the annotations of the params of a Py UDF #4502

Merged
merged 18 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions py/server/deephaven/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,18 @@ def __call__(self, *args, **kwargs):
}


_J_ARRAY_NP_TYPE_MAP = {
boolean_array.j_type: np.dtype("?"),
byte_array.j_type: np.dtype("b"),
char_array.j_type: np.dtype("uint16"),
short_array.j_type: np.dtype("h"),
int32_array.j_type: np.dtype("i"),
long_array.j_type: np.dtype("l"),
float32_array.j_type: np.dtype("f"),
double_array.j_type: np.dtype("d")
}


def null_remap(dtype: DType) -> Callable[[Any], Any]:
""" Creates a null value remap function for the provided DType.

Expand Down Expand Up @@ -354,6 +366,8 @@ def _scalar(x: Any, dtype: DType) -> Any:
elif x.dtype.char == 'M':
from deephaven.time import to_j_instant
return to_j_instant(x)
elif x.dtype.char == 'H': # np.uint16
return jpy.get_type("java.lang.Character")(int(x))
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
elif isinstance(x, (datetime.datetime, pd.Timestamp)):
from deephaven.time import to_j_instant
return to_j_instant(x)
Expand Down Expand Up @@ -382,14 +396,26 @@ def _component_np_dtype_char(t: type) -> Optional[str]:
if isinstance(t, _GenericAlias) and issubclass(t.__origin__, Sequence):
component_type = t.__args__[0]

if not component_type:
component_type = _np_ndarray_component_type(t)

if component_type:
return _np_dtype_char(component_type)
else:
return None


def _np_ndarray_component_type(t):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
"""Returns the numpy ndarray component type if the type is a numpy ndarray, otherwise return None."""

# Py3.8: npt.NDArray can be used in Py 3.8 as a generic alias, but a specific alias (e.g. npt.NDArray[np.int64])
# is an instance of a private class of np, yet we don't have a choice but to use it. And when npt.NDArray is used,
# the 1st argument is typing.Any, the 2nd argument is another generic alias of which the 1st argument is the
# component type
if not component_type and sys.version_info.minor == 8:
component_type = None
if sys.version_info.minor == 8:
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(t, np._typing._generic_alias._GenericAlias) and t.__origin__ == np.ndarray:
component_type = t.__args__[1].__args__[0]

# Py3.9+, np.ndarray as a generic alias is only supported in Python 3.9+, also npt.NDArray is still available but a
# specific alias (e.g. npt.NDArray[np.int64]) now is an instance of typing.GenericAlias.
# when npt.NDArray is used, the 1st argument is typing.Any, the 2nd argument is another generic alias of which
Expand All @@ -406,8 +432,4 @@ def _component_np_dtype_char(t: type) -> Optional[str]:
a1 = t.__args__[1]
if a0 == typing.Any and isinstance(a1, types.GenericAlias):
component_type = a1.__args__[0]

if component_type:
return _np_dtype_char(component_type)
else:
return None
return component_type
133 changes: 129 additions & 4 deletions py/server/deephaven/jcompat.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,29 @@
""" This module provides Java compatibility support including convenience functions to create some widely used Java
data structures from corresponding Python ones in order to be able to call Java methods. """

from typing import Any, Callable, Dict, Iterable, List, Sequence, Set, TypeVar, Union
from typing import Any, Callable, Dict, Iterable, List, Sequence, Set, TypeVar, Union, Tuple, Literal

import jpy
import numpy as np
import pandas as pd

from deephaven import dtypes, DHError
from deephaven._wrapper import unwrap, wrap_j_object
from deephaven.dtypes import DType
from deephaven.dtypes import DType, _PRIMITIVE_DTYPE_NULL_MAP, _J_ARRAY_NP_TYPE_MAP

_NULL_BOOLEAN_AS_BYTE = jpy.get_type("io.deephaven.util.BooleanUtils").NULL_BOOLEAN_AS_BYTE
_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")

_DH_PANDAS_NULLABLE_TYPE_MAP: Dict[DType, pd.api.extensions.ExtensionDtype] = {
dtypes.bool_: pd.BooleanDtype,
dtypes.byte: pd.Int8Dtype,
dtypes.short: pd.Int16Dtype,
dtypes.char: pd.UInt16Dtype,
dtypes.int32: pd.Int32Dtype,
dtypes.int64: pd.Int64Dtype,
dtypes.float32: pd.Float32Dtype,
dtypes.float64: pd.Float64Dtype,
}


def is_java_type(obj: Any) -> bool:
Expand Down Expand Up @@ -181,11 +198,119 @@ def to_sequence(v: Union[T, Sequence[T]] = None, wrapped: bool = False) -> Seque
return ()
if wrapped:
if not isinstance(v, Sequence) or isinstance(v, str):
return (v, )
return (v,)
else:
return tuple(v)

if not isinstance(v, Sequence) or isinstance(v, str):
return (unwrap(v), )
return (unwrap(v),)
else:
return tuple((unwrap(o) for o in v))


def _j_array_to_numpy_array(dtype: DType, j_array: jpy.JType, conv_null: bool = False, no_promotion: bool = False) -> \
np.ndarray:
""" Produces a numpy array from the DType and given Java array."""
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if dtype.is_primitive:
np_array = np.frombuffer(j_array, dtype.np_type)
elif dtype == dtypes.Instant:
longs = _JPrimitiveArrayConversionUtility.translateArrayInstantToLong(j_array)
np_long_array = np.frombuffer(longs, np.int64)
np_array = np_long_array.view(dtype.np_type)
elif dtype == dtypes.bool_:
bytes_ = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array)
np_array = np.frombuffer(bytes_, dtype.np_type)
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
elif dtype == dtypes.string:
np_array = np.array([s for s in j_array], dtypes.string.np_type)
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
elif dtype.np_type is not np.object_:
try:
np_array = np.frombuffer(j_array, dtype.np_type)
except:
np_array = np.array(j_array, np.object_)
else:
np_array = np.array(j_array, np.object_)

if conv_null:
dh_null = _PRIMITIVE_DTYPE_NULL_MAP.get(dtype)
if dh_null:
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if dtype in (dtypes.float32, dtypes.float64):
np_array = np.copy(np_array)
np_array[np_array == dh_null] = np.nan
else:
if dtype is dtypes.bool_: # promote boolean to float64
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
np_array = np.frombuffer(np_array, np.byte)
if any(np_array[np_array == dh_null]):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if no_promotion:
raise DHError(f"Java array contains Deephaven nulls for dtype {dtype}")
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
np_array = np_array.astype(np.float64)
np_array[np_array == dh_null] = np.nan
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
else:
if dtype is dtypes.bool_: # promote boolean to float64
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
np_array = np.frombuffer(np_array, np.bool_)
chipkent marked this conversation as resolved.
Show resolved Hide resolved
return np_array

return np_array


def _j_array_to_series(dtype: DType, j_array: jpy.JType, conv_null: bool) -> pd.Series:
"""Produce a copy of the specified Java array as a pandas.Series object.

Args:
j_array (jpy.JType): the Java array
dtype (DType): the data type of the Java array
conv_null (bool): whether to check for Deephaven nulls in the data and automatically replace them with
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
pd.NA.

Returns:
a pandas Series

Raises:
DHError
"""
if conv_null and dtype == dtypes.bool_:
j_array = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array)
np_array = np.frombuffer(j_array, dtype=np.byte)
s = pd.Series(data=np_array, dtype=pd.Int8Dtype(), copy=False)
s.mask(s == _NULL_BOOLEAN_AS_BYTE, inplace=True)
return s.astype(pd.BooleanDtype(), copy=False)

np_array = _j_array_to_numpy_array(dtype, j_array, conv_null=False)
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if conv_null and (nv := _PRIMITIVE_DTYPE_NULL_MAP.get(dtype)) is not None:
pd_ex_dtype = _DH_PANDAS_NULLABLE_TYPE_MAP.get(dtype)
s = pd.Series(data=np_array, dtype=pd_ex_dtype(), copy=False)
s.mask(s == nv, inplace=True)
else:
s = pd.Series(data=np_array, copy=False)

return s


def _convert_udf_args(args: Tuple[Any], fn_signature: str, null_value: Literal[np.nan, pd.NA, None]) -> List[Any]:
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
converted_args = []
for arg, np_dtype_char in zip(args, fn_signature):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if np_dtype_char == 'O':
converted_args.append(arg)
elif src_np_dtype := _J_ARRAY_NP_TYPE_MAP.get(type(arg)):
# array types
np_dtype = np.dtype(np_dtype_char)
if src_np_dtype != np_dtype and np_dtype != np.object_:
raise DHError(f"Cannot convert Java array of type {src_np_dtype} to numpy array of type {np_dtype}")
dtype = dtypes.from_np_dtype(np_dtype)
if null_value is pd.NA:
converted_args.append(_j_array_to_series(dtype, arg, conv_null=True))
else: # np.nan or None
converted_args.append(_j_array_to_numpy_array(dtype, arg, conv_null=bool(null_value)))
else: # scalar type or array types that don't need conversion
try:
np_dtype = np.dtype(np_dtype_char)
except TypeError:
converted_args.append(arg)
else:
dtype = dtypes.from_np_dtype(np_dtype)
if dtype is dtypes.bool_:
converted_args.append(null_value if arg is None else arg)
elif dh_null := _PRIMITIVE_DTYPE_NULL_MAP.get(dtype):
converted_args.append(null_value if arg == dh_null else arg)
else:
converted_args.append(arg)
return converted_args
32 changes: 7 additions & 25 deletions py/server/deephaven/numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@

""" This module supports the conversion between Deephaven tables and numpy arrays. """
import re
from functools import wraps
from typing import List

import jpy
import numpy as np
from deephaven.dtypes import DType

from deephaven import DHError, dtypes, empty_table, new_table
from deephaven import DHError, dtypes, new_table
from deephaven.column import Column, InputColumn
from deephaven.table import Table
from deephaven.dtypes import DType
from deephaven.jcompat import _j_array_to_numpy_array, _convert_udf_args
from deephaven.table import Table, _encode_signature

_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")
_JDataAccessHelpers = jpy.get_type("io.deephaven.engine.table.impl.DataAccessHelpers")


Expand All @@ -25,28 +26,9 @@ def _to_column_name(name: str) -> str:


def column_to_numpy_array(col_def: Column, j_array: jpy.JType) -> np.ndarray:
""" Produces a numpy array from the given Java array and the Table column definition. """
""" Produces a numpy array from the given Java array and the Table column definition."""
try:
if col_def.data_type.is_primitive:
np_array = np.frombuffer(j_array, col_def.data_type.np_type)
elif col_def.data_type == dtypes.Instant:
longs = _JPrimitiveArrayConversionUtility.translateArrayInstantToLong(j_array)
np_long_array = np.frombuffer(longs, np.int64)
np_array = np_long_array.view(col_def.data_type.np_type)
elif col_def.data_type == dtypes.bool_:
bytes_ = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array)
np_array = np.frombuffer(bytes_, col_def.data_type.np_type)
elif col_def.data_type == dtypes.string:
np_array = np.array([s for s in j_array], dtypes.string.np_type)
elif col_def.data_type.np_type is not np.object_:
try:
np_array = np.frombuffer(j_array, col_def.data_type.np_type)
except:
np_array = np.array(j_array, np.object_)
else:
np_array = np.array(j_array, np.object_)

return np_array
return _j_array_to_numpy_array(col_def.data_type, j_array)
except DHError:
raise
except Exception as e:
Expand Down
49 changes: 12 additions & 37 deletions py/server/deephaven/pandas.py
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

""" This module supports the conversion between Deephaven tables and pandas DataFrames. """
from typing import List, Dict, Tuple, Literal
from typing import List, Literal

import jpy
import numpy as np
Expand All @@ -13,26 +13,14 @@
from deephaven import DHError, new_table, dtypes, arrow
from deephaven.column import Column
from deephaven.constants import NULL_BYTE, NULL_SHORT, NULL_INT, NULL_LONG, NULL_FLOAT, NULL_DOUBLE, NULL_CHAR
from deephaven.dtypes import DType
from deephaven.numpy import column_to_numpy_array, _make_input_column
from deephaven.jcompat import _j_array_to_series
from deephaven.numpy import _make_input_column
from deephaven.table import Table

_NULL_BOOLEAN_AS_BYTE = jpy.get_type("io.deephaven.util.BooleanUtils").NULL_BOOLEAN_AS_BYTE
_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")
_JDataAccessHelpers = jpy.get_type("io.deephaven.engine.table.impl.DataAccessHelpers")
_is_dtype_backend_supported = pd.__version__ >= "2.0.0"

_DTYPE_NULL_MAPPING: Dict[DType, Tuple] = {
dtypes.bool_: (_NULL_BOOLEAN_AS_BYTE, pd.BooleanDtype),
dtypes.byte: (NULL_BYTE, pd.Int8Dtype),
dtypes.short: (NULL_SHORT, pd.Int16Dtype),
dtypes.char: (NULL_CHAR, pd.UInt16Dtype),
dtypes.int32: (NULL_INT, pd.Int32Dtype),
dtypes.int64: (NULL_LONG, pd.Int64Dtype),
dtypes.float32: (NULL_FLOAT, pd.Float32Dtype),
dtypes.float64: (NULL_DOUBLE, pd.Float64Dtype),
}


def _column_to_series(table: Table, col_def: Column, conv_null: bool) -> pd.Series:
"""Produce a copy of the specified column as a pandas.Series object.
Expand All @@ -51,29 +39,15 @@ def _column_to_series(table: Table, col_def: Column, conv_null: bool) -> pd.Seri
"""
try:
data_col = _JDataAccessHelpers.getColumn(table.j_table, col_def.name)
if conv_null and col_def.data_type == dtypes.bool_:
j_array = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(data_col.getDirect())
np_array = np.frombuffer(j_array, dtype=np.byte)
s = pd.Series(data=np_array, dtype=pd.Int8Dtype(), copy=False)
s.mask(s == _NULL_BOOLEAN_AS_BYTE, inplace=True)
return s.astype(pd.BooleanDtype(), copy=False)

np_array = column_to_numpy_array(col_def, data_col.getDirect())
if conv_null and (null_pair := _DTYPE_NULL_MAPPING.get(col_def.data_type)) is not None:
nv = null_pair[0]
pd_ex_dtype = null_pair[1]
s = pd.Series(data=np_array, dtype=pd_ex_dtype(), copy=False)
s.mask(s == nv, inplace=True)
else:
s = pd.Series(data=np_array, copy=False)
return s
j_array = data_col.getDirect()
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
return _j_array_to_series(col_def.data_type, j_array, conv_null)
except DHError:
raise
except Exception as e:
raise DHError(e, message="failed to create a pandas Series for {col}") from e


_DTYPE_MAPPING_PYARROW = {
_PANDAS_ARROW_TYPE_MAP = {
pa.int8(): pd.ArrowDtype(pa.int8()),
pa.int16(): pd.ArrowDtype(pa.int16()),
pa.int32(): pd.ArrowDtype(pa.int32()),
Expand All @@ -90,7 +64,7 @@ def _column_to_series(table: Table, col_def: Column, conv_null: bool) -> pd.Seri
pa.timestamp('ns', tz='UTC'): pd.ArrowDtype(pa.timestamp('ns', tz='UTC')),
}

_DTYPE_MAPPING_NUMPY_NULLABLE = {
_PANDAS_NULLABLE_TYPE_MAP = {
pa.int8(): pd.Int8Dtype(),
pa.int16(): pd.Int16Dtype(),
pa.uint16(): pd.UInt16Dtype(),
Expand All @@ -107,8 +81,8 @@ def _column_to_series(table: Table, col_def: Column, conv_null: bool) -> pd.Seri
}

_PYARROW_TO_PANDAS_TYPE_MAPPERS = {
"pyarrow": _DTYPE_MAPPING_PYARROW.get,
"numpy_nullable": _DTYPE_MAPPING_NUMPY_NULLABLE.get,
"pyarrow": _PANDAS_ARROW_TYPE_MAP.get,
"numpy_nullable": _PANDAS_NULLABLE_TYPE_MAP.get,
}


Expand Down Expand Up @@ -180,7 +154,7 @@ def to_pandas(table: Table, cols: List[str] = None,
raise DHError(e, "failed to create a pandas DataFrame from table.") from e


_EX_DTYPE_NULL_MAP = {
_PANDAS_EXTYPE_DH_NULL_MAP = {
# This reflects the fact that in the server we use NULL_BOOLEAN_AS_BYTE - the byte encoding of null boolean to
# translate boxed Boolean to/from primitive bytes
pd.BooleanDtype: _NULL_BOOLEAN_AS_BYTE,
Expand Down Expand Up @@ -209,7 +183,7 @@ def _map_na(array: [np.ndarray, pd.api.extensions.ExtensionArray]):
if not isinstance(pd_dtype, pd.api.extensions.ExtensionDtype):
return array

dh_null = _EX_DTYPE_NULL_MAP.get(type(pd_dtype)) or _EX_DTYPE_NULL_MAP.get(pd_dtype)
dh_null = _PANDAS_EXTYPE_DH_NULL_MAP.get(type(pd_dtype)) or _PANDAS_EXTYPE_DH_NULL_MAP.get(pd_dtype)
# To preserve NaNs in floating point arrays, Pandas doesn't distinguish NaN/Null as far as NA testing is
# concerned, thus its fillna() method will replace both NaN/Null in the data.
if isinstance(pd_dtype, (pd.Float32Dtype, pd.Float64Dtype)) and isinstance(getattr(array, "_data"), np.ndarray):
Expand Down Expand Up @@ -276,3 +250,4 @@ def to_table(df: pd.DataFrame, cols: List[str] = None) -> Table:
raise
except Exception as e:
raise DHError(e, "failed to create a Deephaven Table from a pandas DataFrame.") from e

Loading
Loading