Skip to content

Commit

Permalink
Implement tests for #130. (#132)
Browse files Browse the repository at this point in the history
* feat: Implement tests for mobility command feedback handling.

* checking correct handling of any non-PROCESSING mobility command status
* checking handling of valid command transitions - from in-progress to finished
* checking robot states after velocity movement commands

* fix: Remove redundant call to a single response repetition.

Co-authored-by: mhidalgo-bdai <[email protected]>

---------

Co-authored-by: mhidalgo-bdai <[email protected]>
Co-authored-by: Tiffany Cappellari <[email protected]>
  • Loading branch information
3 people authored Aug 26, 2024
1 parent 289a2e1 commit 4797533
Showing 1 changed file with 145 additions and 2 deletions.
147 changes: 145 additions & 2 deletions spot_wrapper/tests/test_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Copyright (c) 2023 Boston Dynamics AI Institute LLC. See LICENSE file for more info.

import logging
from typing import Iterator
import time
from typing import Iterator, List

import grpc
import pytest
from bosdyn.api.basic_command_pb2 import RobotCommandFeedbackStatus, SitCommand, StandCommand
from bosdyn.api.manipulation_api_pb2 import (
ManipulationApiRequest,
ManipulationApiResponse,
Expand All @@ -14,7 +16,7 @@
PowerCommandResponse,
PowerCommandStatus,
)
from bosdyn.api.robot_command_pb2 import RobotCommandResponse
from bosdyn.api.robot_command_pb2 import RobotCommandFeedbackResponse, RobotCommandResponse
from bosdyn.api.robot_state_pb2 import PowerState

import spot_wrapper.testing
Expand Down Expand Up @@ -120,3 +122,144 @@ def test_spot_wrapper_commands(simple_spot: SpotFixture, simple_spot_wrapper: Sp
assert not ok
ok, _ = simple_spot_wrapper.sit()
assert not ok


def test_mobility_command_feedback(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None:
non_processing_feedback_statuses: List[RobotCommandFeedbackStatus] = [
RobotCommandFeedbackStatus.STATUS_UNKNOWN,
RobotCommandFeedbackStatus.STATUS_COMMAND_OVERRIDDEN,
RobotCommandFeedbackStatus.STATUS_COMMAND_TIMED_OUT,
RobotCommandFeedbackStatus.STATUS_ROBOT_FROZEN,
RobotCommandFeedbackStatus.STATUS_INCOMPATIBLE_HARDWARE,
]

# bosdyn.api.RobotCommandService/RobotCommand implementation is mocked
# (as spot_wrapper.testing.mocks.MockSpot is automatically specified),
# so here we provide the same response for every future command.
response = RobotCommandResponse()
response.status = RobotCommandResponse.Status.STATUS_OK
simple_spot.api.RobotCommand.future.returns(response).forever()

def update_with_feedback_response(r: RobotCommandFeedbackResponse) -> None:
simple_spot.api.RobotCommandFeedback.future.returns(r)
simple_spot_wrapper.updateTasks()
time.sleep(0.1)

def assert_non_processing_stand_status(should_stand: bool) -> None:
for status in non_processing_feedback_statuses:
simple_spot_wrapper.stand()
assert simple_spot_wrapper.last_stand_command is not None
assert simple_spot_wrapper.is_standing == should_stand
r = RobotCommandFeedbackResponse()
r.feedback.synchronized_feedback.mobility_command_feedback.status = status
update_with_feedback_response(r)
assert simple_spot_wrapper.last_stand_command is None
assert simple_spot_wrapper.is_standing == should_stand

def assert_non_processing_sit_status(should_sit: bool) -> None:
for status in non_processing_feedback_statuses:
simple_spot_wrapper.sit()
assert simple_spot_wrapper.last_sit_command is not None
assert simple_spot_wrapper.is_sitting == should_sit
r = RobotCommandFeedbackResponse()
r.feedback.synchronized_feedback.mobility_command_feedback.status = status
update_with_feedback_response(r)
assert simple_spot_wrapper.last_sit_command is None
assert simple_spot_wrapper.is_sitting == should_sit

# initial standing state
assert simple_spot_wrapper.last_stand_command is None
assert not simple_spot_wrapper.is_standing

# stand command with non-processing status - should reset the command and leave state unchanged
assert_non_processing_stand_status(simple_spot_wrapper.is_standing)

# stand command with processing status
simple_spot_wrapper.stand()
assert simple_spot_wrapper.last_stand_command is not None
assert not simple_spot_wrapper.is_standing
response = RobotCommandFeedbackResponse()
response.feedback.synchronized_feedback.mobility_command_feedback.status = (
RobotCommandFeedbackStatus.STATUS_PROCESSING
)
# command in progress
response.feedback.synchronized_feedback.mobility_command_feedback.stand_feedback.status = (
StandCommand.Feedback.STATUS_IN_PROGRESS
)
update_with_feedback_response(response)
assert simple_spot_wrapper.last_stand_command is not None
assert not simple_spot_wrapper.is_standing
assert not simple_spot_wrapper.is_sitting
# command finished
response.feedback.synchronized_feedback.mobility_command_feedback.stand_feedback.status = (
StandCommand.Feedback.STATUS_IS_STANDING
)
update_with_feedback_response(response)
assert simple_spot_wrapper.last_stand_command is None
assert simple_spot_wrapper.is_standing
assert not simple_spot_wrapper.is_sitting

# stand command with non-processing status - should reset the command and leave state unchanged
assert_non_processing_stand_status(simple_spot_wrapper.is_standing)

# initial sitting state
assert simple_spot_wrapper.last_sit_command is None
assert not simple_spot_wrapper.is_sitting

# sit command with non-processing status - should reset the command and leave state unchanged
assert_non_processing_sit_status(simple_spot_wrapper.is_sitting)

# sit command with processing status
simple_spot_wrapper.sit()
assert simple_spot_wrapper.last_sit_command is not None
assert not simple_spot_wrapper.is_sitting
response = RobotCommandFeedbackResponse()
response.feedback.synchronized_feedback.mobility_command_feedback.status = (
RobotCommandFeedbackStatus.STATUS_PROCESSING
)
# command in progress
response.feedback.synchronized_feedback.mobility_command_feedback.sit_feedback.status = (
SitCommand.Feedback.STATUS_IN_PROGRESS
)
update_with_feedback_response(response)
assert simple_spot_wrapper.last_sit_command is not None
assert not simple_spot_wrapper.is_sitting
assert not simple_spot_wrapper.is_standing
# command finished
response.feedback.synchronized_feedback.mobility_command_feedback.sit_feedback.status = (
SitCommand.Feedback.STATUS_IS_SITTING
)
update_with_feedback_response(response)
assert simple_spot_wrapper.last_sit_command is None
assert simple_spot_wrapper.is_sitting
assert not simple_spot_wrapper.is_standing

# sit command with non-processing status - should reset the command and leave state unchanged
assert_non_processing_sit_status(simple_spot_wrapper.is_sitting)


def test_robot_movement_states(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None:
# bosdyn.api.RobotCommandService/RobotCommand implementation is mocked
# (as spot_wrapper.testing.mocks.MockSpot is automatically specified),
# so here we provide the same response for every future command.
response = RobotCommandResponse()
response.status = RobotCommandResponse.Status.STATUS_OK
simple_spot.api.RobotCommand.future.returns(response).forever()

assert simple_spot_wrapper.last_sit_command is None
assert simple_spot_wrapper.last_stand_command is None
assert simple_spot_wrapper.last_velocity_command_time is None
assert simple_spot_wrapper.is_sitting
assert not simple_spot_wrapper.is_standing
assert not simple_spot_wrapper.is_moving

simple_spot_wrapper.velocity_cmd(0, 0, 0)
assert simple_spot_wrapper.last_velocity_command_time is not None
simple_spot_wrapper.updateTasks()
time.sleep(0.1)

assert simple_spot_wrapper.last_sit_command is None
assert simple_spot_wrapper.last_stand_command is None
assert not simple_spot_wrapper.is_sitting
assert simple_spot_wrapper.is_standing
assert simple_spot_wrapper.is_moving

0 comments on commit 4797533

Please sign in to comment.