Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

auto convert Java values(arrays/scalar) to Numpy ones and convert DH nulls based on the annotations of the params of a Py UDF #4502

Merged
merged 18 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions py/server/deephaven/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,18 @@ def __call__(self, *args, **kwargs):
}


_J_ARRAY_NP_TYPE_MAP = {
boolean_array.j_type: np.dtype("?"),
byte_array.j_type: np.dtype("b"),
char_array.j_type: np.dtype("uint16"),
short_array.j_type: np.dtype("h"),
int32_array.j_type: np.dtype("i"),
long_array.j_type: np.dtype("l"),
float32_array.j_type: np.dtype("f"),
double_array.j_type: np.dtype("d")
}


def null_remap(dtype: DType) -> Callable[[Any], Any]:
""" Creates a null value remap function for the provided DType.

Expand Down Expand Up @@ -329,14 +341,25 @@ def from_np_dtype(np_dtype: Union[np.dtype, pd.api.extensions.ExtensionDtype]) -
_NUMPY_FLOATING_TYPE_CODES = ["f", "d"]


def _is_py_null(x: Any) -> bool:
"""Checks if the value is a Python null value, i.e. None or NaN, or Pandas.NA."""
if x is None:
return True

try:
return pd.isna(x)
except ValueError:
return False


def _scalar(x: Any, dtype: DType) -> Any:
"""Converts a Python value to a Java scalar value. It converts the numpy primitive types, string to
their Python equivalents so that JPY can handle them. For datetime values, it converts them to Java Instant.
Otherwise, it returns the value as is."""

# NULL_BOOL will appear in Java as a byte value which causes a cast error. We just let JPY converts it to Java null
# and the engine has casting logic to handle it.
if x is None and dtype != bool_ and _PRIMITIVE_DTYPE_NULL_MAP.get(dtype):
if x is None and dtype not in (bool_, char) and _PRIMITIVE_DTYPE_NULL_MAP.get(dtype):
return _PRIMITIVE_DTYPE_NULL_MAP[dtype]

try:
Expand All @@ -354,6 +377,8 @@ def _scalar(x: Any, dtype: DType) -> Any:
elif x.dtype.char == 'M':
from deephaven.time import to_j_instant
return to_j_instant(x)
elif x.dtype.char == 'H': # np.uint16
return jpy.get_type("java.lang.Character")(int(x))
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
elif isinstance(x, (datetime.datetime, pd.Timestamp)):
from deephaven.time import to_j_instant
return to_j_instant(x)
Expand Down Expand Up @@ -382,14 +407,26 @@ def _component_np_dtype_char(t: type) -> Optional[str]:
if isinstance(t, _GenericAlias) and issubclass(t.__origin__, Sequence):
component_type = t.__args__[0]

if not component_type:
component_type = _np_ndarray_component_type(t)

if component_type:
return _np_dtype_char(component_type)
else:
return None


def _np_ndarray_component_type(t):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
"""Returns the numpy ndarray component type if the type is a numpy ndarray, otherwise return None."""

# Py3.8: npt.NDArray can be used in Py 3.8 as a generic alias, but a specific alias (e.g. npt.NDArray[np.int64])
# is an instance of a private class of np, yet we don't have a choice but to use it. And when npt.NDArray is used,
# the 1st argument is typing.Any, the 2nd argument is another generic alias of which the 1st argument is the
# component type
if not component_type and sys.version_info.minor == 8:
component_type = None
if sys.version_info.minor == 8:
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(t, np._typing._generic_alias._GenericAlias) and t.__origin__ == np.ndarray:
component_type = t.__args__[1].__args__[0]

# Py3.9+, np.ndarray as a generic alias is only supported in Python 3.9+, also npt.NDArray is still available but a
# specific alias (e.g. npt.NDArray[np.int64]) now is an instance of typing.GenericAlias.
# when npt.NDArray is used, the 1st argument is typing.Any, the 2nd argument is another generic alias of which
Expand All @@ -406,8 +443,4 @@ def _component_np_dtype_char(t: type) -> Optional[str]:
a1 = t.__args__[1]
if a0 == typing.Any and isinstance(a1, types.GenericAlias):
component_type = a1.__args__[0]

if component_type:
return _np_dtype_char(component_type)
else:
return None
return component_type
133 changes: 129 additions & 4 deletions py/server/deephaven/jcompat.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,29 @@
""" This module provides Java compatibility support including convenience functions to create some widely used Java
data structures from corresponding Python ones in order to be able to call Java methods. """

from typing import Any, Callable, Dict, Iterable, List, Sequence, Set, TypeVar, Union
from typing import Any, Callable, Dict, Iterable, List, Sequence, Set, TypeVar, Union, Tuple, Literal

import jpy
import numpy as np
import pandas as pd

from deephaven import dtypes, DHError
from deephaven._wrapper import unwrap, wrap_j_object
from deephaven.dtypes import DType
from deephaven.dtypes import DType, _PRIMITIVE_DTYPE_NULL_MAP, _J_ARRAY_NP_TYPE_MAP

_NULL_BOOLEAN_AS_BYTE = jpy.get_type("io.deephaven.util.BooleanUtils").NULL_BOOLEAN_AS_BYTE
_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")

_DH_PANDAS_NULLABLE_TYPE_MAP: Dict[DType, pd.api.extensions.ExtensionDtype] = {
dtypes.bool_: pd.BooleanDtype,
dtypes.byte: pd.Int8Dtype,
dtypes.short: pd.Int16Dtype,
dtypes.char: pd.UInt16Dtype,
dtypes.int32: pd.Int32Dtype,
dtypes.int64: pd.Int64Dtype,
dtypes.float32: pd.Float32Dtype,
dtypes.float64: pd.Float64Dtype,
}


def is_java_type(obj: Any) -> bool:
Expand Down Expand Up @@ -181,11 +198,119 @@ def to_sequence(v: Union[T, Sequence[T]] = None, wrapped: bool = False) -> Seque
return ()
if wrapped:
if not isinstance(v, Sequence) or isinstance(v, str):
return (v, )
return (v,)
else:
return tuple(v)

if not isinstance(v, Sequence) or isinstance(v, str):
return (unwrap(v), )
return (unwrap(v),)
else:
return tuple((unwrap(o) for o in v))


def _j_array_to_numpy_array(dtype: DType, j_array: jpy.JType, conv_null: bool = False, no_promotion: bool = False) -> \
np.ndarray:
""" Produces a numpy array from the DType and given Java array."""
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if dtype.is_primitive:
np_array = np.frombuffer(j_array, dtype.np_type)
elif dtype == dtypes.Instant:
longs = _JPrimitiveArrayConversionUtility.translateArrayInstantToLong(j_array)
np_long_array = np.frombuffer(longs, np.int64)
np_array = np_long_array.view(dtype.np_type)
elif dtype == dtypes.bool_:
bytes_ = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array)
np_array = np.frombuffer(bytes_, dtype.np_type)
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
elif dtype == dtypes.string:
np_array = np.array([s for s in j_array], dtypes.string.np_type)
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
elif dtype.np_type is not np.object_:
try:
np_array = np.frombuffer(j_array, dtype.np_type)
except:
np_array = np.array(j_array, np.object_)
else:
np_array = np.array(j_array, np.object_)

if conv_null:
dh_null = _PRIMITIVE_DTYPE_NULL_MAP.get(dtype)
if dh_null:
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if dtype in (dtypes.float32, dtypes.float64):
np_array = np.copy(np_array)
np_array[np_array == dh_null] = np.nan
else:
if dtype is dtypes.bool_: # promote boolean to float64
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
np_array = np.frombuffer(np_array, np.byte)
if any(np_array[np_array == dh_null]):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if no_promotion:
raise DHError(f"Java array contains Deephaven nulls for dtype {dtype}")
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
np_array = np_array.astype(np.float64)
np_array[np_array == dh_null] = np.nan
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
else:
if dtype is dtypes.bool_: # promote boolean to float64
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
np_array = np.frombuffer(np_array, np.bool_)
chipkent marked this conversation as resolved.
Show resolved Hide resolved
return np_array

return np_array


def _j_array_to_series(dtype: DType, j_array: jpy.JType, conv_null: bool) -> pd.Series:
"""Produce a copy of the specified Java array as a pandas.Series object.

Args:
j_array (jpy.JType): the Java array
dtype (DType): the data type of the Java array
conv_null (bool): whether to check for Deephaven nulls in the data and automatically replace them with
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
pd.NA.

Returns:
a pandas Series

Raises:
DHError
"""
if conv_null and dtype == dtypes.bool_:
j_array = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array)
np_array = np.frombuffer(j_array, dtype=np.byte)
s = pd.Series(data=np_array, dtype=pd.Int8Dtype(), copy=False)
s.mask(s == _NULL_BOOLEAN_AS_BYTE, inplace=True)
return s.astype(pd.BooleanDtype(), copy=False)

np_array = _j_array_to_numpy_array(dtype, j_array, conv_null=False)
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if conv_null and (nv := _PRIMITIVE_DTYPE_NULL_MAP.get(dtype)) is not None:
pd_ex_dtype = _DH_PANDAS_NULLABLE_TYPE_MAP.get(dtype)
s = pd.Series(data=np_array, dtype=pd_ex_dtype(), copy=False)
s.mask(s == nv, inplace=True)
else:
s = pd.Series(data=np_array, copy=False)

return s


def _convert_udf_args(args: Tuple[Any], fn_signature: str, null_value: Literal[np.nan, pd.NA, None]) -> List[Any]:
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
converted_args = []
for arg, np_dtype_char in zip(args, fn_signature):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if np_dtype_char == 'O':
converted_args.append(arg)
elif src_np_dtype := _J_ARRAY_NP_TYPE_MAP.get(type(arg)):
# array types
np_dtype = np.dtype(np_dtype_char)
if src_np_dtype != np_dtype and np_dtype != np.object_:
raise DHError(f"Cannot convert Java array of type {src_np_dtype} to numpy array of type {np_dtype}")
dtype = dtypes.from_np_dtype(np_dtype)
if null_value is pd.NA:
converted_args.append(_j_array_to_series(dtype, arg, conv_null=True))
else: # np.nan or None
converted_args.append(_j_array_to_numpy_array(dtype, arg, conv_null=bool(null_value)))
else: # scalar type or array types that don't need conversion
try:
np_dtype = np.dtype(np_dtype_char)
except TypeError:
converted_args.append(arg)
else:
dtype = dtypes.from_np_dtype(np_dtype)
if dtype is dtypes.bool_:
converted_args.append(null_value if arg is None else arg)
elif dh_null := _PRIMITIVE_DTYPE_NULL_MAP.get(dtype):
converted_args.append(null_value if arg == dh_null else arg)
else:
converted_args.append(arg)
return converted_args
32 changes: 7 additions & 25 deletions py/server/deephaven/numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@

""" This module supports the conversion between Deephaven tables and numpy arrays. """
import re
from functools import wraps
from typing import List

import jpy
import numpy as np
from deephaven.dtypes import DType

from deephaven import DHError, dtypes, empty_table, new_table
from deephaven import DHError, dtypes, new_table
from deephaven.column import Column, InputColumn
from deephaven.table import Table
from deephaven.dtypes import DType
from deephaven.jcompat import _j_array_to_numpy_array, _convert_udf_args
from deephaven.table import Table, _encode_signature

_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")
_JDataAccessHelpers = jpy.get_type("io.deephaven.engine.table.impl.DataAccessHelpers")


Expand All @@ -25,28 +26,9 @@ def _to_column_name(name: str) -> str:


def column_to_numpy_array(col_def: Column, j_array: jpy.JType) -> np.ndarray:
""" Produces a numpy array from the given Java array and the Table column definition. """
""" Produces a numpy array from the given Java array and the Table column definition."""
try:
if col_def.data_type.is_primitive:
np_array = np.frombuffer(j_array, col_def.data_type.np_type)
elif col_def.data_type == dtypes.Instant:
longs = _JPrimitiveArrayConversionUtility.translateArrayInstantToLong(j_array)
np_long_array = np.frombuffer(longs, np.int64)
np_array = np_long_array.view(col_def.data_type.np_type)
elif col_def.data_type == dtypes.bool_:
bytes_ = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array)
np_array = np.frombuffer(bytes_, col_def.data_type.np_type)
elif col_def.data_type == dtypes.string:
np_array = np.array([s for s in j_array], dtypes.string.np_type)
elif col_def.data_type.np_type is not np.object_:
try:
np_array = np.frombuffer(j_array, col_def.data_type.np_type)
except:
np_array = np.array(j_array, np.object_)
else:
np_array = np.array(j_array, np.object_)

return np_array
return _j_array_to_numpy_array(col_def.data_type, j_array)
except DHError:
raise
except Exception as e:
Expand Down
Loading
Loading