-
Notifications
You must be signed in to change notification settings - Fork 22
Testing Infrastructure
The spot_wrapper.testing
subpackage
offers pytest
compatible machinery to test Spot SDK usage.
At its core, this machinery is nothing but mocks and fixtures: mocks of Spot gRPC services, and
pytest
fixtures serving those mocks over the wire. This affords complete control over
a well-defined interface that spans the entire feature set (effectively emulating a Spot robot) at the expense of
additional complexity, and on the basis of prior pytest
and gRPC knowledge.
import logging
import spot_wrapper.testing
from spot_wrapper.testing.mocks import MockSpot
from spot_wrapper.wrapper import SpotWrapper
from bosdyn.api.robot_command_pb2 import RobotCommandResponse
@spot_wrapper.testing.fixture
class some_spot(MockSpot):
name = "bob"
def test_wrapper(some_spot):
simple_spot_wrapper = SpotWrapper(
robot_name=some_spot.api.name,
username="spot",
password="spot",
hostname=some_spot.address,
port=some_spot.port,
cert_resource_glob=str(simple_spot.certificate_path),
logger=logging.getLogger("spot"),
)
assert simple_spot_wrapper.is_valid
assert simple_spot_wrapper.id.nickname == "bob"
response = RobotCommandResponse()
response.status = RobotCommandResponse.Status.STATUS_OK
simple_spot.api.RobotCommand.future.returns(response)
ok, message = simple_spot_wrapper.sit()
assert ok, message
A word of advice. spot_wrapper.testing
complexity is not always warranted. A single Spot SDK API can involve
multiple gRPC service invocations, and Spot SDK documentation is seldom transparent about this. An apparent unit test
is in fact an integration test. For cases in which this is problematic, other mocking frameworks and utilities, such as
unittest.mock
or pytest-mocker
in Python, or gmock
in C++, can be used instead of (and also in addition to)
spot_wrapper.testing
to mock Spot SDK APIs directly.
The goal of the testing infrastructure is to create a fake version of Spot by providing or allowing the user to create fake returns from all of the Spot gRPC services. We will begin by showing how to create a gRPC server that can pretend to be Spot by mocking the gRPC services specific to Spot robots. In practice, this amounts to subclassing gRPC servicer classes and providing an implementation for each service method. Aside from implementation-specific details -- typically a dummy implementation bearing minimal logic, if any --, this is no different from any other gRPC service implementation, and thus the same semantics and guidelines apply.
As an initial example, consider the following bosdyn.api.RobotIdService
mock implementation.
import concurrent.futures
import grpc
from bosdyn.api.robot_id_pb2 import RobotIdRequest, RobotIdResponse
from bosdyn.api.robot_id_service_pb2_grpc import RobotIdServiceServicer
from bosdyn.api.robot_id_service_pb2_grpc import add_RobotIdServiceServicer_to_server
class MockRobotIdService(RobotIdServiceServicer):
"""A mock Spot robot id service."""
def GetRobotId(self, request: RobotIdRequest, context: grpc.ServicerContext) -> RobotIdResponse:
response = RobotIdResponse()
response.robot_id.serial_number = "1234567890"
return response
def main() -> None:
server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
add_RobotIdServiceServicer_to_server(MockRobotIdService(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
if __name__ == "__main__":
main()
In the above example, bosdyn.api.robot_id_service_pb2_grpc.RobotIdServiceServicer
is subclassed to implement the
GetRobotId
method. When run, this script will instantiate a gRPC server over an insecure channel (for simplicity's sake), add a MockRobotIdService
instance to that server, and spin on it until the program is terminated. The gRPC server will serve all calls to the robot id service by running this mock implementation and returning that the robot has ID "1234567890".
By mocking Spot gRPC services in this way, a complete Spot robot can be emulated, but that would require writing hundreds of such functions.
To alleviate that burden, spot_wrapper.testing
already provides enough
built-in mocks to cover for spot_wrapper.SpotWrapper
initialization. This includes mocks for emergency stop services, leasing
services, keep-alive services, time synchronization services, and more. Each mock is designed as a standalone component, and
multiple mocks can be aggregated via multiple inheritance, like building blocks of larger, more comprehensive mocks. These mocks
can naturally coexist with user-defined ones. For example, the spot_wrapper.testing
mock implementation of the time sync service is shown below:
import concurrent.futures
import grpc
from bosdyn.api.time_sync_pb2 import (
TimeSyncState,
TimeSyncUpdateRequest,
TimeSyncUpdateResponse,
)
from bosdyn.api.time_sync_service_pb2_grpc import (
TimeSyncServiceServicer,
add_TimeSyncServiceServicer_to_server,
)
from bosdyn.api.robot_id_service_pb2_grpc import add_RobotIdServiceServicer_to_server
from bosdyn.api.robot_state_service_pb2_grpc import add_RobotStateServiceServicer_to_server
from spot_wrapper.testing.mocks.robot_id import MockRobotIdService
from spot_wrapper.testing.mocks.robot_state import MockRobotStateService
class MockAnyRobotService(MockRobotIdService, MockRobotStateService, TimeSyncServiceServicer):
def TimeSyncUpdate(self, request: TimeSyncUpdateRequest, context: grpc.ServicerContext) -> TimeSyncUpdateResponse:
response = TimeSyncUpdateResponse()
response.state.status = TimeSyncState.STATUS_OK
return response
def main() -> None:
server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
mock = MockAnyRobotService()
add_TimeSyncServiceServicer_to_server(mock, server)
add_RobotIdServiceServicer_to_server(mock, server)
add_RobotStateServiceServicer_to_server(mock, server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
if __name__ == "__main__":
main()
In the above example, MockAnyRobotService
subclasses bosdyn.api.time_sync_service_pb2_grpc.TimeSyncServiceServicer
to implement the TimeSyncUpdate
method, but it also subclasses the MockRobotIdService
and MockRobotStateService
classes
that the spot_wrapper.testing.mocks
subpackage already provides, implementing GetRobotId
, GetRobotState
, GetRobotMetrics
,
GetRobotHardwareConfiguration
, and GetRobotLinkModel
methods. When run, this script will instantiate a gRPC server over an
insecure channel (for simplicity's sake), add a MockAnyRobotService
instance to that server, and spin on it until the program is terminated.
These gRPC service mocks are seldom used or even referenced explicitly though - instead we offer a MockSpot
class that already contains all of the basic implementations needed to instantiate a SpotWrapper
. Unless you wish to explicitly test one of these services, you should simply use the MockSpot
class (you can find more mocks in spot_wrapper.testing.mocks
):
import concurrent.futures
import grpc
from spot_wrapper.testing.mocks import MockSpot
def main() -> None:
server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
server.add_insecure_port("[::]:50051")
mock = MockSpot()
mock.add_to(server)
server.start()
server.wait_for_termination()
if __name__ == "__main__":
main()
Now MockSpot
only has mock implementations for a small number of the possible Spot gRPC services we may want to query. Users could inherit from MockSpot
and keep adding mock implementations for more gRPC services, but this is a lot overhead. Therefore, we allow users to specify for any gRPC service what it should return in the future via a future handler (e.g. all gRPC service
methods left unimplemented will be (re)defined as deferred method handlers). For example the following piece of code shows a user interacting with the robot_command
service even though that is not explicitly mocked out in MockSpot
:
import concurrent.futures
import grpc
from spot_wrapper.testing.mocks import MockSpot
from bosdyn.api.robot_command_pb2 import RobotCommandResponse
def main() -> None:
server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
server.add_insecure_port("[::]:50051")
mock = MockSpot()
mock.add_to(server)
# Proceed to mock a sequence of exchanges
mock.RobotCommand.future.fails(grpc.StatusCode.INTERNAL) # first one will fail
response = RobotCommandResponse()
response.status = RobotCommandResponse.Status.STATUS_OK
mock.RobotCommand.future.returns(response) # second one will succeed
server.start()
call = mock.RobotCommand.serve(timeout=5.0) # third one will be processed here
assert call is not None
assert call.request.command.Which("command") == "full_body_command"
response = RobotCommandResponse()
response.status = RobotCommandResponse.Status.STATUS_OK
call.returns(response)
# done, now wait
server.wait_for_termination()
if __name__ == "__main__":
main()
In the above example, we've used the future handlers to tell the mock spot how it should respond to the first three calls: the first request will fail with an internal error status and the second request will
succeed, returning a successful response. The third request will be processed in the main thread. The call
object is a proxy to an ongoing service invocation waiting for call resolution (i.e. blocked) in a background thread. No further
requests will be handled.
A note about streaming services. Services that stream request, response, or both are also supported. Streamed requests will be (potentially consumable) iterables, and the same goes for streamed responses. Lists and generators fit the description here.
Instances of MockSpot
(and MockSpot
subclasses) thus cover all Spot services with a mixture of built-in, user-defined, and deferred
implementations. Note that it is up to the user to handle deferred service requests in a timely manner. Failing to do so may result
in unexpected and potentially indefinite hangs (e.g. if the Spot SDK is blocked waiting on a service response for a service request
that was left unhandled). In the example below, the server will just queue RobotCommand
service calls (blocking threads in its thread pool), as we haven't specified how to handle them. Upon termination and during mock cleanup, a warning will be emitted to notify us about this.
import concurrent.futures
import grpc
from spot_wrapper.testing.mocks import MockSpot
def main() -> None:
server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
server.add_insecure_port("[::]:50051")
with MockSpot() as mock:
mock.add_to(server)
server.start()
server.wait_for_termination() # No deferred RobotCommand service request will be handled now
# when leaving context, a warning will be issued about pending RobotCommand service requests if any
if __name__ == "__main__":
main()
MockSpot
subclasses can also enable automatic service tracking support by setting the autotrack
class attribute.
When autotrack
is True
, all gRPC service method implementations, built-in, user-defined, or deferred, will be decorated
to track the number of calls and all the requests served. Note this can have a substantial impact on memory usage over time.
It can also affect streamed services timing, as streamed service requests are collected in full before service invocation.
import concurrent.futures
import grpc
from spot_wrapper.testing.mocks import MockSpot
from bosdyn.api.robot_command_pb2 import RobotCommandRequest, RobotCommandResponse
class MyMockSpot(MockSpot):
autotrack = True
def RobotCommand(self, request: RobotCommandRequest, context: grpc.ServicerContext) -> RobotCommandResponse:
response = RobotCommandResponse()
response.state.status = RobotCommandResponse.Status.STATUS_OK
return response
def main() -> None:
server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
server.add_insecure_port("[::]:50051")
try:
with MyMockSpot() as mock:
mock.add_to(server)
server.start()
server.wait_for_termination()
finally:
print(f"RobotCommand service invoked {mock.RobotCommand.num_calls} times with", mock.RobotCommand.requests)
if __name__ == "__main__":
main()
For the odd cases in which better control over how the mock is put together is necessary, there is spot_wrapper.testing.mocks.BaseMockSpot
.
BaseMockSpot
is not as much a mock as it is a mock interface. It inherits from all gRPC servicer classes that the Spot SDK exposes too,
but uses no built-in mock and disables both automatic service specification and tracking by default. Subclasses can enable them using the
autospec
and autotrack
class attributes, respectively.
The above examples show how to create a gRPC server that can almost pretend to be Spot. Full Spot emulation, however, requires secure gRPC channels and domain impersonation. We will also need a SpotWrapper
instance, wrapping Spot SDK gRPC calls.
The easiest way to do this is to define a spot_wrapper.testing
fixture that inherits from MockSpot
. These fixtures are pytest.fixture
s that take care of both instantiating your MockSpot
subclass and adding it to a gRPC server running in the background. This gRPC server will be configured with self-signed SSL certificates included in the spot_wrapper.testing
subpackage for testing purposes. All information necessary to establish the connection is then exposed as fixture attributes to aid SpotWrapper
instantiation.
import logging
import spot_wrapper.testing
from spot_wrapper.testing.mocks import MockSpot
from spot_wrapper.wrapper import SpotWrapper
@spot_wrapper.testing.fixture
class simplest_mock(MockSpot):
name = "bob"
def test_wrapper(simplest_mock):
wrapper = SpotWrapper(
robot_name=simplest_mock.api.name,
username="spot",
password="spot",
hostname=simplest_mock.address,
port=simplest_mock.port,
cert_resource_glob=str(simplest_spot.certificate_path),
logger=logging.getLogger("spot"),
)
assert wrapper.is_valid
Note that many of the most common calls to SpotWrapper are decorated to require the robot to be powered on before the function will execute. The PowerCommand
is not one of the services that MockSpot
already creates. Therefore, if you will be making any complicated calls to SpotWrapper
(including calls to RobotCommand
), it is necessary to add a mock for the power command. Thus the most common testing set up is as follows:
import logging
import spot_wrapper.testing
from spot_wrapper.testing.mocks import MockSpot
from spot_wrapper.wrapper import SpotWrapper
@spot_wrapper.testing.fixture
class simplest_mock(MockSpot):
name = "bob"
def PowerCommand(self, request: PowerCommandRequest, context: grpc.ServicerContext) -> PowerCommandResponse:
"""
Dummy implementation of the PowerCommand command.
"""
response = PowerCommandResponse()
if request.request == PowerCommandRequest.Request.REQUEST_ON_MOTORS:
self.robot_state.power_state.motor_power_state = PowerState.MotorPowerState.MOTOR_POWER_STATE_ON
response.status = PowerCommandStatus.STATUS_SUCCESS
elif request.request == PowerCommandRequest.Request.REQUEST_OFF_MOTORS:
self.robot_state.power_state.motor_power_state = PowerState.MotorPowerState.MOTOR_POWER_STATE_OFF
response.status = PowerCommandStatus.STATUS_SUCCESS
else:
response.status = PowerCommandStatus.STATUS_INTERNAL_ERROR
return response
def test_wrapper(simplest_mock):
wrapper = SpotWrapper(
robot_name=simplest_mock.api.name,
username="spot",
password="spot",
hostname=simplest_mock.address,
port=simplest_mock.port,
cert_resource_glob=str(simplest_spot.certificate_path),
logger=logging.getLogger("spot"),
)
assert wrapper.is_valid
The usual pattern for testing using spot_wrapper.testing
machinery is shown below:
import logging
import grpc
import pytest
from bosdyn.api.manipulation_api_pb2 import (
ManipulationApiRequest,
ManipulationApiResponse,
)
from bosdyn.api.power_pb2 import (
PowerCommandRequest,
PowerCommandResponse,
PowerCommandStatus,
)
from bosdyn.api.robot_command_pb2 import RobotCommandResponse
from bosdyn.api.robot_state_pb2 import PowerState
import spot_wrapper.testing
from spot_wrapper.testing.fixtures import SpotFixture
from spot_wrapper.testing.mocks import MockSpot
from spot_wrapper.wrapper import SpotWrapper
@spot_wrapper.testing.fixture(ids=["with_arm", "without_arm"], params=[True, False])
class simple_spot(MockSpot):
def __init__(self, request) -> None:
# Fixture initialization can request other fixtures!
super().__init__()
if request.param:
manipulator_state = self.robot_state.manipulator_state
manipulator_state.is_gripper_holding_item = True
def PowerCommand(self, request: PowerCommandRequest, context: grpc.ServicerContext) -> PowerCommandResponse:
response = PowerCommandResponse()
power_state = self.robot_state.power_state
if request.request == PowerCommandRequest.Request.REQUEST_ON_MOTORS:
power_state.motor_power_state = PowerState.MotorPowerState.MOTOR_POWER_STATE_ON
response.status = PowerCommandStatus.STATUS_SUCCESS
elif request.request == PowerCommandRequest.Request.REQUEST_OFF_MOTORS:
power_state.motor_power_state = PowerState.MotorPowerState.MOTOR_POWER_STATE_OFF
response.status = PowerCommandStatus.STATUS_SUCCESS
else:
response.status = PowerCommandStatus.STATUS_INTERNAL_ERROR
return response
@pytest.fixture
def simple_spot_wrapper(simple_spot: SpotFixture) -> SpotWrapper:
return SpotWrapper(
robot_name=simple_spot.api.name,
username="spot",
password="spot",
hostname=simple_spot.address,
port=simple_spot.port,
cert_resource_glob=str(simple_spot.certificate_path),
logger=logging.getLogger("spot"),
)
def test_claim(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None:
assert simple_spot_wrapper.is_valid
ok, message = simple_spot_wrapper.claim()
assert ok, message
def test_command(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None:
if simple_spot_wrapper.has_arm():
request = ManipulationApiRequest()
request.pick_object.frame_name = "gripper"
request.pick_object.object_rt_frame.x = 1.0
response = ManipulationApiResponse()
response.manipulation_cmd_id = 1
simple_spot.api.ManipulationApi.future.returns(response)
ok, message, command_id = simple_spot_wrapper.manipulation_command(request)
assert ok and response.manipulation_cmd_id == command_id, message
response = RobotCommandResponse()
response.status = RobotCommandResponse.Status.STATUS_OK
simple_spot.api.RobotCommand.future.returns(response)
ok, message = simple_spot_wrapper.sit()
assert ok, message
def test_power_on(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None:
assert not simple_spot_wrapper.check_is_powered_on()
assert simple_spot_wrapper.toggle_power(True)
assert simple_spot_wrapper.check_is_powered_on()
Let's break that up. First, a spot_wrapper.testing.fixture
named simple_spot
is defined. It is based on MockSpot
, which means that all Spot
services will be mocked. Since a spot_wrapper.testing.fixture
is a pytest.fixture
, the same functionality can be leveraged. Here, the fixture is
parametrized on gripper availability. It relies on the robot_state
attribute that the built-in robot state service mock exposes mutate state. The fixture is also configured (by omission) to be
function scoped, which
means that every test that requests it, directly or indirectly through another fixture, will get their own instance of the fixture.
@spot_wrapper.testing.fixture(ids=["with_arm", "without_arm"], params=[True, False])
class simple_spot(MockSpot):
def __init__(self, request) -> None:
super().__init__()
if request.param:
manipulator_state = self.robot_state.manipulator_state
manipulator_state.is_gripper_holding_item = True
In addition to built-in and deferred mock services, the fixture defines a mock of its own for the PowerCommand
method of the
bosdyn.api.PowerService
service. This is a standard gRPC service method implementation.
def PowerCommand(self, request: PowerCommandRequest, context: grpc.ServicerContext) -> PowerCommandResponse:
response = PowerCommandResponse()
power_state = self.robot_state.power_state
if request.request == PowerCommandRequest.Request.REQUEST_ON_MOTORS:
power_state.motor_power_state = PowerState.MotorPowerState.MOTOR_POWER_STATE_ON
response.status = PowerCommandStatus.STATUS_SUCCESS
elif request.request == PowerCommandRequest.Request.REQUEST_OFF_MOTORS:
power_state.motor_power_state = PowerState.MotorPowerState.MOTOR_POWER_STATE_OFF
response.status = PowerCommandStatus.STATUS_SUCCESS
else:
response.status = PowerCommandStatus.STATUS_INTERNAL_ERROR
return response
A second fixture, simple_spot_wrapper
, is defined to encapsulate the wrapper instantiation. It requests the simple_spot
fixture to forward address, port, self-signed CA root certificate, and robot name to the wrapper. This will ensure that wrapper and fixture are connected.
@pytest.fixture
def simple_spot_wrapper(simple_spot: SpotFixture) -> SpotWrapper:
return SpotWrapper(
robot_name=simple_spot.api.name,
username="spot",
password="spot",
hostname=simple_spot.address,
port=simple_spot.port,
cert_resource_glob=str(simple_spot.certificate_path),
logger=logging.getLogger("spot"),
)
Any number of tests can now be written using simple_spot_wrapper
and simple_spot
fixtures to command and emulate Spot, respectively.
For example, in test_claim
, simple_spot_wrapper.claim()
will succeed to claim because simple_spot
can deliver dummy leases, fake
emergency stops, and so on. These are served in the background by the fixture gRPC server while simple_splot_wrapper
waits for it.
def test_claim(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None:
assert simple_spot_wrapper.is_valid
ok, message = simple_spot_wrapper.claim()
assert ok, message
test_command
takes things a bit further. If the emulated Spot has an arm, as reported by mocked robot state services, a
simple_spot_wrapper.manipulation_command
will be issued and succeed because a successful response is specified in advance
via simple_spot.api.ManipulationApi.future
. This response is delivered during simple_spot_wrapper.manipulation_command()
execution. Then, simple_spot_wrapper.sit()
will succeed to sit the emulated Spot because another successful response is
specified in advance, this time via simple_spot.api.RobotCommand.future
.
def test_command(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None:
if simple_spot_wrapper.has_arm():
request = ManipulationApiRequest()
request.pick_object.frame_name = "gripper"
request.pick_object.object_rt_frame.x = 1.0
response = ManipulationApiResponse()
response.manipulation_cmd_id = 1
simple_spot.api.ManipulationApi.future.returns(response)
ok, message, command_id = simple_spot_wrapper.manipulation_command(request)
assert ok and response.manipulation_cmd_id == command_id, message
response = RobotCommandResponse()
response.status = RobotCommandResponse.Status.STATUS_OK
simple_spot.api.RobotCommand.future.returns(response)
ok, message = simple_spot_wrapper.sit()
assert ok, message
test_power_on
takes yet another route. It relies on the PowerCommand
method mock defined in the fixture to mutate robot state and reply.
def test_power_on(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None:
assert not simple_spot_wrapper.check_is_powered_on()
assert simple_spot_wrapper.toggle_power(True)
assert simple_spot_wrapper.check_is_powered_on()
Summarizing:
- using
MockSpot
for the fixture takes care of initialization exchanges; - user-defined mock services, defined locally or separately as a mock of its own to enable reuse, can take care of the exchanges that are secondary to but often required by the test;
- the actual test is carried out through deferred mock services, tying the flow of gRPC exchanges to the execution path of the test;
- since
SpotWrapper
is mostly synchronous, service responses (or failure) must be typically specified in advanced.
For further reference on available mocks and their features, go to spot_wrapper.testing.mocks
subpackage. For further reference on spot_wrapper.testing
moving parts, see spot_wrapper.testing
codebase and documentation.