-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathrun_kalman_filter.py
136 lines (104 loc) · 3.94 KB
/
run_kalman_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Author: Shing-Yan Loo ([email protected])
# This script spawns a car at a random spawn point in the map, runs
# Kalman filter, and at the same time visualizes the sensor signals
# The list of sensors used are as follows:
# - camera (only for visualization, not being used for localization)
# - IMU
# - GNSS
import glob
import os
import sys
try:
sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
sys.version_info.major,
sys.version_info.minor,
'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
pass
import carla
import random
import cv2
import time
from multiprocessing import Queue, Value, Process
from ctypes import c_bool
from car import Car
from kalman_filter import ExtendedKalmanFilter
from visualizer import visualizer
from util import destroy_queue
def main():
try:
client = carla.Client('localhost', 2000)
client.set_timeout(2.0)
world = client.get_world()
spawn_point = random.choice(world.get_map().get_spawn_points())
# EKF
ekf = ExtendedKalmanFilter()
# Create a car object
car = Car(world, client, spawn_point)
print('created a car object')
# Visualizer
visual_msg_queue = Queue()
quit = Value(c_bool, False)
proc = Process(target =visualizer, args=(visual_msg_queue, quit))
proc.daemon = True
proc.start()
# In case Matplotlib is not able to keep up the pace of the growing queue,
# we have to limit the rate of the items being pushed into the queue
visual_fps = 3
last_ts = time.time()
# Drive the car around and get sensor readings
while True:
world.tick()
frame = world.get_snapshot().frame
# Get sensor readings
sensors = car.get_sensor_readings(frame)
# get image
if sensors['image'] is not None:
image = sensors['image']
cv2.imshow('image', image)
cv2.waitKey(1)
# EKF initialization
# Don't run anything else before EKF is initialized
if not ekf.is_initialized():
if sensors['gnss'] is not None:
ekf.initialize_with_gnss(sensors['gnss'])
continue
# EKF prediction
if sensors['imu'] is not None:
ekf.predict_state_with_imu(sensors['imu'])
# EKF correction
if sensors['gnss'] is not None:
ekf.correct_state_with_gnss(sensors['gnss'])
# Limit the visualization frame-rate
if time.time() - last_ts < 1. / visual_fps:
continue
# timestamp for inserting a new item into the queue
last_ts = time.time()
# visual message
visual_msg = dict()
# Get ground truth vehicle location
gt_location = car.get_location()
visual_msg['gt_traj'] = [gt_location.x, gt_location.y, gt_location.z] # round(x, 1)
# Get estimated location
visual_msg['est_traj'] = ekf.get_location()
# Get imu reading
if sensors['imu'] is not None:
imu = sensors['imu']
accelero = imu.accelerometer
gyroscop = imu.gyroscope
visual_msg['imu'] = [accelero.x, accelero.y, accelero.z,
gyroscop.x, gyroscop.y, gyroscop.z]
# Get gps reading
if sensors['gnss'] is not None:
gnss = sensors['gnss']
visual_msg['gnss'] = [gnss.x, gnss.y, gnss.z]
visual_msg_queue.put(visual_msg)
finally:
print('Exiting visualizer')
quit.value = True
destroy_queue(visual_msg_queue)
print('destroying the car object')
car.destroy()
print('done')
if __name__ == '__main__':
main()