-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcomms_import.py
52 lines (40 loc) · 1.43 KB
/
comms_import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class CommServer:
    """
    Communications Server. Allows other bots to connect via a CommClient.

    The server owns a listening RFCOMM socket plus every connection it has
    accepted; ``close`` tears all of them down.
    """

    current_connection = None

    def __init__(self, hostAddress, port):
        """
        Initialise the communication server.

        Binds an RFCOMM bluetooth socket to ``(hostAddress, port)`` and starts
        listening with a backlog of one pending connection.
        """
        # Imported here rather than at module level, so merely importing this
        # module does not require the bluetooth package to be present.
        import bluetooth

        self.socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        self.socket.bind((hostAddress, port))
        # Synchronous (1 Backlog)?
        self.socket.listen(1)
        # Each entry is the (client_socket, client_info) tuple from accept().
        self.clients = []

    def accept_client(self):
        """
        Wait for a client to connect, then return the client and client information.

        If on ev3, this returns a bluetooth socket, as well as addr/port tuple.
        If on sim, this returns a mocked socket object, as well as addr/port tuple.
        """
        connection = self.socket.accept()
        self.clients.append(connection)
        return connection

    def close(self):
        """
        Close every accepted client connection, then the listening socket.

        ``self.clients`` holds ``(client_socket, client_info)`` tuples, so the
        socket has to be unpacked before closing; calling ``close()`` on the
        tuple itself would raise ``AttributeError``.
        """
        try:
            for client_socket, _client_info in self.clients:
                client_socket.close()
        finally:
            # Always release the listening socket, even if a client socket
            # failed to close cleanly.
            self.socket.close()
class CommClient:
    """
    Communications Client. Can connect to other bots with CommServer running.
    """

    def __init__(self, hostAddress, port):
        """
        Initialise the connection.

        Opens an RFCOMM bluetooth socket and connects it to
        ``(hostAddress, port)``.
        """
        # Imported here rather than at module level, so merely importing this
        # module does not require the bluetooth package to be present.
        import bluetooth

        self.socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        self.socket.connect((hostAddress, port))

    def send(self, data):
        """Send ``data`` over the underlying socket."""
        self.socket.send(data)

    def recv(self, limit):
        """
        Receive up to ``limit`` bytes from the underlying socket.

        Returns whatever the socket's ``recv`` returns; previously the result
        was discarded and callers always got ``None``.
        """
        return self.socket.recv(limit)

    def close(self):
        """Close the underlying socket."""
        self.socket.close()