diff --git a/src/kit/taosdemo/insert.json b/src/kit/taosdemo/insert.json index 9e252772ea42..5276b9fb61af 100644 --- a/src/kit/taosdemo/insert.json +++ b/src/kit/taosdemo/insert.json @@ -8,10 +8,12 @@ "thread_count": 4, "thread_count_create_tbl": 4, "result_file": "./insert_res.txt", - "confirm_parameter_prompt": "no", + "confirm_parameter_prompt": "no", + "insert_interval": 0, + "num_of_records_per_req": 100, "databases": [{ "dbinfo": { - "name": "dbx", + "name": "db", "drop": "yes", "replica": 1, "days": 10, @@ -30,14 +32,13 @@ }, "super_tables": [{ "name": "stb", - "child_table_exists":"no", + "child_table_exists":"no", "childtable_count": 100, "childtable_prefix": "stb_", "auto_create_table": "no", "data_source": "rand", "insert_mode": "taosc", - "insert_rate": 0, - "insert_rows": 1000, + "insert_rows": 100000, "multi_thread_write_one_tbl": "no", "number_of_tbl_in_one_sql": 0, "rows_per_tbl": 100, diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 86b84534f663..30370cd79790 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -61,56 +61,6 @@ #define REQ_EXTRA_BUF_LEN 1024 #define RESP_BUF_LEN 4096 -#ifdef WINDOWS -#include -// Some old MinGW/CYGWIN distributions don't define this: -#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING -#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004 -#endif - -static HANDLE g_stdoutHandle; -static DWORD g_consoleMode; - -void setupForAnsiEscape(void) { - DWORD mode = 0; - g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE); - - if(g_stdoutHandle == INVALID_HANDLE_VALUE) { - exit(GetLastError()); - } - - if(!GetConsoleMode(g_stdoutHandle, &mode)) { - exit(GetLastError()); - } - - g_consoleMode = mode; - - // Enable ANSI escape codes - mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING; - - if(!SetConsoleMode(g_stdoutHandle, mode)) { - exit(GetLastError()); - } -} - -void resetAfterAnsiEscape(void) { - // Reset colors - printf("\x1b[0m"); - - // Reset console mode - if(!SetConsoleMode(g_stdoutHandle, g_consoleMode)) { - exit(GetLastError()); - } -} -#else -void setupForAnsiEscape(void) {} - -void resetAfterAnsiEscape(void) { - // Reset colors - printf("\x1b[0m"); -} -#endif - extern char configDir[]; #define INSERT_JSON_NAME "insert.json" @@ -130,7 +80,6 @@ extern char configDir[]; #define OPT_ABORT 1 /* –abort */ #define STRING_LEN 60000 #define MAX_PREPARED_RAND 1000000 -//#define MAX_SQL_SIZE 65536 #define MAX_FILE_NAME_LEN 256 #define MAX_SAMPLES_ONCE_FROM_FILE 10000 @@ -163,7 +112,7 @@ enum MODE { ASYNC, MODE_BUT }; - + enum QUERY_TYPE { NO_INSERT_TYPE, INSERT_TYPE, @@ -222,6 +171,7 @@ typedef struct { /* Used by main to communicate with parse_opt. */ typedef struct SArguments_S { char * metaFile; + int test_mode; char * host; uint16_t port; char * user; @@ -233,12 +183,14 @@ typedef struct SArguments_S { bool use_metric; bool insert_only; bool answer_yes; + bool debug_print; char * output_file; int mode; char * datatype[MAX_NUM_DATATYPE + 1]; int len_of_binary; int num_of_CPR; int num_of_threads; + int insert_interval; int num_of_RPR; int num_of_tables; int num_of_DPT; @@ -266,7 +218,6 @@ typedef struct SSuperTable_S { char childTblPrefix[MAX_TB_NAME_SIZE]; char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful - int insertRate; // 0: unlimit > 0 rows/s int multiThreadWriteOneTbl; // 0: no, 1: yes int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl @@ -288,7 +239,7 @@ typedef struct SSuperTable_S { StrColumn tags[MAX_TAG_COUNT]; char* childTblName; - char* colsOfCreatChildTable; + char* colsOfCreateChildTable; int lenOfOneRow; int lenOfTagOfOneRow; @@ -431,7 +382,7 @@ typedef struct SThreadInfo_S { int start_table_id; int end_table_id; int data_of_rate; - int64_t start_time; + uint64_t start_time; char* cols; bool use_metric; SSuperTable* superTblInfo; @@ -439,10 +390,9 @@ typedef struct SThreadInfo_S { // for async insert tsem_t lock_sem; int64_t counter; - int64_t st; - int64_t et; + uint64_t st; + uint64_t et; int64_t lastTs; - int nrecords_per_request; // statistics int64_t totalRowsInserted; @@ -457,224 +407,56 @@ typedef struct SThreadInfo_S { } threadInfo; +#ifdef WINDOWS +#include +// Some old MinGW/CYGWIN distributions don't define this: +#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING +#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004 +#endif -#ifdef LINUX - /* The options we understand. */ - static struct argp_option options[] = { - {0, 'f', "meta file", 0, "The meta data to the execution procedure, if use -f, all others options invalid. Default is NULL.", 0}, - #ifdef _TD_POWER_ - {0, 'c', "config_directory", 0, "Configuration directory. Default is '/etc/power/'.", 1}, - {0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'powerdb'.", 2}, - #else - {0, 'c', "config_directory", 0, "Configuration directory. Default is '/etc/taos/'.", 1}, - {0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 2}, - #endif - {0, 'h', "host", 0, "The host to connect to TDengine. Default is localhost.", 2}, - {0, 'p', "port", 0, "The TCP/IP port number to use for the connection. Default is 0.", 2}, - {0, 'u', "user", 0, "The TDengine user name to use when connecting to the server. Default is 'root'.", 2}, - {0, 'd', "database", 0, "Destination database. Default is 'test'.", 3}, - {0, 'a', "replica", 0, "Set the replica parameters of the database, Default 1, min: 1, max: 3.", 4}, - {0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 4}, - {0, 's', "sql file", 0, "The select sql file.", 6}, - {0, 'M', 0, 0, "Use metric flag.", 4}, - {0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 6}, - {0, 'q', "query_mode", 0, "Query mode--0: SYNC, 1: ASYNC. Default is SYNC.", 4}, - {0, 'b', "type_of_cols", 0, "The data_type of columns, default: TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,BINARY,NCHAR,BOOL,TIMESTAMP.", 4}, - {0, 'w', "length_of_chartype", 0, "The length of data_type 'BINARY' or 'NCHAR'. Default is 16", 4}, - {0, 'l', "num_of_cols_per_record", 0, "The number of columns per record. Default is 10.", 4}, - {0, 'T', "num_of_threads", 0, "The number of threads. Default is 10.", 4}, - // {0, 'r', "num_of_records_per_req", 0, "The number of records per request. Default is 100.", 4}, - {0, 't', "num_of_tables", 0, "The number of tables. Default is 10000.", 4}, - {0, 'n', "num_of_records_per_table", 0, "The number of records per table. Default is 10000.", 4}, - {0, 'x', 0, 0, "Not insert only flag.", 4}, - {0, 'y', 0, 0, "Default input yes for prompt.", 4}, - {0, 'O', "disorderRatio", 0, "Insert mode--0: In order, > 0: disorder ratio. Default is in order.", 4}, - {0, 'R', "disorderRang", 0, "Out of order data's range, ms, default is 1000.", 4}, - //{0, 'D', "delete database", 0, "if elete database if exists. 0: no, 1: yes, default is 1", 5}, - {0}}; - -/* Parse a single option. */ -static error_t parse_opt(int key, char *arg, struct argp_state *state) { - // Get the input argument from argp_parse, which we know is a pointer to our arguments structure. - SArguments *arguments = state->input; - wordexp_t full_path; - char **sptr; - switch (key) { - case 'f': - arguments->metaFile = arg; - break; - case 'h': - arguments->host = arg; - break; - case 'p': - arguments->port = atoi(arg); - break; - case 'u': - arguments->user = arg; - break; - case 'P': - arguments->password = arg; - break; - case 'o': - arguments->output_file = arg; - break; - case 's': - arguments->sqlFile = arg; - break; - case 'q': - arguments->mode = atoi(arg); - break; - case 'T': - arguments->num_of_threads = atoi(arg); - break; - //case 'r': - // arguments->num_of_RPR = atoi(arg); - // break; - case 't': - arguments->num_of_tables = atoi(arg); - break; - case 'n': - arguments->num_of_DPT = atoi(arg); - break; - case 'd': - arguments->database = arg; - break; - case 'l': - arguments->num_of_CPR = atoi(arg); - break; - case 'b': - sptr = arguments->datatype; - if (strstr(arg, ",") == NULL) { - if (strcasecmp(arg, "INT") != 0 && strcasecmp(arg, "FLOAT") != 0 && - strcasecmp(arg, "TINYINT") != 0 && strcasecmp(arg, "BOOL") != 0 && - strcasecmp(arg, "SMALLINT") != 0 && strcasecmp(arg, "TIMESTAMP") != 0 && - strcasecmp(arg, "BIGINT") != 0 && strcasecmp(arg, "DOUBLE") != 0 && - strcasecmp(arg, "BINARY") != 0 && strcasecmp(arg, "NCHAR") != 0) { - argp_error(state, "Invalid data_type!"); - } - sptr[0] = arg; - } else { - int index = 0; - char *dupstr = strdup(arg); - char *running = dupstr; - char *token = strsep(&running, ","); - while (token != NULL) { - if (strcasecmp(token, "INT") != 0 && strcasecmp(token, "FLOAT") != 0 && - strcasecmp(token, "TINYINT") != 0 && strcasecmp(token, "BOOL") != 0 && - strcasecmp(token, "SMALLINT") != 0 && strcasecmp(token, "TIMESTAMP") != 0 && - strcasecmp(token, "BIGINT") != 0 && strcasecmp(token, "DOUBLE") != 0 && - strcasecmp(token, "BINARY") != 0 && strcasecmp(token, "NCHAR") != 0) { - argp_error(state, "Invalid data_type!"); - } - sptr[index++] = token; - token = strsep(&running, ","); - if (index >= MAX_NUM_DATATYPE) break; - } - } - break; - case 'w': - arguments->len_of_binary = atoi(arg); - break; - case 'm': - arguments->tb_prefix = arg; - break; - case 'M': - arguments->use_metric = true; - break; - case 'x': - arguments->insert_only = true; - break; +static HANDLE g_stdoutHandle; +static DWORD g_consoleMode; - case 'y': - arguments->answer_yes = true; - break; - case 'c': - if (wordexp(arg, &full_path, 0) != 0) { - fprintf(stderr, "Invalid path %s\n", arg); - return -1; - } - taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]); - wordfree(&full_path); - break; - case 'O': - arguments->disorderRatio = atoi(arg); - if (arguments->disorderRatio < 0 || arguments->disorderRatio > 100) - { - argp_error(state, "Invalid disorder ratio, should 1 ~ 100!"); - } - break; - case 'R': - arguments->disorderRange = atoi(arg); - break; - case 'a': - arguments->replica = atoi(arg); - if (arguments->replica > 3 || arguments->replica < 1) - { - arguments->replica = 1; - } - break; - //case 'D': - // arguments->method_of_delete = atoi(arg); - // break; - case OPT_ABORT: - arguments->abort = 1; - break; - case ARGP_KEY_ARG: - /*arguments->arg_list = &state->argv[state->next-1]; - state->next = state->argc;*/ - argp_usage(state); - break; +void setupForAnsiEscape(void) { + DWORD mode = 0; + g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE); - default: - return ARGP_ERR_UNKNOWN; + if(g_stdoutHandle == INVALID_HANDLE_VALUE) { + exit(GetLastError()); } - return 0; + + if(!GetConsoleMode(g_stdoutHandle, &mode)) { + exit(GetLastError()); + } + + g_consoleMode = mode; + + // Enable ANSI escape codes + mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING; + + if(!SetConsoleMode(g_stdoutHandle, mode)) { + exit(GetLastError()); + } } -static struct argp argp = {options, parse_opt, 0, 0}; +void resetAfterAnsiEscape(void) { + // Reset colors + printf("\x1b[0m"); -void parse_args(int argc, char *argv[], SArguments *arguments) { - argp_parse(&argp, argc, argv, 0, 0, arguments); - if (arguments->abort) { - #ifndef _ALPINE - error(10, 0, "ABORTED"); - #else - abort(); - #endif + // Reset console mode + if(!SetConsoleMode(g_stdoutHandle, g_consoleMode)) { + exit(GetLastError()); } } - #else - void printHelp() { - char indent[10] = " "; - printf("%s%s\n", indent, "-f"); - printf("%s%s%s\n", indent, indent, "The meta file to the execution procedure. Default is './meta.json'."); - printf("%s%s\n", indent, "-c"); - printf("%s%s%s\n", indent, indent, "config_directory, Configuration directory. Default is '/etc/taos/'."); - } - - void parse_args(int argc, char *argv[], SArguments *arguments) { - for (int i = 1; i < argc; i++) { - if (strcmp(argv[i], "-f") == 0) { - arguments->metaFile = argv[++i]; - } else if (strcmp(argv[i], "-c") == 0) { - strcpy(configDir, argv[++i]); - } else if (strcmp(argv[i], "--help") == 0) { - printHelp(); - exit(EXIT_FAILURE); - } else { - fprintf(stderr, "wrong options\n"); - printHelp(); - exit(EXIT_FAILURE); - } - } - } +void setupForAnsiEscape(void) {} + +void resetAfterAnsiEscape(void) { + // Reset colors + printf("\x1b[0m"); +} #endif -static bool getInfoFromJsonFile(char* file); -//static int generateOneRowDataForStb(SSuperTable* stbInfo); -//static int getDataIntoMemForStb(SSuperTable* stbInfo); -static void init_rand_data(); static int createDatabases(); static void createChildTables(); static int queryDbExec(TAOS *taos, char *command, int type); @@ -685,9 +467,12 @@ int32_t randint[MAX_PREPARED_RAND]; int64_t randbigint[MAX_PREPARED_RAND]; float randfloat[MAX_PREPARED_RAND]; double randdouble[MAX_PREPARED_RAND]; -char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)", "max(col0)", "min(col0)", "first(col0)", "last(col0)"}; +char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)", + "max(col0)", "min(col0)", "first(col0)", "last(col0)"}; -SArguments g_args = {NULL, +SArguments g_args = { + NULL, // metaFile + 0, // test_mode "127.0.0.1", // host 6030, // port "root", // user @@ -701,7 +486,8 @@ SArguments g_args = {NULL, "t", // tb_prefix NULL, // sqlFile false, // use_metric - false, // insert_only + false, // insert_only + false, // debug_print false, // answer_yes; "./output.txt", // output_file 0, // mode : sync or async @@ -720,6 +506,7 @@ SArguments g_args = {NULL, 16, // len_of_binary 10, // num_of_CPR 10, // num_of_connections/thread + 0, // insert_interval 100, // num_of_RPR 10000, // num_of_tables 10000, // num_of_DPT @@ -731,13 +518,261 @@ SArguments g_args = {NULL, }; -static int g_jsonType = 0; + static SDbs g_Dbs; static int g_totalChildTables = 0; static SQueryMetaInfo g_queryInfo; static FILE * g_fpOfInsertResult = NULL; +#define debugPrint(fmt, ...) \ + do { if (g_args.debug_print) fprintf(stderr, fmt, __VA_ARGS__); } while(0) +/////////////////////////////////////////////////// + +void printHelp() { + char indent[10] = " "; + printf("%s%s%s%s\n", indent, "-f", indent, + "The meta file to the execution procedure. Default is './meta.json'."); + printf("%s%s%s%s\n", indent, "-u", indent, + "The TDengine user name to use when connecting to the server. Default is 'root'."); +#ifdef _TD_POWER_ + printf("%s%s%s%s\n", indent, "-P", indent, + "The password to use when connecting to the server. Default is 'powerdb'."); + printf("%s%s%s%s\n", indent, "-c", indent, + "Configuration directory. Default is '/etc/power/'."); +#else + printf("%s%s%s%s\n", indent, "-P", indent, + "The password to use when connecting to the server. Default is 'taosdata'."); + printf("%s%s%s%s\n", indent, "-c", indent, + "Configuration directory. Default is '/etc/taos/'."); +#endif + printf("%s%s%s%s\n", indent, "-h", indent, + "The host to connect to TDengine. Default is localhost."); + printf("%s%s%s%s\n", indent, "-p", indent, + "The TCP/IP port number to use for the connection. Default is 0."); + printf("%s%s%s%s\n", indent, "-d", indent, + "Destination database. Default is 'test'."); + printf("%s%s%s%s\n", indent, "-a", indent, + "Set the replica parameters of the database, Default 1, min: 1, max: 3."); + printf("%s%s%s%s\n", indent, "-m", indent, + "Table prefix name. Default is 't'."); + printf("%s%s%s%s\n", indent, "-s", indent, "The select sql file."); + printf("%s%s%s%s\n", indent, "-M", indent, "Use metric flag."); + printf("%s%s%s%s\n", indent, "-o", indent, + "Direct output to the named file. Default is './output.txt'."); + printf("%s%s%s%s\n", indent, "-q", indent, + "Query mode--0: SYNC, 1: ASYNC. Default is SYNC."); + printf("%s%s%s%s\n", indent, "-b", indent, + "The data_type of columns, default: TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,BINARY,NCHAR,BOOL,TIMESTAMP."); + printf("%s%s%s%s\n", indent, "-w", indent, + "The length of data_type 'BINARY' or 'NCHAR'. Default is 16"); + printf("%s%s%s%s\n", indent, "-l", indent, + "The number of columns per record. Default is 10."); + printf("%s%s%s%s\n", indent, "-T", indent, + "The number of threads. Default is 10."); + printf("%s%s%s%s\n", indent, "-i", indent, + "The sleep time (ms) between insertion. Default is 0."); + printf("%s%s%s%s\n", indent, "-r", indent, + "The number of records per request. Default is 100."); + printf("%s%s%s%s\n", indent, "-t", indent, + "The number of tables. Default is 10000."); + printf("%s%s%s%s\n", indent, "-n", indent, + "The number of records per table. Default is 10000."); + printf("%s%s%s%s\n", indent, "-x", indent, "Not insert only flag."); + printf("%s%s%s%s\n", indent, "-y", indent, "Default input yes for prompt."); + printf("%s%s%s%s\n", indent, "-O", indent, + "Insert mode--0: In order, > 0: disorder ratio. Default is in order."); + printf("%s%s%s%s\n", indent, "-R", indent, + "Out of order data's range, ms, default is 1000."); + printf("%s%s%s%s\n", indent, "-g", indent, + "Print debug info."); +/* printf("%s%s%s%s\n", indent, "-D", indent, + "if elete database if exists. 0: no, 1: yes, default is 1"); + */ +} + +void parse_args(int argc, char *argv[], SArguments *arguments) { + char **sptr; + wordexp_t full_path; + + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "-f") == 0) { + arguments->metaFile = argv[++i]; + } else if (strcmp(argv[i], "-c") == 0) { + char *configPath = argv[++i]; + if (wordexp(configPath, &full_path, 0) != 0) { + fprintf(stderr, "Invalid path %s\n", configPath); + return; + } + taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]); + wordfree(&full_path); + } else if (strcmp(argv[i], "-h") == 0) { + arguments->host = argv[++i]; + } else if (strcmp(argv[i], "-p") == 0) { + arguments->port = atoi(argv[++i]); + } else if (strcmp(argv[i], "-u") == 0) { + arguments->user = argv[++i]; + } else if (strcmp(argv[i], "-P") == 0) { + arguments->password = argv[++i]; + } else if (strcmp(argv[i], "-o") == 0) { + arguments->output_file = argv[++i]; + } else if (strcmp(argv[i], "-s") == 0) { + arguments->sqlFile = argv[++i]; + } else if (strcmp(argv[i], "-q") == 0) { + arguments->mode = atoi(argv[++i]); + } else if (strcmp(argv[i], "-T") == 0) { + arguments->num_of_threads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-i") == 0) { + arguments->insert_interval = atoi(argv[++i]); + } else if (strcmp(argv[i], "-r") == 0) { + arguments->num_of_RPR = atoi(argv[++i]); + } else if (strcmp(argv[i], "-t") == 0) { + arguments->num_of_tables = atoi(argv[++i]); + } else if (strcmp(argv[i], "-n") == 0) { + arguments->num_of_DPT = atoi(argv[++i]); + } else if (strcmp(argv[i], "-d") == 0) { + arguments->database = argv[++i]; + } else if (strcmp(argv[i], "-l") == 0) { + arguments->num_of_CPR = atoi(argv[++i]); + } else if (strcmp(argv[i], "-b") == 0) { + sptr = arguments->datatype; + ++i; + if (strstr(argv[i], ",") == NULL) { + // only one col + if (strcasecmp(argv[i], "INT") + && strcasecmp(argv[i], "FLOAT") + && strcasecmp(argv[i], "TINYINT") + && strcasecmp(argv[i], "BOOL") + && strcasecmp(argv[i], "SMALLINT") + && strcasecmp(argv[i], "BIGINT") + && strcasecmp(argv[i], "DOUBLE") + && strcasecmp(argv[i], "BINARY") + && strcasecmp(argv[i], "NCHAR")) { + fprintf(stderr, "Invalid data_type!\n"); + printHelp(); + exit(EXIT_FAILURE); + } + sptr[0] = argv[i]; + } else { + // more than one col + int index = 0; + char *dupstr = strdup(argv[i]); + char *running = dupstr; + char *token = strsep(&running, ","); + while (token != NULL) { + if (strcasecmp(token, "INT") + && strcasecmp(token, "FLOAT") + && strcasecmp(token, "TINYINT") + && strcasecmp(token, "BOOL") + && strcasecmp(token, "SMALLINT") + && strcasecmp(token, "BIGINT") + && strcasecmp(token, "DOUBLE") + && strcasecmp(token, "BINARY") + && strcasecmp(token, "NCHAR")) { + fprintf(stderr, "Invalid data_type!\n"); + printHelp(); + exit(EXIT_FAILURE); + } + sptr[index++] = token; + token = strsep(&running, ","); + if (index >= MAX_NUM_DATATYPE) break; + } + sptr[index] = NULL; + } + } else if (strcmp(argv[i], "-w") == 0) { + arguments->len_of_binary = atoi(argv[++i]); + } else if (strcmp(argv[i], "-m") == 0) { + arguments->tb_prefix = argv[++i]; + } else if (strcmp(argv[i], "-M") == 0) { + arguments->use_metric = true; + } else if (strcmp(argv[i], "-x") == 0) { + arguments->insert_only = true; + } else if (strcmp(argv[i], "-y") == 0) { + arguments->answer_yes = true; + } else if (strcmp(argv[i], "-g") == 0) { + arguments->debug_print = true; + } else if (strcmp(argv[i], "-c") == 0) { + strcpy(configDir, argv[++i]); + } else if (strcmp(argv[i], "-O") == 0) { + arguments->disorderRatio = atoi(argv[++i]); + if (arguments->disorderRatio > 1 + || arguments->disorderRatio < 0) { + arguments->disorderRatio = 0; + } else if (arguments->disorderRatio == 1) { + arguments->disorderRange = 10; + } + } else if (strcmp(argv[i], "-R") == 0) { + arguments->disorderRange = atoi(argv[++i]); + if (arguments->disorderRange == 1 + && (arguments->disorderRange > 50 + || arguments->disorderRange <= 0)) { + arguments->disorderRange = 10; + } + } else if (strcmp(argv[i], "-a") == 0) { + arguments->replica = atoi(argv[++i]); + if (arguments->replica > 3 || arguments->replica < 1) { + arguments->replica = 1; + } + } else if (strcmp(argv[i], "-D") == 0) { + arguments->method_of_delete = atoi(argv[++i]); + if (arguments->method_of_delete < 0 + || arguments->method_of_delete > 3) { + arguments->method_of_delete = 0; + } + } else if (strcmp(argv[i], "--help") == 0) { + printHelp(); + exit(0); + } else { + fprintf(stderr, "wrong options\n"); + printHelp(); + exit(EXIT_FAILURE); + } + } + + if (arguments->debug_print) { + printf("###################################################################\n"); + printf("# meta file: %s\n", arguments->metaFile); + printf("# Server IP: %s:%hu\n", + arguments->host == NULL ? "localhost" : arguments->host, + arguments->port ); + printf("# User: %s\n", arguments->user); + printf("# Password: %s\n", arguments->password); + printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false"); + if (*(arguments->datatype)) { + printf("# Specified data type: "); + for (int i = 0; i < MAX_NUM_DATATYPE; i++) + if (arguments->datatype[i]) + printf("%s,", arguments->datatype[i]); + else + break; + printf("\n"); + } + printf("# Insertion interval: %d\n", arguments->insert_interval); + printf("# Number of Columns per record: %d\n", arguments->num_of_RPR); + printf("# Number of Threads: %d\n", arguments->num_of_threads); + printf("# Number of Tables: %d\n", arguments->num_of_tables); + printf("# Number of Data per Table: %d\n", arguments->num_of_DPT); + printf("# Database name: %s\n", arguments->database); + printf("# Table prefix: %s\n", arguments->tb_prefix); + if (arguments->disorderRatio) { + printf("# Data order: %d\n", arguments->disorderRatio); + printf("# Data out of order rate: %d\n", arguments->disorderRange); + + } + printf("# Delete method: %d\n", arguments->method_of_delete); + printf("# Answer yes when prompt: %d\n", arguments->answer_yes); + printf("# Print debug info: %d\n", arguments->debug_print); + printf("###################################################################\n"); + if (!arguments->answer_yes) { + printf("Press enter key to continue\n\n"); + (void) getchar(); + } + } +} +static bool getInfoFromJsonFile(char* file); +//static int generateOneRowDataForStb(SSuperTable* stbInfo); +//static int getDataIntoMemForStb(SSuperTable* stbInfo); +static void init_rand_data(); void tmfclose(FILE *fp) { if (NULL != fp) { fclose(fp); @@ -760,7 +795,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) { taos_free_result(res); res = NULL; } - + res = taos_query(taos, command); code = taos_errno(res); if (0 == code) { @@ -769,6 +804,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) { } if (code != 0) { + debugPrint("DEBUG %s() LN%d - command: %s\n", __func__, __LINE__, command); fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res)); taos_free_result(res); //taos_close(taos); @@ -925,14 +961,37 @@ static void init_rand_data() { } } +#define SHOW_PARSE_RESULT_START() \ + do { if (g_args.metaFile) \ + printf("\033[1m\033[40;32m================ %s parse result START ================\033[0m\n", \ + g_args.metaFile); } while(0) + +#define SHOW_PARSE_RESULT_END() \ + do { if (g_args.metaFile) \ + printf("\033[1m\033[40;32m================ %s parse result END================\033[0m\n", \ + g_args.metaFile); } while(0) + +#define SHOW_PARSE_RESULT_START_TO_FILE(fp) \ + do { if (g_args.metaFile) \ + fprintf(fp, "\033[1m\033[40;32m================ %s parse result START ================\033[0m\n", \ + g_args.metaFile); } while(0) + +#define SHOW_PARSE_RESULT_END_TO_FILE(fp) \ + do { if (g_args.metaFile) \ + fprintf(fp, "\033[1m\033[40;32m================ %s parse result END================\033[0m\n", \ + g_args.metaFile); } while(0) + static int printfInsertMeta() { - printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n"); + SHOW_PARSE_RESULT_START(); + printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); printf("user: \033[33m%s\033[0m\n", g_Dbs.user); printf("password: \033[33m%s\033[0m\n", g_Dbs.password); printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile); printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount); printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl); + printf("insert interval: \033[33m%d\033[0m\n", g_args.insert_interval); + printf("number of records per req: \033[33m%d\033[0m\n", g_args.num_of_RPR); printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount); for (int i = 0; i < g_Dbs.dbCount; i++) { @@ -981,11 +1040,13 @@ static int printfInsertMeta() { printf(" quorum: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.quorum); } if (g_Dbs.db[i].dbCfg.precision[0] != 0) { - if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { + if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) + || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { printf(" precision: \033[33m%s\033[0m\n", g_Dbs.db[i].dbCfg.precision); } else { - printf(" precision error: \033[33m%s\033[0m\n", g_Dbs.db[i].dbCfg.precision); - return -1; + printf("\033[1m\033[40;31m precision error: %s\033[0m\n", + g_Dbs.db[i].dbCfg.precision); + return -1; } } @@ -1015,7 +1076,6 @@ static int printfInsertMeta() { printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); - printf(" insertRate: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].insertRate); printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { @@ -1038,34 +1098,43 @@ static int printfInsertMeta() { printf(" columnCount: \033[33m%d\033[0m\n ", g_Dbs.db[i].superTbls[j].columnCount); for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) { //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); - if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", 6)) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", 5))) { - printf("column[\033[33m%d\033[0m]:\033[33m%s(%d)\033[0m ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); + if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", 6)) + || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", 5))) { + printf("column[\033[33m%d\033[0m]:\033[33m%s(%d)\033[0m ", k, + g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); } else { - printf("column[%d]:\033[33m%s\033[0m ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType); + printf("column[%d]:\033[33m%s\033[0m ", k, + g_Dbs.db[i].superTbls[j].columns[k].dataType); } } printf("\n"); - - printf(" tagCount: \033[33m%d\033[0m\n ", g_Dbs.db[i].superTbls[j].tagCount); + + printf(" tagCount: \033[33m%d\033[0m\n ", + g_Dbs.db[i].superTbls[j].tagCount); for (int k = 0; k < g_Dbs.db[i].superTbls[j].tagCount; k++) { //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen); - if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6)) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) { - printf("tag[%d]:\033[33m%s(%d)\033[0m ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen); + if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6)) + || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) { + printf("tag[%d]:\033[33m%s(%d)\033[0m ", k, + g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen); } else { - printf("tag[%d]:\033[33m%s\033[0m ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType); + printf("tag[%d]:\033[33m%s\033[0m ", k, + g_Dbs.db[i].superTbls[j].tags[k].dataType); } } printf("\n"); } printf("\n"); } - printf("\033[1m\033[40;32m================ insert.json parse result END================\033[0m\n"); + + SHOW_PARSE_RESULT_END(); return 0; } static void printfInsertMetaToFile(FILE* fp) { - fprintf(fp, "================ insert.json parse result START================\n"); + SHOW_PARSE_RESULT_START_TO_FILE(fp); + fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port); fprintf(fp, "user: %s\n", g_Dbs.user); fprintf(fp, "password: %s\n", g_Dbs.password); @@ -1120,7 +1189,8 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " quorum: %d\n", g_Dbs.db[i].dbCfg.quorum); } if (g_Dbs.db[i].dbCfg.precision[0] != 0) { - if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { + if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) + || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { fprintf(fp, " precision: %s\n", g_Dbs.db[i].dbCfg.precision); } else { fprintf(fp, " precision error: %s\n", g_Dbs.db[i].dbCfg.precision); @@ -1153,7 +1223,6 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); - fprintf(fp, " insertRate: %d\n", g_Dbs.db[i].superTbls[j].insertRate); fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { @@ -1197,11 +1266,11 @@ static void printfInsertMetaToFile(FILE* fp) { } fprintf(fp, "\n"); } - fprintf(fp, "================ insert.json parse result END ================\n\n"); + SHOW_PARSE_RESULT_END_TO_FILE(fp); } static void printfQueryMeta() { - printf("\033[1m\033[40;32m================ query.json parse result ================\033[0m\n"); + SHOW_PARSE_RESULT_START(); printf("host: \033[33m%s:%u\033[0m\n", g_queryInfo.host, g_queryInfo.port); printf("user: \033[33m%s\033[0m\n", g_queryInfo.user); printf("password: \033[33m%s\033[0m\n", g_queryInfo.password); @@ -1213,14 +1282,13 @@ static void printfQueryMeta() { printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.concurrent); printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount); - if (SUBSCRIBE_MODE == g_jsonType) { + if (SUBSCRIBE_MODE == g_args.test_mode) { printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode); printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval); printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart); printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress); } - for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]); } @@ -1231,7 +1299,7 @@ static void printfQueryMeta() { printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.childTblCount); printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.subQueryInfo.sTblName); - if (SUBSCRIBE_MODE == g_jsonType) { + if (SUBSCRIBE_MODE == g_args.test_mode) { printf("mod: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeMode); printf("interval: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeInterval); printf("restart: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeRestart); @@ -1243,7 +1311,8 @@ static void printfQueryMeta() { printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.subQueryInfo.sql[i]); } printf("\n"); - printf("\033[1m\033[40;32m================ query.json parse result ================\033[0m\n"); + + SHOW_PARSE_RESULT_END(); } @@ -1410,7 +1479,9 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX])); dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX])); - tstrncpy(dbInfos[count]->precision, (char *)row[TSDB_SHOW_DB_PRECISION_INDEX], fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes); + tstrncpy(dbInfos[count]->precision, + (char *)row[TSDB_SHOW_DB_PRECISION_INDEX], + fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes); dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]); tstrncpy(dbInfos[count]->status, (char *)row[TSDB_SHOW_DB_STATUS_INDEX], fields[TSDB_SHOW_DB_STATUS_INDEX].bytes); @@ -1861,10 +1932,14 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, char* dataType = superTbls->columns[colIndex].dataType; if (strcasecmp(dataType, "BINARY") == 0) { - len += snprintf(cols + len, STRING_LEN - len, ", col%d %s(%d)", colIndex, "BINARY", superTbls->columns[colIndex].dataLen); + len += snprintf(cols + len, STRING_LEN - len, + ", col%d %s(%d)", colIndex, "BINARY", + superTbls->columns[colIndex].dataLen); lenOfOneRow += superTbls->columns[colIndex].dataLen + 3; } else if (strcasecmp(dataType, "NCHAR") == 0) { - len += snprintf(cols + len, STRING_LEN - len, ", col%d %s(%d)", colIndex, "NCHAR", superTbls->columns[colIndex].dataLen); + len += snprintf(cols + len, STRING_LEN - len, + ", col%d %s(%d)", colIndex, "NCHAR", + superTbls->columns[colIndex].dataLen); lenOfOneRow += superTbls->columns[colIndex].dataLen + 3; } else if (strcasecmp(dataType, "INT") == 0) { len += snprintf(cols + len, STRING_LEN - len, ", col%d %s", colIndex, "INT"); @@ -1901,13 +1976,14 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, //printf("%s.%s column count:%d, column length:%d\n\n", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName, g_Dbs.db[i].superTbls[j].columnCount, lenOfOneRow); // save for creating child table - superTbls->colsOfCreatChildTable = (char*)calloc(len+20, 1); - if (NULL == superTbls->colsOfCreatChildTable) { + superTbls->colsOfCreateChildTable = (char*)calloc(len+20, 1); + if (NULL == superTbls->colsOfCreateChildTable) { printf("Failed when calloc, size:%d", len+1); taos_close(taos); exit(-1); } - snprintf(superTbls->colsOfCreatChildTable, len+20, "(ts timestamp%s)", cols); + snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); + debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); if (use_metric) { char tags[STRING_LEN] = "\0"; @@ -1956,12 +2032,17 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, len += snprintf(tags + len, STRING_LEN - len, ")"); superTbls->lenOfTagOfOneRow = lenOfTagOfOneRow; - - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s (ts timestamp%s) tags %s", dbName, superTbls->sTblName, cols, tags); + + snprintf(command, BUFFER_SIZE, + "create table if not exists %s.%s (ts timestamp%s) tags %s", + dbName, superTbls->sTblName, cols, tags); + debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, command); + if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { - return -1; + fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); + return -1; } - printf("\ncreate supertable %s success!\n\n", superTbls->sTblName); + debugPrint("DEBUG - create supertable %s success!\n\n", superTbls->sTblName); } return 0; } @@ -1973,75 +2054,94 @@ static int createDatabases() { taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port); if (taos == NULL) { fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); - exit(-1); + return -1; } char command[BUFFER_SIZE] = "\0"; - for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.db[i].drop) { sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName); + debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); return -1; } } - + int dataLen = 0; - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "create database if not exists %s ", g_Dbs.db[i].dbName); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "create database if not exists %s ", g_Dbs.db[i].dbName); if (g_Dbs.db[i].dbCfg.blocks > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "blocks %d ", g_Dbs.db[i].dbCfg.blocks); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "blocks %d ", g_Dbs.db[i].dbCfg.blocks); } if (g_Dbs.db[i].dbCfg.cache > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "cache %d ", g_Dbs.db[i].dbCfg.cache); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "cache %d ", g_Dbs.db[i].dbCfg.cache); } if (g_Dbs.db[i].dbCfg.days > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "days %d ", g_Dbs.db[i].dbCfg.days); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "days %d ", g_Dbs.db[i].dbCfg.days); } if (g_Dbs.db[i].dbCfg.keep > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "keep %d ", g_Dbs.db[i].dbCfg.keep); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "keep %d ", g_Dbs.db[i].dbCfg.keep); } if (g_Dbs.db[i].dbCfg.replica > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "replica %d ", g_Dbs.db[i].dbCfg.replica); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "replica %d ", g_Dbs.db[i].dbCfg.replica); } if (g_Dbs.db[i].dbCfg.update > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "update %d ", g_Dbs.db[i].dbCfg.update); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "update %d ", g_Dbs.db[i].dbCfg.update); } //if (g_Dbs.db[i].dbCfg.maxtablesPerVnode > 0) { - // dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "tables %d ", g_Dbs.db[i].dbCfg.maxtablesPerVnode); + // dataLen += snprintf(command + dataLen, + // BUFFER_SIZE - dataLen, "tables %d ", g_Dbs.db[i].dbCfg.maxtablesPerVnode); //} if (g_Dbs.db[i].dbCfg.minRows > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "minrows %d ", g_Dbs.db[i].dbCfg.minRows); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "minrows %d ", g_Dbs.db[i].dbCfg.minRows); } if (g_Dbs.db[i].dbCfg.maxRows > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "maxrows %d ", g_Dbs.db[i].dbCfg.maxRows); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "maxrows %d ", g_Dbs.db[i].dbCfg.maxRows); } if (g_Dbs.db[i].dbCfg.comp > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "comp %d ", g_Dbs.db[i].dbCfg.comp); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "comp %d ", g_Dbs.db[i].dbCfg.comp); } if (g_Dbs.db[i].dbCfg.walLevel > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "wal %d ", g_Dbs.db[i].dbCfg.walLevel); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "wal %d ", g_Dbs.db[i].dbCfg.walLevel); } if (g_Dbs.db[i].dbCfg.cacheLast > 0) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "cachelast %d ", g_Dbs.db[i].dbCfg.cacheLast); + dataLen += snprintf(command + dataLen, + BUFFER_SIZE - dataLen, "cachelast %d ", g_Dbs.db[i].dbCfg.cacheLast); } if (g_Dbs.db[i].dbCfg.fsync > 0) { dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "fsync %d ", g_Dbs.db[i].dbCfg.fsync); } - if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { - dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "precision \'%s\';", g_Dbs.db[i].dbCfg.precision); + if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) + || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { + dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, + "precision \'%s\';", g_Dbs.db[i].dbCfg.precision); } - + + debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); + printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); return -1; } printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); + debugPrint("DEBUG %s() %d count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { // describe super table, if exists sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); + debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS; ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); @@ -2051,6 +2151,7 @@ static int createDatabases() { } if (0 != ret) { + printf("\ncreate super table %d failed!\n\n", j); taos_close(taos); return -1; } @@ -2061,50 +2162,74 @@ static int createDatabases() { return 0; } - -void * createTable(void *sarg) +static void* createTable(void *sarg) { threadInfo *winfo = (threadInfo *)sarg; SSuperTable* superTblInfo = winfo->superTblInfo; int64_t lastPrintTime = taosGetTimestampMs(); - char* buffer = calloc(superTblInfo->maxSqlLen, 1); + int buff_len; + if (superTblInfo) + buff_len = superTblInfo->maxSqlLen; + else + buff_len = BUFFER_SIZE; + + char *buffer = calloc(buff_len, 1); + if (buffer == NULL) { + fprintf(stderr, "Memory allocated failed!"); + exit(-1); + } int len = 0; int batchNum = 0; //printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { if (0 == g_Dbs.use_metric) { - snprintf(buffer, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable); + snprintf(buffer, buff_len, + "create table if not exists %s.%s%d %s;", + winfo->db_name, + g_args.tb_prefix, i, + winfo->cols); } else { if (0 == len) { batchNum = 0; - memset(buffer, 0, superTblInfo->maxSqlLen); - len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "create table "); + memset(buffer, 0, buff_len); + len += snprintf(buffer + len, + buff_len - len, "create table "); } - + char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo); } else { - tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount); + tagsValBuf = getTagValueFromTagSample( + superTblInfo, + i % superTblInfo->tagSampleCount); } if (NULL == tagsValBuf) { free(buffer); return NULL; } - len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "if not exists %s.%s%d using %s.%s tags %s ", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf); + len += snprintf(buffer + len, + superTblInfo->maxSqlLen - len, + "if not exists %s.%s%d using %s.%s tags %s ", + winfo->db_name, superTblInfo->childTblPrefix, + i, winfo->db_name, + superTblInfo->sTblName, tagsValBuf); free(tagsValBuf); batchNum++; - if ((batchNum < superTblInfo->batchCreateTableNum) && ((superTblInfo->maxSqlLen - len) >= (superTblInfo->lenOfTagOfOneRow + 256))) { + if ((batchNum < superTblInfo->batchCreateTableNum) + && ((superTblInfo->maxSqlLen - len) + >= (superTblInfo->lenOfTagOfOneRow + 256))) { continue; } } len = 0; + debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ free(buffer); return NULL; @@ -2112,20 +2237,24 @@ void * createTable(void *sarg) int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { - printf("thread[%d] already create %d - %d tables\n", winfo->threadID, winfo->start_table_id, i); + printf("thread[%d] already create %d - %d tables\n", + winfo->threadID, winfo->start_table_id, i); lastPrintTime = currentPrintTime; } } if (0 != len) { + debugPrint("DEBUG %s() %d buffer: %s\n", __func__, __LINE__, buffer); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); } - + free(buffer); return NULL; } -void startMultiThreadCreateChildTable(char* cols, int threads, int ntables, char* db_name, SSuperTable* superTblInfo) { +int startMultiThreadCreateChildTable( + char* cols, int threads, int ntables, + char* db_name, SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); threadInfo *infos = malloc(threads * sizeof(threadInfo)); @@ -2146,14 +2275,23 @@ void startMultiThreadCreateChildTable(char* cols, int threads, int ntables, char int b = 0; b = ntables % threads; - + int last = 0; for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); t_info->superTblInfo = superTblInfo; - t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); + t_info->taos = taos_connect( + g_Dbs.host, + g_Dbs.user, + g_Dbs.password, + db_name, + g_Dbs.port); + if (t_info->taos == NULL) { + fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); + return -1; + } t_info->start_table_id = last; t_info->end_table_id = i < b ? last + a : last + a - 1; last = t_info->end_table_id + 1; @@ -2174,18 +2312,60 @@ void startMultiThreadCreateChildTable(char* cols, int threads, int ntables, char free(pids); free(infos); + + return 0; } static void createChildTables() { + char tblColsBuf[MAX_SQL_SIZE]; + int len; + for (int i = 0; i < g_Dbs.dbCount; i++) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { - continue; + if (g_Dbs.db[i].superTblCount > 0) { + // with super table + for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) + || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { + continue; + } + + debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); + startMultiThreadCreateChildTable( + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, + g_Dbs.threadCountByCreateTbl, + g_Dbs.db[i].superTbls[j].childTblCount, + g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); + g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; } - startMultiThreadCreateChildTable(g_Dbs.db[i].superTbls[j].colsOfCreatChildTable, g_Dbs.threadCountByCreateTbl, g_Dbs.db[i].superTbls[j].childTblCount, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); - g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; - } + } else { + // normal table + len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); + for (int i = 0; i < MAX_COLUMN_COUNT; i++) { + if (g_args.datatype[i]) { + if ((strncasecmp(g_args.datatype[i], "BINARY", strlen("BINARY")) == 0) + || (strncasecmp(g_args.datatype[i], "NCHAR", strlen("NCHAR")) == 0)) { + len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s(60)", i, g_args.datatype[i]); + } else { + len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s", i, g_args.datatype[i]); + } + len = strlen(tblColsBuf); + } else { + len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ")"); + break; + } + } + + debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, + tblColsBuf); + startMultiThreadCreateChildTable( + tblColsBuf, + g_Dbs.threadCountByCreateTbl, + g_args.num_of_DPT, + g_Dbs.db[i].dbName, + NULL); + } } } @@ -2220,10 +2400,11 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { size_t n = 0; ssize_t readLen = 0; char * line = NULL; - + FILE *fp = fopen(superTblInfo->tagsFile, "r"); if (fp == NULL) { - printf("Failed to open tags file: %s, reason:%s\n", superTblInfo->tagsFile, strerror(errno)); + printf("Failed to open tags file: %s, reason:%s\n", + superTblInfo->tagsFile, strerror(errno)); return -1; } @@ -2231,7 +2412,7 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { free(superTblInfo->tagDataBuf); superTblInfo->tagDataBuf = NULL; } - + int tagCount = 10000; int count = 0; char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount); @@ -2254,11 +2435,13 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { count++; if (count >= tagCount - 1) { - char *tmp = realloc(tagDataBuf, (size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow); + char *tmp = realloc(tagDataBuf, + (size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow); if (tmp != NULL) { tagDataBuf = tmp; tagCount = (int)(tagCount*1.5); - memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow, 0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow)); + memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow, + 0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow)); } else { // exit, if allocate more memory failed printf("realloc fail for save tag val from %s\n", superTblInfo->tagsFile); @@ -2298,7 +2481,8 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample readLen = tgetline(&line, &n, fp); if (-1 == readLen) { if(0 != fseek(fp, 0, SEEK_SET)) { - printf("Failed to fseek file: %s, reason:%s\n", superTblInfo->sampleFile, strerror(errno)); + printf("Failed to fseek file: %s, reason:%s\n", + superTblInfo->sampleFile, strerror(errno)); return -1; } continue; @@ -2313,7 +2497,8 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample } if (readLen > superTblInfo->lenOfOneRow) { - printf("sample row len[%d] overflow define schema len[%d], so discard this row\n", (int32_t)readLen, superTblInfo->lenOfOneRow); + printf("sample row len[%d] overflow define schema len[%d], so discard this row\n", + (int32_t)readLen, superTblInfo->lenOfOneRow); continue; } @@ -2359,14 +2544,15 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s int columnSize = cJSON_GetArraySize(columns); if (columnSize > MAX_COLUMN_COUNT) { - printf("failed to read json, column size overflow, max column size is %d\n", MAX_COLUMN_COUNT); + printf("failed to read json, column size overflow, max column size is %d\n", + MAX_COLUMN_COUNT); goto PARSE_OVER; } int count = 1; int index = 0; StrColumn columnCase; - + //superTbls->columnCount = columnSize; for (int k = 0; k < columnSize; ++k) { cJSON* column = cJSON_GetArrayItem(columns, k); @@ -2544,8 +2730,30 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } + cJSON* insertInterval = cJSON_GetObjectItem(root, "insert_interval"); + if (insertInterval && insertInterval->type == cJSON_Number) { + g_args.insert_interval = insertInterval->valueint; + } else if (!insertInterval) { + g_args.insert_interval = 0; + } else { + printf("failed to read json, insert_interval not found"); + goto PARSE_OVER; + } + + cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req"); + if (numRecPerReq && numRecPerReq->type == cJSON_Number) { + g_args.num_of_RPR = numRecPerReq->valueint; + } else if (!numRecPerReq) { + g_args.num_of_RPR = 100; + } else { + printf("failed to read json, num_of_records_per_req not found"); + goto PARSE_OVER; + } + cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no, - if (answerPrompt && answerPrompt->type == cJSON_String && answerPrompt->valuestring != NULL) { + if (answerPrompt + && answerPrompt->type == cJSON_String + && answerPrompt->valuestring != NULL) { if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) { g_args.answer_yes = false; } else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) { @@ -2790,7 +2998,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, MAX_DB_NAME_SIZE); cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table"); // yes, no, null - if (autoCreateTbl && autoCreateTbl->type == cJSON_String && autoCreateTbl->valuestring != NULL) { + if (autoCreateTbl + && autoCreateTbl->type == cJSON_String + && autoCreateTbl->valuestring != NULL) { if (0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) { g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL; } else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) { @@ -2816,7 +3026,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no - if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) { + if (childTblExists + && childTblExists->type == cJSON_String + && childTblExists->valuestring != NULL) { if (0 == strncasecmp(childTblExists->valuestring, "yes", 3)) { g_Dbs.db[i].superTbls[j].childTblExists = TBL_ALREADY_EXISTS; } else if (0 == strncasecmp(childTblExists->valuestring, "no", 2)) { @@ -2839,8 +3051,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { g_Dbs.db[i].superTbls[j].childTblCount = count->valueint; cJSON *dataSource = cJSON_GetObjectItem(stbInfo, "data_source"); - if (dataSource && dataSource->type == cJSON_String && dataSource->valuestring != NULL) { - tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, dataSource->valuestring, MAX_DB_NAME_SIZE); + if (dataSource && dataSource->type == cJSON_String + && dataSource->valuestring != NULL) { + tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, + dataSource->valuestring, MAX_DB_NAME_SIZE); } else if (!dataSource) { tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE); } else { @@ -2849,8 +3063,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , restful - if (insertMode && insertMode->type == cJSON_String && insertMode->valuestring != NULL) { - tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, insertMode->valuestring, MAX_DB_NAME_SIZE); + if (insertMode && insertMode->type == cJSON_String + && insertMode->valuestring != NULL) { + tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, + insertMode->valuestring, MAX_DB_NAME_SIZE); } else if (!insertMode) { tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE); } else { @@ -2893,7 +3109,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON *sampleFormat = cJSON_GetObjectItem(stbInfo, "sample_format"); if (sampleFormat && sampleFormat->type == cJSON_String && sampleFormat->valuestring != NULL) { - tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, sampleFormat->valuestring, MAX_DB_NAME_SIZE); + tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, + sampleFormat->valuestring, MAX_DB_NAME_SIZE); } else if (!sampleFormat) { tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, "csv", MAX_DB_NAME_SIZE); } else { @@ -2903,7 +3120,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file"); if (sampleFile && sampleFile->type == cJSON_String && sampleFile->valuestring != NULL) { - tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile, sampleFile->valuestring, MAX_FILE_NAME_LEN); + tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile, + sampleFile->valuestring, MAX_FILE_NAME_LEN); } else if (!sampleFile) { memset(g_Dbs.db[i].superTbls[j].sampleFile, 0, MAX_FILE_NAME_LEN); } else { @@ -2913,7 +3131,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file"); if (tagsFile && tagsFile->type == cJSON_String && tagsFile->valuestring != NULL) { - tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile, tagsFile->valuestring, MAX_FILE_NAME_LEN); + tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile, + tagsFile->valuestring, MAX_FILE_NAME_LEN); if (0 == g_Dbs.db[i].superTbls[j].tagsFile[0]) { g_Dbs.db[i].superTbls[j].tagSource = 0; } else { @@ -2943,8 +3162,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON *multiThreadWriteOneTbl = cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes - if (multiThreadWriteOneTbl && multiThreadWriteOneTbl->type == cJSON_String && multiThreadWriteOneTbl->valuestring != NULL) { + cJSON *multiThreadWriteOneTbl = + cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes + if (multiThreadWriteOneTbl + && multiThreadWriteOneTbl->type == cJSON_String + && multiThreadWriteOneTbl->valuestring != NULL) { if (0 == strncasecmp(multiThreadWriteOneTbl->valuestring, "yes", 3)) { g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 1; } else { @@ -2997,16 +3219,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON* insertRate = cJSON_GetObjectItem(stbInfo, "insert_rate"); - if (insertRate && insertRate->type == cJSON_Number) { - g_Dbs.db[i].superTbls[j].insertRate = insertRate->valueint; - } else if (!insertRate) { - g_Dbs.db[i].superTbls[j].insertRate = 0; - } else { - printf("failed to read json, insert_rate not found"); - goto PARSE_OVER; - } - + cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); if (insertRows && insertRows->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; @@ -3020,7 +3233,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { + if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable + || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; } @@ -3080,7 +3294,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no, - if (answerPrompt && answerPrompt->type == cJSON_String && answerPrompt->valuestring != NULL) { + if (answerPrompt && answerPrompt->type == cJSON_String + && answerPrompt->valuestring != NULL) { if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) { g_args.answer_yes = false; } else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) { @@ -3174,7 +3389,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } cJSON* keepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); - if (keepProgress && keepProgress->type == cJSON_String && keepProgress->valuestring != NULL) { + if (keepProgress + && keepProgress->type == cJSON_String + && keepProgress->valuestring != NULL) { if (0 == strcmp("yes", keepProgress->valuestring)) { g_queryInfo.superQueryInfo.subscribeKeepProgress = 1; } else if (0 == strcmp("no", keepProgress->valuestring)) { @@ -3365,6 +3582,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } static bool getInfoFromJsonFile(char* file) { + debugPrint("DEBUG - %s %d %s\n", __func__, __LINE__, file); + FILE *fp = fopen(file, "r"); if (!fp) { printf("failed to read %s, reason:%s\n", file, strerror(errno)); @@ -3392,32 +3611,32 @@ static bool getInfoFromJsonFile(char* file) { cJSON* filetype = cJSON_GetObjectItem(root, "filetype"); if (filetype && filetype->type == cJSON_String && filetype->valuestring != NULL) { if (0 == strcasecmp("insert", filetype->valuestring)) { - g_jsonType = INSERT_MODE; + g_args.test_mode = INSERT_MODE; } else if (0 == strcasecmp("query", filetype->valuestring)) { - g_jsonType = QUERY_MODE; + g_args.test_mode = QUERY_MODE; } else if (0 == strcasecmp("subscribe", filetype->valuestring)) { - g_jsonType = SUBSCRIBE_MODE; + g_args.test_mode = SUBSCRIBE_MODE; } else { printf("failed to read json, filetype not support\n"); goto PARSE_OVER; } } else if (!filetype) { - g_jsonType = INSERT_MODE; + g_args.test_mode = INSERT_MODE; } else { printf("failed to read json, filetype not found\n"); goto PARSE_OVER; } - if (INSERT_MODE == g_jsonType) { + if (INSERT_MODE == g_args.test_mode) { ret = getMetaFromInsertJsonFile(root); - } else if (QUERY_MODE == g_jsonType) { + } else if (QUERY_MODE == g_args.test_mode) { ret = getMetaFromQueryJsonFile(root); - } else if (SUBSCRIBE_MODE == g_jsonType) { + } else if (SUBSCRIBE_MODE == g_args.test_mode) { ret = getMetaFromQueryJsonFile(root); } else { printf("input json file type error! please input correct file type: insert or query or subscribe\n"); goto PARSE_OVER; - } + } PARSE_OVER: free(content); @@ -3426,14 +3645,13 @@ static bool getInfoFromJsonFile(char* file) { return ret; } - void prePareSampleData() { for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { //if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) { // readSampleFromFileToMem(&g_Dbs.db[i].superTbls[j]); //} - + if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) { (void)readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]); } @@ -3445,9 +3663,9 @@ void postFreeResource() { tmfclose(g_fpOfInsertResult); for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - if (0 != g_Dbs.db[i].superTbls[j].colsOfCreatChildTable) { - free(g_Dbs.db[i].superTbls[j].colsOfCreatChildTable); - g_Dbs.db[i].superTbls[j].colsOfCreatChildTable = NULL; + if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) { + free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL; } if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) { free(g_Dbs.db[i].superTbls[j].sampleDataBuf); @@ -3525,11 +3743,12 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* } dataLen -= 2; dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); - + return dataLen; } -void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDataBuf) { +static void syncWriteForNumberOfTblInOneSql( + threadInfo *winfo, FILE *fp, char* sampleDataBuf) { SSuperTable* superTblInfo = winfo->superTblInfo; int samplePos = 0; @@ -3551,30 +3770,18 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa numberOfTblInOneSql = tbls; } - int64_t time_counter = winfo->start_time; + uint64_t time_counter = winfo->start_time; int64_t tmp_time; int sampleUsePos; int64_t st = 0; int64_t et = 0; for (int i = 0; i < superTblInfo->insertRows;) { - if (superTblInfo->insertRate && (et - st) < 1000) { - taosMsleep(1000 - (et - st)); // ms - //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); - } - - if (superTblInfo->insertRate) { - st = taosGetTimestampMs(); - } - int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { int inserted = i; - int k = 0; - int batchRowsSql = 0; - while (1) - { + for (int k = 0; k < g_args.num_of_RPR;) { int len = 0; memset(buffer, 0, superTblInfo->maxSqlLen); char *pstr = buffer; @@ -3583,6 +3790,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa if (end_tbl_id > winfo->end_table_id) { end_tbl_id = winfo->end_table_id+1; } + for (tbl_id = tID; tbl_id < end_tbl_id; tbl_id++) { sampleUsePos = samplePos; if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { @@ -3590,47 +3798,97 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo); } else { - tagsValBuf = getTagValueFromTagSample(superTblInfo, tbl_id % superTblInfo->tagSampleCount); + tagsValBuf = getTagValueFromTagSample( + superTblInfo, tbl_id % superTblInfo->tagSampleCount); } if (NULL == tagsValBuf) { goto free_and_statistics; } if (0 == len) { - len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d using %s.%s tags %s values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id, winfo->db_name, superTblInfo->sTblName, tagsValBuf); + len += snprintf(pstr + len, + superTblInfo->maxSqlLen - len, + "insert into %s.%s%d using %s.%s tags %s values ", + winfo->db_name, + superTblInfo->childTblPrefix, + tbl_id, + winfo->db_name, + superTblInfo->sTblName, + tagsValBuf); } else { - len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, " %s.%s%d using %s.%s tags %s values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id, winfo->db_name, superTblInfo->sTblName, tagsValBuf); + len += snprintf(pstr + len, + superTblInfo->maxSqlLen - len, + " %s.%s%d using %s.%s tags %s values ", + winfo->db_name, + superTblInfo->childTblPrefix, + tbl_id, + winfo->db_name, + superTblInfo->sTblName, + tagsValBuf); } tmfree(tagsValBuf); } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { if (0 == len) { - len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s values ", winfo->db_name, superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); + len += snprintf(pstr + len, + superTblInfo->maxSqlLen - len, + "insert into %s.%s values ", + winfo->db_name, + superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); } else { - len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, " %s.%s values ", winfo->db_name, superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); + len += snprintf(pstr + len, + superTblInfo->maxSqlLen - len, + " %s.%s values ", + winfo->db_name, + superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); } } else { // pre-create child table if (0 == len) { - len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id); + len += snprintf(pstr + len, + superTblInfo->maxSqlLen - len, + "insert into %s.%s%d values ", + winfo->db_name, + superTblInfo->childTblPrefix, + tbl_id); } else { - len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, " %s.%s%d values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id); + len += snprintf(pstr + len, + superTblInfo->maxSqlLen - len, + " %s.%s%d values ", + winfo->db_name, + superTblInfo->childTblPrefix, + tbl_id); } } - + tmp_time = time_counter; for (k = 0; k < superTblInfo->rowsPerTbl;) { int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) { - retLen = getRowDataFromSample(pstr + len, superTblInfo->maxSqlLen - len, tmp_time += superTblInfo->timeStampStep, superTblInfo, &sampleUsePos, fp, sampleDataBuf); + if (0 == strncasecmp(superTblInfo->dataSource, + "sample", strlen("sample"))) { + retLen = getRowDataFromSample(pstr + len, + superTblInfo->maxSqlLen - len, + tmp_time += superTblInfo->timeStampStep, + superTblInfo, + &sampleUsePos, + fp, + sampleDataBuf); if (retLen < 0) { goto free_and_statistics; } - } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", 8)) { + } else if (0 == strncasecmp( + superTblInfo->dataSource, "rand", strlen("rand"))) { int rand_num = rand_tinyint() % 100; - if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { + if (0 != superTblInfo->disorderRatio + && rand_num < superTblInfo->disorderRatio) { int64_t d = tmp_time - rand() % superTblInfo->disorderRange; - retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, d, superTblInfo); + retLen = generateRowData(pstr + len, + superTblInfo->maxSqlLen - len, + d, + superTblInfo); } else { - retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, tmp_time += superTblInfo->timeStampStep, superTblInfo); + retLen = generateRowData(pstr + len, + superTblInfo->maxSqlLen - len, + tmp_time += superTblInfo->timeStampStep, + superTblInfo); } if (retLen < 0) { goto free_and_statistics; @@ -3640,30 +3898,44 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa //inserted++; k++; totalRowsInserted++; - batchRowsSql++; - - if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128) || batchRowsSql >= INT16_MAX - 1) { + + if (inserted >= superTblInfo->insertRows || + (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { tID = tbl_id + 1; - printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", superTblInfo->lenOfOneRow); + printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", + superTblInfo->lenOfOneRow); goto send_to_server; } } - } tID = tbl_id; inserted += superTblInfo->rowsPerTbl; - send_to_server: - batchRowsSql = 0; - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { +send_to_server: + if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { + int sleep_time = g_args.insert_interval - (et -st); + debugPrint("DEBUG sleep: %d ms\n", sleep_time); + taosMsleep(sleep_time); // ms + } + + if (g_args.insert_interval) { + st = taosGetTimestampMs(); + } + + if (0 == strncasecmp(superTblInfo->insertMode, + "taosc", + strlen("taosc"))) { //printf("multi table===== sql: %s \n\n", buffer); //int64_t t1 = taosGetTimestampMs(); int64_t startTs; int64_t endTs; startTs = taosGetTimestampUs(); - int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); + debugPrint("DEBUG %s() LN%d buff: %s\n", __func__, __LINE__, buffer); + int affectedRows = queryDbExec( + winfo->taos, buffer, INSERT_TYPE); + if (0 > affectedRows) { goto free_and_statistics; } else { @@ -3679,29 +3951,34 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { - printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, totalRowsInserted, totalAffectedRows); + printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", + winfo->threadID, + totalRowsInserted, + totalAffectedRows); lastPrintTime = currentPrintTime; } //int64_t t2 = taosGetTimestampMs(); - //printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0); + //printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0); } else { //int64_t t1 = taosGetTimestampMs(); int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); //int64_t t2 = taosGetTimestampMs(); //printf("http insert sql return, Spent %ld ms \n", t2 - t1); - + if (0 != retCode) { printf("========restful return fail, threadID[%d]\n", winfo->threadID); goto free_and_statistics; } } - - //printf("========tID:%d, k:%d, loop_cnt:%d\n", tID, k, loop_cnt); + if (g_args.insert_interval) { + et = taosGetTimestampMs(); + } + break; } if (tID > winfo->end_table_id) { - if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) { + if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { samplePos = sampleUsePos; } i = inserted; @@ -3709,13 +3986,10 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa } } - if (superTblInfo->insertRate) { - et = taosGetTimestampMs(); - } //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); } - free_and_statistics: +free_and_statistics: tmfree(buffer); winfo->totalRowsInserted = totalRowsInserted; winfo->totalAffectedRows = totalAffectedRows; @@ -3723,6 +3997,64 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa return; } +int32_t generateData(char *res, char **data_type, + int num_of_cols, int64_t timestamp, int len_of_binary) { + memset(res, 0, MAX_DATA_SIZE); + char *pstr = res; + pstr += sprintf(pstr, "(%" PRId64, timestamp); + int c = 0; + + for (; c < MAX_NUM_DATATYPE; c++) { + if (data_type[c] == NULL) { + break; + } + } + + if (0 == c) { + perror("data type error!"); + exit(-1); + } + + for (int i = 0; i < num_of_cols; i++) { + if (strcasecmp(data_type[i % c], "tinyint") == 0) { + pstr += sprintf(pstr, ", %d", rand_tinyint() ); + } else if (strcasecmp(data_type[i % c], "smallint") == 0) { + pstr += sprintf(pstr, ", %d", rand_smallint()); + } else if (strcasecmp(data_type[i % c], "int") == 0) { + pstr += sprintf(pstr, ", %d", rand_int()); + } else if (strcasecmp(data_type[i % c], "bigint") == 0) { + pstr += sprintf(pstr, ", %" PRId64, rand_bigint()); + } else if (strcasecmp(data_type[i % c], "float") == 0) { + pstr += sprintf(pstr, ", %10.4f", rand_float()); + } else if (strcasecmp(data_type[i % c], "double") == 0) { + double t = rand_double(); + pstr += sprintf(pstr, ", %20.8f", t); + } else if (strcasecmp(data_type[i % c], "bool") == 0) { + bool b = rand() & 1; + pstr += sprintf(pstr, ", %s", b ? "true" : "false"); + } else if (strcasecmp(data_type[i % c], "binary") == 0) { + char *s = malloc(len_of_binary); + rand_string(s, len_of_binary); + pstr += sprintf(pstr, ", \"%s\"", s); + free(s); + }else if (strcasecmp(data_type[i % c], "nchar") == 0) { + char *s = malloc(len_of_binary); + rand_string(s, len_of_binary); + pstr += sprintf(pstr, ", \"%s\"", s); + free(s); + } + + if (pstr - res > MAX_DATA_SIZE) { + perror("column length too long, abort"); + exit(-1); + } + } + + pstr += sprintf(pstr, ")"); + + return (int32_t)(pstr - res); +} + // sync insertion /* 1 thread: 100 tables * 2000 rows/s @@ -3731,11 +4063,118 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s */ -void *syncWrite(void *sarg) { - int64_t totalRowsInserted = 0; - int64_t totalAffectedRows = 0; - int64_t lastPrintTime = taosGetTimestampMs(); - +static void* syncWrite(void *sarg) { + + threadInfo *winfo = (threadInfo *)sarg; + + char buffer[BUFFER_SIZE] = "\0"; + char data[MAX_DATA_SIZE]; + char **data_type = g_args.datatype; + int len_of_binary = g_args.len_of_binary; + + int ncols_per_record = 1; // count first col ts + for (int i = 0; i < MAX_COLUMN_COUNT; i ++) { + if (NULL == g_args.datatype[i]) + break; + else + ncols_per_record ++; + } + + srand((uint32_t)time(NULL)); + int64_t time_counter = winfo->start_time; + + uint64_t st = 0; + uint64_t et = 0; + + for (int i = 0; i < g_args.num_of_DPT;) { + + for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { + int inserted = i; + int64_t tmp_time = time_counter; + + char *pstr = buffer; + pstr += sprintf(pstr, + "insert into %s.%s%d values", + winfo->db_name, g_args.tb_prefix, tID); + int k; + for (k = 0; k < g_args.num_of_RPR;) { + int rand_num = rand() % 100; + int len = -1; + + if ((g_args.disorderRatio != 0) + && (rand_num < g_args.disorderRange)) { + + int64_t d = tmp_time - rand() % 1000000 + rand_num; + len = generateData(data, data_type, + ncols_per_record, d, len_of_binary); + } else { + len = generateData(data, data_type, + ncols_per_record, tmp_time += 1000, len_of_binary); + } + + //assert(len + pstr - buffer < BUFFER_SIZE); + if (len + pstr - buffer >= BUFFER_SIZE) { // too long + break; + } + + pstr += sprintf(pstr, " %s", data); + inserted++; + k++; + + if (inserted >= g_args.num_of_DPT) + break; + } + + /* puts(buffer); */ + int64_t startTs; + int64_t endTs; + startTs = taosGetTimestampUs(); + //queryDB(winfo->taos, buffer); + if (i > 0 && g_args.insert_interval + && (g_args.insert_interval > (et - st) )) { + int sleep_time = g_args.insert_interval - (et -st); + debugPrint("DEBUG sleep: %d ms\n", sleep_time); + taosMsleep(sleep_time); // ms + } + + if (g_args.insert_interval) { + st = taosGetTimestampMs(); + } + debugPrint("DEBUG - %s() LN%d %s\n", __func__, __LINE__, buffer); + int affectedRows = queryDbExec(winfo->taos, buffer, 1); + + if (0 <= affectedRows){ + endTs = taosGetTimestampUs(); + int64_t delay = endTs - startTs; + if (delay > winfo->maxDelay) + winfo->maxDelay = delay; + if (delay < winfo->minDelay) + winfo->minDelay = delay; + winfo->cntDelay++; + winfo->totalDelay += delay; + //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + } + + if (g_args.insert_interval) { + et = taosGetTimestampMs(); + } + + if (tID == winfo->end_table_id) { + i = inserted; + time_counter = tmp_time; + } + } + + } + return NULL; +} + + +static void* syncWriteWithStb(void *sarg) { + uint64_t totalRowsInserted = 0; + uint64_t totalAffectedRows = 0; + uint64_t lastPrintTime = taosGetTimestampMs(); + threadInfo *winfo = (threadInfo *)sarg; SSuperTable* superTblInfo = winfo->superTblInfo; @@ -3744,20 +4183,27 @@ void *syncWrite(void *sarg) { int samplePos = 0; // each thread read sample data from csv file - if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) { - sampleDataBuf = calloc(superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); + if (0 == strncasecmp(superTblInfo->dataSource, + "sample", + strlen("sample"))) { + sampleDataBuf = calloc( + superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); if (sampleDataBuf == NULL) { - printf("Failed to calloc %d Bytes, reason:%s\n", superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, strerror(errno)); + printf("Failed to calloc %d Bytes, reason:%s\n", + superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, + strerror(errno)); return NULL; } - + fp = fopen(superTblInfo->sampleFile, "r"); if (fp == NULL) { - printf("Failed to open sample file: %s, reason:%s\n", superTblInfo->sampleFile, strerror(errno)); + printf("Failed to open sample file: %s, reason:%s\n", + superTblInfo->sampleFile, strerror(errno)); tmfree(sampleDataBuf); return NULL; } - int ret = readSampleFromCsvFileToMem(fp, superTblInfo, sampleDataBuf); + int ret = readSampleFromCsvFileToMem(fp, + superTblInfo, sampleDataBuf); if (0 != ret) { tmfree(sampleDataBuf); tmfclose(fp); @@ -3772,68 +4218,31 @@ void *syncWrite(void *sarg) { return NULL; } - //printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id); - char* buffer = calloc(superTblInfo->maxSqlLen, 1); - - int nrecords_per_request = 0; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { - nrecords_per_request = (superTblInfo->maxSqlLen - 1280 - superTblInfo->lenOfTagOfOneRow) / superTblInfo->lenOfOneRow; - } else { - nrecords_per_request = (superTblInfo->maxSqlLen - 1280) / superTblInfo->lenOfOneRow; - } - - int nrecords_no_last_req = nrecords_per_request; - int nrecords_last_req = 0; - int loop_cnt = 0; - if (0 != superTblInfo->insertRate) { - if (nrecords_no_last_req >= superTblInfo->insertRate) { - nrecords_no_last_req = superTblInfo->insertRate; - } else { - nrecords_last_req = superTblInfo->insertRate % nrecords_per_request; - loop_cnt = (superTblInfo->insertRate / nrecords_per_request) + (superTblInfo->insertRate % nrecords_per_request ? 1 : 0) ; - } - } - - if (nrecords_no_last_req <= 0) { - nrecords_no_last_req = 1; - } - - if (nrecords_no_last_req >= INT16_MAX) { - nrecords_no_last_req = INT16_MAX - 1; - } - - if (nrecords_last_req >= INT16_MAX) { - nrecords_last_req = INT16_MAX - 1; + if (NULL == buffer) { + printf("Failed to calloc %d Bytes, reason:%s\n", + superTblInfo->maxSqlLen, + strerror(errno)); + tmfree(sampleDataBuf); + tmfclose(fp); + return NULL; } - int nrecords_cur_req = nrecords_no_last_req; - int loop_cnt_orig = loop_cnt; - - //printf("========nrecords_per_request:%d, nrecords_no_last_req:%d, nrecords_last_req:%d, loop_cnt:%d\n", nrecords_per_request, nrecords_no_last_req, nrecords_last_req, loop_cnt); - int64_t time_counter = winfo->start_time; + uint64_t st = 0; + uint64_t et = 0; - int64_t st = 0; - int64_t et = 0; + debugPrint("DEBUG - %s() LN%d insertRows=%ld\n", __func__, __LINE__, superTblInfo->insertRows); for (int i = 0; i < superTblInfo->insertRows;) { - if (superTblInfo->insertRate && (et - st) < 1000) { - taosMsleep(1000 - (et - st)); // ms - //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); - } - if (superTblInfo->insertRate) { - st = taosGetTimestampMs(); - } - - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int inserted = i; - int64_t tmp_time = time_counter; + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { + uint64_t inserted = i; + uint64_t tmp_time = time_counter; int sampleUsePos = samplePos; int k = 0; - while (1) - { + debugPrint("DEBUG - %s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); + for (k = 0; k < g_args.num_of_RPR;) { int len = 0; memset(buffer, 0, superTblInfo->maxSqlLen); char *pstr = buffer; @@ -3843,56 +4252,107 @@ void *syncWrite(void *sarg) { if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo); } else { - tagsValBuf = getTagValueFromTagSample(superTblInfo, tID % superTblInfo->tagSampleCount); + tagsValBuf = getTagValueFromTagSample( + superTblInfo, + tID % superTblInfo->tagSampleCount); } if (NULL == tagsValBuf) { goto free_and_statistics_2; } - len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d using %s.%s tags %s values", winfo->db_name, superTblInfo->childTblPrefix, tID, winfo->db_name, superTblInfo->sTblName, tagsValBuf); + len += snprintf(pstr + len, + superTblInfo->maxSqlLen - len, + "insert into %s.%s%d using %s.%s tags %s values", + winfo->db_name, + superTblInfo->childTblPrefix, + tID, + winfo->db_name, + superTblInfo->sTblName, + tagsValBuf); tmfree(tagsValBuf); } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { - len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s values", winfo->db_name, superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); + len += snprintf(pstr + len, + superTblInfo->maxSqlLen - len, + "insert into %s.%s values", + winfo->db_name, + superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); } else { - len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d values", winfo->db_name, superTblInfo->childTblPrefix, tID); + len += snprintf(pstr + len, + superTblInfo->maxSqlLen - len, + "insert into %s.%s%d values", + winfo->db_name, + superTblInfo->childTblPrefix, + tID); } - - for (k = 0; k < nrecords_cur_req;) { - int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) { - retLen = getRowDataFromSample(pstr + len, superTblInfo->maxSqlLen - len, tmp_time += superTblInfo->timeStampStep, superTblInfo, &sampleUsePos, fp, sampleDataBuf); + + int retLen = 0; + if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { + retLen = getRowDataFromSample( + pstr + len, + superTblInfo->maxSqlLen - len, + tmp_time += superTblInfo->timeStampStep, + superTblInfo, + &sampleUsePos, + fp, + sampleDataBuf); if (retLen < 0) { goto free_and_statistics_2; } - } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", 8)) { + } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { int rand_num = rand_tinyint() % 100; - if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { + if (0 != superTblInfo->disorderRatio + && rand_num < superTblInfo->disorderRatio) { int64_t d = tmp_time - rand() % superTblInfo->disorderRange; - retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, d, superTblInfo); + retLen = generateRowData( + pstr + len, + superTblInfo->maxSqlLen - len, d, + superTblInfo); //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d); - } else { - retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, tmp_time += superTblInfo->timeStampStep, superTblInfo); + } else { + retLen = generateRowData( + pstr + len, + superTblInfo->maxSqlLen - len, + tmp_time += superTblInfo->timeStampStep, + superTblInfo); } if (retLen < 0) { goto free_and_statistics_2; } - } - len += retLen; - inserted++; - k++; - totalRowsInserted++; - - if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) break; } +/* len += retLen; +*/ + inserted++; + k++; + totalRowsInserted++; - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { + debugPrint("DEBUG %s() LN%d inserted=%ld k=%d totalRowsInserted=%ld superTblInfo->insertRows=%ld\n", __func__, __LINE__, inserted, k, totalRowsInserted, superTblInfo->insertRows); + if (inserted > superTblInfo->insertRows) + break; +/* if (inserted >= superTblInfo->insertRows + || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) + break; +*/ + if (i > 0 && g_args.insert_interval + && (g_args.insert_interval > (et - st) )) { + int sleep_time = g_args.insert_interval - (et -st); + debugPrint("DEBUG sleep: %d ms\n", sleep_time); + taosMsleep(sleep_time); // ms + } + + if (g_args.insert_interval) { + st = taosGetTimestampMs(); + } + + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { //printf("===== sql: %s \n\n", buffer); //int64_t t1 = taosGetTimestampMs(); int64_t startTs; int64_t endTs; startTs = taosGetTimestampUs(); + debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); + if (0 > affectedRows){ goto free_and_statistics_2; } else { @@ -3908,10 +4368,13 @@ void *syncWrite(void *sarg) { int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { - printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, totalRowsInserted, totalAffectedRows); + printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", + winfo->threadID, + totalRowsInserted, + totalAffectedRows); lastPrintTime = currentPrintTime; } - //int64_t t2 = taosGetTimestampMs(); + //int64_t t2 = taosGetTimestampMs(); //printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0); } else { //int64_t t1 = taosGetTimestampMs(); @@ -3924,9 +4387,10 @@ void *syncWrite(void *sarg) { goto free_and_statistics_2; } } - - //printf("========tID:%d, k:%d, loop_cnt:%d\n", tID, k, loop_cnt); - + if (g_args.insert_interval) { + et = taosGetTimestampMs(); + } +/* if (loop_cnt) { loop_cnt--; if ((1 == loop_cnt) && (0 != nrecords_last_req)) { @@ -3939,24 +4403,24 @@ void *syncWrite(void *sarg) { } else { break; } + */ } if (tID == winfo->end_table_id) { - if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) { + if (0 == strncasecmp( + superTblInfo->dataSource, "sample", strlen("sample"))) { samplePos = sampleUsePos; } + i = inserted; time_counter = tmp_time; } - } - - if (superTblInfo->insertRate) { - et = taosGetTimestampMs(); } + //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); } - free_and_statistics_2: +free_and_statistics_2: tmfree(buffer); tmfree(sampleDataBuf); tmfclose(fp); @@ -3964,14 +4428,17 @@ void *syncWrite(void *sarg) { winfo->totalRowsInserted = totalRowsInserted; winfo->totalAffectedRows = totalAffectedRows; - printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, totalRowsInserted, totalAffectedRows); + printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", + winfo->threadID, + totalRowsInserted, + totalAffectedRows); return NULL; } void callBack(void *param, TAOS_RES *res, int code) { threadInfo* winfo = (threadInfo*)param; - if (winfo->superTblInfo->insertRate) { + if (g_args.insert_interval) { winfo->et = taosGetTimestampMs(); if (winfo->et - winfo->st < 1000) { taosMsleep(1000 - (winfo->et - winfo->st)); // ms @@ -3994,7 +4461,7 @@ void callBack(void *param, TAOS_RES *res, int code) { return; } - for (int i = 0; i < winfo->nrecords_per_request; i++) { + for (int i = 0; i < g_args.num_of_RPR; i++) { int rand_num = rand() % 100; if (0 != winfo->superTblInfo->disorderRatio && rand_num < winfo->superTblInfo->disorderRatio) { @@ -4013,7 +4480,7 @@ void callBack(void *param, TAOS_RES *res, int code) { } } - if (winfo->superTblInfo->insertRate) { + if (g_args.insert_interval) { winfo->st = taosGetTimestampMs(); } taos_query_a(winfo->taos, buffer, callBack, winfo); @@ -4026,36 +4493,11 @@ void callBack(void *param, TAOS_RES *res, int code) { void *asyncWrite(void *sarg) { threadInfo *winfo = (threadInfo *)sarg; - winfo->nrecords_per_request = 0; - //if (AUTO_CREATE_SUBTBL == winfo->superTblInfo->autoCreateTable) { - winfo->nrecords_per_request = (winfo->superTblInfo->maxSqlLen - 1280 - winfo->superTblInfo->lenOfTagOfOneRow) / winfo->superTblInfo->lenOfOneRow; - //} else { - // winfo->nrecords_per_request = (winfo->superTblInfo->maxSqlLen - 1280) / winfo->superTblInfo->lenOfOneRow; - //} - - if (0 != winfo->superTblInfo->insertRate) { - if (winfo->nrecords_per_request >= winfo->superTblInfo->insertRate) { - winfo->nrecords_per_request = winfo->superTblInfo->insertRate; - } - } - - if (winfo->nrecords_per_request <= 0) { - winfo->nrecords_per_request = 1; - } - - if (winfo->nrecords_per_request >= INT16_MAX) { - winfo->nrecords_per_request = INT16_MAX - 1; - } - - if (winfo->nrecords_per_request >= INT16_MAX) { - winfo->nrecords_per_request = INT16_MAX - 1; - } - winfo->st = 0; winfo->et = 0; winfo->lastTs = winfo->start_time; - if (winfo->superTblInfo->insertRate) { + if (g_args.insert_interval) { winfo->st = taosGetTimestampMs(); } taos_query_a(winfo->taos, "show databases", callBack, winfo); @@ -4065,23 +4507,30 @@ void *asyncWrite(void *sarg) { return NULL; } -void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSuperTable* superTblInfo) { - pthread_t *pids = malloc(threads * sizeof(pthread_t)); - threadInfo *infos = malloc(threads * sizeof(threadInfo)); - memset(pids, 0, threads * sizeof(pthread_t)); - memset(infos, 0, threads * sizeof(threadInfo)); - int ntables = superTblInfo->childTblCount; +void startMultiThreadInsertData(int threads, char* db_name, char* precision, + SSuperTable* superTblInfo) { - int a = ntables / threads; - if (a < 1) { - threads = ntables; - a = 1; - } + pthread_t *pids = malloc(threads * sizeof(pthread_t)); + threadInfo *infos = malloc(threads * sizeof(threadInfo)); + memset(pids, 0, threads * sizeof(pthread_t)); + memset(infos, 0, threads * sizeof(threadInfo)); - int b = 0; - if (threads != 0) { - b = ntables % threads; - } + int ntables = 0; + if (superTblInfo) + ntables = superTblInfo->childTblCount; + else + ntables = g_args.num_of_tables; + + int a = ntables / threads; + if (a < 1) { + threads = ntables; + a = 1; + } + + int b = 0; + if (threads != 0) { + b = ntables % threads; + } //TAOS* taos; //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { @@ -4104,11 +4553,19 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu } } - int64_t start_time; - if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { - start_time = taosGetTimestamp(timePrec); - } else { - (void)taosParseTime(superTblInfo->startTimestamp, &start_time, strlen(superTblInfo->startTimestamp), timePrec, 0); + int64_t start_time; + if (superTblInfo) { + if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { + start_time = taosGetTimestamp(timePrec); + } else { + taosParseTime( + superTblInfo->startTimestamp, + &start_time, + strlen(superTblInfo->startTimestamp), + timePrec, 0); + } + } else { + start_time = 1500000000000; } double start = getCurrentTime(); @@ -4123,18 +4580,23 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu t_info->start_time = start_time; t_info->minDelay = INT16_MAX; - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { + if ((NULL == superTblInfo) || + (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { //t_info->taos = taos; - t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); + t_info->taos = taos_connect( + g_Dbs.host, g_Dbs.user, + g_Dbs.password, db_name, g_Dbs.port); if (NULL == t_info->taos) { - printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL)); + printf("connect to server fail from insert sub thread, reason: %s\n", + taos_errstr(NULL)); exit(-1); } } else { t_info->taos = NULL; } - if (0 == superTblInfo->multiThreadWriteOneTbl) { + if ((NULL == superTblInfo) + || (0 == superTblInfo->multiThreadWriteOneTbl)) { t_info->start_table_id = last; t_info->end_table_id = i < b ? last + a : last + a - 1; last = t_info->end_table_id + 1; @@ -4145,9 +4607,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu } tsem_init(&(t_info->lock_sem), 0, 0); - if (SYNC == g_Dbs.queryMode) { - pthread_create(pids + i, NULL, syncWrite, t_info); + if (superTblInfo) { + pthread_create(pids + i, NULL, syncWriteWithStb, t_info); + } else { + pthread_create(pids + i, NULL, syncWrite, t_info); + } } else { pthread_create(pids + i, NULL, asyncWrite, t_info); } @@ -4169,13 +4634,15 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu tsem_destroy(&(t_info->lock_sem)); taos_close(t_info->taos); - superTblInfo->totalAffectedRows += t_info->totalAffectedRows; - superTblInfo->totalRowsInserted += t_info->totalRowsInserted; + if (superTblInfo) { + superTblInfo->totalAffectedRows += t_info->totalAffectedRows; + superTblInfo->totalRowsInserted += t_info->totalRowsInserted; - totalDelay += t_info->totalDelay; - cntDelay += t_info->cntDelay; - if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; - if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; + totalDelay += t_info->totalDelay; + cntDelay += t_info->cntDelay; + if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; + if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; + } } cntDelay -= 1; @@ -4184,23 +4651,29 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu double end = getCurrentTime(); double t = end - start; - printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", - t, superTblInfo->totalRowsInserted, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, superTblInfo->totalRowsInserted / t); - fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", - t, superTblInfo->totalRowsInserted, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, superTblInfo->totalRowsInserted / t); + if (superTblInfo) { + printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", + t, superTblInfo->totalRowsInserted, + superTblInfo->totalAffectedRows, + threads, db_name, superTblInfo->sTblName, + superTblInfo->totalRowsInserted / t); + fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", + t, superTblInfo->totalRowsInserted, + superTblInfo->totalAffectedRows, + threads, db_name, superTblInfo->sTblName, + superTblInfo->totalRowsInserted / t); + } printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n", avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0); fprintf(g_fpOfInsertResult, "insert delay, avg:%10.6fms, max: %10.6fms, min: %10.6fms\n\n", avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0); - //taos_close(taos); free(pids); free(infos); - } @@ -4209,7 +4682,7 @@ void *readTable(void *sarg) { threadInfo *rinfo = (threadInfo *)sarg; TAOS *taos = rinfo->taos; char command[BUFFER_SIZE] = "\0"; - int64_t sTime = rinfo->start_time; + uint64_t sTime = rinfo->start_time; char *tb_prefix = rinfo->tb_prefix; FILE *fp = fopen(rinfo->fp, "a"); if (NULL == fp) { @@ -4217,7 +4690,13 @@ void *readTable(void *sarg) { return NULL; } - int num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; + int num_of_DPT; + if (rinfo->superTblInfo) { + num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; + } else { + num_of_DPT = g_args.num_of_DPT; + } + int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -4343,19 +4822,21 @@ void *readMetric(void *sarg) { int insertTestProcess() { - g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a"); - if (NULL == g_fpOfInsertResult) { - fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); - return 1; - }; - setupForAnsiEscape(); int ret = printfInsertMeta(); resetAfterAnsiEscape(); + if (ret == -1) exit(EXIT_FAILURE); - printfInsertMetaToFile(g_fpOfInsertResult); + debugPrint("DEBUG - %d result file: %s\n", __LINE__, g_Dbs.resultFile); + g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a"); + if (NULL == g_fpOfInsertResult) { + fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); + return -1; + } { + printfInsertMetaToFile(g_fpOfInsertResult); + } if (!g_args.answer_yes) { printf("Press enter key to continue\n\n"); @@ -4365,11 +4846,14 @@ int insertTestProcess() { init_rand_data(); // create database and super tables - (void)createDatabases(); + if(createDatabases() != 0) { + fclose(g_fpOfInsertResult); + return -1; + } // pretreatement prePareSampleData(); - + double start; double end; @@ -4377,26 +4861,41 @@ int insertTestProcess() { start = getCurrentTime(); createChildTables(); end = getCurrentTime(); + if (g_totalChildTables > 0) { - printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", end - start, g_totalChildTables, g_Dbs.threadCount); - fprintf(g_fpOfInsertResult, "Spent %.4f seconds to create %d tables with %d thread(s)\n\n", end - start, g_totalChildTables, g_Dbs.threadCount); + printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", + end - start, g_totalChildTables, g_Dbs.threadCount); + fprintf(g_fpOfInsertResult, + "Spent %.4f seconds to create %d tables with %d thread(s)\n\n", + end - start, g_totalChildTables, g_Dbs.threadCount); } - - taosMsleep(1000); + taosMsleep(1000); // create sub threads for inserting data //start = getCurrentTime(); - for (int i = 0; i < g_Dbs.dbCount; i++) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; - if (0 == g_Dbs.db[i].superTbls[j].insertRows) { - continue; - } - startMultiThreadInsertData(g_Dbs.threadCount, g_Dbs.db[i].dbName, g_Dbs.db[i].dbCfg.precision, superTblInfo); + for (int i = 0; i < g_Dbs.dbCount; i++) { + if (g_Dbs.db[i].superTblCount > 0) { + for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; + if (0 == g_Dbs.db[i].superTbls[j].insertRows) { + continue; + } + startMultiThreadInsertData( + g_Dbs.threadCount, + g_Dbs.db[i].dbName, + g_Dbs.db[i].dbCfg.precision, + superTblInfo); + } + } else { + startMultiThreadInsertData( + g_Dbs.threadCount, + g_Dbs.db[i].dbName, + g_Dbs.db[i].dbCfg.precision, + NULL); + } } - } //end = getCurrentTime(); - + //int64_t totalRowsInserted = 0; //int64_t totalAffectedRows = 0; //for (int i = 0; i < g_Dbs.dbCount; i++) { @@ -4405,31 +4904,8 @@ int insertTestProcess() { // totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows; //} //printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalRowsInserted, totalAffectedRows, g_Dbs.threadCount); - if (NULL == g_args.metaFile && false == g_Dbs.insert_only) { - // query data - pthread_t read_id; - threadInfo *rInfo = malloc(sizeof(threadInfo)); - rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000 - rInfo->start_table_id = 0; - rInfo->end_table_id = g_Dbs.db[0].superTbls[0].childTblCount - 1; - //rInfo->do_aggreFunc = g_Dbs.do_aggreFunc; - //rInfo->nrecords_per_table = g_Dbs.db[0].superTbls[0].insertRows; - rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0]; - rInfo->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, g_Dbs.db[0].dbName, g_Dbs.port); - strcpy(rInfo->tb_prefix, g_Dbs.db[0].superTbls[0].childTblPrefix); - strcpy(rInfo->fp, g_Dbs.resultFile); - - if (!g_Dbs.use_metric) { - pthread_create(&read_id, NULL, readTable, rInfo); - } else { - pthread_create(&read_id, NULL, readMetric, rInfo); - } - pthread_join(read_id, NULL); - taos_close(rInfo->taos); - } - postFreeResource(); - + return 0; } @@ -4458,12 +4934,15 @@ void *superQueryProcess(void *sarg) { } selectAndGetResult(winfo->taos, g_queryInfo.superQueryInfo.sql[i], tmpFile); int64_t t2 = taosGetTimestampUs(); - printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); + printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", + taosGetSelfPthreadId(), (t2 - t1)/1000000.0); } else { int64_t t1 = taosGetTimestampUs(); - int retCode = postProceSql(g_queryInfo.host, g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]); + int retCode = postProceSql(g_queryInfo.host, + g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]); int64_t t2 = taosGetTimestampUs(); - printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); + printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", + taosGetSelfPthreadId(), (t2 - t1)/1000000.0); if (0 != retCode) { printf("====restful return fail, threadID[%d]\n", winfo->threadID); @@ -4472,7 +4951,8 @@ void *superQueryProcess(void *sarg) { } } et = taosGetTimestampMs(); - printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0); + printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", + taosGetSelfPthreadId(), (double)(et - st)/1000.0); } return NULL; } @@ -4480,7 +4960,9 @@ void *superQueryProcess(void *sarg) { void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { char sourceString[32] = "xxxx"; char subTblName[MAX_TB_NAME_SIZE*3]; - sprintf(subTblName, "%s.%s", g_queryInfo.dbName, g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN); + sprintf(subTblName, "%s.%s", + g_queryInfo.dbName, + g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN); //printf("inSql: %s\n", inSql); @@ -4514,28 +4996,42 @@ void *subQueryProcess(void *sarg) { memset(sqlstr,0,sizeof(sqlstr)); replaceSubTblName(g_queryInfo.subQueryInfo.sql[j], sqlstr, i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.subQueryInfo.result[j][0] != 0) { - sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[j], winfo->threadID); + if (g_queryInfo.subQueryInfo.result[i][0] != 0) { + sprintf(tmpFile, "%s-%d", + g_queryInfo.subQueryInfo.result[i], + winfo->threadID); } selectAndGetResult(winfo->taos, sqlstr, tmpFile); } } et = taosGetTimestampMs(); - printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", taosGetSelfPthreadId(), winfo->start_table_id, winfo->end_table_id, (double)(et - st)/1000.0); + printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", + taosGetSelfPthreadId(), + winfo->start_table_id, + winfo->end_table_id, + (double)(et - st)/1000.0); } return NULL; } -int queryTestProcess() { +static int queryTestProcess() { TAOS * taos = NULL; - taos = taos_connect(g_queryInfo.host, g_queryInfo.user, g_queryInfo.password, NULL, g_queryInfo.port); + taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + NULL, + g_queryInfo.port); if (taos == NULL) { fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); exit(-1); } if (0 != g_queryInfo.subQueryInfo.sqlCount) { - (void)getAllChildNameOfSuperTable(taos, g_queryInfo.dbName, g_queryInfo.subQueryInfo.sTblName, &g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblCount); + (void)getAllChildNameOfSuperTable(taos, + g_queryInfo.dbName, + g_queryInfo.subQueryInfo.sTblName, + &g_queryInfo.subQueryInfo.childTblName, + &g_queryInfo.subQueryInfo.childTblCount); } printfQueryMeta(); @@ -4569,6 +5065,7 @@ int queryTestProcess() { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); + debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); (void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE); } else { t_info->taos = NULL; @@ -4655,9 +5152,14 @@ static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultF TAOS_SUB* tsub = NULL; if (g_queryInfo.superQueryInfo.subscribeMode) { - tsub = taos_subscribe(taos, g_queryInfo.superQueryInfo.subscribeRestart, topic, sql, subscribe_callback, (void*)resultFileName, g_queryInfo.superQueryInfo.subscribeInterval); + tsub = taos_subscribe(taos, + g_queryInfo.superQueryInfo.subscribeRestart, + topic, sql, subscribe_callback, (void*)resultFileName, + g_queryInfo.superQueryInfo.subscribeInterval); } else { - tsub = taos_subscribe(taos, g_queryInfo.superQueryInfo.subscribeRestart, topic, sql, NULL, NULL, 0); + tsub = taos_subscribe(taos, + g_queryInfo.superQueryInfo.subscribeRestart, + topic, sql, NULL, NULL, 0); } if (tsub == NULL) { @@ -4674,6 +5176,7 @@ void *subSubscribeProcess(void *sarg) { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); + debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){ return NULL; } @@ -4717,7 +5220,9 @@ void *subSubscribeProcess(void *sarg) { if (res) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.subQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID); + sprintf(tmpFile, "%s-%d", + g_queryInfo.subQueryInfo.result[i], + winfo->threadID); } getResult(res, tmpFile); } @@ -4726,7 +5231,8 @@ void *subSubscribeProcess(void *sarg) { taos_free_result(res); for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { - taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], g_queryInfo.subQueryInfo.subscribeKeepProgress); + taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], + g_queryInfo.subQueryInfo.subscribeKeepProgress); } return NULL; } @@ -4736,6 +5242,7 @@ void *superSubscribeProcess(void *sarg) { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); + debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) { return NULL; } @@ -4754,9 +5261,13 @@ void *superSubscribeProcess(void *sarg) { sprintf(topic, "taosdemo-subscribe-%d", i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.subQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], winfo->threadID); + sprintf(tmpFile, "%s-%d", + g_queryInfo.superQueryInfo.result[i], winfo->threadID); } - g_queryInfo.superQueryInfo.tsub[i] = subscribeImpl(winfo->taos, g_queryInfo.superQueryInfo.sql[i], topic, tmpFile); + g_queryInfo.superQueryInfo.tsub[i] = + subscribeImpl(winfo->taos, + g_queryInfo.superQueryInfo.sql[i], + topic, tmpFile); if (NULL == g_queryInfo.superQueryInfo.tsub[i]) { return NULL; } @@ -4777,7 +5288,8 @@ void *superSubscribeProcess(void *sarg) { if (res) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.superQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], winfo->threadID); + sprintf(tmpFile, "%s-%d", + g_queryInfo.superQueryInfo.result[i], winfo->threadID); } getResult(res, tmpFile); } @@ -4786,12 +5298,13 @@ void *superSubscribeProcess(void *sarg) { taos_free_result(res); for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress); + taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], + g_queryInfo.superQueryInfo.subscribeKeepProgress); } return NULL; } -int subscribeTestProcess() { +static int subscribeTestProcess() { printfQueryMeta(); if (!g_args.answer_yes) { @@ -4800,21 +5313,30 @@ int subscribeTestProcess() { } TAOS * taos = NULL; - taos = taos_connect(g_queryInfo.host, g_queryInfo.user, g_queryInfo.password, g_queryInfo.dbName, g_queryInfo.port); + taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + g_queryInfo.dbName, + g_queryInfo.port); if (taos == NULL) { fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); exit(-1); } if (0 != g_queryInfo.subQueryInfo.sqlCount) { - (void)getAllChildNameOfSuperTable(taos, g_queryInfo.dbName, g_queryInfo.subQueryInfo.sTblName, &g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblCount); + (void)getAllChildNameOfSuperTable(taos, + g_queryInfo.dbName, + g_queryInfo.subQueryInfo.sTblName, + &g_queryInfo.subQueryInfo.childTblName, + &g_queryInfo.subQueryInfo.childTblCount); } pthread_t *pids = NULL; threadInfo *infos = NULL; //==== create sub threads for query from super table - if (g_queryInfo.superQueryInfo.sqlCount > 0 && g_queryInfo.superQueryInfo.concurrent > 0) { + if (g_queryInfo.superQueryInfo.sqlCount > 0 + && g_queryInfo.superQueryInfo.concurrent > 0) { pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { @@ -4834,9 +5356,12 @@ int subscribeTestProcess() { //==== create sub threads for query from sub table pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; - if ((g_queryInfo.subQueryInfo.sqlCount > 0) && (g_queryInfo.subQueryInfo.threadCnt > 0)) { - pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t)); - infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); + if ((g_queryInfo.subQueryInfo.sqlCount > 0) + && (g_queryInfo.subQueryInfo.threadCnt > 0)) { + pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * + sizeof(pthread_t)); + infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * + sizeof(threadInfo)); if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { printf("malloc failed for create threads\n"); taos_close(taos); @@ -4890,23 +5415,23 @@ int subscribeTestProcess() { void initOfInsertMeta() { memset(&g_Dbs, 0, sizeof(SDbs)); - // set default values - tstrncpy(g_Dbs.host, "127.0.0.1", MAX_DB_NAME_SIZE); - g_Dbs.port = 6030; - tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE); - tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE); - g_Dbs.threadCount = 2; - g_Dbs.use_metric = true; + // set default values + tstrncpy(g_Dbs.host, "127.0.0.1", MAX_DB_NAME_SIZE); + g_Dbs.port = 6030; + tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE); + tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE); + g_Dbs.threadCount = 2; + g_Dbs.use_metric = true; } void initOfQueryMeta() { memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo)); - // set default values - tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_DB_NAME_SIZE); - g_queryInfo.port = 6030; - tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE); - tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE); + // set default values + tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_DB_NAME_SIZE); + g_queryInfo.port = 6030; + tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE); + tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE); } void setParaFromArg(){ @@ -4928,42 +5453,21 @@ void setParaFromArg(){ g_Dbs.port = g_args.port; } + g_Dbs.threadCount = g_args.num_of_threads; + g_Dbs.threadCountByCreateTbl = g_args.num_of_threads; + g_Dbs.dbCount = 1; g_Dbs.db[0].drop = 1; - + tstrncpy(g_Dbs.db[0].dbName, g_args.database, MAX_DB_NAME_SIZE); g_Dbs.db[0].dbCfg.replica = g_args.replica; tstrncpy(g_Dbs.db[0].dbCfg.precision, "ms", MAX_DB_NAME_SIZE); - tstrncpy(g_Dbs.resultFile, g_args.output_file, MAX_FILE_NAME_LEN); g_Dbs.use_metric = g_args.use_metric; g_Dbs.insert_only = g_args.insert_only; - g_Dbs.db[0].superTblCount = 1; - tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", MAX_TB_NAME_SIZE); - g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables; - g_Dbs.threadCount = g_args.num_of_threads; - g_Dbs.threadCountByCreateTbl = 1; - g_Dbs.queryMode = g_args.mode; - - g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL; - g_Dbs.db[0].superTbls[0].superTblExists = TBL_NO_EXISTS; - g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS; - g_Dbs.db[0].superTbls[0].insertRate = 0; - g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange; - g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio; - tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, g_args.tb_prefix, MAX_TB_NAME_SIZE); - tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE); - tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); - tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); - g_Dbs.db[0].superTbls[0].timeStampStep = 10; - - // g_args.num_of_RPR; - g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; - g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; - g_Dbs.do_aggreFunc = true; char dataString[STRING_LEN]; @@ -4971,32 +5475,58 @@ void setParaFromArg(){ memset(dataString, 0, STRING_LEN); - if (strcasecmp(data_type[0], "BINARY") == 0 || strcasecmp(data_type[0], "BOOL") == 0 || strcasecmp(data_type[0], "NCHAR") == 0 ) { + if (strcasecmp(data_type[0], "BINARY") == 0 + || strcasecmp(data_type[0], "BOOL") == 0 + || strcasecmp(data_type[0], "NCHAR") == 0 ) { g_Dbs.do_aggreFunc = false; } - g_Dbs.db[0].superTbls[0].columnCount = 0; - for (int i = 0; i < MAX_NUM_DATATYPE; i++) { - if (data_type[i] == NULL) { - break; - } - - tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType, data_type[i], MAX_TB_NAME_SIZE); - g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.len_of_binary; - g_Dbs.db[0].superTbls[0].columnCount++; - } + if (g_args.use_metric) { + g_Dbs.db[0].superTblCount = 1; + tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", MAX_TB_NAME_SIZE); + g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables; + g_Dbs.threadCount = g_args.num_of_threads; + g_Dbs.threadCountByCreateTbl = 1; + g_Dbs.queryMode = g_args.mode; + + g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL; + g_Dbs.db[0].superTbls[0].superTblExists = TBL_NO_EXISTS; + g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS; + g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange; + g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio; + tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, + g_args.tb_prefix, MAX_TB_NAME_SIZE); + tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE); + tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); + tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, + "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); + g_Dbs.db[0].superTbls[0].timeStampStep = 10; + + g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; + g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; - if (g_Dbs.db[0].superTbls[0].columnCount > g_args.num_of_CPR) { - g_Dbs.db[0].superTbls[0].columnCount = g_args.num_of_CPR; - } else { - for (int i = g_Dbs.db[0].superTbls[0].columnCount; i < g_args.num_of_CPR; i++) { - tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType, "INT", MAX_TB_NAME_SIZE); - g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0; + g_Dbs.db[0].superTbls[0].columnCount = 0; + for (int i = 0; i < MAX_NUM_DATATYPE; i++) { + if (data_type[i] == NULL) { + break; + } + + tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType, + data_type[i], MAX_TB_NAME_SIZE); + g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.len_of_binary; g_Dbs.db[0].superTbls[0].columnCount++; } - } + + if (g_Dbs.db[0].superTbls[0].columnCount > g_args.num_of_CPR) { + g_Dbs.db[0].superTbls[0].columnCount = g_args.num_of_CPR; + } else { + for (int i = g_Dbs.db[0].superTbls[0].columnCount; i < g_args.num_of_CPR; i++) { + tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType, "INT", MAX_TB_NAME_SIZE); + g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0; + g_Dbs.db[0].superTbls[0].columnCount++; + } + } - if (g_Dbs.use_metric) { tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType, "INT", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0; @@ -5004,8 +5534,10 @@ void setParaFromArg(){ g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary; g_Dbs.db[0].superTbls[0].tagCount = 2; } else { + g_Dbs.threadCountByCreateTbl = 1; g_Dbs.db[0].superTbls[0].tagCount = 0; } + } /* Function to do regular expression check */ @@ -5050,7 +5582,7 @@ void querySqlFile(TAOS* taos, char* sqlFile) printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno)); return; } - + int read_len = 0; char * cmd = calloc(1, MAX_SQL_SIZE); size_t cmd_len = 0; @@ -5058,7 +5590,7 @@ void querySqlFile(TAOS* taos, char* sqlFile) size_t line_len = 0; double t = getCurrentTime(); - + while ((read_len = tgetline(&line, &line_len, fp)) != -1) { if (read_len >= MAX_SQL_SIZE) continue; line[--read_len] = '\0'; @@ -5075,6 +5607,7 @@ void querySqlFile(TAOS* taos, char* sqlFile) } memcpy(cmd + cmd_len, line, read_len); + debugPrint("DEBUG %s() LN%d cmd: %s\n", __func__, __LINE__, cmd); queryDbExec(taos, cmd, NO_INSERT_TYPE); memset(cmd, 0, MAX_SQL_SIZE); cmd_len = 0; @@ -5089,59 +5622,63 @@ void querySqlFile(TAOS* taos, char* sqlFile) return; } -int main(int argc, char *argv[]) { - parse_args(argc, argv, &g_args); - - if (g_args.metaFile) { - initOfInsertMeta(); - initOfQueryMeta(); - if (false == getInfoFromJsonFile(g_args.metaFile)) { - printf("Failed to read %s\n", g_args.metaFile); - return 1; - } - if (INSERT_MODE == g_jsonType) { +static void testMetaFile() { + if (INSERT_MODE == g_args.test_mode) { if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir); - (void)insertTestProcess(); - } else if (QUERY_MODE == g_jsonType) { - if (g_queryInfo.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir); - (void)queryTestProcess(); - } else if (SUBSCRIBE_MODE == g_jsonType) { - if (g_queryInfo.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir); - (void)subscribeTestProcess(); + insertTestProcess(); + } else if (QUERY_MODE == g_args.test_mode) { + if (g_queryInfo.cfgDir[0]) + taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir); + queryTestProcess(); + } else if (SUBSCRIBE_MODE == g_args.test_mode) { + if (g_queryInfo.cfgDir[0]) + taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir); + subscribeTestProcess(); } else { ; } - } else { - - memset(&g_Dbs, 0, sizeof(SDbs)); - g_jsonType = INSERT_MODE; - setParaFromArg(); +} - if (NULL != g_args.sqlFile) { - TAOS* qtaos = taos_connect( - g_Dbs.host, g_Dbs.user, g_Dbs.password, g_Dbs.db[0].dbName, g_Dbs.port); - querySqlFile(qtaos, g_args.sqlFile); - taos_close(qtaos); - return 0; - } - - (void)insertTestProcess(); - if (g_Dbs.insert_only) return 0; +static void testCmdLine() { + + g_args.test_mode = INSERT_MODE; + insertTestProcess(); + + if (g_Dbs.insert_only) + return; // select if (false == g_Dbs.insert_only) { // query data - + pthread_t read_id; threadInfo *rInfo = malloc(sizeof(threadInfo)); rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000 rInfo->start_table_id = 0; - rInfo->end_table_id = g_Dbs.db[0].superTbls[0].childTblCount - 1; + //rInfo->do_aggreFunc = g_Dbs.do_aggreFunc; - //rInfo->nrecords_per_table = g_Dbs.db[0].superTbls[0].insertRows; - rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0]; - rInfo->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, g_Dbs.db[0].dbName, g_Dbs.port); - strcpy(rInfo->tb_prefix, g_Dbs.db[0].superTbls[0].childTblPrefix); + if (g_args.use_metric) { + rInfo->end_table_id = g_Dbs.db[0].superTbls[0].childTblCount - 1; + rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0]; + strcpy(rInfo->tb_prefix, + g_Dbs.db[0].superTbls[0].childTblPrefix); + } else { + rInfo->end_table_id = g_args.num_of_tables -1; + strcpy(rInfo->tb_prefix, g_args.tb_prefix); + } + + rInfo->taos = taos_connect( + g_Dbs.host, + g_Dbs.user, + g_Dbs.password, + g_Dbs.db[0].dbName, + g_Dbs.port); + if (rInfo->taos == NULL) { + fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); + free(rInfo); + exit(-1); + } + strcpy(rInfo->fp, g_Dbs.resultFile); if (!g_Dbs.use_metric) { @@ -5153,9 +5690,42 @@ int main(int argc, char *argv[]) { taos_close(rInfo->taos); free(rInfo); } +} + +int main(int argc, char *argv[]) { + parse_args(argc, argv, &g_args); + + debugPrint("DEBUG - meta file: %s\n", g_args.metaFile); + + if (g_args.metaFile) { + initOfInsertMeta(); + initOfQueryMeta(); + + if (false == getInfoFromJsonFile(g_args.metaFile)) { + printf("Failed to read %s\n", g_args.metaFile); + return 1; + } + + testMetaFile(); + } else { + memset(&g_Dbs, 0, sizeof(SDbs)); + setParaFromArg(); + + if (NULL != g_args.sqlFile) { + TAOS* qtaos = taos_connect( + g_Dbs.host, + g_Dbs.user, + g_Dbs.password, + g_Dbs.db[0].dbName, + g_Dbs.port); + querySqlFile(qtaos, g_args.sqlFile); + taos_close(qtaos); + + } else { + testCmdLine(); + } } - taos_cleanup(); return 0; }