diff --git a/control/adafruithat/planktoscope/camera/hardware.py b/control/adafruithat/planktoscope/camera/hardware.py index 60e092a5..9f27bd13 100644 --- a/control/adafruithat/planktoscope/camera/hardware.py +++ b/control/adafruithat/planktoscope/camera/hardware.py @@ -141,7 +141,10 @@ def has_values(self) -> bool: return any(value is not None for value in self._asdict().values()) def overlay(self, updates: "SettingsValues") -> "SettingsValues": - """Create a new instance where provided non-`None` values overwrite existing values.""" + """Create a new instance where provided non-`None` values overwrite existing values. + + This is intended to make it easy to combine existing settings with new settings. + """ # pylint complains that this namedtuple has no `_asdict()` method even though mypy is fine; # this is a false positive: # pylint: disable-next=no-member @@ -219,11 +222,11 @@ def __init__( stream_config: StreamConfig = StreamConfig(preview_size=(640, 480), buffer_count=3), initial_settings: SettingsValues = SettingsValues(), ) -> None: - """Set up state needed to initialize the camera, but don't actually start the camera. + """Set up state needed to initialize the camera, but don't actually start the camera yet. Args: preview_output: an image stream which this `PiCamera` instance will write camera preview - images to. + images to once the camera is started. stream_config: configuration of camera output streams. initial_settings: any camera settings to initialize the camera with. """ @@ -293,7 +296,7 @@ def stream_config(self) -> StreamConfig: @property def settings(self) -> SettingsValues: - """An immutable copy of the camera settings values.""" + """Adjustable camera settings values.""" with self._settings_lock.gen_rlock(): return self._cached_settings @@ -301,7 +304,8 @@ def settings(self) -> SettingsValues: def settings(self, updates: SettingsValues) -> None: """Update adjustable camera settings from all provided non-`None` values. - Fields provided with `None` values are ignored. + Fields provided with `None` values are ignored. If any of the provided non-`None` values is + invalid (e.g. out-of-range), none of the settinsg will be changed. Raises: RuntimeError: the method was called before the camera was started, or after it was @@ -395,7 +399,9 @@ def capture_file(self, path: str) -> None: def close(self) -> None: """Stop and close the camera. - The camera can be restarted after being closed by `start()` method again. + No more frames will be written to the preview output stream. + + The camera can be restarted after being closed by calling the `start()` method again. """ if self._camera is None: return @@ -427,7 +433,7 @@ class PreviewStream(io.BufferedIOBase): the buffer once they have access to it. This stream can be used by anything which requires a [io.BufferedIOBase], assuming it never - splits buffers. + splits any buffer across multiple calls of the `write()` method. """ def __init__(self) -> None: diff --git a/control/adafruithat/planktoscope/camera/mqtt.py b/control/adafruithat/planktoscope/camera/mqtt.py index 64162e21..86ea07fb 100644 --- a/control/adafruithat/planktoscope/camera/mqtt.py +++ b/control/adafruithat/planktoscope/camera/mqtt.py @@ -1,4 +1,4 @@ -"""mqtt provides an MJPEG+MQTT API for camera interaction.""" +"""mqtt provides an MJPEG+MQTT API for camera supervision and interaction.""" import json import os @@ -15,27 +15,21 @@ class Worker(threading.Thread): - """Runs a camera with live MJPEG preview and an MQTT API for adjusting camera settings. - - Attribs: - camera: the underlying camera exposed by this MQTT API. Don't access it until the - camera_checked event has been set! - camera_checked: when this event is set, either the camera is connected or has been - determined to be missing. - """ + """Runs a camera with live MJPEG preview and an MQTT API for adjusting camera settings.""" def __init__(self, mjpeg_server_address: tuple[str, int] = ("", 8000)) -> None: """Initialize the backend. Args: - mqtt_client: an MQTT client. - exposure_time: the default value for initializing the camera's exposure time. + mjpeg_server_address: the host and port for the MJPEG camera preview server to listen + on. Raises: ValueError: one or more values in the hardware config file are of the wrong type. """ super().__init__(name="camera") + # Settings settings = hardware.SettingsValues( auto_exposure=False, exposure_time=125, # the default (minimum) exposure time in the PlanktoScope GUI @@ -49,9 +43,8 @@ def __init__(self, mjpeg_server_address: tuple[str, int] = ("", 8000)) -> None: blue=1.35, ), sharpness=0, # disable the default "normal" sharpening level - jpeg_quality=95, # trade off between image file size and quality + jpeg_quality=95, # maximize image quality ) - # Settings if os.path.exists("/home/pi/PlanktoScope/hardware.json"): # load hardware.json with open("/home/pi/PlanktoScope/hardware.json", "r", encoding="utf-8") as config_file: @@ -69,26 +62,26 @@ def __init__(self, mjpeg_server_address: tuple[str, int] = ("", 8000)) -> None: # I/O self._preview_stream: hardware.PreviewStream = hardware.PreviewStream() self._mjpeg_server_address = mjpeg_server_address - self.camera: typing.Optional[hardware.PiCamera] = hardware.PiCamera( + self._camera: typing.Optional[hardware.PiCamera] = hardware.PiCamera( self._preview_stream, initial_settings=settings ) - self.camera_checked = threading.Event() + self._camera_checked = threading.Event() self._stop_event_loop = threading.Event() @loguru.logger.catch def run(self) -> None: """Start the camera and run the main event loop.""" - assert self.camera is not None + assert self._camera is not None loguru.logger.info("Initializing the camera with default settings...") try: - self.camera.open() + self._camera.open() except RuntimeError: loguru.logger.exception("Couldn't open the camera - maybe it's disconnected?") - self.camera = None - self.camera_checked.set() + self._camera = None + self._camera_checked.set() return - self.camera_checked.set() + self._camera_checked.set() loguru.logger.info("Starting the MJPEG streaming server...") streaming_server = mjpeg.StreamingServer(self._preview_stream, self._mjpeg_server_address) @@ -103,7 +96,7 @@ def run(self) -> None: # TODO(ethanjli): allow an MQTT client to trigger this broadcast with an MQTT command. This # requires modifying the MQTT API (by adding a new route), and we'll want to make the # Node-RED dashboard query that route at startup, so we'll do this later. - mqtt.client.publish("status/imager", json.dumps({"camera_name": self.camera.camera_name})) + mqtt.client.publish("status/imager", json.dumps({"camera_name": self._camera.camera_name})) try: while not self._stop_event_loop.is_set(): @@ -126,8 +119,8 @@ def run(self) -> None: streaming_thread.join() loguru.logger.info("Stopping the camera...") - self.camera.close() - self.camera = None + self._camera.close() + self._camera = None loguru.logger.success("Done shutting down!") @@ -137,7 +130,7 @@ def _receive_message(self, message: dict[str, typing.Any]) -> typing.Optional[st Returns a status update to broadcast. """ - assert self.camera is not None + assert self._camera is not None if message["topic"] != "imager/image" or message["payload"].get("action", "") != "settings": return None @@ -149,7 +142,7 @@ def _receive_message(self, message: dict[str, typing.Any]) -> typing.Optional[st settings = message["payload"]["settings"] try: converted_settings = _convert_settings( - settings, self.camera.settings.white_balance_gains + settings, self._camera.settings.white_balance_gains ) _validate_settings(converted_settings) except (TypeError, ValueError) as e: @@ -158,10 +151,24 @@ def _receive_message(self, message: dict[str, typing.Any]) -> typing.Optional[st ) return json.dumps({"status": f"Error: {str(e)}"}) - self.camera.settings = converted_settings + self._camera.settings = converted_settings loguru.logger.success("Updated camera settings!") return '{"status":"Camera settings updated"}' + @property + def camera(self) -> typing.Optional[hardware.PiCamera]: + """Return the camera wrapper managed by this worker. + + Blocks until this worker has attempted to start the camera (so this property will wait until + this worker has been started as a thread). + + Returns: + The camera wrapper if it started successfully, or None if the camera wrapper could not + be started (e.g. because the camera does not exists). + """ + self._camera_checked.wait() + return self._camera + def shutdown(self): """Stop processing new MQTT messages and gracefully stop working.""" self._stop_event_loop.set() @@ -175,8 +182,11 @@ def _convert_settings( Args: command_settings: the settings to convert. - default_white_balance_gains: white-balance gains to substitute for missing values if exactly - one gain is provided. + default_white_balance_gains: white-balance gains to substitute for missing values, if + exactly one gain was provided in `command_settings`. + + Returns: + All settings extracted from the MQTT command. Raises: ValueError: at least one of the MQTT command settings is invalid. @@ -212,6 +222,9 @@ def _convert_image_gain_settings( Args: command_settings: the settings to convert. + Returns: + Any image gain-related settings extracted from the MQTT command, but no other settings. + Raises: ValueError: at least one of the MQTT command settings is invalid. """ @@ -254,6 +267,10 @@ def _convert_white_balance_gain_settings( default_white_balance_gains: white-balance gains to substitute for missing values if exactly one gain is provided. + Returns: + Any white balance gain-related settings extracted from the MQTT command, but no other + settings. + Raises: ValueError: at least one of the MQTT command settings is invalid. """ diff --git a/control/adafruithat/planktoscope/imagernew/mqtt.py b/control/adafruithat/planktoscope/imagernew/mqtt.py index 9c52c39e..cf4ff9f6 100644 --- a/control/adafruithat/planktoscope/imagernew/mqtt.py +++ b/control/adafruithat/planktoscope/imagernew/mqtt.py @@ -19,18 +19,21 @@ # TODO(ethanjli): convert this from a process into a thread class Worker(multiprocessing.Process): - """An MQTT+MJPEG API for the PlanktoScope's camera and image acquisition modules.""" + """An MQTT+MJPEG API for the PlanktoScope's camera and image acquisition modules. + + This launches the camera with an MQTT API for settings adjustments and an MJPEG server with a + live camera preview stream, and this also launches stop-flow acquisition routines in response to + commands received over the MQTT API. + """ # TODO(ethanjli): instead of passing in a stop_event, just expose a `close()` method! This # way, we don't give any process the ability to stop all other processes watching the same # stop_event! - def __init__(self, stop_event): - """Initialize the Imager class + def __init__(self, stop_event: threading.Event) -> None: + """Initialize the worker's internal state, but don't start anything yet. Args: - stop_event (multiprocessing.Event): shutdown event - iso (int, optional): ISO sensitivity. Defaults to 100. - exposure_time (int, optional): Shutter speed of the camera, default to 10000. + stop_event: shutdown signal """ super().__init__(name="imager") @@ -53,7 +56,12 @@ def __init__(self, stop_event): @loguru.logger.catch def run(self) -> None: - """Run the main event loop.""" + """Run the main event loop. + + It will quit when the `stop_event` (passed into the constructor) event is set. If a camera + couldn't be started (e.g. because the camera is missing), it will clean up and then wait + until the `stop_event` event is set before quitting. + """ loguru.logger.info(f"The imager control thread has been started in process {os.getpid()}") self._mqtt = mqtt.MQTT_Client(topic="imager/#", name="imager_client") self._mqtt.client.publish("status/imager", '{"status":"Starting up"}') @@ -66,18 +74,17 @@ def run(self) -> None: loguru.logger.info("Starting the camera...") self._camera = camera.Worker() self._camera.start() - self._camera.camera_checked.wait() if self._camera.camera is None: loguru.logger.error("Missing camera - maybe it's disconnected or it never started?") # TODO(ethanjli): officially add this error status to the MQTT API! self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}') loguru.logger.success("Preemptively preparing to shut down since there's no camera...") self._cleanup() - # Note(ethanjli): we just wait and do nothing until we receive the shutdown signal, - # because if we return early then the hardware controller will either shut down + # TODO(ethanjli): currently we just wait and do nothing until we receive the shutdown + # signal, because if we return early then the hardware controller will either shut down # everything (current behavior) or try to restart the imager (planned behavior - # according to a TODO left by @gromain). If there's a third option to quit without - # being restarted or causing everything else to quit, then we could just clean up + # according to a TODO left by @gromain). Once we make it possible to quit without + # being restarted or causing everything else to quit, then we should just clean up # and return early here. loguru.logger.success("Waiting for a shutdown signal...") self._stop_event_loop.wait() @@ -366,7 +373,9 @@ def stop(self) -> None: # TODO(ethanjli): rearchitect the hardware controller so that the imager can directly call pump # methods (by running all modules in the same process), so that we can just delete this entire class -# and simplify function calls between the imager and the pump! +# and simplify function calls between the imager and the pump! This will require launching the +# pump and the imager as threads in the same process, rather than launching them as separate +# processes. class _PumpClient: """Thread-safe RPC stub for remotely controlling the pump over MQTT.""" diff --git a/control/adafruithat/planktoscope/imagernew/stopflow.py b/control/adafruithat/planktoscope/imagernew/stopflow.py index 80923834..a7a9e97d 100644 --- a/control/adafruithat/planktoscope/imagernew/stopflow.py +++ b/control/adafruithat/planktoscope/imagernew/stopflow.py @@ -1,7 +1,9 @@ -"""strategies provides ways of coordinating the hardware for image acquisition. +"""stopflow provides the domain logic for stop-flow imaging. -This basically contains the domain logic of the imager module. No actual I/O (hardware control, -filesystem interaction, or MQTT messaging) should be added to this module. +No actual I/O (hardware control, filesystem interaction, or MQTT messaging) should be added to this +module; instead, I/O drivers should be defined elsewhere and passed into functions/methods in this +module. This will allow us to write automated tests for the domain logic for stop-flow imaging +which we can run without a PlanktoScope. """ import datetime as dt @@ -132,7 +134,8 @@ def run_step(self) -> typing.Optional[tuple[int, str]]: ) self._camera.capture_file(capture_path) os.sync() - # TODO: update the integrity file + # Note(ethanjli): updating the integrity file is the responsibility of the code which + # calls this `run_step()` method. acquired_index = self._progress self._progress += 1 @@ -150,5 +153,5 @@ def stop(self) -> None: @property def interrupted(self) -> bool: - """Return whether the routine was manually interrupted.""" + """Check whether the routine was manually interrupted.""" return self._interrupted.is_set() diff --git a/control/planktoscopehat/planktoscope/camera/hardware.py b/control/planktoscopehat/planktoscope/camera/hardware.py index 60e092a5..9f27bd13 100644 --- a/control/planktoscopehat/planktoscope/camera/hardware.py +++ b/control/planktoscopehat/planktoscope/camera/hardware.py @@ -141,7 +141,10 @@ def has_values(self) -> bool: return any(value is not None for value in self._asdict().values()) def overlay(self, updates: "SettingsValues") -> "SettingsValues": - """Create a new instance where provided non-`None` values overwrite existing values.""" + """Create a new instance where provided non-`None` values overwrite existing values. + + This is intended to make it easy to combine existing settings with new settings. + """ # pylint complains that this namedtuple has no `_asdict()` method even though mypy is fine; # this is a false positive: # pylint: disable-next=no-member @@ -219,11 +222,11 @@ def __init__( stream_config: StreamConfig = StreamConfig(preview_size=(640, 480), buffer_count=3), initial_settings: SettingsValues = SettingsValues(), ) -> None: - """Set up state needed to initialize the camera, but don't actually start the camera. + """Set up state needed to initialize the camera, but don't actually start the camera yet. Args: preview_output: an image stream which this `PiCamera` instance will write camera preview - images to. + images to once the camera is started. stream_config: configuration of camera output streams. initial_settings: any camera settings to initialize the camera with. """ @@ -293,7 +296,7 @@ def stream_config(self) -> StreamConfig: @property def settings(self) -> SettingsValues: - """An immutable copy of the camera settings values.""" + """Adjustable camera settings values.""" with self._settings_lock.gen_rlock(): return self._cached_settings @@ -301,7 +304,8 @@ def settings(self) -> SettingsValues: def settings(self, updates: SettingsValues) -> None: """Update adjustable camera settings from all provided non-`None` values. - Fields provided with `None` values are ignored. + Fields provided with `None` values are ignored. If any of the provided non-`None` values is + invalid (e.g. out-of-range), none of the settinsg will be changed. Raises: RuntimeError: the method was called before the camera was started, or after it was @@ -395,7 +399,9 @@ def capture_file(self, path: str) -> None: def close(self) -> None: """Stop and close the camera. - The camera can be restarted after being closed by `start()` method again. + No more frames will be written to the preview output stream. + + The camera can be restarted after being closed by calling the `start()` method again. """ if self._camera is None: return @@ -427,7 +433,7 @@ class PreviewStream(io.BufferedIOBase): the buffer once they have access to it. This stream can be used by anything which requires a [io.BufferedIOBase], assuming it never - splits buffers. + splits any buffer across multiple calls of the `write()` method. """ def __init__(self) -> None: diff --git a/control/planktoscopehat/planktoscope/camera/mqtt.py b/control/planktoscopehat/planktoscope/camera/mqtt.py index 64162e21..86ea07fb 100644 --- a/control/planktoscopehat/planktoscope/camera/mqtt.py +++ b/control/planktoscopehat/planktoscope/camera/mqtt.py @@ -1,4 +1,4 @@ -"""mqtt provides an MJPEG+MQTT API for camera interaction.""" +"""mqtt provides an MJPEG+MQTT API for camera supervision and interaction.""" import json import os @@ -15,27 +15,21 @@ class Worker(threading.Thread): - """Runs a camera with live MJPEG preview and an MQTT API for adjusting camera settings. - - Attribs: - camera: the underlying camera exposed by this MQTT API. Don't access it until the - camera_checked event has been set! - camera_checked: when this event is set, either the camera is connected or has been - determined to be missing. - """ + """Runs a camera with live MJPEG preview and an MQTT API for adjusting camera settings.""" def __init__(self, mjpeg_server_address: tuple[str, int] = ("", 8000)) -> None: """Initialize the backend. Args: - mqtt_client: an MQTT client. - exposure_time: the default value for initializing the camera's exposure time. + mjpeg_server_address: the host and port for the MJPEG camera preview server to listen + on. Raises: ValueError: one or more values in the hardware config file are of the wrong type. """ super().__init__(name="camera") + # Settings settings = hardware.SettingsValues( auto_exposure=False, exposure_time=125, # the default (minimum) exposure time in the PlanktoScope GUI @@ -49,9 +43,8 @@ def __init__(self, mjpeg_server_address: tuple[str, int] = ("", 8000)) -> None: blue=1.35, ), sharpness=0, # disable the default "normal" sharpening level - jpeg_quality=95, # trade off between image file size and quality + jpeg_quality=95, # maximize image quality ) - # Settings if os.path.exists("/home/pi/PlanktoScope/hardware.json"): # load hardware.json with open("/home/pi/PlanktoScope/hardware.json", "r", encoding="utf-8") as config_file: @@ -69,26 +62,26 @@ def __init__(self, mjpeg_server_address: tuple[str, int] = ("", 8000)) -> None: # I/O self._preview_stream: hardware.PreviewStream = hardware.PreviewStream() self._mjpeg_server_address = mjpeg_server_address - self.camera: typing.Optional[hardware.PiCamera] = hardware.PiCamera( + self._camera: typing.Optional[hardware.PiCamera] = hardware.PiCamera( self._preview_stream, initial_settings=settings ) - self.camera_checked = threading.Event() + self._camera_checked = threading.Event() self._stop_event_loop = threading.Event() @loguru.logger.catch def run(self) -> None: """Start the camera and run the main event loop.""" - assert self.camera is not None + assert self._camera is not None loguru.logger.info("Initializing the camera with default settings...") try: - self.camera.open() + self._camera.open() except RuntimeError: loguru.logger.exception("Couldn't open the camera - maybe it's disconnected?") - self.camera = None - self.camera_checked.set() + self._camera = None + self._camera_checked.set() return - self.camera_checked.set() + self._camera_checked.set() loguru.logger.info("Starting the MJPEG streaming server...") streaming_server = mjpeg.StreamingServer(self._preview_stream, self._mjpeg_server_address) @@ -103,7 +96,7 @@ def run(self) -> None: # TODO(ethanjli): allow an MQTT client to trigger this broadcast with an MQTT command. This # requires modifying the MQTT API (by adding a new route), and we'll want to make the # Node-RED dashboard query that route at startup, so we'll do this later. - mqtt.client.publish("status/imager", json.dumps({"camera_name": self.camera.camera_name})) + mqtt.client.publish("status/imager", json.dumps({"camera_name": self._camera.camera_name})) try: while not self._stop_event_loop.is_set(): @@ -126,8 +119,8 @@ def run(self) -> None: streaming_thread.join() loguru.logger.info("Stopping the camera...") - self.camera.close() - self.camera = None + self._camera.close() + self._camera = None loguru.logger.success("Done shutting down!") @@ -137,7 +130,7 @@ def _receive_message(self, message: dict[str, typing.Any]) -> typing.Optional[st Returns a status update to broadcast. """ - assert self.camera is not None + assert self._camera is not None if message["topic"] != "imager/image" or message["payload"].get("action", "") != "settings": return None @@ -149,7 +142,7 @@ def _receive_message(self, message: dict[str, typing.Any]) -> typing.Optional[st settings = message["payload"]["settings"] try: converted_settings = _convert_settings( - settings, self.camera.settings.white_balance_gains + settings, self._camera.settings.white_balance_gains ) _validate_settings(converted_settings) except (TypeError, ValueError) as e: @@ -158,10 +151,24 @@ def _receive_message(self, message: dict[str, typing.Any]) -> typing.Optional[st ) return json.dumps({"status": f"Error: {str(e)}"}) - self.camera.settings = converted_settings + self._camera.settings = converted_settings loguru.logger.success("Updated camera settings!") return '{"status":"Camera settings updated"}' + @property + def camera(self) -> typing.Optional[hardware.PiCamera]: + """Return the camera wrapper managed by this worker. + + Blocks until this worker has attempted to start the camera (so this property will wait until + this worker has been started as a thread). + + Returns: + The camera wrapper if it started successfully, or None if the camera wrapper could not + be started (e.g. because the camera does not exists). + """ + self._camera_checked.wait() + return self._camera + def shutdown(self): """Stop processing new MQTT messages and gracefully stop working.""" self._stop_event_loop.set() @@ -175,8 +182,11 @@ def _convert_settings( Args: command_settings: the settings to convert. - default_white_balance_gains: white-balance gains to substitute for missing values if exactly - one gain is provided. + default_white_balance_gains: white-balance gains to substitute for missing values, if + exactly one gain was provided in `command_settings`. + + Returns: + All settings extracted from the MQTT command. Raises: ValueError: at least one of the MQTT command settings is invalid. @@ -212,6 +222,9 @@ def _convert_image_gain_settings( Args: command_settings: the settings to convert. + Returns: + Any image gain-related settings extracted from the MQTT command, but no other settings. + Raises: ValueError: at least one of the MQTT command settings is invalid. """ @@ -254,6 +267,10 @@ def _convert_white_balance_gain_settings( default_white_balance_gains: white-balance gains to substitute for missing values if exactly one gain is provided. + Returns: + Any white balance gain-related settings extracted from the MQTT command, but no other + settings. + Raises: ValueError: at least one of the MQTT command settings is invalid. """ diff --git a/control/planktoscopehat/planktoscope/imagernew/mqtt.py b/control/planktoscopehat/planktoscope/imagernew/mqtt.py index 9c52c39e..cf4ff9f6 100644 --- a/control/planktoscopehat/planktoscope/imagernew/mqtt.py +++ b/control/planktoscopehat/planktoscope/imagernew/mqtt.py @@ -19,18 +19,21 @@ # TODO(ethanjli): convert this from a process into a thread class Worker(multiprocessing.Process): - """An MQTT+MJPEG API for the PlanktoScope's camera and image acquisition modules.""" + """An MQTT+MJPEG API for the PlanktoScope's camera and image acquisition modules. + + This launches the camera with an MQTT API for settings adjustments and an MJPEG server with a + live camera preview stream, and this also launches stop-flow acquisition routines in response to + commands received over the MQTT API. + """ # TODO(ethanjli): instead of passing in a stop_event, just expose a `close()` method! This # way, we don't give any process the ability to stop all other processes watching the same # stop_event! - def __init__(self, stop_event): - """Initialize the Imager class + def __init__(self, stop_event: threading.Event) -> None: + """Initialize the worker's internal state, but don't start anything yet. Args: - stop_event (multiprocessing.Event): shutdown event - iso (int, optional): ISO sensitivity. Defaults to 100. - exposure_time (int, optional): Shutter speed of the camera, default to 10000. + stop_event: shutdown signal """ super().__init__(name="imager") @@ -53,7 +56,12 @@ def __init__(self, stop_event): @loguru.logger.catch def run(self) -> None: - """Run the main event loop.""" + """Run the main event loop. + + It will quit when the `stop_event` (passed into the constructor) event is set. If a camera + couldn't be started (e.g. because the camera is missing), it will clean up and then wait + until the `stop_event` event is set before quitting. + """ loguru.logger.info(f"The imager control thread has been started in process {os.getpid()}") self._mqtt = mqtt.MQTT_Client(topic="imager/#", name="imager_client") self._mqtt.client.publish("status/imager", '{"status":"Starting up"}') @@ -66,18 +74,17 @@ def run(self) -> None: loguru.logger.info("Starting the camera...") self._camera = camera.Worker() self._camera.start() - self._camera.camera_checked.wait() if self._camera.camera is None: loguru.logger.error("Missing camera - maybe it's disconnected or it never started?") # TODO(ethanjli): officially add this error status to the MQTT API! self._mqtt.client.publish("status/imager", '{"status": "Error: missing camera"}') loguru.logger.success("Preemptively preparing to shut down since there's no camera...") self._cleanup() - # Note(ethanjli): we just wait and do nothing until we receive the shutdown signal, - # because if we return early then the hardware controller will either shut down + # TODO(ethanjli): currently we just wait and do nothing until we receive the shutdown + # signal, because if we return early then the hardware controller will either shut down # everything (current behavior) or try to restart the imager (planned behavior - # according to a TODO left by @gromain). If there's a third option to quit without - # being restarted or causing everything else to quit, then we could just clean up + # according to a TODO left by @gromain). Once we make it possible to quit without + # being restarted or causing everything else to quit, then we should just clean up # and return early here. loguru.logger.success("Waiting for a shutdown signal...") self._stop_event_loop.wait() @@ -366,7 +373,9 @@ def stop(self) -> None: # TODO(ethanjli): rearchitect the hardware controller so that the imager can directly call pump # methods (by running all modules in the same process), so that we can just delete this entire class -# and simplify function calls between the imager and the pump! +# and simplify function calls between the imager and the pump! This will require launching the +# pump and the imager as threads in the same process, rather than launching them as separate +# processes. class _PumpClient: """Thread-safe RPC stub for remotely controlling the pump over MQTT.""" diff --git a/control/planktoscopehat/planktoscope/imagernew/stopflow.py b/control/planktoscopehat/planktoscope/imagernew/stopflow.py index 80923834..a7a9e97d 100644 --- a/control/planktoscopehat/planktoscope/imagernew/stopflow.py +++ b/control/planktoscopehat/planktoscope/imagernew/stopflow.py @@ -1,7 +1,9 @@ -"""strategies provides ways of coordinating the hardware for image acquisition. +"""stopflow provides the domain logic for stop-flow imaging. -This basically contains the domain logic of the imager module. No actual I/O (hardware control, -filesystem interaction, or MQTT messaging) should be added to this module. +No actual I/O (hardware control, filesystem interaction, or MQTT messaging) should be added to this +module; instead, I/O drivers should be defined elsewhere and passed into functions/methods in this +module. This will allow us to write automated tests for the domain logic for stop-flow imaging +which we can run without a PlanktoScope. """ import datetime as dt @@ -132,7 +134,8 @@ def run_step(self) -> typing.Optional[tuple[int, str]]: ) self._camera.capture_file(capture_path) os.sync() - # TODO: update the integrity file + # Note(ethanjli): updating the integrity file is the responsibility of the code which + # calls this `run_step()` method. acquired_index = self._progress self._progress += 1 @@ -150,5 +153,5 @@ def stop(self) -> None: @property def interrupted(self) -> bool: - """Return whether the routine was manually interrupted.""" + """Check whether the routine was manually interrupted.""" return self._interrupted.is_set()