From 479753369eae7358b1a08577ef97e1ea54a69ddb Mon Sep 17 00:00:00 2001 From: Imaniac230 <44968160+Imaniac230@users.noreply.github.com> Date: Mon, 26 Aug 2024 19:18:15 +0200 Subject: [PATCH] Implement tests for #130. (#132) * feat: Implement tests for mobility command feedback handling. * checking correct handling of any non-PROCESSING mobility command status * checking handling of valid command transitions - from in-progress to finished * checking robot states after velocity movement commands * fix: Remove redundant call to a single response repetition. Co-authored-by: mhidalgo-bdai <144129882+mhidalgo-bdai@users.noreply.github.com> --------- Co-authored-by: mhidalgo-bdai <144129882+mhidalgo-bdai@users.noreply.github.com> Co-authored-by: Tiffany Cappellari <156013635+tcappellari-bdai@users.noreply.github.com> --- spot_wrapper/tests/test_wrapper.py | 147 ++++++++++++++++++++++++++++- 1 file changed, 145 insertions(+), 2 deletions(-) diff --git a/spot_wrapper/tests/test_wrapper.py b/spot_wrapper/tests/test_wrapper.py index a3b4311e..0e72e474 100644 --- a/spot_wrapper/tests/test_wrapper.py +++ b/spot_wrapper/tests/test_wrapper.py @@ -1,10 +1,12 @@ # Copyright (c) 2023 Boston Dynamics AI Institute LLC. See LICENSE file for more info. import logging -from typing import Iterator +import time +from typing import Iterator, List import grpc import pytest +from bosdyn.api.basic_command_pb2 import RobotCommandFeedbackStatus, SitCommand, StandCommand from bosdyn.api.manipulation_api_pb2 import ( ManipulationApiRequest, ManipulationApiResponse, @@ -14,7 +16,7 @@ PowerCommandResponse, PowerCommandStatus, ) -from bosdyn.api.robot_command_pb2 import RobotCommandResponse +from bosdyn.api.robot_command_pb2 import RobotCommandFeedbackResponse, RobotCommandResponse from bosdyn.api.robot_state_pb2 import PowerState import spot_wrapper.testing @@ -120,3 +122,144 @@ def test_spot_wrapper_commands(simple_spot: SpotFixture, simple_spot_wrapper: Sp assert not ok ok, _ = simple_spot_wrapper.sit() assert not ok + + +def test_mobility_command_feedback(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None: + non_processing_feedback_statuses: List[RobotCommandFeedbackStatus] = [ + RobotCommandFeedbackStatus.STATUS_UNKNOWN, + RobotCommandFeedbackStatus.STATUS_COMMAND_OVERRIDDEN, + RobotCommandFeedbackStatus.STATUS_COMMAND_TIMED_OUT, + RobotCommandFeedbackStatus.STATUS_ROBOT_FROZEN, + RobotCommandFeedbackStatus.STATUS_INCOMPATIBLE_HARDWARE, + ] + + # bosdyn.api.RobotCommandService/RobotCommand implementation is mocked + # (as spot_wrapper.testing.mocks.MockSpot is automatically specified), + # so here we provide the same response for every future command. + response = RobotCommandResponse() + response.status = RobotCommandResponse.Status.STATUS_OK + simple_spot.api.RobotCommand.future.returns(response).forever() + + def update_with_feedback_response(r: RobotCommandFeedbackResponse) -> None: + simple_spot.api.RobotCommandFeedback.future.returns(r) + simple_spot_wrapper.updateTasks() + time.sleep(0.1) + + def assert_non_processing_stand_status(should_stand: bool) -> None: + for status in non_processing_feedback_statuses: + simple_spot_wrapper.stand() + assert simple_spot_wrapper.last_stand_command is not None + assert simple_spot_wrapper.is_standing == should_stand + r = RobotCommandFeedbackResponse() + r.feedback.synchronized_feedback.mobility_command_feedback.status = status + update_with_feedback_response(r) + assert simple_spot_wrapper.last_stand_command is None + assert simple_spot_wrapper.is_standing == should_stand + + def assert_non_processing_sit_status(should_sit: bool) -> None: + for status in non_processing_feedback_statuses: + simple_spot_wrapper.sit() + assert simple_spot_wrapper.last_sit_command is not None + assert simple_spot_wrapper.is_sitting == should_sit + r = RobotCommandFeedbackResponse() + r.feedback.synchronized_feedback.mobility_command_feedback.status = status + update_with_feedback_response(r) + assert simple_spot_wrapper.last_sit_command is None + assert simple_spot_wrapper.is_sitting == should_sit + + # initial standing state + assert simple_spot_wrapper.last_stand_command is None + assert not simple_spot_wrapper.is_standing + + # stand command with non-processing status - should reset the command and leave state unchanged + assert_non_processing_stand_status(simple_spot_wrapper.is_standing) + + # stand command with processing status + simple_spot_wrapper.stand() + assert simple_spot_wrapper.last_stand_command is not None + assert not simple_spot_wrapper.is_standing + response = RobotCommandFeedbackResponse() + response.feedback.synchronized_feedback.mobility_command_feedback.status = ( + RobotCommandFeedbackStatus.STATUS_PROCESSING + ) + # command in progress + response.feedback.synchronized_feedback.mobility_command_feedback.stand_feedback.status = ( + StandCommand.Feedback.STATUS_IN_PROGRESS + ) + update_with_feedback_response(response) + assert simple_spot_wrapper.last_stand_command is not None + assert not simple_spot_wrapper.is_standing + assert not simple_spot_wrapper.is_sitting + # command finished + response.feedback.synchronized_feedback.mobility_command_feedback.stand_feedback.status = ( + StandCommand.Feedback.STATUS_IS_STANDING + ) + update_with_feedback_response(response) + assert simple_spot_wrapper.last_stand_command is None + assert simple_spot_wrapper.is_standing + assert not simple_spot_wrapper.is_sitting + + # stand command with non-processing status - should reset the command and leave state unchanged + assert_non_processing_stand_status(simple_spot_wrapper.is_standing) + + # initial sitting state + assert simple_spot_wrapper.last_sit_command is None + assert not simple_spot_wrapper.is_sitting + + # sit command with non-processing status - should reset the command and leave state unchanged + assert_non_processing_sit_status(simple_spot_wrapper.is_sitting) + + # sit command with processing status + simple_spot_wrapper.sit() + assert simple_spot_wrapper.last_sit_command is not None + assert not simple_spot_wrapper.is_sitting + response = RobotCommandFeedbackResponse() + response.feedback.synchronized_feedback.mobility_command_feedback.status = ( + RobotCommandFeedbackStatus.STATUS_PROCESSING + ) + # command in progress + response.feedback.synchronized_feedback.mobility_command_feedback.sit_feedback.status = ( + SitCommand.Feedback.STATUS_IN_PROGRESS + ) + update_with_feedback_response(response) + assert simple_spot_wrapper.last_sit_command is not None + assert not simple_spot_wrapper.is_sitting + assert not simple_spot_wrapper.is_standing + # command finished + response.feedback.synchronized_feedback.mobility_command_feedback.sit_feedback.status = ( + SitCommand.Feedback.STATUS_IS_SITTING + ) + update_with_feedback_response(response) + assert simple_spot_wrapper.last_sit_command is None + assert simple_spot_wrapper.is_sitting + assert not simple_spot_wrapper.is_standing + + # sit command with non-processing status - should reset the command and leave state unchanged + assert_non_processing_sit_status(simple_spot_wrapper.is_sitting) + + +def test_robot_movement_states(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None: + # bosdyn.api.RobotCommandService/RobotCommand implementation is mocked + # (as spot_wrapper.testing.mocks.MockSpot is automatically specified), + # so here we provide the same response for every future command. + response = RobotCommandResponse() + response.status = RobotCommandResponse.Status.STATUS_OK + simple_spot.api.RobotCommand.future.returns(response).forever() + + assert simple_spot_wrapper.last_sit_command is None + assert simple_spot_wrapper.last_stand_command is None + assert simple_spot_wrapper.last_velocity_command_time is None + assert simple_spot_wrapper.is_sitting + assert not simple_spot_wrapper.is_standing + assert not simple_spot_wrapper.is_moving + + simple_spot_wrapper.velocity_cmd(0, 0, 0) + assert simple_spot_wrapper.last_velocity_command_time is not None + simple_spot_wrapper.updateTasks() + time.sleep(0.1) + + assert simple_spot_wrapper.last_sit_command is None + assert simple_spot_wrapper.last_stand_command is None + assert not simple_spot_wrapper.is_sitting + assert simple_spot_wrapper.is_standing + assert simple_spot_wrapper.is_moving