From 24175abe2b1d7f5544c3a22346a210ff551f7ea9 Mon Sep 17 00:00:00 2001 From: Haolin Wang Date: Sat, 14 Dec 2024 21:14:06 +0800 Subject: [PATCH] newfix: enable dedup on stmt/stmt2 inserts in interlace mode --- include/common/tdataformat.h | 4 +- source/client/src/clientStmt.c | 6 - source/client/src/clientStmt2.c | 6 - source/common/src/tdataformat.c | 44 ++++- source/libs/parser/src/parInsertStmt.c | 10 +- tests/script/api/makefile | 2 + tests/script/api/stmt-insert-dupkeys.c | 191 +++++++++++---------- tests/script/api/stmt2-insert-dupkeys.c | 212 ++++++++++++++++++++++++ 8 files changed, 360 insertions(+), 115 deletions(-) create mode 100644 tests/script/api/stmt2-insert-dupkeys.c diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index cb05f98f454c..0cc30edf82ca 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -378,7 +378,7 @@ typedef struct { TAOS_MULTI_BIND *bind; } SBindInfo; int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema, - SArray *rowArray); + SArray *rowArray, bool *orderedDup); // stmt2 binding int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen, initGeosFn igeos, @@ -392,7 +392,7 @@ typedef struct { } SBindInfo2; int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema, - SArray *rowArray); + SArray *rowArray, bool *orderedDup); #endif diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 60475c55df93..9cad21961488 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -1557,12 +1557,6 @@ int stmtExec(TAOS_STMT* stmt) { launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } else { if (pStmt->sql.stbInterlaceMode) { - STableDataCxt *pTableCxt = pStmt->exec.pCurrBlock; - if (!pTableCxt->ordered || pTableCxt->duplicateTs) { - tscError("failed to insert disordered or duplicate data"); - STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); - } - int64_t startTs = taosGetTimestampUs(); while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) { taosUsleep(1); diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 8869a4eb062e..f03879f199e3 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1669,12 +1669,6 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { if (STMT_TYPE_QUERY != pStmt->sql.type) { if (pStmt->sql.stbInterlaceMode) { - STableDataCxt *pTableCxt = pStmt->exec.pCurrBlock; - if (!pTableCxt->ordered || pTableCxt->duplicateTs) { - tscError("failed to insert disordered or duplicate data"); - STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); - } - int64_t startTs = taosGetTimestampUs(); while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) { taosUsleep(1); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index a38842735c47..6c463e6de492 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -449,9 +449,10 @@ static int32_t tBindInfoCompare(const void *p1, const void *p2, const void *para * `infoSorted` is whether the bind information is sorted by column id * `pTSchema` is the schema of the table * `rowArray` is the array to store the rows + * `orderedDup` is an array to store ordered and duplicateTs */ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema, - SArray *rowArray) { + SArray *rowArray, bool *orderedDup) { if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -469,6 +470,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, return terrno; } + SRowKey rowKey, lastRowKey; for (int32_t iRow = 0; iRow < numOfRows; iRow++) { taosArrayClear(colValArray); @@ -507,6 +509,24 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, code = terrno; goto _exit; } + + if (orderedDup) { + tRowGetKey(row, &rowKey); + if (iRow == 0) { + // init to ordered by default + orderedDup[0] = true; + // init to non-duplicate by default + orderedDup[1] = false; + } else { + // no more compare if we already get disordered or duplicate rows + if (orderedDup[0] && !orderedDup[1]) { + int32_t code = tRowKeyCompare(&rowKey, &lastRowKey); + orderedDup[0] = (code >= 0); + orderedDup[1] = (code == 0); + } + } + lastRowKey = rowKey; + } } _exit: @@ -3235,9 +3255,10 @@ int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int3 * `infoSorted` is whether the bind information is sorted by column id * `pTSchema` is the schema of the table * `rowArray` is the array to store the rows + * `orderedDup` is an array to store ordered and duplicateTs */ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema, - SArray *rowArray) { + SArray *rowArray, bool *orderedDup) { if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -3266,6 +3287,7 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte } } + SRowKey rowKey, lastRowKey; for (int32_t iRow = 0; iRow < numOfRows; iRow++) { taosArrayClear(colValArray); @@ -3317,6 +3339,24 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte code = terrno; goto _exit; } + + if (orderedDup) { + tRowGetKey(row, &rowKey); + if (iRow == 0) { + // init to ordered by default + orderedDup[0] = true; + // init to non-duplicate by default + orderedDup[1] = false; + } else { + // no more compare if we already get disordered or duplicate rows + if (orderedDup[0] && !orderedDup[1]) { + int32_t code = tRowKeyCompare(&rowKey, &lastRowKey); + orderedDup[0] = (code >= 0); + orderedDup[1] = (code == 0); + } + } + lastRowKey = rowKey; + } } _exit: diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index b3c89a6b1ce5..f463687a4f7c 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -323,6 +323,7 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind int32_t code = 0; int16_t lastColId = -1; bool colInOrder = true; + bool orderedDup[2]; if (NULL == *pTSchema) { *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion); @@ -368,7 +369,9 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind // } } - code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols); + code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup); + pDataBlock->ordered = orderedDup[0]; + pDataBlock->duplicateTs = orderedDup[1]; qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum); @@ -682,6 +685,7 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin int16_t lastColId = -1; bool colInOrder = true; int ncharColNums = 0; + bool orderedDup[2]; if (NULL == *pTSchema) { *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion); @@ -738,7 +742,9 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin pBindInfos[c].bytes = pColSchema->bytes; } - code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols); + code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup); + pDataBlock->ordered = orderedDup[0]; + pDataBlock->duplicateTs = orderedDup[1]; qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum); diff --git a/tests/script/api/makefile b/tests/script/api/makefile index c0422b0fa72c..68d0f592c819 100644 --- a/tests/script/api/makefile +++ b/tests/script/api/makefile @@ -29,6 +29,7 @@ exe: # gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS) gcc $(CFLAGS) ./stmt-crash.c -o $(ROOT)stmt-crash $(LFLAGS) gcc $(CFLAGS) ./stmt-insert-dupkeys.c -o $(ROOT)stmt-insert-dupkeys $(LFLAGS) + gcc $(CFLAGS) ./stmt2-insert-dupkeys.c -o $(ROOT)stmt2-insert-dupkeys $(LFLAGS) clean: rm $(ROOT)batchprepare @@ -48,3 +49,4 @@ clean: rm $(ROOT)stmt2-nohole rm $(ROOT)stmt-crash rm $(ROOT)stmt-insert-dupkeys + rm $(ROOT)stmt2-insert-dupkeys diff --git a/tests/script/api/stmt-insert-dupkeys.c b/tests/script/api/stmt-insert-dupkeys.c index 2d3bd822ac06..b564fbb21dc4 100644 --- a/tests/script/api/stmt-insert-dupkeys.c +++ b/tests/script/api/stmt-insert-dupkeys.c @@ -1,10 +1,12 @@ // compile with -// gcc -o multi_bind_example multi_bind_example.c -ltaos +// gcc -o stmt-insert-dupkeys stmt-insert-dupkeys.c -ltaos #include #include #include #include "taos.h" +#define NUMROWS 3 + /** * @brief execute sql only and ignore result set * @@ -37,20 +39,8 @@ void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) { } } -/** - * @brief insert data using stmt API - * - * @param taos - */ -void insertData(TAOS *taos) { - // init with interlace mode - TAOS_STMT *stmt = taos_stmt_init(taos); - // prepare - const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; - int code = taos_stmt_prepare(stmt, sql, 0); - checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare"); +void prepareBindTags(TAOS_MULTI_BIND *tags) { // bind table name and tags - TAOS_MULTI_BIND tags[2]; char *location = "California.SanFrancisco"; int groupId = 2; tags[0].buffer_type = TSDB_DATA_TYPE_BINARY; @@ -64,150 +54,120 @@ void insertData(TAOS *taos) { tags[1].length = (int32_t *)&tags[1].buffer_length; tags[1].buffer = &groupId; tags[1].is_null = NULL; +} - code = taos_stmt_set_tbname_tags(stmt, "d1001", tags); - checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags"); - - // highlight-start - // insert 3 rows with multi binds - TAOS_MULTI_BIND params[4]; - // values to bind - int64_t ts[] = {1648432611249, 1648432611749, 1648432611849}; - float current[] = {10.3f, 12.6f, 11.2f}; - int voltage[] = {219, 218, 216}; - float phase[] = {0.31f, 0.33f, 0.32f}; +void prepareBindParams(TAOS_MULTI_BIND *params, int64_t *ts, float *current, int *voltage, float *phase) { // is_null array - char is_null[3] = {0}; + char is_null[NUMROWS] = {0}; // length array - int32_t int64Len[3] = {sizeof(int64_t)}; - int32_t floatLen[3] = {sizeof(float)}; - int32_t intLen[3] = {sizeof(int)}; + int32_t int64Len[NUMROWS] = {sizeof(int64_t)}; + int32_t floatLen[NUMROWS] = {sizeof(float)}; + int32_t intLen[NUMROWS] = {sizeof(int)}; params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; params[0].buffer_length = sizeof(int64_t); params[0].buffer = ts; params[0].length = int64Len; params[0].is_null = is_null; - params[0].num = 3; + params[0].num = NUMROWS; params[1].buffer_type = TSDB_DATA_TYPE_FLOAT; params[1].buffer_length = sizeof(float); params[1].buffer = current; params[1].length = floatLen; params[1].is_null = is_null; - params[1].num = 3; + params[1].num = NUMROWS; params[2].buffer_type = TSDB_DATA_TYPE_INT; params[2].buffer_length = sizeof(int); params[2].buffer = voltage; params[2].length = intLen; params[2].is_null = is_null; - params[2].num = 3; + params[2].num = NUMROWS; params[3].buffer_type = TSDB_DATA_TYPE_FLOAT; params[3].buffer_length = sizeof(float); params[3].buffer = phase; params[3].length = floatLen; params[3].is_null = is_null; - params[3].num = 3; + params[3].num = NUMROWS; +} + +/** + * @brief insert data using stmt API + * + * @param taos + */ +void insertData(TAOS *taos, int64_t *ts, float *current, int *voltage, float *phase) { + // init + TAOS_STMT *stmt = taos_stmt_init(taos); + + // prepare + const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare"); + + // bind table name and tags + TAOS_MULTI_BIND tags[2]; + prepareBindTags(tags); + code = taos_stmt_set_tbname_tags(stmt, "d1001", tags); + checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags"); + + TAOS_MULTI_BIND params[4]; + prepareBindParams(params, ts, current, voltage, phase); code = taos_stmt_bind_param_batch(stmt, params); // bind batch checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch"); + code = taos_stmt_add_batch(stmt); // add batch checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch"); - // highlight-end + // execute code = taos_stmt_execute(stmt); checkErrorCode(stmt, code, "failed to execute taos_stmt_execute"); + int affectedRows = taos_stmt_affected_rows(stmt); printf("successfully inserted %d rows\n", affectedRows); + // close (void)taos_stmt_close(stmt); } -void insertDupData(TAOS *taos) { +void insertDataInterlace(TAOS *taos, int64_t *ts, float *current, int *voltage, float *phase) { // init with interlace mode TAOS_STMT_OPTIONS op; op.reqId = 0; op.singleStbInsert = true; op.singleTableBindOnce = true; TAOS_STMT *stmt = taos_stmt_init_with_options(taos, &op); + // prepare const char *sql = "INSERT INTO ? values(?, ?, ?, ?)"; int code = taos_stmt_prepare(stmt, sql, 0); checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare"); + // bind table name and tags TAOS_MULTI_BIND tags[2]; - char *location = "California.SanFrancisco"; - int groupId = 2; - tags[0].buffer_type = TSDB_DATA_TYPE_BINARY; - tags[0].buffer_length = strlen(location); - tags[0].length = (int32_t *)&tags[0].buffer_length; - tags[0].buffer = location; - tags[0].is_null = NULL; - - tags[1].buffer_type = TSDB_DATA_TYPE_INT; - tags[1].buffer_length = sizeof(int); - tags[1].length = (int32_t *)&tags[1].buffer_length; - tags[1].buffer = &groupId; - tags[1].is_null = NULL; - + prepareBindTags(tags); code = taos_stmt_set_tbname_tags(stmt, "d1001", tags); checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags"); - // highlight-start - // insert 3 rows with multi binds TAOS_MULTI_BIND params[4]; - // values to bind - int64_t ts[] = {1648432611749, 1648432611749, 1648432611849}; - float current[] = {10.3f, 12.6f, 11.2f}; - int voltage[] = {219, 218, 216}; - float phase[] = {0.31f, 0.33f, 0.32f}; - // is_null array - char is_null[3] = {0}; - // length array - int32_t int64Len[3] = {sizeof(int64_t)}; - int32_t floatLen[3] = {sizeof(float)}; - int32_t intLen[3] = {sizeof(int)}; - - params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; - params[0].buffer_length = sizeof(int64_t); - params[0].buffer = ts; - params[0].length = int64Len; - params[0].is_null = is_null; - params[0].num = 3; - - params[1].buffer_type = TSDB_DATA_TYPE_FLOAT; - params[1].buffer_length = sizeof(float); - params[1].buffer = current; - params[1].length = floatLen; - params[1].is_null = is_null; - params[1].num = 3; - - params[2].buffer_type = TSDB_DATA_TYPE_INT; - params[2].buffer_length = sizeof(int); - params[2].buffer = voltage; - params[2].length = intLen; - params[2].is_null = is_null; - params[2].num = 3; - - params[3].buffer_type = TSDB_DATA_TYPE_FLOAT; - params[3].buffer_length = sizeof(float); - params[3].buffer = phase; - params[3].length = floatLen; - params[3].is_null = is_null; - params[3].num = 3; + prepareBindParams(params, ts, current, voltage, phase); code = taos_stmt_bind_param_batch(stmt, params); // bind batch checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch"); + code = taos_stmt_add_batch(stmt); // add batch checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch"); - // highlight-end + // execute code = taos_stmt_execute(stmt); checkErrorCode(stmt, code, "failed to execute taos_stmt_execute"); + int affectedRows = taos_stmt_affected_rows(stmt); printf("successfully inserted %d rows\n", affectedRows); + // close (void)taos_stmt_close(stmt); } @@ -224,14 +184,51 @@ int main() { executeSQL(taos, "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), " "groupId INT)"); - insertData(taos); - insertDupData(taos); - insertData(taos); - insertDupData(taos); + + // initial insert, expect insert 3 rows + int64_t ts0[] = {1648432611234, 1648432611345, 1648432611456}; + float current0[] = {10.1f, 10.2f, 10.3f}; + int voltage0[] = {216, 217, 218}; + float phase0[] = {0.31f, 0.32f, 0.33f}; + insertData(taos, ts0, current0, voltage0, phase0); + + // insert with interlace mode, send non-duplicate ts, expect insert 3 overlapped rows + int64_t ts1[] = {1648432611234, 1648432611345, 1648432611456}; + int voltage1[] = {219, 220, 221}; + insertDataInterlace(taos, ts1, current0, voltage1, phase0); + + // insert with interlace mode, send duplicate ts, expect insert 2 rows with dups merged + int64_t ts2[] = {1648432611678, 1648432611678, 1648432611789}; + int voltage2[] = {222, 223, 224}; + insertDataInterlace(taos, ts2, current0, voltage2, phase0); + + // insert with interlace mode, send disordered rows, expect insert 3 sorted rows + int64_t ts3[] = {1648432611900, 1648432611890, 1648432611910}; + int voltage3[] = {225, 226, 227}; + insertDataInterlace(taos, ts3, current0, voltage3, phase0); + + // insert with interlace mode, send disordered and duplicate rows, expect insert 2 sorted and dup-merged rows + int64_t ts4[] = {1648432611930, 1648432611920, 1648432611930}; + int voltage4[] = {228, 229, 230}; + insertDataInterlace(taos, ts4, current0, voltage4, phase0); taos_close(taos); taos_cleanup(); + + // final results + // taos> select * from d1001; + // ts | current | voltage | phase | + // ====================================================================================== + // 2022-03-28 09:56:51.234 | 10.1000004 | 219 | 0.3100000 | + // 2022-03-28 09:56:51.345 | 10.1999998 | 220 | 0.3200000 | + // 2022-03-28 09:56:51.456 | 10.3000002 | 221 | 0.3300000 | + // 2022-03-28 09:56:51.678 | 10.1999998 | 223 | 0.3200000 | + // 2022-03-28 09:56:51.789 | 10.3000002 | 224 | 0.3300000 | + // 2022-03-28 09:56:51.890 | 10.1999998 | 226 | 0.3200000 | + // 2022-03-28 09:56:51.900 | 10.1000004 | 225 | 0.3100000 | + // 2022-03-28 09:56:51.910 | 10.3000002 | 227 | 0.3300000 | + // 2022-03-28 09:56:51.920 | 10.1999998 | 229 | 0.3200000 | + // 2022-03-28 09:56:51.930 | 10.3000002 | 230 | 0.3300000 | + // Query OK, 10 row(s) in set (0.005083s) } -// output: -// successfully inserted 3 rows diff --git a/tests/script/api/stmt2-insert-dupkeys.c b/tests/script/api/stmt2-insert-dupkeys.c new file mode 100644 index 000000000000..c056e1bcb040 --- /dev/null +++ b/tests/script/api/stmt2-insert-dupkeys.c @@ -0,0 +1,212 @@ +#include +#include +#include +#include +#include +#include "taos.h" + +int CTB_NUMS = 3; +int ROW_NUMS = 3; + +void do_query(TAOS* taos, const char* sql) { + TAOS_RES* result = taos_query(taos, sql); + int code = taos_errno(result); + if (code) { + printf("failed to query: %s, reason:%s\n", sql, taos_errstr(result)); + taos_free_result(result); + return; + } + taos_free_result(result); +} + +void createdb(TAOS* taos) { + do_query(taos, "drop database if exists db"); + do_query(taos, "create database db"); + do_query(taos, "create stable db.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))"); + do_query(taos, "use db"); +} + +#define INIT(tbs, ts, ts_len, b, b_len, tags, paramv) \ +do { \ + /* tbname */ \ + tbs = (char**)malloc(CTB_NUMS * sizeof(char*)); \ + for (int i = 0; i < CTB_NUMS; i++) { \ + tbs[i] = (char*)malloc(sizeof(char) * 20); \ + sprintf(tbs[i], "ctb_%d", i); \ + } \ + /* col params */ \ + ts = (int64_t**)malloc(CTB_NUMS * sizeof(int64_t*)); \ + b = (char**)malloc(CTB_NUMS * sizeof(char*)); \ + ts_len = (int*)malloc(ROW_NUMS * sizeof(int)); \ + b_len = (int*)malloc(ROW_NUMS * sizeof(int)); \ + for (int i = 0; i < ROW_NUMS; i++) { \ + ts_len[i] = sizeof(int64_t); \ + b_len[i] = 1; \ + } \ + for (int i = 0; i < CTB_NUMS; i++) { \ + ts[i] = (int64_t*)malloc(ROW_NUMS * sizeof(int64_t)); \ + b[i] = (char*)malloc(ROW_NUMS * sizeof(char)); \ + for (int j = 0; j < ROW_NUMS; j++) { \ + ts[i][j] = 1591060628000 + j; \ + b[i][j] = (char)('a' + j); \ + } \ + } \ + /*tag params */ \ + int t1 = 0; \ + int t1len = sizeof(int); \ + int t2len = 3; \ + /* bind params */ \ + paramv = (TAOS_STMT2_BIND**)malloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); \ + tags = (TAOS_STMT2_BIND**)malloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); \ + for (int i = 0; i < CTB_NUMS; i++) { \ + /* create tags */ \ + tags[i] = (TAOS_STMT2_BIND*)malloc(2 * sizeof(TAOS_STMT2_BIND)); \ + tags[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_INT, &t1, &t1len, NULL, 0}; \ + tags[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, "after", &t2len, NULL, 0}; \ + /* create col params */ \ + paramv[i] = (TAOS_STMT2_BIND*)malloc(2 * sizeof(TAOS_STMT2_BIND)); \ + paramv[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; \ + paramv[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; \ + } \ +} while (0) + +#define UINIT(tbs, ts, ts_len, b, b_len, tags, paramv) \ +do { \ + for (int i = 0; i < CTB_NUMS; i++) { \ + free(tbs[i]); \ + } \ + free(tbs); \ + for (int i = 0; i < CTB_NUMS; i++) { \ + free(ts[i]); \ + free(b[i]); \ + } \ + free(ts); \ + free(b); \ + free(ts_len); \ + free(b_len); \ + for (int i = 0; i < CTB_NUMS; i++) { \ + free(tags[i]); \ + free(paramv[i]); \ + } \ + free(tags); \ + free(paramv); \ +} while (0) + +void insert(TAOS* taos, char **tbs, TAOS_STMT2_BIND **tags, TAOS_STMT2_BIND **paramv, const char* sql) +{ + clock_t start, end; + double cpu_time_used; + + TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL}; + TAOS_STMT2 *stmt = taos_stmt2_init(taos, &option); + int code = taos_stmt2_prepare(stmt, sql, 0); + if (code != 0) { + printf("failed to execute taos_stmt2_prepare. error:%s\n", taos_stmt2_error(stmt)); + taos_stmt2_close(stmt); + exit(EXIT_FAILURE); + } + + // bind + start = clock(); + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, tags, paramv}; + if (taos_stmt2_bind_param(stmt, &bindv, -1)) { + printf("failed to execute taos_stmt2_bind_param statement.error:%s\n", taos_stmt2_error(stmt)); + taos_stmt2_close(stmt); + exit(EXIT_FAILURE); + } + end = clock(); + cpu_time_used = ((double)(end - start)) / CLOCKS_PER_SEC; + printf("stmt2-bind [%s] insert Time used: %f seconds\n", sql, cpu_time_used); + start = clock(); + + // exec + if (taos_stmt2_exec(stmt, NULL)) { + printf("failed to execute taos_stmt2_exec statement.error:%s\n", taos_stmt2_error(stmt)); + taos_stmt2_close(stmt); + exit(EXIT_FAILURE); + } + end = clock(); + cpu_time_used = ((double)(end - start)) / CLOCKS_PER_SEC; + printf("stmt2-exec [%s] insert Time used: %f seconds\n", sql, cpu_time_used); + + taos_stmt2_close(stmt); +} + +void insert_dist(TAOS* taos, const char *sql) { + char **tbs, **b; + int64_t **ts; + int *ts_len, *b_len; + TAOS_STMT2_BIND **paramv, **tags; + + INIT(tbs, ts, ts_len, b, b_len, tags, paramv); + + insert(taos, tbs, tags, paramv, sql); + + UINIT(tbs, ts, ts_len, b, b_len, tags, paramv); +} + +void insert_dup(TAOS* taos, const char *sql) { + char **tbs, **b; + int64_t **ts; + int *ts_len, *b_len; + TAOS_STMT2_BIND **paramv, **tags; + + INIT(tbs, ts, ts_len, b, b_len, tags, paramv); + + // insert duplicate rows + for (int i = 0; i < CTB_NUMS; i++) { + for (int j = 0; j < ROW_NUMS; j++) { + ts[i][j] = 1591060628000; + b[i][j] = (char)('x' + j); + } + } + for (int i = 0; i < CTB_NUMS; i++) { + paramv[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; + paramv[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; + } + insert(taos, tbs, tags, paramv, sql); + + UINIT(tbs, ts, ts_len, b, b_len, tags, paramv); +} + +int main() { + TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 0); + if (!taos) { + printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); + exit(EXIT_FAILURE); + } + + createdb(taos); + // insert distinct rows + insert_dist(taos, "insert into db.? using db.stb tags(?,?)values(?,?)"); + // insert duplicate rows + insert_dup(taos, "insert into db.? values(?,?)"); + + taos_close(taos); + taos_cleanup(); +} + +// final results +// taos> select * from ctb_0; +// ts | b | +// ========================================= +// 2020-06-02 09:17:08.000 | z | +// 2020-06-02 09:17:08.001 | b | +// 2020-06-02 09:17:08.002 | c | +// Query OK, 3 row(s) in set (0.003975s) +// +// taos> select * from ctb_1; +// ts | b | +// ========================================= +// 2020-06-02 09:17:08.000 | z | +// 2020-06-02 09:17:08.001 | b | +// 2020-06-02 09:17:08.002 | c | +// Query OK, 3 row(s) in set (0.007241s) + +// taos> select * from ctb_2; +// ts | b | +// ========================================= +// 2020-06-02 09:17:08.000 | z | +// 2020-06-02 09:17:08.001 | b | +// 2020-06-02 09:17:08.002 | c | +// Query OK, 3 row(s) in set (0.005443s)