Skip to content

Commit

Permalink
328 use pydantic to handle message type (#335)
Browse files Browse the repository at this point in the history
* Add dependency to VBL aquarium

* WIP converting over to use aquarium types

* Goto Pos refactored

* Get manipulators

* Get pos

* Get angles

* Shank count

* Goto

* Utilize dictionary tricks

* Drive to depth

* Set inside brain

* Set can write

* Fixed lints

* Version bump

* Add helper to convert Vector4 to array
  • Loading branch information
kjy5 authored Mar 27, 2024
1 parent 2ec61fc commit 063c611
Show file tree
Hide file tree
Showing 12 changed files with 450 additions and 642 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Operating System :: Microsoft :: Windows",
"Intended Audience :: End Users/Desktop",
"Intended Audience :: Healthcare Industry",
"Intended Audience :: Science/Research",
Expand All @@ -37,6 +37,7 @@ dependencies = [
"pythonnet==3.0.3",
"requests==2.31.0",
"sensapex==1.400.0",
"vbl-aquarium==0.0.10"
]

[project.urls]
Expand Down
2 changes: 1 addition & 1 deletion src/ephys_link/__about__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.2.8"
__version__ = "1.3.0.dev0"
199 changes: 11 additions & 188 deletions src/ephys_link/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

from __future__ import annotations

import json
from typing import TypedDict
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from vbl_aquarium.models.unity import Vector4

# Debugging flag
DEBUG = False
Expand Down Expand Up @@ -36,191 +38,12 @@ def dprint(message: str) -> None:
print(message)


# Input data formats
class GotoPositionInputDataFormat(TypedDict):
"""Data format for positional requests.
:param manipulator_id: ID of the manipulator to move.
:type manipulator_id: str
:param pos: Position to move to in mm (X, Y, Z, W).
:type pos: list[float]
:param speed: Speed to move at in mm/s.
:type speed: float
"""

manipulator_id: str
pos: list[float]
speed: float


class InsideBrainInputDataFormat(TypedDict):
"""Data format for setting inside brain state.
:param manipulator_id: ID of the manipulator to move.
:type manipulator_id: str
:param inside: Whether the manipulator is inside the brain.
:type inside: bool
"""

manipulator_id: str
inside: bool


class DriveToDepthInputDataFormat(TypedDict):
"""Data format for depth driving requests.
:param manipulator_id: ID of the manipulator to move.
:type manipulator_id: str
:param depth: Depth to drive to in mm.
:type depth: float
:param speed: Speed to drive at in mm/s.
:type speed: float
"""

manipulator_id: str
depth: float
speed: float


class CanWriteInputDataFormat(TypedDict):
"""Data format for setting can write state.
:param manipulator_id: ID of the manipulator to move.
:type manipulator_id: str
:param can_write: Whether the manipulator can write.
:type can_write: bool
:param hours: Number of hours the manipulator can write for.
:type hours: float
"""

manipulator_id: str
can_write: bool
hours: float


# Output data dictionaries
class GetManipulatorsOutputData(dict):
"""Output format for get manipulators request.
:param manipulators: List of manipulator IDs (as strings).
:type manipulators: list
:param num_axes: Number of axes this manipulator has.
:type num_axes: int
:param dimensions: Size of the movement space in mm (first 3 axes).
:type dimensions: list
:param error: Error message.
:type error: str
:example: Example generated dictionary
:code:`{"manipulators": ["1", "2"], "num_axes": 4, "dimensions": [20, 20, 20], "error": ""}`
"""

def __init__(self, manipulators: list, num_axes: int, dimensions: list, error: str) -> None:
"""Constructor"""
super().__init__(
manipulators=manipulators,
num_axes=num_axes,
dimensions=dimensions,
error=error,
)

def json(self) -> str:
"""Return JSON string"""
return json.dumps(self)


class PositionalOutputData(dict):
"""Output format for positional requests.
:param position: Position in mm (as a list, empty on error) in X, Y, Z, W order.
:type position: list
:param error: Error message.
:type error: str
def vector4_to_array(vector4: Vector4) -> list[float]:
"""Convert a Vector4 to a list of floats.
:example: Example generated dictionary
:code:`{"position": [10.429, 12.332, 2.131, 12.312], "error": ""}`
:param vector4: Vector4 to convert.
:type vector4: :class:`vbl_aquarium.models.unity.Vector4`
:return: List of floats.
:rtype: list[float]
"""

def __init__(self, position: list, error: str) -> None:
"""Constructor"""
super().__init__(position=position, error=error)

def json(self) -> str:
"""Return JSON string"""
return json.dumps(self)


class AngularOutputData(dict):
"""Output format for manipulator angle requests.
:param angles: Angles in degrees (as a list, can be empty) in yaw, pitch, roll order.
:type angles: list
:param error: Error message.
:type error: str
"""

def __init__(self, angles: list, error: str) -> None:
"""Constructor"""
super().__init__(angles=angles, error=error)

def json(self) -> str:
"""Return JSON string"""
return json.dumps(self)


class ShankCountOutputData(dict):
"""Output format for number of shanks.
:param shank_count: Number of shanks on the probe (-1 if error).
:type shank_count: int
:param error: Error message.
:type error: str
"""

def __init__(self, shank_count: int, error: str) -> None:
"""Constructor"""
super().__init__(shank_count=shank_count, error=error)

def json(self) -> str:
"""Return JSON string"""
return json.dumps(self)


class DriveToDepthOutputData(dict):
"""Output format for depth driving.
:param depth: Depth in mm (0 on error).
:type depth: float
:param error: Error message.
:type error: str
:example: Example generated dictionary :code:`{"depth": 1.23, "error": ""}`
"""

def __init__(self, depth: float, error: str) -> None:
"""Create drive to depth output data dictionary"""
super().__init__(depth=depth, error=error)

def json(self) -> str:
"""Return JSON string"""
return json.dumps(self)


class StateOutputData(dict):
"""Output format for boolean state requests.
:param state: State of the event.
:type state: bool
:param error: Error message.
:type error: str
:example: Example generated dictionary :code:`{"state": True, "error": ""}`
"""

def __init__(self, state: bool, error: str) -> None:
"""Create state output data dictionary"""
super().__init__(state=state, error=error)

def json(self) -> str:
"""Return JSON string"""
return json.dumps(self)
return [vector4.x, vector4.y, vector4.z, vector4.w]
Loading

0 comments on commit 063c611

Please sign in to comment.