Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proof of Concept: MQTT database/provider #548

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions examples/mqtt.eve
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

# MQTT

## Listening to incoming messages

```eve
search @mqtt
m = [#message #incoming topic payload]
bind @browser
[#div text: "MQTT message at {{topic}} : {{payload}}"]
```

## Sending messages

```eve
search
i = range[from: 1 to: 3]

commit @mqtt
[#message #outgoing topic: "eve/test/" + i, payload: i]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't know that we overloaded + in this way? pretty sure the preferred syntax here would
be 'interpolation', as in "eve/test/{{i}}". this inherits implicit string conversion from JS, but
i think there is an ongoing discussion about how much implicit string conversion makes sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just what I instincivally reached for as a first-day Eve programmer, and it worked, so I didn't change it. :D Agreed that string interpolation is preferred, it is easier to understand the code, and it is less magical than implicit string conversions (which I don't think are very nice at all).

```
40 changes: 40 additions & 0 deletions examples/mqtt_temperature.eve
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

# MQTT
## Temperature
From mqtt://mqtt.bitraf.no see http://iot.bitraf.no
### Define the sensors
```eve
commit @shared
[#bitraf #temperature name: "lab" topic: "bitraf/temperature/1"]
[#bitraf #temperature name: "office" topic: "bitraf/temperature/2/value"]
[#bitraf #temperature name: "outside" topic: "bitraf/temperature/3/value"]
```
### Show latest sensor values
```eve
search @shared
sensor = [#bitraf #temperature name topic]
search @mqtt
message = [#message #incoming topic: topic, payload]
sort[value: payload, per: topic] = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the intent here? does the payload for each topic contain historical samples that you
want to throw away?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each message (on that topic) contains a payload with a temperature value. The sensors send such messages at a fixed messages intervals (every minute in our case, I think). And since the search is for messages, without a sort/filter I get all these messages, including things which are long in the past.

bind @browser
[#div text: "{{name}}: {{payload}} C"]
```

## Testing

```eve disabled
search @shared @mqtt
sensor = [#bitraf #temperature name topic]
message = [#message #incoming topic: topic, payload]
ix = sort[value: name]
bind @view
[#history | values: payload]
```

```eve disabled
search
i = range[from: 1 to: 3]

commit @mqtt
[#message #outgoing topic: "eve/test/" + i, payload: i]
```
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"@types/glob": "^5.0.30",
"@types/minimist": "^1.1.29",
"@types/mkdirp": "^0.3.29",
"@types/mqtt": "0.0.32",
"@types/node": "^6.0.41",
"@types/request": "0.0.31",
"@types/tape": "^4.2.28",
Expand All @@ -17,6 +18,7 @@
"glob": "^7.1.1",
"minimist": "^1.2.0",
"mkdirp": "^0.5.1",
"mqtt": "^2.0.1",
"node-uuid": "^1.4.7",
"request": "^2.75.0",
"typescript": "^2.0.3",
Expand Down
176 changes: 176 additions & 0 deletions src/runtime/databases/node/mqtt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
//---------------------------------------------------------------------
// Node Server Database
//---------------------------------------------------------------------

import {InsertAction} from "../../actions"
import {Changes} from "../../changes";
import {Evaluation, Database} from "../../runtime";

import * as url from "url";
import * as mqtt from "mqtt";


function serializeMessage(payload : any) : string {
let serialized = payload.toString();
if (typeof payload == 'boolean') {
serialized = payload ? 'true' : 'false';
} else if (typeof payload == 'object') {
serialized = (payload) ? JSON.stringify(payload) : 'null';
} else {
// treat as string
}
return serialized;
}

function deserializeMessage(payload : string) : any {
let parsed : any = payload;
if (payload == 'true') {
parsed = true;
} else if (payload == 'false') {
parsed = false;
} else if (payload[0] == '{' || payload[0] == '[') {
try {
parsed = JSON.parse(payload);
} catch (_) {
}
} else {
try {
parsed = parseFloat(payload);
} catch (_) {
}
}
return parsed;
}

export class MqttDatabase extends Database {

receiving: boolean;
requestId: number;
client: mqtt.Client;

constructor() {
super();
this.requestId = 0;
this.receiving = false;
this.client = null;
}

setup() {
let broker = process.env.EVE_MQTT_BROKER || 'mqtt://localhost:1883';
let parsed = url.parse(broker);
let auth = (parsed.auth || ':').split(':');
let options = {
port: parsed.port || 1883,
clientId: 'eve' + Math.random().toString(16).substr(2, 8),
username: auth[0],
password: auth[1]
};
let cleanedUrl = "mqtt://"+parsed.host;
let client = mqtt.connect(cleanedUrl, options);
let onMessage = this.handleMqttMessage.bind(this);
this.client = client;
client.on('error', function(err) {
console.error('MQTT error', err);
});
client.on('connect', function() {
// TODO: be smarter, only subscribe to things there are bindings against
client.subscribe("#", function(s) {
client.on('message', onMessage);
console.log('MQTT subscribed to', cleanedUrl);
});
});
}

handleMqttMessage(topic, message, packet) {
console.log('MQTT got message', topic, message.length);

if(!this.receiving) {
return console.log("Nothing is listening to MQTT messages");
}

// NOTE: assumes UTF-8, no support for binary/Buffer data
let parsed = deserializeMessage(message.toString());

let scopes = ["mqtt"];
let requestId = `request|${this.requestId++}|${(new Date()).getTime()}`
let actions = [
new InsertAction("mqtt|tag", requestId, "tag", "message", undefined, scopes),
new InsertAction("mqtt|tag", requestId, "tag", "incoming", undefined, scopes),
new InsertAction("mqtt|topic", requestId, "topic", topic, undefined, scopes),
];

// TODO: implement entry setting like server.ts does?
// if(parsed && typeof parsed === "object") {
// let bodyId = `${requestId}|body`;
// for(let key of Object.keys(body)) {
// actions.push(new InsertAction("mqtt|message-entry", bodyId, key, body[key], undefined, scopes));
// }
// body = bodyId;
// }
actions.push(new InsertAction("mqtt|message-payload", requestId, "payload", parsed, undefined, scopes))

let evaluation = this.evaluations[0];
evaluation.executeActions(actions);
}

analyze(evaluation: Evaluation, db: Database) {
for(let block of db.blocks) {
for(let scan of block.parse.scanLike) {
if(scan.type === "record" && scan.scopes.indexOf("mqtt") > -1) {
for(let attribute of scan.attributes) {
if(attribute.attribute === "tag" && attribute.value.value === "message") {
console.log('MQTT found listener');
this.receiving = true;
}
}
}
}
}
}

sendMessage(requestId, topic, payload) {
console.log('MQTT sendMessage', topic);
const serialized = serializeMessage(payload);
this.client.publish(topic, serialized);
}

onFixpoint(evaluation: Evaluation, changes: Changes) {
let name = evaluation.databaseToName(this);
let result = changes.result({[name]: true});
let handled = {};
let index = this.index;
let actions = [];
for(let insert of result.insert) {
let [e,a,v] = insert;
if(!handled[e]) {
handled[e] = true;
let isOutgoingMessage = index.lookup(e,"tag", "message") && index.lookup(e,"tag", "outgoing");
let isSent = index.lookup(e, "tag", "sent");
if(isOutgoingMessage && !isSent) {
// TODO: error/warn if multiple payloads (not supported)
let payloads = index.asValues(e, "payload");
if (payloads === undefined) {
console.error("no payloads for outgoing message")
continue;
}
let [payload] = payloads;

// TODO: support multiple topics, or error/warn
let topics = index.asValues(e, "topic");
let [topic] = topics;

actions.push(new InsertAction("mqtt|message-sent", e, "tag", "sent", undefined, [name]));

this.sendMessage(e, topic, payload);
}
}
}
if(actions.length) {
process.nextTick(() => {
evaluation.executeActions(actions);
})
}
}
}


4 changes: 4 additions & 0 deletions src/runtime/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {ActionImplementations} from "./actions";
import {PersistedDatabase} from "./databases/persisted";
import {HttpDatabase} from "./databases/node/http";
import {ServerDatabase} from "./databases/node/server";
import {MqttDatabase} from "./databases/node/mqtt";
import {RuntimeClient} from "./runtimeClient";

//---------------------------------------------------------------------
Expand All @@ -34,6 +35,8 @@ const contentTypes = {
const BROWSER = !argv["server"];
const PORT = process.env.PORT || 8080;
const serverDatabase = new ServerDatabase();
const mqttDatabase = new MqttDatabase();
mqttDatabase.setup();
const shared = new PersistedDatabase();

global["browser"] = false;
Expand Down Expand Up @@ -94,6 +97,7 @@ class ServerRuntimeClient extends RuntimeClient {
constructor(socket:WebSocket, withIDE = true) {
const dbs = {
"http": new HttpDatabase(),
"mqtt": mqttDatabase,
"shared": shared,
}
super(dbs, withIDE);
Expand Down