Skip to content

Commit

Permalink
fix: enable client dedup for stmt/stmt2 inserts in interlace mode
Browse files Browse the repository at this point in the history
  • Loading branch information
haolinw committed Dec 16, 2024
1 parent 6149330 commit 4f06b3b
Show file tree
Hide file tree
Showing 6 changed files with 502 additions and 6 deletions.
4 changes: 2 additions & 2 deletions include/common/tdataformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ typedef struct {
TAOS_MULTI_BIND *bind;
} SBindInfo;
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray);
SArray *rowArray, bool *orderedDup);

// stmt2 binding
int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen, initGeosFn igeos,
Expand All @@ -392,7 +392,7 @@ typedef struct {
} SBindInfo2;

int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray);
SArray *rowArray, bool *orderedDup);

#endif

Expand Down
44 changes: 42 additions & 2 deletions source/common/src/tdataformat.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,10 @@ static int32_t tBindInfoCompare(const void *p1, const void *p2, const void *para
* `infoSorted` is whether the bind information is sorted by column id
* `pTSchema` is the schema of the table
* `rowArray` is the array to store the rows
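* `orderedDup` is an optional (may be NULL) 2-element output array: [0] is set to whether the rows are in
* non-decreasing row-key order, [1] to whether adjacent rows share the same row key; comparing stops at the
* first disordered or duplicate row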
*/
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray) {
SArray *rowArray, bool *orderedDup) {
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
return TSDB_CODE_INVALID_PARA;
}
Expand All @@ -469,6 +470,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
return terrno;
}

SRowKey rowKey, lastRowKey;
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
taosArrayClear(colValArray);

Expand Down Expand Up @@ -507,6 +509,24 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
code = terrno;
goto _exit;
}

if (orderedDup) {
tRowGetKey(row, &rowKey);
if (iRow == 0) {
// init to ordered by default
orderedDup[0] = true;
// init to non-duplicate by default
orderedDup[1] = false;
} else {
// no more compare if we already get disordered or duplicate rows
if (orderedDup[0] && !orderedDup[1]) {
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
orderedDup[0] = (code >= 0);
orderedDup[1] = (code == 0);
}
}
lastRowKey = rowKey;
}
}

_exit:
Expand Down Expand Up @@ -3235,9 +3255,10 @@ int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int3
* `infoSorted` is whether the bind information is sorted by column id
* `pTSchema` is the schema of the table
* `rowArray` is the array to store the rows
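* `orderedDup` is an optional (may be NULL) 2-element output array: [0] is set to whether the rows are in
* non-decreasing row-key order, [1] to whether adjacent rows share the same row key; comparing stops at the
* first disordered or duplicate row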
*/
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray) {
SArray *rowArray, bool *orderedDup) {
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
return TSDB_CODE_INVALID_PARA;
}
Expand Down Expand Up @@ -3266,6 +3287,7 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
}
}

SRowKey rowKey, lastRowKey;
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
taosArrayClear(colValArray);

Expand Down Expand Up @@ -3317,6 +3339,24 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
code = terrno;
goto _exit;
}

if (orderedDup) {
tRowGetKey(row, &rowKey);
if (iRow == 0) {
// init to ordered by default
orderedDup[0] = true;
// init to non-duplicate by default
orderedDup[1] = false;
} else {
// no more compare if we already get disordered or duplicate rows
if (orderedDup[0] && !orderedDup[1]) {
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
orderedDup[0] = (code >= 0);
orderedDup[1] = (code == 0);
}
}
lastRowKey = rowKey;
}
}

_exit:
Expand Down
10 changes: 8 additions & 2 deletions source/libs/parser/src/parInsertStmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
int32_t code = 0;
int16_t lastColId = -1;
bool colInOrder = true;
bool orderedDup[2];

if (NULL == *pTSchema) {
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
Expand Down Expand Up @@ -368,7 +369,9 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
// }
}

code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup);
pDataBlock->ordered = orderedDup[0];
pDataBlock->duplicateTs = orderedDup[1];

qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);

Expand Down Expand Up @@ -682,6 +685,7 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
int16_t lastColId = -1;
bool colInOrder = true;
int ncharColNums = 0;
bool orderedDup[2];

if (NULL == *pTSchema) {
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
Expand Down Expand Up @@ -738,7 +742,9 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
pBindInfos[c].bytes = pColSchema->bytes;
}

code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup);
pDataBlock->ordered = orderedDup[0];
pDataBlock->duplicateTs = orderedDup[1];

qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);

Expand Down
4 changes: 4 additions & 0 deletions tests/script/api/makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ exe:
# gcc $(CFLAGS) ./stmt2-get-fields.c -o $(ROOT)stmt2-get-fields $(LFLAGS)
# gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS)
gcc $(CFLAGS) ./stmt-crash.c -o $(ROOT)stmt-crash $(LFLAGS)
gcc $(CFLAGS) ./stmt-insert-dupkeys.c -o $(ROOT)stmt-insert-dupkeys $(LFLAGS)
gcc $(CFLAGS) ./stmt2-insert-dupkeys.c -o $(ROOT)stmt2-insert-dupkeys $(LFLAGS)

clean:
rm $(ROOT)batchprepare
Expand All @@ -46,3 +48,5 @@ clean:
rm $(ROOT)stmt2-get-fields
rm $(ROOT)stmt2-nohole
rm $(ROOT)stmt-crash
rm $(ROOT)stmt-insert-dupkeys
rm $(ROOT)stmt2-insert-dupkeys
234 changes: 234 additions & 0 deletions tests/script/api/stmt-insert-dupkeys.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// compile with
// gcc -o stmt-insert-dupkeys stmt-insert-dupkeys.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taos.h"

#define NUMROWS 3

/**
 * @brief run a SQL statement and discard its result set
 *
 * On failure the error string is printed, the result and the connection are
 * released, and the process exits with EXIT_FAILURE.
 *
 * @param taos connection to run the statement on
 * @param sql  statement text
 */
void executeSQL(TAOS *taos, const char *sql) {
  TAOS_RES *result = taos_query(taos, sql);

  // success path: nothing to report, just release the result
  if (taos_errno(result) == 0) {
    taos_free_result(result);
    return;
  }

  printf("%s\n", taos_errstr(result));
  taos_free_result(result);
  taos_close(taos);
  exit(EXIT_FAILURE);
}

/**
 * @brief abort the program when a stmt API call reported an error
 *
 * @param stmt statement handle the failing call was made on
 * @param code return code of the call; 0 means success
 * @param msg  context message printed in front of the stmt error string
 */
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
  if (code == 0) {
    return;
  }

  printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt));
  exit(EXIT_FAILURE);
}

/**
 * @brief fill the two tag binds (location, groupId) for the test table
 *
 * `tags` only stores pointers to the bound values and their lengths, and the
 * caller uses them after this function has returned. Everything the binds
 * point at therefore has static storage duration; pointing at automatic
 * locals here would leave dangling pointers.
 *
 * @param tags array of at least 2 binds to fill
 */
void prepareBindTags(TAOS_MULTI_BIND *tags) {
  // bound values and their lengths must outlive this call
  static char    location[] = "California.SanFrancisco";
  static int     groupId = 2;
  static int32_t locationLen;
  static int32_t groupIdLen = sizeof(int);

  locationLen = (int32_t)strlen(location);

  tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
  tags[0].buffer_length = strlen(location);
  // dedicated int32_t slot instead of reinterpreting buffer_length through a cast
  tags[0].length = &locationLen;
  tags[0].buffer = location;
  tags[0].is_null = NULL;
  // one value per tag; also avoids leaving the field uninitialized
  tags[0].num = 1;

  tags[1].buffer_type = TSDB_DATA_TYPE_INT;
  tags[1].buffer_length = sizeof(int);
  tags[1].length = &groupIdLen;
  tags[1].buffer = &groupId;
  tags[1].is_null = NULL;
  tags[1].num = 1;
}

/**
 * @brief fill the four column binds (ts, current, voltage, phase) for a batch of NUMROWS rows
 *
 * The is_null and per-row length arrays are referenced by pointer from
 * `params` and are read by the caller after this function returns, so they
 * have static storage duration. Every element is filled explicitly: an
 * initializer such as `{sizeof(float)}` would set only the first row's length
 * and leave the others 0.
 *
 * @param params  array of at least 4 binds to fill
 * @param ts      NUMROWS timestamps
 * @param current NUMROWS values for column `current`
 * @param voltage NUMROWS values for column `voltage`
 * @param phase   NUMROWS values for column `phase`
 */
void prepareBindParams(TAOS_MULTI_BIND *params, int64_t *ts, float *current, int *voltage, float *phase) {
  // is_null array (no NULL values in this test)
  static char is_null[NUMROWS];
  // per-row length arrays, one entry per bound row
  static int32_t int64Len[NUMROWS];
  static int32_t floatLen[NUMROWS];
  static int32_t intLen[NUMROWS];

  for (int i = 0; i < NUMROWS; i++) {
    is_null[i] = 0;
    int64Len[i] = sizeof(int64_t);
    floatLen[i] = sizeof(float);
    intLen[i] = sizeof(int);
  }

  params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
  params[0].buffer_length = sizeof(int64_t);
  params[0].buffer = ts;
  params[0].length = int64Len;
  params[0].is_null = is_null;
  params[0].num = NUMROWS;

  params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
  params[1].buffer_length = sizeof(float);
  params[1].buffer = current;
  params[1].length = floatLen;
  params[1].is_null = is_null;
  params[1].num = NUMROWS;

  params[2].buffer_type = TSDB_DATA_TYPE_INT;
  params[2].buffer_length = sizeof(int);
  params[2].buffer = voltage;
  params[2].length = intLen;
  params[2].is_null = is_null;
  params[2].num = NUMROWS;

  params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
  params[3].buffer_length = sizeof(float);
  params[3].buffer = phase;
  params[3].length = floatLen;
  params[3].is_null = is_null;
  params[3].num = NUMROWS;
}

/**
 * @brief insert one batch of NUMROWS rows into d1001 using the stmt API
 *
 * Any failing step terminates the process with EXIT_FAILURE.
 *
 * @param taos    open connection
 * @param ts      NUMROWS timestamps
 * @param current NUMROWS values for column `current`
 * @param voltage NUMROWS values for column `voltage`
 * @param phase   NUMROWS values for column `phase`
 */
void insertData(TAOS *taos, int64_t *ts, float *current, int *voltage, float *phase) {
  // init
  TAOS_STMT *stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    // no statement handle to query for an error string, so report the step only
    printf("failed to execute taos_stmt_init\n");
    taos_close(taos);
    exit(EXIT_FAILURE);
  }

  // prepare
  const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)";
  int         code = taos_stmt_prepare(stmt, sql, 0);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");

  // bind table name and tags
  TAOS_MULTI_BIND tags[2];
  prepareBindTags(tags);
  code = taos_stmt_set_tbname_tags(stmt, "d1001", tags);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags");

  TAOS_MULTI_BIND params[4];
  prepareBindParams(params, ts, current, voltage, phase);

  code = taos_stmt_bind_param_batch(stmt, params);  // bind batch
  checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch");

  code = taos_stmt_add_batch(stmt);  // add batch
  checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch");

  // execute
  code = taos_stmt_execute(stmt);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_execute");

  int affectedRows = taos_stmt_affected_rows(stmt);
  printf("successfully inserted %d rows\n", affectedRows);

  // close
  (void)taos_stmt_close(stmt);
}

/**
 * @brief insert one batch of NUMROWS rows into d1001 using a stmt created with
 *        singleStbInsert and singleTableBindOnce enabled (interlace mode)
 *
 * Any failing step terminates the process with EXIT_FAILURE.
 *
 * @param taos    open connection
 * @param ts      NUMROWS timestamps (may be disordered and/or contain duplicates)
 * @param current NUMROWS values for column `current`
 * @param voltage NUMROWS values for column `voltage`
 * @param phase   NUMROWS values for column `phase`
 */
void insertDataInterlace(TAOS *taos, int64_t *ts, float *current, int *voltage, float *phase) {
  // init with interlace mode; zero the whole struct first so no field is left indeterminate
  TAOS_STMT_OPTIONS op = {0};
  op.reqId = 0;
  op.singleStbInsert = true;
  op.singleTableBindOnce = true;
  TAOS_STMT *stmt = taos_stmt_init_with_options(taos, &op);
  if (stmt == NULL) {
    // no statement handle to query for an error string, so report the step only
    printf("failed to execute taos_stmt_init_with_options\n");
    taos_close(taos);
    exit(EXIT_FAILURE);
  }

  // prepare
  const char *sql = "INSERT INTO ? values(?, ?, ?, ?)";
  int         code = taos_stmt_prepare(stmt, sql, 0);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");

  // bind table name and tags
  TAOS_MULTI_BIND tags[2];
  prepareBindTags(tags);
  code = taos_stmt_set_tbname_tags(stmt, "d1001", tags);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags");

  TAOS_MULTI_BIND params[4];
  prepareBindParams(params, ts, current, voltage, phase);

  code = taos_stmt_bind_param_batch(stmt, params);  // bind batch
  checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch");

  code = taos_stmt_add_batch(stmt);  // add batch
  checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch");

  // execute
  code = taos_stmt_execute(stmt);
  checkErrorCode(stmt, code, "failed to execute taos_stmt_execute");

  int affectedRows = taos_stmt_affected_rows(stmt);
  printf("successfully inserted %d rows\n", affectedRows);

  // close
  (void)taos_stmt_close(stmt);
}

int main() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
if (taos == NULL) {
printf("failed to connect to server\n");
exit(EXIT_FAILURE);
}
executeSQL(taos, "DROP DATABASE IF EXISTS power");
executeSQL(taos, "CREATE DATABASE power");
executeSQL(taos, "USE power");
executeSQL(taos,
"CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), "
"groupId INT)");

// initial insert, expect insert 3 rows
int64_t ts0[] = {1648432611234, 1648432611345, 1648432611456};
float current0[] = {10.1f, 10.2f, 10.3f};
int voltage0[] = {216, 217, 218};
float phase0[] = {0.31f, 0.32f, 0.33f};
insertData(taos, ts0, current0, voltage0, phase0);

// insert with interlace mode, send non-duplicate ts, expect insert 3 overlapped rows
int64_t ts1[] = {1648432611234, 1648432611345, 1648432611456};
int voltage1[] = {219, 220, 221};
insertDataInterlace(taos, ts1, current0, voltage1, phase0);

// insert with interlace mode, send duplicate ts, expect insert 2 rows with dups merged
int64_t ts2[] = {1648432611678, 1648432611678, 1648432611789};
int voltage2[] = {222, 223, 224};
insertDataInterlace(taos, ts2, current0, voltage2, phase0);

// insert with interlace mode, send disordered rows, expect insert 3 sorted rows
int64_t ts3[] = {1648432611900, 1648432611890, 1648432611910};
int voltage3[] = {225, 226, 227};
insertDataInterlace(taos, ts3, current0, voltage3, phase0);

// insert with interlace mode, send disordered and duplicate rows, expect insert 2 sorted and dup-merged rows
int64_t ts4[] = {1648432611930, 1648432611920, 1648432611930};
int voltage4[] = {228, 229, 230};
insertDataInterlace(taos, ts4, current0, voltage4, phase0);

taos_close(taos);
taos_cleanup();

// final results
// taos> select * from d1001;
// ts | current | voltage | phase |
// ======================================================================================
// 2022-03-28 09:56:51.234 | 10.1000004 | 219 | 0.3100000 |
// 2022-03-28 09:56:51.345 | 10.1999998 | 220 | 0.3200000 |
// 2022-03-28 09:56:51.456 | 10.3000002 | 221 | 0.3300000 |
// 2022-03-28 09:56:51.678 | 10.1999998 | 223 | 0.3200000 |
// 2022-03-28 09:56:51.789 | 10.3000002 | 224 | 0.3300000 |
// 2022-03-28 09:56:51.890 | 10.1999998 | 226 | 0.3200000 |
// 2022-03-28 09:56:51.900 | 10.1000004 | 225 | 0.3100000 |
// 2022-03-28 09:56:51.910 | 10.3000002 | 227 | 0.3300000 |
// 2022-03-28 09:56:51.920 | 10.1999998 | 229 | 0.3200000 |
// 2022-03-28 09:56:51.930 | 10.3000002 | 230 | 0.3300000 |
// Query OK, 10 row(s) in set (0.005083s)
}

Loading

0 comments on commit 4f06b3b

Please sign in to comment.