Skip to content

Commit

Permalink
Allow repeated future outcomes in mocked gRPCs
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <[email protected]>
  • Loading branch information
mhidalgo-bdai committed Feb 22, 2024
1 parent aeb9ab1 commit 8711110
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 21 deletions.
129 changes: 115 additions & 14 deletions spot_wrapper/testing/grpc.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# Copyright (c) 2023 Boston Dynamics AI Institute LLC. See LICENSE file for more info.

import collections.abc
import inspect
import logging
import math
import os
import queue
import sys
import threading
import typing
from abc import ABC, abstractmethod

import grpc

Expand Down Expand Up @@ -323,29 +326,127 @@ class Future:
"""
A window to future gRPC calls.
This helper class stores call resolutions in a FIFO queue,
for the handler to apply on future calls.
This helper class stores call outcomes in a FIFO queue,
for the handler to resolve future calls.
"""

class Outcome(ABC):
"""A generic outcome for future calls."""

def __init__(self) -> None:
self._num_repeats: typing.Union[int, float] = 1
self._num_uses: int = 0

@property
def num_repeats(self) -> typing.Union[int, float]:
"""Returns the number of times this outcome will be used."""
return self._num_repeats

@property
def num_uses(self) -> int:
"""Returns the number of times this outcome has been used."""
return self._num_uses

@abstractmethod
def do_resolve(self, call: "DeferredRpcHandler.Call") -> None:
pass

def resolve(self, call: "DeferredRpcHandler.Call") -> bool:
"""Resolves the given `call` using this outcome."""
if self._num_uses >= self._num_repeats:
return False
self.do_resolve(call)
self._num_uses += 1
return True

def repeatedly(self, times: int) -> None:
"""States that this outcome can be used repeatedly, a finite number of `times`."""
assert times > 0
self._num_repeats = times

def forever(self) -> None:
"""States that this outcome can be used repeatedly, forever."""
self._num_repeats = math.inf

class Response(Outcome):
"""A successful response outcome for future calls."""

def __init__(self, response: typing.Any) -> None:
super().__init__()
self._response = response

def do_resolve(self, call: "DeferredRpcHandler.Call") -> None:
call.returns(self._response)

def _raise_for_streamed_responses(self):
if isinstance(self._response, collections.abc.Iterable):
if not isinstance(self._response, collections.abc.Sized):
raise RuntimeError("Cannot repeat an streamed response, specify it as a sized collection")

def repeatedly(self, times: int) -> None:
self._raise_for_streamed_responses()
super().repeatedly(times)

def forever(self) -> None:
self._raise_for_streamed_responses()
super().forever()

class Failure(Outcome):
"""A failure outcome for future calls."""

def __init__(self, code: grpc.StatusCode, details: typing.Optional[str] = None) -> None:
super().__init__()
self._code = code
self._details = details

def do_resolve(self, call: "DeferredRpcHandler.Call") -> None:
call.fails(self._code, self._details)

def __init__(self) -> None:
self._changequeue: queue.SimpleQueue = queue.SimpleQueue()
self._lock: threading.Lock = threading.Lock()
self._queue: collections.deque = collections.deque()

def materialize(self, call: "DeferredRpcHandler.Call") -> bool:
"""Makes `call` the next call, applying the oldest resolution specified."""
try:
change = self._changequeue.get_nowait()
change(call)
return True
except queue.Empty:
with self._lock:
while len(self._queue) > 0:
outcome = self._queue[0]
if outcome.resolve(call):
return True
self._queue.popleft()
return False

def returns(self, response: typing.Any) -> None:
"""Specifies the next call will succeed with the given `response`."""
self._changequeue.put(lambda call: call.returns(response))
def _raise_when_future_is_predetermined(self) -> None:
if len(self._queue) > 0:
pending_outcome = self._queue[-1]
if pending_outcome.num_repeats and not math.isfinite(pending_outcome.num_repeats):
raise RuntimeError("Future is predetermined, cannot specify response (did you use forever())")

def fails(self, code: grpc.StatusCode, details: typing.Optional[str] = None) -> None:
"""Specifies the next call will fail with given error `code` and `details`."""
self._changequeue.put(lambda call: call.fails(code, details))
def returns(self, response: typing.Any) -> "DeferredRpcHandler.Future.Outcome":
"""
Specifies the next call will succeed with the given `response`.
It returns a future outcome that can inspected and repeated as need be.
"""
with self._lock:
self._raise_when_future_is_predetermined()
outcome = DeferredRpcHandler.Future.Response(response)
self._queue.append(outcome)
return outcome

def fails(
self, code: grpc.StatusCode, details: typing.Optional[str] = None
) -> "DeferredRpcHandler.Future.Outcome":
"""
Specifies the next call will fail with given error `code` and `details`.
It returns a future outcome that can inspected and repeated as need be.
"""
with self._lock:
self._raise_when_future_is_predetermined()
outcome = DeferredRpcHandler.Future.Failure(code, details)
self._queue.append(outcome)
return outcome

def __init__(self, handler: typing.Callable) -> None:
super().__init__(handler)
Expand Down
18 changes: 11 additions & 7 deletions spot_wrapper/tests/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,28 +76,32 @@ def test_wrapper_setup(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrappe
if simple_spot_wrapper.has_arm():
# bosdyn.api.ManipulationApiService/ManipulationApi implementation
# is mocked (as spot_wrapper.testing.mocks.MockSpot is automatically
# specified), so we provide a response in advance.
# specified), so here we provide the same response for every future
# request in advance.
request = ManipulationApiRequest()
request.pick_object.frame_name = "gripper"
request.pick_object.object_rt_frame.x = 1.0

response = ManipulationApiResponse()
response.manipulation_cmd_id = 1
simple_spot.api.ManipulationApi.future.returns(response)
simple_spot.api.ManipulationApi.future.returns(response).forever()

ok, message, command_id = simple_spot_wrapper.manipulation_command(request)
assert ok and response.manipulation_cmd_id == command_id, message
for _ in range(5):
ok, message, command_id = simple_spot_wrapper.manipulation_command(request)
assert ok and response.manipulation_cmd_id == command_id, message

# Power toggling relies on a combination of
assert not simple_spot_wrapper.check_is_powered_on()
assert simple_spot_wrapper.toggle_power(True)

# bosdyn.api.RobotCommandService/RobotCommand implementation
# is mocked (as spot_wrapper.testing.mocks.MockSpot is
# automatically specified), so we provide a response in
# advance.
# automatically specified), so here we provide a response in
# advance for each command.
response = RobotCommandResponse()
response.status = RobotCommandResponse.Status.STATUS_OK
simple_spot.api.RobotCommand.future.returns(response)
simple_spot.api.RobotCommand.future.returns(response).repeatedly(2)
ok, message = simple_spot_wrapper.sit()
assert ok, message
ok, message = simple_spot_wrapper.stand()
assert ok, message

0 comments on commit 8711110

Please sign in to comment.