-
Notifications
You must be signed in to change notification settings - Fork 130
/
Copy pathUDPConsole.py
executable file
·265 lines (198 loc) · 8.03 KB
/
UDPConsole.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
#!/usr/bin/env python
"""
UDPConsole
==========
This is a sample application that is similar to the UDPMultiplexer. It opens
a socket for unicast messages and (optionally) another for broadcast messages.
Run this application with a BACpypes IP address parameter.
$ python UDPConsole.py <addr> [ --nobroadcast ]
The address can be one of the following forms:
192.168.1.10 - unicast socket, no broadcast socket, port 47808
192.168.1.10/24 - unicast socket, 192.168.1.255 broadcast socket, port 47808
192.168.1.10:12345 - unicast socket, no broadcast socket, port 12345
192.168.1.10/24:12345 - unicast socket, 192.168.1.255 broadcast socket, port 12345
any - special tuple ('', 47808)
any:12345 - special tuple ('', 12345)
Use the --nobroadcast option to prevent the application from opening the
broadcast socket when one would otherwise be opened.
To send a packet, enter a string in the form <addr> <message> where <addr>
is a BACpypes IP address (which may include the port) or '*' for a local
broadcast.
Linux/MacOS Test Cases
----------------------
Here are some test cases for Linux and MacOS.
Using Any
~~~~~~~~~
$ python samples/UDPConsole.py any
* hi
received u'hi' from ('10.0.1.5', 47808)
In this case the application received its own broadcast, but did not recognize
it as a broadcast message and did not recognize that it came from itself.
Broadcast messages from other devices sent to 255.255.255.255 or 10.0.1.255
are received, but also not recognized as broadcast messages.
Using the Local Address
~~~~~~~~~~~~~~~~~~~~~~~
$ python samples/UDPConsole.py 10.0.1.5
* hi
received u'hi' from self
In this case it received its own broadcast and it recognized that it came from
itself, but it did not recognize it as a broadcast message. Broadcast messages
from other devices sent to 255.255.255.255 or 10.0.1.255 are not received.
Using the CIDR Address
~~~~~~~~~~~~~~~~~~~~~~
$ python samples/UDPConsole.py 10.0.1.5/24
* hi
received broadcast u'hi' from self
In this case it received its own broadcast, recognized that it came from itself,
and also recognized it was sent as a broadcast message. Broadcast messages
from other devices sent to 255.255.255.255 are not received, but those sent to
10.0.1.255 are received and recognized as broadcast messages.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.udp import UDPDirector
from bacpypes.comm import Client, Server, bind
from bacpypes.pdu import Address, PDU
from bacpypes.core import run, stop
from bacpypes.consolelogging import ArgumentParser
from bacpypes.console import ConsoleClient
# some debugging
# _debug is the flag tested before each guarded debug call in this module;
# _log is the module-level logger used by main()
_debug = 0
_log = ModuleLogger(globals())

# globals
# Both are assigned by main() from the command line address and then read by
# MiddleMan and BroadcastReceiver.  local_unicast_tuple is the (host, port)
# the unicast socket is opened on; local_broadcast_tuple is the (host, port)
# used for '*' destinations and is tested for truthiness before use, so it
# is presumably empty/None when the address has no broadcast form.
local_unicast_tuple = None
local_broadcast_tuple = None
#
# MiddleMan
#
@bacpypes_debugging
class MiddleMan(Client, Server):
    """
    An instance of this class sits between the UDPDirector and the
    console.  Downstream packets from a console have no concept of a
    destination, so this is interpreted from the text and then a new
    PDU is sent to the director.  Upstream packets could be simply
    forwarded to the console, in that case the source address is ignored,
    this application interprets the source address for the user.
    """

    def indication(self, pdu):
        """Turn a console line of the form ``<addr> <message>`` into a PDU.

        ``<addr>`` is ``*`` (the local broadcast tuple), a host (sent to the
        port of the local unicast tuple), or either of those followed by
        ``:port``.  Empty data is treated as EOF and stops the application.
        Malformed lines, addresses and ports are reported on stderr and the
        line is dropped; nothing is raised for bad user input.
        """
        if _debug: MiddleMan._debug('indication %r', pdu)

        # empty downstream packets mean EOF
        if not pdu.pduData:
            stop()
            return

        # decode the line and trim off the eol; strip only real line
        # terminators so a final line without one keeps its last character
        line = pdu.pduData.decode('utf-8').rstrip('\r\n')
        if _debug: MiddleMan._debug(' - line: %r', line)

        line_parts = line.split(' ', 1)
        if _debug: MiddleMan._debug(' - line_parts: %r', line_parts)
        if len(line_parts) != 2:
            sys.stderr.write("err: invalid line: %r\n" % (line,))
            return

        addr, msg = line_parts

        # split off an optional ":port" suffix, rejecting extra colons,
        # non-numeric ports and ports outside the valid range
        port = None
        if ':' in addr:
            try:
                host, port_text = addr.split(':')
                port = int(port_text)
            except ValueError:
                port = None
            if port is None or not 0 <= port <= 65535:
                sys.stderr.write("err: invalid address: %r\n" % (addr,))
                return
        else:
            host = addr

        # check the address
        if host == "*":
            if not local_broadcast_tuple:
                sys.stderr.write("err: no local broadcast\n")
                return
            if port is None:
                dest = local_broadcast_tuple
            else:
                dest = (local_broadcast_tuple[0], port)
        elif port is None:
            # no explicit port, use the same port as the local socket
            dest = (host, local_unicast_tuple[1])
        else:
            dest = (host, port)
        if _debug: MiddleMan._debug(' - dest: %r', dest)

        # send it along
        try:
            self.request(PDU(msg.encode('utf_8'), destination=dest))
        except Exception as err:
            sys.stderr.write("err: %r\n" % (err,))
            return

    def confirmation(self, pdu):
        """Print a packet received on the unicast socket.

        The source is shown as ``self`` when it equals the local unicast
        tuple, otherwise the source address is printed.
        """
        if _debug: MiddleMan._debug('confirmation %r', pdu)

        # decode the line; the data comes from the network, so substitute
        # undecodable bytes rather than raising on non-UTF-8 packets
        line = pdu.pduData.decode('utf_8', 'replace')
        if _debug: MiddleMan._debug(' - line: %r', line)

        if pdu.pduSource == local_unicast_tuple:
            sys.stdout.write("received %r from self\n" % (line,))
        else:
            sys.stdout.write("received %r from %s\n" % (line, pdu.pduSource))
#
# BroadcastReceiver
#
@bacpypes_debugging
class BroadcastReceiver(Client):
    """
    An instance of this class sits above the UDPDirector that is
    associated with the broadcast address.  There are no downstream
    packets, and it interprets the source address for the user.
    """

    def confirmation(self, pdu):
        """Print a packet received on the broadcast socket.

        The source is shown as ``self`` when it equals the local unicast
        tuple, otherwise the source address is printed.
        """
        if _debug: BroadcastReceiver._debug('confirmation %r', pdu)

        # decode the line; the data comes from the network, so substitute
        # undecodable bytes rather than raising on non-UTF-8 packets
        line = pdu.pduData.decode('utf-8', 'replace')

        # log through this class's own debug function, not MiddleMan's
        if _debug: BroadcastReceiver._debug(' - line: %r', line)

        if pdu.pduSource == local_unicast_tuple:
            sys.stdout.write("received broadcast %r from self\n" % (line,))
        else:
            sys.stdout.write("received broadcast %r from %s\n" % (line, pdu.pduSource,))
#
# __main__
#
def main():
    """Parse the command line, build the console/UDP stack and run it.

    Sets the module-level ``local_unicast_tuple`` and
    ``local_broadcast_tuple`` from the ``address`` argument, binds a console
    to a unicast UDPDirector through a MiddleMan, and, unless one of the
    skip conditions below applies, binds a BroadcastReceiver to a second
    UDPDirector opened on the broadcast tuple.
    """
    global local_unicast_tuple, local_broadcast_tuple

    # parse the command line arguments
    parser = ArgumentParser(usage=__doc__)
    parser.add_argument("address", help="address of socket")
    parser.add_argument(
        "--nobroadcast",
        dest="noBroadcast",
        action="store_true",
        default=False,
        help="do not create a broadcast socket",
    )
    args = parser.parse_args()

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    requested = args.address
    if requested == "any" or requested.startswith("any:"):
        # "any" binds to all interfaces, optionally with an explicit port
        wildcard_port = 47808 if requested == "any" else int(requested[4:])
        local_unicast_tuple = ('', wildcard_port)
        local_broadcast_tuple = ('255.255.255.255', wildcard_port)
    else:
        local_address = Address(requested)
        if _debug: _log.debug(" - local_address: %r", local_address)

        local_unicast_tuple = local_address.addrTuple
        local_broadcast_tuple = local_address.addrBroadcastTuple

    if _debug:
        _log.debug(" - local_unicast_tuple: %r", local_unicast_tuple)
        _log.debug(" - local_broadcast_tuple: %r", local_broadcast_tuple)

    # console <-> middle man <-> unicast socket
    console = ConsoleClient()
    middle_man = MiddleMan()
    unicast_director = UDPDirector(local_unicast_tuple)
    bind(console, middle_man, unicast_director)

    # work out whether a separate broadcast socket should be opened
    if args.noBroadcast:
        skip_reason = " - skipping broadcast"
    elif not local_broadcast_tuple:
        skip_reason = " - no local broadcast"
    elif local_unicast_tuple == local_broadcast_tuple:
        skip_reason = " - identical unicast and broadcast tuples"
    elif local_broadcast_tuple[0] == '255.255.255.255':
        skip_reason = " - special broadcast address only for sending"
    else:
        skip_reason = None

    if skip_reason is None:
        broadcast_receiver = BroadcastReceiver()
        bind(broadcast_receiver, UDPDirector(local_broadcast_tuple, reuse=True))
    else:
        _log.debug(skip_reason)

    _log.debug("running")

    run()

    _log.debug("fini")
# run the application only when executed as a script, not when imported
if __name__ == "__main__":
    main()