forked from cui-shaowei/urx-ur3-rg2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
urscript.py
147 lines (118 loc) · 4.38 KB
/
urscript.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#! /usr/bin/env python
import logging
# --- Controller analog inputs ---------------------------------------------
# Port numbers treated as controller ports by set_analog_inputrange.
CONTROLLER_PORTS = [0, 1]
# Accepted range codes for controller ports: 0 -> 0-5V, 2 -> 0-10V.
CONTROLLER_VOLTAGE = [0, 2]

# --- Tool analog inputs ---------------------------------------------------
# Port numbers treated as tool ports by set_analog_inputrange.
TOOL_PORTS = [2, 3]
# Accepted range codes for tool ports: 0 -> 0-5V, 1 -> 0-10V, 2 -> 4-20mA.
TOOL_VOLTAGE = [0, 1, 2]

# --- Analog outputs -------------------------------------------------------
# Accepted output domain codes: 0 -> 4-20mA, 1 -> 0-10V.
OUTPUT_DOMAIN_VOLTAGE = [0, 1]
class URScript(object):
    """Incrementally build the text of a URScript program.

    Lines are accumulated in two string buffers:

    * ``header``  -- code emitted before, and outside of, ``myProg()``.
    * ``program`` -- code emitted inside ``myProg()``; every line is stored
      with a leading newline and tab.

    Calling the instance returns the assembled script text. The ``_``-prefixed
    helpers each append one URScript statement (some followed by ``sync()``)
    to the program buffer. Argument validation uses ``assert`` and therefore
    raises ``AssertionError`` (and is skipped when Python runs with ``-O``).
    """

    def __init__(self):
        self.logger = logging.getLogger(u"urscript")
        # The header is code that is before and outside the myProg() method
        self.header = ""
        # The program is code inside the myProg() method
        self.program = ""

    def __call__(self):
        """Return the full script text, or "" when the program is empty.

        The result is the header (if any) followed by a blank line and the
        program lines wrapped in ``def myProg(): ... end``.
        """
        if self.program == "":
            self.logger.debug(u"urscript program is empty")
            return ""

        # Construct the program; self.program already starts with "\n\t".
        wrapped = "def myProg():{}\nend".format(self.program)

        # Construct the full script, prefixing the header when there is one.
        prefix = "{}\n\n".format(self.header) if self.header else ""
        return "{}{}".format(prefix, wrapped)

    def reset(self):
        """Discard all accumulated header and program text."""
        self.header = ""
        self.program = ""

    def add_header_to_program(self, header_line):
        """Append ``header_line`` to the header, preceded by a newline."""
        self.header = "{}\n{}".format(self.header, header_line)

    def add_line_to_program(self, new_line):
        """Append ``new_line`` to the program, preceded by newline + tab."""
        self.program = "{}\n\t{}".format(self.program, new_line)

    def _constrain_unsigned_char(self, value):
        """Clamp an int to the range 0..255 and return it.

        Raises AssertionError if ``value`` is not an int.
        """
        assert isinstance(value, int)
        return max(0, min(255, value))

    def _set_analog_inputrange(self, port, vrange):
        """Append ``set_analog_inputrange(port,vrange)``.

        ``vrange`` is checked against CONTROLLER_VOLTAGE for controller ports
        and TOOL_VOLTAGE for tool ports (AssertionError on mismatch).
        """
        if port in CONTROLLER_PORTS:
            assert vrange in CONTROLLER_VOLTAGE
        elif port in TOOL_PORTS:
            assert vrange in TOOL_VOLTAGE
        # NOTE(review): a port in neither list is emitted without any
        # validation -- confirm whether that is intended.
        self.add_line_to_program(
            "set_analog_inputrange({},{})".format(port, vrange))

    def _set_analog_output(self, input_id, signal_level):
        """Append ``set_analog_output(input_id, signal_level)``.

        ``input_id`` must be 0 or 1. ``signal_level`` is a relative level and
        may be any number from 0 to 1 inclusive (e.g. 0.5); previously only
        the exact values 0 and 1 passed validation.

        Raises AssertionError when either argument is out of range.
        """
        assert input_id in [0, 1]
        # Range check rather than membership so fractional levels are allowed.
        assert 0 <= signal_level <= 1
        # NOTE(review): current URScript manuals name this call
        # set_standard_analog_out -- confirm the emitted name against the
        # target controller version.
        self.add_line_to_program(
            "set_analog_output({}, {})".format(input_id, signal_level))

    def _set_analog_outputdomain(self, port, domain):
        """Append ``set_analog_outputdomain(port,domain)``.

        Raises AssertionError if ``domain`` is not in OUTPUT_DOMAIN_VOLTAGE.
        """
        assert domain in OUTPUT_DOMAIN_VOLTAGE
        self.add_line_to_program(
            "set_analog_outputdomain({},{})".format(port, domain))

    def _set_payload(self, mass, cog=None):
        """Append ``set_payload(mass)`` or ``set_payload(mass,[x, y, z])``.

        ``cog`` is an optional 3-element sequence. It is always rendered with
        list brackets, so a tuple no longer produces ``(x, y, z)`` in the
        generated script. A falsy ``cog`` (None or empty) is omitted.

        Raises AssertionError if ``cog`` is given and is not of length 3.
        """
        pieces = [str(mass)]
        if cog:
            assert len(cog) == 3
            # Normalise to a list so the emitted text uses [..] brackets.
            pieces.append("{}".format(list(cog)))
        self.add_line_to_program("set_payload({})".format(",".join(pieces)))

    def _set_runstate_outputs(self, outputs=None):
        """Append ``set_runstate_outputs(outputs)``; falsy means ``[]``."""
        outputs = outputs or []
        self.add_line_to_program("set_runstate_outputs({})".format(outputs))

    def _set_tool_voltage(self, voltage):
        """Append ``set_tool_voltage(voltage)``.

        Raises AssertionError unless ``voltage`` is 0, 12 or 24.
        """
        assert voltage in [0, 12, 24]
        self.add_line_to_program("set_tool_voltage({})".format(voltage))

    def _sleep(self, value):
        """Append ``sleep(value)``."""
        self.add_line_to_program("sleep({})".format(value))

    def _socket_close(self, socket_name):
        """Append ``socket_close("socket_name")``."""
        self.add_line_to_program('socket_close("{}")'.format(socket_name))

    def _socket_get_var(self, var, socket_name):
        """Append ``socket_get_var("var","socket_name")`` then ``sync()``."""
        self.add_line_to_program(
            'socket_get_var("{}","{}")'.format(var, socket_name))
        self._sync()

    def _socket_open(self, socket_host, socket_port, socket_name):
        """Append ``socket_open("host",port,"socket_name")``."""
        self.add_line_to_program(
            'socket_open("{}",{},"{}")'.format(
                socket_host, socket_port, socket_name))

    def _socket_read_byte_list(self, nbytes, socket_name):
        """Append a read of ``nbytes`` bytes into ``var_value``, then sync.

        The emitted line assigns the result to the script-side global
        ``var_value``.
        """
        self.add_line_to_program(
            'global var_value = socket_read_byte_list({},"{}")'.format(
                nbytes, socket_name))
        self._sync()

    def _socket_send_string(self, message, socket_name):
        """Append ``socket_send_string("message","socket_name")`` then sync."""
        self.add_line_to_program(
            'socket_send_string("{}","{}")'.format(message, socket_name))
        self._sync()

    def _socket_set_var(self, var, value, socket_name):
        """Append ``socket_set_var("var",value,"socket_name")`` then sync."""
        self.add_line_to_program(
            'socket_set_var("{}",{},"{}")'.format(var, value, socket_name))
        self._sync()

    def _sync(self):
        """Append ``sync()``."""
        self.add_line_to_program("sync()")