-
Notifications
You must be signed in to change notification settings - Fork 560
/
Copy pathdirectory.py
129 lines (94 loc) · 4.79 KB
/
directory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Client for the directory service.
A DirectoryClient allows a client to look-up information about other API services available on a
robot.
"""
import collections
from bosdyn.api import directory_pb2, directory_service_pb2_grpc
from .common import (BaseClient, common_header_errors, error_factory, handle_common_header_errors,
handle_unset_status_error)
from .exceptions import ResponseError
class DirectoryResponseError(ResponseError):
"""General class of errors for Directory service."""
class NonexistentServiceError(DirectoryResponseError):
"""The requested service name does not exist."""
def _list_value(response):
return response.service_entries
def _get_entry_value(response):
return response.service_entry
class DirectoryClient(BaseClient):
"""List robot services and get information on them."""
# Typical name of the service in the robot's directory listing.
default_service_name = 'directory'
# gRPC service proto definition implemented by this service
service_type = 'bosdyn.api.DirectoryService'
def __init__(self):
super(DirectoryClient, self).__init__(directory_service_pb2_grpc.DirectoryServiceStub)
def list(self, **kwargs):
"""List all services present on the robot.
Returns:
A list of the proto message definitions of all registered services
Raises:
RpcError: Problem communicating with the robot.
"""
req = directory_pb2.ListServiceEntriesRequest()
return self.call(self._stub.ListServiceEntries, req, value_from_response=_list_value,
error_from_response=common_header_errors, copy_request=False, **kwargs)
def list_async(self, **kwargs):
"""List all services present on the robot.
Returns:
A list of the proto message definitions of all registered services
Raises:
RpcError: Problem communicating with the robot.
"""
req = directory_pb2.ListServiceEntriesRequest()
return self.call_async(self._stub.ListServiceEntries, req, value_from_response=_list_value,
error_from_response=common_header_errors, copy_request=False,
**kwargs)
def get_entry(self, service_name, **kwargs):
"""Get the service entry for one particular service specified by name.
Args:
service_name: The name of the service to retrieve.
Returns:
The proto message definition of the service entry
Raises:
RpcError: Problem communicating with the robot.
NonexistentServiceError: The service was not found.
DirectoryResponseError: Something went wrong during the directory access.
"""
req = directory_pb2.GetServiceEntryRequest(service_name=service_name)
return self.call(self._stub.GetServiceEntry, req, value_from_response=_get_entry_value,
error_from_response=_error_from_response, copy_request=False, **kwargs)
def get_entry_async(self, service_name, **kwargs):
"""Get the service entry for one particular service specified by name.
Args:
service_name: The name of the service to retrieve.
Returns:
The proto message definition of the service entry
Raises:
RpcError: Problem communicating with the robot.
NonexistentServiceError: The service was not found.
DirectoryResponseError: Something went wrong during the directory access.
"""
req = directory_pb2.GetServiceEntryRequest(service_name=service_name)
return self.call_async(self._stub.GetServiceEntry, req,
value_from_response=_get_entry_value,
error_from_response=_error_from_response, copy_request=False,
**kwargs)
_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None))
_STATUS_TO_ERROR.update({
directory_pb2.GetServiceEntryResponse.STATUS_OK: (None, None),
directory_pb2.GetServiceEntryResponse.STATUS_NONEXISTENT_SERVICE:
(NonexistentServiceError, NonexistentServiceError.__doc__),
})
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _error_from_response(response):
"""Return a custom exception based on response, None if no error."""
return error_factory(response, response.status,
status_to_string=directory_pb2.GetServiceEntryResponse.Status.Name,
status_to_error=_STATUS_TO_ERROR)