Skip to content

Commit

Permalink
newfix: enable dedup on stmt/stmt2 inserts in interlace mode
Browse files Browse the repository at this point in the history
  • Loading branch information
haolinw committed Dec 16, 2024
1 parent fe6ca7b commit 24175ab
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 115 deletions.
4 changes: 2 additions & 2 deletions include/common/tdataformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ typedef struct {
TAOS_MULTI_BIND *bind;
} SBindInfo;
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray);
SArray *rowArray, bool *orderedDup);

// stmt2 binding
int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen, initGeosFn igeos,
Expand All @@ -392,7 +392,7 @@ typedef struct {
} SBindInfo2;

int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray);
SArray *rowArray, bool *orderedDup);

#endif

Expand Down
6 changes: 0 additions & 6 deletions source/client/src/clientStmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -1557,12 +1557,6 @@ int stmtExec(TAOS_STMT* stmt) {
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} else {
if (pStmt->sql.stbInterlaceMode) {
STableDataCxt *pTableCxt = pStmt->exec.pCurrBlock;
if (!pTableCxt->ordered || pTableCxt->duplicateTs) {
tscError("failed to insert disordered or duplicate data");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}

int64_t startTs = taosGetTimestampUs();
while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
taosUsleep(1);
Expand Down
6 changes: 0 additions & 6 deletions source/client/src/clientStmt2.c
Original file line number Diff line number Diff line change
Expand Up @@ -1669,12 +1669,6 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {

if (STMT_TYPE_QUERY != pStmt->sql.type) {
if (pStmt->sql.stbInterlaceMode) {
STableDataCxt *pTableCxt = pStmt->exec.pCurrBlock;
if (!pTableCxt->ordered || pTableCxt->duplicateTs) {
tscError("failed to insert disordered or duplicate data");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}

int64_t startTs = taosGetTimestampUs();
while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
taosUsleep(1);
Expand Down
44 changes: 42 additions & 2 deletions source/common/src/tdataformat.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,10 @@ static int32_t tBindInfoCompare(const void *p1, const void *p2, const void *para
* `infoSorted` is whether the bind information is sorted by column id
* `pTSchema` is the schema of the table
* `rowArray` is the array to store the rows
* `orderedDup` is an array to store ordered and duplicateTs
*/
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray) {
SArray *rowArray, bool *orderedDup) {
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
return TSDB_CODE_INVALID_PARA;
}
Expand All @@ -469,6 +470,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
return terrno;
}

SRowKey rowKey, lastRowKey;
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
taosArrayClear(colValArray);

Expand Down Expand Up @@ -507,6 +509,24 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
code = terrno;
goto _exit;
}

if (orderedDup) {
tRowGetKey(row, &rowKey);
if (iRow == 0) {
// init to ordered by default
orderedDup[0] = true;
// init to non-duplicate by default
orderedDup[1] = false;
} else {
// no more compare if we already get disordered or duplicate rows
if (orderedDup[0] && !orderedDup[1]) {
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
orderedDup[0] = (code >= 0);
orderedDup[1] = (code == 0);
}
}
lastRowKey = rowKey;
}
}

_exit:
Expand Down Expand Up @@ -3235,9 +3255,10 @@ int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int3
* `infoSorted` is whether the bind information is sorted by column id
* `pTSchema` is the schema of the table
* `rowArray` is the array to store the rows
* `orderedDup` is an array to store ordered and duplicateTs
*/
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray) {
SArray *rowArray, bool *orderedDup) {
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
return TSDB_CODE_INVALID_PARA;
}
Expand Down Expand Up @@ -3266,6 +3287,7 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
}
}

SRowKey rowKey, lastRowKey;
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
taosArrayClear(colValArray);

Expand Down Expand Up @@ -3317,6 +3339,24 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
code = terrno;
goto _exit;
}

if (orderedDup) {
tRowGetKey(row, &rowKey);
if (iRow == 0) {
// init to ordered by default
orderedDup[0] = true;
// init to non-duplicate by default
orderedDup[1] = false;
} else {
// no more compare if we already get disordered or duplicate rows
if (orderedDup[0] && !orderedDup[1]) {
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
orderedDup[0] = (code >= 0);
orderedDup[1] = (code == 0);
}
}
lastRowKey = rowKey;
}
}

_exit:
Expand Down
10 changes: 8 additions & 2 deletions source/libs/parser/src/parInsertStmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
int32_t code = 0;
int16_t lastColId = -1;
bool colInOrder = true;
bool orderedDup[2];

if (NULL == *pTSchema) {
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
Expand Down Expand Up @@ -368,7 +369,9 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
// }
}

code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup);
pDataBlock->ordered = orderedDup[0];
pDataBlock->duplicateTs = orderedDup[1];

qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);

Expand Down Expand Up @@ -682,6 +685,7 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
int16_t lastColId = -1;
bool colInOrder = true;
int ncharColNums = 0;
bool orderedDup[2];

if (NULL == *pTSchema) {
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
Expand Down Expand Up @@ -738,7 +742,9 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
pBindInfos[c].bytes = pColSchema->bytes;
}

code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup);
pDataBlock->ordered = orderedDup[0];
pDataBlock->duplicateTs = orderedDup[1];

qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);

Expand Down
2 changes: 2 additions & 0 deletions tests/script/api/makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ exe:
# gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS)
gcc $(CFLAGS) ./stmt-crash.c -o $(ROOT)stmt-crash $(LFLAGS)
gcc $(CFLAGS) ./stmt-insert-dupkeys.c -o $(ROOT)stmt-insert-dupkeys $(LFLAGS)
gcc $(CFLAGS) ./stmt2-insert-dupkeys.c -o $(ROOT)stmt2-insert-dupkeys $(LFLAGS)

clean:
rm $(ROOT)batchprepare
Expand All @@ -48,3 +49,4 @@ clean:
rm $(ROOT)stmt2-nohole
rm $(ROOT)stmt-crash
rm $(ROOT)stmt-insert-dupkeys
rm $(ROOT)stmt2-insert-dupkeys
Loading

0 comments on commit 24175ab

Please sign in to comment.