From f6cf990ba4c19bf60b35863100a8a624cf59aaf1 Mon Sep 17 00:00:00 2001 From: Kenneth Yang <82800265+kjy5@users.noreply.github.com> Date: Wed, 2 Oct 2024 15:12:15 -0700 Subject: [PATCH] uMp-3 support (#389) --- src/ephys_link/__about__.py | 2 +- src/ephys_link/bindings/ump_3_bindings.py | 138 ++++++++++++++++++++++ src/ephys_link/bindings/ump_4_bindings.py | 3 +- 3 files changed, 141 insertions(+), 2 deletions(-) create mode 100644 src/ephys_link/bindings/ump_3_bindings.py diff --git a/src/ephys_link/__about__.py b/src/ephys_link/__about__.py index 8b3e3d8..24781e7 100644 --- a/src/ephys_link/__about__.py +++ b/src/ephys_link/__about__.py @@ -1 +1 @@ -__version__ = "2.0.0b5" +__version__ = "2.0.0b6" diff --git a/src/ephys_link/bindings/ump_3_bindings.py b/src/ephys_link/bindings/ump_3_bindings.py new file mode 100644 index 0000000..82521e9 --- /dev/null +++ b/src/ephys_link/bindings/ump_3_bindings.py @@ -0,0 +1,138 @@ +"""Bindings for Sensapex uMp-3 platform. + +Usage: Instantiate Ump4Bindings to interact with the Sensapex uMp-4 platform. +""" + +from asyncio import get_running_loop + +from sensapex import UMP, SensapexDevice +from vbl_aquarium.models.unity import Vector3, Vector4 + +from ephys_link.util.base_bindings import BaseBindings +from ephys_link.util.common import ( + RESOURCES_PATH, + array_to_vector4, + scalar_mm_to_um, + um_to_mm, + vector4_to_array, + vector_mm_to_um, +) + + +class Ump3Bindings(BaseBindings): + """Bindings for UMP-3 platform""" + + def __init__(self) -> None: + """Initialize UMP-3 bindings.""" + + # Establish connection to Sensapex API (exit if connection fails). + UMP.set_library_path(RESOURCES_PATH) + self._ump = UMP.get_ump() + if self._ump is None: + error_message = "Unable to connect to uMp" + raise ValueError(error_message) + + async def get_manipulators(self) -> list[str]: + return list(map(str, self._ump.list_devices())) + + async def get_axes_count(self) -> int: + return 3 + + def get_dimensions(self) -> Vector4: + return Vector4(x=20, y=20, z=20, w=20) + + async def get_position(self, manipulator_id: str) -> Vector4: + manipulator_position = self._get_device(manipulator_id).get_pos(1) + + # Add the depth axis to the end of the position. + manipulator_position.append(manipulator_position[0]) + + # Convert and return. + return um_to_mm(array_to_vector4(manipulator_position)) + + # noinspection PyTypeChecker + async def get_angles(self, _: str) -> Vector3: + """uMp-3 does not support getting angles so raise an error. + + :raises: AttributeError + """ + error_message = "UMP-3 does not support getting angles" + raise AttributeError(error_message) + + # noinspection PyTypeChecker + async def get_shank_count(self, _: str) -> int: + """uMp-3 does not support getting shank count so raise an error. + + :raises: AttributeError + """ + error_message = "UMP-3 does not support getting shank count" + raise AttributeError(error_message) + + def get_movement_tolerance(self) -> float: + return 0.001 + + # noinspection DuplicatedCode + async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4: + # Convert position to micrometers. + target_position_um = vector_mm_to_um(position) + + # Request movement. + movement = self._get_device(manipulator_id).goto_pos( + vector4_to_array(target_position_um), scalar_mm_to_um(speed) + ) + + # Wait for movement to complete. + await get_running_loop().run_in_executor(None, movement.finished_event.wait, None) + + # Handle interrupted movement. + if movement.interrupted: + error_message = f"Manipulator {manipulator_id} interrupted: {movement.interrupt_reason}" + raise RuntimeError(error_message) + + return um_to_mm(array_to_vector4(movement.last_pos)) + + async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float: + # Augment current position with depth. + current_position = await self.get_position(manipulator_id) + new_platform_position = current_position.model_copy(update={"x": depth}) + + # Make the movement. + final_platform_position = await self.set_position(manipulator_id, new_platform_position, speed) + + # Return the final depth. + return float(final_platform_position.w) + + async def stop(self, manipulator_id: str) -> None: + self._get_device(manipulator_id).stop() + + def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: + # unified <- platform + # +x <- +y + # +y <- -x + # +z <- -z + # +d <- +d/x + + return Vector4( + x=platform_space.y, + y=self.get_dimensions().x - platform_space.x, + z=self.get_dimensions().z - platform_space.z, + w=platform_space.w, + ) + + def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: + # platform <- unified + # +x <- -y + # +y <- +x + # +z <- -z + # +d/x <- +d + + return Vector4( + x=self.get_dimensions().y - unified_space.y, + y=unified_space.x, + z=self.get_dimensions().z - unified_space.z, + w=unified_space.w, + ) + + # Helper methods. + def _get_device(self, manipulator_id: str) -> SensapexDevice: + return self._ump.get_device(int(manipulator_id)) diff --git a/src/ephys_link/bindings/ump_4_bindings.py b/src/ephys_link/bindings/ump_4_bindings.py index 62fd12c..c1e621d 100644 --- a/src/ephys_link/bindings/ump_4_bindings.py +++ b/src/ephys_link/bindings/ump_4_bindings.py @@ -65,6 +65,7 @@ async def get_shank_count(self, _: str) -> int: def get_movement_tolerance(self) -> float: return 0.001 + # noinspection DuplicatedCode async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4: # Convert position to micrometers. target_position_um = vector_mm_to_um(position) @@ -75,7 +76,7 @@ async def set_position(self, manipulator_id: str, position: Vector4, speed: floa ) # Wait for movement to finish. - await get_running_loop().run_in_executor(None, movement.finished_event.wait) + await get_running_loop().run_in_executor(None, movement.finished_event.wait, None) # Handle interrupted movement. if movement.interrupted: