forked from equinor/isar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
111 lines (95 loc) · 3.75 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import logging
import time
from logging import Logger
from threading import Thread
from typing import List
from injector import Injector
from isar.apis.api import API
from isar.config.keyvault.keyvault_service import Keyvault
from isar.config.log import setup_loggers
from isar.config.settings import settings
from isar.models.communication.queues.queues import Queues
from isar.modules import get_injector
from isar.services.service_connections.mqtt.mqtt_client import MqttClient
from isar.services.service_connections.mqtt.robot_heartbeat_publisher import (
RobotHeartbeatPublisher,
)
from isar.services.service_connections.mqtt.robot_info_publisher import (
RobotInfoPublisher,
)
from isar.services.service_connections.mqtt.robot_status_publisher import (
RobotStatusPublisher,
)
from isar.state_machine.state_machine import StateMachine, main
from isar.storage.uploader import Uploader
from robot_interface.robot_interface import RobotInterface
if __name__ == "__main__":
injector: Injector = get_injector()
keyvault_client = injector.get(Keyvault)
setup_loggers(keyvault=keyvault_client)
logger: Logger = logging.getLogger("main")
state_machine: StateMachine = injector.get(StateMachine)
uploader: Uploader = injector.get(Uploader)
robot: RobotInterface = injector.get(RobotInterface)
queues: Queues = injector.get(Queues)
threads: List[Thread] = []
state_machine_thread: Thread = Thread(
target=main, name="ISAR State Machine", args=[state_machine], daemon=True
)
threads.append(state_machine_thread)
uploader_thread: Thread = Thread(
target=uploader.run, name="ISAR Uploader", daemon=True
)
threads.append(uploader_thread)
if settings.MQTT_ENABLED:
mqtt_client: MqttClient = MqttClient(mqtt_queue=queues.mqtt_queue)
mqtt_thread: Thread = Thread(
target=mqtt_client.run, name="ISAR MQTT Client", daemon=True
)
threads.append(mqtt_thread)
robot_status_publisher: RobotStatusPublisher = RobotStatusPublisher(
mqtt_queue=queues.mqtt_queue, robot=robot, state_machine=state_machine
)
robot_status_thread: Thread = Thread(
target=robot_status_publisher.run,
name="ISAR Robot Status Publisher",
daemon=True,
)
threads.append(robot_status_thread)
robot_info_publisher: RobotInfoPublisher = RobotInfoPublisher(
mqtt_queue=queues.mqtt_queue
)
robot_info_thread: Thread = Thread(
target=robot_info_publisher.run,
name="ISAR Robot Info Publisher",
daemon=True,
)
threads.append(robot_info_thread)
robot_heartbeat_publisher: RobotHeartbeatPublisher = RobotHeartbeatPublisher(
mqtt_queue=queues.mqtt_queue
)
robot_heartbeat_thread: Thread = Thread(
target=robot_heartbeat_publisher.run,
name="ISAR Robot Heartbeat Publisher",
daemon=True,
)
threads.append(robot_heartbeat_thread)
publishers: List[Thread] = robot.get_telemetry_publishers(
queue=queues.mqtt_queue,
robot_name=settings.ROBOT_NAME,
isar_id=settings.ISAR_ID,
)
if publishers:
threads.extend(publishers)
api: API = injector.get(API)
api_thread: Thread = Thread(target=api.run_app, name="ISAR API", daemon=True)
threads.append(api_thread)
for thread in threads:
thread.start()
logger.info(f"Started thread: {thread.name}")
while True:
for thread in threads:
if not thread.is_alive():
logger.critical("Thread '%s' failed - ISAR shutting down", thread.name)
exit(1)
time.sleep(state_machine.sleep_time)