diff --git a/engine/table/src/main/java/io/deephaven/engine/util/PyCallableWrapperJpyImpl.java b/engine/table/src/main/java/io/deephaven/engine/util/PyCallableWrapperJpyImpl.java index 006bae5be5c..c4b05c25493 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/PyCallableWrapperJpyImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/PyCallableWrapperJpyImpl.java @@ -44,8 +44,20 @@ public class PyCallableWrapperJpyImpl implements PyCallableWrapper { // TODO: support for vectorizing functions that return arrays // https://github.com/deephaven/deephaven-core/issues/4649 - private static final Set> vectorizableReturnTypes = Set.of(int.class, long.class, short.class, float.class, - double.class, byte.class, Boolean.class, String.class, Instant.class, PyObject.class); + private static final Set> vectorizableReturnTypes = Set.of( + boolean.class, boolean[].class, + Boolean.class, Boolean[].class, + byte.class, byte[].class, + short.class, short[].class, + char.class, char[].class, + int.class, int[].class, + long.class, long[].class, + float.class, float[].class, + double.class, double[].class, + String.class, String[].class, + Instant.class, Instant[].class, + PyObject.class, PyObject[].class, + Object.class, Object[].class); @Override public boolean isVectorizableReturnType() { diff --git a/py/server/deephaven/_udf.py b/py/server/deephaven/_udf.py index 93073af361f..4e2388dacd5 100644 --- a/py/server/deephaven/_udf.py +++ b/py/server/deephaven/_udf.py @@ -411,6 +411,7 @@ def _dh_vectorize(fn): and (3) the input arrays. """ p_sig = _parse_signature(fn) + return_array = p_sig.ret_annotation.has_array ret_dtype = dtypes.from_np_dtype(np.dtype(p_sig.ret_annotation.encoded_type[-1])) @wraps(fn) @@ -428,10 +429,18 @@ def wrapper(*args): for i in range(chunk_size): scalar_args = next(vectorized_args) converted_args = _convert_args(p_sig, scalar_args) - chunk_result[i] = _scalar(fn(*converted_args), ret_dtype) + ret = fn(*converted_args) + if return_array: + chunk_result[i] = dtypes.array(ret_dtype, ret) + else: + chunk_result[i] = _scalar(ret, ret_dtype) else: for i in range(chunk_size): - chunk_result[i] = _scalar(fn(), ret_dtype) + ret = fn() + if return_array: + chunk_result[i] = dtypes.array(ret_dtype, ret) + else: + chunk_result[i] = _scalar(ret, ret_dtype) return chunk_result diff --git a/py/server/tests/test_vectorization.py b/py/server/tests/test_vectorization.py index 8eb28e65cda..7123276591e 100644 --- a/py/server/tests/test_vectorization.py +++ b/py/server/tests/test_vectorization.py @@ -4,7 +4,7 @@ import random import unittest -from typing import Optional +from typing import Optional, Union import numpy as np from deephaven import DHError, empty_table, dtypes @@ -15,6 +15,8 @@ from deephaven._udf import _dh_vectorize as dh_vectorize from tests.testbase import BaseTestCase +from tests.test_udf_numpy_args import _J_TYPE_NULL_MAP, _J_TYPE_NP_DTYPE_MAP, _J_TYPE_J_ARRAY_TYPE_MAP + class VectorizationTestCase(BaseTestCase): def setUp(self): @@ -278,6 +280,93 @@ def pyfunc(p1: np.int32, p2: np.int32, p3: Optional[np.int32]) -> Optional[int]: self.assertEqual(t.columns[1].data_type, dtypes.long) self.assertEqual(t.columns[2].data_type, dtypes.long) + def test_1d_array_args_no_null(self): + col1_formula = "Col1 = i % 3" + for j_dtype, np_dtype in _J_TYPE_NP_DTYPE_MAP.items(): + col2_formula = f"Col2 = ({j_dtype})i" + with self.subTest(j_dtype): + tbl = empty_table(10).update([col1_formula, col2_formula]).group_by("Col1").update( + "Col2 = Col2.toArray()") + + func_str = f""" +def test_udf(col1, col2: np.ndarray[{_J_TYPE_NP_DTYPE_MAP[j_dtype]}]) -> np.ndarray[{_J_TYPE_NP_DTYPE_MAP[j_dtype]}]: + return col2 + 5 + """ + exec(func_str, globals()) + + res = tbl.update("Col3 = test_udf(Col1, Col2)") + self.assertEqual(res.columns[0].data_type, dtypes.int32) + self.assertEqual(res.columns[1].data_type, _J_TYPE_J_ARRAY_TYPE_MAP[j_dtype]) + self.assertEqual(res.columns[2].data_type, _J_TYPE_J_ARRAY_TYPE_MAP[j_dtype]) + + self.assertEqual(_udf.vectorized_count, 1) + _udf.vectorized_count = 0 + + def test_1d_array_args_null(self): + col1_formula = "Col1 = i % 3" + for j_dtype, null_name in _J_TYPE_NULL_MAP.items(): + col2_formula = f"Col2 = i % 3 == 0? {null_name} : ({j_dtype})i" + with self.subTest(j_dtype): + tbl = empty_table(10).update([col1_formula, col2_formula]).group_by("Col1").update("Col2 = Col2.toArray()") + + func_str = f""" +def test_udf(col1, col2: np.ndarray[{_J_TYPE_NP_DTYPE_MAP[j_dtype]}]) -> np.ndarray[{_J_TYPE_NP_DTYPE_MAP[j_dtype]}]: + return col2 + 5 + """ + exec(func_str, globals()) + + # for floating point types, DH nulls are auto converted to np.nan + # for integer types, DH nulls in the array raise exceptions + if j_dtype in ("float", "double"): + res = tbl.update("Col3 = test_udf(Col1, Col2)") + self.assertEqual(res.columns[0].data_type, dtypes.int32) + self.assertEqual(res.columns[1].data_type, _J_TYPE_J_ARRAY_TYPE_MAP[j_dtype]) + self.assertEqual(res.columns[2].data_type, _J_TYPE_J_ARRAY_TYPE_MAP[j_dtype]) + else: + with self.assertRaises(DHError) as cm: + tbl.update("Col3 = test_udf(Col1, Col2)") + + self.assertEqual(_udf.vectorized_count, 1) + _udf.vectorized_count = 0 + + def test_1d_str_bool_datetime_array(self): + with self.subTest("str"): + def f1(p1: np.ndarray[str]) -> bool: + return (p1 == 'None').any() + + t = empty_table(10).update(["X = i % 3", "Y = i % 2 == 0? `deephaven`: null"]).group_by("X").update("Y = Y.toArray()") + t1 = t.update(["X1 = f1(Y)"]) + self.assertEqual(t1.columns[2].data_type, dtypes.bool_) + self.assertEqual(3, t1.to_string().count("true")) + self.assertEqual(_udf.vectorized_count, 1) + _udf.vectorized_count = 0 + + with self.subTest("datetime"): + def f2(p1: np.ndarray[np.datetime64]) -> bool: + return np.isnat(p1).any() + + t = empty_table(10).update(["X = i % 3", "Y = i % 2 == 0? now() : null"]).group_by("X").update("Y = Y.toArray()") + t1 = t.update(["X1 = f2(Y)"]) + self.assertEqual(t1.columns[2].data_type, dtypes.bool_) + self.assertEqual(3, t1.to_string().count("true")) + self.assertEqual(_udf.vectorized_count, 1) + _udf.vectorized_count = 0 + + with self.subTest("boolean"): + def f3(p1: np.ndarray[np.bool_]) -> np.ndarray[np.bool_]: + return np.invert(p1) + + t = empty_table(10).update(["X = i % 3", "Y = i % 2 == 0? true : false"]).group_by("X").update("Y = Y.toArray()") + t1 = t.update(["X1 = f3(Y)"]) + self.assertEqual(_udf.vectorized_count, 1) + _udf.vectorized_count = 0 + + t = empty_table(10).update(["X = i % 3", "Y = i % 2 == 0? true : null"]).group_by("X").update("Y = Y.toArray()") + with self.assertRaises(DHError) as cm: + t1 = t.update(["X1 = f3(Y)"]) + self.assertIn("Java java.lang.Boolean array contains Deephaven null values, but numpy int8 array does not support null values", str(cm.exception)) + self.assertEqual(_udf.vectorized_count, 1) + _udf.vectorized_count = 0 if __name__ == "__main__": unittest.main()