diff --git a/inc/bench.h b/inc/bench.h index 2fb088593..d47bafbaf 100644 --- a/inc/bench.h +++ b/inc/bench.h @@ -648,6 +648,8 @@ typedef struct SpecifiedQueryInfo_S { TAOS_RES *res[MAX_QUERY_SQL_COUNT]; uint64_t totalQueried; bool mixed_query; + // error rate + uint64_t totalFail; } SpecifiedQueryInfo; typedef struct SuperQueryInfo_S { @@ -669,6 +671,8 @@ typedef struct SuperQueryInfo_S { TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT]; char ** childTblName; uint64_t totalQueried; + // error rate + uint64_t totalFail; } SuperQueryInfo; typedef struct SQueryMetaInfo_S { @@ -856,14 +860,24 @@ typedef struct SThreadInfo_S { } threadInfo; typedef struct SQueryThreadInfo_S { - int start_sql; - int end_sql; - int threadId; - BArray* query_delay_list; - int sockfd; SBenchConn* conn; - int64_t total_delay; -} queryThreadInfo; + int32_t start_sql; + int32_t end_sql; + int32_t threadID; + BArray* query_delay_list; + int32_t sockfd; + double total_delay; + + char filePath[MAX_PATH_LEN]; + uint64_t start_table_from; + uint64_t end_table_to; + uint64_t ntables; + uint64_t querySeq; + + // error rate + uint64_t nSucc; + uint64_t nFail; +} qThreadInfo; typedef struct STSmaThreadInfo_S { char* dbName; @@ -883,6 +897,7 @@ extern bool g_fail; extern char configDir[]; extern tools_cJSON * root; extern uint64_t g_memoryUsage; +extern int32_t g_majorVersionOfClient; #define min(a, b) (((a) < (b)) ? (a) : (b)) #define BARRAY_GET_ELEM(array, index) \ @@ -895,11 +910,11 @@ void initArgument(); void queryAggrFunc(); void parseFieldDatatype(char *dataType, BArray *fields, bool isTag); /* demoJsonOpt.c */ -int getInfoFromJsonFile(); +int readJsonConfig(char * file); /* demoUtil.c */ int compare(const void *a, const void *b); void encodeAuthBase64(); -void replaceChildTblName(char *inSql, char *outSql, int tblIndex); +int32_t replaceChildTblName(char *inSql, char *outSql, int tblIndex); void setupForAnsiEscape(void); void resetAfterAnsiEscape(void); char * convertDatatypeToString(int type); @@ -907,7 +922,7 @@ int convertStringToDatatype(char *type, int length); unsigned int taosRandom(); void tmfree(void *buf); void tmfclose(FILE *fp); -int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo); +int64_t fetchResult(TAOS_RES *res, char *filePath); void prompt(bool NonStopMode); void ERROR_EXIT(const char *msg); int getServerVersionRest(int16_t rest_port); @@ -927,7 +942,8 @@ int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName, int64_t childTblCountOfSuperTbl); void* benchCalloc(size_t nmemb, size_t size, bool record); BArray* benchArrayInit(size_t size, size_t elemSize); -void* benchArrayPush(BArray* pArray, void* pData); +void* benchArrayPush(BArray* pArray, void* pData); // free pData for auto +void* benchArrayPushNoFree(BArray* pArray, void* pData); // not free pData void* benchArrayDestroy(BArray* pArray); void benchArrayClear(BArray* pArray); void* benchArrayGet(const BArray* pArray, size_t index); @@ -1015,4 +1031,18 @@ bool isRest(int32_t iface); // get group index about dbname.tbname int32_t calcGroupIndex(char* dbName, char* tbName, int32_t groupCnt); + +// ------------ benchQuery util ------------- +void freeSpecialQueryInfo(); +// init conn +int32_t initQueryConn(qThreadInfo * pThreadInfo, int iface); +// close conn +void closeQueryConn(qThreadInfo * pThreadInfo, int iface); + +void *queryKiller(void *arg); +// kill show +int killSlowQuery(); +// fetch super table child name from server +int fetchChildTableName(char *dbName, char *stbName); + #endif // INC_BENCH_H_ diff --git a/src/benchJsonOpt.c b/src/benchJsonOpt.c index 932b38c88..3e4190866 100644 --- a/src/benchJsonOpt.c +++ b/src/benchJsonOpt.c @@ -1081,6 +1081,7 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) { return -1; } + // read from super table tools_cJSON *continueIfFail = tools_cJSON_GetObjectItem(stbInfo, "continue_if_fail"); // yes, no, if (tools_cJSON_IsString(continueIfFail)) { @@ -1095,6 +1096,9 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) { continueIfFail->valuestring); return -1; } + } else { + // default value is common specialed + superTable->continueIfFail = g_arguments->continueIfFail; } // start_fillback_time @@ -1565,6 +1569,7 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) { } } + // read from common tools_cJSON *continueIfFail = tools_cJSON_GetObjectItem(json, "continue_if_fail"); // yes, no, if (tools_cJSON_IsString(continueIfFail)) { @@ -1770,79 +1775,8 @@ static int getMetaFromInsertJsonFile(tools_cJSON *json) { return code; } -static int getMetaFromQueryJsonFile(tools_cJSON *json) { - int32_t code = -1; - - tools_cJSON *telnet_tcp_port = - tools_cJSON_GetObjectItem(json, "telnet_tcp_port"); - if (tools_cJSON_IsNumber(telnet_tcp_port)) { - g_arguments->telnet_tcp_port = (uint16_t)telnet_tcp_port->valueint; - } - - tools_cJSON *gQueryTimes = tools_cJSON_GetObjectItem(json, "query_times"); - if (tools_cJSON_IsNumber(gQueryTimes)) { - g_queryInfo.query_times = gQueryTimes->valueint; - } else { - g_queryInfo.query_times = 1; - } - - tools_cJSON *gKillSlowQueryThreshold = - tools_cJSON_GetObjectItem(json, "kill_slow_query_threshold"); - if (tools_cJSON_IsNumber(gKillSlowQueryThreshold)) { - g_queryInfo.killQueryThreshold = gKillSlowQueryThreshold->valueint; - } else { - g_queryInfo.killQueryThreshold = 0; - } - - tools_cJSON *gKillSlowQueryInterval = - tools_cJSON_GetObjectItem(json, "kill_slow_query_interval"); - if (tools_cJSON_IsNumber(gKillSlowQueryInterval)) { - g_queryInfo.killQueryInterval = gKillSlowQueryInterval ->valueint; - } else { - g_queryInfo.killQueryInterval = 1; /* by default, interval 1s */ - } - - tools_cJSON *resetCache = - tools_cJSON_GetObjectItem(json, "reset_query_cache"); - if (tools_cJSON_IsString(resetCache)) { - if (0 == strcasecmp(resetCache->valuestring, "yes")) { - g_queryInfo.reset_query_cache = true; - } - } else { - g_queryInfo.reset_query_cache = false; - } - - tools_cJSON *respBuffer = - tools_cJSON_GetObjectItem(json, "response_buffer"); - if (tools_cJSON_IsNumber(respBuffer)) { - g_queryInfo.response_buffer = respBuffer->valueint; - } else { - g_queryInfo.response_buffer = RESP_BUF_LEN; - } - - tools_cJSON *dbs = tools_cJSON_GetObjectItem(json, "databases"); - if (tools_cJSON_IsString(dbs)) { - g_queryInfo.dbName = dbs->valuestring; - } - - tools_cJSON *queryMode = tools_cJSON_GetObjectItem(json, "query_mode"); - if (tools_cJSON_IsString(queryMode)) { - if (0 == strcasecmp(queryMode->valuestring, "rest")) { - g_queryInfo.iface = REST_IFACE; - } else if (0 == strcasecmp(queryMode->valuestring, "taosc")) { - g_queryInfo.iface = TAOSC_IFACE; - } else { - errorPrint("Invalid query_mode value: %s\n", - queryMode->valuestring); - goto PARSE_OVER; - } - } - // init sqls - g_queryInfo.specifiedQueryInfo.sqls = benchArrayInit(1, sizeof(SSQL)); - - // specified_table_query - tools_cJSON *specifiedQuery = - tools_cJSON_GetObjectItem(json, "specified_table_query"); +// Spec Query +int32_t readSpecQueryJson(tools_cJSON * specifiedQuery) { g_queryInfo.specifiedQueryInfo.concurrent = 1; if (tools_cJSON_IsObject(specifiedQuery)) { tools_cJSON *queryInterval = @@ -1868,12 +1802,14 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) { if (tools_cJSON_IsString(mixedQueryObj)) { if (0 == strcasecmp(mixedQueryObj->valuestring, "yes")) { g_queryInfo.specifiedQueryInfo.mixed_query = true; + infoPrint("%s\n","mixed_query is True"); } else if (0 == strcasecmp(mixedQueryObj->valuestring, "no")) { g_queryInfo.specifiedQueryInfo.mixed_query = false; + infoPrint("%s\n","mixed_query is False"); } else { errorPrint("Invalid mixed_query value: %s\n", mixedQueryObj->valuestring); - goto PARSE_OVER; + return -1; } } @@ -1955,7 +1891,7 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) { if (fp == NULL) { errorPrint("failed to open file: %s\n", sqlFileObj->valuestring); - goto PARSE_OVER; + return -1; } char *buf = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true); while (fgets(buf, TSDB_MAX_ALLOWED_SQL_LEN, fp)) { @@ -2040,16 +1976,19 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) { } } else { errorPrint("%s", "Invalid sql in json\n"); - goto PARSE_OVER; + return -1; } } } } } - // super_table_query - tools_cJSON *superQuery = - tools_cJSON_GetObjectItem(json, "super_table_query"); + // succ + return 0; +} + +// Super Query +int32_t readSuperQueryJson(tools_cJSON * superQuery) { g_queryInfo.superQueryInfo.threadCnt = 1; if (!superQuery || superQuery->type != tools_cJSON_Object) { g_queryInfo.superQueryInfo.sqlCount = 0; @@ -2175,7 +2114,7 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) { errorPrint( "failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); - goto PARSE_OVER; + return -1; } g_queryInfo.superQueryInfo.sqlCount = superSqlSize; @@ -2201,11 +2140,118 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) { } } } + + // succ + return 0; +} - code = 0; +// read query json +static int getMetaFromQueryJsonFile(tools_cJSON *json) { + int32_t code = -1; -PARSE_OVER: - return code; + // read common + tools_cJSON *telnet_tcp_port = + tools_cJSON_GetObjectItem(json, "telnet_tcp_port"); + if (tools_cJSON_IsNumber(telnet_tcp_port)) { + g_arguments->telnet_tcp_port = (uint16_t)telnet_tcp_port->valueint; + } + + tools_cJSON *gQueryTimes = tools_cJSON_GetObjectItem(json, "query_times"); + if (tools_cJSON_IsNumber(gQueryTimes)) { + g_queryInfo.query_times = gQueryTimes->valueint; + } else { + g_queryInfo.query_times = 1; + } + + tools_cJSON *gKillSlowQueryThreshold = + tools_cJSON_GetObjectItem(json, "kill_slow_query_threshold"); + if (tools_cJSON_IsNumber(gKillSlowQueryThreshold)) { + g_queryInfo.killQueryThreshold = gKillSlowQueryThreshold->valueint; + } else { + g_queryInfo.killQueryThreshold = 0; + } + + tools_cJSON *gKillSlowQueryInterval = + tools_cJSON_GetObjectItem(json, "kill_slow_query_interval"); + if (tools_cJSON_IsNumber(gKillSlowQueryInterval)) { + g_queryInfo.killQueryInterval = gKillSlowQueryInterval->valueint; + } else { + g_queryInfo.killQueryInterval = 1; /* by default, interval 1s */ + } + + tools_cJSON *resetCache = + tools_cJSON_GetObjectItem(json, "reset_query_cache"); + if (tools_cJSON_IsString(resetCache)) { + if (0 == strcasecmp(resetCache->valuestring, "yes")) { + g_queryInfo.reset_query_cache = true; + } + } else { + g_queryInfo.reset_query_cache = false; + } + + tools_cJSON *respBuffer = + tools_cJSON_GetObjectItem(json, "response_buffer"); + if (tools_cJSON_IsNumber(respBuffer)) { + g_queryInfo.response_buffer = respBuffer->valueint; + } else { + g_queryInfo.response_buffer = RESP_BUF_LEN; + } + + tools_cJSON *dbs = tools_cJSON_GetObjectItem(json, "databases"); + if (tools_cJSON_IsString(dbs)) { + g_queryInfo.dbName = dbs->valuestring; + } + + tools_cJSON *queryMode = tools_cJSON_GetObjectItem(json, "query_mode"); + if (tools_cJSON_IsString(queryMode)) { + if (0 == strcasecmp(queryMode->valuestring, "rest")) { + g_queryInfo.iface = REST_IFACE; + } else if (0 == strcasecmp(queryMode->valuestring, "taosc")) { + g_queryInfo.iface = TAOSC_IFACE; + } else { + errorPrint("Invalid query_mode value: %s\n", + queryMode->valuestring); + return -1; + } + } + // init sqls + g_queryInfo.specifiedQueryInfo.sqls = benchArrayInit(1, sizeof(SSQL)); + + // specified_table_query + tools_cJSON *specifiedQuery = tools_cJSON_GetObjectItem(json, "specified_table_query"); + if (specifiedQuery) { + code = readSpecQueryJson(specifiedQuery); + if(code) { + errorPrint("failed to readSpecQueryJson code=%d \n", code); + return code; + } + } + + // super_table_query + tools_cJSON *superQuery = tools_cJSON_GetObjectItem(json, "super_table_query"); + if (superQuery) { + code = readSuperQueryJson(superQuery); + if(code) { + errorPrint("failed to readSuperQueryJson code=%d \n", code); + return code; + } + } + + // only have one + const char* errType = "json config invalid:"; + if (specifiedQuery && superQuery) { + errorPrint("%s only appear one for 'specified_table_query' and 'super_table_query' \n", errType); + return -1; + } + + // must have one + if (specifiedQuery == NULL && superQuery == NULL ) { + errorPrint("%s must have one for 'specified_table_query' or 'super_table_query' \n", errType); + return -1; + } + + // succ + return 0; } #ifdef TD_VER_COMPATIBLE_3_0_0_0 @@ -2378,8 +2424,7 @@ static int getMetaFromTmqJsonFile(tools_cJSON *json) { } #endif -int getInfoFromJsonFile() { - char * file = g_arguments->metaFile; +int readJsonConfig(char * file) { int32_t code = -1; FILE * fp = fopen(file, "r"); if (!fp) { @@ -2430,20 +2475,18 @@ int getInfoFromJsonFile() { // read common item code = getMetaFromCommonJsonFile(root); if (INSERT_TEST == g_arguments->test_mode || CSVFILE_TEST == g_arguments->test_mode) { + // insert code = getMetaFromInsertJsonFile(root); -#ifdef TD_VER_COMPATIBLE_3_0_0_0 } else if (QUERY_TEST == g_arguments->test_mode) { -#else - } else { -#endif + // query memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo)); code = getMetaFromQueryJsonFile(root); -#ifdef TD_VER_COMPATIBLE_3_0_0_0 } else if (SUBSCRIBE_TEST == g_arguments->test_mode) { + // subscribe memset(&g_tmqInfo, 0, sizeof(STmqMetaInfo)); code = getMetaFromTmqJsonFile(root); -#endif } + PARSE_OVER: free(content); fclose(fp); diff --git a/src/benchMain.c b/src/benchMain.c index 7f590df9e..86ad795d0 100644 --- a/src/benchMain.c +++ b/src/benchMain.c @@ -25,7 +25,7 @@ tools_cJSON* root; #define CLIENT_INFO_LEN 20 static char g_client_info[CLIENT_INFO_LEN] = {0}; -int g_majorVersionOfClient = 0; +int32_t g_majorVersionOfClient = 0; // set flag if command passed, see ARG_OPT_ ??? uint64_t g_argFlag = 0; @@ -43,11 +43,6 @@ void* benchCancelHandler(void* arg) { g_arguments->terminate = true; toolsMsleep(10); - if (g_arguments->in_prompt || INSERT_TEST != g_arguments->test_mode) { - toolsMsleep(100); - postFreeResource(); - exit(EXIT_SUCCESS); - } return NULL; } #endif @@ -128,7 +123,11 @@ int main(int argc, char* argv[]) { #endif if (g_arguments->metaFile) { g_arguments->totalChildTables = 0; - if (getInfoFromJsonFile()) exit(EXIT_FAILURE); + if (readJsonConfig(g_arguments->metaFile)) { + errorPrint("failed to readJsonConfig %s\n", g_arguments->metaFile); + exitLog(); + return -1; + } } else { modifyArgument(); } diff --git a/src/benchQuery.c b/src/benchQuery.c index 566dff5a5..0de71d8b1 100644 --- a/src/benchQuery.c +++ b/src/benchQuery.c @@ -13,14 +13,16 @@ #include #include "benchLog.h" -extern int g_majorVersionOfClient; - -int selectAndGetResult(threadInfo *pThreadInfo, char *command) { +// query and get result record is true to total request +int selectAndGetResult(qThreadInfo *pThreadInfo, char *command, bool record) { int ret = 0; + // user cancel if (g_arguments->terminate) { return -1; } + + // execute sql uint32_t threadID = pThreadInfo->threadID; char dbName[TSDB_DB_NAME_LEN] = {0}; tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN); @@ -30,142 +32,179 @@ int selectAndGetResult(threadInfo *pThreadInfo, char *command) { 0, g_arguments->port, false, pThreadInfo->sockfd, pThreadInfo->filePath); if (0 != retCode) { - errorPrint("====restful return fail, threadID[%u]\n", - threadID); + errorPrint("====restful return fail, threadID[%u]\n", threadID); ret = -1; } } else { + // query TAOS *taos = pThreadInfo->conn->taos; int64_t rows = 0; TAOS_RES *res = taos_query(taos, command); int code = taos_errno(res); if (res == NULL || code) { - if (YES_IF_FAILED == g_arguments->continueIfFail) { - warnPrint("failed to execute sql:%s, " - "code: 0x%08x, reason:%s\n", - command, code, taos_errstr(res)); - } else { - errorPrint("failed to execute sql:%s, " - "code: 0x%08x, reason:%s\n", - command, code, taos_errstr(res)); - ret = -1; - } + // failed query + errorPrint("failed to execute sql:%s, " + "code: 0x%08x, reason:%s\n", + command, code, taos_errstr(res)); + ret = -1; } else { - //if (strlen(pThreadInfo->filePath) > 0) { - rows = fetchResult(res, pThreadInfo); - //} + // succ query + if (record) + rows = fetchResult(res, pThreadInfo->filePath); + } + + // free result + if (res) { + taos_free_result(res); } - taos_free_result(res); debugPrint("query sql:%s rows:%"PRId64"\n", command, rows); } + + // record count + if (ret ==0) { + // succ + if (record) + pThreadInfo->nSucc ++; + } else { + // fail + if (record) + pThreadInfo->nFail ++; + + // continue option + if (YES_IF_FAILED == g_arguments->continueIfFail) { + ret = 0; // force continue + } + } + return ret; } -static void *mixedQuery(void *sarg) { - queryThreadInfo *pThreadInfo = (queryThreadInfo*)sarg; +// interlligent sleep +void autoSleep(uint64_t interval, uint64_t delay ) { + if (delay < interval * 1000) { + toolsMsleep((int32_t)(interval * 1000 - delay)); // ms + } +} + +// reset +int32_t resetQueryCache(qThreadInfo* pThreadInfo) { + // execute sql + if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE", false)) { + errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__); + return -1; + } + // succ + return 0; +} + + + +// +// --------------------------------- second levle funtion for Thread ----------------------------------- +// + +// show rela qps +int64_t showRealQPS(qThreadInfo* pThreadInfo, int64_t lastPrintTime, int64_t startTs) { + int64_t now = toolsGetTimestampMs(); + if (now - lastPrintTime > 10 * 1000) { + // real total + uint64_t totalQueried = pThreadInfo->nSucc; + if(g_arguments->continueIfFail == YES_IF_FAILED) { + totalQueried += pThreadInfo->nFail; + } + infoPrint( + "thread[%d] has currently completed queries: %" PRIu64 ", QPS: %10.3f\n", + pThreadInfo->threadID, totalQueried, + (double)(totalQueried / ((now - startTs) / 1000.0))); + return now; + } else { + return lastPrintTime; + } +} + +// spec query mixed thread +static void *specQueryMixThread(void *sarg) { + qThreadInfo *pThreadInfo = (qThreadInfo*)sarg; #ifdef LINUX - prctl(PR_SET_NAME, "mixedQuery"); + prctl(PR_SET_NAME, "specQueryMixThread"); #endif // use db if (g_queryInfo.dbName) { if (pThreadInfo->conn && pThreadInfo->conn->taos && taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) { - errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadId, g_queryInfo.dbName); + errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName); return NULL; } } - int64_t lastPrintTs = toolsGetTimestampMs(); - int64_t st; - int64_t et; + int64_t st = 0; + int64_t et = 0; + int64_t startTs = toolsGetTimestampMs(); + int64_t lastPrintTime = startTs; uint64_t queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes; + uint64_t interval = g_queryInfo.specifiedQueryInfo.queryInterval; + pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t)); for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) { SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); for (int j = 0; j < queryTimes; ++j) { - if (g_arguments->terminate) { - return NULL; + // use cancel + if(g_arguments->terminate) { + infoPrint("%s\n", "user cancel , so exit testing."); + break; } + + // reset cache if (g_queryInfo.reset_query_cache) { - if (queryDbExecCall(pThreadInfo->conn, - "RESET QUERY CACHE")) { - errorPrint("%s() LN%d, reset query cache failed\n", - __func__, __LINE__); + if (resetQueryCache(pThreadInfo)) { + errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__); return NULL; } } + + // execute sql st = toolsGetTimestampUs(); - if (g_queryInfo.iface == REST_IFACE) { - int retCode = postProceSql(sql->command, g_queryInfo.dbName, - 0, g_queryInfo.iface, 0, - g_arguments->port, - false, pThreadInfo->sockfd, ""); - if (retCode) { - errorPrint("thread[%d]: restful query <%s> failed\n", - pThreadInfo->threadId, sql->command); - continue; - } - } else { - TAOS_RES *res = taos_query(pThreadInfo->conn->taos, - sql->command); - if (res == NULL || taos_errno(res) != 0) { - if (YES_IF_FAILED == g_arguments->continueIfFail) { - warnPrint( - "thread[%d]: failed to execute sql :%s, " - "code: 0x%x, reason: %s\n", - pThreadInfo->threadId, - sql->command, - taos_errno(res), taos_errstr(res)); - } else { - errorPrint( - "thread[%d]: failed to execute sql :%s, " - "code: 0x%x, reason: %s\n", - pThreadInfo->threadId, - sql->command, - taos_errno(res), taos_errstr(res)); - if (TSDB_CODE_RPC_NETWORK_UNAVAIL == - taos_errno(res)) { - taos_free_result(res); - return NULL; - } - } - taos_free_result(res); - continue; - } - taos_free_result(res); + int ret = selectAndGetResult(pThreadInfo, sql->command, true); + if (ret) { + g_fail = true; + errorPrint("failed call mix selectAndGetResult, i=%d j=%d", i, j); + return NULL; } et = toolsGetTimestampUs(); - int64_t* delay = benchCalloc(1, sizeof(int64_t), false); - *delay = et - st; - debugPrint("%s() LN%d, delay: %"PRId64"\n", - __func__, __LINE__, *delay); - - pThreadInfo->total_delay += (et - st); - if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){ - tmfree(delay); + + // sleep + if (interval > 0) { + autoSleep(interval, et - st); } - int64_t currentPrintTs = toolsGetTimestampMs(); - if (currentPrintTs - lastPrintTs > 10 * 1000) { - infoPrint("thread[%d] has currently complete query %d times\n", - pThreadInfo->threadId, - (int)pThreadInfo->query_delay_list->size); - lastPrintTs = currentPrintTs; + + // delay + if (ret == 0) { + int64_t* delay = benchCalloc(1, sizeof(int64_t), false); + *delay = et - st; + debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, *delay); + + pThreadInfo->total_delay += *delay; + if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){ + tmfree(delay); + } } + + // real show + lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs); } } + return NULL; } -static void *specifiedTableQuery(void *sarg) { - threadInfo *pThreadInfo = (threadInfo *)sarg; +// spec query thread +static void *specQueryThread(void *sarg) { + qThreadInfo *pThreadInfo = (qThreadInfo *)sarg; #ifdef LINUX - prctl(PR_SET_NAME, "specTableQuery"); + prctl(PR_SET_NAME, "specQueryThread"); #endif uint64_t st = 0; uint64_t et = 0; - uint64_t minDelay = UINT64_MAX; - uint64_t maxDelay = 0; - uint64_t totalDelay = 0; int32_t index = 0; // use db @@ -179,13 +218,13 @@ static void *specifiedTableQuery(void *sarg) { } uint64_t queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes; - pThreadInfo->query_delay_list = benchCalloc(queryTimes, - sizeof(uint64_t), false); - uint64_t lastPrintTime = toolsGetTimestampMs(); - uint64_t startTs = toolsGetTimestampMs(); + uint64_t interval = g_queryInfo.specifiedQueryInfo.queryInterval; + pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t)); - SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, - pThreadInfo->querySeq); + uint64_t startTs = toolsGetTimestampMs(); + uint64_t lastPrintTime = startTs; + + SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, pThreadInfo->querySeq); if (sql->result[0] != '\0') { snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d", @@ -193,210 +232,328 @@ static void *specifiedTableQuery(void *sarg) { } while (index < queryTimes) { - // check cancel - if (g_arguments->terminate) { - return NULL; + // use cancel + if(g_arguments->terminate) { + infoPrint("thread[%d] user cancel , so exit testing.\n", pThreadInfo->threadID); + break; } - if (g_queryInfo.specifiedQueryInfo.queryInterval && - (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) { - toolsMsleep((int32_t)( - g_queryInfo.specifiedQueryInfo.queryInterval*1000 - - (et - st))); // ms - } + // reset cache if (g_queryInfo.reset_query_cache) { - // execute sql - if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE")) { - errorPrint("%s() LN%d, reset query cache failed\n", - __func__, __LINE__); + if (resetQueryCache(pThreadInfo)) { + errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__); return NULL; } } + // execute sql st = toolsGetTimestampUs(); - int ret = selectAndGetResult(pThreadInfo, sql->command); + int ret = selectAndGetResult(pThreadInfo, sql->command, true); if (ret) { g_fail = true; + errorPrint("failed call spec selectAndGetResult, index=%d\n", index); + break; } - et = toolsGetTimestampUs(); - int64_t delay = et - st; - debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, delay); - if (ret == 0) { - pThreadInfo->query_delay_list[index] = delay; - pThreadInfo->totalQueried++; - } - index++; - totalDelay += delay; - if (delay > maxDelay) { - maxDelay = delay; - } - if (delay < minDelay) { - minDelay = delay; + // sleep + if (interval > 0) { + autoSleep(interval, et - st); } - uint64_t currentPrintTime = toolsGetTimestampMs(); - uint64_t endTs = toolsGetTimestampMs(); - if ((ret == 0) && (currentPrintTime - lastPrintTime > 30 * 1000)) { - infoPrint( - "thread[%d] has currently completed queries: %" PRIu64 - ", QPS: %10.6f\n", - pThreadInfo->threadID, pThreadInfo->totalQueried, - (double)(pThreadInfo->totalQueried / - ((endTs - startTs) / 1000.0))); - lastPrintTime = currentPrintTime; - } + uint64_t delay = et - st; + debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay); - if (-2 == ret) { - toolsMsleep(1000); - return NULL; + if (ret == 0) { + // only succ add delay list + benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay); + pThreadInfo->total_delay += delay; } + index++; + + // real show + lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs); } - qsort(pThreadInfo->query_delay_list, queryTimes, - sizeof(uint64_t), compare); - pThreadInfo->avg_delay = (double)totalDelay / queryTimes; + return NULL; } -static void *superTableQuery(void *sarg) { +// super table query thread +static void *stbQueryThread(void *sarg) { char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false); - threadInfo *pThreadInfo = (threadInfo *)sarg; + qThreadInfo *pThreadInfo = (qThreadInfo *)sarg; #ifdef LINUX - prctl(PR_SET_NAME, "superTableQuery"); + prctl(PR_SET_NAME, "stbQueryThread"); #endif uint64_t st = 0; - uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000; + uint64_t et = 0; uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes; + uint64_t interval = g_queryInfo.superQueryInfo.queryInterval; + pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(uint64_t)); + uint64_t startTs = toolsGetTimestampMs(); - - uint64_t lastPrintTime = toolsGetTimestampMs(); + uint64_t lastPrintTime = startTs; while (queryTimes--) { - if (g_queryInfo.superQueryInfo.queryInterval - && ((et - st) < - (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000)) { - toolsMsleep((int32_t) - (g_queryInfo.superQueryInfo.queryInterval*1000 - - (et - st))); + // use cancel + if(g_arguments->terminate) { + infoPrint("%s\n", "user cancel , so exit testing."); + break; + } + + // reset cache + if (g_queryInfo.reset_query_cache) { + if (resetQueryCache(pThreadInfo)) { + errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__); + return NULL; + } } + // execute st = toolsGetTimestampMs(); - for (int i = (int)pThreadInfo->start_table_from; - i <= pThreadInfo->end_table_to; i++) { + // for each table + for (int i = (int)pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { + // use cancel + if(g_arguments->terminate) { + infoPrint("%s\n", "user cancel , so exit testing."); + break; + } + + // for each sql for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN); - replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, - i); + // use cancel + if(g_arguments->terminate) { + infoPrint("%s\n", "user cancel , so exit testing."); + break; + } + + // get real child name sql + if (replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i)) { + // fault + tmfree(sqlstr); + return NULL; + } + if (g_queryInfo.superQueryInfo.result[j][0] != '\0') { snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d", g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); } - if (selectAndGetResult(pThreadInfo, sqlstr)) { + + // execute sql + uint64_t s = toolsGetTimestampUs(); + int ret = selectAndGetResult(pThreadInfo, sqlstr, true); + if (ret) { + // found error + errorPrint("failed call stb selectAndGetResult, i=%d j=%d\n", i, j); g_fail = true; + tmfree(sqlstr); + return NULL; } - - pThreadInfo->totalQueried++; - - int64_t currentPrintTime = toolsGetTimestampMs(); - int64_t endTs = toolsGetTimestampMs(); - if (currentPrintTime - lastPrintTime > 30 * 1000) { - infoPrint( - "thread[%d] has currently completed queries: %" PRIu64 - ", QPS: %10.3f\n", - pThreadInfo->threadID, pThreadInfo->totalQueried, - (double)(pThreadInfo->totalQueried / - ((endTs - startTs) / 1000.0))); - lastPrintTime = currentPrintTime; + uint64_t delay = toolsGetTimestampUs() - s; + debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay); + if (ret == 0) { + // only succ add delay list + benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay); + pThreadInfo->total_delay += delay; } + + // show real QPS + lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs); } } et = toolsGetTimestampMs(); + + // sleep + if (interval > 0) { + autoSleep(interval, et - st); + } + } tmfree(sqlstr); + return NULL; } -static int multi_thread_super_table_query(uint16_t iface, char* dbName) { - int ret = -1; - pthread_t * pidsOfSub = NULL; - threadInfo *infosOfSub = NULL; - //==== create sub threads for query from all sub table of the super table - if ((g_queryInfo.superQueryInfo.sqlCount > 0) - && (g_queryInfo.superQueryInfo.threadCnt > 0)) { - pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt - *sizeof(pthread_t), - false); - infosOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt - *sizeof(threadInfo), false); - - int64_t ntables = g_queryInfo.superQueryInfo.childTblCount; - int threads = g_queryInfo.superQueryInfo.threadCnt; - - int64_t a = ntables / threads; - if (a < 1) { - threads = (int)ntables; - a = 1; - } - - int64_t b = 0; - if (threads != 0) { - b = ntables % threads; - } - - uint64_t tableFrom = 0; - for (int i = 0; i < threads; i++) { - threadInfo *pThreadInfo = infosOfSub + i; - pThreadInfo->threadID = i; - pThreadInfo->start_table_from = tableFrom; - pThreadInfo->ntables = i < b ? a + 1 : a; - pThreadInfo->end_table_to = - i < b ? tableFrom + a : tableFrom + a - 1; - tableFrom = pThreadInfo->end_table_to + 1; - if (iface == REST_IFACE) { - int sockfd = createSockFd(); - if (sockfd < 0) { - goto OVER; - } - pThreadInfo->sockfd = sockfd; - } else { - pThreadInfo->conn = initBenchConn(); - if (pThreadInfo->conn == NULL) { - goto OVER; - } - } - pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo); - } - g_queryInfo.superQueryInfo.threadCnt = threads; - - for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { - if (!g_arguments->terminate) - pthread_join(pidsOfSub[i], NULL); - threadInfo *pThreadInfo = infosOfSub + i; - if (iface == REST_IFACE) { - destroySockFd(pThreadInfo->sockfd); - } else { - closeBenchConn(pThreadInfo->conn); - } - if (g_fail) { - goto OVER; - } - } - for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; ++i) { - g_queryInfo.superQueryInfo.totalQueried - += infosOfSub[i].totalQueried; +// +// --------------------------------- firse level function ------------------------------ +// + +void totalChildQuery(qThreadInfo* infos, int threadCnt, int64_t spend) { + // valid check + if (infos == NULL || threadCnt == 0) { + return ; + } + + // statistic + BArray * delay_list = benchArrayInit(1, sizeof(int64_t)); + double total_delays = 0; + + // clear + for (int i = 0; i < threadCnt; ++i) { + qThreadInfo * pThreadInfo = infos + i; + if(pThreadInfo->query_delay_list == NULL) { + continue;; } + + // append delay + benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData, + pThreadInfo->query_delay_list->size, false); + total_delays += pThreadInfo->total_delay; + + // free delay + benchArrayDestroy(pThreadInfo->query_delay_list); + pThreadInfo->query_delay_list = NULL; + + } + + // succ is zero + if (delay_list->size == 0) { + errorPrint("%s", "succ queries count is zero.\n"); + benchArrayDestroy(delay_list); + return ; + } + + + // sort + qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare); + + // show delay min max + if (delay_list->size) { + infoPrint( + "spend %.6fs using " + "%d threads complete query %d times, " + "min delay: %.6fs, " + "avg delay: %.6fs, " + "p90: %.6fs, " + "p95: %.6fs, " + "p99: %.6fs, " + "max: %.6fs\n", + spend/1E6, + threadCnt, (int)delay_list->size, + *(int64_t *)(benchArrayGet(delay_list, 0))/1E6, + (double)total_delays/delay_list->size/1E6, + *(int64_t *)(benchArrayGet(delay_list, + (int32_t)(delay_list->size * 0.9)))/1E6, + *(int64_t *)(benchArrayGet(delay_list, + (int32_t)(delay_list->size * 0.95)))/1E6, + *(int64_t *)(benchArrayGet(delay_list, + (int32_t)(delay_list->size * 0.99)))/1E6, + *(int64_t *)(benchArrayGet(delay_list, + (int32_t)(delay_list->size - 1)))/1E6); } else { + errorPrint("%s() LN%d, delay_list size: %"PRId64"\n", + __func__, __LINE__, (int64_t)delay_list->size); + } + benchArrayDestroy(delay_list); +} + +// +// super table query +// +static int stbQuery(uint16_t iface, char* dbName) { + int ret = -1; + pthread_t * pidsOfSub = NULL; + qThreadInfo *threadInfos = NULL; + g_queryInfo.superQueryInfo.totalQueried = 0; + g_queryInfo.superQueryInfo.totalFail = 0; + + // check + if ((g_queryInfo.superQueryInfo.sqlCount == 0) + || (g_queryInfo.superQueryInfo.threadCnt == 0)) { return 0; } + // malloc + pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt + *sizeof(pthread_t), + false); + threadInfos = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt + *sizeof(qThreadInfo), false); + + int64_t ntables = g_queryInfo.superQueryInfo.childTblCount; + int nConcurrent = g_queryInfo.superQueryInfo.threadCnt; + + int64_t a = ntables / nConcurrent; + if (a < 1) { + nConcurrent = (int)ntables; + a = 1; + } + + int64_t b = 0; + if (nConcurrent != 0) { + b = ntables % nConcurrent; + } + + uint64_t tableFrom = 0; + int threadCnt = 0; + for (int i = 0; i < nConcurrent; i++) { + qThreadInfo *pThreadInfo = threadInfos + i; + pThreadInfo->threadID = i; + pThreadInfo->start_table_from = tableFrom; + pThreadInfo->ntables = i < b ? a + 1 : a; + pThreadInfo->end_table_to = + i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; + // create conn + if (initQueryConn(pThreadInfo, iface)){ + break; + } + int code = pthread_create(pidsOfSub + i, NULL, stbQueryThread, pThreadInfo); + if (code != 0) { + errorPrint("failed stbQueryThread create. error code =%d \n", code); + break; + } + threadCnt ++; + } + + bool needExit = false; + // if failed, set termainte flag true like ctrl+c exit + if (threadCnt != nConcurrent ) { + needExit = true; + g_arguments->terminate = true; + goto OVER; + } + + // reset total + g_queryInfo.superQueryInfo.totalQueried = 0; + g_queryInfo.superQueryInfo.totalFail = 0; + + // real thread count + g_queryInfo.superQueryInfo.threadCnt = threadCnt; + int64_t start = toolsGetTimestampUs(); + + for (int i = 0; i < threadCnt; i++) { + pthread_join(pidsOfSub[i], NULL); + qThreadInfo *pThreadInfo = threadInfos + i; + // add succ + g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nSucc; + if (g_arguments->continueIfFail == YES_IF_FAILED) { + // "yes" need add fail cnt + g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nFail; + g_queryInfo.superQueryInfo.totalFail += pThreadInfo->nFail; + } + + // close conn + closeQueryConn(pThreadInfo, iface); + } + int64_t end = toolsGetTimestampUs(); + + if (needExit) { + goto OVER; + } + + // total show + totalChildQuery(threadInfos, threadCnt, end - start); + ret = 0; + OVER: tmfree((char *)pidsOfSub); - tmfree((char *)infosOfSub); + tmfree((char *)threadInfos); for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) { tmfree(g_queryInfo.superQueryInfo.childTblName[i]); @@ -405,51 +562,39 @@ static int multi_thread_super_table_query(uint16_t iface, char* dbName) { return ret; } -// free g_queryInfo.specailQueryInfo memory , can re-call -void freeSpecialQueryInfo() { - // can re-call - if (g_queryInfo.specifiedQueryInfo.sqls == NULL) { - return; - } - - // loop free each item memory - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) { - SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); - tmfree(sql->command); - tmfree(sql->delay_list); - } - - // free Array - benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls); - g_queryInfo.specifiedQueryInfo.sqls = NULL; -} - - -static int multi_thread_specified_table_query(uint16_t iface, char* dbName) { - pthread_t * pids = NULL; - threadInfo *infos = NULL; - //==== create sub threads for query from specify table - int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; +// +// specQuery +// +static int specQuery(uint16_t iface, char* dbName) { + int ret = -1; + pthread_t *pids = NULL; + qThreadInfo *infos = NULL; + int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size; + g_queryInfo.specifiedQueryInfo.totalQueried = 0; + g_queryInfo.specifiedQueryInfo.totalFail = 0; // check invaid if(nSqlCount == 0 || nConcurrent == 0 ) { if(nSqlCount == 0) warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount); if(nConcurrent == 0) - warnPrint("concurrent is %d , specified_table_query->concurrent is zero. \n", nConcurrent); + warnPrint("nConcurrent is %d , specified_table_query->nConcurrent is zero. \n", nConcurrent); return 0; } - // malloc funciton global memory + // malloc threads memory pids = benchCalloc(1, nConcurrent * sizeof(pthread_t), false); - infos = benchCalloc(1, nConcurrent * sizeof(threadInfo), false); + infos = benchCalloc(1, nConcurrent * sizeof(qThreadInfo), false); - bool exeError = false; for (uint64_t i = 0; i < nSqlCount; i++) { + if( g_arguments->terminate ) { + break; + } + // reset memset(pids, 0, nConcurrent * sizeof(pthread_t)); - memset(infos, 0, nConcurrent * sizeof(threadInfo)); + memset(infos, 0, nConcurrent * sizeof(qThreadInfo)); // get execute sql SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); @@ -457,307 +602,266 @@ static int multi_thread_specified_table_query(uint16_t iface, char* dbName) { // create threads int threadCnt = 0; for (int j = 0; j < nConcurrent; j++) { - threadInfo *pThreadInfo = infos + j; + qThreadInfo *pThreadInfo = infos + j; pThreadInfo->threadID = i * nConcurrent + j; pThreadInfo->querySeq = i; - if (iface == REST_IFACE) { - int sockfd = createSockFd(); - // int iMode = 1; - // ioctl(sockfd, FIONBIO, &iMode); - if (sockfd < 0) { - exeError = true; - break; - } - pThreadInfo->sockfd = sockfd; - } else { - pThreadInfo->conn = initBenchConn(); - if (pThreadInfo->conn == NULL) { - destroySockFd(pThreadInfo->sockfd); - exeError = true; - break; - } + // create conn + if (initQueryConn(pThreadInfo, iface)) { + break; } - pthread_create(pids + j, NULL, specifiedTableQuery, pThreadInfo); + int code = pthread_create(pids + j, NULL, specQueryThread, pThreadInfo); + if (code != 0) { + errorPrint("failed specQueryThread create. error code =%d \n", code); + break; + } threadCnt++; } + bool needExit = false; // if failed, set termainte flag true like ctrl+c exit - if (exeError) { - errorPrint(" i=%" PRIu64 " create thread occur error, so wait exit ...\n", i); + if (threadCnt != nConcurrent ) { + needExit = true; g_arguments->terminate = true; } + int64_t start = toolsGetTimestampUs(); // wait threads execute finished one by one for (int j = 0; j < threadCnt ; j++) { pthread_join(pids[j], NULL); - threadInfo *pThreadInfo = infos + j; - if (iface == REST_IFACE) { -#ifdef WINDOWS - closesocket(pThreadInfo->sockfd); - WSACleanup(); -#else - close(pThreadInfo->sockfd); -#endif - } else { - closeBenchConn(pThreadInfo->conn); - pThreadInfo->conn = NULL; - } + qThreadInfo *pThreadInfo = infos + j; + closeQueryConn(pThreadInfo, iface); // need exit in loop - if (g_fail || g_arguments->terminate) { + if (needExit) { // free BArray - tmfree(pThreadInfo->query_delay_list); + benchArrayDestroy(pThreadInfo->query_delay_list); pThreadInfo->query_delay_list = NULL; } } + int64_t spend = toolsGetTimestampUs() - start; + if(spend == 0) { + // avoid xx/spend expr throw error + spend = 1; + } + + // create + if (needExit) { + errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d, exit testing.\n", nConcurrent, threadCnt); + goto OVER; + } + + // + // show QPS and P90 ... + // + uint64_t n = 0; + double total_delays = 0.0; + uint64_t totalQueried = 0; + uint64_t totalFail = 0; + for (int j = 0; j < threadCnt; j++) { + qThreadInfo *pThreadInfo = infos + j; + if(pThreadInfo->query_delay_list == NULL) { + continue;; + } + + // total one sql + for (uint64_t k = 0; k < pThreadInfo->query_delay_list->size; k++) { + int64_t * delay = benchArrayGet(pThreadInfo->query_delay_list, k); + sql->delay_list[n++] = *delay; + total_delays += *delay; + } - // cancel or need exit check - if (g_fail || g_arguments->terminate) { - // free current funciton malloc memory - tmfree((char *)pids); - tmfree((char *)infos); - // free global - freeSpecialQueryInfo(); - return -1; - } - - // execute successfully - uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes; - uint64_t totalQueryTimes = query_times * nConcurrent; - double avg_delay = 0.0; - for (int j = 0; j < nConcurrent; j++) { - threadInfo *pThreadInfo = infos + j; - avg_delay += pThreadInfo->avg_delay; - for (uint64_t k = 0; k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) { - sql->delay_list[j * query_times + k] = pThreadInfo->query_delay_list[k]; + // total queries + totalQueried += pThreadInfo->nSucc; + if (g_arguments->continueIfFail == YES_IF_FAILED) { + totalQueried += pThreadInfo->nFail; + totalFail += pThreadInfo->nFail; } - // free BArray - tmfree(pThreadInfo->query_delay_list); + // free BArray query_delay_list + benchArrayDestroy(pThreadInfo->query_delay_list); pThreadInfo->query_delay_list = NULL; } - avg_delay /= nConcurrent; - qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes, sizeof(uint64_t), compare); - infoPrintNoTimestamp("complete query with %d threads and %" PRIu64 - " query delay " - "avg: \t%.6fs " - "min: \t%.6fs " - "max: \t%.6fs " - "p90: \t%.6fs " - "p95: \t%.6fs " - "p99: \t%.6fs " - "SQL command: %s" - "\n", - nConcurrent, query_times, avg_delay / 1E6, /* avg */ - sql->delay_list[0] / 1E6, /* min */ - sql->delay_list[totalQueryTimes - 1] / 1E6, /* max */ + + // appand current sql + g_queryInfo.specifiedQueryInfo.totalQueried += totalQueried; + g_queryInfo.specifiedQueryInfo.totalFail += totalFail; + + // succ is zero + if(totalQueried == 0 || n == 0) { + errorPrint("%s", "succ queries count is zero.\n"); + goto OVER; + } + + qsort(sql->delay_list, n, sizeof(uint64_t), compare); + int32_t bufLen = strlen(sql->command) + 512; + char * buf = benchCalloc(bufLen, sizeof(char), false); + snprintf(buf , bufLen, "complete query with %d threads and %" PRIu64 " " + "sql %"PRIu64" spend %.6fs QPS: %.3f " + "query delay " + "avg: %.6fs " + "min: %.6fs " + "max: %.6fs " + "p90: %.6fs " + "p95: %.6fs " + "p99: %.6fs " + "SQL command: %s \n", + threadCnt, totalQueried, + i + 1, spend/1E6, totalQueried / (spend/1E6), + total_delays/n/1E6, /* avg */ + sql->delay_list[0] / 1E6, /* min */ + sql->delay_list[n - 1] / 1E6, /* max */ /* p90 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6, + sql->delay_list[(uint64_t)(n * 0.90)] / 1E6, /* p95 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6, + sql->delay_list[(uint64_t)(n * 0.95)] / 1E6, /* p99 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command); - infoPrintNoTimestampToFile("complete query with %d threads and %" PRIu64 - " query delay " - "avg: \t%.6fs " - "min: \t%.6fs " - "max: \t%.6fs " - "p90: \t%.6fs " - "p95: \t%.6fs " - "p99: \t%.6fs " - "SQL command: %s" - "\n", - nConcurrent, query_times, avg_delay / 1E6, /* avg */ - sql->delay_list[0] / 1E6, /* min */ - sql->delay_list[totalQueryTimes - 1] / 1E6, /* max */ - /* p90 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6, - /* p95 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6, - /* p99 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command); - } - - g_queryInfo.specifiedQueryInfo.totalQueried = - nSqlCount * g_queryInfo.specifiedQueryInfo.queryTimes * nConcurrent; + sql->delay_list[(uint64_t)(n * 0.99)] / 1E6, + sql->command); + + infoPrintNoTimestamp("%s", buf); + infoPrintNoTimestampToFile("%s", buf); + tmfree(buf); + } + ret = 0; + +OVER: tmfree((char *)pids); tmfree((char *)infos); // free specialQueryInfo freeSpecialQueryInfo(); - return 0; + return ret; } -static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) { - int code = -1; - int thread = g_queryInfo.specifiedQueryInfo.concurrent; - pthread_t * pids = benchCalloc(thread, sizeof(pthread_t), true); - queryThreadInfo *infos = benchCalloc(thread, sizeof(queryThreadInfo), true); +// +// specQueryMix +// +static int specQueryMix(uint16_t iface, char* dbName) { + // init + int ret = -1; + int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; + pthread_t * pids = benchCalloc(nConcurrent, sizeof(pthread_t), true); + qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true); + + // concurent calc int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size; - int start_sql = 0; - int a = total_sql_num / thread; + int start_sql = 0; + int a = total_sql_num / nConcurrent; if (a < 1) { - thread = total_sql_num; + warnPrint("sqls num:%d < concurent:%d, so set concurrent to %d\n", total_sql_num, nConcurrent, nConcurrent); + nConcurrent = total_sql_num; a = 1; } int b = 0; - if (thread != 0) { - b = total_sql_num % thread; - } - for (int i = 0; i < thread; ++i) { - queryThreadInfo *pQueryThreadInfo = infos + i; - pQueryThreadInfo->threadId = i; - pQueryThreadInfo->start_sql = start_sql; - pQueryThreadInfo->end_sql = i < b ? start_sql + a : start_sql + a - 1; - start_sql = pQueryThreadInfo->end_sql + 1; - pQueryThreadInfo->total_delay = 0; - pQueryThreadInfo->query_delay_list = benchArrayInit(1, sizeof(int64_t)); - if (iface == REST_IFACE) { - int sockfd = createSockFd(); - if (sockfd < 0) { - goto OVER; - } - pQueryThreadInfo->sockfd = sockfd; - } else { - pQueryThreadInfo->conn = initBenchConn(); - if (pQueryThreadInfo->conn == NULL) { - goto OVER; - } - } - pthread_create(pids + i, NULL, mixedQuery, pQueryThreadInfo); + if (nConcurrent != 0) { + b = total_sql_num % nConcurrent; + } + + // + // running + // + int threadCnt = 0; + for (int i = 0; i < nConcurrent; ++i) { + qThreadInfo *pThreadInfo = infos + i; + pThreadInfo->threadID = i; + pThreadInfo->start_sql = start_sql; + pThreadInfo->end_sql = i < b ? start_sql + a : start_sql + a - 1; + start_sql = pThreadInfo->end_sql + 1; + pThreadInfo->total_delay = 0; + + // create conn + if (initQueryConn(pThreadInfo, iface)){ + break; + } + // main run + int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo); + if (code != 0) { + errorPrint("failed specQueryMixThread create. error code =%d \n", code); + break; + } + + threadCnt ++; + } + + bool needExit = false; + // if failed, set termainte flag true like ctrl+c exit + if (threadCnt != nConcurrent) { + needExit = true; + g_arguments->terminate = true; } + // reset total + g_queryInfo.specifiedQueryInfo.totalQueried = 0; + g_queryInfo.specifiedQueryInfo.totalFail = 0; + int64_t start = toolsGetTimestampUs(); - for (int i = 0; i < thread; ++i) { + for (int i = 0; i < threadCnt; ++i) { pthread_join(pids[i], NULL); - } - int64_t end = toolsGetTimestampUs(); + qThreadInfo *pThreadInfo = infos + i; + closeQueryConn(pThreadInfo, iface); - // statistic - BArray * delay_list = benchArrayInit(1, sizeof(int64_t)); - int64_t total_delay = 0; - for (int i = 0; i < thread; ++i) { - queryThreadInfo * pThreadInfo = infos + i; - benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData, - pThreadInfo->query_delay_list->size, true); - total_delay += pThreadInfo->total_delay; - tmfree(pThreadInfo->query_delay_list); - pThreadInfo->query_delay_list = NULL; + // total queries + g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc; + if (g_arguments->continueIfFail == YES_IF_FAILED) { + // yes need add failed count + g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail; + g_queryInfo.specifiedQueryInfo.totalFail += pThreadInfo->nFail; + } - if (iface == REST_IFACE) { -#ifdef WINDOWS - closesocket(pThreadInfo->sockfd); - WSACleanup(); -#else - close(pThreadInfo->sockfd); -#endif - } else { - closeBenchConn(pThreadInfo->conn); + // destory + if (needExit) { + benchArrayDestroy(pThreadInfo->query_delay_list); + pThreadInfo->query_delay_list = NULL; } } - qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare); - if (delay_list->size) { - infoPrint( - "spend %.6fs using " - "%d threads complete query %d times, " - "min delay: %.6fs, " - "avg delay: %.6fs, " - "p90: %.6fs, " - "p95: %.6fs, " - "p99: %.6fs, " - "max: %.6fs\n", - (end - start)/1E6, - thread, (int)delay_list->size, - *(int64_t *)(benchArrayGet(delay_list, 0))/1E6, - (double)total_delay/delay_list->size/1E6, - *(int64_t *)(benchArrayGet(delay_list, - (int32_t)(delay_list->size * 0.9)))/1E6, - *(int64_t *)(benchArrayGet(delay_list, - (int32_t)(delay_list->size * 0.95)))/1E6, - *(int64_t *)(benchArrayGet(delay_list, - (int32_t)(delay_list->size * 0.99)))/1E6, - *(int64_t *)(benchArrayGet(delay_list, - (int32_t)(delay_list->size - 1)))/1E6); - } else { - errorPrint("%s() LN%d, delay_list size: %"PRId64"\n", - __func__, __LINE__, (int64_t)delay_list->size); + int64_t end = toolsGetTimestampUs(); + + // create + if (needExit) { + errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d, exit testing.\n", nConcurrent, threadCnt); + goto OVER; } - benchArrayDestroy(delay_list); - code = 0; - g_queryInfo.specifiedQueryInfo.totalQueried = - g_queryInfo.specifiedQueryInfo.sqls->size * g_queryInfo.specifiedQueryInfo.queryTimes * thread; + + // statistic + totalChildQuery(infos, threadCnt, end - start); + ret = 0; OVER: tmfree(pids); tmfree(infos); - return code; -} -#define KILLID_LEN 20 + // free sqls + freeSpecialQueryInfo(); -void *queryKiller(void *arg) { - char host[MAX_HOSTNAME_LEN] = {0}; - tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN); + return ret; +} - while (true) { - TAOS *taos = taos_connect(g_arguments->host, g_arguments->user, - g_arguments->password, NULL, g_arguments->port); - if (NULL == taos) { - errorPrint("Slow query killer thread " - "failed to connect to the server %s\n", - g_arguments->host); - return NULL; - } +// total query for end +void totalQuery(int64_t spends) { + // total QPS + uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried + + g_queryInfo.superQueryInfo.totalQueried; - char command[TSDB_MAX_ALLOWED_SQL_LEN] = - "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries"; - TAOS_RES *res = taos_query(taos, command); - int32_t code = taos_errno(res); - if (code) { - printErrCmdCodeStr(command, code, res); - } - - TAOS_ROW row = NULL; - while ((row = taos_fetch_row(res)) != NULL) { - int32_t *lengths = taos_fetch_lengths(res); - if (lengths[0] <= 0) { - infoPrint("No valid query found by %s\n", command); - } else { - int64_t execUSec = *(int64_t*)row[1]; - - if (execUSec > g_queryInfo.killQueryThreshold * 1000000) { - char sql[SHORT_1K_SQL_BUFF_LEN] = {0}; - tstrncpy(sql, (char*)row[2], - min(strlen((char*)row[2])+1, - SHORT_1K_SQL_BUFF_LEN)); - - char killId[KILLID_LEN] = {0}; - tstrncpy(killId, (char*)row[0], - min(strlen((char*)row[0])+1, KILLID_LEN)); - char killCommand[KILLID_LEN + 15] = {0}; - snprintf(killCommand, KILLID_LEN + 15, - "KILL QUERY '%s'", killId); - TAOS_RES *resKill = taos_query(taos, killCommand); - int32_t codeKill = taos_errno(resKill); - if (codeKill) { - printErrCmdCodeStr(killCommand, codeKill, resKill); - } else { - infoPrint("%s succeed, sql: %s killed!\n", - killCommand, sql); - taos_free_result(resKill); - } - } - } + // error rate + char errRate[128] = ""; + if(g_arguments->continueIfFail == YES_IF_FAILED) { + uint64_t totalFail = g_queryInfo.specifiedQueryInfo.totalFail + g_queryInfo.superQueryInfo.totalFail; + if (totalQueried > 0) { + snprintf(errRate, sizeof(errRate), " ,error %" PRIu64 " (rate:%.3f%%)", totalFail, ((float)totalFail * 100)/totalQueried); } - - taos_free_result(res); - taos_close(taos); - toolsMsleep(g_queryInfo.killQueryInterval*1000); } - return NULL; + // show + double tInS = (double)spends / 1000; + char buf[512] = ""; + snprintf(buf, sizeof(buf), + "Spend %.4f second completed total queries: %" PRIu64 + ", the QPS of all threads: %10.3f%s\n\n", + tInS, totalQueried, (double)totalQueried / tInS, errRate); + infoPrint("%s", buf); + infoPrintToFile("%s", buf); } int queryTestProcess() { @@ -767,13 +871,15 @@ int queryTestProcess() { encodeAuthBase64(); } - pthread_t pidKiller = {0}; + // kill sql for executing seconds over "kill_slow_query_threshold" if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) { - pthread_create(&pidKiller, NULL, queryKiller, NULL); - pthread_join(pidKiller, NULL); - toolsMsleep(1000); + int32_t ret = killSlowQuery(); + if (ret != 0) { + return ret; + } } + // covert addr if (g_queryInfo.iface == REST_IFACE) { if (convertHostToServAddr(g_arguments->host, g_arguments->port + TSDB_PORT_HTTP, @@ -783,105 +889,47 @@ int queryTestProcess() { } } + // fetch child name if super table if ((g_queryInfo.superQueryInfo.sqlCount > 0) && (g_queryInfo.superQueryInfo.threadCnt > 0)) { - SBenchConn* conn = initBenchConn(); - if (conn == NULL) { + int32_t ret = fetchChildTableName(g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName); + if (ret != 0) { + errorPrint("fetchChildTableName dbName=%s stb=%s failed.", g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName); return -1; } - char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0"; - if (3 == g_majorVersionOfClient) { - snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, - "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM %s.%s)", - g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName); + } + + // + // start running + // + + + uint64_t startTs = toolsGetTimestampMs(); + if(g_queryInfo.specifiedQueryInfo.sqls && g_queryInfo.specifiedQueryInfo.sqls->size > 0) { + // specified table + if (g_queryInfo.specifiedQueryInfo.mixed_query) { + // mixed + if (specQueryMix(g_queryInfo.iface, g_queryInfo.dbName)) { + return -1; + } } else { - snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, - "SELECT COUNT(TBNAME) FROM %s.%s", - g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName); - } - TAOS_RES *res = taos_query(conn->taos, cmd); - int32_t code = taos_errno(res); - if (code) { - printErrCmdCodeStr(cmd, code, res); - closeBenchConn(conn); - return -1; - } - TAOS_ROW row = NULL; - int num_fields = taos_num_fields(res); - TAOS_FIELD *fields = taos_fetch_fields(res); - while ((row = taos_fetch_row(res)) != NULL) { - if (0 == strlen((char *)(row[0]))) { - errorPrint("stable %s have no child table\n", - g_queryInfo.superQueryInfo.stbName); - taos_free_result(res); - closeBenchConn(conn); + // no mixied + if (specQuery(g_queryInfo.iface, g_queryInfo.dbName)) { return -1; } - char temp[256] = {0}; - taos_print_row(temp, row, fields, num_fields); - g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp); - } - infoPrint("%s's childTblCount: %" PRId64 "\n", - g_queryInfo.superQueryInfo.stbName, - g_queryInfo.superQueryInfo.childTblCount); - taos_free_result(res); - g_queryInfo.superQueryInfo.childTblName = - benchCalloc(g_queryInfo.superQueryInfo.childTblCount, - sizeof(char *), false); - if (getAllChildNameOfSuperTable( - conn->taos, g_queryInfo.dbName, - g_queryInfo.superQueryInfo.stbName, - g_queryInfo.superQueryInfo.childTblName, - g_queryInfo.superQueryInfo.childTblCount)) { - tmfree(g_queryInfo.superQueryInfo.childTblName); - closeBenchConn(conn); - return -1; } - closeBenchConn(conn); - } - uint64_t startTs = toolsGetTimestampMs(); - if (g_queryInfo.specifiedQueryInfo.mixed_query) { - if (multi_thread_specified_mixed_query(g_queryInfo.iface, - g_queryInfo.dbName)) { + } else if(g_queryInfo.superQueryInfo.sqlCount > 0) { + // super table + if (stbQuery(g_queryInfo.iface, g_queryInfo.dbName)) { return -1; } } else { - if (multi_thread_specified_table_query(g_queryInfo.iface, - g_queryInfo.dbName)) { - return -1; - } - } - if (multi_thread_super_table_query(g_queryInfo.iface, - g_queryInfo.dbName)) { + // nothing + errorPrint("%s\n", "Both 'specified_table_query' and 'super_table_query' sqls is empty."); return -1; } - // workaround to use separate taos connection; - uint64_t endTs = toolsGetTimestampMs(); - int64_t t = endTs - startTs; - double tInS = (double)t / 1000.0; - - // specifiedQuery - if (g_queryInfo.specifiedQueryInfo.totalQueried) { - infoPrint("Total specified queries: %" PRIu64 "\n", - g_queryInfo.specifiedQueryInfo.totalQueried); - } - - // superQuery - if (g_queryInfo.superQueryInfo.totalQueried) { - infoPrint("Total super queries: %" PRIu64 "\n", - g_queryInfo.superQueryInfo.totalQueried); - } - // total QPS - uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried - + g_queryInfo.superQueryInfo.totalQueried; - infoPrint( - "Spend %.4f second completed total queries: %" PRIu64 - ", the QPS of all threads: %10.3f\n\n", - tInS, totalQueried, (double)totalQueried / tInS); - infoPrintToFile( - "Spend %.4f second completed total queries: %" PRIu64 - ", the QPS of all threads: %10.3f\n\n", - tInS, totalQueried, (double)totalQueried / tInS); + // total + totalQuery(toolsGetTimestampMs() - startTs); return 0; } diff --git a/src/benchSubscribe.c b/src/benchSubscribe.c index 011d54584..7f0d7e157 100644 --- a/src/benchSubscribe.c +++ b/src/benchSubscribe.c @@ -23,7 +23,7 @@ static void stable_sub_callback(TAOS_SUB *tsub, TAOS_RES *res, void *param, return; } - if (param) fetchResult(res, (threadInfo *)param); + if (param) fetchResult(res, ((threadInfo *)param)->filePath); // tao_unsubscribe() will free result. } @@ -35,7 +35,7 @@ static void specified_sub_callback(TAOS_SUB *tsub, TAOS_RES *res, void *param, return; } - if (param) fetchResult(res, (threadInfo *)param); + if (param) fetchResult(res, ((threadInfo *)param)->filePath); // tao_unsubscribe() will free result. } @@ -127,7 +127,7 @@ static void *specifiedSubscribe(void *sarg) { } fetchResult( g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], - pThreadInfo); + pThreadInfo->filePath); g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID]++; if ((g_queryInfo.specifiedQueryInfo @@ -247,7 +247,7 @@ static void *superSubscribe(void *sarg) { .result[pThreadInfo->querySeq], pThreadInfo->threadID); } - fetchResult(res, pThreadInfo); + fetchResult(res, pThreadInfo->filePath); consumed[tsubSeq]++; if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1) && diff --git a/src/benchUtil.c b/src/benchUtil.c index f541d1d6d..900f0ded7 100644 --- a/src/benchUtil.c +++ b/src/benchUtil.c @@ -240,19 +240,24 @@ static void appendResultBufToFile(char *resultBuf, char * filePath) { tmfclose(fp); } -void replaceChildTblName(char *inSql, char *outSql, int tblIndex) { - char sourceString[32] = "xxxx"; - char *pos = strstr(inSql, sourceString); - if (0 == pos) return; +int32_t replaceChildTblName(char *inSql, char *outSql, int tblIndex) { + // child table mark + char mark[32] = "xxxx"; + char *pos = strstr(inSql, mark); + if (0 == pos) { + errorPrint("sql format error, sql not found mark string '%s'", mark); + return -1; + } char subTblName[TSDB_TABLE_NAME_LEN]; snprintf(subTblName, TSDB_TABLE_NAME_LEN, - "%s.%s", g_queryInfo.dbName, + "`%s`.%s", g_queryInfo.dbName, g_queryInfo.superQueryInfo.childTblName[tblIndex]); tstrncpy(outSql, inSql, pos - inSql + 1); - snprintf(outSql + strlen(outSql), TSDB_MAX_ALLOWED_SQL_LEN -1, - "%s%s", subTblName, pos + strlen(sourceString)); + snprintf(outSql + (pos - inSql), TSDB_MAX_ALLOWED_SQL_LEN - 1, + "%s%s", subTblName, pos + strlen(mark)); + return 0; } int64_t toolsGetTimestamp(int32_t precision) { @@ -822,14 +827,14 @@ int postProceSql(char *sqlstr, char* dbName, int precision, int iface, } // fetch result fo file or nothing -int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) { +int64_t fetchResult(TAOS_RES *res, char * filePath) { TAOS_ROW row = NULL; int num_fields = 0; int64_t totalLen = 0; TAOS_FIELD *fields = 0; int64_t rows = 0; char *databuf = NULL; - bool toFile = strlen(pThreadInfo->filePath) > 0; + bool toFile = strlen(filePath) > 0; if(toFile) { @@ -843,7 +848,7 @@ int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) { if (toFile) { if (totalLen >= (FETCH_BUFFER_SIZE - HEAD_BUFF_LEN * 2)) { // buff is full - appendResultBufToFile(databuf, pThreadInfo->filePath); + appendResultBufToFile(databuf, filePath); totalLen = 0; memset(databuf, 0, FETCH_BUFFER_SIZE); } @@ -862,7 +867,7 @@ int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) { // end if (toFile) { - appendResultBufToFile(databuf, pThreadInfo->filePath); + appendResultBufToFile(databuf, filePath); free(databuf); } return rows; @@ -1104,7 +1109,7 @@ static int32_t benchArrayEnsureCap(BArray* pArray, size_t newCap) { } void* benchArrayAddBatch(BArray* pArray, void* pData, int32_t elems, bool free) { - if (pData == NULL) { + if (pData == NULL || elems <=0) { return NULL; } @@ -1125,6 +1130,11 @@ FORCE_INLINE void* benchArrayPush(BArray* pArray, void* pData) { return benchArrayAddBatch(pArray, pData, 1, true); } +FORCE_INLINE void* benchArrayPushNoFree(BArray* pArray, void* pData) { + return benchArrayAddBatch(pArray, pData, 1, false); +} + + void* benchArrayDestroy(BArray* pArray) { if (pArray) { tmfree(pArray->pData); @@ -1304,7 +1314,7 @@ void destroySockFd(int sockfd) { FORCE_INLINE void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) { char buff[512]; char *msg = cmd; - if (strlen(cmd) > sizeof(msg)) { + if (strlen(cmd) >= sizeof(buff)) { memcpy(buff, cmd, 500); buff[500] = 0; strcat(buff, "..."); @@ -1549,4 +1559,206 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len) { return h1; } -#endif \ No newline at end of file +#endif + + +// +// ---------------- benchQuery util ---------------------- +// + +// init conn +int32_t initQueryConn(qThreadInfo * pThreadInfo, int iface) { + // create conn + if (iface == REST_IFACE) { + int sockfd = createSockFd(); + if (sockfd < 0) { + return -1; + } + pThreadInfo->sockfd = sockfd; + } else { + pThreadInfo->conn = initBenchConn(); + if (pThreadInfo->conn == NULL) { + return -1; + } + } + + return 0; +} + +// close conn +void closeQueryConn(qThreadInfo * pThreadInfo, int iface) { + if (iface == REST_IFACE) { +#ifdef WINDOWS + closesocket(pThreadInfo->sockfd); + WSACleanup(); +#else + close(pThreadInfo->sockfd); +#endif + } else { + closeBenchConn(pThreadInfo->conn); + pThreadInfo->conn = NULL; + } +} + + +// free g_queryInfo.specailQueryInfo memory , can re-call +void freeSpecialQueryInfo() { + // can re-call + if (g_queryInfo.specifiedQueryInfo.sqls == NULL) { + return; + } + + // loop free each item memory + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) { + SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); + tmfree(sql->command); + tmfree(sql->delay_list); + } + + // free Array + benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls); + g_queryInfo.specifiedQueryInfo.sqls = NULL; +} + + +#define KILLID_LEN 64 + +void *queryKiller(void *arg) { + char host[MAX_HOSTNAME_LEN] = {0}; + tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN); + + while (true) { + TAOS *taos = taos_connect(g_arguments->host, g_arguments->user, + g_arguments->password, NULL, g_arguments->port); + if (NULL == taos) { + errorPrint("Slow query killer thread " + "failed to connect to the server %s\n", + g_arguments->host); + return NULL; + } + + char command[TSDB_MAX_ALLOWED_SQL_LEN] = + "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries"; + TAOS_RES *res = taos_query(taos, command); + int32_t code = taos_errno(res); + if (code) { + printErrCmdCodeStr(command, code, res); + } + + TAOS_ROW row = NULL; + while ((row = taos_fetch_row(res)) != NULL) { + int32_t *lengths = taos_fetch_lengths(res); + if (lengths[0] <= 0) { + infoPrint("No valid query found by %s\n", command); + } else { + int64_t execUSec = *(int64_t*)row[1]; + + if (execUSec > g_queryInfo.killQueryThreshold * 1000000) { + char sql[SHORT_1K_SQL_BUFF_LEN] = {0}; + tstrncpy(sql, (char*)row[2], + min(strlen((char*)row[2])+1, + SHORT_1K_SQL_BUFF_LEN)); + + char killId[KILLID_LEN] = {0}; + tstrncpy(killId, (char*)row[0], + min(strlen((char*)row[0])+1, KILLID_LEN)); + char killCommand[KILLID_LEN + 32] = {0}; + snprintf(killCommand, sizeof(killCommand), "KILL QUERY '%s'", killId); + TAOS_RES *resKill = taos_query(taos, killCommand); + int32_t codeKill = taos_errno(resKill); + if (codeKill) { + printErrCmdCodeStr(killCommand, codeKill, resKill); + } else { + infoPrint("%s succeed, sql: %s killed!\n", + killCommand, sql); + taos_free_result(resKill); + } + } + } + } + + taos_free_result(res); + taos_close(taos); + toolsMsleep(g_queryInfo.killQueryInterval*1000); + } + + return NULL; +} + +// kill show +int killSlowQuery() { + pthread_t pidKiller = {0}; + int32_t ret = pthread_create(&pidKiller, NULL, queryKiller, NULL); + if (ret != 0) { + errorPrint("pthread_create failed create queryKiller thread. error code =%d \n", ret); + return -1; + } + pthread_join(pidKiller, NULL); + toolsMsleep(1000); + return 0; +} + +// fetch super table child name from server +int fetchChildTableName(char *dbName, char *stbName) { + SBenchConn* conn = initBenchConn(); + if (conn == NULL) { + return -1; + } + + // get child count + char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0"; + if (3 == g_majorVersionOfClient) { + snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, + "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM `%s`.`%s`)", + dbName, stbName); + } else { + snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, + "SELECT COUNT(TBNAME) FROM `%s`.`%s`", + dbName, stbName); + } + TAOS_RES *res = taos_query(conn->taos, cmd); + int32_t code = taos_errno(res); + if (code) { + printErrCmdCodeStr(cmd, code, res); + closeBenchConn(conn); + return -1; + } + + TAOS_ROW row = NULL; + int num_fields = taos_num_fields(res); + TAOS_FIELD *fields = taos_fetch_fields(res); + while ((row = taos_fetch_row(res)) != NULL) { + if (0 == strlen((char *)(row[0]))) { + errorPrint("stable %s have no child table\n", stbName); + taos_free_result(res); + closeBenchConn(conn); + return -1; + } + char temp[256] = {0}; + taos_print_row(temp, row, fields, num_fields); + + // set child table count + g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp); + } + infoPrint("%s's childTblCount: %" PRId64 "\n", stbName, g_queryInfo.superQueryInfo.childTblCount); + taos_free_result(res); + + // malloc memory with child table count + g_queryInfo.superQueryInfo.childTblName = + benchCalloc(g_queryInfo.superQueryInfo.childTblCount, + sizeof(char *), false); + // fetch child table name + if (getAllChildNameOfSuperTable( + conn->taos, dbName, stbName, + g_queryInfo.superQueryInfo.childTblName, + g_queryInfo.superQueryInfo.childTblCount)) { + // faild + tmfree(g_queryInfo.superQueryInfo.childTblName); + closeBenchConn(conn); + return -1; + } + closeBenchConn(conn); + + // succ + return 0; +} diff --git a/tests/taosbenchmark/bugs.py b/tests/taosbenchmark/bugs.py index f668b3c11..610e9989b 100644 --- a/tests/taosbenchmark/bugs.py +++ b/tests/taosbenchmark/bugs.py @@ -156,6 +156,7 @@ def bugsTS(self, benchmark): # TS-5846 keys = ["completed total queries: 40"] self.benchmarkQuery(benchmark, "./taosbenchmark/json/TS-5846-Query.json", keys) + keys = ["completed total queries: 20"] self.benchmarkQuery(benchmark, "./taosbenchmark/json/TS-5846-Mixed-Query.json", keys) # bugs td diff --git a/tests/taosbenchmark/json/TD-32846.json b/tests/taosbenchmark/json/TD-32846.json index cd8b87e48..309ba40fa 100644 --- a/tests/taosbenchmark/json/TD-32846.json +++ b/tests/taosbenchmark/json/TD-32846.json @@ -29,7 +29,7 @@ "super_tables": [{ "name": "product", "child_table_exists": "no", - "childtable_count": 9, + "childtable_count": 5, "childtable_prefix": "d", "auto_create_table": "yes", "batch_create_tbl_num": 10, @@ -37,7 +37,7 @@ "insert_mode": "stmt2", "non_stop_mode": "no", "line_protocol": "line", - "insert_rows": 10340, + "insert_rows": 1034, "childtable_limit": 0, "childtable_offset": 0, "interlace_rows": 0, diff --git a/tests/taosbenchmark/json/queryErrorBothSpecSuper.json b/tests/taosbenchmark/json/queryErrorBothSpecSuper.json new file mode 100644 index 000000000..48b9de491 --- /dev/null +++ b/tests/taosbenchmark/json/queryErrorBothSpecSuper.json @@ -0,0 +1,35 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "taosc", + "specified_table_query": { + "concurrent": 3, + "sqls": [ + { + "sql": "select last_row(*) from meters" + }, + { + "sql": "select count(*) from d0", + "result": "./query_res1.txt" + } + ] + }, + "super_table_query": { + "stblname": "meters", + "concurrent": 3, + "query_interval": 1, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } +} diff --git a/tests/taosbenchmark/json/queryErrorFormat.json b/tests/taosbenchmark/json/queryErrorFormat.json new file mode 100644 index 000000000..07bfd6abf --- /dev/null +++ b/tests/taosbenchmark/json/queryErrorFormat.json @@ -0,0 +1,27 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "continue_if_fail": "yes", + "databases": "test", + "query_times": 100, + "query_mode": "taosc", + "super_table_query": + "stblname": "meters", + "concurrent": 3, + "query_interval": 1, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + }, + { + "sql": "select count(ts) from xxxx" + } + ] + } +} diff --git a/tests/taosbenchmark/json/queryErrorNoSpecSuper.json b/tests/taosbenchmark/json/queryErrorNoSpecSuper.json new file mode 100644 index 000000000..71dc16d5d --- /dev/null +++ b/tests/taosbenchmark/json/queryErrorNoSpecSuper.json @@ -0,0 +1,12 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "taosc" +} diff --git a/tests/taosbenchmark/json/queryModeSpec.json b/tests/taosbenchmark/json/queryModeSpec.json new file mode 100644 index 000000000..533cb05c2 --- /dev/null +++ b/tests/taosbenchmark/json/queryModeSpec.json @@ -0,0 +1,24 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "taosc", + "specified_table_query": { + "concurrent": 3, + "sqls": [ + { + "sql": "select last_row(*) from meters" + }, + { + "sql": "select count(*) from d0", + "result": "./query_res1.txt" + } + ] + } +} diff --git a/tests/taosbenchmark/json/queryModeSpecMix.json b/tests/taosbenchmark/json/queryModeSpecMix.json new file mode 100644 index 000000000..39a9593d2 --- /dev/null +++ b/tests/taosbenchmark/json/queryModeSpecMix.json @@ -0,0 +1,25 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "taosc", + "specified_table_query": { + "concurrent": 4, + "mixed_query": "yes", + "sqls": [ + { + "sql": "select last_row(*) from meters" + }, + { + "sql": "select count(*) from d0", + "result": "./query_res1.txt" + } + ] + } +} diff --git a/tests/taosbenchmark/json/queryModeSpecMixRest.json b/tests/taosbenchmark/json/queryModeSpecMixRest.json new file mode 100644 index 000000000..5d962f135 --- /dev/null +++ b/tests/taosbenchmark/json/queryModeSpecMixRest.json @@ -0,0 +1,28 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "rest", + "specified_table_query": { + "concurrent": 3, + "mixed_query": "yes", + "sqls": [ + { + "sql": "select last_row(*) from meters" + }, + { + "sql": "select count(*) from d0", + "result": "./query_res1.txt" + }, + { + "sql": "select count(*) from meters" + } + ] + } +} diff --git a/tests/taosbenchmark/json/queryModeSpecRest.json b/tests/taosbenchmark/json/queryModeSpecRest.json new file mode 100644 index 000000000..5b00ee647 --- /dev/null +++ b/tests/taosbenchmark/json/queryModeSpecRest.json @@ -0,0 +1,27 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "rest", + "specified_table_query": { + "concurrent": 3, + "sqls": [ + { + "sql": "select last_row(*) from meters" + }, + { + "sql": "select count(*) from d0", + "result": "./query_res1.txt" + }, + { + "sql": "select count(*) from meters" + } + ] + } +} diff --git a/tests/taosbenchmark/json/queryModeSuper.json b/tests/taosbenchmark/json/queryModeSuper.json new file mode 100644 index 000000000..9d20154d4 --- /dev/null +++ b/tests/taosbenchmark/json/queryModeSuper.json @@ -0,0 +1,27 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "continue_if_fail": "yes", + "databases": "test", + "query_times": 100, + "query_mode": "taosc", + "super_table_query": { + "stblname": "meters", + "concurrent": 3, + "query_interval": 0, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + }, + { + "sql": "select count(ts) from xxxx" + } + ] + } +} diff --git a/tests/taosbenchmark/json/queryModeSuperRest.json b/tests/taosbenchmark/json/queryModeSuperRest.json new file mode 100644 index 000000000..4ae43062a --- /dev/null +++ b/tests/taosbenchmark/json/queryModeSuperRest.json @@ -0,0 +1,24 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "continue_if_fail": "yes", + "databases": "test", + "query_times": 100, + "query_mode": "rest", + "super_table_query": { + "stblname": "meters", + "concurrent": 3, + "query_interval": 0, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } +} diff --git a/tests/taosbenchmark/json/queryQps.json b/tests/taosbenchmark/json/queryQps.json index 02661fe9d..21b4cf6f9 100644 --- a/tests/taosbenchmark/json/queryQps.json +++ b/tests/taosbenchmark/json/queryQps.json @@ -20,16 +20,5 @@ "sql": "select last_row(*) from db.stb00_9 ", "result": "./query_res1.txt" }] - }, - "super_table_query": { - "stblname": "stb1", - "query_interval":20, - "threads": 4, - "sqls": [ - { - "sql": "select last_row(ts) from xxxx", - "result": "./query_res2.txt" - } - ] - } + } } diff --git a/tests/taosbenchmark/json/queryQps1.json b/tests/taosbenchmark/json/queryQps1.json new file mode 100644 index 000000000..c2c238177 --- /dev/null +++ b/tests/taosbenchmark/json/queryQps1.json @@ -0,0 +1,22 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_times": 1, + "super_table_query": { + "stblname": "stb1", + "query_interval":20, + "threads": 4, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } + } diff --git a/tests/taosbenchmark/json/queryRestful.json b/tests/taosbenchmark/json/queryRestful.json index 5de560fd2..6cb83bc2e 100644 --- a/tests/taosbenchmark/json/queryRestful.json +++ b/tests/taosbenchmark/json/queryRestful.json @@ -22,17 +22,6 @@ "result": "./query_res1.txt" } ] - }, - "super_table_query": { - "stblname": "stb1", - "query_interval": 1, - "threads": 3, - "sqls": [ - { - "sql": "select last_row(ts) from xxxx", - "result": "./query_res2.txt" - } - ] } } diff --git a/tests/taosbenchmark/json/queryRestful1.json b/tests/taosbenchmark/json/queryRestful1.json new file mode 100644 index 000000000..54d2589ce --- /dev/null +++ b/tests/taosbenchmark/json/queryRestful1.json @@ -0,0 +1,24 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_times": 2, + "query_mode": "rest", + "super_table_query": { + "stblname": "stb1", + "query_interval": 1, + "threads": 3, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } + } + diff --git a/tests/taosbenchmark/json/querySpeciMutisql100.json b/tests/taosbenchmark/json/querySpeciMutisql100.json index 5b523ad3d..13010efdb 100644 --- a/tests/taosbenchmark/json/querySpeciMutisql100.json +++ b/tests/taosbenchmark/json/querySpeciMutisql100.json @@ -413,17 +413,6 @@ "result": "./query_res0.txt" }] - }, - "super_table_query": { - "stblname": "stb1", - "query_interval": 1, - "threads": 3, - "sqls": [ - { - "sql": "select last_row(ts) from xxxx", - "result": "./query_res2.txt" - } - ] - } + } } \ No newline at end of file diff --git a/tests/taosbenchmark/json/queryTaosc-mixed-query.json b/tests/taosbenchmark/json/queryTaosc-mixed-query.json index 9b2f1b467..9cdc191d6 100644 --- a/tests/taosbenchmark/json/queryTaosc-mixed-query.json +++ b/tests/taosbenchmark/json/queryTaosc-mixed-query.json @@ -23,16 +23,5 @@ "result": "./query_res1.txt" } ] - }, - "super_table_query": { - "stblname": "stb1", - "query_interval": 1, - "threads": 3, - "sqls": [ - { - "sql": "select last_row(ts) from xxxx", - "result": "./query_res2.txt" - } - ] } } diff --git a/tests/taosbenchmark/json/queryTaosc-mixed-query1.json b/tests/taosbenchmark/json/queryTaosc-mixed-query1.json new file mode 100644 index 000000000..a3caa1c5e --- /dev/null +++ b/tests/taosbenchmark/json/queryTaosc-mixed-query1.json @@ -0,0 +1,23 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_times": 2, + "query_mode": "taosc", + "super_table_query": { + "stblname": "stb1", + "query_interval": 1, + "threads": 3, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } +} diff --git a/tests/taosbenchmark/json/queryTaosc.json b/tests/taosbenchmark/json/queryTaosc.json index d1e64e645..bcc2852b3 100644 --- a/tests/taosbenchmark/json/queryTaosc.json +++ b/tests/taosbenchmark/json/queryTaosc.json @@ -22,16 +22,5 @@ "result": "./query_res1.txt" } ] - }, - "super_table_query": { - "stblname": "stb1", - "query_interval": 1, - "threads": 3, - "sqls": [ - { - "sql": "select last_row(ts) from xxxx", - "result": "./query_res2.txt" - } - ] } } diff --git a/tests/taosbenchmark/json/queryTaosc1.json b/tests/taosbenchmark/json/queryTaosc1.json new file mode 100644 index 000000000..a3caa1c5e --- /dev/null +++ b/tests/taosbenchmark/json/queryTaosc1.json @@ -0,0 +1,23 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_times": 2, + "query_mode": "taosc", + "super_table_query": { + "stblname": "stb1", + "query_interval": 1, + "threads": 3, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } +} diff --git a/tests/taosbenchmark/json/rest_query.json b/tests/taosbenchmark/json/rest_query.json index 459e496f0..817d73320 100644 --- a/tests/taosbenchmark/json/rest_query.json +++ b/tests/taosbenchmark/json/rest_query.json @@ -14,14 +14,5 @@ "sql": "select count(*) from db.stb", "result": "rest_query_specified" }] - }, - "super_table_query": { - "stblname": "stb", - "sqls": [ - { - "sql": "select count(*) from xxxx", - "result": "rest_query_super" - } - ] } } \ No newline at end of file diff --git a/tests/taosbenchmark/json/rest_query1.json b/tests/taosbenchmark/json/rest_query1.json new file mode 100644 index 000000000..e09112737 --- /dev/null +++ b/tests/taosbenchmark/json/rest_query1.json @@ -0,0 +1,18 @@ +{ + "filetype":"query", + "cfgdir": "/etc/taos", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_mode": "rest", + "connection_pool_size": 10, + "response_buffer": 10000, + "super_table_query": { + "stblname": "stb", + "sqls": [ + { + "sql": "select count(*) from xxxx", + "result": "rest_query_super" + } + ] + } +} \ No newline at end of file diff --git a/tests/taosbenchmark/json/taosc_query.json b/tests/taosbenchmark/json/taosc_query.json index c8ff2e927..2cf8f648a 100644 --- a/tests/taosbenchmark/json/taosc_query.json +++ b/tests/taosbenchmark/json/taosc_query.json @@ -18,16 +18,5 @@ "sql": "select count(*) from db.stb", "result": "taosc_query_specified" }] - }, - "super_table_query": { - "stblname": "stb", - "query_interval": 1, - "concurrent": 1, - "sqls": [ - { - "sql": "select count(*) from xxxx", - "result": "taosc_query_super" - } - ] } } \ No newline at end of file diff --git a/tests/taosbenchmark/json/taosc_query1.json b/tests/taosbenchmark/json/taosc_query1.json new file mode 100644 index 000000000..fd650f90e --- /dev/null +++ b/tests/taosbenchmark/json/taosc_query1.json @@ -0,0 +1,24 @@ +{ + "filetype":"query", + "cfgdir": "/etc/taos", + "host": "localhost", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_times": 1, + "reset_query_cache": "yes", + "super_table_query": + { + "stblname": "stb", + "query_interval": 1, + "concurrent": 1, + "sqls": [ + { + "sql": "select count(*) from xxxx", + "result": "taosc_query_super" + } + ] + } +} \ No newline at end of file diff --git a/tests/taosbenchmark/queryMain.py b/tests/taosbenchmark/queryMain.py new file mode 100644 index 000000000..13a6105f8 --- /dev/null +++ b/tests/taosbenchmark/queryMain.py @@ -0,0 +1,288 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- +import os +import json +import sys +import os +import time +import datetime +import platform +import subprocess + +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * +from util.dnodes import tdDnodes + + +# reomve single and double quotation +def removeQuotation(origin): + value = "" + for c in origin: + if c != '\'' and c != '"': + value += c + + return value + +class TDTestCase: + def caseDescription(self): + """ + [TD-11510] taosBenchmark test cases + """ + + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def getPath(self, tool="taosBenchmark"): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if "community" in selfPath: + projPath = selfPath[: selfPath.find("community")] + elif "src" in selfPath: + projPath = selfPath[: selfPath.find("src")] + elif "/tools/" in selfPath: + projPath = selfPath[: selfPath.find("/tools/")] + else: + projPath = selfPath[: selfPath.find("tests")] + + paths = [] + for root, dummy, files in os.walk(projPath): + if (tool) in files: + rootRealPath = os.path.dirname(os.path.realpath(root)) + if "packaging" not in rootRealPath: + paths.append(os.path.join(root, tool)) + break + if len(paths) == 0: + tdLog.exit("taosBenchmark not found!") + return + else: + tdLog.info("taosBenchmark found in %s" % paths[0]) + return paths[0] + + def runSeconds(self, command, timeout = 180): + tdLog.info(f"runSeconds {command} ...") + process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process.wait(timeout) + + # get output + output = process.stdout.read().decode(encoding="gbk") + error = process.stderr.read().decode(encoding="gbk") + return output, error + + def getKeyValue(self, content, key, end): + # find key + s = content.find(key) + if s == -1: + return False,"" + + # skip self + s += len(key) + # skip blank + while s < len(content): + if content[s] != " ": + break + s += 1 + + # end check + if s + 1 == len(content): + return False, "" + + # find end + if len(end) == 0: + e = -1 + else: + e = content.find(end, s) + + # get value + if e == -1: + value = content[s : ] + else: + value = content[s : e] + + return True, value + + def getDbRows(self, times): + sql = f"select count(*) from test.meters" + tdSql.waitedQuery(sql, 1, times) + dbRows = tdSql.getData(0, 0) + return dbRows + + def checkItem(self, output, key, end, expect, equal): + ret, value = self.getKeyValue(output, key, end) + if ret == False: + tdLog.exit(f"not found key:{key}. end:{end} output:\n{output}") + + fval = float(value) + # compare + if equal and fval != expect: + tdLog.exit(f"check not expect. expect:{expect} real:{fval}, key:{key} end:{end} output:\n{output}") + elif equal == False and fval <= expect: + tdLog.exit(f"failed because {fval} <= {expect}, key:{key} end:{end} output:\n{output}") + else: + # succ + if equal: + tdLog.info(f"check successfully. key:{key} expect:{expect} real:{fval}") + else: + tdLog.info(f"check successfully. key:{key} {fval} > {expect}") + + + def checkAfterRun(self, benchmark, jsonFile, specMode, tbCnt): + # run + cmd = f"{benchmark} -f {jsonFile}" + output, error = self.runSeconds(cmd) + + if specMode : + label = "specified_table_query" + else: + label = "super_table_query" + + # + # check insert result + # + with open(jsonFile, "r") as file: + data = json.load(file) + + queryTimes = data["query_times"] + # contineIfFail + try: + continueIfFail = data["continue_if_fail"] + except: + continueIfFail = "no" + + concurrent = data[label]["concurrent"] + sqls = data[label]["sqls"] + + + # mix + try: + mixedQuery = data[label]["mixed_query"] + except: + mixedQuery = "no" + + tdLog.info(f"queryTimes={queryTimes} concurrent={concurrent} mixedQuery={mixedQuery} len(sqls)={len(sqls)} label={label}\n") + + totalQueries = 0 + threadQueries = 0 + + if continueIfFail.lower() == "yes": + allEnd = " " + else: + allEnd = "\n" + + if specMode and mixedQuery.lower() != "yes": + # spec + threadQueries = queryTimes * concurrent + totalQueries = queryTimes * concurrent * len(sqls) + threadKey = f"complete query with {concurrent} threads and " + qpsKey = "QPS: " + avgKey = "query delay avg: " + minKey = "min:" + else: + # spec mixed or super + if specMode: + # spec + totalQueries = queryTimes * len(sqls) + else: + # super + totalQueries = queryTimes * len(sqls) * tbCnt + threadQueries = totalQueries + + nSql = len(sqls) + if specMode and nSql < concurrent : + tdLog.info(f"set concurrent = {nSql} because len(sqls) < concurrent") + concurrent = nSql + threadKey = f"using {concurrent} threads complete query " + qpsKey = "" + avgKey = "avg delay:" + minKey = "min delay:" + + items = [ + [threadKey, " ", threadQueries, True], + [qpsKey, " ", 5, False], # qps need > 1 + [avgKey, "s", 0, False], + [minKey, "s", 0, False], + ["max: ", "s", 0, False], + ["p90: ", "s", 0, False], + ["p95: ", "s", 0, False], + ["p99: ", "s", 0, False], + ["INFO: Spend ", " ", 0, False], + ["completed total queries: ", ",", totalQueries, True], + ["the QPS of all threads:", allEnd, 10 , False] # all qps need > 5 + ] + + # check + for item in items: + if len(item[0]) > 0: + self.checkItem(output, item[0], item[1], item[2], item[3]) + + # native + def threeQueryMode(self, benchmark, tbCnt, tbRow): + # json + args = [ + ["taosbenchmark/json/queryModeSpec", True], + ["taosbenchmark/json/queryModeSpecMix", True], + ["taosbenchmark/json/queryModeSuper", False] + ] + + # native + for arg in args: + self.checkAfterRun(benchmark, arg[0] + ".json", arg[1], tbCnt) + + # rest + for arg in args: + self.checkAfterRun(benchmark, arg[0] + "Rest.json", arg[1], tbCnt) + + def expectFailed(self, command): + ret = os.system(command) + if ret == 0: + tdLog.exit(f" expect failed but success. command={command}") + else: + tdLog.info(f" expect failed is ok. command={command}") + + + # check excption + def exceptTest(self, benchmark, tbCnt, tbRow): + # 'specified_table_query' and 'super_table_query' error + self.expectFailed(f"{benchmark} -f taosbenchmark/json/queryErrorNoSpecSuper.json") + self.expectFailed(f"{benchmark} -f taosbenchmark/json/queryErrorBothSpecSuper.json") + # json format error + self.expectFailed(f"{benchmark} -f taosbenchmark/json/queryErrorFormat.json") + + + + def run(self): + tbCnt = 10 + tbRow = 1000 + benchmark = self.getPath() + + # insert + command = f"{benchmark} -d test -t {tbCnt} -n {tbRow} -I stmt2 -r 100 -y" + ret = os.system(command) + if ret !=0 : + tdLog.exit(f"exec failed. command={command}") + + # query mode test + self.threeQueryMode(benchmark, tbCnt, tbRow) + + # exception test + self.exceptTest(benchmark, tbCnt, tbRow); + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/taosbenchmark/query_json.py b/tests/taosbenchmark/query_json.py index edc43237c..29686927d 100644 --- a/tests/taosbenchmark/query_json.py +++ b/tests/taosbenchmark/query_json.py @@ -74,6 +74,10 @@ def run(self): cmd = "%s -f ./taosbenchmark/json/taosc_query.json" % binPath tdLog.info("%s" % cmd) os.system("%s" % cmd) + cmd = "%s -f ./taosbenchmark/json/taosc_query1.json" % binPath + tdLog.info("%s" % cmd) + os.system("%s" % cmd) + with open("%s" % "taosc_query_specified-0", "r+") as f1: for line in f1.readlines(): queryTaosc = line.strip().split()[0] @@ -84,9 +88,13 @@ def run(self): queryTaosc = line.strip().split()[0] assert queryTaosc == "1", "result is %s != expect: 1" % queryTaosc + # split two cmd = "%s -f ./taosbenchmark/json/rest_query.json" % binPath tdLog.info("%s" % cmd) os.system("%s" % cmd) + cmd = "%s -f ./taosbenchmark/json/rest_query1.json" % binPath + tdLog.info("%s" % cmd) + os.system("%s" % cmd) times = 0 with open("rest_query_super-0", "r+") as f1: diff --git a/tests/taosbenchmark/taosdemoTestQueryWithJson.py b/tests/taosbenchmark/taosdemoTestQueryWithJson.py index b3341f210..61b823b2d 100644 --- a/tests/taosbenchmark/taosdemoTestQueryWithJson.py +++ b/tests/taosbenchmark/taosdemoTestQueryWithJson.py @@ -117,6 +117,9 @@ def run(self): # taosc query: query specified table and query super table os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath) os.system("%s -f ./taosbenchmark/json/queryTaosc.json" % binPath) + # forbid parallel spec query with super query + os.system("%s -f ./taosbenchmark/json/queryTaosc1.json" % binPath) + os.system("cat query_res0.txt* > all_query_res0_taosc.txt") os.system("cat query_res1.txt* > all_query_res1_taosc.txt") os.system("cat query_res2.txt* > all_query_res2_taosc.txt") @@ -143,6 +146,7 @@ def run(self): # use restful api to query os.system("%s -f ./taosbenchmark/json/queryInsertrestdata.json" % binPath) os.system("%s -f ./taosbenchmark/json/queryRestful.json" % binPath) + os.system("%s -f ./taosbenchmark/json/queryRestful1.json" % binPath) os.system("cat query_res0.txt* > all_query_res0_rest.txt") os.system("cat query_res1.txt* > all_query_res1_rest.txt") os.system("cat query_res2.txt* > all_query_res2_rest.txt") @@ -191,6 +195,8 @@ def run(self): os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath) exceptcode = os.system("%s -f ./taosbenchmark/json/queryQps.json" % binPath) assert exceptcode == 0 + exceptcode = os.system("%s -f ./taosbenchmark/json/queryQps1.json" % binPath) + assert exceptcode == 0 # 2021.02.09 need modify taosBenchmakr code # use illegal or out of range parameters query json file diff --git a/tests/taosbenchmark/v3/taosdemoTestQueryWithJson-mixed-query.py b/tests/taosbenchmark/v3/taosdemoTestQueryWithJson-mixed-query.py index 0a4a14e1e..c7b7fc921 100644 --- a/tests/taosbenchmark/v3/taosdemoTestQueryWithJson-mixed-query.py +++ b/tests/taosbenchmark/v3/taosdemoTestQueryWithJson-mixed-query.py @@ -119,6 +119,7 @@ def run(self): # taosc query: query specified table and query super table os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath) os.system("%s -f ./taosbenchmark/json/queryTaosc-mixed-query.json" % binPath) + os.system("%s -f ./taosbenchmark/json/queryTaosc-mixed-query1.json" % binPath) os.system("cat query_res2.txt* > all_query_res2_taosc.txt") # correct Times testcases @@ -136,6 +137,7 @@ def run(self): # use restful api to query os.system("%s -f ./taosbenchmark/json/queryInsertrestdata.json" % binPath) os.system("%s -f ./taosbenchmark/json/queryRestful.json" % binPath) + os.system("%s -f ./taosbenchmark/json/queryRestful1.json" % binPath) os.system("cat query_res2.txt* > all_query_res2_rest.txt") # correct Times testcases