diff --git a/main.py b/main.py index 1ad85e7a..66b67a84 100644 --- a/main.py +++ b/main.py @@ -19,9 +19,6 @@ from isar.services.service_connections.mqtt.robot_info_publisher import ( RobotInfoPublisher, ) -from isar.services.service_connections.mqtt.robot_status_publisher import ( - RobotStatusPublisher, -) from isar.state_machine.state_machine import StateMachine, main from isar.storage.uploader import Uploader from robot_interface.robot_interface import RobotInterface @@ -56,15 +53,6 @@ target=mqtt_client.run, name="ISAR MQTT Client", daemon=True ) threads.append(mqtt_thread) - robot_status_publisher: RobotStatusPublisher = RobotStatusPublisher( - mqtt_queue=queues.mqtt_queue, robot=robot, state_machine=state_machine - ) - robot_status_thread: Thread = Thread( - target=robot_status_publisher.run, - name="ISAR Robot Status Publisher", - daemon=True, - ) - threads.append(robot_status_thread) robot_info_publisher: RobotInfoPublisher = RobotInfoPublisher( mqtt_queue=queues.mqtt_queue diff --git a/src/isar/config/settings.py b/src/isar/config/settings.py index d4d3ac38..26ebde31 100644 --- a/src/isar/config/settings.py +++ b/src/isar/config/settings.py @@ -229,14 +229,13 @@ def __init__(self) -> None: DATA_CLASSIFICATION: str = Field(default="internal") # List of MQTT Topics - TOPIC_ISAR_STATE: str = Field(default="state", validate_default=True) + TOPIC_ISAR_STATUS: str = Field(default="status", validate_default=True) TOPIC_ISAR_MISSION: str = Field(default="mission", validate_default=True) TOPIC_ISAR_TASK: str = Field(default="task", validate_default=True) TOPIC_ISAR_STEP: str = Field(default="step", validate_default=True) TOPIC_ISAR_INSPECTION_RESULT: str = Field( default="inspection_result", validate_default=True ) - TOPIC_ISAR_ROBOT_STATUS: str = Field(default="robot_status", validate_default=True) TOPIC_ISAR_ROBOT_INFO: str = Field(default="robot_info", validate_default=True) TOPIC_ISAR_ROBOT_HEARTBEAT: str = Field( default="robot_heartbeat", validate_default=True @@ -284,11 +283,10 @@ def set_log_levels(cls, v: Any, info: ValidationInfo) -> dict: } @field_validator( - "TOPIC_ISAR_STATE", + "TOPIC_ISAR_STATUS", "TOPIC_ISAR_MISSION", "TOPIC_ISAR_TASK", "TOPIC_ISAR_STEP", - "TOPIC_ISAR_ROBOT_STATUS", "TOPIC_ISAR_ROBOT_INFO", "TOPIC_ISAR_ROBOT_HEARTBEAT", "TOPIC_ISAR_INSPECTION_RESULT", diff --git a/src/isar/services/service_connections/mqtt/robot_status_publisher.py b/src/isar/services/service_connections/mqtt/robot_status_publisher.py deleted file mode 100644 index e0ee3f1c..00000000 --- a/src/isar/services/service_connections/mqtt/robot_status_publisher.py +++ /dev/null @@ -1,125 +0,0 @@ -import json -import logging -import time -from datetime import datetime -from logging import Logger -from queue import Queue -from threading import Thread -from typing import Optional - -from isar.config.settings import settings -from isar.state_machine.state_machine import StateMachine -from isar.state_machine.states_enum import States -from robot_interface.models.exceptions.robot_exceptions import ( - RobotAPIException, - RobotCommunicationException, - RobotException, -) -from robot_interface.models.mission.status import RobotStatus -from robot_interface.robot_interface import RobotInterface -from robot_interface.telemetry.mqtt_client import MqttPublisher -from robot_interface.telemetry.payloads import RobotStatusPayload -from robot_interface.utilities.json_service import EnhancedJSONEncoder - - -class RobotStatusPublisher: - def __init__( - self, mqtt_queue: Queue, robot: RobotInterface, state_machine: StateMachine - ): - self.mqtt_publisher: MqttPublisher = MqttPublisher(mqtt_queue=mqtt_queue) - self.robot: RobotInterface = robot - self.state_machine: StateMachine = state_machine - self.logger: Logger = logging.getLogger("mqtt") - - def _get_combined_robot_status( - self, robot_status: RobotStatus, current_state: States - ) -> RobotStatus: - if robot_status == RobotStatus.Offline: - return RobotStatus.Offline - elif current_state == States.Idle and robot_status == RobotStatus.Available: - return RobotStatus.Available - elif robot_status == RobotStatus.Blocked: - return RobotStatus.Blocked - elif current_state != States.Idle or robot_status == RobotStatus.Busy: - return RobotStatus.Busy - return None - - def run(self) -> None: - robot_status_monitor: RobotStatusMonitor = RobotStatusMonitor(robot=self.robot) - robot_status_thread: Thread = Thread( - target=robot_status_monitor.run, - name="Robot Status Monitor", - daemon=True, - ) - robot_status_thread.start() - - previous_robot_status: Optional[RobotStatus] = None - - while True: - time.sleep(settings.ROBOT_STATUS_PUBLISH_INTERVAL) - - combined_status: RobotStatus = self._get_combined_robot_status( - robot_status=robot_status_monitor.robot_status, - current_state=self.state_machine.current_state, - ) - - if previous_robot_status: - if previous_robot_status == combined_status: - continue - - payload: RobotStatusPayload = RobotStatusPayload( - isar_id=settings.ISAR_ID, - robot_name=settings.ROBOT_NAME, - robot_status=combined_status, - previous_robot_status=previous_robot_status, - current_isar_state=self.state_machine.current_state, - current_mission_id=( - self.state_machine.current_mission.id - if self.state_machine.current_mission - else None - ), - current_task_id=( - self.state_machine.current_task.id - if self.state_machine.current_task - else None - ), - current_step_id=( - self.state_machine.current_step.id - if self.state_machine.current_step - else None - ), - timestamp=datetime.utcnow(), - ) - - self.logger.info( - f"Combined status has changed to {combined_status} from {previous_robot_status} " - f"with current state {self.state_machine.current_state}" - ) - - self.mqtt_publisher.publish( - topic=settings.TOPIC_ISAR_ROBOT_STATUS, - payload=json.dumps(payload, cls=EnhancedJSONEncoder), - ) - - previous_robot_status = combined_status - - -class RobotStatusMonitor: - def __init__(self, robot: RobotInterface): - self.robot: RobotInterface = robot - self.robot_status: RobotStatus = RobotStatus.Offline - self.logger: Logger = logging.getLogger("robot_status_monitor") - - def run(self) -> None: - while True: - try: - self.robot_status = self.robot.robot_status() - except ( - RobotCommunicationException, - RobotAPIException, - RobotException, - ) as e: - self.logger.error( - f"Failed to get robot status because: {e.error_description}" - ) - time.sleep(settings.ROBOT_API_STATUS_POLL_INTERVAL) diff --git a/src/isar/state_machine/state_machine.py b/src/isar/state_machine/state_machine.py index f6c9bd04..903e5933 100644 --- a/src/isar/state_machine/state_machine.py +++ b/src/isar/state_machine/state_machine.py @@ -24,6 +24,7 @@ Initiate, Monitor, Off, + Offline, Paused, Stop, ) @@ -31,7 +32,12 @@ from robot_interface.models.exceptions.robot_exceptions import ErrorMessage from robot_interface.models.initialize.initialize_params import InitializeParams from robot_interface.models.mission.mission import Mission -from robot_interface.models.mission.status import MissionStatus, StepStatus, TaskStatus +from robot_interface.models.mission.status import ( + MissionStatus, + RobotStatus, + StepStatus, + TaskStatus, +) from robot_interface.models.mission.step import Step from robot_interface.models.mission.task import Task from robot_interface.robot_interface import RobotInterface @@ -88,6 +94,7 @@ def __init__( self.monitor_state: State = Monitor(self) self.initiate_state: State = Initiate(self) self.off_state: State = Off(self) + self.offline_state: State = Offline(self) self.states: List[State] = [ self.off_state, @@ -97,6 +104,7 @@ def __init__( self.monitor_state, self.stop_state, self.paused_state, + self.offline_state, ] self.machine = Machine(self, states=self.states, initial="off", queued=True) @@ -194,6 +202,18 @@ def __init__( "dest": self.idle_state, "before": self._mission_stopped, }, + { + "trigger": "robot_turned_offline", + "source": [self.idle_state], + "dest": self.offline_state, + "before": self._offline, + }, + { + "trigger": "robot_turned_online", + "source": self.offline_state, + "dest": self.idle_state, + "before": self._online, + }, ] ) @@ -239,6 +259,12 @@ def _pause(self) -> None: def _off(self) -> None: return + def _offline(self) -> None: + return + + def _online(self) -> None: + return + def _resume(self) -> None: self.logger.info(f"Resuming mission: {self.current_mission.id}") self.current_mission.status = MissionStatus.InProgress @@ -417,7 +443,7 @@ def update_state(self): self.send_state_status() self._log_state_transition(self.current_state) self.logger.info(f"State: {self.current_state}") - self.publish_state() + self.publish_status() def reset_state_machine(self) -> None: self.logger.info("Resetting state machine") @@ -559,25 +585,33 @@ def publish_step_status(self, step: Step) -> None: retain=False, ) - def publish_state(self) -> None: + def publish_status(self) -> None: if not self.mqtt_publisher: return payload: str = json.dumps( { "isar_id": settings.ISAR_ID, "robot_name": settings.ROBOT_NAME, - "state": self.current_state, + "status": self._current_status(), "timestamp": datetime.utcnow(), }, cls=EnhancedJSONEncoder, ) self.mqtt_publisher.publish( - topic=settings.TOPIC_ISAR_STATE, + topic=settings.TOPIC_ISAR_STATUS, payload=payload, retain=False, ) + def _current_status(self) -> RobotStatus: + if self.current_state == States.Idle: + return RobotStatus.Available + elif self.current_state == States.Offline: + return RobotStatus.Offline + else: + return RobotStatus.Busy + def _log_state_transition(self, next_state): """Logs all state transitions that are not self-transitions.""" self.transitions_list.append(next_state) diff --git a/src/isar/state_machine/states/__init__.py b/src/isar/state_machine/states/__init__.py index ec766345..034c06fe 100644 --- a/src/isar/state_machine/states/__init__.py +++ b/src/isar/state_machine/states/__init__.py @@ -3,5 +3,6 @@ from .initiate import Initiate from .monitor import Monitor from .off import Off +from .offline import Offline from .paused import Paused from .stop import Stop diff --git a/src/isar/state_machine/states/idle.py b/src/isar/state_machine/states/idle.py index 97d6bf41..b90495e5 100644 --- a/src/isar/state_machine/states/idle.py +++ b/src/isar/state_machine/states/idle.py @@ -1,10 +1,17 @@ import logging import time -from typing import Optional, TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from transitions import State +from isar.config.settings import settings from isar.models.communication.message import StartMissionMessage +from isar.services.utilities.threaded_request import ( + ThreadedRequest, + ThreadedRequestNotFinishedError, +) +from robot_interface.models.exceptions.robot_exceptions import RobotException +from robot_interface.models.mission.status import RobotStatus if TYPE_CHECKING: from isar.state_machine.state_machine import StateMachine @@ -15,13 +22,17 @@ def __init__(self, state_machine: "StateMachine") -> None: super().__init__(name="idle", on_enter=self.start, on_exit=self.stop) self.state_machine: "StateMachine" = state_machine self.logger = logging.getLogger("state_machine") + self.robot_status_thread: Optional[ThreadedRequest] = None + self.last_robot_status_poll_time: float = time.time() def start(self) -> None: self.state_machine.update_state() self._run() def stop(self) -> None: - pass + if self.robot_status_thread: + self.robot_status_thread.wait_for_thread() + self.robot_status_thread = None def _run(self) -> None: while True: @@ -37,4 +48,38 @@ def _run(self) -> None: break time.sleep(self.state_machine.sleep_time) + time_from_last_robot_status_poll = ( + time.time() - self.last_robot_status_poll_time + ) + if ( + time_from_last_robot_status_poll + < settings.ROBOT_API_STATUS_POLL_INTERVAL + ): + continue + + if not self.robot_status_thread: + self.robot_status_thread = ThreadedRequest( + request_func=self.state_machine.robot.robot_status + ) + self.robot_status_thread.start_thread( + name="State Machine Offline Get Robot Status" + ) + + try: + robot_status: RobotStatus = self.robot_status_thread.get_output() + except ThreadedRequestNotFinishedError: + time.sleep(self.state_machine.sleep_time) + continue + + except (RobotException,) as e: + self.logger.error( + f"Failed to get robot status because: {e.error_description}" + ) + + self.last_robot_status_poll_time = time.time() + + if robot_status == RobotStatus.Offline: + transition = self.state_machine.robot_turned_offline # type: ignore + break + transition() diff --git a/src/isar/state_machine/states/monitor.py b/src/isar/state_machine/states/monitor.py index 3e245781..8184a0af 100644 --- a/src/isar/state_machine/states/monitor.py +++ b/src/isar/state_machine/states/monitor.py @@ -1,7 +1,7 @@ import logging import time from copy import deepcopy -from typing import Callable, Optional, Sequence, TYPE_CHECKING, Tuple, Union +from typing import TYPE_CHECKING, Callable, Optional, Sequence, Tuple, Union from injector import inject from transitions import State @@ -14,11 +14,11 @@ ) from robot_interface.models.exceptions.robot_exceptions import ( ErrorMessage, + RobotCommunicationTimeoutException, RobotException, RobotMissionStatusException, RobotRetrieveInspectionException, RobotStepStatusException, - RobotCommunicationTimeoutException, ) from robot_interface.models.inspection.inspection import Inspection from robot_interface.models.mission.mission import Mission @@ -131,8 +131,6 @@ def _run(self) -> None: f"Retrieving the status failed because: {e.error_description}" ) - self.request_status_failure_counter = 0 - if isinstance(status, StepStatus): self.state_machine.current_step.status = status elif isinstance(status, MissionStatus): @@ -158,6 +156,7 @@ def _run(self) -> None: else: if isinstance(status, StepStatus): if self._step_finished(self.state_machine.current_step): + self.state_machine.update_current_step() self.state_machine.current_task.update_task_status() else: # If not all steps are done self.state_machine.current_task.status = TaskStatus.InProgress @@ -166,7 +165,6 @@ def _run(self) -> None: self.state_machine.current_task ) if self.state_machine.current_task.status == TaskStatus.Successful: - self.state_machine.update_remaining_steps() try: self.state_machine.current_task = ( self.state_machine.task_selector.next_task() diff --git a/src/isar/state_machine/states/offline.py b/src/isar/state_machine/states/offline.py new file mode 100644 index 00000000..683bc1d3 --- /dev/null +++ b/src/isar/state_machine/states/offline.py @@ -0,0 +1,62 @@ +import logging +import time +from typing import TYPE_CHECKING, Optional + +from transitions import State + +from isar.config.settings import settings +from isar.services.utilities.threaded_request import ( + ThreadedRequest, + ThreadedRequestNotFinishedError, +) +from robot_interface.models.exceptions.robot_exceptions import RobotException +from robot_interface.models.mission.status import RobotStatus + +if TYPE_CHECKING: + from isar.state_machine.state_machine import StateMachine + + +class Offline(State): + def __init__(self, state_machine: "StateMachine") -> None: + super().__init__(name="offline", on_enter=self.start, on_exit=self.stop) + self.state_machine: "StateMachine" = state_machine + self.logger = logging.getLogger("state_machine") + self.robot_status_thread: Optional[ThreadedRequest] = None + + def start(self) -> None: + self.state_machine.update_state() + self._run() + + def stop(self) -> None: + if self.robot_status_thread: + self.robot_status_thread.wait_for_thread() + self.robot_status_thread = None + + def _run(self) -> None: + while True: + if not self.robot_status_thread: + self.robot_status_thread = ThreadedRequest( + request_func=self.state_machine.robot.robot_status + ) + self.robot_status_thread.start_thread( + name="State Machine Offline Get Robot Status" + ) + + try: + robot_status: RobotStatus = self.robot_status_thread.get_output() + except ThreadedRequestNotFinishedError: + time.sleep(self.state_machine.sleep_time) + continue + + except (RobotException,) as e: + self.logger.error( + f"Failed to get robot status because: {e.error_description}" + ) + + if robot_status != RobotStatus.Offline: + transition = self.state_machine.robot_turned_online # type: ignore + break + + time.sleep(settings.ROBOT_API_STATUS_POLL_INTERVAL) + + transition() diff --git a/src/isar/state_machine/states_enum.py b/src/isar/state_machine/states_enum.py index 55ccae83..d5f4e72d 100644 --- a/src/isar/state_machine/states_enum.py +++ b/src/isar/state_machine/states_enum.py @@ -9,6 +9,7 @@ class States(str, Enum): Monitor = "monitor" Paused = "paused" Stop = "stop" + Offline = "offline" def __repr__(self): return self.value diff --git a/tests/isar/state_machine/test_state_machine.py b/tests/isar/state_machine/test_state_machine.py index 800744a0..e198c576 100644 --- a/tests/isar/state_machine/test_state_machine.py +++ b/tests/isar/state_machine/test_state_machine.py @@ -25,7 +25,7 @@ from robot_interface.models.mission.task import Task from robot_interface.telemetry.mqtt_client import MqttClientInterface from tests.mocks.pose import MockPose -from tests.mocks.robot_interface import MockRobot +from tests.mocks.robot_interface import MockRobot, MockRobotIdleToOfflineToIdleTest from tests.mocks.step import MockStep @@ -35,6 +35,8 @@ def __init__(self, injector) -> None: self.state_machine: StateMachine = injector.get(StateMachine) self._thread: Thread = Thread(target=main, args=[self.state_machine]) self._thread.daemon = True + + def start(self): self._thread.start() @@ -105,6 +107,7 @@ def test_state_machine_transitions( mission: Mission = Mission(tasks=[Task(steps=[step_1, step_2])]) # type: ignore state_machine_thread.state_machine.stepwise_mission = should_run_stepwise + state_machine_thread.start() scheduling_utilities: SchedulingUtilities = injector.get(SchedulingUtilities) scheduling_utilities.start_mission(mission=mission, initial_pose=None) @@ -143,6 +146,7 @@ def test_state_machine_transitions_when_running_full_mission( injector, state_machine_thread ) -> None: state_machine_thread.state_machine.stepwise_mission = False + state_machine_thread.start() step_1: Step = DriveToPose(pose=MockPose.default_pose()) step_2: Step = TakeImage(target=MockPose.default_pose().position) @@ -176,6 +180,8 @@ def test_state_machine_failed_dependency( mocker.patch.object(MockRobot, "step_status", return_value=StepStatus.Failed) + state_machine_thread.start() + scheduling_utilities: SchedulingUtilities = injector.get(SchedulingUtilities) scheduling_utilities.start_mission(mission=mission, initial_pose=None) @@ -198,6 +204,8 @@ def test_state_machine_failed_dependency( def test_state_machine_with_successful_collection( injector, state_machine_thread, uploader_thread ) -> None: + state_machine_thread.start() + storage_mock: StorageInterface = injector.get(List[StorageInterface])[0] step: TakeImage = MockStep.take_image_in_coordinate_direction() @@ -230,6 +238,8 @@ def test_state_machine_with_unsuccessful_collection( mocker.patch.object(MockRobot, "get_inspections", return_value=[]) + state_machine_thread.start() + step: TakeImage = MockStep.take_image_in_coordinate_direction() mission: Mission = Mission(tasks=[Task(steps=[step])]) scheduling_utilities: SchedulingUtilities = injector.get(SchedulingUtilities) @@ -259,6 +269,8 @@ def test_state_machine_with_successful_mission_stop( state_machine_thread: StateMachineThread, caplog: pytest.LogCaptureFixture, ) -> None: + state_machine_thread.start() + step: TakeImage = MockStep.take_image_in_coordinate_direction() mission: Mission = Mission(tasks=[Task(steps=[step])]) @@ -298,6 +310,8 @@ def test_state_machine_with_unsuccessful_mission_stop( MockRobot, "stop", side_effect=_mock_robot_exception_with_message ) + state_machine_thread.start() + scheduling_utilities.start_mission(mission=mission, initial_pose=None) scheduling_utilities.stop_mission() @@ -320,6 +334,19 @@ def test_state_machine_with_unsuccessful_mission_stop( assert expected == actual +def test_state_machine_idle_to_offline_to_idle(state_machine_thread) -> None: + state_machine_thread.state_machine.robot = MockRobotIdleToOfflineToIdleTest() + + state_machine_thread.start() + # Robot status check happens every 5 seconds by default + time.sleep(13) # Ensure that robot_status have been called again in Idle + + expected_transitions_list = deque([States.Idle, States.Offline, States.Idle]) + assert ( + state_machine_thread.state_machine.transitions_list == expected_transitions_list + ) + + def _mock_robot_exception_with_message() -> RobotException: raise RobotException( error_reason=ErrorReason.RobotUnknownErrorException, diff --git a/tests/mocks/robot_interface.py b/tests/mocks/robot_interface.py index c13f7c57..4be71215 100644 --- a/tests/mocks/robot_interface.py +++ b/tests/mocks/robot_interface.py @@ -78,3 +78,15 @@ def mock_image_metadata() -> ImageMetadata: ), file_type="jpg", ) + + +class MockRobotIdleToOfflineToIdleTest(MockRobot): + def __init__(self): + self.first = True + + def robot_status(self) -> RobotStatus: + if self.first: + self.first = False + return RobotStatus.Offline + + return RobotStatus.Available