Skip to content

Commit

Permalink
Iterate on code comments and internal APIs for camera and `imagerne…
Browse files Browse the repository at this point in the history
…w` (#27)

* Fix errors in docstrings, improve code comments

* Simplify the public API of `camera.mqtt.Worker` for accessing the camera wrapper
  • Loading branch information
ethanjli authored Apr 22, 2024
1 parent d822161 commit 2448890
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 106 deletions.
20 changes: 13 additions & 7 deletions control/adafruithat/planktoscope/camera/hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ def has_values(self) -> bool:
return any(value is not None for value in self._asdict().values())

def overlay(self, updates: "SettingsValues") -> "SettingsValues":
"""Create a new instance where provided non-`None` values overwrite existing values."""
"""Create a new instance where provided non-`None` values overwrite existing values.
This is intended to make it easy to combine existing settings with new settings.
"""
# pylint complains that this namedtuple has no `_asdict()` method even though mypy is fine;
# this is a false positive:
# pylint: disable-next=no-member
Expand Down Expand Up @@ -219,11 +222,11 @@ def __init__(
stream_config: StreamConfig = StreamConfig(preview_size=(640, 480), buffer_count=3),
initial_settings: SettingsValues = SettingsValues(),
) -> None:
"""Set up state needed to initialize the camera, but don't actually start the camera.
"""Set up state needed to initialize the camera, but don't actually start the camera yet.
Args:
preview_output: an image stream which this `PiCamera` instance will write camera preview
images to.
images to once the camera is started.
stream_config: configuration of camera output streams.
initial_settings: any camera settings to initialize the camera with.
"""
Expand Down Expand Up @@ -293,15 +296,16 @@ def stream_config(self) -> StreamConfig:

@property
def settings(self) -> SettingsValues:
"""An immutable copy of the camera settings values."""
"""Adjustable camera settings values."""
with self._settings_lock.gen_rlock():
return self._cached_settings

@settings.setter
def settings(self, updates: SettingsValues) -> None:
"""Update adjustable camera settings from all provided non-`None` values.
Fields provided with `None` values are ignored.
Fields provided with `None` values are ignored. If any of the provided non-`None` values is
invalid (e.g. out-of-range), none of the settinsg will be changed.
Raises:
RuntimeError: the method was called before the camera was started, or after it was
Expand Down Expand Up @@ -395,7 +399,9 @@ def capture_file(self, path: str) -> None:
def close(self) -> None:
"""Stop and close the camera.
The camera can be restarted after being closed by `start()` method again.
No more frames will be written to the preview output stream.
The camera can be restarted after being closed by calling the `start()` method again.
"""
if self._camera is None:
return
Expand Down Expand Up @@ -427,7 +433,7 @@ class PreviewStream(io.BufferedIOBase):
the buffer once they have access to it.
This stream can be used by anything which requires a [io.BufferedIOBase], assuming it never
splits buffers.
splits any buffer across multiple calls of the `write()` method.
"""

def __init__(self) -> None:
Expand Down
73 changes: 45 additions & 28 deletions control/adafruithat/planktoscope/camera/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""mqtt provides an MJPEG+MQTT API for camera interaction."""
"""mqtt provides an MJPEG+MQTT API for camera supervision and interaction."""

import json
import os
Expand All @@ -15,27 +15,21 @@


class Worker(threading.Thread):
"""Runs a camera with live MJPEG preview and an MQTT API for adjusting camera settings.
Attribs:
camera: the underlying camera exposed by this MQTT API. Don't access it until the
camera_checked event has been set!
camera_checked: when this event is set, either the camera is connected or has been
determined to be missing.
"""
"""Runs a camera with live MJPEG preview and an MQTT API for adjusting camera settings."""

def __init__(self, mjpeg_server_address: tuple[str, int] = ("", 8000)) -> None:
"""Initialize the backend.
Args:
mqtt_client: an MQTT client.
exposure_time: the default value for initializing the camera's exposure time.
mjpeg_server_address: the host and port for the MJPEG camera preview server to listen
on.
Raises:
ValueError: one or more values in the hardware config file are of the wrong type.
"""
super().__init__(name="camera")

# Settings
settings = hardware.SettingsValues(
auto_exposure=False,
exposure_time=125, # the default (minimum) exposure time in the PlanktoScope GUI
Expand All @@ -49,9 +43,8 @@ def __init__(self, mjpeg_server_address: tuple[str, int] = ("", 8000)) -> None:
blue=1.35,
),
sharpness=0, # disable the default "normal" sharpening level
jpeg_quality=95, # trade off between image file size and quality
jpeg_quality=95, # maximize image quality
)
# Settings
if os.path.exists("/home/pi/PlanktoScope/hardware.json"):
# load hardware.json
with open("/home/pi/PlanktoScope/hardware.json", "r", encoding="utf-8") as config_file:
Expand All @@ -69,26 +62,26 @@ def __init__(self, mjpeg_server_address: tuple[str, int] = ("", 8000)) -> None:
# I/O
self._preview_stream: hardware.PreviewStream = hardware.PreviewStream()
self._mjpeg_server_address = mjpeg_server_address
self.camera: typing.Optional[hardware.PiCamera] = hardware.PiCamera(
self._camera: typing.Optional[hardware.PiCamera] = hardware.PiCamera(
self._preview_stream, initial_settings=settings
)
self.camera_checked = threading.Event()
self._camera_checked = threading.Event()
self._stop_event_loop = threading.Event()

@loguru.logger.catch
def run(self) -> None:
"""Start the camera and run the main event loop."""
assert self.camera is not None
assert self._camera is not None

loguru.logger.info("Initializing the camera with default settings...")
try:
self.camera.open()
self._camera.open()
except RuntimeError:
loguru.logger.exception("Couldn't open the camera - maybe it's disconnected?")
self.camera = None
self.camera_checked.set()
self._camera = None
self._camera_checked.set()
return
self.camera_checked.set()
self._camera_checked.set()

loguru.logger.info("Starting the MJPEG streaming server...")
streaming_server = mjpeg.StreamingServer(self._preview_stream, self._mjpeg_server_address)
Expand All @@ -103,7 +96,7 @@ def run(self) -> None:
# TODO(ethanjli): allow an MQTT client to trigger this broadcast with an MQTT command. This
# requires modifying the MQTT API (by adding a new route), and we'll want to make the
# Node-RED dashboard query that route at startup, so we'll do this later.
mqtt.client.publish("status/imager", json.dumps({"camera_name": self.camera.camera_name}))
mqtt.client.publish("status/imager", json.dumps({"camera_name": self._camera.camera_name}))

try:
while not self._stop_event_loop.is_set():
Expand All @@ -126,8 +119,8 @@ def run(self) -> None:
streaming_thread.join()

loguru.logger.info("Stopping the camera...")
self.camera.close()
self.camera = None
self._camera.close()
self._camera = None

loguru.logger.success("Done shutting down!")

Expand All @@ -137,7 +130,7 @@ def _receive_message(self, message: dict[str, typing.Any]) -> typing.Optional[st
Returns a status update to broadcast.
"""
assert self.camera is not None
assert self._camera is not None

if message["topic"] != "imager/image" or message["payload"].get("action", "") != "settings":
return None
Expand All @@ -149,7 +142,7 @@ def _receive_message(self, message: dict[str, typing.Any]) -> typing.Optional[st
settings = message["payload"]["settings"]
try:
converted_settings = _convert_settings(
settings, self.camera.settings.white_balance_gains
settings, self._camera.settings.white_balance_gains
)
_validate_settings(converted_settings)
except (TypeError, ValueError) as e:
Expand All @@ -158,10 +151,24 @@ def _receive_message(self, message: dict[str, typing.Any]) -> typing.Optional[st
)
return json.dumps({"status": f"Error: {str(e)}"})

self.camera.settings = converted_settings
self._camera.settings = converted_settings
loguru.logger.success("Updated camera settings!")
return '{"status":"Camera settings updated"}'

@property
def camera(self) -> typing.Optional[hardware.PiCamera]:
"""Return the camera wrapper managed by this worker.
Blocks until this worker has attempted to start the camera (so this property will wait until
this worker has been started as a thread).
Returns:
The camera wrapper if it started successfully, or None if the camera wrapper could not
be started (e.g. because the camera does not exists).
"""
self._camera_checked.wait()
return self._camera

def shutdown(self):
"""Stop processing new MQTT messages and gracefully stop working."""
self._stop_event_loop.set()
Expand All @@ -175,8 +182,11 @@ def _convert_settings(
Args:
command_settings: the settings to convert.
default_white_balance_gains: white-balance gains to substitute for missing values if exactly
one gain is provided.
default_white_balance_gains: white-balance gains to substitute for missing values, if
exactly one gain was provided in `command_settings`.
Returns:
All settings extracted from the MQTT command.
Raises:
ValueError: at least one of the MQTT command settings is invalid.
Expand Down Expand Up @@ -212,6 +222,9 @@ def _convert_image_gain_settings(
Args:
command_settings: the settings to convert.
Returns:
Any image gain-related settings extracted from the MQTT command, but no other settings.
Raises:
ValueError: at least one of the MQTT command settings is invalid.
"""
Expand Down Expand Up @@ -254,6 +267,10 @@ def _convert_white_balance_gain_settings(
default_white_balance_gains: white-balance gains to substitute for missing values if exactly
one gain is provided.
Returns:
Any white balance gain-related settings extracted from the MQTT command, but no other
settings.
Raises:
ValueError: at least one of the MQTT command settings is invalid.
"""
Expand Down
35 changes: 22 additions & 13 deletions control/adafruithat/planktoscope/imagernew/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@

# TODO(ethanjli): convert this from a process into a thread
class Worker(multiprocessing.Process):
"""An MQTT+MJPEG API for the PlanktoScope's camera and image acquisition modules."""
"""An MQTT+MJPEG API for the PlanktoScope's camera and image acquisition modules.
This launches the camera with an MQTT API for settings adjustments and an MJPEG server with a
live camera preview stream, and this also launches stop-flow acquisition routines in response to
commands received over the MQTT API.
"""

# TODO(ethanjli): instead of passing in a stop_event, just expose a `close()` method! This
# way, we don't give any process the ability to stop all other processes watching the same
# stop_event!
def __init__(self, stop_event):
"""Initialize the Imager class
def __init__(self, stop_event: threading.Event) -> None:
"""Initialize the worker's internal state, but don't start anything yet.
Args:
stop_event (multiprocessing.Event): shutdown event
iso (int, optional): ISO sensitivity. Defaults to 100.
exposure_time (int, optional): Shutter speed of the camera, default to 10000.
stop_event: shutdown signal
"""
super().__init__(name="imager")

Expand All @@ -53,7 +56,12 @@ def __init__(self, stop_event):

@loguru.logger.catch
def run(self) -> None:
"""Run the main event loop."""
"""Run the main event loop.
It will quit when the `stop_event` (passed into the constructor) event is set. If a camera
couldn't be started (e.g. because the camera is missing), it will clean up and then wait
until the `stop_event` event is set before quitting.
"""
loguru.logger.info(f"The imager control thread has been started in process {os.getpid()}")
self._mqtt = mqtt.MQTT_Client(topic="imager/#", name="imager_client")
self._mqtt.client.publish("status/imager", '{"status":"Starting up"}')
Expand All @@ -66,18 +74,17 @@ def run(self) -> None:
loguru.logger.info("Starting the camera...")
self._camera = camera.Worker()
self._camera.start()
self._camera.camera_checked.wait()
if self._camera.camera is None:
loguru.logger.error("Missing camera - maybe it's disconnected or it never started?")
# TODO(ethanjli): officially add this error status to the MQTT API!
self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}')
loguru.logger.success("Preemptively preparing to shut down since there's no camera...")
self._cleanup()
# Note(ethanjli): we just wait and do nothing until we receive the shutdown signal,
# because if we return early then the hardware controller will either shut down
# TODO(ethanjli): currently we just wait and do nothing until we receive the shutdown
# signal, because if we return early then the hardware controller will either shut down
# everything (current behavior) or try to restart the imager (planned behavior
# according to a TODO left by @gromain). If there's a third option to quit without
# being restarted or causing everything else to quit, then we could just clean up
# according to a TODO left by @gromain). Once we make it possible to quit without
# being restarted or causing everything else to quit, then we should just clean up
# and return early here.
loguru.logger.success("Waiting for a shutdown signal...")
self._stop_event_loop.wait()
Expand Down Expand Up @@ -366,7 +373,9 @@ def stop(self) -> None:

# TODO(ethanjli): rearchitect the hardware controller so that the imager can directly call pump
# methods (by running all modules in the same process), so that we can just delete this entire class
# and simplify function calls between the imager and the pump!
# and simplify function calls between the imager and the pump! This will require launching the
# pump and the imager as threads in the same process, rather than launching them as separate
# processes.
class _PumpClient:
"""Thread-safe RPC stub for remotely controlling the pump over MQTT."""

Expand Down
13 changes: 8 additions & 5 deletions control/adafruithat/planktoscope/imagernew/stopflow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""strategies provides ways of coordinating the hardware for image acquisition.
"""stopflow provides the domain logic for stop-flow imaging.
This basically contains the domain logic of the imager module. No actual I/O (hardware control,
filesystem interaction, or MQTT messaging) should be added to this module.
No actual I/O (hardware control, filesystem interaction, or MQTT messaging) should be added to this
module; instead, I/O drivers should be defined elsewhere and passed into functions/methods in this
module. This will allow us to write automated tests for the domain logic for stop-flow imaging
which we can run without a PlanktoScope.
"""

import datetime as dt
Expand Down Expand Up @@ -132,7 +134,8 @@ def run_step(self) -> typing.Optional[tuple[int, str]]:
)
self._camera.capture_file(capture_path)
os.sync()
# TODO: update the integrity file
# Note(ethanjli): updating the integrity file is the responsibility of the code which
# calls this `run_step()` method.

acquired_index = self._progress
self._progress += 1
Expand All @@ -150,5 +153,5 @@ def stop(self) -> None:

@property
def interrupted(self) -> bool:
"""Return whether the routine was manually interrupted."""
"""Check whether the routine was manually interrupted."""
return self._interrupted.is_set()
Loading

0 comments on commit 2448890

Please sign in to comment.