Skip to content

Commit

Permalink
Some code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Nov 15, 2023
1 parent 82299ce commit 96bb598
Showing 1 changed file with 74 additions and 85 deletions.
159 changes: 74 additions & 85 deletions py/server/deephaven/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import jpy
import numba
import numpy
import numpy as np

from deephaven import DHError
Expand Down Expand Up @@ -366,20 +367,6 @@ def _j_py_script_session() -> _JPythonScriptSession:
_SUPPORTED_NP_TYPE_CODES = ["i", "l", "h", "f", "d", "b", "?", "U", "M", "O"]


def _parse_annotation(annotation: Any) -> Any:
"""Parse a Python annotation, for now mostly to extract the non-None type from an Optional(Union) annotation,
otherwise return the original annotation. """
if isinstance(annotation, _GenericAlias) and annotation.__origin__ == Union and len(annotation.__args__) == 2:
if annotation.__args__[1] == type(None): # noqa: E721
return annotation.__args__[0]
elif annotation.__args__[0] == type(None): # noqa: E721
return annotation.__args__[1]
else:
return annotation
else:
return annotation


def _encode_signature(fn: Callable) -> str:
"""Encode the signature of a Python function by mapping the annotations of the parameter types and the return
type to numpy dtype chars (i,l,h,f,d,b,?,U,M,O), and pack them into a string with parameter type chars first,
Expand All @@ -394,7 +381,7 @@ def _encode_signature(fn: Callable) -> str:
# numpy ufuncs actually have signature encoded in their 'types' attribute, we want to better support
# them in the future (https://github.com/deephaven/deephaven-core/issues/4762)
if type(fn) == np.ufunc:
return "O"*fn.nin + "->" + "O"
return "O" * fn.nin + "->" + "O"
return "->O"

np_type_codes = []
Expand All @@ -413,59 +400,72 @@ def _encode_signature(fn: Callable) -> str:

@dataclass
class ParsedAnnotation:
    """The result of parsing one annotation (a parameter's or the return's) of a function signature."""

    # Deephaven dtypes of the annotation's non-None member types, in declaration order. This is a list rather
    # than a set because consumers index into it (e.g. dtypes[0] for the return type).
    dtypes: List[DType] = field(default_factory=list)
    # numpy dtype chars recorded alongside the entries of 'dtypes'
    np_dtype_chars: List[str] = field(default_factory=list)
    # True when the annotation's component type maps to a buildable array dtype
    is_array: bool = False
    # True when NoneType is one of the members of a Union annotation
    is_optional: bool = False


def _parse_signature(fn: Callable):
np_type_codes = []
try:
sig = inspect.signature(fn)
for n, p in sig.parameters.items():
p_annotation = _parse_annotation(p.annotation)
np_type_codes.append(_np_dtype_char(p_annotation))
except:
@dataclass
class ParsedSignature:
    """The result of parsing a function's signature: the function itself plus one ParsedAnnotation per parameter
    and one for the return annotation."""

    # the function whose signature was parsed; None until set by the parser
    fn: Union[Callable, None] = None
    # parsed parameter annotations, in the order the parameters are declared
    parameters: List[ParsedAnnotation] = field(default_factory=list)
    # parsed return annotation; None until set by the parser
    return_annotation: Union[ParsedAnnotation, None] = None


def _parse_annotation(annotation: Any) -> ParsedAnnotation:
    """Parse an annotation in a function's signature.

    Args:
        annotation (Any): a parameter or return annotation

    Returns:
        a ParsedAnnotation holding the Deephaven dtype(s) and numpy dtype char(s) of the annotation, whether
        NoneType is one of the members of a Union annotation, and whether the annotation is an array whose
        component type can be built into a Deephaven array
    """
    pa = ParsedAnnotation()
    if isinstance(annotation, _GenericAlias) and annotation.__origin__ == Union:
        for arg in annotation.__args__:
            if arg is type(None):
                # NoneType only marks the annotation as optional, it contributes no dtype
                pa.is_optional = True
                continue
            np_dtype = np.dtype(arg)
            pa.dtypes.append(dtypes.from_np_dtype(np_dtype))
            pa.np_dtype_chars.append(np_dtype.char if np_dtype.char in _SUPPORTED_NP_TYPE_CODES else "O")
    else:
        np_dtype = np.dtype(annotation)
        # ParsedAnnotation.dtypes is a list, so it must be appended to (list has no add()). The dtype char is
        # recorded as well so that 'dtypes' and 'np_dtype_chars' stay parallel, as they are in the Union branch.
        pa.dtypes.append(dtypes.from_np_dtype(np_dtype))
        pa.np_dtype_chars.append(np_dtype.char if np_dtype.char in _SUPPORTED_NP_TYPE_CODES else "O")

    # TODO check if the annotation refers to an array-like type, e.g. List, Tuple, etc.
    # TODO add check for numba.guvectorize, which uses a tuple of arrays as the signature
    component_type = _component_np_dtype_char(annotation)
    if component_type and dtypes.from_np_dtype(np.dtype(component_type)) in _BUILDABLE_ARRAY_DTYPE_MAP:
        pa.is_array = True

    return pa


def _parse_signature(fn: Callable) -> ParsedSignature:
""" Parse the signature of a function, return a ParsedSignature object """

parsed_signature = ParsedSignature(fn=fn)
parsed_annotations = []
if isinstance(fn, (numba.np.ufunc.gufunc.GUFunc, numba.np.dufunc.DUFunc)):
sig = fn.signature
rtype = sig.split("->")[-1]
for p in sig.split("(")[1].split(")")[0].split(","):
parsed_annotations.append(_parse_annotation(p))
elif isinstance(fn, numpy.ufunc):
# in case inspect.signature() fails, we'll just use the default 'O' - object type.
# numpy ufuncs actually have signature encoded in their 'types' attribute, we want to better support
# them in the future (https://github.com/deephaven/deephaven-core/issues/4762)
if type(fn) == np.ufunc:
return "O" * fn.nin + "->" + "O"
return [ParsedAnnotation()] * fn.nin + "->" + "O"
return "->O"


# If the function is a numba guvectorized function, examine the signature of the function to determine if it
# returns an array.
if isinstance(fn, numba.np.ufunc.gufunc.GUFunc):
sig = fn.signature
rtype = sig.split("->")[-1].strip("()")
if rtype:
return_array = True
else:
try:
return_annotation = _parse_annotation(inspect.signature(fn).return_annotation)
except ValueError:
# the function has no return annotation, and since we can't know what the exact type is, the return type
# defaults to the generic object type therefore it is not an array of a specific type,
# but see (https://github.com/deephaven/deephaven-core/issues/4762) for future improvement to better support
# numpy ufuncs.
pass
else:
component_type = _component_np_dtype_char(return_annotation)
if component_type:
ret_dtype = dtypes.from_np_dtype(np.dtype(component_type))
if ret_dtype in _BUILDABLE_ARRAY_DTYPE_MAP:
return_array = True


sig = inspect.signature(fn)
for n, p in sig.parameters.items():
parsed_annotations.append(_parse_annotation(p.annotation))
parsed_signature.parameters = parsed_annotations
parsed_signature.return_annotation = _parse_annotation(sig.return_annotation)

return_annotation = _parse_annotation(sig.return_annotation)
return_type_code = _np_dtype_char(return_annotation)
np_type_codes = [c if c in _SUPPORTED_NP_TYPE_CODES else "O" for c in np_type_codes]
return_type_code = return_type_code if return_type_code in _SUPPORTED_NP_TYPE_CODES else "O"
if len(parsed_signature.return_annotation.dtypes) > 1 and not parsed_signature.return_annotation.is_optional:
raise ValueError("only single return type is supported.")

np_type_codes.extend(["-", ">", return_type_code])
return "".join(np_type_codes)
return parsed_signature


def _udf_return_dtype(fn: Callable, signature: str) -> dtypes.Dtype:
Expand All @@ -475,6 +475,16 @@ def _udf_return_dtype(fn: Callable, signature: str) -> dtypes.Dtype:
return dtypes.from_np_dtype(np.dtype(signature[-1]))


# Checks that the number of call arguments matches the number of parsed parameters of 'ps', raising ValueError
# on a mismatch, and then starts iterating over the (parameter, argument) pairs.
# NOTE(review): the function is unfinished in this revision: the 'for' loop below has no body, and although the
# return type is declared as str, no return statement is present.
def _prepare_runtime_signature(ps: ParsedSignature, args) -> str:
params = ps.parameters
# the parsed parameters and the actual call arguments must line up one-to-one
if len(params) != len(args):
raise ValueError(f"number of parameters does not match the number of arguments for {ps.fn}.")

# pairs each parsed parameter annotation with the argument passed for it
for param, arg in zip(params, args):




def _py_udf(fn: Callable):
"""A decorator that acts as a transparent translator for Python UDFs used in Deephaven query formulas between
Python and Java. This decorator is intended for use by the Deephaven query engine and should not be used by
Expand All @@ -491,38 +501,17 @@ def _py_udf(fn: Callable):
"""
if hasattr(fn, "return_type"):
return fn

fn_signature = _encode_signature(fn)
ret_dtype = _udf_return_dtype(fn, signature=fn_signature)
return_array = False

return_array = False
# If the function is a numba guvectorized function, examine the signature of the function to determine if it
# returns an array.
if isinstance(fn, numba.np.ufunc.gufunc.GUFunc):
sig = fn.signature
rtype = sig.split("->")[-1].strip("()")
if rtype:
return_array = True
else:
try:
return_annotation = _parse_annotation(inspect.signature(fn).return_annotation)
except ValueError:
# the function has no return annotation, and since we can't know what the exact type is, the return type
# defaults to the generic object type therefore it is not an array of a specific type,
# but see (https://github.com/deephaven/deephaven-core/issues/4762) for future improvement to better support
# numpy ufuncs.
pass
else:
component_type = _component_np_dtype_char(return_annotation)
if component_type:
ret_dtype = dtypes.from_np_dtype(np.dtype(component_type))
if ret_dtype in _BUILDABLE_ARRAY_DTYPE_MAP:
return_array = True
parsed_signature = _parse_signature(fn)
return_array = parsed_signature.return_annotation.is_array
ret_dtype = parsed_signature.return_annotation.dtypes[0]
rt_signature = None

@wraps(fn)
def wrapper(*args, **kwargs):
converted_args = _convert_udf_args(args, fn_signature, null_value=np.nan)
nonlocal rt_signature
if rt_signature is None:
rt_signature = _prepare_runtime_signature(parsed_signature, args)
converted_args = _convert_udf_args(args, rt_signature, null_value=np.nan)
ret = fn(*converted_args, **kwargs)
if return_array:
return dtypes.array(ret_dtype, ret)
Expand Down

0 comments on commit 96bb598

Please sign in to comment.