Skip to content

Commit

Permalink
Release 0.5.0
Browse files Browse the repository at this point in the history
Make task and device methods asynchronous
  • Loading branch information
aangelos28 committed Oct 24, 2024
1 parent 28c032b commit 803b641
Show file tree
Hide file tree
Showing 36 changed files with 195 additions and 180 deletions.
8 changes: 4 additions & 4 deletions docs/user-guide/color_mixing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ This is the Python code for the color analyzer device:
class ColorAnalyzerDevice(BaseDevice):
def _initialize(self, initialization_parameters: dict[str, Any]) -> None:
async def _initialize(self, initialization_parameters: dict[str, Any]) -> None:
port = int(initialization_parameters["port"])
self.client = DeviceClient(port)
self.client.open_connection()
def _cleanup(self) -> None:
async def _cleanup(self) -> None:
self.client.close_connection()
def _report(self) -> dict[str, Any]:
async def _report(self) -> dict[str, Any]:
return {}
def analyze(self, container: Container) -> tuple[Container, tuple[int, int, int]]:
Expand Down Expand Up @@ -168,7 +168,7 @@ This is the Python code the "Analyze color" task:
class AnalyzeColorTask(BaseTask):
def _execute(
async def _execute(
self,
devices: BaseTask.DevicesType,
parameters: BaseTask.ParametersType,
Expand Down
6 changes: 3 additions & 3 deletions docs/user-guide/devices.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ Below is a example implementation of a magnetic mixer device:
from user.color_lab.common.device_client import DeviceClient
class MagneticMixerDevice(BaseDevice):
def _initialize(self, initialization_parameters: Dict[str, Any]) -> None:
async def _initialize(self, initialization_parameters: Dict[str, Any]) -> None:
port = int(initialization_parameters["port"])
self.client = DeviceClient(port)
self.client.open_connection()
def _cleanup(self) -> None:
async def _cleanup(self) -> None:
self.client.close_connection()
def _report(self) -> Dict[str, Any]:
async def _report(self) -> Dict[str, Any]:
return {}
def mix(self, container: Container, mixing_time: int, mixing_speed: int) -> Container:
Expand Down
2 changes: 1 addition & 1 deletion docs/user-guide/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ Python File (task.yml)
class MagneticMixingTask(BaseTask):
def _execute(
async def _execute(
self,
devices: BaseTask.DevicesType,
parameters: BaseTask.ParametersType,
Expand Down
131 changes: 64 additions & 67 deletions eos/devices/base_device.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,39 @@
import asyncio
import atexit
import threading
from abc import ABC, abstractmethod, ABCMeta
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any

from eos.devices.exceptions import (
EosDeviceInitializationError,
EosDeviceCleanupError,
EosDeviceError,
)


class DeviceStatus(Enum):
DISABLED = "DISABLED"
IDLE = "IDLE"
BUSY = "BUSY"
ERROR = "ERROR"


def capture_exceptions(func: callable) -> callable:
def wrapper(self, *args, **kwargs) -> Any:
try:
return func(self, *args, **kwargs)

except (
EosDeviceInitializationError,
EosDeviceCleanupError,
) as e:
raise e
except Exception as e:
self._status = DeviceStatus.ERROR
raise EosDeviceError(f"Error in the function '{func.__name__}' in device '{self._device_id}'.") from e
def register_async_exit_callback(async_fn, *args, **kwargs) -> None:
"""
Register an async function to run at program exit.
"""

return wrapper
async def _run_async_fn() -> None:
await async_fn(*args, **kwargs)

def _run_on_exit() -> None:
loop = asyncio.new_event_loop()
loop.run_until_complete(_run_async_fn())
loop.close()

class DeviceMeta(ABCMeta):
def __new__(cls, name: str, bases: tuple, dct: dict):
cls._add_exception_capture_to_child_methods(bases, dct)
return super().__new__(cls, name, bases, dct)
atexit.register(_run_on_exit)

@staticmethod
def _add_exception_capture_to_child_methods(bases: tuple, dct: dict) -> None:
base_methods = set()
for base in bases:
if isinstance(base, DeviceMeta):
base_methods.update(base.__dict__.keys())

for attr, value in dct.items():
if callable(value) and not attr.startswith("__") and attr not in base_methods:
dct[attr] = capture_exceptions(value)
class DeviceStatus(Enum):
DISABLED = "DISABLED"
IDLE = "IDLE"
BUSY = "BUSY"
ERROR = "ERROR"


class BaseDevice(ABC, metaclass=DeviceMeta):
class BaseDevice(ABC):
"""
The base class for all devices in EOS.
"""
Expand All @@ -62,43 +43,42 @@ def __init__(
device_id: str,
lab_id: str,
device_type: str,
initialization_parameters: dict[str, Any],
):
self._device_id = device_id
self._lab_id = lab_id
self._device_type = device_type
self._status = DeviceStatus.DISABLED
self._initialization_parameters = initialization_parameters
self._initialization_parameters = {}

self._lock = threading.Lock()
self._lock = asyncio.Lock()

atexit.register(self.cleanup)
self.initialize(initialization_parameters)
register_async_exit_callback(self.cleanup)

def initialize(self, initialization_parameters: dict[str, Any]) -> None:
async def initialize(self, initialization_parameters: dict[str, Any]) -> None:
"""
Initialize the device. After calling this method, the device is ready to be used for tasks
and the status is IDLE.
"""
with self._lock:
async with self._lock:
if self._status != DeviceStatus.DISABLED:
raise EosDeviceInitializationError(f"Device {self._device_id} is already initialized.")

try:
self._initialize(initialization_parameters)
await self._initialize(initialization_parameters)
self._status = DeviceStatus.IDLE
self._initialization_parameters = initialization_parameters
except Exception as e:
self._status = DeviceStatus.ERROR
raise EosDeviceInitializationError(
f"Error initializing device {self._device_id}: {e!s}",
) from e

def cleanup(self) -> None:
async def cleanup(self) -> None:
"""
Clean up the device. After calling this method, the device can no longer be used for tasks and the status is
DISABLED.
"""
with self._lock:
async with self._lock:
if self._status == DeviceStatus.DISABLED:
return

Expand All @@ -108,67 +88,84 @@ def cleanup(self) -> None:
)

try:
self._cleanup()
await self._cleanup()
self._status = DeviceStatus.DISABLED
except Exception as e:
self._status = DeviceStatus.ERROR
raise EosDeviceCleanupError(f"Error cleaning up device {self._device_id}: {e!s}") from e

def enable(self) -> None:
async def report(self) -> dict[str, Any]:
"""
Return a dictionary with any member variables needed for logging purposes and progress tracking.
"""
return await self._report()

async def enable(self) -> None:
"""
Enable the device. The status should be IDLE after calling this method.
"""
if self._status == DeviceStatus.DISABLED:
self.initialize(self._initialization_parameters)
await self.initialize(self._initialization_parameters)

def disable(self) -> None:
async def disable(self) -> None:
"""
Disable the device. The status should be DISABLED after calling this method.
"""
if self._status != DeviceStatus.DISABLED:
self.cleanup()

def report(self) -> dict[str, Any]:
"""
Return a dictionary with any member variables needed for logging purposes and progress tracking.
"""
return self._report()
await self.cleanup()

def report_status(self) -> dict[str, Any]:
"""
Return a dictionary with the id and status of the task handler.
"""
def get_status(self) -> dict[str, Any]:
return {
"id": self._device_id,
"status": self._status,
}

def get_id(self) -> str:
return self._device_id

def get_lab_id(self) -> str:
return self._lab_id

def get_device_type(self) -> str:
return self._device_type

def get_initialization_parameters(self) -> dict[str, Any]:
return self._initialization_parameters

@property
def id(self) -> str:
return self._device_id

@property
def type(self) -> str:
def lab_id(self) -> str:
return self._lab_id

@property
def device_type(self) -> str:
return self._device_type

@property
def status(self) -> DeviceStatus:
return self._status

@property
def initialization_parameters(self) -> dict[str, Any]:
return self._initialization_parameters

@abstractmethod
def _initialize(self, initialization_parameters: dict[str, Any]) -> None:
async def _initialize(self, initialization_parameters: dict[str, Any]) -> None:
"""
Implementation for the initialization of the device.
"""

@abstractmethod
def _cleanup(self) -> None:
async def _cleanup(self) -> None:
"""
Implementation for the cleanup of the device.
"""

@abstractmethod
def _report(self) -> dict[str, Any]:
async def _report(self) -> dict[str, Any]:
"""
Implementation for the report method.
"""
9 changes: 5 additions & 4 deletions eos/devices/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async def _create_devices_for_lab(self, lab_id: str) -> None:
computer=device_config.computer,
)
devices_to_upsert.append(new_device)
self._create_device_actor(new_device)
await self._create_device_actor(new_device)

if devices_to_upsert:
await self._devices.bulk_upsert([device.model_dump() for device in devices_to_upsert])
Expand All @@ -160,7 +160,7 @@ def _restore_device_actor(self, device: Device) -> None:
)
log.debug(f"Restored device actor {device_actor_id}")

def _create_device_actor(self, device: Device) -> None:
async def _create_device_actor(self, device: Device) -> None:
lab_config = self._configuration_manager.labs[device.lab_id]
device_config = lab_config.devices[device.id]
computer_name = device_config.computer.lower()
Expand Down Expand Up @@ -194,10 +194,11 @@ def _create_device_actor(self, device: Device) -> None:
name=device_actor_id,
num_cpus=0,
resources=resources,
).remote(device.id, device.lab_id, device.type, initialization_parameters)
).remote(device.id, device.lab_id, device.type)
await self._device_actor_handles[device_actor_id].initialize.remote(initialization_parameters)

def _check_device_actors_healthy(self) -> None:
status_reports = [actor_handle.report_status.remote() for actor_handle in self._device_actor_handles.values()]
status_reports = [actor_handle.get_status.remote() for actor_handle in self._device_actor_handles.values()]
status_report_to_device_actor_id = {
status_report: device_actor_id
for device_actor_id, status_report in zip(self._device_actor_handles.keys(), status_reports, strict=False)
Expand Down
6 changes: 3 additions & 3 deletions eos/tasks/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ def __init__(self, experiment_id: str, task_id: str) -> None:
self._experiment_id = experiment_id
self._task_id = task_id

def execute(
async def execute(
self, devices: DevicesType, parameters: ParametersType, containers: ContainersType
) -> OutputType | None:
"""Execute a task with the given input and return the output."""
try:
output = self._execute(devices, parameters, containers)
output = await self._execute(devices, parameters, containers)

output_parameters, output_containers, output_files = ({}, {}, {})

Expand All @@ -42,7 +42,7 @@ def execute(
raise EosTaskExecutionError(f"Error executing task {self._task_id}") from e

@abstractmethod
def _execute(
async def _execute(
self, devices: DevicesType, parameters: ParametersType, containers: ContainersType
) -> OutputType | None:
"""Implementation for the execution of a task."""
2 changes: 1 addition & 1 deletion eos/tasks/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def _ray_execute_task(
) -> tuple:
task = task_class_type(_experiment_id, _task_id)
devices = DeviceActorWrapperRegistry(_devices_actor_references)
return task.execute(devices, _parameters, _containers)
return asyncio.run(task.execute(devices, _parameters, _containers))

await self._task_manager.start_task(experiment_id, task_id)
log.info(f"EXP '{experiment_id}' - Started task '{task_id}'.")
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async def task_manager(setup_lab_experiment, configuration_manager, db_interface

@pytest.fixture(scope="session", autouse=True)
def ray_cluster():
ray.init(namespace="test-eos", ignore_reinit_error=True, resources={"eos-core": 1})
ray.init(namespace="test-eos", ignore_reinit_error=True, resources={"eos-core": 1000})
yield
ray.shutdown()

Expand Down
Loading

0 comments on commit 803b641

Please sign in to comment.