Skip to content

Commit

Permalink
Merge pull request #41 from pupil-labs/imu
Browse files Browse the repository at this point in the history
Translate IMUPackets to IMUData with correct naming conventions
  • Loading branch information
dourvaris authored May 25, 2023
2 parents 3f75a33 + ef622d8 commit ce2938b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/pupil_labs/realtime_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class Hardware(T.NamedTuple):
version: str = "unknown"
glasses_serial: str = "unknown"
world_camera_serial: str = "unknown"
module_serial: str = "unknown"


class NetworkDevice(T.NamedTuple):
Expand Down
5 changes: 5 additions & 0 deletions src/pupil_labs/realtime_api/simple/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def memory_state(self) -> Literal["OK", "LOW", "CRITICAL"]:
def version_glasses(self) -> str:
return self._status.hardware.version

@property
def module_serial(self) -> T.Union[str, None, Literal["default"]]:
"""Returns ``None`` or ``"default"`` if no glasses are connected"""
return self._status.hardware.module_serial

@property
def serial_number_glasses(self) -> T.Union[str, None, Literal["default"]]:
"""Returns ``None`` or ``"default"`` if no glasses are connected"""
Expand Down
55 changes: 52 additions & 3 deletions src/pupil_labs/realtime_api/streaming/imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,55 @@
logger = logging.getLogger(__name__)


async def receive_imu_data(url, *args, **kwargs) -> T.AsyncIterator[ImuPacket]:
class Data3D(T.NamedTuple):
x: float
y: float
z: float


class Quaternion(T.NamedTuple):
x: float
y: float
z: float
w: float


class IMUData(T.NamedTuple):
gyro_data: Data3D
accel_data: Data3D
quaternion: Quaternion
timestamp_unix_nanoseconds: float
timestamp_unix_seconds: float


def IMUPacket_to_IMUData(imu_packet: ImuPacket) -> IMUData:
gyro_data = Data3D(
x=imu_packet.gyroData.x,
y=imu_packet.gyroData.y,
z=imu_packet.gyroData.z,
)
accel_data = Data3D(
x=imu_packet.accelData.x,
y=imu_packet.accelData.y,
z=imu_packet.accelData.z,
)
quaternion = Quaternion(
x=imu_packet.rotVecData.x,
y=imu_packet.rotVecData.y,
z=imu_packet.rotVecData.z,
w=imu_packet.rotVecData.w,
)
imu_data = IMUData(
gyro_data=gyro_data,
accel_data=accel_data,
quaternion=quaternion,
timestamp_unix_nanoseconds=imu_packet.tsNs,
timestamp_unix_seconds=imu_packet.tsNs / 1e9,
)
return imu_data


async def receive_imu_data(url, *args, **kwargs) -> T.AsyncIterator[IMUData]:
async with RTSPImuStreamer(url, *args, **kwargs) as streamer:
async for datum in streamer.receive():
yield datum
Expand All @@ -16,12 +64,13 @@ async def receive_imu_data(url, *args, **kwargs) -> T.AsyncIterator[ImuPacket]:
class RTSPImuStreamer(RTSPRawStreamer):
async def receive(
self,
) -> T.AsyncIterator[ImuPacket]:
) -> T.AsyncIterator[IMUData]:
async for data in super().receive():
try:
imu_packet = ImuPacket()
imu_packet.ParseFromString(data.raw)
yield imu_packet
imu_data = IMUPacket_to_IMUData(imu_packet)
yield imu_data
except Exception:
logger.exception(f"Unable to parse imu data {data}")
raise

0 comments on commit ce2938b

Please sign in to comment.