Skip to content

Commit

Permalink
Refactoring Code
Browse files Browse the repository at this point in the history
  • Loading branch information
maikebing committed May 24, 2020
1 parent 5aeb4c0 commit df503b1
Show file tree
Hide file tree
Showing 7 changed files with 404 additions and 88 deletions.
2 changes: 1 addition & 1 deletion CMakeSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"generator": "Unix Makefiles",
"configurationType": "Debug",
"buildRoot": "${projectDir}\\build\\",
"installRoot": "${projectDir}\\out\\install\\${name}",
"installRoot": "${projectDir}\\build\\",
"cmakeExecutable": "/usr/bin/cmake",
"cmakeCommandArgs": "",
"buildCommandArgs": "",
Expand Down
51 changes: 51 additions & 0 deletions src/plugins/mqtt/inc/mqttInit.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <[email protected]>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef TDENGINE_MQTT_INIT_H
#define TDENGINE_MQTT_INIT_H
#ifdef __cplusplus
extern "C" {
#endif

#include <stdint.h>
#include "MQTTAsync.h"
#include "os.h"
#include "taos.h"
#include "tglobal.h"
#include "tsocket.h"
#include "ttimer.h"
#include "tsclient.h"
char split(char str[], char delims[], char** p_p_cmd_part, int max);
void mqttConnnectLost(void* context, char* cause);
int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message);
void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code);
void onDisconnectFailure(void* context, MQTTAsync_failureData* response);
void onDisconnect(void* context, MQTTAsync_successData* response);
void onSubscribe(void* context, MQTTAsync_successData* response);
void onSubscribeFailure(void* context, MQTTAsync_failureData* response);
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code);


#define CLIENTID "taos"
#define TOPIC "/taos/+/+/+/" // taos/<token>/<db name>/<table name>/
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L

#ifdef __cplusplus
}
#endif

#endif
27 changes: 27 additions & 0 deletions src/plugins/mqtt/inc/mqttPayload.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <[email protected]>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef TDENGINE_MQTT_PLYLOAD_H
#define TDENGINE_MQTT_PLYLOAD_H
#ifdef __cplusplus
extern "C" {
#endif
char split(char str[], char delims[], char** p_p_cmd_part, int max);
char* converJsonToSql(char* json, char* _dbname, char* _tablename);
#ifdef __cplusplus
}
#endif

#endif
17 changes: 0 additions & 17 deletions src/plugins/mqtt/inc/mqttSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,11 @@
#ifdef __cplusplus
extern "C" {
#endif

#include <stdint.h>
#include "MQTTAsync.h"
#include "os.h"
#include "taos.h"
#include "tglobal.h"
#include "tsocket.h"
#include "ttimer.h"
#include "tsclient.h"
int32_t mqttInitSystem();
int32_t mqttStartSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
char split(char str[], char delims[], char** p_p_cmd_part, int max);
void connlost(void* context, char* cause);
int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message);
void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code);
void onDisconnectFailure(void* context, MQTTAsync_failureData* response);
void onDisconnect(void* context, MQTTAsync_successData* response);
void onSubscribe(void* context, MQTTAsync_successData* response);
void onSubscribeFailure(void* context, MQTTAsync_failureData* response);
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code);
#ifdef __cplusplus
}
#endif
Expand Down
64 changes: 64 additions & 0 deletions src/plugins/mqtt/src/mqttPayload.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <[email protected]>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#define _DEFAULT_SOURCE
#include "mqttUitl.h"
#include "cJSON.h"
#include "string.h"
#include "taos.h"
#include "mqttLog.h"
#include "os.h"
char split(char str[], char delims[], char** p_p_cmd_part, int max) {
char* token = strtok(str, delims);
char part_index = 0;
char** tmp_part = p_p_cmd_part;
while (token) {
*tmp_part++ = token;
token = strtok(NULL, delims);
part_index++;
if (part_index >= max) break;
}
return part_index;
}

char* converJsonToSql(char* json, char* _dbname, char* _tablename) {
cJSON* jPlayload = cJSON_Parse(json);
char _names[102400] = {0};
char _values[102400] = {0};
int i = 0;
int count = cJSON_GetArraySize(jPlayload);
for (; i < count; i++) //±éÀú×îÍâ²ãjson¼üÖµ¶Ô
{
cJSON* item = cJSON_GetArrayItem(jPlayload, i);
if (cJSON_Object == item->type) {
mqttPrint("The item '%s' is not supported", item->string);
} else {
strcat(_names, item->string);
if (i < count - 1) {
strcat(_names, ",");
}
char* __value_json = cJSON_Print(item);
strcat(_values, __value_json);
free(__value_json);
if (i < count - 1) {
strcat(_values, ",");
}
}
}
cJSON_free(jPlayload);
char _sql[102400] = {0};
sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values);
return _sql;
}
59 changes: 9 additions & 50 deletions src/plugins/mqtt/src/mqttSystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@
#include "tsclient.h"
#include "tsocket.h"
#include "ttimer.h"

#define CLIENTID "taos"
#define TOPIC "/taos/+/+/+/" // taos/<token>/<db name>/<table name>/
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
#include "mqttInit.h"
#include "mqttPlyload.h"

MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
Expand All @@ -42,7 +38,7 @@ int subscribed = 0;
int finished = 0;
int can_exit = 0;

void connlost(void* context, char* cause) {
void mqttConnnectLost(void* context, char* cause) {
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
Expand All @@ -58,20 +54,9 @@ void connlost(void* context, char* cause) {
finished = 1;
}
}
char split(char str[], char delims[], char** p_p_cmd_part, int max) {
char* token = strtok(str, delims);
char part_index = 0;
char** tmp_part = p_p_cmd_part;
while (token) {
*tmp_part++ = token;
token = strtok(NULL, delims);
part_index++;
if (part_index >= max) break;
}
return part_index;
}

int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message) {

int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message) {
mqttTrace("Message arrived,topic is %s,message is %.*s", topicName, message->payloadlen, (char*)message->payload);
char _token[128] = {0};
char _dbname[128] = {0};
Expand All @@ -93,44 +78,18 @@ int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* me
strncpy(_tablename, p_p_cmd_part[3], 127);
mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname,
_tablename);
cJSON* jPlayload = cJSON_Parse((char*)message->payload);
char _names[102400] = {0};
char _values[102400] = {0};
int i = 0;
int count = cJSON_GetArraySize(jPlayload);
for (; i < count; i++) //±éÀú×îÍâ²ãjson¼üÖµ¶Ô
{
cJSON* item = cJSON_GetArrayItem(jPlayload, i);
if (cJSON_Object == item->type) {
mqttPrint("The item '%s' is not supported", item->string);
} else {
strcat(_names, item->string);
if (i < count - 1) {
strcat(_names, ",");
}
char* __value_json = cJSON_Print(item);
strcat(_values, __value_json);
free(__value_json);
if (i < count - 1) {
strcat(_values, ",");
}
}
}
cJSON_free(jPlayload);
char _sql[102400] = {0};
sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values);

char* sql = converJsonToSql((char*)message->payload, _dbname, _tablename);
if (mqtt_conn != NULL) {
mqttPrint("query:%s", _sql);
taos_query_a(mqtt_conn, _sql, mqtt_query_insert_callback, &client);
taos_query_a(mqtt_conn, _sql, mqttQueryInsertCallback, &client);
}
}
}
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code) {
void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) {
if (code < 0) {
mqttError("mqtt:%d, save data failed, code:%s", code, tstrerror(code));
} else if (code == 0) {
Expand Down Expand Up @@ -198,7 +157,7 @@ int32_t mqttInitSystem() {
rc = EXIT_FAILURE;

} else {
if ((rc = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL)) != MQTTASYNC_SUCCESS) {
if ((rc = MQTTAsync_setCallbacks(client, client, mqttConnnectLost, mqttMessageArrived, NULL)) != MQTTASYNC_SUCCESS) {
mqttError("Failed to set callbacks, return code %d", rc);
rc = EXIT_FAILURE;
} else {
Expand Down
Loading

0 comments on commit df503b1

Please sign in to comment.