Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modular estop #57

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions spot_wrapper/spot_estop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import logging
import typing
from dataclasses import dataclass

from bosdyn.client.async_tasks import AsyncPeriodicQuery
from bosdyn.client.estop import (
EstopClient,
EstopEndpoint,
EstopKeepAlive,
)
from bosdyn.client.robot import Robot


@dataclass
class KeepAliveHandle:
"""
Helper class to store a handle to the keepalive which can be modified by the estop class and checked by the async
monitor
"""

keep_alive: typing.Optional[EstopKeepAlive] = None


class AsyncEStopMonitor(AsyncPeriodicQuery):
"""Class to check if the estop endpoint is still valid"""

def __init__(
self,
client,
logger: logging.Logger,
rate: float,
estop_keep_alive: KeepAliveHandle,
) -> None:
"""
Attributes:
client: The Client to a service on the robot
logger: Logger object
rate: Rate (Hz) to trigger the query
estop_keep_alive: A handle to the estop keepalive object
"""
super().__init__("estop_alive", client, logger, period_sec=1.0 / rate)
self._estop_keep_alive = estop_keep_alive

def _start_query(self) -> None:
if not self._estop_keep_alive:
self._logger.debug("No keepalive yet - the lease has not been claimed.")
return

last_estop_status = self._estop_keep_alive.keep_alive.status_queue.queue[-1]
if last_estop_status[0] == EstopKeepAlive.KeepAliveStatus.ERROR:
self._logger.error(f"Estop keepalive has an error: {last_estop_status[1]}")
elif last_estop_status == EstopKeepAlive.KeepAliveStatus.DISABLED:
self._logger.error(f"Estop keepalive is disabled: {last_estop_status[1]}")


class SpotEstop:
"""
Module which handles the e-stop interaction and monitoring
"""

def __init__(
self,
logger: logging.Logger,
robot: Robot,
estop_client: EstopClient,
spot_client_name: str,
estop_timeout: float = 9.0,
) -> None:
"""

Args:
logger:
robot:
estop_client:
spot_client_name:
estop_timeout:
"""
self._logger = logger
self._robot = robot
self._estop_endpoint: typing.Optional[EstopEndpoint] = None
self._keep_alive_handle = KeepAliveHandle()
self._keep_alive: typing.Optional[EstopKeepAlive] = None
self._estop_client = estop_client
self._estop_timeout = estop_timeout
self._spot_client_name = spot_client_name
self._estop_task = AsyncEStopMonitor(
self._estop_client, self._logger, 20.0, self._keep_alive_handle
)

def assert_estop(self, severe: bool = True) -> typing.Tuple[bool, str]:
"""
Forces the robot into eStop state.

Args:
severe: If true, will cut motor power immediately. If false, will try to settle the robot on the ground
first
Returns:
bool success and a message
"""
try:
if severe:
self._keep_alive.stop()
else:
self._keep_alive.settle_then_cut()

return True, "Success"
except Exception as e:
return False, f"Exception while attempting to estop: {e}"

def disengage_estop(self) -> typing.Tuple[bool, str]:
"""
Disengages the E-Stop
"""
try:
self._keep_alive.allow()
return True, "Success"
except Exception as e:
return False, f"Exception while attempting to disengage estop {e}"

def release_estop(self) -> None:
"""
Stop eStop keepalive
"""
if self._keep_alive:
self._keep_alive.stop()
self._keep_alive = None
self._keep_alive_handle.keep_alive = None
self._estop_endpoint = None

def reset_estop(self) -> None:
"""
Get keepalive for eStop
"""
self._estop_endpoint = EstopEndpoint(
self._estop_client, self._spot_client_name, self._estop_timeout
)
self._estop_endpoint.force_simple_setup() # Set this endpoint as the robot's sole estop.
self._keep_alive = EstopKeepAlive(self._estop_endpoint)
self._keep_alive_handle.keep_alive = self._keep_alive

@property
def async_task(self) -> AsyncPeriodicQuery:
"""Returns the async PointCloudService task for the robot"""
return self._estop_task
142 changes: 49 additions & 93 deletions spot_wrapper/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@
from bosdyn.client import robot_command
from bosdyn.client.async_tasks import AsyncPeriodicQuery, AsyncTasks
from bosdyn.client.docking import DockingClient, blocking_dock_robot, blocking_undock
from bosdyn.client.estop import (
EstopClient,
EstopEndpoint,
EstopKeepAlive,
MotorsOnError,
)
from bosdyn.client.estop import EstopClient
from bosdyn.client.frame_helpers import get_odom_tform_body
from bosdyn.client.graph_nav import GraphNavClient
from bosdyn.client.image import (
Expand Down Expand Up @@ -84,6 +79,7 @@
from google.protobuf.timestamp_pb2 import Timestamp

from .spot_eap import SpotEAP
from .spot_estop import SpotEstop
from .spot_world_objects import SpotWorldObjects


Expand Down Expand Up @@ -460,47 +456,6 @@ def _start_query(self) -> None:
self._spot_wrapper.stand(False)


class AsyncEStopMonitor(AsyncPeriodicQuery):
"""Class to check if the estop endpoint is still valid

Attributes:
client: The Client to a service on the robot
logger: Logger object
rate: Rate (Hz) to trigger the query
spot_wrapper: A handle to the wrapper library
"""

def __init__(self, client, logger, rate, spot_wrapper):
super(AsyncEStopMonitor, self).__init__(
"estop_alive", client, logger, period_sec=1.0 / rate
)
self._spot_wrapper = spot_wrapper

def _start_query(self):
if not self._spot_wrapper._estop_keepalive:
self._logger.debug("No keepalive yet - the lease has not been claimed.")
return

last_estop_status = self._spot_wrapper._estop_keepalive.status_queue.queue[-1]
if (
last_estop_status[0]
== self._spot_wrapper._estop_keepalive.KeepAliveStatus.ERROR
):
self._logger.error(
"Estop keepalive has an error: {}".format(last_estop_status[1])
)
elif (
last_estop_status
== self._spot_wrapper._estop_keepalive.KeepAliveStatus.DISABLED
):
self._logger.error(
"Estop keepalive is disabled: {}".format(last_estop_status[1])
)
else:
# estop keepalive is ok
pass


def try_claim(func=None, *, power_on=False):
"""
Decorator which tries to acquire the lease before executing the wrapped function
Expand Down Expand Up @@ -892,20 +847,13 @@ def __init__(
self._idle_task = AsyncIdle(
self._robot_command_client, self._logger, 10.0, self
)
self._estop_monitor = AsyncEStopMonitor(
self._estop_client, self._logger, 20.0, self
)

self._estop_endpoint = None
self._estop_keepalive = None

robot_tasks = [
self._robot_state_task,
self._robot_metrics_task,
self._lease_task,
self._front_image_task,
self._idle_task,
self._estop_monitor,
]

if self._point_cloud_client:
Expand All @@ -931,6 +879,16 @@ def __init__(
self._world_objects_task = self._spot_world_objects.async_task
robot_tasks.append(self._world_objects_task)

self._spot_estop = SpotEstop(
logger,
self._robot,
self._estop_client,
SPOT_CLIENT_NAME,
self._estop_timeout,
)
self._estop_task = self._spot_estop.async_task
robot_tasks.append(self._estop_task)

self._async_tasks = AsyncTasks(robot_tasks)

self.camera_task_name_to_task_mapping = {
Expand Down Expand Up @@ -1001,6 +959,10 @@ def spot_eap_lidar(self) -> typing.Optional[SpotEAP]:
"""Return SpotEAP instance"""
return self._spot_eap

@property
def spot_estop(self) -> typing.Optional[SpotEstop]:
return self._spot_estop

@property
def logger(self) -> logging.Logger:
"""Return logger instance of the SpotWrapper"""
Expand Down Expand Up @@ -1157,9 +1119,6 @@ def last_velocity_command_time(self) -> typing.Optional[float]:
def last_velocity_command_time(self, command_time: float) -> None:
self._command_data.last_velocity_command_time = command_time

def is_estopped(self, timeout: typing.Optional[float] = None) -> bool:
return self._robot.is_estopped(timeout=timeout)

def has_arm(self, timeout: typing.Optional[float] = None) -> bool:
return self._robot.has_arm(timeout=timeout)

Expand Down Expand Up @@ -1203,7 +1162,7 @@ def claim(self) -> typing.Tuple[bool, str]:
# estop. The robot already being powered on is relevant when the lease is being taken from someone
# else who may already have the motor cut power authority - in this case we cannot take that
# authority as the robot would have to sit down.
self.resetEStop()
self.reset_estop()
return True, "Success"
except (ResponseError, RpcError) as err:
self._logger.error("Failed to initialize robot communication: %s", err)
Expand All @@ -1219,44 +1178,41 @@ def updateTasks(self) -> None:
except Exception as e:
self._logger.error(f"Update tasks failed with error: {str(e)}")

def resetEStop(self) -> None:
"""Get keepalive for eStop"""
self._estop_endpoint = EstopEndpoint(
self._estop_client, SPOT_CLIENT_NAME, self._estop_timeout
)
self._estop_endpoint.force_simple_setup() # Set this endpoint as the robot's sole estop.
self._estop_keepalive = EstopKeepAlive(self._estop_endpoint)

def assertEStop(self, severe: bool = True) -> typing.Tuple[bool, str]:
"""Forces the robot into eStop state.
def is_estopped(self, timeout: typing.Optional[float] = None) -> bool:
"""
Check if the robot is estopped

Args:
severe: Default True - If true, will cut motor power immediately. If false, will try to settle the robot on the ground first
timeout: Wait this long for the rpc communication

Returns:
True if the robot is estopped, false otherwise
"""
try:
if severe:
self._estop_keepalive.stop()
else:
self._estop_keepalive.settle_then_cut()
return self._robot.is_estopped(timeout=timeout)

return True, "Success"
except Exception as e:
return False, f"Exception while attempting to estop: {e}"
def reset_estop(self):
"""
Get or reset the estop
"""
self._spot_estop.reset_estop()

def disengageEStop(self) -> typing.Tuple[bool, str]:
"""Disengages the E-Stop"""
try:
self._estop_keepalive.allow()
return True, "Success"
except Exception as e:
return False, f"Exception while attempting to disengage estop {e}"
def release_estop(self):
"""
Releases the estop
"""
self._spot_estop.release_estop()

def assert_estop(self, severe: bool = True):
"""
Forces the robot into eStop state.

def releaseEStop(self) -> None:
"""Stop eStop keepalive"""
if self._estop_keepalive:
self._estop_keepalive.stop()
self._estop_keepalive = None
self._estop_endpoint = None
Args:
severe: If true, will cut motor power immediately. If false, will try to settle the robot on the ground
first
Returns:
bool success and a message
"""
return self._spot_estop.assert_estop(severe)

def getLease(self) -> None:
"""Get a lease for the robot and keep the lease alive automatically."""
Expand All @@ -1277,7 +1233,7 @@ def release(self) -> typing.Tuple[bool, str]:
"""Return the lease on the body and the eStop handle."""
try:
self.releaseLease()
self.releaseEStop()
self.release_estop()
return True, "Success"
except Exception as e:
return False, f"Exception while attempting to release the lease: {e}"
Expand All @@ -1287,7 +1243,7 @@ def disconnect(self) -> None:
if self._robot.time_sync:
self._robot.time_sync.stop()
self.releaseLease()
self.releaseEStop()
self.release_estop()

def _robot_command(
self,
Expand Down Expand Up @@ -1488,7 +1444,7 @@ def power_on(self) -> typing.Tuple[bool, str]:
if not self.check_is_powered_on():
# If we are requested to start the estop, we have to acquire it when powering on.
if self._start_estop:
self.resetEStop()
self.reset_estop()
try:
self._logger.info("Powering on")
self._robot.power_on()
Expand Down