-
Notifications
You must be signed in to change notification settings - Fork 0
/
cvClient.py
195 lines (150 loc) · 4.92 KB
/
cvClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import json
# Asyncio (for concurrency)
import asyncio
# Websockets (for communication with the server)
from websockets.sync.client import connect, ClientConnection # type: ignore
from websockets.exceptions import ConnectionClosedError # type: ignore
from websockets.typing import Data # type: ignore
# Game state
from gameState import GameState
# Server messages
from serverMessage import *
# Restore the ability to use Ctrl + C within asyncio
import signal
signal.signal(signal.SIGINT, signal.SIG_DFL)
# Terminal colors for formatting output text
from terminalColors import *
from cv_locater import capture_loc, clean
from cv_arena import wall_correction
# Get the connect URL from the config.json file
def getConnectURL() -> str:
# Read the configuration file
with open('../config.json', 'r', encoding='UTF-8') as configFile:
config = json.load(configFile)
# Return the websocket connect address
return f'ws://{config["ServerIP"]}:{config["WebSocketPort"]}'
class PacbotClient:
'''
Sample implementation of a websocket client to communicate with the
Pacbot game server, using asyncio.
'''
def __init__(self, connectURL: str) -> None:
'''
Construct a new Pacbot client object
'''
# Connection URL (starts with ws://)
self.connectURL: str = connectURL
# Private variable to store whether the socket is open
self._socketOpen: bool = False
# Connection object to communicate with the server
self.connection: ClientConnection
# Game state object to store the game information
self.state: GameState = GameState()
async def run(self) -> None:
'''
Connect to the server, then run
'''
# Connect to the websocket server
await self.connect()
try: # Try receiving messages indefinitely
if self._socketOpen:
await asyncio.gather(
self.receiveLoop(),
self.cvLoop()
)
finally: # Disconnect once the connection is over
await self.disconnect()
async def connect(self) -> None:
'''
Connect to the websocket server
'''
# Connect to the specified URL
try:
self.connection = connect(self.connectURL)
self._socketOpen = True
self.state.setConnectionStatus(True)
# If the connection is refused, log and return
except ConnectionRefusedError:
print(
f'{RED}Websocket connection refused [{self.connectURL}]\n'
f'Are the address and port correct, and is the '
f'server running?{NORMAL}'
)
return
async def disconnect(self) -> None:
'''
Disconnect from the websocket server
'''
# Close the connection
if self._socketOpen:
self.connection.close()
self._socketOpen = False
self.state.setConnectionStatus(False)
# Return whether the connection is open
def isOpen(self) -> bool:
'''
Check whether the connection is open (unused)
'''
return self._socketOpen
async def receiveLoop(self) -> None:
'''
Receive loop for capturing messages from the server
'''
# Receive values as long as the connection is open
while self.isOpen():
# Try to receive messages (and skip to except in case of an error)
try:
# Receive a message from the connection
message: Data = self.connection.recv()
# Convert the message to bytes, if necessary
messageBytes: bytes
if isinstance(message, bytes):
messageBytes = message # type: ignore
else:
messageBytes = message.encode('ascii') # type: ignore
# Update the state, given this message from the server
self.state.update(messageBytes)
# Write a response back to the server if necessary
if self.state.writeServerBuf and self.state.writeServerBuf[0].tick():
response: bytes = self.state.writeServerBuf.popleft().getBytes()
self.connection.send(response)
# Free the event loop to allow another decision
await asyncio.sleep(0)
# Break once the connection is closed
except ConnectionClosedError:
print('Connection lost...')
self.state.setConnectionStatus(False)
break
async def cvLoop(self) -> None:
while self.state.isConnected():
# If the current messages haven't been sent out yet, skip this iteration
if len(self.state.writeServerBuf):
await asyncio.sleep(0)
continue
# Write back to the server, as a test (move right)
pac_pos = capture_loc()
print("Original: ",pac_pos)
pac_pos = wall_correction(pac_pos)
print("Wall corrected: ",pac_pos)
if (pac_pos[0] > 0 and pac_pos[1] > 0):
cv_output = [b'x',(pac_pos[1]).to_bytes(),(pac_pos[0]).to_bytes()]
self.state.writeServerBuf.append(
ServerMessage(b''.join(cv_output),1)
)
# Free up the event loop
await asyncio.sleep(0)
# Main function
async def main():
# Get the URL to connect to
connectURL = getConnectURL()
client = PacbotClient(connectURL)
await client.run()
# Once the connection is closed, end the event loop
loop = asyncio.get_event_loop()
loop.stop()
clean()
if __name__ == '__main__':
# Run the event loop forever
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()