diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 91487e5d1de0..fd6b42f61e7e 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -237,18 +237,20 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentR (void)memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes); } else { int32_t dataLen = varDataTLen(pData); - if (meta->type == TSDB_DATA_TYPE_JSON) { - if (*pData == TSDB_DATA_TYPE_NULL) { - dataLen = 0; - } else if (*pData == TSDB_DATA_TYPE_NCHAR) { - dataLen = varDataTLen(pData + sizeof(char)); - } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) { - dataLen = sizeof(int64_t); - } else if (*pData == TSDB_DATA_TYPE_BOOL) { - dataLen = sizeof(char); - } - dataLen += sizeof(char); - } + // This is a piece of code to help users implement udf. It is only called during testing. + // Currently, the json type is not supported and will not be called. + // if (meta->type == TSDB_DATA_TYPE_JSON) { + // if (*pData == TSDB_DATA_TYPE_NULL) { + // dataLen = 0; + // } else if (*pData == TSDB_DATA_TYPE_NCHAR) { + // dataLen = varDataTLen(pData + sizeof(char)); + // } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) { + // dataLen = sizeof(int64_t); + // } else if (*pData == TSDB_DATA_TYPE_BOOL) { + // dataLen = sizeof(char); + // } + // dataLen += sizeof(char); + // } if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) { uint32_t newSize = data->varLenCol.payloadAllocLen; diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index 6891653981d8..fad0a749d59d 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -142,6 +142,50 @@ target_link_libraries( udf2_dup PUBLIC os ${LINK_JEMALLOC} ) +set(TARGET_NAMES + change_udf_normal + change_udf_no_init + change_udf_no_process + change_udf_no_destroy + change_udf_init_failed + change_udf_process_failed + change_udf_destory_failed +) + +set(COMPILE_DEFINITIONS + CHANGE_UDF_NORMAL + CHANGE_UDF_NO_INIT + CHANGE_UDF_NO_PROCESS + CHANGE_UDF_NO_DESTROY + CHANGE_UDF_INIT_FAILED + CHANGE_UDF_PROCESS_FAILED + CHANGE_UDF_DESTORY_FAILED +) + +foreach(index RANGE 0 6) + list(GET TARGET_NAMES ${index} target_name) + list(GET COMPILE_DEFINITIONS ${index} compile_def) + + add_library(${target_name} STATIC MODULE test/change_udf.c) + target_include_directories( + ${target_name} + PUBLIC + "${TD_SOURCE_DIR}/include/libs/function" + "${TD_SOURCE_DIR}/include/util" + "${TD_SOURCE_DIR}/include/common" + "${TD_SOURCE_DIR}/include/client" + "${TD_SOURCE_DIR}/include/os" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" + ) + target_compile_definitions(${target_name} PRIVATE ${compile_def}) + IF(TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEPENDENCIES(${target_name} jemalloc) + ENDIF() + target_link_libraries( + ${target_name} PUBLIC os ${LINK_JEMALLOC} + ) +endforeach() + # SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin) add_executable(udfd src/udfd.c) diff --git a/source/libs/function/test/change_udf.c b/source/libs/function/test/change_udf.c new file mode 100644 index 000000000000..f623a3194da3 --- /dev/null +++ b/source/libs/function/test/change_udf.c @@ -0,0 +1,172 @@ +#include +#include +#include +#ifdef LINUX +#include +#endif +#ifdef WINDOWS +#include +#endif +#include "taosudf.h" + +// rename function name +#ifdef CHANGE_UDF_NORMAL +#define UDFNAME change_udf_normal +#define UDFNAMEINIT change_udf_normal_init +#define UDFNAMEDESTROY change_udf_normal_destroy +#elif defined(CHANGE_UDF_NO_INIT) +#define UDFNAME change_udf_no_init +#define UDFNAMEINIT change_udf_no_init_init +#define UDFNAMEDESTROY change_udf_no_init_destroy +#elif defined(CHANGE_UDF_NO_PROCESS) +#define UDFNAME change_udf_no_process +#define UDFNAMEINIT change_udf_no_process_init +#define UDFNAMEDESTROY change_udf_no_process_destroy +#elif defined(CHANGE_UDF_NO_DESTROY) +#define UDFNAME change_udf_no_destroy +#define UDFNAMEINIT change_udf_no_destroy_init +#define UDFNAMEDESTROY change_udf_no_destroy_destroy +#elif defined(CHANGE_UDF_INIT_FAILED) +#define UDFNAME change_udf_init_failed +#define UDFNAMEINIT change_udf_init_failed_init +#define UDFNAMEDESTROY change_udf_init_failed_destroy +#elif defined(CHANGE_UDF_PROCESS_FAILED) +#define UDFNAME change_udf_process_failed +#define UDFNAMEINIT change_udf_process_failed_init +#define UDFNAMEDESTROY change_udf_process_failed_destroy +#elif defined(CHANGE_UDF_DESTORY_FAILED) +#define UDFNAME change_udf_destory_failed +#define UDFNAMEINIT change_udf_destory_failed_init +#define UDFNAMEDESTROY change_udf_destory_failed_destroy +#else +#define UDFNAME change_udf_normal +#define UDFNAMEINIT change_udf_normal_init +#define UDFNAMEDESTROY change_udf_normal_destroy +#endif + + +#ifdef CHANGE_UDF_NO_INIT +#else +DLL_EXPORT int32_t UDFNAMEINIT() { + #ifdef CHANGE_UDF_INIT_FAILED + return -1; + #else + return 0; + #endif // ifdef CHANGE_UDF_INIT_FAILED +} +#endif // ifdef CHANGE_UDF_NO_INIT + +#ifdef CHANGE_UDF_NO_DESTROY +#else +DLL_EXPORT int32_t UDFNAMEDESTROY() { + #ifdef CHANGE_UDF_DESTORY_FAILED + return -1; + #else + return 0; + #endif // ifdef CHANGE_UDF_DESTORY_FAILED + } +#endif // ifdef CHANGE_UDF_NO_DESTROY + +#ifdef CHANGE_UDF_NO_PROCESS +#else +DLL_EXPORT int32_t UDFNAME(SUdfDataBlock *block, SUdfColumn *resultCol) { + #ifdef CHANGE_UDF_PROCESS_FAILED + return -1; + #else + int32_t code = 0; + SUdfColumnData *resultData = &resultCol->colData; + for (int32_t i = 0; i < block->numOfRows; ++i) { + int j = 0; + for (; j < block->numOfCols; ++j) { + if (udfColDataIsNull(block->udfCols[j], i)) { + code = udfColDataSetNull(resultCol, i); + if (code != 0) { + return code; + } + break; + } + } + if (j == block->numOfCols) { + int32_t luckyNum = 1; + code = udfColDataSet(resultCol, i, (char *)&luckyNum, false); + if (code != 0) { + return code; + } + } + } + // to simulate actual processing delay by udf +#ifdef LINUX + usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second) +#endif // ifdef LINUX +#ifdef WINDOWS + Sleep(1); +#endif // ifdef WINDOWS + resultData->numOfRows = block->numOfRows; + return 0; + #endif // ifdef CHANGE_UDF_PROCESS_FAILED +} +#endif // ifdef CHANGE_UDF_NO_PROCESS + + + + +/********************************************************************************************************************/ +// udf revert functions +/********************************************************************************************************************/ +DLL_EXPORT int32_t udf_reverse_init() { return 0; } + +DLL_EXPORT int32_t udf_reverse_destroy() { return 0; } + +static void reverse_data(char* data, size_t len) { + size_t i, j; + char temp; + for (i = 0, j = len - 1; i < j; i++, j--) { + temp = data[i]; + data[i] = data[j]; + data[j] = temp; + } +} + +DLL_EXPORT int32_t udf_reverse(SUdfDataBlock *block, SUdfColumn *resultCol) { + int32_t code = 0; + SUdfColumnData *resultData = &resultCol->colData; + for (int32_t i = 0; i < block->numOfRows; ++i) { + int j = 0; + for (; j < block->numOfCols; ++j) { + if (udfColDataIsNull(block->udfCols[j], i)) { + code = udfColDataSetNull(resultCol, i); + if (code != 0) { + return code; + } + break; + } else { + int32_t oldLen = udfColDataGetDataLen(block->udfCols[j], i); + char *pOldData = udfColDataGetData(block->udfCols[j], i); + + + char *buff = malloc(sizeof(VarDataLenT) + oldLen); + if (buff == NULL) { + return -1; + } + ((VarDataLenT *)buff)[0] = (VarDataLenT)oldLen; + memcpy(buff, pOldData, oldLen + sizeof(VarDataLenT)); + reverse_data(buff + sizeof(VarDataLenT), oldLen); + code = udfColDataSet(resultCol, i, buff, false); + if (code != 0) { + free(buff); + return code; + } + } + } + } + // to simulate actual processing delay by udf +#ifdef LINUX + usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second) +#endif +#ifdef WINDOWS + Sleep(1); +#endif + resultData->numOfRows = block->numOfRows; + return 0; +} + diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 46b7e1f795d4..bb7b8411f957 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -155,6 +155,9 @@ def error(self, sql, expectedErrno = None, expectErrInfo = None, fullMatched = T try: self.cursor.execute(sql) + self.queryResult = self.cursor.fetchall() + self.queryRows = len(self.queryResult) + self.queryCols = len(self.cursor.description) except BaseException as e: tdLog.info("err:%s" % (e)) expectErrNotOccured = False @@ -165,10 +168,6 @@ def error(self, sql, expectedErrno = None, expectErrInfo = None, fullMatched = T if expectErrNotOccured: tdLog.exit("%s(%d) failed: sql:%s, expect error not occured" % (caller.filename, caller.lineno, sql)) else: - self.queryRows = 0 - self.queryCols = 0 - self.queryResult = None - if fullMatched: if expectedErrno != None: expectedErrno_rest = expectedErrno & 0x0000ffff diff --git a/tests/pytest/util/tserror.py b/tests/pytest/util/tserror.py new file mode 100644 index 000000000000..35d74153c390 --- /dev/null +++ b/tests/pytest/util/tserror.py @@ -0,0 +1,13 @@ +import ctypes + +TAOS_SYSTEM_ERROR = ctypes.c_int32(0x80ff0000).value +TAOS_DEF_ERROR_CODE = ctypes.c_int32(0x80000000).value + + +TSDB_CODE_MND_FUNC_NOT_EXIST = (TAOS_DEF_ERROR_CODE | 0x0374) + + +TSDB_CODE_UDF_FUNC_EXEC_FAILURE = (TAOS_DEF_ERROR_CODE | 0x290A) + + +TSDB_CODE_TSC_INTERNAL_ERROR = (TAOS_DEF_ERROR_CODE | 0x02FF) diff --git a/tests/system-test/0-others/udfTest.py b/tests/system-test/0-others/udfTest.py index d3efa61e0425..8134327b4190 100644 --- a/tests/system-test/0-others/udfTest.py +++ b/tests/system-test/0-others/udfTest.py @@ -11,6 +11,7 @@ from util.sql import * from util.cases import * from util.dnodes import * +from util.tserror import * import subprocess class TDTestCase: @@ -56,8 +57,32 @@ def prepare_udf_so(self): else: self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") self.libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libchange_udf_no_init = subprocess.Popen('find %s -name "libchange_udf_no_init.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libchange_udf_no_process = subprocess.Popen('find %s -name "libchange_udf_no_process.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libchange_udf_no_destroy = subprocess.Popen('find %s -name "libchange_udf_no_destroy.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libchange_udf_init_failed = subprocess.Popen('find %s -name "libchange_udf_init_failed.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libchange_udf_process_failed = subprocess.Popen('find %s -name "libchange_udf_process_failed.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libchange_udf_destroy_failed = subprocess.Popen('find %s -name "libchange_udf_destory_failed.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libchange_udf_normal = subprocess.Popen('find %s -name "libchange_udf_normal.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf1 = self.libudf1.replace('\r','').replace('\n','') self.libudf2 = self.libudf2.replace('\r','').replace('\n','') + self.libchange_udf_no_init = self.libchange_udf_no_init.replace('\r','').replace('\n','') + self.libchange_udf_no_process = self.libchange_udf_no_process.replace('\r','').replace('\n','') + self.libchange_udf_normal = self.libchange_udf_normal.replace('\r','').replace('\n','') + self.libchange_udf_no_destroy = self.libchange_udf_no_destroy.replace('\r','').replace('\n','') + self.libchange_udf_init_failed = self.libchange_udf_init_failed.replace('\r','').replace('\n','') + self.libchange_udf_process_failed = self.libchange_udf_process_failed.replace('\r','').replace('\n','') + self.libchange_udf_destroy_failed = self.libchange_udf_destroy_failed.replace('\r','').replace('\n','') + tdLog.info(f"udf1 so path is {self.libudf1}") + tdLog.info(f"udf2 so path is {self.libudf2}") + tdLog.info(f"change_udf_no_init so path is {self.libchange_udf_no_init}") + tdLog.info(f"change_udf_no_process so path is {self.libchange_udf_no_process}") + tdLog.info(f"change_udf_no_destroy so path is {self.libchange_udf_no_destroy}") + tdLog.info(f"change_udf_init_failed so path is {self.libchange_udf_init_failed}") + tdLog.info(f"change_udf_process_failed so path is {self.libchange_udf_process_failed}") + tdLog.info(f"change_udf_destroy_failed so path is {self.libchange_udf_destroy_failed}") + tdLog.info(f"change_udf_normal so path is {self.libchange_udf_normal}") def prepare_data(self): @@ -664,13 +689,118 @@ def test_udfd_cmd(self): path = ''.join(random.choice(letters) for i in range(5000)) os.system(f"udfd -c {path}") + + def test_change_udf_normal(self, func_name): + # create function with normal file + tdSql.execute(f"create function {func_name} as '%s' outputtype int"%self.libchange_udf_normal) + functions = tdSql.getResult("show functions") + for function in functions: + if f"{func_name}" in function[0]: + tdLog.info(f"create {func_name} functions success, using {self.libchange_udf_normal}") + break + tdSql.query(f"select num1 , {func_name}(num1) ,num2 ,{func_name}(num2),num3 ,{func_name}(num3),num4 ,{func_name}(num4) from db.tb", TSDB_CODE_UDF_FUNC_EXEC_FAILURE) + tdSql.checkData(0,0,None) + tdSql.checkData(0,1,None) + tdSql.checkData(0,2,1) + tdSql.checkData(0,3,1) + tdSql.checkData(0,4,1.000000000) + tdSql.checkData(0,5,1) + tdSql.checkData(0,6,"binary1") + tdSql.checkData(0,7,1) + tdSql.query(f"select {func_name}(num1) from db.tb", TSDB_CODE_UDF_FUNC_EXEC_FAILURE) + tdSql.execute(f"drop function {func_name}") + tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST) + tdLog.info(f"change udf test finished, using {self.libchange_udf_normal}") + + def test_change_udf_failed(self, func_name, lib_name): + tdLog.info(f"test change udf start: using {lib_name}") + tdSql.error(f"select num1 , {func_name}(num1) ,num2 ,{func_name}(num2),num3 ,{func_name}(num3),num4 ,{func_name}(num4) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST) + tdSql.execute(f"create function {func_name} as '{lib_name}' outputtype int") + functions = tdSql.getResult("show functions") + for function in functions: + if f"{func_name}" in function[0]: + tdLog.info(f"create {func_name} functions success, using {lib_name}") + break + + tdSql.error(f"select num1 , {func_name}(num1) ,num2 ,{func_name}(num2),num3 ,{func_name}(num3),num4 ,{func_name}(num4) from db.tb", TSDB_CODE_UDF_FUNC_EXEC_FAILURE) + tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_UDF_FUNC_EXEC_FAILURE) + tdSql.execute(f"drop function {func_name}") + tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST) + tdLog.info(f"change udf test finished, using {lib_name}") + + def test_change_udf_reverse(self): + tdSql.execute("create database if not exists db duration 100") + tdSql.execute("use db") + + func_name = "udf_reverse" + tdSql.execute(f"create function {func_name} as '%s' outputtype nchar(256)"%self.libchange_udf_normal) + functions = tdSql.getResult("show functions") + for function in functions: + if f"{func_name}" in function[0]: + tdLog.info(f"create {func_name} functions success, using {self.libchange_udf_normal}") + break + + tdSql.error(f"select {func_name}(c8) from db.t1", TSDB_CODE_TSC_INTERNAL_ERROR) + tdSql.execute(f"drop function {func_name}") + tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST) + + self.test_change_udf_normal("change_udf_normal") + tdSql.execute(f"create function {func_name} as '%s' outputtype varchar(256)"%self.libchange_udf_normal) + functions = tdSql.getResult("show functions") + for function in functions: + if f"{func_name}" in function[0]: + tdLog.info(f"create {func_name} functions success, using {self.libchange_udf_normal}") + break + + tdSql.query(f"select {func_name}(c8) from db.t1 order by ts") + tdSql.checkData(0,0, None) + tdSql.checkData(1,0, "1yranib") + tdSql.checkData(2,0, "2yranib") + tdSql.checkData(3,0, "3yranib") + + + + def unexpected_using_test(self): + tdSql.execute("use db ") + + # create function without wrong file path + tdSql.error("create function udf1 as '%s_wrongpath' outputtype int;"%self.libudf1, TAOS_SYSTEM_ERROR|2) + tdSql.error("create aggregate function udf2 as '%s_wrongpath' outputtype double;"%self.libudf2, TAOS_SYSTEM_ERROR|2) + + tdSql.execute("create function udf1 as '%s' outputtype int;"%self.libudf1) + tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb") + + self.test_change_udf_normal("change_udf_normal") + self.test_change_udf_failed("change_udf_no_init", self.libchange_udf_no_init) + + self.test_change_udf_normal("change_udf_normal") + self.test_change_udf_failed("change_udf_no_process", self.libchange_udf_no_process) + + self.test_change_udf_normal("change_udf_normal") + self.test_change_udf_failed("change_udf_no_destroy", self.libchange_udf_no_destroy) + + self.test_change_udf_normal("change_udf_normal") + self.test_change_udf_failed("change_udf_init_failed", self.libchange_udf_init_failed) + + self.test_change_udf_normal("change_udf_normal") + self.test_change_udf_failed("change_udf_process_failed", self.libchange_udf_process_failed) + + self.test_change_udf_normal("change_udf_normal") + self.test_change_udf_failed("libchange_udf_destroy_failed", self.libchange_udf_destroy_failed) + + tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb") + tdSql.execute(f"drop function udf1") + tdSql.error("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb", TSDB_CODE_MND_FUNC_NOT_EXIST) def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring print(" env is ok for all ") self.test_udfd_cmd() self.prepare_udf_so() self.prepare_data() + + self.unexpected_using_test() self.create_udf_function() + self.test_change_udf_reverse() self.basic_udf_query() self.loop_kill_udfd() tdSql.execute(" drop function udf1 ")