-
Notifications
You must be signed in to change notification settings - Fork 560
/
Copy pathpayload.py
67 lines (47 loc) · 2.22 KB
/
payload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Client for the payload service.
This allows client code to read from the robot payload registry.
"""
import logging
import bosdyn.api.payload_pb2 as payload_protos
import bosdyn.api.payload_pb2 as payload_service_protos
import bosdyn.api.payload_service_pb2_grpc as payload_service
from .common import BaseClient, common_header_errors
LOGGER = logging.getLogger('payload_client')
def _get_entry_value(response):
return response.payloads
class PayloadClient(BaseClient):
"""A client handling payload configs."""
default_service_name = 'payload'
service_type = 'bosdyn.api.PayloadService'
def __init__(self):
super(PayloadClient, self).__init__(payload_service.PayloadServiceStub)
def list_payloads(self, **kw_args):
"""List all payloads registered on the robot
Args:
kw_args: Extra arguments to pass to grpc call invocation.
Returns:
A list of the proto message definitions of all registered payloads
Raises:
RpcError: Problem communicating with the robot.
"""
request = payload_service_protos.ListPayloadsRequest()
return self.call(self._stub.ListPayloads, request, value_from_response=_get_entry_value,
error_from_response=common_header_errors, **kw_args)
def list_payloads_async(self, **kw_args):
"""List all payloads registered on the robot
Args:
kw_args: Extra arguments to pass to grpc call invocation.
Returns:
A list of the proto message definitions of all registered payloads
Raises:
RpcError: Problem communicating with the robot.
"""
request = payload_service_protos.ListPayloadsRequest()
return self.call_async(self._stub.ListPayloads, request,
value_from_response=_get_entry_value,
error_from_response=common_header_errors, **kw_args)