-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcar.py
392 lines (298 loc) · 10.9 KB
/
car.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
# coding=utf-8
from RPi import GPIO
import ultrasonic
import threading
import time
# 3V3 (1) (2) 5V
# GPIO2 (3) (4) 5V
# GPIO3 (5) (6) GND
# GPIO4 (7) (8) GPIO14
# GND (9) (10) GPIO15
# IFR_L GPIO17 (11) (12) GPIO18
# IFR_M GPIO27 (13) (14) GND
# IFR_R GPIO22 (15) (16) GPIO23
# 3V3 (17) (18) GPIO24
# TRK_L GPIO10 (19) (20) GND
# TRK_M GPIO9 (21) (22) GPIO25
# TRK_R GPIO11 (23) (24) GPIO8
# GND (25) (26) GPIO7
# GPIO0 (27) (28) GPIO1
# GPIO5 (29) (30) GND
# GPIO6 (31) (32) GPIO12
# IN2 GPIO13 (33) (34) GND
# IN1 GPIO19 (35) (36) GPIO16 IN3
# ENA GPIO26 (37) (38) GPIO20 IN4
# GND (39) (40) GPIO21 ENB
# GPIO pin number of track detectors
PIN_LEFT_TRACK = 19
PIN_MIDDLE_TRACK = 21
PIN_RIGHT_TRACK = 23
# GPIO pin number of infrared sensors
PIN_LEFT_INFRARED = 11
PIN_MIDDLE_INFRARED = 13
PIN_RIGHT_INFRARED = 15
# GPIO pin number of wheels
PIN_LEFT_WHEELS_ENABLER = 37
PIN_RIGHT_WHEELS_ENABLER = 40
PIN_WHEELS_IN1 = 35
PIN_WHEELS_IN2 = 33
PIN_WHEELS_IN3 = 36
PIN_WHEELS_IN4 = 38
# All input pins
INPUT_PINS = [
PIN_LEFT_TRACK,
PIN_MIDDLE_TRACK,
PIN_RIGHT_TRACK,
PIN_LEFT_INFRARED,
PIN_MIDDLE_INFRARED,
PIN_RIGHT_INFRARED,
]
# All output pins
OUTPUT_PINS = [
PIN_LEFT_WHEELS_ENABLER,
PIN_RIGHT_WHEELS_ENABLER,
PIN_WHEELS_IN1,
PIN_WHEELS_IN2,
PIN_WHEELS_IN3,
PIN_WHEELS_IN4
]
GPIO.setmode(GPIO.BOARD)
GPIO.setwarnings(False)
for pin in INPUT_PINS:
try:
GPIO.setup(pin, GPIO.IN)
except:
print('Error when setup {} as input pin'.format(pin))
for pin in OUTPUT_PINS:
try:
GPIO.setup(pin, GPIO.OUT, initial=GPIO.LOW)
except:
print('Error when setup {} as output pin'.format(pin))
# PWM of motor enablers
LEFT_WHEELS_PWM = GPIO.PWM(PIN_LEFT_WHEELS_ENABLER, 40000)
RIGHT_WHEELS_PWM = GPIO.PWM(PIN_RIGHT_WHEELS_ENABLER, 40000)
# GPIO.output(PIN_LEFT_WHEELS_ENABLER, GPIO.HIGH)
# GPIO.output(PIN_RIGHT_WHEELS_ENABLER, GPIO.HIGH)
# Current speed of left and right wheels
left_wheels_speed = 10
right_wheels_speed = 10
# Set a initial duty cycle for motor PWMs
LEFT_WHEELS_PWM.start(left_wheels_speed * 10)
RIGHT_WHEELS_PWM.start(right_wheels_speed * 10)
def _go_left():
GPIO.output(PIN_WHEELS_IN3, GPIO.LOW)
GPIO.output(PIN_WHEELS_IN4, GPIO.HIGH)
def _go_right():
GPIO.output(PIN_WHEELS_IN1, GPIO.HIGH)
GPIO.output(PIN_WHEELS_IN2, GPIO.LOW)
def _back_left():
GPIO.output(PIN_WHEELS_IN3, GPIO.HIGH)
GPIO.output(PIN_WHEELS_IN4, GPIO.LOW)
def _back_right():
GPIO.output(PIN_WHEELS_IN1, GPIO.LOW)
GPIO.output(PIN_WHEELS_IN2, GPIO.HIGH)
def _stop_left():
GPIO.output(PIN_WHEELS_IN3, GPIO.LOW)
GPIO.output(PIN_WHEELS_IN4, GPIO.LOW)
def _stop_right():
GPIO.output(PIN_WHEELS_IN1, GPIO.LOW)
GPIO.output(PIN_WHEELS_IN2, GPIO.LOW)
def set_left_wheels_speed(speed):
global left_wheels_speed
left_wheels_speed = max(0, min(10, speed))
if left_wheels_speed == 0:
_stop_left()
LEFT_WHEELS_PWM.ChangeDutyCycle(left_wheels_speed * 10)
def set_right_wheels_speed(speed):
global right_wheels_speed
right_wheels_speed = max(0, min(10, speed))
if right_wheels_speed == 0:
_stop_right()
LEFT_WHEELS_PWM.ChangeDutyCycle(right_wheels_speed * 10)
def set_global_speed(speed):
set_left_wheels_speed(speed)
set_right_wheels_speed(speed)
def go():
_go_left()
_go_right()
def back():
_back_left()
_back_right()
def brake():
_stop_left()
_stop_right()
def rotate_left():
_go_right()
_back_left()
def rotate_right():
_go_left()
_back_right()
def rotate_right_for(seconds):
rotate_right()
time.sleep(seconds)
brake()
def rotate_right_90():
tmp_l = left_wheels_speed
tmp_r = right_wheels_speed
set_global_speed(10)
rotate_right()
time.sleep(0.35)
brake()
set_left_wheels_speed(tmp_l)
set_right_wheels_speed(tmp_r)
def rotate_left_90():
tmp_l = left_wheels_speed
tmp_r = right_wheels_speed
set_global_speed(10)
rotate_left()
time.sleep(0.35)
brake()
set_left_wheels_speed(tmp_l)
set_right_wheels_speed(tmp_r)
def _move_forward_left_wheels():
GPIO.output(PIN_WHEELS_IN3, GPIO.LOW)
GPIO.output(PIN_WHEELS_IN4, GPIO.HIGH)
def _move_forward_right_wheels():
GPIO.output(PIN_WHEELS_IN1, GPIO.HIGH)
GPIO.output(PIN_WHEELS_IN2, GPIO.LOW)
def _move_back_left_wheels():
GPIO.output(PIN_WHEELS_IN3, GPIO.HIGH)
GPIO.output(PIN_WHEELS_IN4, GPIO.LOW)
def _move_back_right_wheels():
GPIO.output(PIN_WHEELS_IN1, GPIO.LOW)
GPIO.output(PIN_WHEELS_IN2, GPIO.HIGH)
def rotate_left_in_place():
_move_forward_right_wheels()
_move_back_left_wheels()
def rotate_right_in_place():
_move_forward_left_wheels()
_move_back_right_wheels()
def get_infrared_sensor_status():
return GPIO.input(PIN_LEFT_INFRARED), GPIO.input(PIN_MIDDLE_INFRARED), GPIO.input(PIN_RIGHT_INFRARED)
def get_track_detector_status():
return GPIO.input(PIN_LEFT_TRACK), GPIO.input(PIN_MIDDLE_TRACK), GPIO.input(PIN_RIGHT_TRACK)
def get_ultrasonic_sensor_status():
return ultrasonic.distance()
_last_infrared_sensor_status = get_infrared_sensor_status()
_last_track_detector_status = get_track_detector_status()
_last_ultrasonic_sensor_status = get_ultrasonic_sensor_status()
_infrared_sensor_change_callbacks = []
_track_detector_change_callbacks = []
_ultrasonic_sensor_callbacks = []
def on_infrared_sensor_change(callback):
_infrared_sensor_change_callbacks.append(callback)
try:
callback(_last_infrared_sensor_status)
except Exception as e:
print('Error raised in change callback of infrared sensor: {}'.format(e))
def remove_infrared_sensor_change(callback):
if registered_infrared_sensor_callback(callback):
_infrared_sensor_change_callbacks.remove(callback)
def registered_infrared_sensor_callback(callback):
return callback in _infrared_sensor_change_callbacks
def on_track_detector_change(callback):
_track_detector_change_callbacks.append(callback)
try:
callback(_last_track_detector_status)
except Exception as e:
print('Error raised in change callback of track detector: {}'.format(e))
def remove_track_detector_callback(callback):
if registered_track_detector_callback(callback):
_track_detector_change_callbacks.remove(callback)
def registered_track_detector_callback(callback):
return callback in _track_detector_change_callbacks
def on_ultrasonic_in_range(callback, range_low: float, range_high: float, verbose: bool = False):
_ultrasonic_sensor_callbacks.append([callback, (range_low, range_high), verbose, time.time()])
try:
callback(_last_ultrasonic_sensor_status)
except Exception as e:
print('Error raised in change callback of ultrasonic sensor: {}'.format(e))
def _get_ultrasonic_callbacks(callback, parse=False):
if parse:
return list(filter(lambda x: x[0] == callback, _ultrasonic_sensor_callbacks))
return filter(lambda x: x[0] == callback, _ultrasonic_sensor_callbacks)
def remove_ultrasonic_callback(callback):
for tup in _get_ultrasonic_callbacks(callback):
_ultrasonic_sensor_callbacks.remove(tup)
def registered_ultrasonic_callback(callback):
return len(_get_ultrasonic_callbacks(callback, True)) > 0
should_stop_polling = False
def stop_polling():
global should_stop_polling
should_stop_polling = True
def _polling_thread_main():
global _last_infrared_sensor_status, _last_track_detector_status, _last_ultrasonic_sensor_status
while not should_stop_polling:
# infrared sensor
infrared_sensor_status = get_infrared_sensor_status()
# if changed
if infrared_sensor_status != _last_infrared_sensor_status:
# invoke callbacks
for callback in _infrared_sensor_change_callbacks:
try:
callback(infrared_sensor_status)
except Exception as e:
print('Error raised in change callback of infrared sensor: {}'.format(e))
# update status
_last_infrared_sensor_status = infrared_sensor_status
# track detectors
track_detector_status = get_track_detector_status()
# if changed
if track_detector_status != _last_track_detector_status:
# invoke callbacks
for callback in _track_detector_change_callbacks:
try:
callback(track_detector_status)
except Exception as e:
print('Error raised in change callback of track detector: {}'.format(e))
# update status
_last_track_detector_status = track_detector_status
# ultrasonic
ultrasonic_status = get_ultrasonic_sensor_status()
ultrasonic_status = round(ultrasonic_status / 2) * 2
# if valid
if 4 < ultrasonic_status < 50 and ultrasonic_status != _last_ultrasonic_sensor_status:
if _last_ultrasonic_sensor_status - 5 < ultrasonic_status < _last_ultrasonic_sensor_status + 5:
# if in range
for tup in _ultrasonic_sensor_callbacks:
callback, slope, verbose, last_ts = tup
low, high = slope
if low <= ultrasonic_status <= high and (
verbose or _last_ultrasonic_sensor_status < low or _last_ultrasonic_sensor_status > high) and time.time() - last_ts >= 0.1:
try:
callback(ultrasonic_status)
tup[3] = time.time()
except Exception as e:
print('Error raised in change callback of ultrasonic sensor: {}'.format(e))
elif (
ultrasonic_status < low or ultrasonic_status > high) and low <= _last_ultrasonic_sensor_status <= high and time.time() - last_ts >= 0.1:
try:
callback(ultrasonic_status)
tup[3] = time.time()
except Exception as e:
print('Error raised in change callback of ultrasonic sensor: {}'.format(e))
# update status
_last_ultrasonic_sensor_status = ultrasonic_status
# sleep for a while
time.sleep(0.001)
_sensor_polling_thread = threading.Thread(target=_polling_thread_main)
ultrasonic.start()
_sensor_polling_thread.start()
def simple_steer_track():
def track_detector_callback(status):
left, middle, right = status
if left == 1 and middle == 0:
rotate_left()
if right == 1 and middle == 0:
rotate_right()
if middle == 1:
go()
on_track_detector_change(track_detector_callback)
go()
if __name__ == '__main__':
from time import sleep
try:
simple_steer_track()
_sensor_polling_thread.join()
except KeyboardInterrupt:
GPIO.cleanup()