-
Notifications
You must be signed in to change notification settings - Fork 2
/
mqttclient.js
216 lines (192 loc) · 6.94 KB
/
mqttclient.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
const logger = require("./logger");
const fs = require("fs");
var mqtt = require("mqtt");
require("dotenv").config();
// TODO: add code to check for secure connection
const clientID = process.env.IOT_CLIENTID;
console.log(`Connecting to ${process.env.IOT_HOST} as client id: ${clientID}`);
const { timeStamp } = require("console");
const mqttClient = mqtt.connect({
host: process.env.IOT_HOST,
protocol: process.env.IOT_PROTOCOL,
port: process.env.IOT_SECURE_PORT,
clientId: clientID,
username: process.env.IOT_USERNAME,
password: process.env.IOT_PASSWORD,
});
const pool = setupMariaDB();
var connection;
const asyncFunction = function () {
try {
logger.debug(`!!successfully connected to server ${process.env.IOT_HOST}`);
//need to add error checking on the topic itself. Handle case where topic is not present.
mqttClient.subscribe(process.env.IOT_TOPIC, { qos: 2 }, function (err) {
logger.debug(
`!!successfully subscribed to topic: ${process.env.IOT_TOPIC}`
);
});
mqttClient.on("message", (topics, payload) => {
var msg = JSON.parse(payload.toString());
logger.debug(
`\n\n\n\ngot a message: ${JSON.stringify(
msg.firefighter_id
)}-${JSON.stringify(msg.device_id)}`
);
logger.debug(JSON.stringify(msg));
sendWSS(msg);
//insert into database
insertDatabase(msg);
});
//
// This line doesn't run until the server responds to the publish
// await mqttClient2.end();
// This line doesn't run until the client has disconnected without error
} catch (e) {
// Do something about it!
logger.error(e.stack);
process.exit();
}
};
logger.debug("connecting to IoT platform ...");
mqttClient.on("connect", asyncFunction);
mqttClient.on("close", function (err) {
if (err) {
logger.error(err);
} else {
logger.debug("connection closed");
}
});
mqttClient.on("disconnect", function (err) {
if (error) {
logger.error(err);
} else {
logger.debug("connection disconnected");
}
});
mqttClient.on("error", function (err) {
if (err) {
logger.error(err);
} else {
logger.debug("connection error");
}
});
function setupMariaDB() {
console.log(
`creating mariadb connection pool on: ${process.env.MARIADB_HOST}`
);
const mariadb = require("mariadb");
const { type } = require("os");
const pool = mariadb.createPool({
host: process.env.MARIADB_HOST,
port: process.env.MARIADB_PORT,
user: process.env.MARIADB_USERNAME,
password: process.env.MARIADB_PASSWORD,
database: process.env.MARIADB_DB,
connectionLimit: 5,
});
if (pool && pool != null) {
console.log("finished creating mariadb coonection pool");
} else {
console.log("could not create connection to mariadb");
}
return pool;
}
function sendWSS(msg) {
// add type or real
msg.type = "REAL";
// take out this code from. We should not connect everytime a message comes in
var WSS = require("websocket").client;
var webSocketClient = new WSS();
// webSocketClient.connect(`ws://${process.env.WS_HOST}:${process.env.WS_PORT}`, 'echo-protocol');
webSocketClient.connect(
`ws://${process.env.WS_HOST}:${process.env.WS_PORT}`,
"echo-protocol"
);
webSocketClient.on("connectFailed", (error) => {
logger.debug(
`unable to connect to websocketserver: ${process.env.WS_HOST}:${process.env.WS_PORT}` +
error.toString()
);
});
webSocketClient.on("connect", (connection) => {
logger.debug("WebSocket Client Connected");
connection.on("error", function (error) {
logger.debug("Connection Error: " + error.toString());
});
connection.on("close", function () {
logger.debug("echo-protocol Connection Closed");
});
if (connection.connected) {
// var time = Math.floor(Date.now() / 1000);
// var data = { "fields": ["Bombero", "Estado", "Timestamp", "Temp", "Humidity", "CO"], "values": [msg.id, "Verde", time, msg.temp, msg.humidity, msg.CO] };
console.log("sending msg");
connection.sendUTF(JSON.stringify(msg));
// reason codes: https://tools.ietf.org/html/rfc6455#section-7.4.1
connection.close("1000", "Closing connection after sending message");
}
});
}
function getUTCTimeStamp(timestamp) {
// return the UTC value of the timestamp
return timestamp.toUTCString();
}
function setSecondsToZero(timeStamp) {
timeStamp = new Date(timeStamp).setSeconds(0);
return new Date(timeStamp).toISOString().substr(0, 19);
}
function insertDatabase(data) {
logger.debug("insert to database");
return pool
.getConnection()
.then((conn) => {
logger.debug("successfully connected to the database service!");
// check if the data already exists in mariadb before inserting the new data????
// clau - was coming from node-red message._id. Changed to time + data.id
// +-----------------+----------+---------------+-------------+----------+------+
// | clau | SensorID | timestamp | temperature | humidity | CO |
// +-----------------+----------+---------------+-------------+----------+------+
// | 10001e1c.9aa502 | 0006 | 1581342069129 | 26 | 40 | 16 |
// assuming data.device_timestamp is already in UTC format
var timestamp = new Date(data.device_timestamp).setSeconds(0);
// logger.debug(`inserting timestamp: ${new Date(timestamp)}`);
// var device_timestamp = new Date(data.device_timestamp).toISOString();
// var timestamp_mins = new Date(new Date(data.device_timestamp).setSeconds(0)).toISOString();
/*
INSERT INTO prometeo.firefighter_sensor_log
(timestamp_mins, firefighter_id, device_id, device_battery_level, temperature, humidity, carbon_monoxide, nitrogen_dioxide, formaldehyde, acrolein, benzene, device_timestamp, device_status_LED)
VALUES('', '', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
*/
conn
.query(
"INSERT INTO firefighter_sensor_log (timestamp_mins, firefighter_id, device_id, device_battery_level, temperature, humidity, carbon_monoxide, nitrogen_dioxide, formaldehyde, acrolein, benzene, device_timestamp) VALUES (?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?)",
[
new Date(timestamp),
data.firefighter_id,
data.device_id,
data.device_battery_level,
data.temperature,
data.humidity,
data.carbon_monoxide,
data.nitrogen_dioxide,
data.formaldehyde,
data.acrolein,
data.benzene,
data.device_timestamp,
]
)
.then((res) => {
// logger.debug(res); // { affectedRows: 1, insertId: 1, warningStatus: 0 }
conn.end();
return res;
})
.catch((err) => {
//handle error
logger.error(err);
conn.end();
return err;
});
})
.catch((err) => {
logger.error(err);
});
}