Skip to content

Commit

Permalink
feat: support create-db/stb/tb/stream retry (taosdata#512)
Browse files Browse the repository at this point in the history
* feat: support create-db/stb/tb/stream retry

* fix: trying re-init before use
  • Loading branch information
sangshuduo authored Dec 12, 2022
1 parent 4a4027c commit 295cdcd
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 24 deletions.
103 changes: 83 additions & 20 deletions src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
ret = -1;
} else {
ret = queryDbExec(conn, command);
int32_t trying = g_arguments->keep_trying;
while (ret && trying) {
infoPrint("will sleep %"PRIu32" milliseconds then re-create "
"supertable %s\n",
g_arguments->trying_interval, stbInfo->stbName);
toolsMsleep(g_arguments->trying_interval);
ret = queryDbExec(conn, command);
if (trying != -1) {
trying --;
}
}
if (0 != ret) {
errorPrint("create supertable %s failed!\n\n",
stbInfo->stbName);
Expand Down Expand Up @@ -383,7 +394,6 @@ int32_t getVgroupsOfDb(SBenchConn *conn, SDataBase *database) {
}
#endif // TD_VER_COMPATIBLE_3_0_0_0


int geneDbCreateCmd(SDataBase *database, char *command) {
int dataLen = 0;
#ifdef TD_VER_COMPATIBLE_3_0_0_0
Expand Down Expand Up @@ -528,7 +538,19 @@ int createDatabaseTaosc(SDataBase* database) {

geneDbCreateCmd(database, command);

if (0 != queryDbExec(conn, command)) {
int32_t code = queryDbExec(conn, command);
int32_t trying = g_arguments->keep_trying;
while (code && trying) {
infoPrint("will sleep %"PRIu32" milliseconds then re-create database %s\n",
g_arguments->trying_interval, database->dbName);
toolsMsleep(g_arguments->trying_interval);
code = queryDbExec(conn, command);
if (trying != -1) {
trying --;
}
}

if (code) {
close_bench_conn(conn);
errorPrint("\ncreate database %s failed!\n\n",
database->dbName);
Expand Down Expand Up @@ -658,6 +680,17 @@ static void *createTable(void *sarg) {
pThreadInfo->sockfd);
} else {
ret = queryDbExec(pThreadInfo->conn, pThreadInfo->buffer);
int32_t trying = g_arguments->keep_trying;
while (ret && trying) {
infoPrint("will sleep %"PRIu32" milliseconds then re-create "
"table %s\n",
g_arguments->trying_interval, pThreadInfo->buffer);
toolsMsleep(g_arguments->trying_interval);
ret = queryDbExec(pThreadInfo->conn, pThreadInfo->buffer);
if (trying != -1) {
trying --;
}
}
}
if (0 != ret) {
g_fail = true;
Expand Down Expand Up @@ -911,13 +944,14 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) {
case TAOSC_IFACE:
debugPrint("buffer: %s\n", pThreadInfo->buffer);
code = queryDbExec(pThreadInfo->conn, pThreadInfo->buffer);
while (code && stbInfo->keep_trying) {
int32_t trying = stbInfo->keep_trying;
while (code && trying) {
infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
stbInfo->trying_interval);
toolsMsleep(stbInfo->trying_interval);
code = queryDbExec(pThreadInfo->conn, pThreadInfo->buffer);
if (stbInfo->keep_trying != -1) {
stbInfo->keep_trying --;
if (trying != -1) {
trying --;
}
}
break;
Expand All @@ -932,7 +966,8 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) {
stbInfo->tcpTransfer,
pThreadInfo->sockfd,
pThreadInfo->filePath);
while (code && stbInfo->keep_trying) {
trying = stbInfo->keep_trying;
while (code && trying) {
infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
stbInfo->trying_interval);
toolsMsleep(stbInfo->trying_interval);
Expand All @@ -944,8 +979,8 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) {
stbInfo->tcpTransfer,
pThreadInfo->sockfd,
pThreadInfo->filePath);
if (stbInfo->keep_trying != -1) {
stbInfo->keep_trying --;
if (trying != -1) {
trying --;
}
}
break;
Expand Down Expand Up @@ -973,7 +1008,8 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) {
? database->sml_precision
: TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
code = taos_errno(res);
while (code && stbInfo->keep_trying) {
trying = stbInfo->keep_trying;
while (code && trying) {
infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
stbInfo->trying_interval);
toolsMsleep(stbInfo->trying_interval);
Expand All @@ -986,8 +1022,8 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) {
? database->sml_precision
: TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
code = taos_errno(res);
if (stbInfo->keep_trying != -1) {
stbInfo->keep_trying --;
if (trying != -1) {
trying --;
}
}

Expand Down Expand Up @@ -2435,8 +2471,8 @@ static void* create_tsmas(void* args) {
return NULL;
}

static int createStream(SSTREAM* stream) {
int code = -1;
static int32_t createStream(SSTREAM* stream) {
int32_t code = -1;
char * command = benchCalloc(1, BUFFER_SIZE, false);
snprintf(command, BUFFER_SIZE, "DROP STREAM IF EXISTS %s",
stream->stream_name);
Expand All @@ -2445,29 +2481,56 @@ static int createStream(SSTREAM* stream) {
if (NULL == conn) {
goto END;
}
if (queryDbExec(conn, command)){

code = queryDbExec(conn, command);
int32_t trying = g_arguments->keep_trying;
while (code && trying) {
infoPrint("will sleep %"PRIu32" milliseconds then re-drop stream %s\n",
g_arguments->trying_interval, stream->stream_name);
toolsMsleep(g_arguments->trying_interval);
code = queryDbExec(conn, command);
if (trying != -1) {
trying --;
}
}

if (code) {
close_bench_conn(conn);
goto END;
}

memset(command, 0, BUFFER_SIZE);
int pos = snprintf(command, BUFFER_SIZE,
"create stream if not exists %s ", stream->stream_name);
"CREATE STREAM IF NOT EXISTS %s ", stream->stream_name);
if (stream->trigger_mode[0] != '\0') {
pos += snprintf(command + pos, BUFFER_SIZE - pos,
"trigger %s ", stream->trigger_mode);
"TRIGGER %s ", stream->trigger_mode);
}
if (stream->watermark[0] != '\0') {
pos += snprintf(command + pos, BUFFER_SIZE - pos,
"watermark %s ", stream->watermark);
"WATERMARK %s ", stream->watermark);
}
snprintf(command + pos, BUFFER_SIZE - pos,
"into %s as %s", stream->stream_stb, stream->source_sql);
"INTO %s as %s", stream->stream_stb, stream->source_sql);
infoPrint("%s\n", command);
if (queryDbExec(conn, command)) {

code = queryDbExec(conn, command);
trying = g_arguments->keep_trying;
while (code && trying) {
infoPrint("will sleep %"PRIu32" milliseconds "
"then re-create stream %s\n",
g_arguments->trying_interval, stream->stream_name);
toolsMsleep(g_arguments->trying_interval);
code = queryDbExec(conn, command);
if (trying != -1) {
trying --;
}
}

if (code) {
close_bench_conn(conn);
goto END;
}
code = 0;
close_bench_conn(conn);
END:
tmfree(command);
Expand Down
7 changes: 3 additions & 4 deletions src/benchUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,9 @@ void close_bench_conn(SBenchConn* conn) {
tmfree(conn);
}

int queryDbExecRest(char *command, char* dbName, int precision,
int32_t queryDbExecRest(char *command, char* dbName, int precision,
int iface, int protocol, bool tcp, int sockfd) {
// TODO
int code = postProceSql(command,
int32_t code = postProceSql(command,
dbName,
precision,
iface,
Expand All @@ -310,7 +309,7 @@ int queryDbExecRest(char *command, char* dbName, int precision,
return code;
}

int queryDbExec(SBenchConn *conn, char *command) {
int32_t queryDbExec(SBenchConn *conn, char *command) {
int32_t code = 0;
#ifdef WEBSOCKET
if (g_arguments->websocket) {
Expand Down

0 comments on commit 295cdcd

Please sign in to comment.