From 295cdcdb74d35768734c45d2d2c1a47ed4c25739 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 12 Dec 2022 10:03:27 +0800 Subject: [PATCH] feat: support create-db/stb/tb/stream retry (#512) * feat: support create-db/stb/tb/stream retry * fix: trying re-init before use --- src/benchInsert.c | 103 +++++++++++++++++++++++++++++++++++++--------- src/benchUtil.c | 7 ++-- 2 files changed, 86 insertions(+), 24 deletions(-) diff --git a/src/benchInsert.c b/src/benchInsert.c index e189fb1ea..d3bac02cd 100644 --- a/src/benchInsert.c +++ b/src/benchInsert.c @@ -307,6 +307,17 @@ static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) { ret = -1; } else { ret = queryDbExec(conn, command); + int32_t trying = g_arguments->keep_trying; + while (ret && trying) { + infoPrint("will sleep %"PRIu32" milliseconds then re-create " + "supertable %s\n", + g_arguments->trying_interval, stbInfo->stbName); + toolsMsleep(g_arguments->trying_interval); + ret = queryDbExec(conn, command); + if (trying != -1) { + trying --; + } + } if (0 != ret) { errorPrint("create supertable %s failed!\n\n", stbInfo->stbName); @@ -383,7 +394,6 @@ int32_t getVgroupsOfDb(SBenchConn *conn, SDataBase *database) { } #endif // TD_VER_COMPATIBLE_3_0_0_0 - int geneDbCreateCmd(SDataBase *database, char *command) { int dataLen = 0; #ifdef TD_VER_COMPATIBLE_3_0_0_0 @@ -528,7 +538,19 @@ int createDatabaseTaosc(SDataBase* database) { geneDbCreateCmd(database, command); - if (0 != queryDbExec(conn, command)) { + int32_t code = queryDbExec(conn, command); + int32_t trying = g_arguments->keep_trying; + while (code && trying) { + infoPrint("will sleep %"PRIu32" milliseconds then re-create database %s\n", + g_arguments->trying_interval, database->dbName); + toolsMsleep(g_arguments->trying_interval); + code = queryDbExec(conn, command); + if (trying != -1) { + trying --; + } + } + + if (code) { close_bench_conn(conn); errorPrint("\ncreate database %s failed!\n\n", database->dbName); @@ -658,6 +680,17 @@ static void *createTable(void *sarg) { pThreadInfo->sockfd); } else { ret = queryDbExec(pThreadInfo->conn, pThreadInfo->buffer); + int32_t trying = g_arguments->keep_trying; + while (ret && trying) { + infoPrint("will sleep %"PRIu32" milliseconds then re-create " + "table %s\n", + g_arguments->trying_interval, pThreadInfo->buffer); + toolsMsleep(g_arguments->trying_interval); + ret = queryDbExec(pThreadInfo->conn, pThreadInfo->buffer); + if (trying != -1) { + trying --; + } + } } if (0 != ret) { g_fail = true; @@ -911,13 +944,14 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { case TAOSC_IFACE: debugPrint("buffer: %s\n", pThreadInfo->buffer); code = queryDbExec(pThreadInfo->conn, pThreadInfo->buffer); - while (code && stbInfo->keep_trying) { + int32_t trying = stbInfo->keep_trying; + while (code && trying) { infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n", stbInfo->trying_interval); toolsMsleep(stbInfo->trying_interval); code = queryDbExec(pThreadInfo->conn, pThreadInfo->buffer); - if (stbInfo->keep_trying != -1) { - stbInfo->keep_trying --; + if (trying != -1) { + trying --; } } break; @@ -932,7 +966,8 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { stbInfo->tcpTransfer, pThreadInfo->sockfd, pThreadInfo->filePath); - while (code && stbInfo->keep_trying) { + trying = stbInfo->keep_trying; + while (code && trying) { infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n", stbInfo->trying_interval); toolsMsleep(stbInfo->trying_interval); @@ -944,8 +979,8 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { stbInfo->tcpTransfer, pThreadInfo->sockfd, pThreadInfo->filePath); - if (stbInfo->keep_trying != -1) { - stbInfo->keep_trying --; + if (trying != -1) { + trying --; } } break; @@ -973,7 +1008,8 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { ? database->sml_precision : TSDB_SML_TIMESTAMP_NOT_CONFIGURED); code = taos_errno(res); - while (code && stbInfo->keep_trying) { + trying = stbInfo->keep_trying; + while (code && trying) { infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n", stbInfo->trying_interval); toolsMsleep(stbInfo->trying_interval); @@ -986,8 +1022,8 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { ? database->sml_precision : TSDB_SML_TIMESTAMP_NOT_CONFIGURED); code = taos_errno(res); - if (stbInfo->keep_trying != -1) { - stbInfo->keep_trying --; + if (trying != -1) { + trying --; } } @@ -2435,8 +2471,8 @@ static void* create_tsmas(void* args) { return NULL; } -static int createStream(SSTREAM* stream) { - int code = -1; +static int32_t createStream(SSTREAM* stream) { + int32_t code = -1; char * command = benchCalloc(1, BUFFER_SIZE, false); snprintf(command, BUFFER_SIZE, "DROP STREAM IF EXISTS %s", stream->stream_name); @@ -2445,29 +2481,56 @@ static int createStream(SSTREAM* stream) { if (NULL == conn) { goto END; } - if (queryDbExec(conn, command)){ + + code = queryDbExec(conn, command); + int32_t trying = g_arguments->keep_trying; + while (code && trying) { + infoPrint("will sleep %"PRIu32" milliseconds then re-drop stream %s\n", + g_arguments->trying_interval, stream->stream_name); + toolsMsleep(g_arguments->trying_interval); + code = queryDbExec(conn, command); + if (trying != -1) { + trying --; + } + } + + if (code) { close_bench_conn(conn); goto END; } + memset(command, 0, BUFFER_SIZE); int pos = snprintf(command, BUFFER_SIZE, - "create stream if not exists %s ", stream->stream_name); + "CREATE STREAM IF NOT EXISTS %s ", stream->stream_name); if (stream->trigger_mode[0] != '\0') { pos += snprintf(command + pos, BUFFER_SIZE - pos, - "trigger %s ", stream->trigger_mode); + "TRIGGER %s ", stream->trigger_mode); } if (stream->watermark[0] != '\0') { pos += snprintf(command + pos, BUFFER_SIZE - pos, - "watermark %s ", stream->watermark); + "WATERMARK %s ", stream->watermark); } snprintf(command + pos, BUFFER_SIZE - pos, - "into %s as %s", stream->stream_stb, stream->source_sql); + "INTO %s as %s", stream->stream_stb, stream->source_sql); infoPrint("%s\n", command); - if (queryDbExec(conn, command)) { + + code = queryDbExec(conn, command); + trying = g_arguments->keep_trying; + while (code && trying) { + infoPrint("will sleep %"PRIu32" milliseconds " + "then re-create stream %s\n", + g_arguments->trying_interval, stream->stream_name); + toolsMsleep(g_arguments->trying_interval); + code = queryDbExec(conn, command); + if (trying != -1) { + trying --; + } + } + + if (code) { close_bench_conn(conn); goto END; } - code = 0; close_bench_conn(conn); END: tmfree(command); diff --git a/src/benchUtil.c b/src/benchUtil.c index 074746fb1..c17cb3e82 100644 --- a/src/benchUtil.c +++ b/src/benchUtil.c @@ -296,10 +296,9 @@ void close_bench_conn(SBenchConn* conn) { tmfree(conn); } -int queryDbExecRest(char *command, char* dbName, int precision, +int32_t queryDbExecRest(char *command, char* dbName, int precision, int iface, int protocol, bool tcp, int sockfd) { - // TODO - int code = postProceSql(command, + int32_t code = postProceSql(command, dbName, precision, iface, @@ -310,7 +309,7 @@ int queryDbExecRest(char *command, char* dbName, int precision, return code; } -int queryDbExec(SBenchConn *conn, char *command) { +int32_t queryDbExec(SBenchConn *conn, char *command) { int32_t code = 0; #ifdef WEBSOCKET if (g_arguments->websocket) {