diff --git a/.github/workflows/autoformat-and-lint.yml b/.github/workflows/autoformat-and-lint.yml index 2ca4f20..151943c 100644 --- a/.github/workflows/autoformat-and-lint.yml +++ b/.github/workflows/autoformat-and-lint.yml @@ -1,4 +1,4 @@ -name: Autoformat and Lint +name: Auto Format and Lint on: pull_request: @@ -9,7 +9,7 @@ on: jobs: autoformat-and-lint: - name: Autoformat and Lint + name: Auto Format and Lint if: github.actor != 'dependabot[bot]' runs-on: ubuntu-latest permissions: @@ -37,9 +37,12 @@ jobs: - name: โœ… Commit code format changes uses: stefanzweifel/git-auto-commit-action@v5 with: - commit_message: "Autoformat code" + commit_message: "Auto format code" skip_fetch: true skip_checkout: true - - name: ๐Ÿ” Lint - run: hatch fmt --check \ No newline at end of file + - name: ๐Ÿงถ Lint + run: hatch fmt --check + + - name: ๐Ÿ” Type Check + run: hatch -e types run check \ No newline at end of file diff --git a/.github/workflows/build.yml.off b/.github/workflows/build.yml.off deleted file mode 100644 index 8af3605..0000000 --- a/.github/workflows/build.yml.off +++ /dev/null @@ -1,36 +0,0 @@ -name: Build - -on: - push: - branches: - - main - merge_group: - -jobs: - build: - name: Build - runs-on: windows-latest - - steps: - - name: ๐Ÿ›Ž Checkout - uses: actions/checkout@v4 - with: - ref: ${{ github.head_ref }} - - - name: ๐Ÿ Setup Python - uses: actions/setup-python@v4 - with: - python-version: '3.12' - cache: 'pip' - - - name: ๐Ÿ“ฆ Install Hatch - run: pip install hatch - - - name: ๐Ÿ—ƒ๏ธ Cache Build Artifacts - uses: actions/cache@v4 - with: - path: C:\Users\runneradmin\AppData\Local\hatch\env\virtual\ephys-link - key: ${{ runner.os }}-build-${{ hashFiles('pyproject.toml') }} - - - name: ๐Ÿ”จ Build - run: hatch -e exe run build diff --git a/ephys_link.spec b/ephys_link.spec index ad3bc08..0eaef3f 100644 --- a/ephys_link.spec +++ b/ephys_link.spec @@ -10,6 +10,7 @@ options = parser.parse_args() FILE_NAME = f"EphysLink-v{version}" +# noinspection PyUnresolvedReferences a = Analysis( ['src\\ephys_link\\__main__.py'], pathex=[], @@ -23,9 +24,11 @@ a = Analysis( noarchive=False, optimize=1, ) +# noinspection PyUnresolvedReferences pyz = PYZ(a.pure) if options.dir: + # noinspection PyUnresolvedReferences exe = EXE( pyz, a.scripts, @@ -45,8 +48,10 @@ if options.dir: entitlements_file=None, icon='assets\\icon.ico', ) + # noinspection PyUnresolvedReferences coll = COLLECT(exe, a.binaries, a.datas, strip=False, upx=True, upx_exclude=[], name=FILE_NAME) else: + # noinspection PyUnresolvedReferences exe = EXE( pyz, a.scripts, diff --git a/pyproject.toml b/pyproject.toml index c311fe1..38894e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,8 +35,9 @@ dependencies = [ "pyserial==3.5", "python-socketio[asyncio_client]==5.11.3", "pythonnet==3.0.3", + "requests==2.32.3", "sensapex==1.400.0", - "vbl-aquarium==0.0.17" + "vbl-aquarium==0.0.18" ] [project.urls] @@ -59,7 +60,6 @@ exclude = ["/.github", "/.idea"] python = "3.12" dependencies = [ "coverage[toml]>=6.5", - "mypy>=1.0.0", "pytest", ] [tool.hatch.envs.default.scripts] @@ -73,7 +73,6 @@ cov = [ "test-cov", "cov-report", ] -types = "mypy --strict --install-types --non-interactive {args:src/ephys_link tests}" #[[tool.hatch.envs.all.matrix]] #python = ["3.8", "3.9", "3.10", "3.11", "3.12"] @@ -88,6 +87,15 @@ build = "pyinstaller.exe ephys_link.spec -y -- -d && pyinstaller.exe ephys_link. build_onefile = "pyinstaller.exe ephys_link.spec -y" build_clean = "pyinstaller.exe ephys_link.spec -y --clean" +[tool.hatch.envs.types] +python = "3.12" +skip-install = true +dependencies = [ + "mypy", +] +[tool.hatch.envs.types.scripts] +check = "mypy --strict --install-types --non-interactive --ignore-missing-imports {args:src/ephys_link tests}" + [tool.coverage.run] source_pkgs = ["ephys_link", "tests"] branch = true @@ -105,7 +113,4 @@ exclude_lines = [ "no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:", -] - -[tool.ruff.lint] -extend-ignore = ["T201", "PLW0603", "BLE001", "FBT001", "ARG002", "S310"] \ No newline at end of file +] \ No newline at end of file diff --git a/scripts/move_tester.py b/scripts/move_tester.py index 4c546b5..1fd720a 100644 --- a/scripts/move_tester.py +++ b/scripts/move_tester.py @@ -1,21 +1,15 @@ -import json +from asyncio import run -import socketio +from vbl_aquarium.models.ephys_link import GotoPositionRequest +from vbl_aquarium.models.unity import Vector4 -if __name__ == "__main__": - sio = socketio.Client() - sio.connect("http://localhost:8081") +from ephys_link.back_end.platform_handler import PlatformHandler +from ephys_link.util.console import Console - sio.emit("register_manipulator", "6") - sio.emit("bypass_calibration", "6") - sio.emit("set_can_write", json.dumps({"manipulator_id": "6", "can_write": True, "hours": 0})) +c = Console(enable_debug=True) +p = PlatformHandler("ump-4", c) +target = Vector4() +# target = Vector4(x=10, y=10, z=10, w=10) - end = "" - while end == "": - sio.emit("goto_pos", json.dumps({"manipulator_id": "6", "pos": [0, 10, 10, 10], "speed": 0.5})) - - input("Press enter to continue...") - - sio.emit("goto_pos", json.dumps({"manipulator_id": "6", "pos": [10, 10, 10, 10], "speed": 1})) - - end = input("Press enter to continue (or type any key then enter to end)...") +print(run(p.set_position(GotoPositionRequest(manipulator_id="6", position=target, speed=5))).to_string()) +print("Done!") diff --git a/scripts/proxy_dev.py b/scripts/proxy_dev.py deleted file mode 100644 index 3d11501..0000000 --- a/scripts/proxy_dev.py +++ /dev/null @@ -1,24 +0,0 @@ -from __future__ import annotations - -from asyncio import run - -from socketio import AsyncClient - -pinpoint_id = "4158ebf3" -is_requester = True - -sio = AsyncClient() - - -async def main(): - await sio.connect("http://localhost:3000") - # await sio.emit("get_manipulators", lambda m: print(m)) - await sio.wait() - - -@sio.event -async def get_pinpoint_id() -> tuple[str, bool]: - return pinpoint_id, is_requester - - -run(main()) diff --git a/scripts/server_tester.py b/scripts/server_tester.py new file mode 100644 index 0000000..2fca50e --- /dev/null +++ b/scripts/server_tester.py @@ -0,0 +1,23 @@ +from time import sleep + +from socketio import SimpleClient +from vbl_aquarium.models.ephys_link import DriveToDepthRequest, InsideBrainRequest +from vbl_aquarium.models.unity import Vector4 + +with SimpleClient() as sio: + sio.connect("http://localhost:3000") + + print(sio.call("set_inside_brain", InsideBrainRequest(manipulator_id="6", inside=True).to_string())) + + target = Vector4() + # target = Vector4(x=10, y=10, z=10, w=10) + + sio.emit( + "set_depth", + DriveToDepthRequest(manipulator_id="6", depth=10, speed=3).to_string(), + ) + sleep(1) + print(sio.call("stop")) + # while True: + # print(sio.call("get_position", "6")) + sio.disconnect() diff --git a/src/ephys_link/__about__.py b/src/ephys_link/__about__.py index 7b1e312..ef2ec23 100644 --- a/src/ephys_link/__about__.py +++ b/src/ephys_link/__about__.py @@ -1 +1 @@ -__version__ = "1.3.3" +__version__ = "2.0.0b0" diff --git a/src/ephys_link/__main__.py b/src/ephys_link/__main__.py index 65534fa..7184a5e 100644 --- a/src/ephys_link/__main__.py +++ b/src/ephys_link/__main__.py @@ -1,104 +1,42 @@ -from argparse import ArgumentParser -from asyncio import run -from sys import argv +"""Ephys Link entry point. + +Responsible for gathering launch options, instantiating the appropriate interface, and starting the application. -from ephys_link import common as com -from ephys_link.__about__ import __version__ as version -from ephys_link.emergency_stop import EmergencyStop -from ephys_link.gui import GUI -from ephys_link.server import Server +Usage: call main() to start. +""" + +from sys import argv -# Setup argument parser. -parser = ArgumentParser( - description="Electrophysiology Manipulator Link: a websocket interface for" - " manipulators in electrophysiology experiments.", - prog="python -m ephys-link", -) -parser.add_argument("-b", "--background", dest="background", action="store_true", help="Skip configuration window.") -parser.add_argument( - "-i", "--ignore-updates", dest="ignore_updates", action="store_true", help="Skip (ignore) checking for updates." -) -parser.add_argument( - "-t", - "--type", - type=str, - dest="type", - default="sensapex", - help='Manipulator type (i.e. "sensapex", "new_scale", or "new_scale_pathfinder"). Default: "sensapex".', -) -parser.add_argument("-d", "--debug", dest="debug", action="store_true", help="Enable debug mode.") -parser.add_argument("-x", "--use-proxy", dest="use_proxy", action="store_true", help="Enable proxy mode.") -parser.add_argument( - "-a", - "--proxy-address", - type=str, - default="proxy2.virtualbrainlab.org", - dest="proxy_address", - help="Proxy IP address.", -) -parser.add_argument( - "-p", - "--port", - type=int, - default=8081, - dest="port", - help="TCP/IP port to use. Default: 8081 (avoids conflict with other HTTP servers).", -) -parser.add_argument( - "--pathfinder_port", - type=int, - default=8080, - dest="pathfinder_port", - help="Port New Scale Pathfinder's server is on. Default: 8080.", -) -parser.add_argument( - "-s", - "--serial", - type=str, - default="no-e-stop", - dest="serial", - nargs="?", - help="Emergency stop serial port (i.e. COM3). Default: disables emergency stop.", -) -parser.add_argument( - "-v", - "--version", - action="version", - version=f"Electrophysiology Manipulator Link v{version}", - help="Print version and exit.", -) +from ephys_link.back_end.platform_handler import PlatformHandler +from ephys_link.back_end.server import Server +from ephys_link.front_end.cli import CLI +from ephys_link.front_end.gui import GUI +from ephys_link.util.console import Console def main() -> None: - """Main function""" + """Ephys Link entry point. - # Parse arguments. - args = parser.parse_args() + 1. Get options via CLI or GUI. + 2. Instantiate the Console and make it globally accessible. + 3. Instantiate the Platform Handler with the appropriate platform bindings. + 4. Instantiate the Emergency Stop service. + 5. Start the server. + """ - # Launch GUI if there are no CLI arguments. - if len(argv) == 1: - gui = GUI() - gui.launch() - return None + # 1. Get options via CLI or GUI (if no CLI options are provided). + options = CLI().parse_args() if len(argv) > 1 else GUI().get_options() - # Otherwise, create Server from CLI. - server = Server() + # 2. Instantiate the Console and make it globally accessible. + console = Console(enable_debug=options.debug) - # Continue with CLI if not. - com.DEBUG = args.debug + # 3. Instantiate the Platform Handler with the appropriate platform bindings. + platform_handler = PlatformHandler(options.type, console) - # Setup serial port. - if args.serial != "no-e-stop": - e_stop = EmergencyStop(server, args.serial) - e_stop.watch() + # 4. Instantiate the Emergency Stop service. - # Launch with parsed arguments on main thread. - if args.use_proxy: - run( - server.launch_for_proxy(args.proxy_address, args.port, args.type, args.pathfinder_port, args.ignore_updates) - ) - else: - server.launch(args.type, args.port, args.pathfinder_port, args.ignore_updates) + # 5. Start the server. + Server(options, platform_handler, console).launch() if __name__ == "__main__": diff --git a/src/ephys_link/back_end/__init__.py b/src/ephys_link/back_end/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ephys_link/back_end/platform_handler.py b/src/ephys_link/back_end/platform_handler.py new file mode 100644 index 0000000..766c9a7 --- /dev/null +++ b/src/ephys_link/back_end/platform_handler.py @@ -0,0 +1,281 @@ +# ruff: noqa: BLE001 +"""Manipulator platform handler. + +Responsible for performing the various manipulator commands. +Instantiates the appropriate bindings based on the platform type and uses them to perform the commands. + +Usage: Instantiate PlatformHandler with the platform type and call the desired command. +""" + +from uuid import uuid4 + +from vbl_aquarium.models.ephys_link import ( + AngularResponse, + BooleanStateResponse, + DriveToDepthRequest, + DriveToDepthResponse, + GetManipulatorsResponse, + GotoPositionRequest, + InsideBrainRequest, + PositionalResponse, + ShankCountResponse, +) +from vbl_aquarium.models.proxy import PinpointIdResponse +from vbl_aquarium.models.unity import Vector4 + +from ephys_link.__about__ import __version__ +from ephys_link.bindings.fake_bindings import FakeBindings +from ephys_link.bindings.ump_4_bindings import Ump4Bindings +from ephys_link.util.base_bindings import BaseBindings +from ephys_link.util.common import vector4_to_array +from ephys_link.util.console import Console + + +class PlatformHandler: + """Handler for platform commands.""" + + def __init__(self, platform_type: str, console: Console) -> None: + """Initialize platform handler. + + :param platform_type: Platform type to initialize bindings from. + :type platform_type: str + """ + + # Store the platform type. + self._platform_type = platform_type + + # Store the console. + self._console = console + + # Define bindings based on platform type. + self._bindings = self._match_platform_type(platform_type) + + # Record which IDs are inside the brain. + self._inside_brain: set[str] = set() + + # Generate a Pinpoint ID for proxy usage. + self._pinpoint_id = str(uuid4())[:8] + + def _match_platform_type(self, platform_type: str) -> BaseBindings: + """Match the platform type to the appropriate bindings. + + :param platform_type: Platform type. + :type platform_type: str + :returns: Bindings for the specified platform type. + :rtype: :class:`ephys_link.util.base_bindings.BaseBindings` + """ + match platform_type: + case "ump-4": + return Ump4Bindings() + case "fake": + return FakeBindings() + case _: + error_message = f'Platform type "{platform_type}" not recognized.' + self._console.labeled_error_print("PLATFORM", error_message) + raise ValueError(error_message) + + # Ephys Link metadata. + + @staticmethod + def get_version() -> str: + """Get Ephys Link's version. + + :returns: Ephys Link's version. + :rtype: str + """ + return __version__ + + def get_pinpoint_id(self) -> PinpointIdResponse: + """Get the Pinpoint ID for proxy usage. + + :returns: Pinpoint ID response. + :rtype: :class:`vbl_aquarium.models.ephys_link.PinpointIDResponse` + """ + return PinpointIdResponse(pinpoint_id=self._pinpoint_id, is_requester=False) + + def get_platform_type(self) -> str: + """Get the manipulator platform type connected to Ephys Link. + + :returns: Platform type config identifier (see CLI options for examples). + :rtype: str + """ + return self._platform_type + + # Manipulator commands. + + async def get_manipulators(self) -> GetManipulatorsResponse: + """Get a list of available manipulators on the current handler. + + :returns: List of manipulator IDs, number of axes, dimensions of manipulators (mm), and an error message if any. + :rtype: :class:`vbl_aquarium.models.ephys_link.GetManipulatorsResponse` + """ + try: + manipulators = await self._bindings.get_manipulators() + num_axes = await self._bindings.get_num_axes() + dimensions = self._bindings.get_dimensions() + except Exception as e: + self._console.exception_error_print("Get Manipulators", e) + return GetManipulatorsResponse(error=self._console.pretty_exception(e)) + else: + return GetManipulatorsResponse( + manipulators=manipulators, + num_axes=num_axes, + dimensions=dimensions, + ) + + async def get_position(self, manipulator_id: str) -> PositionalResponse: + """Get the current translation position of a manipulator in unified coordinates (mm). + + :param manipulator_id: Manipulator ID. + :type manipulator_id: str + :returns: Current position of the manipulator and an error message if any. + :rtype: :class:`vbl_aquarium.models.ephys_link.PositionalResponse` + """ + try: + unified_position = self._bindings.platform_space_to_unified_space( + await self._bindings.get_position(manipulator_id) + ) + except Exception as e: + self._console.exception_error_print("Get Position", e) + return PositionalResponse(error=str(e)) + else: + return PositionalResponse(position=unified_position) + + async def get_angles(self, manipulator_id: str) -> AngularResponse: + """Get the current rotation angles of a manipulator in Yaw, Pitch, Roll (degrees). + + :param manipulator_id: Manipulator ID. + :type manipulator_id: str + :returns: Current angles of the manipulator and an error message if any. + :rtype: :class:`vbl_aquarium.models.ephys_link.AngularResponse` + """ + try: + angles = await self._bindings.get_angles(manipulator_id) + except Exception as e: + self._console.exception_error_print("Get Angles", e) + return AngularResponse(error=self._console.pretty_exception(e)) + else: + return AngularResponse(angles=angles) + + async def get_shank_count(self, manipulator_id: str) -> ShankCountResponse: + """Get the number of shanks on a manipulator. + + :param manipulator_id: Manipulator ID. + :type manipulator_id: str + :returns: Number of shanks on the manipulator and an error message if any. + :rtype: :class:`vbl_aquarium.models.ephys_link.ShankCountResponse` + """ + try: + shank_count = await self._bindings.get_shank_count(manipulator_id) + except Exception as e: + self._console.exception_error_print("Get Shank Count", e) + return ShankCountResponse(error=self._console.pretty_exception(e)) + else: + return ShankCountResponse(shank_count=shank_count) + + async def set_position(self, request: GotoPositionRequest) -> PositionalResponse: + """Move a manipulator to a specified translation position in unified coordinates (mm). + + :param request: Request to move a manipulator to a specified position. + :type request: :class:`vbl_aquarium.models.ephys_link.GotoPositionRequest` + :returns: Final position of the manipulator and an error message if any. + :rtype: :class:`vbl_aquarium.models.ephys_link.Position` + """ + try: + # Disallow setting manipulator position while inside the brain. + if request.manipulator_id in self._inside_brain: + error_message = 'Can not move manipulator while inside the brain. Set the depth ("set_depth") instead.' + self._console.error_print(error_message) + return PositionalResponse(error=error_message) + + # Move to the new position. + final_platform_position = await self._bindings.set_position( + manipulator_id=request.manipulator_id, + position=self._bindings.unified_space_to_platform_space(request.position), + speed=request.speed, + ) + final_unified_position = self._bindings.platform_space_to_unified_space(final_platform_position) + + # Return error if movement did not reach target within tolerance. + for index, axis in enumerate(vector4_to_array(final_unified_position - request.position)): + # End once index is greater than the number of axes. + if index >= await self._bindings.get_num_axes(): + break + + # Check if the axis is within the movement tolerance. + if abs(axis) > await self._bindings.get_movement_tolerance(): + error_message = ( + f"Manipulator {request.manipulator_id} did not reach target" + f" position on axis {list(Vector4.model_fields.keys())[index]}." + f"Requested: {request.position}, got: {final_unified_position}." + ) + self._console.error_print(error_message) + return PositionalResponse(error=error_message) + except Exception as e: + self._console.exception_error_print("Set Position", e) + return PositionalResponse(error=self._console.pretty_exception(e)) + else: + return PositionalResponse(position=final_unified_position) + + async def set_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: + """Move a manipulator's depth translation stage to a specific value (mm). + + :param request: Request to move a manipulator to a specified depth. + :type request: :class:`vbl_aquarium.models.ephys_link.DriveToDepthRequest` + :returns: Final depth of the manipulator and an error message if any. + :rtype: :class:`vbl_aquarium.models.ephys_link.DriveToDepthResponse` + """ + try: + # Create a position based on the new depth. + current_platform_position = await self._bindings.get_position(request.manipulator_id) + current_unified_position = self._bindings.platform_space_to_unified_space(current_platform_position) + target_unified_position = current_unified_position.model_copy(update={"w": request.depth}) + target_platform_position = self._bindings.unified_space_to_platform_space(target_unified_position) + + # Move to the new depth. + final_platform_position = await self._bindings.set_position( + manipulator_id=request.manipulator_id, + position=target_platform_position, + speed=request.speed, + ) + final_unified_position = self._bindings.platform_space_to_unified_space(final_platform_position) + except Exception as e: + self._console.exception_error_print("Set Depth", e) + return DriveToDepthResponse(error=self._console.pretty_exception(e)) + else: + return DriveToDepthResponse(depth=final_unified_position.w) + + async def set_inside_brain(self, request: InsideBrainRequest) -> BooleanStateResponse: + """Mark a manipulator as inside the brain or not. + + This should restrict the manipulator's movement to just the depth axis. + + :param request: Request to set a manipulator's inside brain state. + :type request: :class:`vbl_aquarium.models.ephys_link.InsideBrainRequest` + :returns: Inside brain state of the manipulator and an error message if any. + :rtype: :class:`vbl_aquarium.models.ephys_link.BooleanStateResponse` + """ + try: + if request.inside: + self._inside_brain.add(request.manipulator_id) + else: + self._inside_brain.discard(request.manipulator_id) + except Exception as e: + self._console.exception_error_print("Set Inside Brain", e) + return BooleanStateResponse(error=self._console.pretty_exception(e)) + else: + return BooleanStateResponse(state=request.inside) + + async def stop(self) -> str: + """Stop all manipulators. + + :returns: Error message if any. + :rtype: str + """ + try: + await self._bindings.stop() + except Exception as e: + self._console.exception_error_print("Stop", e) + return self._console.pretty_exception(e) + else: + return "" diff --git a/src/ephys_link/back_end/server.py b/src/ephys_link/back_end/server.py new file mode 100644 index 0000000..ecb254d --- /dev/null +++ b/src/ephys_link/back_end/server.py @@ -0,0 +1,197 @@ +from asyncio import get_event_loop, run +from collections.abc import Callable, Coroutine +from json import JSONDecodeError, dumps, loads +from typing import Any + +from aiohttp.web import Application, run_app +from pydantic import ValidationError +from socketio import AsyncClient, AsyncServer +from vbl_aquarium.models.ephys_link import ( + DriveToDepthRequest, + EphysLinkOptions, + GotoPositionRequest, + InsideBrainRequest, +) +from vbl_aquarium.models.generic import VBLBaseModel + +from ephys_link.back_end.platform_handler import PlatformHandler +from ephys_link.util.common import PORT, check_for_updates, server_preamble +from ephys_link.util.console import Console + + +class Server: + def __init__(self, options: EphysLinkOptions, platform_handler: PlatformHandler, console: Console) -> None: + """Initialize server fields based on options and platform handler.""" + + # Save fields. + self._options = options + self._platform_handler = platform_handler + self._console = console + + # Initialize based on proxy usage. + self._sio: AsyncServer | AsyncClient = AsyncClient() if self._options.use_proxy else AsyncServer() + if not self._options.use_proxy: + self._app = Application() + self._sio.attach(self._app) + + # Bind connection events. + self._sio.on("connect", self.connect) + self._sio.on("disconnect", self.disconnect) + + # Store connected client. + self._client_sid: str = "" + + # Bind events. + self._sio.on("*", self.platform_event_handler) + + # Server launch. + def launch(self) -> None: + # Preamble. + server_preamble() + + # Check for updates. + check_for_updates() + + # List platform and available manipulators. + self._console.info_print("PLATFORM", self._platform_handler.get_platform_type()) + self._console.info_print( + "MANIPULATORS", + str(get_event_loop().run_until_complete(self._platform_handler.get_manipulators()).manipulators), + ) + + # Launch server + if self._options.use_proxy: + self._console.info_print("PINPOINT ID", self._platform_handler.get_pinpoint_id().pinpoint_id) + + async def connect_proxy() -> None: + # noinspection HttpUrlsUsage + await self._sio.connect(f"http://{self._options.proxy_address}:{PORT}") + await self._sio.wait() + + run(connect_proxy()) + else: + run_app(self._app, port=PORT) + + # Helper functions. + def _malformed_request_response(self, request: str, data: tuple[tuple[Any], ...]) -> str: + """Return a response for a malformed request.""" + self._console.labeled_error_print("MALFORMED REQUEST", f"{request}: {data}") + return dumps({"error": "Malformed request."}) + + async def _run_if_data_available( + self, function: Callable[[str], Coroutine[Any, Any, VBLBaseModel]], event: str, data: tuple[tuple[Any], ...] + ) -> str: + """Run a function if data is available.""" + request_data = data[1] + if request_data: + return str((await function(str(request_data))).to_string()) + return self._malformed_request_response(event, request_data) + + async def _run_if_data_parses( + self, + function: Callable[[VBLBaseModel], Coroutine[Any, Any, VBLBaseModel]], + data_type: type[VBLBaseModel], + event: str, + data: tuple[tuple[Any], ...], + ) -> str: + """Run a function if data parses.""" + request_data = data[1] + if request_data: + try: + parsed_data = data_type(**loads(str(request_data))) + except JSONDecodeError: + return self._malformed_request_response(event, request_data) + except ValidationError as e: + self._console.exception_error_print(event, e) + return self._malformed_request_response(event, request_data) + else: + return str((await function(parsed_data)).to_string()) + return self._malformed_request_response(event, request_data) + + # Event Handlers. + + async def connect(self, sid: str, _: str) -> bool: + """Handle connections to the server + + :param sid: Socket session ID. + :type sid: str + :param _: Extra connection data (unused). + :type _: str + :returns: False on error to refuse connection, True otherwise. + :rtype: bool + """ + self._console.info_print("CONNECTION REQUEST", sid) + + if self._client_sid == "": + self._client_sid = sid + self._console.info_print("CONNECTION GRANTED", sid) + return True + + self._console.error_print(f"CONNECTION REFUSED to {sid}. Client {self._client_sid} already connected.") + return False + + async def disconnect(self, sid: str) -> None: + """Handle disconnections from the server + + :param sid: Socket session ID. + :type sid: str + """ + self._console.info_print("DISCONNECTED", sid) + + # Reset client SID if it matches. + if self._client_sid == sid: + self._client_sid = "" + else: + self._console.error_print(f"Client {sid} disconnected without being connected.") + + # noinspection PyTypeChecker + async def platform_event_handler(self, event: str, *args: tuple[Any]) -> str: + """Handle events from the server + + :param event: Event name. + :type event: str + :param args: Event arguments. + :type args: tuple[Any] + :returns: Response data. + :rtype: str + """ + + # Log event. + self._console.debug_print("EVENT", event) + + # Handle event. + match event: + # Server metadata. + case "get_version": + return self._platform_handler.get_version() + case "get_pinpoint_id": + return str(self._platform_handler.get_pinpoint_id().to_string()) + case "get_platform_type": + return self._platform_handler.get_platform_type() + + # Manipulator commands. + case "get_manipulators": + return str((await self._platform_handler.get_manipulators()).to_string()) + case "get_position": + return await self._run_if_data_available(self._platform_handler.get_position, event, args) + case "get_angles": + return await self._run_if_data_available(self._platform_handler.get_angles, event, args) + case "get_shank_count": + return await self._run_if_data_available(self._platform_handler.get_shank_count, event, args) + case "set_position": + return await self._run_if_data_parses( + self._platform_handler.set_position, GotoPositionRequest, event, args + ) + case "set_depth": + return await self._run_if_data_parses( + self._platform_handler.set_depth, DriveToDepthRequest, event, args + ) + case "set_inside_brain": + return await self._run_if_data_parses( + self._platform_handler.set_inside_brain, InsideBrainRequest, event, args + ) + case "stop": + return await self._platform_handler.stop() + case _: + self._console.error_print(f"Unknown event: {event}.") + return dumps({"error": "Unknown event."}) diff --git a/src/ephys_link/bindings/__init__.py b/src/ephys_link/bindings/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ephys_link/bindings/fake_bindings.py b/src/ephys_link/bindings/fake_bindings.py new file mode 100644 index 0000000..38f41fe --- /dev/null +++ b/src/ephys_link/bindings/fake_bindings.py @@ -0,0 +1,54 @@ +from vbl_aquarium.models.unity import Vector3, Vector4 + +from ephys_link.util.base_bindings import BaseBindings + + +class FakeBindings(BaseBindings): + def __init__(self) -> None: + """Initialize fake manipulator infos.""" + + self._positions = [Vector4() for _ in range(8)] + self._angles = [ + Vector3(x=90, y=60, z=0), + Vector3(x=-90, y=60, z=0), + Vector3(x=180, y=60, z=0), + Vector3(x=0, y=60, z=0), + Vector3(x=45, y=30, z=0), + Vector3(x=-45, y=30, z=0), + Vector3(x=135, y=30, z=0), + Vector3(x=-135, y=30, z=0), + ] + + async def get_manipulators(self) -> list[str]: + return list(map(str, range(8))) + + async def get_num_axes(self) -> int: + return 4 + + def get_dimensions(self) -> Vector4: + return Vector4(x=20, y=20, z=20, w=20) + + async def get_position(self, manipulator_id: str) -> Vector4: + return self._positions[int(manipulator_id)] + + async def get_angles(self, manipulator_id: str) -> Vector3: + return self._angles[int(manipulator_id)] + + async def get_shank_count(self, _: str) -> int: + return 1 + + async def get_movement_tolerance(self) -> float: + return 0.001 + + async def set_position(self, manipulator_id: str, position: Vector4, _: float) -> Vector4: + self._positions[int(manipulator_id)] = position + return position + + async def stop(self) -> None: + pass + + def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: + pass + + def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: + pass diff --git a/src/ephys_link/bindings/ump_4_bindings.py b/src/ephys_link/bindings/ump_4_bindings.py new file mode 100644 index 0000000..5c537fd --- /dev/null +++ b/src/ephys_link/bindings/ump_4_bindings.py @@ -0,0 +1,128 @@ +"""Bindings for Sensapex uMp-4 platform. + +Usage: Instantiate Ump4Bindings to interact with the Sensapex uMp-4 platform. +""" + +from asyncio import get_running_loop + +from sensapex import UMP, SensapexDevice +from vbl_aquarium.models.unity import Vector3, Vector4 + +from ephys_link.util.base_bindings import BaseBindings +from ephys_link.util.common import RESOURCES_PATH, array_to_vector4, mm_to_um, mmps_to_umps, um_to_mm, vector4_to_array +from ephys_link.util.console import Console + + +class Ump4Bindings(BaseBindings): + """Bindings for UMP-4 platform""" + + def __init__(self) -> None: + """Initialize UMP-4 bindings.""" + + # Establish connection to Sensapex API (exit if connection fails). + UMP.set_library_path(RESOURCES_PATH) + self._ump = UMP.get_ump() + if self._ump is None: + error_message = "Unable to connect to uMp" + Console.error_print(error_message) + raise ValueError(error_message) + + async def get_manipulators(self) -> list[str]: + return list(map(str, self._ump.list_devices())) + + async def get_num_axes(self) -> int: + return 4 + + def get_dimensions(self) -> Vector4: + return Vector4(x=20, y=20, z=20, w=20) + + async def get_position(self, manipulator_id: str) -> Vector4: + return um_to_mm(array_to_vector4(self._get_device(manipulator_id).get_pos(1))) + + # noinspection PyTypeChecker + async def get_angles(self, _: str) -> Vector3: + """uMp-4 does not support getting angles so raise an error. + + :raises: AttributeError + """ + error_message = "UMP-4 does not support getting angles" + raise AttributeError(error_message) + + # noinspection PyTypeChecker + async def get_shank_count(self, _: str) -> int: + """uMp-4 does not support getting shank count so raise an error. + + :raises: AttributeError + """ + error_message = "UMP-4 does not support getting shank count" + raise AttributeError(error_message) + + async def get_movement_tolerance(self) -> float: + return 0.001 + + async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4: + """Set the position of the manipulator. + + Waits using Asyncio until the movement is finished. This assumes the application is running in an event loop. + + :param manipulator_id: Manipulator ID. + :type manipulator_id: str + :param position: Platform space position to set the manipulator to (mm). + :type position: Vector4 + :param speed: Speed to move the manipulator to the position (mm/s). + :type speed: float + :returns: Final position of the manipulator in platform space (mm). + :rtype: Vector4 + :raises RuntimeError: If the movement is interrupted. + """ + # Convert position to micrometers. + target_position_um = mm_to_um(position) + + # Request movement. + movement = self._get_device(manipulator_id).goto_pos(vector4_to_array(target_position_um), mmps_to_umps(speed)) + + # Wait for movement to finish. + await get_running_loop().run_in_executor(None, movement.finished_event.wait) + + # Handle interrupted movement. + if movement.interrupted: + error_message = f"Manipulator {manipulator_id} interrupted: {movement.interrupt_reason}" + raise RuntimeError(error_message) + + return um_to_mm(array_to_vector4(movement.last_pos)) + + async def stop(self) -> None: + for device_ids in await self.get_manipulators(): + self._get_device(device_ids).stop() + + def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: + # unified <- platform + # +x <- +y + # +y <- -z + # +z <- +x + # +d <- +d + + return Vector4( + x=platform_space.y, + y=self.get_dimensions().z - platform_space.z, + z=platform_space.x, + w=platform_space.w, + ) + + def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: + # platform <- unified + # +x <- +z + # +y <- +x + # +z <- -y + # +d <- +d + + return Vector4( + x=unified_space.z, + y=unified_space.x, + z=self.get_dimensions().z - unified_space.y, + w=unified_space.w, + ) + + # Helper methods. + def _get_device(self, manipulator_id: str) -> SensapexDevice: + return self._ump.get_device(int(manipulator_id)) diff --git a/src/ephys_link/common.py b/src/ephys_link/common.py deleted file mode 100644 index 23b406c..0000000 --- a/src/ephys_link/common.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Commonly used functions and dictionaries - -Contains globally used helper functions and typed dictionaries (to be used as -callback parameters) -""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from vbl_aquarium.models.unity import Vector4 - -# Debugging flag -DEBUG = False - -# Ephys Link ASCII -ASCII = r""" - ______ _ _ _ _ - | ____| | | | | (_) | | - | |__ _ __ | |__ _ _ ___ | | _ _ __ | | __ - | __| | '_ \| '_ \| | | / __| | | | | '_ \| |/ / - | |____| |_) | | | | |_| \__ \ | |____| | | | | < - |______| .__/|_| |_|\__, |___/ |______|_|_| |_|_|\_\ - | | __/ | - |_| |___/ -""" - - -def dprint(message: str) -> None: - """Print message if debug is enabled. - - :param message: Message to print. - :type message: str - :return: None - """ - if DEBUG: - print(message) - - -def vector4_to_array(vector4: Vector4) -> list[float]: - """Convert a Vector4 to a list of floats. - - :param vector4: Vector4 to convert. - :type vector4: :class:`vbl_aquarium.models.unity.Vector4` - :return: List of floats. - :rtype: list[float] - """ - return [vector4.x, vector4.y, vector4.z, vector4.w] diff --git a/src/ephys_link/emergency_stop.py b/src/ephys_link/emergency_stop.py deleted file mode 100644 index 985e6d9..0000000 --- a/src/ephys_link/emergency_stop.py +++ /dev/null @@ -1,67 +0,0 @@ -from signal import SIGINT, SIGTERM, signal -from threading import Event, Thread -from time import sleep - -from serial import Serial -from serial.tools import list_ports - -from ephys_link.common import dprint -from ephys_link.server import Server - - -class EmergencyStop: - """Serial system for emergency stop""" - - def __init__(self, server: Server, serial_port: str) -> None: - """Setup serial port for emergency stop - - :param server: The Ephys Link server instance - :type server: Server - :param serial_port: The serial port to poll - :type serial_port: str - :return: None - """ - self._server = server - self._serial_port = serial_port - self._poll_rate = 0.05 - self._kill_serial_event = Event() - self._poll_serial_thread = Thread(target=self._poll_serial, daemon=True) - - # Register signals - signal(SIGTERM, self._close_serial) - signal(SIGINT, self._close_serial) - - def watch(self) -> None: - """Start polling serial port for emergency stop""" - self._poll_serial_thread.start() - - def _poll_serial(self) -> None: - """Continuously poll serial port for data - - Close port on kill event - """ - target_port = self._serial_port - if self._serial_port is None: - # Search for serial ports - for port, desc, _ in list_ports.comports(): - if "Arduino" in desc or "USB Serial Device" in desc: - target_port = port - break - - serial = Serial(target_port, 9600, timeout=self._poll_rate) - while not self._kill_serial_event.is_set(): - if serial.in_waiting > 0: - serial.readline() - # Cause a break - dprint("[EMERGENCY STOP]\t\t Stopping all manipulators") - self._server.platform.stop() - serial.reset_input_buffer() - sleep(self._poll_rate) - print("Close poll") - serial.close() - - def _close_serial(self, _, __) -> None: - """Close the serial connection""" - print("[INFO]\t\t Closing serial") - self._kill_serial_event.set() - self._poll_serial_thread.join() diff --git a/src/ephys_link/front_end/__init__.py b/src/ephys_link/front_end/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ephys_link/front_end/cli.py b/src/ephys_link/front_end/cli.py new file mode 100644 index 0000000..a14718e --- /dev/null +++ b/src/ephys_link/front_end/cli.py @@ -0,0 +1,98 @@ +"""Command-line interface for the Electrophysiology Manipulator Link. + +Usage: instantiate CLI and call parse_args() to get the parsed arguments. +""" + +from argparse import ArgumentParser + +from vbl_aquarium.models.ephys_link import EphysLinkOptions + +from ephys_link.__about__ import __version__ as version + + +class CLI: + """Command-line interface for the Electrophysiology Manipulator Link. + + Configures the CLI parser and options. + """ + + def __init__(self) -> None: + """Initialize CLI parser.""" + + self._parser = ArgumentParser( + description="Electrophysiology Manipulator Link:" + " a Socket.IO interface for manipulators in electrophysiology experiments.", + prog="python -m ephys-link", + ) + + self._parser.add_argument( + "-b", "--background", dest="background", action="store_true", help="Skip configuration window." + ) + self._parser.add_argument( + "-i", + "--ignore-updates", + dest="ignore_updates", + action="store_true", + help="Skip (ignore) checking for updates.", + ) + self._parser.add_argument( + "-t", + "--type", + type=str, + dest="type", + default="ump-4", + help='Manipulator type (i.e. "ump-4", "ump-3", "pathfinder-mpm", "new-scale", "fake"). Default: "ump-4".', + ) + self._parser.add_argument( + "-d", + "--debug", + dest="debug", + action="store_true", + help="Enable debug mode.", + ) + self._parser.add_argument( + "-p", + "--use-proxy", + dest="use_proxy", + action="store_true", + help="Enable proxy mode.", + ) + self._parser.add_argument( + "-a", + "--proxy-address", + type=str, + default="proxy2.virtualbrainlab.org", + dest="proxy_address", + help="Proxy IP address.", + ) + self._parser.add_argument( + "--mpm-port", + type=int, + default=8080, + dest="mpm_port", + help="Port New Scale Pathfinder MPM's server is on. Default: 8080.", + ) + self._parser.add_argument( + "-s", + "--serial", + type=str, + default="no-e-stop", + dest="serial", + nargs="?", + help="Emergency stop serial port (i.e. COM3). Default: disables emergency stop.", + ) + self._parser.add_argument( + "-v", + "--version", + action="version", + version=f"Electrophysiology Manipulator Link v{version}", + help="Print version and exit.", + ) + + def parse_args(self) -> EphysLinkOptions: + """Parse arguments and return them + + :returns: Parsed arguments + :rtype: EphysLinkOptions + """ + return EphysLinkOptions(**vars(self._parser.parse_args())) diff --git a/src/ephys_link/gui.py b/src/ephys_link/front_end/gui.py similarity index 53% rename from src/ephys_link/gui.py rename to src/ephys_link/front_end/gui.py index e60c7b4..325c7c9 100644 --- a/src/ephys_link/gui.py +++ b/src/ephys_link/front_end/gui.py @@ -1,64 +1,89 @@ -from asyncio import run -from json import dumps, load +"""Graphical User Interface for Ephys Link. + +Usage: create a GUI instance and call get_options() to get the options. +""" + +from json import load from os import makedirs -from os.path import exists +from os.path import exists, join from socket import gethostbyname, gethostname +from sys import exit from tkinter import CENTER, RIGHT, BooleanVar, E, IntVar, StringVar, Tk, ttk from platformdirs import user_config_dir +from vbl_aquarium.models.ephys_link import EphysLinkOptions -import ephys_link.common as com from ephys_link.__about__ import __version__ as version -from ephys_link.emergency_stop import EmergencyStop -from ephys_link.server import Server -SETTINGS_DIR = f"{user_config_dir()}\\VBL\\Ephys Link" -SETTINGS_FILENAME = "settings.json" +# Define options path. +OPTIONS_DIR = join(user_config_dir(), "VBL", "Ephys Link") +OPTIONS_FILENAME = "options.json" +OPTIONS_PATH = join(OPTIONS_DIR, OPTIONS_FILENAME) class GUI: - """GUI definition for Ephys Link""" + """Graphical User Interface for Ephys Link. + + Gathers options from the user and saves them to a file. + """ def __init__(self) -> None: - """Setup and construction of the Tk GUI""" + """Setup GUI properties.""" self._root = Tk() - # Create default settings dictionary - settings = { - "ignore_updates": False, - "type": "sensapex", - "debug": False, - "proxy": False, - "proxy_address": "proxy2.virtualbrainlab.org", - "port": 8081, - "pathfinder_port": 8080, - "serial": "no-e-stop", - } - - # Read settings. - if exists(f"{SETTINGS_DIR}\\{SETTINGS_FILENAME}"): - with open(f"{SETTINGS_DIR}\\{SETTINGS_FILENAME}") as settings_file: - settings = load(settings_file) - - self._ignore_updates = BooleanVar(value=settings["ignore_updates"]) - self._type = StringVar(value=settings["type"]) - self._debug = BooleanVar(value=settings["debug"]) - self._proxy = BooleanVar(value=settings["proxy"]) - self._proxy_address = StringVar(value=settings["proxy_address"]) - self._port = IntVar(value=settings["port"]) - self._pathfinder_port = IntVar(value=settings["pathfinder_port"]) - self._serial = StringVar(value=settings["serial"]) - - def launch(self) -> None: - """Build and launch GUI""" - - # Build and run GUI. + # Create default options. + options = EphysLinkOptions() + + # Read options. + if exists(OPTIONS_PATH): + with open(OPTIONS_PATH) as options_file: + options = EphysLinkOptions(**load(options_file)) + + # Load options into GUI variables. + self._ignore_updates = BooleanVar(value=options.ignore_updates) + self._type = StringVar(value=options.type) + self._debug = BooleanVar(value=options.debug) + self._use_proxy = BooleanVar(value=options.use_proxy) + self._proxy_address = StringVar(value=options.proxy_address) + self._mpm_port = IntVar(value=options.mpm_port) + self._serial = StringVar(value=options.serial) + + # Submit flag. + self._submit = False + + def get_options(self) -> EphysLinkOptions: + """Get options from GUI.""" + + # Launch GUI. self._build_gui() self._root.mainloop() - def _build_gui(self): - """Build GUI""" + # Exit if the user did not submit options. + if not self._submit: + exit(1) + + # Extract options from GUI. + options = EphysLinkOptions( + ignore_updates=self._ignore_updates.get(), + type=self._type.get(), + debug=self._debug.get(), + use_proxy=self._use_proxy.get(), + proxy_address=self._proxy_address.get(), + mpm_port=self._mpm_port.get(), + serial=self._serial.get(), + ) + + # Save options. + makedirs(OPTIONS_DIR, exist_ok=True) + with open(OPTIONS_PATH, "w+") as options_file: + options_file.write(options.model_dump_json()) + + # Return options + return options + + def _build_gui(self) -> None: + """Build GUI.""" self._root.title(f"Ephys Link v{version}") @@ -84,7 +109,7 @@ def _build_gui(self): ) ttk.Checkbutton( server_serving_settings, - variable=self._proxy, + variable=self._use_proxy, ).grid(column=1, row=1, sticky="we") # Proxy address. @@ -95,12 +120,6 @@ def _build_gui(self): column=1, row=2, sticky="we" ) - # Port. - ttk.Label(server_serving_settings, text="Port:", anchor=E, justify=RIGHT).grid(column=0, row=3, sticky="we") - ttk.Entry(server_serving_settings, textvariable=self._port, width=5, justify=CENTER).grid( - column=1, row=3, sticky="we" - ) - # Ignore updates. ttk.Label(server_serving_settings, text="Ignore Updates:", anchor=E, justify=RIGHT).grid( column=0, row=4, sticky="we" @@ -110,6 +129,15 @@ def _build_gui(self): variable=self._ignore_updates, ).grid(column=1, row=4, sticky="we") + # Debug mode. + ttk.Label(server_serving_settings, text="Debug mode:", anchor=E, justify=RIGHT).grid( + column=0, row=5, sticky="we" + ) + ttk.Checkbutton( + server_serving_settings, + variable=self._debug, + ).grid(column=1, row=5, sticky="we") + # --- # Platform type. @@ -120,38 +148,44 @@ def _build_gui(self): platform_type_settings, text="Sensapex uMp-4", variable=self._type, - value="sensapex", + value="ump-4", ).grid(column=0, row=0, sticky="we") ttk.Radiobutton( platform_type_settings, text="Sensapex uMp-3", variable=self._type, - value="ump3", + value="ump-3", ).grid(column=0, row=1, sticky="we") ttk.Radiobutton( platform_type_settings, text="Pathfinder MPM Control v2.8.8+", variable=self._type, - value="new_scale_pathfinder", + value="pathfinder-mpm", ).grid(column=0, row=2, sticky="we") ttk.Radiobutton( platform_type_settings, text="New Scale M3-USB-3:1-EP", variable=self._type, - value="new_scale", + value="new-scale", ).grid(column=0, row=3, sticky="we") + ttk.Radiobutton( + platform_type_settings, + text="Fake Platform", + variable=self._type, + value="fake", + ).grid(column=0, row=4, sticky="we") # --- # New Scale Settings. - new_scale_settings = ttk.LabelFrame(mainframe, text="Pathfinder Settings", padding=3) + new_scale_settings = ttk.LabelFrame(mainframe, text="Pathfinder MPM Settings", padding=3) new_scale_settings.grid(column=0, row=2, sticky="news") # Port ttk.Label(new_scale_settings, text="HTTP Server Port:", anchor=E, justify=RIGHT).grid( column=0, row=1, sticky="we" ) - ttk.Entry(new_scale_settings, textvariable=self._pathfinder_port, width=5, justify=CENTER).grid( + ttk.Entry(new_scale_settings, textvariable=self._mpm_port, width=5, justify=CENTER).grid( column=1, row=1, sticky="we" ) @@ -173,45 +207,9 @@ def _build_gui(self): ).grid(column=0, row=4, columnspan=2, sticky="we") def _launch_server(self) -> None: - """Launch server based on GUI settings""" + """Close GUI and return to the server. - # Close GUI. + Options are saved in fields. + """ + self._submit = True self._root.destroy() - - # Save settings. - settings = { - "ignore_updates": self._ignore_updates.get(), - "type": self._type.get(), - "debug": self._debug.get(), - "proxy": self._proxy.get(), - "proxy_address": self._proxy_address.get(), - "port": self._port.get(), - "pathfinder_port": self._pathfinder_port.get(), - "serial": self._serial.get(), - } - makedirs(SETTINGS_DIR, exist_ok=True) - with open(f"{SETTINGS_DIR}\\{SETTINGS_FILENAME}", "w+") as f: - f.write(dumps(settings)) - - # Launch server. - server = Server() - - com.DEBUG = self._debug.get() - - if self._serial.get() != "no-e-stop": - e_stop = EmergencyStop(server, self._serial.get()) - e_stop.watch() - - # Launch with parsed arguments on main thread. - if self._proxy.get(): - run( - server.launch_for_proxy( - self._proxy_address.get(), - self._port.get(), - self._type.get(), - self._pathfinder_port.get(), - self._ignore_updates.get(), - ) - ) - else: - server.launch(self._type.get(), self._port.get(), self._pathfinder_port.get(), self._ignore_updates.get()) diff --git a/src/ephys_link/platform_handler.py b/src/ephys_link/platform_handler.py deleted file mode 100644 index c570157..0000000 --- a/src/ephys_link/platform_handler.py +++ /dev/null @@ -1,465 +0,0 @@ -"""Handle communications with platform specific API - -Handles relaying WebSocket messages to the appropriate platform API functions and -conducting error checks on the input and output values - -Function names here are the same as the WebSocket events. They are called when the -server receives an event from a client. In general, each function does the following: - -1. Receive extracted arguments from :mod:`ephys_link.server` -2. Call and check the appropriate platform API function (overloaded by each platform) -3. Log/handle successes and failures -4. Return the callback parameters to :mod:`ephys_link.server` -""" - -from __future__ import annotations - -from abc import ABC, abstractmethod -from typing import TYPE_CHECKING - -from vbl_aquarium.models.ephys_link import ( - AngularResponse, - BooleanStateResponse, - CanWriteRequest, - DriveToDepthRequest, - DriveToDepthResponse, - GetManipulatorsResponse, - GotoPositionRequest, - InsideBrainRequest, - PositionalResponse, - ShankCountResponse, -) -from vbl_aquarium.models.unity import Vector4 - -from ephys_link import common as com - -if TYPE_CHECKING: - from socketio import AsyncClient, AsyncServer - - -class PlatformHandler(ABC): - """An abstract class that defines the interface for a manipulator handler.""" - - def __init__(self): - """Initialize the manipulator handler with a dictionary of manipulators.""" - - # Registered manipulators are stored as a dictionary of IDs (string) to - # manipulator objects. - self.manipulators = {} - self.num_axes = 4 - - # Platform axes dimensions in mm - self.dimensions = Vector4(x=20, y=20, z=20, w=20) - - # Platform Handler Methods. - - def reset(self) -> bool: - """Reset handler. - - :return: True if successful, False otherwise. - :rtype: bool - """ - stop_result = self.stop() - self.manipulators.clear() - return stop_result - - def stop(self) -> bool: - """Stop handler. - - :return: True if successful, False otherwise. - :rtype: bool - """ - try: - for manipulator in self.manipulators.values(): - if hasattr(manipulator, "stop"): - manipulator.stop() - except Exception as e: - print(f"[ERROR]\t\t Could not stop manipulators: {e}\n") - return False - else: - return True - - def get_manipulators(self) -> GetManipulatorsResponse: - """Get all registered manipulators. - - :return: Result of connected manipulators, platform information, and error message (if any). - :rtype: :class:`vbl_aquarium.models.ephys_link.GetManipulatorsResponse` - """ - try: - manipulators = self._get_manipulators() - except Exception as e: - print(f"[ERROR]\t\t Getting manipulators: {type(e)}: {e}\n") - return GetManipulatorsResponse(error="Error getting manipulators") - else: - return GetManipulatorsResponse( - manipulators=manipulators, num_axes=self.num_axes, dimensions=self.dimensions - ) - - def register_manipulator(self, manipulator_id: str) -> str: - """Register a manipulator. - - :param manipulator_id: The ID of the manipulator to register. - :type manipulator_id: str - :return: Error message on error, empty string otherwise. - :rtype: str - """ - # Check if manipulator is already registered - if manipulator_id in self.manipulators: - print(f"[ERROR]\t\t Manipulator already registered:" f" {manipulator_id}\n") - return "Manipulator already registered" - - try: - # Register manipulator - self._register_manipulator(manipulator_id) - except ValueError as ve: - # Manipulator not found in UMP - print(f"[ERROR]\t\t Manipulator not found: {manipulator_id}: {ve}\n") - return "Manipulator not found" - except Exception as e: - # Other error - print(f"[ERROR]\t\t Registering manipulator: {manipulator_id}") - print(f"{type(e)}: {e}\n") - return "Error registering manipulator" - else: - com.dprint(f"[SUCCESS]\t Registered manipulator: {manipulator_id}\n") - return "" - - def unregister_manipulator(self, manipulator_id: str) -> str: - """Unregister a manipulator. - - :param manipulator_id: The ID of the manipulator to unregister. - :type manipulator_id: str - :return: Error message on error, empty string otherwise. - :rtype: str - """ - # Check if manipulator is not registered - if manipulator_id not in self.manipulators: - print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}\n") - return "Manipulator not registered" - - try: - # Unregister manipulator - self._unregister_manipulator(manipulator_id) - except Exception as e: - # Other error - print(f"[ERROR]\t\t Unregistering manipulator: {manipulator_id}") - print(f"{e}\n") - return "Error unregistering manipulator" - else: - com.dprint(f"[SUCCESS]\t Unregistered manipulator: {manipulator_id}\n") - return "" - - def get_pos(self, manipulator_id: str) -> PositionalResponse: - """Get the current position of a manipulator. - - :param manipulator_id: The ID of the manipulator to get the position of. - :type manipulator_id: str - :return: Positional information for the manipulator and error message (if any). - :rtype: :class:`vbl_aquarium.models.ephys_link.PositionalResponse` - """ - try: - # Check calibration status. - if ( - hasattr(self.manipulators[manipulator_id], "get_calibrated") - and not self.manipulators[manipulator_id].get_calibrated() - ): - print(f"[ERROR]\t\t Calibration not complete: {manipulator_id}\n") - return PositionalResponse(error="Manipulator not calibrated") - - # Get position and convert to unified space. - manipulator_pos = self._get_pos(manipulator_id) - - # Shortcut return for Pathfinder. - if self.num_axes == -1: - return manipulator_pos - - # Convert position to unified space. - return manipulator_pos.model_copy( - update={"position": self._platform_space_to_unified_space(manipulator_pos.position)} - ) - except KeyError: - # Manipulator not found in registered manipulators. - print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}") - return PositionalResponse(error="Manipulator not registered") - - def get_angles(self, manipulator_id: str) -> AngularResponse: - """Get the current position of a manipulator. - - :param manipulator_id: The ID of the manipulator to get the angles of. - :type manipulator_id: str - :return: Angular information for the manipulator and error message (if any). - :rtype: :class:`vbl_aquarium.models.ephys_link.AngularResponse` - """ - try: - # Check calibration status - if ( - hasattr(self.manipulators[manipulator_id], "get_calibrated") - and not self.manipulators[manipulator_id].get_calibrated() - ): - print(f"[ERROR]\t\t Calibration not complete: {manipulator_id}\n") - return AngularResponse(error="Manipulator not calibrated") - - # Get position - return self._get_angles(manipulator_id) - - except KeyError: - # Manipulator not found in registered manipulators - print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}") - return AngularResponse(error="Manipulator not registered") - - def get_shank_count(self, manipulator_id: str) -> ShankCountResponse: - """Get the number of shanks on the probe - - :param manipulator_id: The ID of the manipulator to get the number of shanks of. - :type manipulator_id: str - :return: Number of shanks on the probe. - :rtype: :class:`vbl_aquarium.models.ephys_link.ShankCountResponse` - """ - return self._get_shank_count(manipulator_id) - - async def goto_pos(self, request: GotoPositionRequest) -> PositionalResponse: - """Move manipulator to position - - :param request: The goto request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.GotoPositionRequest` - :return: Resulting position of the manipulator and error message (if any). - :rtype: :class:`vbl_aquarium.models.ephys_link.PositionalResponse` - """ - try: - # Check calibration status. - if not self.manipulators[request.manipulator_id].get_calibrated(): - print(f"[ERROR]\t\t Calibration not complete: {request.manipulator_id}\n") - return PositionalResponse(error="Manipulator not calibrated") - - # Check write state. - if not self.manipulators[request.manipulator_id].get_can_write(): - print(f"[ERROR]\t\t Cannot write to manipulator: {request.manipulator_id}") - return PositionalResponse(error="Cannot write to manipulator") - - # Convert position to platform space, move, and convert final position back to - # unified space. - end_position = await self._goto_pos( - request.model_copy(update={"position": self._unified_space_to_platform_space(request.position)}) - ) - return end_position.model_copy( - update={"position": self._platform_space_to_unified_space(end_position.position)} - ) - except KeyError: - # Manipulator not found in registered manipulators. - print(f"[ERROR]\t\t Manipulator not registered: {request.manipulator_id}\n") - return PositionalResponse(error="Manipulator not registered") - - async def drive_to_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: - """Drive manipulator to depth - - :param request: The drive to depth request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.DriveToDepthRequest` - :return: Resulting depth of the manipulator and error message (if any). - :rtype: :class:`ephys_link.common.DriveToDepthOutputData` - """ - try: - # Check calibration status - if not self.manipulators[request.manipulator_id].get_calibrated(): - print(f"[ERROR]\t\t Calibration not complete: {request.manipulator_id}\n") - return DriveToDepthResponse(error="Manipulator not calibrated") - - # Check write state - if not self.manipulators[request.manipulator_id].get_can_write(): - print(f"[ERROR]\t\t Cannot write to manipulator: {request.manipulator_id}") - return DriveToDepthResponse(error="Cannot write to manipulator") - - end_depth = await self._drive_to_depth( - request.model_copy(update={"depth": self._unified_space_to_platform_space(Vector4(w=request.depth)).w}) - ) - return end_depth.model_copy( - update={"depth": self._platform_space_to_unified_space(Vector4(w=end_depth.depth)).w} - ) - except KeyError: - # Manipulator not found in registered manipulators - print(f"[ERROR]\t\t Manipulator not registered: {request.manipulator_id}\n") - return DriveToDepthResponse(error="Manipulator not registered") - - def set_inside_brain(self, request: InsideBrainRequest) -> BooleanStateResponse: - """Set manipulator inside brain state (restricts motion) - - :param request: The inside brain request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.InsideBrainRequest` - :return: New inside brain state of the manipulator and error message (if any). - :rtype: :class:`vbl_aquarium.models.ephys_link.BooleanStateResponse` - """ - try: - # Check calibration status - if ( - hasattr(self.manipulators[request.manipulator_id], "get_calibrated") - and not self.manipulators[request.manipulator_id].get_calibrated() - ): - print("[ERROR]\t\t Calibration not complete\n") - return BooleanStateResponse(error="Manipulator not calibrated") - - return self._set_inside_brain(request) - - except KeyError: - # Manipulator not found in registered manipulators - print(f"[ERROR]\t\t Manipulator {request.manipulator_id} not registered\n") - return BooleanStateResponse(error="Manipulator not " "registered") - - except Exception as e: - # Other error - print(f"[ERROR]\t\t Set manipulator {request.manipulator_id} inside brain " f"state") - print(f"{e}\n") - return BooleanStateResponse(error="Error setting inside brain") - - async def calibrate(self, manipulator_id: str, sio: AsyncClient | AsyncServer) -> str: - """Calibrate manipulator - - :param manipulator_id: ID of manipulator to calibrate - :type manipulator_id: str - :param sio: SocketIO object (to call sleep) - :type sio: :class:`socketio.AsyncServer` - :return: Error message on error, empty string otherwise. - :rtype: str - """ - try: - # Check write state - if not self.manipulators[manipulator_id].get_can_write(): - print(f"[ERROR]\t\t Cannot write to manipulator: {manipulator_id}") - return "Cannot write to manipulator" - - return await self._calibrate(manipulator_id, sio) - - except KeyError: - # Manipulator not found in registered manipulators - print(f"[ERROR]\t\t Manipulator {manipulator_id} not registered\n") - return "Manipulator not registered" - - except Exception as e: - # Other error - print(f"[ERROR]\t\t Calibrate manipulator {manipulator_id}") - print(f"{e}\n") - return "Error calibrating manipulator" - - def bypass_calibration(self, manipulator_id: str) -> str: - """Bypass calibration of manipulator - - :param manipulator_id: ID of manipulator to bypass calibration - :type manipulator_id: str - :return: Error message on error, empty string otherwise. - :rtype: str - """ - try: - # Bypass calibration - return self._bypass_calibration(manipulator_id) - - except KeyError: - # Manipulator not found in registered manipulators - print(f"[ERROR]\t\t Manipulator {manipulator_id} not registered\n") - return "Manipulator not registered" - - except Exception as e: - # Other error - print(f"[ERROR]\t\t Bypass calibration of manipulator {manipulator_id}") - print(f"{e}\n") - return "Error bypassing calibration" - - def set_can_write( - self, - request: CanWriteRequest, - ) -> BooleanStateResponse: - """Set manipulator can_write state (enables/disabled moving manipulator) - - :param request: The can write request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.CanWriteRequest` - :return: New can_write state of the manipulator and error message (if any). - :rtype: :class:`ephys_link.common.StateOutputData` - """ - try: - return self._set_can_write(request) - except KeyError: - # Manipulator not found in registered manipulators - print(f"[ERROR]\t\t Manipulator not registered: {request.manipulator_id}\n") - return BooleanStateResponse(error="Manipulator not registered") - except Exception as e: - # Other error - print(f"[ERROR]\t\t Set manipulator {request.manipulator_id} can_write state") - print(f"{e}\n") - return BooleanStateResponse(error="Error setting can_write") - - # Platform specific methods to override - - @abstractmethod - def _get_manipulators(self) -> list: - raise NotImplementedError - - @abstractmethod - def _register_manipulator(self, manipulator_id: str) -> None: - raise NotImplementedError - - @abstractmethod - def _unregister_manipulator(self, manipulator_id: str) -> None: - raise NotImplementedError - - @abstractmethod - def _get_pos(self, manipulator_id: str) -> PositionalResponse: - raise NotImplementedError - - @abstractmethod - def _get_angles(self, manipulator_id: str) -> AngularResponse: - raise NotImplementedError - - @abstractmethod - def _get_shank_count(self, manipulator_id: str) -> ShankCountResponse: - raise NotImplementedError - - @abstractmethod - async def _goto_pos(self, request: GotoPositionRequest) -> PositionalResponse: - raise NotImplementedError - - @abstractmethod - async def _drive_to_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: - raise NotImplementedError - - @abstractmethod - def _set_inside_brain(self, request: InsideBrainRequest) -> BooleanStateResponse: - raise NotImplementedError - - @abstractmethod - async def _calibrate(self, manipulator_id: str, sio: AsyncClient | AsyncServer) -> str: - """Calibrate manipulator - - :param manipulator_id: ID of manipulator to calibrate - :type manipulator_id: str - :param sio: SocketIO object (to call sleep) - :type sio: :class:`socketio.AsyncServer` - :return: Callback parameters (manipulator ID, error message) - :rtype: str - """ - - @abstractmethod - def _bypass_calibration(self, manipulator_id: str) -> str: - raise NotImplementedError - - @abstractmethod - def _set_can_write(self, request: CanWriteRequest) -> BooleanStateResponse: - raise NotImplementedError - - @abstractmethod - def _platform_space_to_unified_space(self, platform_position: Vector4) -> Vector4: - """Convert position in platform space to position in unified manipulator space - - :param platform_position: Position in platform space (x, y, z, w) in mm - :type platform_position: Vector4 - :return: Position in unified manipulator space (x, y, z, w) in mm - :rtype: Vector4 - """ - raise NotImplementedError - - @abstractmethod - def _unified_space_to_platform_space(self, unified_position: Vector4) -> Vector4: - """Convert position in unified manipulator space to position in platform space - - :param unified_position: Position in unified manipulator space (x, y, z, w) in mm - :type unified_position: Vector4 - :return: Position in platform space (x, y, z, w) in mm - :rtype: Vector4 - """ - raise NotImplementedError diff --git a/src/ephys_link/platform_manipulator.py b/src/ephys_link/platform_manipulator.py deleted file mode 100644 index 58f160b..0000000 --- a/src/ephys_link/platform_manipulator.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Defines what the properties and required functionality of a manipulator are. - -Most functionality will be implemented on the platform handler side. This is mostly -for enforcing implementation of the stop method and hold common properties. -""" - -from abc import ABC, abstractmethod - -# Constants -MM_TO_UM = 1000 -HOURS_TO_SECONDS = 3600 -POSITION_POLL_DELAY = 0.1 - - -class PlatformManipulator(ABC): - """An abstract class that defines the interface for a manipulator.""" - - def __init__(self): - """Initialize manipulator.""" - - self._id = None - self._movement_tolerance = 0.001 - self._calibrated = False - self._inside_brain = False - self._can_write = False - self._reset_timer = None - self._is_moving = False - - @abstractmethod - def stop(self) -> None: - """Stop all axes on manipulator - - :returns None - """ - raise NotImplementedError diff --git a/src/ephys_link/platforms/__init__.py b/src/ephys_link/platforms/__init__.py deleted file mode 100644 index e18438c..0000000 --- a/src/ephys_link/platforms/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -import clr - -# Load New Scale API -# noinspection PyUnresolvedReferences -clr.AddReference("ephys_link/resources/NstMotorCtrl") diff --git a/src/ephys_link/platforms/new_scale_handler.py b/src/ephys_link/platforms/new_scale_handler.py deleted file mode 100644 index 5172286..0000000 --- a/src/ephys_link/platforms/new_scale_handler.py +++ /dev/null @@ -1,141 +0,0 @@ -"""Handle communications with New Scale API - -Implements New Scale specific API calls. - -This is a subclass of :class:`ephys_link.platform_handler.PlatformHandler`. -""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -# noinspection PyUnresolvedReferences -from NstMotorCtrl import NstCtrlHostIntf -from vbl_aquarium.models.ephys_link import ( - AngularResponse, - BooleanStateResponse, - CanWriteRequest, - DriveToDepthRequest, - DriveToDepthResponse, - GotoPositionRequest, - InsideBrainRequest, - PositionalResponse, - ShankCountResponse, -) -from vbl_aquarium.models.unity import Vector4 - -from ephys_link import common as com -from ephys_link.platform_handler import PlatformHandler -from ephys_link.platforms.new_scale_manipulator import NewScaleManipulator - -if TYPE_CHECKING: - import socketio - - -class NewScaleHandler(PlatformHandler): - """Handler for New Scale platform""" - - def __init__(self) -> None: - """Initialize New Scale handler""" - super().__init__() - - self.num_axes = 3 - self.dimensions = Vector4(x=15, y=15, z=15, w=0) - - self.ctrl = NstCtrlHostIntf() - - # Connect manipulators and initialize - self.ctrl.ShowProperties() - self.ctrl.Initialize() - - def _get_manipulators(self) -> list: - return list(map(str, range(self.ctrl.PortCount))) - - def _register_manipulator(self, manipulator_id: str) -> None: - # Check if ID is numeric - if not manipulator_id.isnumeric(): - msg = "Manipulator ID must be numeric" - raise ValueError(msg) - - # Check if ID is connected - if manipulator_id not in self._get_manipulators(): - msg = f"Manipulator {manipulator_id} not connected" - raise ValueError(msg) - - # Check if there are enough axes - if int(manipulator_id) * 3 + 2 >= self.ctrl.AxisCount: - msg = f"Manipulator {manipulator_id} has no axes" - raise ValueError(msg) - - # Register manipulator - first_axis_index = int(manipulator_id) * 3 - self.manipulators[manipulator_id] = NewScaleManipulator( - manipulator_id, - self.ctrl.GetAxis(first_axis_index), - self.ctrl.GetAxis(first_axis_index + 1), - self.ctrl.GetAxis(first_axis_index + 2), - ) - - def _unregister_manipulator(self, manipulator_id: str) -> None: - del self.manipulators[manipulator_id] - - def _get_pos(self, manipulator_id: str) -> PositionalResponse: - return self.manipulators[manipulator_id].get_pos() - - def _get_angles(self, manipulator_id: str) -> AngularResponse: - raise NotImplementedError - - def _get_shank_count(self, manipulator_id: str) -> ShankCountResponse: - raise NotImplementedError - - async def _goto_pos(self, request: GotoPositionRequest) -> PositionalResponse: - return await self.manipulators[request.manipulator_id].goto_pos(request) - - async def _drive_to_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: - return await self.manipulators[request.manipulator_id].drive_to_depth(request) - - def _set_inside_brain(self, request: InsideBrainRequest) -> BooleanStateResponse: - self.manipulators[request.manipulator_id].set_inside_brain(request.inside) - com.dprint(f"[SUCCESS]\t Set inside brain state for manipulator: {request.manipulator_id}\n") - return BooleanStateResponse(state=request.inside) - - async def _calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> str: - return "" if self.manipulators[manipulator_id].calibrate() else "Error calling calibrate" - - def _bypass_calibration(self, manipulator_id: str) -> str: - self.manipulators[manipulator_id].set_calibrated() - com.dprint(f"[SUCCESS]\t Bypassed calibration for manipulator" f" {manipulator_id}\n") - return "" - - def _set_can_write(self, request: CanWriteRequest) -> BooleanStateResponse: - self.manipulators[request.manipulator_id].set_can_write(request) - com.dprint(f"[SUCCESS]\t Set can_write state for manipulator {request.manipulator_id}\n") - return BooleanStateResponse(state=request.can_write) - - def _platform_space_to_unified_space(self, platform_position: Vector4) -> Vector4: - # unified <- platform - # +x <- -x - # +y <- +z - # +z <- +y - # +d <- -d - - return Vector4( - x=self.dimensions.x - platform_position.x, - y=platform_position.z, - z=platform_position.y, - w=self.dimensions.z - platform_position.w, - ) - - def _unified_space_to_platform_space(self, unified_position: Vector4) -> Vector4: - # platform <- unified - # +x <- -x - # +y <- +z - # +z <- +y - # +d <- -d - - return Vector4( - x=self.dimensions.x - unified_position.x, - y=unified_position.z, - z=unified_position.y, - w=self.dimensions.z - unified_position.w, - ) diff --git a/src/ephys_link/platforms/new_scale_manipulator.py b/src/ephys_link/platforms/new_scale_manipulator.py deleted file mode 100644 index de6b4db..0000000 --- a/src/ephys_link/platforms/new_scale_manipulator.py +++ /dev/null @@ -1,312 +0,0 @@ -"""New Scale Manipulator class - -Handles logic for calling New Scale API functions. Also includes extra logic for safe -function calls, error handling, managing per-manipulator attributes, and returning the -appropriate callback parameters like in :mod:`ephys_link.new_scale_handler`. -""" - -from __future__ import annotations - -import asyncio -import threading -from typing import TYPE_CHECKING - -from vbl_aquarium.models.ephys_link import ( - CanWriteRequest, - DriveToDepthRequest, - DriveToDepthResponse, - GotoPositionRequest, - PositionalResponse, -) -from vbl_aquarium.models.unity import Vector4 - -import ephys_link.common as com -from ephys_link.common import vector4_to_array -from ephys_link.platform_manipulator import ( - HOURS_TO_SECONDS, - MM_TO_UM, - POSITION_POLL_DELAY, - PlatformManipulator, -) - -if TYPE_CHECKING: - # noinspection PyUnresolvedReferences - from NstMotorCtrl import NstCtrlAxis - -# Constants -ACCELERATION_MULTIPLIER = 5 -CUTOFF_MULTIPLIER = 0.005 -AT_TARGET_FLAG = 0x040000 - - -class NewScaleManipulator(PlatformManipulator): - def __init__( - self, - manipulator_id: str, - x_axis: NstCtrlAxis, - y_axis: NstCtrlAxis, - z_axis: NstCtrlAxis, - ) -> None: - """Construct a new Manipulator object - - :param manipulator_id: Manipulator ID - :param x_axis: X axis object - :param y_axis: Y axis object - :param z_axis: Z axis object - """ - - super().__init__() - self._id = manipulator_id - self._movement_tolerance = 0.01 - self._x = x_axis - self._y = y_axis - self._z = z_axis - self._axes = [self._x, self._y, self._z] - - # Set to CL control - self._x.SetCL_Enable(True) - self._y.SetCL_Enable(True) - self._z.SetCL_Enable(True) - - def query_all_axes(self): - """Query all axes for their position and status""" - for axis in self._axes: - axis.QueryPosStatus() - - def get_pos(self) -> PositionalResponse: - """Get the current position of the manipulator and convert it into mm. - - :return: Position of manipulator in (x, y, z, z) in mm (or an empty array on error) and error message (if any). - :rtype: :class:`ephys_link.common.PositionalOutputData` - """ - self.query_all_axes() - - # Get position data and convert from ยตm to mm - try: - # com.dprint(f"[SUCCESS]\t Got position of manipulator {self._id}\n") - return PositionalResponse( - position=Vector4( - x=self._x.CurPosition / MM_TO_UM, - y=self._y.CurPosition / MM_TO_UM, - z=self._z.CurPosition / MM_TO_UM, - w=self._z.CurPosition / MM_TO_UM, - ) - ) - except Exception as e: - print(f"[ERROR]\t\t Getting position of manipulator {self._id}") - print(f"{e}\n") - return PositionalResponse(error="Error getting position") - - async def goto_pos(self, request: GotoPositionRequest) -> PositionalResponse: - """Move manipulator to position. - - :param request: The goto request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.GotoPositionRequest` - :return: Resulting position of manipulator in (x, y, z, z) in mm (or an empty array on error) - and error message (if any). - :rtype: :class:`ephys_link.common.PositionalOutputData` - """ - # Check if able to write - if not self._can_write: - print(f"[ERROR]\t\t Manipulator {self._id} movement canceled") - return PositionalResponse(error="Manipulator movement canceled") - - # Stop current movement - if self._is_moving: - for axis in self._axes: - axis.Stop() - self._is_moving = False - - try: - target_position_um = request.position * MM_TO_UM - - # Restrict target position to just z-axis if inside brain - if self._inside_brain: - z_axis = target_position_um.z - target_position_um = target_position_um.model_copy( - update={**self.get_pos().position.model_dump(), "z": z_axis} - ) - - # Mark movement as started - self._is_moving = True - - # Send move command - speed_um = request.speed * MM_TO_UM - for i in range(3): - self._axes[i].SetCL_Speed( - speed_um, - speed_um * ACCELERATION_MULTIPLIER, - speed_um * CUTOFF_MULTIPLIER, - ) - self._axes[i].MoveAbsolute(vector4_to_array(target_position_um)[i]) - - # Check and wait for completion (while able to write) - self.query_all_axes() - while ( - not (self._x.CurStatus & AT_TARGET_FLAG) - or not (self._y.CurStatus & AT_TARGET_FLAG) - or not (self._z.CurStatus & AT_TARGET_FLAG) - ) and self._can_write: - await asyncio.sleep(POSITION_POLL_DELAY) - self.query_all_axes() - - # Get position - final_position = self.get_pos().position - - # Mark movement as finished - self._is_moving = False - - # Return success unless write was disabled during movement (meaning a stop occurred) - if not self._can_write: - com.dprint(f"[ERROR]\t\t Manipulator {self._id} movement canceled") - return PositionalResponse(error="Manipulator movement canceled") - - # Return error if movement did not reach target. - if not all( - abs(axis) < self._movement_tolerance for axis in vector4_to_array(final_position - request.position)[:3] - ): - com.dprint(f"[ERROR]\t\t Manipulator {self._id} did not reach target position.") - com.dprint(f"\t\t\t Expected: {request.position}, Got: {final_position}") - return PositionalResponse(error="Manipulator did not reach target position") - - # Made it to the target. - com.dprint(f"[SUCCESS]\t Moved manipulator {self._id} to position" f" {final_position}\n") - return PositionalResponse(position=final_position) - except Exception as e: - print(f"[ERROR]\t\t Moving manipulator {self._id} to position {request.position}") - print(f"{e}\n") - return PositionalResponse(error="Error moving manipulator") - - async def drive_to_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: - """Drive the manipulator to a certain depth. - - :param request: The drive to depth request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.DriveToDepthRequest` - :return: Resulting depth of manipulator in mm (or 0 on error) and error message (if any). - :rtype: :class:`ephys_link.common.DriveToDepthOutputData` - """ - # Check if able to write - if not self._can_write: - print(f"[ERROR]\t\t Manipulator {self._id} movement canceled") - return DriveToDepthResponse(error="Manipulator movement canceled") - - # Stop current movement - if self._is_moving: - for axis in self._axes: - axis.Stop() - self._is_moving = False - - try: - target_depth_um = request.depth * MM_TO_UM - - # Mark movement as started - self._is_moving = True - - # Send move command to just z axis - speed_um = request.speed * MM_TO_UM - self._z.SetCL_Speed( - speed_um, - speed_um * ACCELERATION_MULTIPLIER, - speed_um * CUTOFF_MULTIPLIER, - ) - self._z.MoveAbsolute(target_depth_um) - - # Check and wait for completion (while able to write) - self._z.QueryPosStatus() - while not (self._z.CurStatus & AT_TARGET_FLAG) and self._can_write: - await asyncio.sleep(0.1) - self._z.QueryPosStatus() - - # Get position - final_depth = self.get_pos().position.w - - # Mark movement as finished - self._is_moving = False - - # Return success unless write was disabled during movement (meaning a stop occurred) - if not self._can_write: - com.dprint(f"[ERROR]\t\t Manipulator {self._id} movement canceled") - return DriveToDepthResponse(error="Manipulator movement canceled") - - # Return error if movement did not reach target. - if not abs(final_depth - request.depth) < self._movement_tolerance: - com.dprint(f"[ERROR]\t\t Manipulator {self._id} did not reach target depth") - com.dprint(f"\t\t\t Expected: {request.depth}, Got: {final_depth}") - return DriveToDepthResponse(error="Manipulator did not reach target depth") - - # Made it to the target. - com.dprint(f"[SUCCESS]\t Moved manipulator {self._id} to position" f" {final_depth}\n") - return DriveToDepthResponse(depth=final_depth) - except Exception as e: - print(f"[ERROR]\t\t Moving manipulator {self._id} to depth {request.depth}") - print(f"{e}\n") - # Return 0 and error message on failure - return DriveToDepthResponse(error="Error driving manipulator") - - def calibrate(self) -> bool: - """Calibrate the manipulator. - - :return: None - """ - return self._x.CalibrateFrequency() and self._y.CalibrateFrequency() and self._z.CalibrateFrequency() - - def get_calibrated(self) -> bool: - """Return the calibration state of the manipulator. - - :return: True if the manipulator is calibrated, False otherwise. - :rtype: bool - """ - return self._calibrated - - def set_calibrated(self) -> None: - """Set the manipulator to calibrated - - :return: None - """ - self._calibrated = True - - def set_can_write(self, request: CanWriteRequest) -> None: - """Set if the manipulator can move. - - :param request: The can write request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.CanWriteRequest` - :return: None - """ - self._can_write = request.can_write - - if request.can_write and request.hours > 0: - if self._reset_timer: - self._reset_timer.cancel() - self._reset_timer = threading.Timer(request.hours * HOURS_TO_SECONDS, self.reset_can_write) - self._reset_timer.start() - - def reset_can_write(self) -> None: - """Reset the :attr:`can_write` flag.""" - self._can_write = False - - def get_can_write(self) -> bool: - """Return if the manipulator can move. - - :return: True if the manipulator can move, False otherwise. - :rtype: bool - """ - return self._can_write - - def set_inside_brain(self, inside: bool) -> None: - """Set if the manipulator is inside the brain. - - :param inside: True if the manipulator is inside the brain, False otherwise. - :type inside: bool - :return: None - """ - self._inside_brain = inside - - def stop(self) -> None: - """Stop all axes on manipulator. - - :returns None - """ - for axis in self._axes: - axis.Stop() - self._can_write = False - com.dprint(f"[SUCCESS]\t Stopped manipulator {self._id}\n") diff --git a/src/ephys_link/platforms/new_scale_pathfinder_handler.py b/src/ephys_link/platforms/new_scale_pathfinder_handler.py deleted file mode 100644 index e81676c..0000000 --- a/src/ephys_link/platforms/new_scale_pathfinder_handler.py +++ /dev/null @@ -1,235 +0,0 @@ -"""Handle communications with New Scale's HTTP server - -Implements New Scale specific API calls. - -This is a subclass of :class:`ephys_link.platform_handler.PlatformHandler`. -""" - -from __future__ import annotations - -import json -from sys import exit -from typing import TYPE_CHECKING -from urllib import request -from urllib.error import URLError - -from vbl_aquarium.models.ephys_link import ( - AngularResponse, - BooleanStateResponse, - CanWriteRequest, - DriveToDepthRequest, - DriveToDepthResponse, - GotoPositionRequest, - InsideBrainRequest, - PositionalResponse, - ShankCountResponse, -) -from vbl_aquarium.models.unity import Vector3, Vector4 - -from ephys_link.platform_handler import PlatformHandler - -if TYPE_CHECKING: - import socketio - - -class NewScalePathfinderHandler(PlatformHandler): - """Handler for New Scale HTTP server""" - - # Valid New Scale manipulator IDs - VALID_MANIPULATOR_IDS = ( - "A", - "B", - "C", - "D", - "E", - "F", - "G", - "H", - "I", - "J", - "K", - "L", - "M", - "N", - "O", - "P", - "Q", - "R", - "S", - "T", - "U", - "V", - "W", - "X", - "Y", - "Z", - "AA", - "AB", - "AC", - "AD", - "AE", - "AF", - "AG", - "AH", - "AI", - "AJ", - "AK", - "AL", - "AM", - "AN", - ) - - def __init__(self, port: int = 8080) -> None: - """ - Initialize New Scale via Pathfinder handler - - :param port: Port of New Scale Pathfinder HTTP server - :type port: int - """ - super().__init__() - - self.num_axes = -1 - self.dimensions = Vector4(x=15, y=15, z=15, w=0) - - self.port = port - - # Test connection to New Scale HTTP server - try: - request.urlopen(f"http://localhost:{self.port}") - except URLError: - print(f"New Scale Pathfinder HTTP server not online on port {self.port}") - print("Please start the HTTP server and try again.") - input("Press Enter to exit...") - exit(1) - - def query_data(self) -> dict: - """Query New Scale HTTP server for data and return as dict. - - :return: Parsed JSON data from New Scale HTTP server. - :rtype: dict - """ - try: - return json.loads(request.urlopen(f"http://localhost:{self.port}").read()) - except Exception as e: - print(f"[ERROR]\t\t Unable to query for New Scale data: {type(e)} {e}\n") - - def query_manipulator_data(self, manipulator_id: str) -> dict: - """Query New Scale HTTP server for data on a specific manipulator. - - :param manipulator_id: manipulator ID. - :return: Parsed JSON data for a particular manipulator. - :rtype: dict - :raises ValueError: if manipulator ID is not found in query. - """ - data_query = self.query_data()["ProbeArray"] - manipulator_data = data_query[self.manipulators[manipulator_id]] - - # If the order of the manipulators switched (somehow) - if manipulator_data["Id"] != manipulator_id: - # Recalculate index and get data - (manipulator_index, manipulator_data) = next( - ( - (index, data) - for index, data in enumerate(self.query_data()["ProbeArray"]) - if data["Id"] == manipulator_id - ), - (None, None), - ) - # Update index in manipulators dict - if manipulator_index: - self.manipulators[manipulator_id] = manipulator_index - - # If data query was unsuccessful - if not manipulator_data: - msg = f"Unable to find manipulator {manipulator_id}" - raise ValueError(msg) - - # Return data - return manipulator_data - - def _get_manipulators(self) -> list: - return [probe["Id"] for probe in self.query_data()["ProbeArray"]] - - def _register_manipulator(self, manipulator_id: str) -> None: - # Check if ID is a valid New Scale manipulator ID - if manipulator_id not in self.VALID_MANIPULATOR_IDS: - msg = f"Invalid manipulator ID {manipulator_id}" - raise ValueError(msg) - - # Check if ID is connected - if manipulator_id not in self._get_manipulators(): - msg = f"Manipulator {manipulator_id} not connected" - raise ValueError(msg) - - # Get index of the manipulator - manipulator_index = next( - (index for index, data in enumerate(self.query_data()["ProbeArray"]) if data["Id"] == manipulator_id), - None, - ) - if manipulator_index is None: - msg = f"Unable to find manipulator {manipulator_id}" - raise ValueError(msg) - self.manipulators[manipulator_id] = manipulator_index - - def _unregister_manipulator(self, manipulator_id: str) -> None: - del self.manipulators[manipulator_id] - - def _get_pos(self, manipulator_id: str) -> PositionalResponse: - """Get the current position of the manipulator in mm - - :param manipulator_id: manipulator ID - :return: Callback parameters (position in (x, y, z, w) (or an empty array on - error) in mm, error message) - """ - manipulator_data = self.query_manipulator_data(manipulator_id) - - return PositionalResponse( - position=Vector4( - x=manipulator_data["Tip_X_ML"], y=manipulator_data["Tip_Y_AP"], z=manipulator_data["Tip_Z_DV"], w=0 - ) - ) - - def _get_angles(self, manipulator_id: str) -> AngularResponse: - manipulator_data = self.query_manipulator_data(manipulator_id) - - # Apply PosteriorAngle to Polar to get the correct angle. - adjusted_polar = manipulator_data["Polar"] - self.query_data()["PosteriorAngle"] - - return AngularResponse( - angles=Vector3( - x=adjusted_polar if adjusted_polar > 0 else 360 + adjusted_polar, - y=manipulator_data["Pitch"], - z=manipulator_data.get("ShankOrientation", 0), - ) - ) - - def _get_shank_count(self, manipulator_id: str) -> ShankCountResponse: - for probe in self.query_data()["ProbeArray"]: - if probe["Id"] == manipulator_id: - return ShankCountResponse(shank_count=probe.get("ShankCount", 1)) - - return ShankCountResponse(error="Unable to find manipulator") - - async def _goto_pos(self, _: GotoPositionRequest) -> PositionalResponse: - raise NotImplementedError - - async def _drive_to_depth(self, _: DriveToDepthRequest) -> DriveToDepthResponse: - raise NotImplementedError - - def _set_inside_brain(self, _: InsideBrainRequest) -> BooleanStateResponse: - raise NotImplementedError - - async def _calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> str: - raise NotImplementedError - - def _bypass_calibration(self, manipulator_id: str) -> str: - return "" - - def _set_can_write(self, _: CanWriteRequest) -> BooleanStateResponse: - raise NotImplementedError - - def _unified_space_to_platform_space(self, unified_position: list[float]) -> list[float]: - raise NotImplementedError - - def _platform_space_to_unified_space(self, platform_position: list[float]) -> list[float]: - raise NotImplementedError diff --git a/src/ephys_link/platforms/sensapex_handler.py b/src/ephys_link/platforms/sensapex_handler.py deleted file mode 100644 index 66825a1..0000000 --- a/src/ephys_link/platforms/sensapex_handler.py +++ /dev/null @@ -1,151 +0,0 @@ -"""Handle communications with Sensapex uMp API - -This supports the uMp-4 manipulator. Any Sensapex variants should extend this class. - -Implements Sensapex uMp specific API calls including coordinating the usage of the -:class:`ephys_link.platforms.sensapex_manipulator.SensapexManipulator` class. - -This is a subclass of :class:`ephys_link.platform_handler.PlatformHandler`. -""" - -from __future__ import annotations - -from pathlib import Path -from typing import TYPE_CHECKING - -from sensapex import UMP, UMError -from vbl_aquarium.models.ephys_link import ( - AngularResponse, - BooleanStateResponse, - CanWriteRequest, - DriveToDepthRequest, - DriveToDepthResponse, - GotoPositionRequest, - InsideBrainRequest, - PositionalResponse, - ShankCountResponse, -) -from vbl_aquarium.models.unity import Vector4 - -import ephys_link.common as com -from ephys_link.platform_handler import PlatformHandler -from ephys_link.platforms.sensapex_manipulator import SensapexManipulator - -if TYPE_CHECKING: - import socketio - - -class SensapexHandler(PlatformHandler): - """Handler for Sensapex platform.""" - - def __init__(self) -> None: - super().__init__() - - # Establish connection to Sensapex API (exit if connection fails) - UMP.set_library_path(str(Path(__file__).parent.parent.absolute()) + "/resources/") - self.ump = UMP.get_ump() - if self.ump is None: - msg = "Unable to connect to uMp" - raise ValueError(msg) - - def _get_manipulators(self) -> list: - return list(map(str, self.ump.list_devices())) - - def _register_manipulator(self, manipulator_id: str) -> None: - if not manipulator_id.isnumeric(): - msg = "Manipulator ID must be numeric" - raise ValueError(msg) - - self.manipulators[manipulator_id] = SensapexManipulator(self.ump.get_device(int(manipulator_id))) - - def _unregister_manipulator(self, manipulator_id: str) -> None: - del self.manipulators[manipulator_id] - - def _get_pos(self, manipulator_id: str) -> PositionalResponse: - return self.manipulators[manipulator_id].get_pos() - - def _get_angles(self, manipulator_id: str) -> AngularResponse: - raise NotImplementedError - - def _get_shank_count(self, manipulator_id: str) -> ShankCountResponse: - raise NotImplementedError - - async def _goto_pos(self, request: GotoPositionRequest) -> PositionalResponse: - return await self.manipulators[request.manipulator_id].goto_pos(request) - - async def _drive_to_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: - return await self.manipulators[request.manipulator_id].drive_to_depth(request) - - def _set_inside_brain(self, request: InsideBrainRequest) -> BooleanStateResponse: - self.manipulators[request.manipulator_id].set_inside_brain(request.inside) - com.dprint(f"[SUCCESS]\t Set inside brain state for manipulator: {request.manipulator_id}\n") - return BooleanStateResponse(state=request.inside) - - async def _calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> str: - try: - # Move manipulator to max position - await self.manipulators[manipulator_id].goto_pos([20000, 20000, 20000, 20000], 2000) - - # Call calibrate - self.manipulators[manipulator_id].call_calibrate() - - # Wait for calibration to complete - still_working = True - while still_working: - cur_pos = self.manipulators[manipulator_id].get_pos()["position"] - - # Check difference between current and target position - for prev, cur in zip([10000, 10000, 10000, 10000], cur_pos, strict=False): - if abs(prev - cur) > 1: - still_working = True - break - still_working = False - - # Sleep for a bit - await sio.sleep(0.5) - - # Calibration complete - self.manipulators[manipulator_id].set_calibrated() - com.dprint(f"[SUCCESS]\t Calibrated manipulator {manipulator_id}\n") - except UMError as e: - # SDK call error - print(f"[ERROR]\t\t Calling calibrate manipulator {manipulator_id}") - print(f"{e}\n") - return "Error calling calibrate" - else: - return "" - - def _bypass_calibration(self, manipulator_id: str) -> str: - self.manipulators[manipulator_id].set_calibrated() - com.dprint(f"[SUCCESS]\t Bypassed calibration for manipulator" f" {manipulator_id}\n") - return "" - - def _set_can_write(self, request: CanWriteRequest) -> BooleanStateResponse: - self.manipulators[request.manipulator_id].set_can_write(request) - com.dprint(f"[SUCCESS]\t Set can_write state for manipulator {request.manipulator_id}\n") - return BooleanStateResponse(state=request.can_write) - - def _platform_space_to_unified_space(self, platform_position: Vector4) -> Vector4: - # unified <- platform - # +x <- +y - # +y <- -z - # +z <- +x - # +d <- +d - - return Vector4( - x=platform_position.y, - y=self.dimensions.z - platform_position.z, - z=platform_position.x, - w=platform_position.w, - ) - - def _unified_space_to_platform_space(self, unified_position: Vector4) -> Vector4: - # platform <- unified - # +x <- +z - # +y <- +x - # +z <- -y - # +d <- +d - - return Vector4( - x=unified_position.z, y=unified_position.x, z=self.dimensions.z - unified_position.y, w=unified_position.w - ) diff --git a/src/ephys_link/platforms/sensapex_manipulator.py b/src/ephys_link/platforms/sensapex_manipulator.py deleted file mode 100644 index 69139e5..0000000 --- a/src/ephys_link/platforms/sensapex_manipulator.py +++ /dev/null @@ -1,227 +0,0 @@ -"""Sensapex Manipulator class - -Handles logic for calling Sensapex API functions. Also includes extra logic for safe -function calls, error handling, managing per-manipulator attributes, and returning the -appropriate callback parameters like in :mod:`ephys_link.sensapex_handler`. -""" - -from __future__ import annotations - -import asyncio -import threading -from typing import TYPE_CHECKING - -from vbl_aquarium.models.ephys_link import ( - CanWriteRequest, - DriveToDepthRequest, - DriveToDepthResponse, - GotoPositionRequest, - PositionalResponse, -) -from vbl_aquarium.models.unity import Vector4 - -from ephys_link.common import dprint, vector4_to_array -from ephys_link.platform_manipulator import ( - HOURS_TO_SECONDS, - MM_TO_UM, - POSITION_POLL_DELAY, - PlatformManipulator, -) - -if TYPE_CHECKING: - from sensapex import SensapexDevice - - -class SensapexManipulator(PlatformManipulator): - """Representation of a single Sensapex manipulator - - :param device: A Sensapex device - :type device: :class: `sensapex.SensapexDevice` - """ - - def __init__(self, device: SensapexDevice) -> None: - """Construct a new Manipulator object - - :param device: A Sensapex device - """ - super().__init__() - self._device = device - self._id = device.dev_id - - # Device functions - def get_pos(self) -> PositionalResponse: - """Get the current position of the manipulator and convert it into mm. - - :return: Position in (x, y, z, w) (or an empty array on error) in mm and error message (if any). - :rtype: :class:`ephys_link.common.PositionalOutputData` - """ - try: - # com.dprint(f"[SUCCESS]\t Got position of manipulator {self._id}\n") - return PositionalResponse( - position=Vector4( - **dict( - zip( - Vector4.model_fields.keys(), - [axis / MM_TO_UM for axis in self._device.get_pos(1)], - strict=False, - ) - ) - ) - ) - except Exception as e: - print(f"[ERROR]\t\t Getting position of manipulator {self._id}") - print(f"{e}\n") - return PositionalResponse(error="Error getting position") - - async def goto_pos(self, request: GotoPositionRequest) -> PositionalResponse: - """Move manipulator to position. - - :param request: The goto request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.GotoPositionRequest` - :return: Resulting position in (x, y, z, w) (or an empty array on error) in mm and error message (if any). - :rtype: :class:`ephys_link.common.PositionalOutputData` - """ - # Check if able to write - if not self._can_write: - print(f"[ERROR]\t\t Manipulator {self._id} movement canceled") - return PositionalResponse(error="Manipulator movement canceled") - - # Stop current movement - if self._is_moving: - self._device.stop() - self._is_moving = False - - try: - target_position_um = request.position * MM_TO_UM - - # Restrict target position to just depth-axis if inside brain - if self._inside_brain: - d_axis = target_position_um.w - target_position_um = target_position_um.model_copy( - update={**self.get_pos().position.model_dump(), "w": d_axis} - ) - - # Mark movement as started - self._is_moving = True - - # Send move command - movement = self._device.goto_pos( - vector4_to_array(target_position_um), - request.speed * MM_TO_UM, - ) - - # Wait for movement to finish - while not movement.finished: - await asyncio.sleep(POSITION_POLL_DELAY) - - # Get position - final_position = self.get_pos().position - - # Mark movement as finished. - self._is_moving = False - - # Return success unless write was disabled during movement (meaning a stop occurred). - if not self._can_write: - dprint(f"[ERROR]\t\t Manipulator {self._id} movement canceled") - return PositionalResponse(error="Manipulator movement canceled") - - # Return error if movement did not reach target. - if not all( - abs(axis) < self._movement_tolerance for axis in vector4_to_array(final_position - request.position) - ): - dprint(f"[ERROR]\t\t Manipulator {self._id} did not reach target position") - dprint(f"\t\t\t Expected: {request.position}, Got: {final_position}") - return PositionalResponse(error="Manipulator did not reach target position") - - # Made it to the target. - dprint(f"[SUCCESS]\t Moved manipulator {self._id} to position {final_position}\n") - return PositionalResponse(position=final_position) - except Exception as e: - print(f"[ERROR]\t\t Moving manipulator {self._id} to position {request.position}") - print(f"{e}\n") - return PositionalResponse(error="Error moving manipulator") - - async def drive_to_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: - """Drive the manipulator to a certain depth. - - :param request: The drive to depth request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.DriveToDepthRequest` - :return: Resulting depth in mm (or 0 on error) and error message (if any). - :rtype: :class:`ephys_link.common.DriveToDepthOutputData` - """ - # Get position before this movement - target_pos = self.get_pos().position - - target_pos = target_pos.model_copy(update={"w": request.depth}) - movement_result = await self.goto_pos(GotoPositionRequest(**request.model_dump(), position=target_pos)) - return DriveToDepthResponse(depth=movement_result.position.w, error=movement_result.error) - - def set_inside_brain(self, inside: bool) -> None: - """Set if the manipulator is inside the brain. - - Used to signal that the brain should move at :const:`INSIDE_BRAIN_SPEED_LIMIT` - - :param inside: True if the manipulator is inside the brain, False otherwise. - :type inside: bool - :return: None - """ - self._inside_brain = inside - - def get_can_write(self) -> bool: - """Return if the manipulator can move. - - :return: True if the manipulator can move, False otherwise. - :rtype: bool - """ - return self._can_write - - def set_can_write(self, request: CanWriteRequest) -> None: - """Set if the manipulator can move. - - :param request: The can write request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.CanWriteRequest` - :return: None - """ - self._can_write = request.can_write - - if request.can_write and request.hours > 0: - if self._reset_timer: - self._reset_timer.cancel() - self._reset_timer = threading.Timer(request.hours * HOURS_TO_SECONDS, self.reset_can_write) - self._reset_timer.start() - - def reset_can_write(self) -> None: - """Reset the :attr:`can_write` flag.""" - self._can_write = False - - # Calibration - def call_calibrate(self) -> None: - """Calibrate the manipulator. - - :return: None - """ - self._device.calibrate_zero_position() - - def get_calibrated(self) -> bool: - """Return the calibration state of the manipulator. - - :return: True if the manipulator is calibrated, False otherwise. - :rtype: bool - """ - return self._calibrated - - def set_calibrated(self) -> None: - """Set the manipulator to be calibrated. - - :return: None - """ - self._calibrated = True - - def stop(self) -> None: - """Stop the manipulator - - :return: None - """ - self._can_write = False - self._device.stop() - dprint(f"[SUCCESS]\t Stopped manipulator {self._id}") diff --git a/src/ephys_link/platforms/ump3_handler.py b/src/ephys_link/platforms/ump3_handler.py deleted file mode 100644 index 603ad8d..0000000 --- a/src/ephys_link/platforms/ump3_handler.py +++ /dev/null @@ -1,57 +0,0 @@ -"""Handle communications with Sensapex uMp-3 API - -Uses the Sensapex uMp handler which implements for the uMp-4 manipulator -and extends it to support the uMp-3 manipulator. - -This is a subclass of :class:`ephys_link.platforms.sensapex_handler`. -""" - -from __future__ import annotations - -from vbl_aquarium.models.unity import Vector4 - -from ephys_link.platforms.sensapex_handler import SensapexHandler -from ephys_link.platforms.ump3_manipulator import UMP3Manipulator - - -class UMP3Handler(SensapexHandler): - def __init__(self): - super().__init__() - - self.num_axes = 3 - self.dimensions = Vector4(x=20, y=20, z=20, w=0) - - def _register_manipulator(self, manipulator_id: str) -> None: - if not manipulator_id.isnumeric(): - msg = "Manipulator ID must be numeric" - raise ValueError(msg) - - self.manipulators[manipulator_id] = UMP3Manipulator(self.ump.get_device(int(manipulator_id))) - - def _platform_space_to_unified_space(self, platform_position: Vector4) -> Vector4: - # unified <- platform - # +x <- +y - # +y <- -x - # +z <- -z - # +d <- +d/x - - return Vector4( - x=platform_position.y, - y=self.dimensions.x - platform_position.x, - z=self.dimensions.z - platform_position.z, - w=platform_position.w, - ) - - def _unified_space_to_platform_space(self, unified_position: Vector4) -> Vector4: - # platform <- unified - # +x <- -y - # +y <- +x - # +z <- -z - # +d/x <- +d - - return Vector4( - x=self.dimensions.y - unified_position.y, - y=unified_position.x, - z=self.dimensions.z - unified_position.z, - w=unified_position.w, - ) diff --git a/src/ephys_link/platforms/ump3_manipulator.py b/src/ephys_link/platforms/ump3_manipulator.py deleted file mode 100644 index 6f5105e..0000000 --- a/src/ephys_link/platforms/ump3_manipulator.py +++ /dev/null @@ -1,147 +0,0 @@ -"""Sensapex uMp-3 Manipulator class - -Extends from :class:`ephys_link.platforms.sensapex_manipulator.SensapexManipulator` to support the uMp-3 manipulator. -""" - -from __future__ import annotations - -import asyncio -from typing import TYPE_CHECKING - -from vbl_aquarium.models.ephys_link import ( - DriveToDepthRequest, - DriveToDepthResponse, - GotoPositionRequest, - PositionalResponse, -) -from vbl_aquarium.models.unity import Vector4 - -from ephys_link.common import dprint, vector4_to_array -from ephys_link.platform_manipulator import ( - MM_TO_UM, - POSITION_POLL_DELAY, -) -from ephys_link.platforms.sensapex_manipulator import SensapexManipulator - -if TYPE_CHECKING: - from sensapex import SensapexDevice - - -class UMP3Manipulator(SensapexManipulator): - """Representation of a single Sensapex manipulator - - :param device: A Sensapex device - :type device: :class: `sensapex.SensapexDevice` - """ - - def __init__(self, device: SensapexDevice) -> None: - """Construct a new Manipulator object - - :param device: A Sensapex device - """ - super().__init__(device) - - # Device functions - def get_pos(self) -> PositionalResponse: - """Get the current position of the manipulator and convert it into mm. - - :return: Position in (x, y, z, x) (or an empty array on error) in mm and error message (if any). - :rtype: :class:`ephys_link.common.PositionalOutputData` - """ - try: - position = [axis / MM_TO_UM for axis in self._device.get_pos(1)] - - # Add the depth axis to the end of the position. - position.append(position[0]) - - # com.dprint(f"[SUCCESS]\t Got position of manipulator {self._id}\n") - return PositionalResponse( - position=Vector4(**dict(zip(Vector4.model_fields.keys(), position, strict=False))) - ) - except Exception as e: - print(f"[ERROR]\t\t Getting position of manipulator {self._id}") - print(f"{e}\n") - return PositionalResponse(error="Error getting position") - - async def goto_pos(self, request: GotoPositionRequest) -> PositionalResponse: - """Move manipulator to position. - - :param request: The goto request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.GotoPositionRequest` - :return: Resulting position in (x, y, z, x) (or an empty array on error) in mm and error message (if any). - :rtype: :class:`ephys_link.common.PositionalOutputData` - """ - # Check if able to write - if not self._can_write: - print(f"[ERROR]\t\t Manipulator {self._id} movement canceled") - return PositionalResponse(error="Manipulator movement canceled") - - # Stop current movement - if self._is_moving: - self._device.stop() - self._is_moving = False - - try: - target_position_um = request.position * MM_TO_UM - - # Restrict target position to just depth-axis if inside brain - if self._inside_brain: - d_axis = target_position_um.x - target_position_um = target_position_um.model_copy( - update={**self.get_pos().position.model_dump(), "x": d_axis} - ) - - # Mark movement as started - self._is_moving = True - - # Send move command - movement = self._device.goto_pos( - vector4_to_array(target_position_um), - request.speed * MM_TO_UM, - ) - - # Wait for movement to finish - while not movement.finished: - await asyncio.sleep(POSITION_POLL_DELAY) - - # Get position - final_position = self.get_pos().position - - # Mark movement as finished - self._is_moving = False - - # Return success unless write was disabled during movement (meaning a stop occurred) - if not self._can_write: - dprint(f"[ERROR]\t\t Manipulator {self._id} movement canceled") - return PositionalResponse(error="Manipulator movement canceled") - - # Return error if movement did not reach target. - if not all( - abs(axis) < self._movement_tolerance for axis in vector4_to_array(final_position - request.position) - ): - dprint(f"[ERROR]\t\t Manipulator {self._id} did not reach target position") - dprint(f"\t\t\t Expected: {request.position}, Got: {final_position}") - return PositionalResponse(error="Manipulator did not reach target position") - - # Made it to the target. - dprint(f"[SUCCESS]\t Moved manipulator {self._id} to position" f" {final_position}\n") - return PositionalResponse(position=final_position) - except Exception as e: - print(f"[ERROR]\t\t Moving manipulator {self._id} to position {request.position}") - print(f"{e}\n") - return PositionalResponse(error="Error moving manipulator") - - async def drive_to_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: - """Drive the manipulator to a certain depth. - - :param request: The drive to depth request parsed from the server. - :type request: :class:`vbl_aquarium.models.ephys_link.DriveToDepthRequest` - :return: Resulting depth in mm (or 0 on error) and error message (if any). - :rtype: :class:`ephys_link.common.DriveToDepthOutputData` - """ - # Get position before this movement - target_pos = self.get_pos().position - - target_pos = target_pos.model_copy(update={"x": request.depth}) - movement_result = await self.goto_pos(GotoPositionRequest(**request.model_dump(), position=target_pos)) - return DriveToDepthResponse(depth=movement_result.position.w, error=movement_result.error) diff --git a/src/ephys_link/server.py b/src/ephys_link/server.py deleted file mode 100644 index eb5660c..0000000 --- a/src/ephys_link/server.py +++ /dev/null @@ -1,508 +0,0 @@ -"""WebSocket server and communication handler - -Manages the WebSocket server and handles connections and events from the client. For -every event, the server does the following: - -1. Extract the arguments passed in the event -2. Log that the event was received -3. Call the appropriate function in :mod:`ephys_link.sensapex_handler` with arguments -4. Relay the response from :mod:`ephys_link.sensapex_handler` to the callback function -""" - -from __future__ import annotations - -from asyncio import get_event_loop -from json import loads -from signal import SIGINT, SIGTERM, signal -from typing import TYPE_CHECKING, Any -from uuid import uuid4 - -from aiohttp import ClientConnectionError, ClientSession -from aiohttp.web import Application, run_app -from aiohttp.web_runner import GracefulExit -from packaging.version import parse -from pydantic import ValidationError - -# from socketio import AsyncServer -from socketio import AsyncClient, AsyncServer -from vbl_aquarium.models.ephys_link import ( - BooleanStateResponse, - CanWriteRequest, - DriveToDepthRequest, - DriveToDepthResponse, - GotoPositionRequest, - InsideBrainRequest, - PositionalResponse, -) -from vbl_aquarium.models.proxy import PinpointIdResponse - -from ephys_link.__about__ import __version__ -from ephys_link.common import ( - ASCII, - dprint, -) -from ephys_link.platforms.new_scale_handler import NewScaleHandler -from ephys_link.platforms.new_scale_pathfinder_handler import NewScalePathfinderHandler -from ephys_link.platforms.sensapex_handler import SensapexHandler -from ephys_link.platforms.ump3_handler import UMP3Handler - -if TYPE_CHECKING: - from ephys_link.platform_handler import PlatformHandler - - -class Server: - def __init__(self) -> None: - """Declare and setup server object. Launching is done is a separate function.""" - - # Server object. - self.sio: AsyncClient | AsyncServer | None = None - - # Web application object. - self.app: Application | None = None - - # Proxy server ID. - self.pinpoint_id: str = "" - - # Manipulator platform handler. - self.platform: PlatformHandler | None = None - # Is there a client connected? - self.is_connected = False - - # Is the server running? - self.is_running = False - - # Register server exit handlers. - signal(SIGTERM, self.close_server) - signal(SIGINT, self.close_server) - - # Server events. - async def connect(self, sid, _, __) -> bool: - """Acknowledge connection to the server. - - :param sid: Socket session ID. - :type sid: str - :param _: WSGI formatted dictionary with request info (unused). - :type _: dict - :param __: Authentication details (unused). - :type __: dict - :return: False on error to refuse connection. True otherwise. - :rtype: bool - """ - print(f"[CONNECTION REQUEST]:\t\t {sid}\n") - - if not self.is_connected: - print(f"[CONNECTION GRANTED]:\t\t {sid}\n") - self.is_connected = True - return True - - print(f"[CONNECTION DENIED]:\t\t {sid}: another client is already connected\n") - return False - - async def disconnect(self, sid) -> None: - """Acknowledge disconnection from the server. - - :param sid: Socket session ID. - :type sid: str - :return: None - """ - print(f"[DISCONNECTION]:\t {sid}\n") - - self.platform.reset() - self.is_connected = False - - # Ephys Link Events - - async def get_pinpoint_id(self) -> str: - """Get the pinpoint ID. - - :return: Pinpoint ID and whether the client is a requester. - :rtype: tuple[str, bool] - """ - return PinpointIdResponse(pinpoint_id=self.pinpoint_id, is_requester=False).to_string() - - @staticmethod - async def get_version(_) -> str: - """Get the version number of the server. - - :param _: Socket session ID (unused). - :type _: str - :return: Version number as defined in :mod:`ephys_link.__about__`. - :rtype: str - """ - dprint("[EVENT]\t\t Get version") - - return __version__ - - async def get_manipulators(self, _) -> str: - """Get the list of discoverable manipulators. - - :param _: Socket session ID (unused). - :type _: str - :return: :class:`vbl_aquarium.models.ephys_link.GetManipulatorsResponse` as JSON formatted string. - :rtype: str - """ - dprint("[EVENT]\t\t Get discoverable manipulators") - - return self.platform.get_manipulators().to_string() - - async def register_manipulator(self, _, manipulator_id: str) -> str: - """Register a manipulator with the server. - - :param _: Socket session ID (unused). - :type _: str - :param manipulator_id: ID of the manipulator to register. - :type manipulator_id: str - :return: Error message on error, empty string otherwise. - :rtype: str - """ - dprint(f"[EVENT]\t\t Register manipulator: {manipulator_id}") - - return self.platform.register_manipulator(manipulator_id) - - async def unregister_manipulator(self, _, manipulator_id: str) -> str: - """Unregister a manipulator from the server. - - :param _: Socket session ID (unused) - :type _: str - :param manipulator_id: ID of the manipulator to unregister. - :type manipulator_id: str - :return: Error message on error, empty string otherwise. - :rtype: str - """ - dprint(f"[EVENT]\t\t Unregister manipulator: {manipulator_id}") - - return self.platform.unregister_manipulator(manipulator_id) - - async def get_pos(self, _, manipulator_id: str) -> str: - """Position of manipulator request. - - :param _: Socket session ID (unused). - :type _: str - :param manipulator_id: ID of manipulator to pull position from. - :type manipulator_id: str - :return: :class:`vbl_aquarium.models.ephys_link.PositionalResponse` as JSON formatted string. - :rtype: str - """ - # dprint(f"[EVENT]\t\t Get position of manipulator" f" {manipulator_id}") - - return self.platform.get_pos(manipulator_id).to_string() - - async def get_angles(self, _, manipulator_id: str) -> str: - """Angles of manipulator request. - - :param _: Socket session ID (unused). - :type _: str - :param manipulator_id: ID of manipulator to pull angles from. - :type manipulator_id: str - :return: :class:`vbl_aquarium.models.ephys_link.AngularResponse` as JSON formatted string. - :rtype: str - """ - - return self.platform.get_angles(manipulator_id).to_string() - - async def get_shank_count(self, _, manipulator_id: str) -> str: - """Number of shanks of manipulator request. - - :param _: Socket session ID (unused). - :type _: str - :param manipulator_id: ID of manipulator to pull number of shanks from. - :type manipulator_id: str - :return: :class:`vbl_aquarium.models.ephys_link.ShankCountResponse` as JSON formatted string. - :rtype: str - """ - - return self.platform.get_shank_count(manipulator_id).to_string() - - async def goto_pos(self, _, data: str) -> str: - """Move manipulator to position. - - :param _: Socket session ID (unused). - :type _: str - :param data: :class:`vbl_aquarium.models.ephys_link.GotoPositionRequest` as JSON formatted string. - :type data: str - :return: :class:`vbl_aquarium.models.ephys_link.PositionalResponse` as JSON formatted string. - :rtype: str - """ - try: - request = GotoPositionRequest(**loads(data)) - except ValidationError as ve: - print(f"[ERROR]\t\t Invalid goto_pos data: {data}\n{ve}\n") - return PositionalResponse(error="Invalid data format").to_string() - except Exception as e: - print(f"[ERROR]\t\t Error in goto_pos: {e}\n") - return PositionalResponse(error="Error in goto_pos").to_string() - else: - dprint(f"[EVENT]\t\t Move manipulator {request.manipulator_id} to position {request.position}") - goto_result = await self.platform.goto_pos(request) - return goto_result.to_string() - - async def drive_to_depth(self, _, data: str) -> str: - """Drive to depth. - - :param _: Socket session ID (unused). - :type _: str - :param data: :class:`vbl_aquarium.models.ephys_link.DriveToDepthRequest` as JSON formatted string. - :type data: str - :return: :class:`vbl_aquarium.models.ephys_link.DriveToDepthResponse` as JSON formatted string. - :rtype: str - """ - try: - request = DriveToDepthRequest(**loads(data)) - except KeyError: - print(f"[ERROR]\t\t Invalid drive_to_depth data: {data}\n") - return DriveToDepthResponse(error="Invalid data " "format").to_string() - except Exception as e: - print(f"[ERROR]\t\t Error in drive_to_depth: {e}\n") - return DriveToDepthResponse(error="Error in drive_to_depth").to_string() - else: - dprint(f"[EVENT]\t\t Drive manipulator {request.manipulator_id} to depth {request.depth}") - drive_result = await self.platform.drive_to_depth(request) - return drive_result.to_string() - - async def set_inside_brain(self, _, data: str) -> str: - """Set the inside brain state. - - :param _: Socket session ID (unused). - :type _: str - :param data: :class:`vbl_aquarium.models.ephys_link.InsideBrainRequest` as JSON formatted string. - :type data: str - :return: :class:`vbl_aquarium.models.ephys_link.BooleanStateResponse` as JSON formatted string. - :rtype: str - """ - try: - request = InsideBrainRequest(**loads(data)) - except KeyError: - print(f"[ERROR]\t\t Invalid set_inside_brain data: {data}\n") - return BooleanStateResponse(error="Invalid data format").to_string() - except Exception as e: - print(f"[ERROR]\t\t Error in inside_brain: {e}\n") - return BooleanStateResponse(error="Error in set_inside_brain").to_string() - else: - dprint(f"[EVENT]\t\t Set manipulator {request.manipulator_id} inside brain to {request.inside}") - return self.platform.set_inside_brain(request).to_string() - - async def calibrate(self, _, manipulator_id: str) -> str: - """Calibrate manipulator. - - :param _: Socket session ID (unused). - :type _: str - :param manipulator_id: ID of manipulator to calibrate. - :type manipulator_id: str - :return: Error message on error, empty string otherwise. - :rtype: str - """ - dprint(f"[EVENT]\t\t Calibrate manipulator" f" {manipulator_id}") - - return await self.platform.calibrate(manipulator_id, self.sio) - - async def bypass_calibration(self, _, manipulator_id: str) -> str: - """Bypass calibration of manipulator. - - :param _: Socket session ID (unused). - :type _: str - :param manipulator_id: ID of manipulator to bypass calibration. - :type manipulator_id: str - :return: Error message on error, empty string otherwise. - :rtype: str - """ - dprint(f"[EVENT]\t\t Bypass calibration of manipulator" f" {manipulator_id}") - - return self.platform.bypass_calibration(manipulator_id) - - async def set_can_write(self, _, data: str) -> str: - """Set manipulator can_write state. - - :param _: Socket session ID (unused) - :type _: str - :param data: :class:`vbl_aquarium.models.ephys_link.CanWriteRequest` as JSON formatted string. - :type data: str - :return: :class:`vbl_aquarium.models.ephys_link.BooleanStateResponse` as JSON formatted string. - :rtype: str - """ - try: - request = CanWriteRequest(**loads(data)) - except KeyError: - print(f"[ERROR]\t\t Invalid set_can_write data: {data}\n") - return BooleanStateResponse(error="Invalid data format").to_string() - except Exception as e: - print(f"[ERROR]\t\t Error in inside_brain: {e}\n") - return BooleanStateResponse(error="Error in set_can_write").to_string() - else: - dprint(f"[EVENT]\t\t Set manipulator {request.manipulator_id} can_write state to {request.can_write}") - return self.platform.set_can_write(request).to_string() - - def stop(self, _) -> bool: - """Stop all manipulators. - - :param _: Socket session ID (unused). - :type _: str - :return: True if successful, False otherwise. - :rtype: bool - """ - dprint("[EVENT]\t\t Stop all manipulators") - - return self.platform.stop() - - @staticmethod - async def catch_all(_, __, data: Any) -> str: - """Catch all event. - - :param _: Socket session ID (unused). - :type _: str - :param __: Client ID (unused). - :type __: str - :param data: Data received from client. - :type data: Any - :return: "UNKNOWN_EVENT" response message. - :rtype: str - """ - print(f"[UNKNOWN EVENT]:\t {data}") - return "UNKNOWN_EVENT" - - # Server functions - async def launch_setup(self, platform_type: str, pathfinder_port: int, ignore_updates) -> None: - # Import correct manipulator handler - match platform_type: - case "sensapex": - self.platform = SensapexHandler() - case "ump3": - self.platform = UMP3Handler() - case "new_scale": - self.platform = NewScaleHandler() - case "new_scale_pathfinder": - self.platform = NewScalePathfinderHandler(pathfinder_port) - case _: - error = f"[ERROR]\t\t Invalid manipulator type: {platform_type}" - raise ValueError(error) - - # Preamble. - print(ASCII) - print(f"v{__version__}") - - # Check for newer version. - if not ignore_updates: - try: - async with ( - ClientSession() as session, - session.get("https://api.github.com/repos/VirtualBrainLab/ephys-link/tags") as response, - ): - latest_version = (await response.json())[0]["name"] - if parse(latest_version) > parse(__version__): - print(f"New version available: {latest_version}") - print("Download at: https://github.com/VirtualBrainLab/ephys-link/releases/latest") - - await session.close() - except ClientConnectionError: - pass - - # Explain window. - print() - print("This is the Ephys Link server window.") - print("You may safely leave it running in the background.") - print("To stop it, close this window or press CTRL + Pause/Break.") - print() - - # List available manipulators - print("Available Manipulators:") - print(self.platform.get_manipulators().manipulators) - print() - - async def launch_for_proxy( - self, proxy_address: str, port: int, platform_type: str, pathfinder_port: int | None, ignore_updates: bool - ) -> None: - """Launch the server in proxy mode. - - :param proxy_address: Proxy IP address. - :type proxy_address: str - :param port: Port to serve the server. - :type port: int - :param platform_type: Parsed argument for platform type. - :type platform_type: str - :param pathfinder_port: Port New Scale Pathfinder's server is on. - :type pathfinder_port: int - :param ignore_updates: Flag to ignore checking for updates. - :type ignore_updates: bool - :return: None - """ - - # Launch setup - await self.launch_setup(platform_type, pathfinder_port, ignore_updates) - - # Create AsyncClient. - self.sio = AsyncClient() - self.pinpoint_id = str(uuid4())[:8] - - # Bind events. - self.bind_events() - - # Connect and mark that server is running. - await self.sio.connect(f"http://{proxy_address}:{port}") - self.is_running = True - print(f"Pinpoint ID: {self.pinpoint_id}") - await self.sio.wait() - - def launch( - self, - platform_type: str, - port: int, - pathfinder_port: int | None, - ignore_updates: bool, - ) -> None: - """Launch the server. - - :param platform_type: Parsed argument for platform type. - :type platform_type: str - :param port: HTTP port to serve the server. - :type port: int - :param pathfinder_port: Port New Scale Pathfinder's server is on. - :type pathfinder_port: int - :param ignore_updates: Flag to ignore checking for updates. - :type ignore_updates: bool - :return: None - """ - - # Launch setup (synchronously) - get_event_loop().run_until_complete(self.launch_setup(platform_type, pathfinder_port, ignore_updates)) - - # Create AsyncServer - self.sio = AsyncServer() - self.app = Application() - self.sio.attach(self.app) - - # Bind events - self.sio.on("connect", self.connect) - self.sio.on("disconnect", self.disconnect) - self.bind_events() - - # Mark that server is running - self.is_running = True - run_app(self.app, port=port) - - def bind_events(self) -> None: - """Bind Ephys Link events to the server.""" - self.sio.on("get_pinpoint_id", self.get_pinpoint_id) - self.sio.on("get_version", self.get_version) - self.sio.on("get_manipulators", self.get_manipulators) - self.sio.on("register_manipulator", self.register_manipulator) - self.sio.on("unregister_manipulator", self.unregister_manipulator) - self.sio.on("get_pos", self.get_pos) - self.sio.on("get_angles", self.get_angles) - self.sio.on("get_shank_count", self.get_shank_count) - self.sio.on("goto_pos", self.goto_pos) - self.sio.on("drive_to_depth", self.drive_to_depth) - self.sio.on("set_inside_brain", self.set_inside_brain) - self.sio.on("calibrate", self.calibrate) - self.sio.on("bypass_calibration", self.bypass_calibration) - self.sio.on("set_can_write", self.set_can_write) - self.sio.on("stop", self.stop) - self.sio.on("*", self.catch_all) - - def close_server(self, _, __) -> None: - """Close the server.""" - print("[INFO]\t\t Closing server") - - # Stop movement - self.platform.stop() - - # Exit - raise GracefulExit diff --git a/src/ephys_link/util/__init__.py b/src/ephys_link/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ephys_link/util/base_bindings.py b/src/ephys_link/util/base_bindings.py new file mode 100644 index 0000000..1a95c0d --- /dev/null +++ b/src/ephys_link/util/base_bindings.py @@ -0,0 +1,133 @@ +"""Binding methods for Ephys Link manipulator platforms. + +Definition of the methods a platform binding class must implement to be used by Ephys Link. + +Usage: Implement the BaseBindings class when defining a platform binding to ensure it supports the necessary methods. +""" + +from abc import ABC, abstractmethod + +from vbl_aquarium.models.unity import Vector3, Vector4 + + +class BaseBindings(ABC): + """Base class to enforce bindings manipulator platforms will support. + + No need to catch exceptions as the Platform Handler will catch them. + """ + + @abstractmethod + async def get_manipulators(self) -> list[str]: + """Get a list of available manipulators on the current platform. + + :returns: List of manipulator IDs. + :rtype: list[str] + """ + + @abstractmethod + async def get_num_axes(self) -> int: + """Get the number of axes for the current platform. + + :returns: Number of axes. + :rtype: int + """ + + @abstractmethod + def get_dimensions(self) -> Vector4: + """Get the dimensions of the manipulators on the current platform (mm). + + For 3-axis manipulators, copy the dimension of the axis parallel to the probe into w. + + :returns: Dimensions of the manipulators. + :rtype: Vector4 + """ + + @abstractmethod + async def get_position(self, manipulator_id: str) -> Vector4: + """Get the current position of a manipulator. + + These will be the translation values of the manipulator (mm), so they may need to be rotated to unified space. + For 3-axis manipulators, copy the position of the axis parallel to the probe into w. + + :param manipulator_id: Manipulator ID. + :type manipulator_id: str + :returns: Current position of the manipulator in platform space (mm). + :rtype: Vector4 + """ + + @abstractmethod + async def get_angles(self, manipulator_id: str) -> Vector3: + """Get the current rotation angles of a manipulator in Yaw, Pitch, Roll (degrees). + + :param manipulator_id: Manipulator ID. + :type manipulator_id: str + :returns: Current angles of the manipulator. + :rtype: Vector3 + """ + + @abstractmethod + async def get_shank_count(self, manipulator_id: str) -> int: + """Get the number of shanks on a manipulator. + + :param manipulator_id: Manipulator ID. + :type manipulator_id: str + :returns: Number of shanks on the manipulator. + :rtype: int + """ + + @abstractmethod + async def get_movement_tolerance(self) -> float: + """Get the tolerance for how close the final position must be to the target position in a movement (mm). + + :returns: Movement tolerance (mm). + :rtype: float + """ + + @abstractmethod + async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4: + """Set the position of a manipulator. + + This will directly set the position in the original platform space. + Unified space coordinates will need to be converted to platform space. + For 3-axis manipulators, the first 3 values of the position will be used. + + :param manipulator_id: Manipulator ID. + :type manipulator_id: str + :param position: Platform space position to set the manipulator to (mm). + :type position: Vector4 + :param speed: Speed to move the manipulator to the position (mm/s). + :type speed: float + :returns: Final position of the manipulator in platform space (mm). + :rtype: Vector4 + """ + + @abstractmethod + async def stop(self) -> None: + """Stop all manipulators.""" + + @abstractmethod + def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: + """Convert platform space coordinates to unified space coordinates. + + This is an axes-swapping transformation. + + Unified coordinate space is the standard left-handed cartesian coordinate system + with an additional depth axis pointing from the base of the probe to the tip. + + :param platform_space: Platform space coordinates. + :type platform_space: Vector4 + :returns: Unified space coordinates. + :rtype: Vector4 + """ + + @abstractmethod + def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: + """Convert unified space coordinates to platform space coordinates. + + This is an axes-swapping transformation. + + :param unified_space: Unified space coordinates. + :type unified_space: Vector4 + :returns: Platform space coordinates. + :rtype: Vector4 + """ diff --git a/src/ephys_link/util/common.py b/src/ephys_link/util/common.py new file mode 100644 index 0000000..fe1dcd0 --- /dev/null +++ b/src/ephys_link/util/common.py @@ -0,0 +1,121 @@ +# ruff: noqa: T201 +"""Commonly used utility functions and constants.""" + +from os.path import join +from pathlib import Path + +from packaging.version import parse +from requests import get +from vbl_aquarium.models.unity import Vector4 + +from ephys_link.__about__ import __version__ +from ephys_link.util.console import Console + +# Ephys Link ASCII. +ASCII = r""" + ______ _ _ _ _ +| ____| | | | | (_) | | +| |__ _ __ | |__ _ _ ___ | | _ _ __ | | __ +| __| | '_ \| '_ \| | | / __| | | | | '_ \| |/ / +| |____| |_) | | | | |_| \__ \ | |____| | | | | < +|______| .__/|_| |_|\__, |___/ |______|_|_| |_|_|\_\ + | | __/ | + |_| |___/ +""" + +# Absolute path to the resource folder. +RESOURCES_PATH = join(str(Path(__file__).parent.parent.absolute()), "resources") + +# Ephys Link Port +PORT = 3000 + + +# Server startup. +def server_preamble() -> None: + """Print the server startup preamble.""" + print(ASCII) + print(__version__) + print() + print("This is the Ephys Link server window.") + print("You may safely leave it running in the background.") + print("To stop it, close this window or press CTRL + Pause/Break.") + print() + + +def check_for_updates() -> None: + """Check for updates to the Ephys Link.""" + response = get("https://api.github.com/repos/VirtualBrainLab/ephys-link/tags", timeout=10) + latest_version = response.json()[0]["name"] + if parse(latest_version) > parse(__version__): + Console.info_print("Update available", latest_version) + Console.info_print("", "Download at: https://github.com/VirtualBrainLab/ephys-link/releases/latest") + + +# Unit conversions + + +def mmps_to_umps(mmps: float) -> float: + """Convert millimeters per second to micrometers per second. + + :param mmps: Speed in millimeters per second. + :type mmps: float + :returns: Speed in micrometers per second. + :rtype: float + """ + return mmps * 1_000 + + +def mm_to_um(mm: Vector4) -> Vector4: + """Convert millimeters to micrometers. + + :param mm: Length in millimeters. + :type mm: Vector4 + :returns: Length in micrometers. + :rtype: Vector4 + """ + return mm * 1_000 + + +def um_to_mm(um: Vector4) -> Vector4: + """Convert micrometers to millimeters. + + :param um: Length in micrometers. + :type um: Vector4 + :returns: Length in millimeters. + :rtype: Vector4 + """ + return um / 1_000 + + +def vector4_to_array(vector4: Vector4) -> list[float]: + """Convert a Vector4 to a list of floats. + + :param vector4: Vector4 to convert. + :type vector4: :class:`vbl_aquarium.models.unity.Vector4` + :return: List of floats. + :rtype: list[float] + """ + return [vector4.x, vector4.y, vector4.z, vector4.w] + + +def array_to_vector4(array: list[float]) -> Vector4: + """Convert a list of floats to a Vector4. + + :param array: List of floats. + :type array: list[float] + :return: First four elements of the list as a Vector4 padded with zeros if necessary. + :rtype: :class:`vbl_aquarium.models.unity.Vector4` + """ + + def get_element(this_array: list[float], index: int) -> float: + try: + return this_array[index] + except IndexError: + return 0.0 + + return Vector4( + x=get_element(array, 0), + y=get_element(array, 1), + z=get_element(array, 2), + w=get_element(array, 3), + ) diff --git a/src/ephys_link/util/console.py b/src/ephys_link/util/console.py new file mode 100644 index 0000000..3820a0f --- /dev/null +++ b/src/ephys_link/util/console.py @@ -0,0 +1,112 @@ +# ruff: noqa: T201 +"""Console class for printing messages to the console. + +Configure the console to print error and debug messages. + +Usage: Create a Console object and call the appropriate method to print messages. +""" + +from traceback import print_exc + +from colorama import Back, Fore, Style, init + +# Constants. +TAB_BLOCK = "\t\t" + + +class Console: + def __init__(self, *, enable_debug: bool) -> None: + """Initialize console properties. + + :param enable_debug: Enable debug mode. + :type enable_debug: bool + """ + self._enable_debug = enable_debug + + # Repeat message fields. + self._last_message = "" + self._repeat_counter = 1 + + # Initialize colorama. + init(autoreset=True) + + @staticmethod + def error_print(msg: str) -> None: + """Print an error message to the console. + + :param msg: Error message to print. + :type msg: str + """ + print(f"\n{Back.RED}{Style.BRIGHT} ERROR {Style.RESET_ALL}{TAB_BLOCK}{Fore.RED}{msg}") + + @staticmethod + def labeled_error_print(label: str, msg: str) -> None: + """Print an error message with a label to the console. + + :param label: Label for the error message. + :type label: str + :param msg: Error message to print. + :type msg: str + """ + print(f"\n{Back.RED}{Style.BRIGHT} ERROR {label} {Style.RESET_ALL}{TAB_BLOCK}{Fore.RED}{msg}") + + @staticmethod + def pretty_exception(exception: Exception) -> str: + """Pretty print an exception. + + :param exception: Exception to pretty print. + :type exception: Exception + :return: Pretty printed exception. + :rtype: str + """ + return f"{type(exception).__name__}: {exception}" + + @staticmethod + def exception_error_print(label: str, exception: Exception) -> None: + """Print an error message with exception details to the console. + + :param label: Label for the error message. + :type label: str + :param exception: Exception to print. + :type exception: Exception + """ + Console.labeled_error_print(label, Console.pretty_exception(exception)) + print_exc() + + def debug_print(self, label: str, msg: str) -> None: + """Print a debug message to the console. + + :param label: Label for the debug message. + :type label: str + :param msg: Debug message to print. + :type msg: str + """ + if self._enable_debug: + self._repeat_print(f"{Back.BLUE}{Style.BRIGHT} DEBUG {label} {Style.RESET_ALL}{TAB_BLOCK}{Fore.BLUE}{msg}") + + @staticmethod + def info_print(label: str, msg: str) -> None: + """Print info to console. + + :param label: Label for the message. + :type label: str + :param msg: Message to print. + :type msg: str + """ + print(f"\n{Back.GREEN}{Style.BRIGHT} {label} {Style.RESET_ALL}{TAB_BLOCK}{Fore.GREEN}{msg}") + + # Helper methods. + def _repeat_print(self, msg: str) -> None: + """Print a message to the console with repeat counter. + + :param msg: Message to print. + :type msg: str + """ + if msg == self._last_message: + self._repeat_counter += 1 + else: + self._repeat_counter = 1 + self._last_message = msg + print() + + print(f"\r{msg}{f" (x{self._repeat_counter})" if self._repeat_counter > 1 else ""}", end="")