Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event hanlder AWA-13 #12

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Dockerfile-Event-Handler
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM python:3.7-slim-buster
RUN apt-get update
RUN apt-get update && apt-get install -y \
python3-dev \
cython3
COPY ./mqtt-handler/requirements.txt requirements-mqtt.txt
COPY ./data_classes/requirements.txt requirements-data.txt
COPY ./event-orchestrator/requirements.txt requirements-event.txt
RUN pip3 install --upgrade pip
RUN pip3 install -r requirements-data.txt -r requirements-mqtt.txt -r requirements-event.txt
COPY event-orchestrator/ /app/
COPY data_classes/* /app/
COPY ./common_utils/* /app/
WORKDIR /app
CMD ["/bin/bash"]
4 changes: 2 additions & 2 deletions common_utils/mqtt_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def on_message(
"""
raise NotImplementedError

def define_mqtt_client(self, client_id):
def define_mqtt_client(self, client_id, clean_session=True):
will_message = gmqtt.Message(
"brokers/{}/alerts/service/{}/disconnected".format(
client_id, self.__class__.__name__
Expand All @@ -43,7 +43,7 @@ def define_mqtt_client(self, client_id):
)
self.mqtt_client = gmqtt.Client(
client_id=client_id,
clean_session=True,
clean_session=clean_session,
optimistic_acknowledgement=True,
will_message=will_message,
)
Expand Down
9 changes: 0 additions & 9 deletions data_classes/device.py

This file was deleted.

9 changes: 0 additions & 9 deletions data_classes/device_owner.py

This file was deleted.

59 changes: 59 additions & 0 deletions data_classes/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from mongoengine import Document
from mongoengine.fields import (
ReferenceField,
ListField,
StringField,
DictField,
BooleanField,
)
from bson import ObjectId


class Device(Document):
"""This class persists the devices"""

eui = StringField(required=True)
device_type = StringField(required=True)


class DeviceOwner(Document):
"""This class persists the devices"""

user = ReferenceField("User")
device = ReferenceField("Device")


class Role(Document):
"""This class persists the roles of a user"""

name = StringField(max_length=80, unique=True)
description = StringField(max_length=255)


class User(Document):
"""This class persists the devices"""

roles = ListField(ReferenceField("Device"))
email = StringField(required=True)


class DeviceEvent(Document): # remove after testing
"""This class persists the devices"""

topic = StringField()
payload = DictField(required=True)
device = ReferenceField("Device")
processed = BooleanField(default=False)

def process(self):
"""
Placeholder to handle all necessary events that need to take place
"""
if not self.processed:
self.processed = True
self.save()

@staticmethod
def find_and_process_event(_id):
event = DeviceEvent.objects.get(id=ObjectId(_id))
event.process()
9 changes: 0 additions & 9 deletions data_classes/role.py

This file was deleted.

9 changes: 0 additions & 9 deletions data_classes/user.py

This file was deleted.

9 changes: 6 additions & 3 deletions event-orchestrator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
os.getenv("TELEMATICS_MQTT_BROKER_HOST_PORT", "8883")
)
TELEMATICS_MQTT_BROKER_AUTH_TOKEN = os.getenv("TELEMATICS_MQTT_BROKER_AUTH_TOKEN")
# input
TELEMATICS_MQTT_APPLICATION_TOPIC = os.getenv("TELEMATICS_MQTT_APPLICATION_TOPIC")
# Alerts
TELEMATICS_MQTT_APPLICATION_ALERTS_TOPIC = os.getenv("TELEMATICS_MQTT_ALERTS_TOPIC")
TELEMATICS_MQTT_APPLICATION_ALERTS_TOPIC = os.getenv(
"TELEMATICS_MQTT_APPLICATION_ALERTS_TOPIC"
)
MONGO_DB_URI = os.getenv("MONGO_DB_URI")
REDIS_CONNECTION_STRING = os.getenv("REDIS_CONNECTION_STRING", "redis://redis:6379")
TASK_QUEUE = "task_queue"
21 changes: 18 additions & 3 deletions event-orchestrator/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,20 @@
import config
from mqtt_mixin import MqttMixin
import asyncio
from gmqtt.mqtt.constants import MQTTv311
import json

# gmqtt also compatible with uvloop
import uvloop
import signal
from models import Device, DeviceEvent
from mongoengine import connect
from rq import Queue
from redis import Redis
import config

redis_conn = Redis.from_url(config.REDIS_CONNECTION_STRING)
task_queue = Queue(config.TASK_QUEUE, connection=redis_conn)

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
STOP = asyncio.Event()
Expand All @@ -20,15 +30,20 @@ def ask_exit(*args):
class EventHandler(MqttMixin):
def __init__(self):
# initialize
self.define_mqtt_client()
self.define_mqtt_client("away-event-handler")
connect(host=config.MONGO_DB_URI)

async def on_message(
self, client, topic, payload, qos, properties
): # pylint: disable=unused-argument
"""
Define how to handle the incoming stream
"""
self.logger.info("Handling event.")
event = DeviceEvent(topic=topic, payload=json.loads(payload)).save()
job = task_queue.enqueue(DeviceEvent.find_and_process_event, str(event.id))
self.logger.info(
"Event persisted and scheduled for processing. Job id {}".format(job.id)
)

async def handle_events(self):
self.mqtt_client.set_auth_credentials(
Expand All @@ -38,7 +53,7 @@ async def handle_events(self):
config.TELEMATICS_MQTT_BROKER_HOST,
config.TELEMATICS_MQTT_BROKER_HOST_PORT,
ssl=True,
version=5,
version=MQTTv311,
raise_exc=True,
keepalive=60,
)
Expand Down
3 changes: 2 additions & 1 deletion event-orchestrator/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ gmqtt
uvloop
python-dotenv
rq
redis
redis
mongoengine
2 changes: 2 additions & 0 deletions event-orchestrator/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
argparse==1.4.0 # via -r requirements.in
click==7.1.2 # via rq
gmqtt==0.6.7 # via -r requirements.in
mongoengine==0.20.0 # via -r requirements.in
pymongo==3.11.0 # via mongoengine
python-dotenv==0.14.0 # via -r requirements.in
redis==3.5.3 # via -r requirements.in, rq
rq==1.5.0 # via -r requirements.in
Expand Down
21 changes: 21 additions & 0 deletions event-orchestrator/rq_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from redis import Redis
from rq import Worker
from config import REDIS_CONNECTION_STRING, TASK_QUEUE, MONGO_DB_URI
import argparse
from mongoengine import connect

connect(host=MONGO_DB_URI)

if __name__ == "__main__":
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"--action", help="Define what you want to do.", choices=["work"], required=True
)
args = parser.parse_args()

if args.action == "work":
connection = Redis.from_url(REDIS_CONNECTION_STRING)
worker = Worker([TASK_QUEUE], connection=connection)
worker.work()
2 changes: 1 addition & 1 deletion mqtt-handler/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
LORA_MQTT_BROKER_AUTH_TOKEN = os.getenv("LORA_MQTT_BROKER_AUTH_TOKEN")
# input
LORA_MQTT_APPLICATION_TOPIC = os.getenv("LORA_MQTT_APPLICATION_TOPIC")
TELEMATICS_MQTT_APPLICATION_TOPIC = os.getenv("TELEMATICS_MQTT_APPLICATION_TOPIC")
# Alerts
TELEMATICS_MQTT_APPLICATION_ALERTS_TOPIC = os.getenv(
"TELEMATICS_MQTT_APPLICATION_ALERTS_TOPIC"
)
TELEMATICS_MQTT_PUBLISHER_ALERTS = os.getenv("TELEMATICS_MQTT_PUBLISHER_ALERTS")
# LORA APPLICATION DETAILS
LORA_APPLICATION_IDENTIFIER = os.getenv("LORA_APPLICATION_IDENTIFIER")
23 changes: 12 additions & 11 deletions mqtt-handler/telematics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from mqtt_mixin import MqttMixin
import asyncio
from gmqtt.mqtt.constants import MQTTv311
from gmqtt import Client as MQTTClient

# gmqtt also compatible with uvloop
import uvloop
Expand Down Expand Up @@ -63,22 +62,24 @@ def _assign_callbacks_to_client_telematics(client):
client.on_disconnect = Telematics._on_disconnect_telematics

async def initialize_telematics_connection(self):
client_id = "{}-{}".format(
self.__class__.__name__, self.telematics_application_id
)
will_message = gmqtt.Message(
config.TELEMATICS_MQTT_APPLICATION_ALERTS_TOPIC,
"{}{}".format(config.TELEMATICS_MQTT_PUBLISHER_ALERTS, client_id),
"Unexpected Exit.",
will_delay_interval=10,
qos=1,
retain=True,
retain=False,
)
self.telematics_client = gmqtt.Client(
client_id="{}-{}".format(
self.__class__.__name__, self.telematics_application_id
),
client_id=client_id,
clean_session=True,
optimistic_acknowledgement=True,
will_message=will_message,
)
Telematics._assign_callbacks_to_client_telematics(self.telematics_client)

# Telematics._assign_callbacks_to_client_telematics(self.telematics_client)
if self.telematics_auth_token:
self.telematics_client.set_auth_credentials(
self.telematics_auth_token, None
Expand All @@ -101,18 +102,18 @@ async def on_message(
payload,
qos=1,
content_type="json",
retain=True,
retain=False,
user_property=properties.get("user_property"),
)
self.logger.info("Retaining in our own MQTT broker.")

async def post_inference_to_telematics_hub(self):
# external telematics client
await self.initialize_telematics_connection()
# initialize internal mqtt client
self.define_mqtt_client(
"{}-{}".format("Relayer-", self.telematics_application_id)
"{}-{}".format("Relayer", self.telematics_application_id)
)
# external telematics client
await self.initialize_telematics_connection()
# LORA mqtt broker
self.mqtt_client.set_auth_credentials(
config.LORA_APPLICATION_IDENTIFIER, config.LORA_MQTT_BROKER_AUTH_TOKEN
Expand Down