-
Notifications
You must be signed in to change notification settings - Fork 0
/
imu.py
81 lines (66 loc) · 2.47 KB
/
imu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/home/debain/gpslib/env/bin/python
# pylint: disable=import-error, wrong-import-position, pointless-string-statement, consider-using-sys-exit, broad-exception-caught
import json
import logging
import os
import sys
from time import sleep
import warnings
warnings.filterwarnings("ignore")
import zmq
from rcpy import mpu9250
from config import IPADDRESS, PORT
# Route log records of level INFO and above to /var/log/imu.log.
logging.basicConfig(filename='/var/log/imu.log', level=logging.INFO)
"""
accel: 3-axis accelerations (m/s²)
gyro: 3-axis angular velocities (degrees/s)
mag: 3D magnetic field vector (μT)
quat: orientation quaternion
tb: pitch/roll/yaw X/Y/Z angles (radians)
head: heading from magnetometer (radians)
"""
class IMU:
    """Publish MPU9250 sensor readings over a ZeroMQ PUB socket.

    Attributes:
        socket (zmq.Socket): PUB socket connected to
            ``tcp://IPADDRESS:PORT`` (both imported from ``config``); every
            reading is sent on it.
    """

    def __init__(self):
        """Connect the publisher socket and initialise the MPU9250.

        The sensor object is created with the DMP enabled
        (``dmp_sample_rate=4``) and with the magnetometer and fusion
        switched on.

        Raises:
            SystemExit: with status 1 when the sensor cannot be initialised.
                The error is printed and logged with its traceback first.
        """
        context = zmq.Context()
        self.socket = context.socket(zmq.PUB)
        self.socket.connect(f"tcp://{IPADDRESS}:{PORT}")
        try:
            self._imu = mpu9250.IMU(enable_dmp=True,
                                    dmp_sample_rate=4,
                                    enable_magnetometer=True,
                                    enable_fusion=True)
        except Exception as e:
            print('check calibration', e)
            logging.exception(e)
            # Use sys.exit rather than the site-injected exit() helper, and a
            # non-zero status so a supervisor can see that start-up failed.
            sys.exit(1)

    def run(self):
        """Read the sensor forever and publish every reading as JSON.

        Each iteration:
          1. takes a reading dict from the IMU object,
          2. waits 10 ms,
          3. overwrites its ``gyro``, ``accel`` and ``mag`` entries with
             values from the module-level ``mpu9250.read_*_data()`` calls,
          4. sends a two-frame message: an empty first frame followed by the
             UTF-8 encoded JSON document.

        This method never returns. A ``KeyboardInterrupt`` raised anywhere in
        the loop terminates the process immediately with status 130
        (128 + SIGINT).
        """
        try:
            while True:
                data = self._imu.read()
                sleep(0.01)
                data['gyro'] = mpu9250.read_gyro_data()
                data['accel'] = mpu9250.read_accel_data()
                data['mag'] = mpu9250.read_mag_data()
                payload = json.dumps(data).encode('utf-8')
                self.socket.send_multipart([b'', payload])
        except KeyboardInterrupt:
            # Hard exit that skips interpreter cleanup.
            # NOTE(review): presumably chosen so that background sensor or
            # ZeroMQ threads cannot keep the process alive after Ctrl-C;
            # confirm before replacing with sys.exit(130).
            os._exit(130)
def main():
    """Create the IMU publisher and stream readings until the process ends."""
    IMU().run()


# Start streaming only when executed as a script, not when imported.
if __name__ == "__main__":
    main()