Skip to content

Commit

Permalink
Add support of 1D arrays in UDF vectorization (#5100)
Browse files Browse the repository at this point in the history
* Add support of 1D arrays to UDF vectorization

* Add more test cases

* Add a null test case for boolean arrays
  • Loading branch information
jmao-denver authored Feb 6, 2024
1 parent 4117159 commit e4d69fd
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,20 @@ public class PyCallableWrapperJpyImpl implements PyCallableWrapper {

// TODO: support for vectorizing functions that return arrays
// https://github.com/deephaven/deephaven-core/issues/4649
private static final Set<Class<?>> vectorizableReturnTypes = Set.of(int.class, long.class, short.class, float.class,
double.class, byte.class, Boolean.class, String.class, Instant.class, PyObject.class);
private static final Set<Class<?>> vectorizableReturnTypes = Set.of(
boolean.class, boolean[].class,
Boolean.class, Boolean[].class,
byte.class, byte[].class,
short.class, short[].class,
char.class, char[].class,
int.class, int[].class,
long.class, long[].class,
float.class, float[].class,
double.class, double[].class,
String.class, String[].class,
Instant.class, Instant[].class,
PyObject.class, PyObject[].class,
Object.class, Object[].class);

@Override
public boolean isVectorizableReturnType() {
Expand Down
13 changes: 11 additions & 2 deletions py/server/deephaven/_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ def _dh_vectorize(fn):
and (3) the input arrays.
"""
p_sig = _parse_signature(fn)
return_array = p_sig.ret_annotation.has_array
ret_dtype = dtypes.from_np_dtype(np.dtype(p_sig.ret_annotation.encoded_type[-1]))

@wraps(fn)
Expand All @@ -428,10 +429,18 @@ def wrapper(*args):
for i in range(chunk_size):
scalar_args = next(vectorized_args)
converted_args = _convert_args(p_sig, scalar_args)
chunk_result[i] = _scalar(fn(*converted_args), ret_dtype)
ret = fn(*converted_args)
if return_array:
chunk_result[i] = dtypes.array(ret_dtype, ret)
else:
chunk_result[i] = _scalar(ret, ret_dtype)
else:
for i in range(chunk_size):
chunk_result[i] = _scalar(fn(), ret_dtype)
ret = fn()
if return_array:
chunk_result[i] = dtypes.array(ret_dtype, ret)
else:
chunk_result[i] = _scalar(ret, ret_dtype)

return chunk_result

Expand Down
91 changes: 90 additions & 1 deletion py/server/tests/test_vectorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import random
import unittest

from typing import Optional
from typing import Optional, Union
import numpy as np

from deephaven import DHError, empty_table, dtypes
Expand All @@ -15,6 +15,8 @@
from deephaven._udf import _dh_vectorize as dh_vectorize
from tests.testbase import BaseTestCase

from tests.test_udf_numpy_args import _J_TYPE_NULL_MAP, _J_TYPE_NP_DTYPE_MAP, _J_TYPE_J_ARRAY_TYPE_MAP


class VectorizationTestCase(BaseTestCase):
def setUp(self):
Expand Down Expand Up @@ -278,6 +280,93 @@ def pyfunc(p1: np.int32, p2: np.int32, p3: Optional[np.int32]) -> Optional[int]:
self.assertEqual(t.columns[1].data_type, dtypes.long)
self.assertEqual(t.columns[2].data_type, dtypes.long)

def test_1d_array_args_no_null(self):
col1_formula = "Col1 = i % 3"
for j_dtype, np_dtype in _J_TYPE_NP_DTYPE_MAP.items():
col2_formula = f"Col2 = ({j_dtype})i"
with self.subTest(j_dtype):
tbl = empty_table(10).update([col1_formula, col2_formula]).group_by("Col1").update(
"Col2 = Col2.toArray()")

func_str = f"""
def test_udf(col1, col2: np.ndarray[{_J_TYPE_NP_DTYPE_MAP[j_dtype]}]) -> np.ndarray[{_J_TYPE_NP_DTYPE_MAP[j_dtype]}]:
return col2 + 5
"""
exec(func_str, globals())

res = tbl.update("Col3 = test_udf(Col1, Col2)")
self.assertEqual(res.columns[0].data_type, dtypes.int32)
self.assertEqual(res.columns[1].data_type, _J_TYPE_J_ARRAY_TYPE_MAP[j_dtype])
self.assertEqual(res.columns[2].data_type, _J_TYPE_J_ARRAY_TYPE_MAP[j_dtype])

self.assertEqual(_udf.vectorized_count, 1)
_udf.vectorized_count = 0

def test_1d_array_args_null(self):
col1_formula = "Col1 = i % 3"
for j_dtype, null_name in _J_TYPE_NULL_MAP.items():
col2_formula = f"Col2 = i % 3 == 0? {null_name} : ({j_dtype})i"
with self.subTest(j_dtype):
tbl = empty_table(10).update([col1_formula, col2_formula]).group_by("Col1").update("Col2 = Col2.toArray()")

func_str = f"""
def test_udf(col1, col2: np.ndarray[{_J_TYPE_NP_DTYPE_MAP[j_dtype]}]) -> np.ndarray[{_J_TYPE_NP_DTYPE_MAP[j_dtype]}]:
return col2 + 5
"""
exec(func_str, globals())

# for floating point types, DH nulls are auto converted to np.nan
# for integer types, DH nulls in the array raise exceptions
if j_dtype in ("float", "double"):
res = tbl.update("Col3 = test_udf(Col1, Col2)")
self.assertEqual(res.columns[0].data_type, dtypes.int32)
self.assertEqual(res.columns[1].data_type, _J_TYPE_J_ARRAY_TYPE_MAP[j_dtype])
self.assertEqual(res.columns[2].data_type, _J_TYPE_J_ARRAY_TYPE_MAP[j_dtype])
else:
with self.assertRaises(DHError) as cm:
tbl.update("Col3 = test_udf(Col1, Col2)")

self.assertEqual(_udf.vectorized_count, 1)
_udf.vectorized_count = 0

def test_1d_str_bool_datetime_array(self):
with self.subTest("str"):
def f1(p1: np.ndarray[str]) -> bool:
return (p1 == 'None').any()

t = empty_table(10).update(["X = i % 3", "Y = i % 2 == 0? `deephaven`: null"]).group_by("X").update("Y = Y.toArray()")
t1 = t.update(["X1 = f1(Y)"])
self.assertEqual(t1.columns[2].data_type, dtypes.bool_)
self.assertEqual(3, t1.to_string().count("true"))
self.assertEqual(_udf.vectorized_count, 1)
_udf.vectorized_count = 0

with self.subTest("datetime"):
def f2(p1: np.ndarray[np.datetime64]) -> bool:
return np.isnat(p1).any()

t = empty_table(10).update(["X = i % 3", "Y = i % 2 == 0? now() : null"]).group_by("X").update("Y = Y.toArray()")
t1 = t.update(["X1 = f2(Y)"])
self.assertEqual(t1.columns[2].data_type, dtypes.bool_)
self.assertEqual(3, t1.to_string().count("true"))
self.assertEqual(_udf.vectorized_count, 1)
_udf.vectorized_count = 0

with self.subTest("boolean"):
def f3(p1: np.ndarray[np.bool_]) -> np.ndarray[np.bool_]:
return np.invert(p1)

t = empty_table(10).update(["X = i % 3", "Y = i % 2 == 0? true : false"]).group_by("X").update("Y = Y.toArray()")
t1 = t.update(["X1 = f3(Y)"])
self.assertEqual(_udf.vectorized_count, 1)
_udf.vectorized_count = 0

t = empty_table(10).update(["X = i % 3", "Y = i % 2 == 0? true : null"]).group_by("X").update("Y = Y.toArray()")
with self.assertRaises(DHError) as cm:
t1 = t.update(["X1 = f3(Y)"])
self.assertIn("Java java.lang.Boolean array contains Deephaven null values, but numpy int8 array does not support null values", str(cm.exception))
self.assertEqual(_udf.vectorized_count, 1)
_udf.vectorized_count = 0

if __name__ == "__main__":
unittest.main()

0 comments on commit e4d69fd

Please sign in to comment.