diff --git a/doc/news/DM-45406.feature.rst b/doc/news/DM-45406.feature.rst new file mode 100644 index 000000000..2f5d9a480 --- /dev/null +++ b/doc/news/DM-45406.feature.rst @@ -0,0 +1 @@ +Add new TakePTCFlatsComcam script to take PTC flats with ComCam while scanning electrometer. \ No newline at end of file diff --git a/python/lsst/ts/externalscripts/base_take_ptc_flats.py b/python/lsst/ts/externalscripts/base_take_ptc_flats.py new file mode 100644 index 000000000..e230c2d41 --- /dev/null +++ b/python/lsst/ts/externalscripts/base_take_ptc_flats.py @@ -0,0 +1,443 @@ +# This file is part of ts_externalscripts +# +# Developed for the LSST Telescope and Site Systems. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License + +__all__ = ["BaseTakePTCFlats"] + +import abc +import asyncio +import types + +import yaml +from lsst.ts import salobj +from lsst.ts.standardscripts.base_block_script import BaseBlockScript +from lsst.ts.xml.enums.Electrometer import UnitToRead + + +class BaseTakePTCFlats(BaseBlockScript, metaclass=abc.ABCMeta): + """Base class for taking PTC flats interleaved with darks.""" + + def __init__( + self, index, descr="Base script for taking PTC flats and darks." + ) -> None: + super().__init__(index, descr) + + self.electrometer = None + self.config = None + + self.instrument_setup_time = 0.0 + self.extra_scan_time = 1.0 + + self.long_timeout = 30 + + @property + @abc.abstractmethod + def camera(self): + raise NotImplementedError() + + async def configure_camera(self): + """Abstract method to configure the camera, to be implemented + in subclasses. + """ + raise NotImplementedError() + + async def configure_electrometer(self, index): + """Configure the Electrometer remote object.""" + if self.electrometer is None: + self.log.debug(f"Configuring remote for Electrometer index: {index}") + self.electrometer = salobj.Remote( + self.domain, name="Electrometer", index=index + ) + await self.electrometer.start_task + else: + self.log.debug("Electrometer already configured. Ignoring.") + + @classmethod + def get_schema(cls): + schema_yaml = """ + $schema: http://json-schema.org/draft-07/schema# + $id: https://github.com/lsst-ts/ts_externalscripts/base_take_ptc_flats.yaml + title: BaseTakePTCFlats + description: Configuration schema for BaseTakePTCFlats. + type: object + properties: + flats_exp_times: + description: > + A list of exposure times for the flats (sec). Each provided + time will be used to take two exposures sequentially. + type: array + items: + type: number + exclusiveMinimum: 0 + minItems: 2 + interleave_darks: + description: Darks interleave settings. + type: object + properties: + dark_exp_times: + description: > + Exposure time for each dark image (sec) + or alist of exposure times. + anyOf: + - type: number + minimum: 0 + - type: array + items: + type: number + minimum: 0 + minItems: 1 + default: 30 + n_darks: + description: > + Number of dark images to interleave between flat pairs. + type: integer + minimum: 1 + default: 2 + electrometer_scan: + description: Electrometer scan settings. + type: object + properties: + index: + description: Electrometer index to configure. + type: integer + minimum: 1 + mode: + description: > + Electrometer measurement mode as a string. Valid options + are "CURRENT" and "CHARGE". + type: string + enum: ["CURRENT", "CHARGE"] + default: "CURRENT" + range: + description: > + Electrometer measurement range. -1 for autorange. + type: number + default: -1 + integration_time: + description: Electrometer integration time. + type: number + exclusiveMinimum: 0 + default: 0.1 + ignore: + description: >- + CSCs from the camera group to ignore in status check. + Name must match those in self.group.components. + type: array + items: + type: string + + additionalProperties: false + """ + schema_dict = yaml.safe_load(schema_yaml) + + base_schema_dict = super().get_schema() + + for properties in base_schema_dict["properties"]: + schema_dict["properties"][properties] = base_schema_dict["properties"][ + properties + ] + + return schema_dict + + async def configure(self, config: types.SimpleNamespace): + """Configure script components including camera and electrometer. + + Parameters + ---------- + config : `types.SimpleNamespace` + Script configuration, as defined by `schema`. + """ + + await self.configure_camera() + + if hasattr(config, "electrometer_scan"): + await self.configure_electrometer(config.electrometer_scan["index"]) + + if hasattr(config, "ignore"): + for comp in config.ignore: + if comp in self.camera.components_attr: + self.log.debug(f"Ignoring Camera component {comp}.") + setattr(self.camera.check, comp, False) + else: + self.log.warning( + f"Component {comp} not in CSC Group. " + f"Must be one of {self.camera.components_attr}. " + f"Ignoring." + ) + + # Handle interleave darks settings + if hasattr(config, "interleave_darks"): + + if isinstance(config.interleave_darks["dark_exp_times"], list): + self.log.warning( + "'n_darks' is ignored because 'dark_exp_times' is an array." + ) + config.interleave_darks["n_darks"] = len( + config.interleave_darks["dark_exp_times"] + ) + + if not isinstance(config.interleave_darks["dark_exp_times"], list): + config.interleave_darks["dark_exp_times"] = [ + config.interleave_darks["dark_exp_times"] + ] * (config.interleave_darks["n_darks"]) + else: + config.interleave_darks["n_darks"] = len( + config.interleave_darks["dark_exp_times"] + ) + + self.config = config + + await super().configure(config) + + @abc.abstractmethod + def get_instrument_name(self): + """Abstract method to be defined in subclasses to provide the + instrument name. + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_instrument_configuration(self) -> dict: + """Abstract method to get the instrument configuration. + + Returns + ------- + dict + Dictionary with instrument configuration. + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_instrument_filter(self) -> str: + """Abstract method to get the instrument filter configuration. + + Returns + ------- + str + Instrument filter configuration. + """ + raise NotImplementedError() + + async def setup_electrometer( + self, mode: str, range: float, integration_time: float + ) -> None: + """Setup the Electrometer with specified mode, range, + and integration time. + + Parameters + ---------- + mode : `str` + Electrometer measurement mode. + range : `float` + Electrometer measurement range. -1 for autorange. + integration_time : `float` + Electrometer integration time. + """ + assert self.electrometer is not None, "Electrometer is not configured." + + electrometer_mode = getattr(UnitToRead, mode).value + + await self.electrometer.cmd_setMode.set_start( + mode=electrometer_mode, + timeout=self.long_timeout, + ) + await self.electrometer.cmd_setRange.set_start( + setRange=range, + timeout=self.long_timeout, + ) + await self.electrometer.cmd_setIntegrationTime.set_start( + intTime=integration_time, + timeout=self.long_timeout, + ) + await self.electrometer.cmd_performZeroCalib.start(timeout=self.long_timeout) + await self.electrometer.cmd_setDigitalFilter.set_start( + activateFilter=False, + activateAvgFilter=False, + activateMedFilter=False, + timeout=self.long_timeout, + ) + + def set_metadata(self, metadata: salobj.BaseMsgType) -> None: + """Set script metadata, including estimated duration.""" + + flats_exp_times = self.config.flats_exp_times + n_flats = len(flats_exp_times) + n_flat_pairs = n_flats // 2 # Each flat exptime is provided in pairs + + # Initialize total_dark_exptime and total_flat_exptime + total_dark_exptime = 0 + total_flat_exptime = sum(flats_exp_times) + + # Include electrometer scan overhead if configured + if hasattr(self.config, "electrometer_scan"): + total_flat_exptime += n_flat_pairs # 1 second overhead per pair + + # Calculate dark exposure time if interleaving darks + if hasattr(self.config, "interleave_darks"): + dark_exp_times = self.config.interleave_darks["dark_exp_times"] + n_darks = ( + n_flat_pairs * len(dark_exp_times) * 2 + ) # Two sets of darks per pair of flats + total_dark_exptime = sum(dark_exp_times) * n_flat_pairs * 2 + else: + n_darks = 0 # No darks if not interleaving + + # Setup time for the camera (readout and shutter time) + setup_time_per_image = self.camera.read_out_time + self.camera.shutter_time + + # Total duration calculation + total_duration = ( + self.instrument_setup_time # Initial setup time for the instrument + + total_flat_exptime # Time for taking all flats + + total_dark_exptime # Time for taking all darks + + setup_time_per_image * (n_flats + n_darks) # Setup time p/image + ) + + metadata.duration = total_duration + metadata.instrument = self.get_instrument_name() + metadata.filter = self.get_instrument_filter() + + async def take_electrometer_scan(self, exposure_time: float | None) -> list[str]: + """Perform an electrometer scan for the specified duration. + + Parameters + ---------- + exposure_time : `float` | None + Exposure time for the electrometer scan (seconds). + + Returns + ------- + electrometer_exposures : `list`[`str`] + List of large file URLs indicating where the electrometer + data is stored. + """ + self.electrometer.evt_largeFileObjectAvailable.flush() + + electrometer_exposures = [] + + if exposure_time is not None: + try: + await self.electrometer.cmd_startScanDt.set_start( + scanDuration=exposure_time, + timeout=exposure_time + self.long_timeout, + ) + except salobj.AckTimeoutError: + self.log.exception( + "Timed out waiting for the command acknowledgment. Continuing." + ) + + # Ensure a new large file object (LFO) was created + try: + lfo = await self.electrometer.evt_largeFileObjectAvailable.next( + timeout=exposure_time + self.long_timeout, flush=False + ) + electrometer_exposures.append(lfo.url) + + # Log the name or URL of the electrometer file + self.log.info(f"Electrometer scan file created: {lfo.url}") + + except asyncio.TimeoutError: + raise RuntimeError("Electrometer is not configured.") + + return electrometer_exposures + + async def take_ptc_flats(self): + if hasattr(self.config, "electrometer_scan"): + self.log.info( + f"Setting up electrometer with mode: " + f"{self.config.electrometer_scan['mode']}, range: " + f"{self.config.electrometer_scan['range']} and " + f"integration_time: " + f"{self.config.electrometer_scan['integration_time']}." + ) + await self.setup_electrometer( + mode=self.config.electrometer_scan["mode"], + range=self.config.electrometer_scan["range"], + integration_time=self.config.electrometer_scan["integration_time"], + ) + + for i, exp_time in enumerate(self.config.flats_exp_times): + exp_time_pair = [exp_time, exp_time] + + await self.checkpoint( + f"Taking pair {i + 1} of {len(self.config.flats_exp_times)}." + ) + for j, time in enumerate(exp_time_pair): + if hasattr(self.config, "electrometer_scan"): + self.log.info( + f"Taking flat {j+1}/2 with exposure time: {time} " + f"seconds and scanning electrometer for " + f"{time + self.extra_scan_time} seconds." + ) + + electrometer_task = self.take_electrometer_scan( + time + self.extra_scan_time + ) + + flat_task = self.camera.take_flats( + exptime=time, + n=1, + group_id=self.group_id, + program=self.program, + reason=self.reason, + **self.get_instrument_configuration(), + ) + + await asyncio.gather(electrometer_task, flat_task) + + else: + self.log.info( + f"Taking flat {j+1}/2 with exposure time: {time} seconds." + ) + await self.camera.take_flats( + exptime=time, + n=1, + group_id=self.group_id, + program=self.program, + reason=self.reason, + **self.get_instrument_configuration(), + ) + + if hasattr(self.config, "interleave_darks"): + for k, dark_exp_time in enumerate( + self.config.interleave_darks["dark_exp_times"] + ): + self.log.info( + f"Taking dark {k+1}/{len(self.config.interleave_darks['dark_exp_times'])} " + f"for pair {i + 1} of {len(self.config.flats_exp_times)}." + ) + await self.camera.take_darks( + exptime=dark_exp_time, + n=1, + group_id=self.group_id, + program=self.program, + reason=self.reason, + ) + + async def assert_feasibility(self) -> None: + """Verify that camera is in a feasible state to + execute the script. + """ + await self.camera.assert_all_enabled() + + async def run_block(self): + """Run the block of tasks to take PTC flats sequence.""" + + await self.assert_feasibility() + await self.take_ptc_flats() diff --git a/python/lsst/ts/externalscripts/data/scripts/maintel/take_ptc_flats_comcam.py b/python/lsst/ts/externalscripts/data/scripts/maintel/take_ptc_flats_comcam.py new file mode 100755 index 000000000..36209d824 --- /dev/null +++ b/python/lsst/ts/externalscripts/data/scripts/maintel/take_ptc_flats_comcam.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +# This file is part of ts_externalscripts +# +# Developed for the LSST Telescope and Site Systems. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License + +import asyncio + +from lsst.ts.externalscripts.maintel import TakePTCFlatsComCam + +asyncio.run(TakePTCFlatsComCam.amain()) diff --git a/python/lsst/ts/externalscripts/maintel/__init__.py b/python/lsst/ts/externalscripts/maintel/__init__.py index 67c0fd96b..76b5d2948 100644 --- a/python/lsst/ts/externalscripts/maintel/__init__.py +++ b/python/lsst/ts/externalscripts/maintel/__init__.py @@ -21,5 +21,6 @@ from .make_comcam_calibrations import * from .take_comcam_guider_image import * +from .take_ptc_flats_comcam import * from .track_target_sched import * from .warmup_hexapod import * diff --git a/python/lsst/ts/externalscripts/maintel/take_ptc_flats_comcam.py b/python/lsst/ts/externalscripts/maintel/take_ptc_flats_comcam.py new file mode 100644 index 000000000..a74c442fb --- /dev/null +++ b/python/lsst/ts/externalscripts/maintel/take_ptc_flats_comcam.py @@ -0,0 +1,94 @@ +# This file is part of ts_externalscripts +# +# Developed for the LSST Telescope and Site Systems. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License + +__all__ = ["TakePTCFlatsComCam"] + +import yaml +from lsst.ts.observatory.control.maintel.comcam import ComCam, ComCamUsages + +from ..base_take_ptc_flats import BaseTakePTCFlats + + +class TakePTCFlatsComCam(BaseTakePTCFlats): + """Specialized script for taking PTC flats with ComCam.""" + + def __init__(self, index): + super().__init__(index=index, descr="Take PTC flats with ComCam.") + + self.comcam = None + self.instrument_name = "LSSTComCam" + + @property + def camera(self): + return self.comcam + + async def configure_camera(self) -> None: + """Handle creating the camera object and waiting remote to start.""" + if self.comcam is None: + self.log.debug("Creating Camera.") + self.comcam = ComCam( + self.domain, intended_usage=ComCamUsages.TakeImage, log=self.log + ) + await self.comcam.start_task + else: + self.log.debug("Camera already defined, skipping.") + + @classmethod + def get_schema(cls): + schema_yaml = """ + $schema: http://json-schema.org/draft-07/schema# + $id: https://github.com/lsst-ts/ts_externalscripts/take_ptc_flats_comcam.yaml + title: TakePTCFlatsComCam v1 + description: Configuration for TakePTCFlatsComCam. + type: object + properties: + filter: + description: Filter name or ID. + anyOf: + - type: string + - type: integer + minimum: 1 + - type: "null" + default: "r_03" + additionalProperties: false + """ + schema_dict = yaml.safe_load(schema_yaml) + + base_schema_dict = super(TakePTCFlatsComCam, cls).get_schema() + + for prop in base_schema_dict["properties"]: + schema_dict["properties"][prop] = base_schema_dict["properties"][prop] + + return schema_dict + + def get_instrument_name(self) -> str: + """Get instrument name. + + Returns + ------- + instrument_name: `string` + """ + return self.instrument_name + + def get_instrument_configuration(self) -> dict: + return dict(filter=self.config.filter) + + def get_instrument_filter(self) -> str: + return f"{self.config.filter}" diff --git a/tests/maintel/test_take_ptc_flats_comcam.py b/tests/maintel/test_take_ptc_flats_comcam.py new file mode 100644 index 000000000..51762ae63 --- /dev/null +++ b/tests/maintel/test_take_ptc_flats_comcam.py @@ -0,0 +1,107 @@ +import unittest +import unittest.mock as mock + +import pytest +from lsst.ts import externalscripts, salobj, standardscripts +from lsst.ts.externalscripts.maintel.take_ptc_flats_comcam import TakePTCFlatsComCam + + +class TestTakePTCFlatsComCam( + standardscripts.BaseScriptTestCase, unittest.IsolatedAsyncioTestCase +): + async def basic_make_script(self, index): + self.script = TakePTCFlatsComCam(index=index) + self.mock_camera() + self.mock_electrometer() + + return (self.script,) + + def mock_camera(self): + """Mock camera instance and its methods.""" + self.script.comcam = mock.AsyncMock() + self.script.comcam.assert_liveliness = mock.AsyncMock() + self.script.comcam.assert_all_enabled = mock.AsyncMock() + self.script.comcam.take_imgtype = mock.AsyncMock(return_value=[1234]) + + def mock_electrometer(self): + """Mock electrometer instance and its methods.""" + self.script.electrometer = mock.AsyncMock() + self.script.electrometer.cmd_setMode = mock.AsyncMock() + self.script.electrometer.cmd_setRange = mock.AsyncMock() + self.script.electrometer.cmd_setIntegrationTime = mock.AsyncMock() + self.script.electrometer.cmd_performZeroCalib = mock.AsyncMock() + self.script.electrometer.cmd_setDigitalFilter = mock.AsyncMock() + self.script.electrometer.cmd_startScanDt = mock.AsyncMock() + self.script.electrometer.evt_largeFileObjectAvailable = mock.AsyncMock() + + async def test_configure(self): + electrometer_config = { + "index": 201, + "mode": "CURRENT", + "range": -1, + "integration_time": 0.1, + } + interleave_darks_config = {"dark_exp_times": 30, "n_darks": 2} + + config = { + "filter": "r_03", + "flats_exp_times": [7.25, 5.25, 0.75, 12.75], + "electrometer_scan": electrometer_config, + "interleave_darks": interleave_darks_config, + } + + async with self.make_script(): + await self.configure_script(**config) + + assert self.script.config.flats_exp_times == [7.25, 5.25, 0.75, 12.75] + assert self.script.config.electrometer_scan["index"] == 201 + assert self.script.config.electrometer_scan["mode"] == "CURRENT" + assert self.script.config.electrometer_scan["range"] == -1 + assert self.script.config.electrometer_scan["integration_time"] == 0.1 + assert self.script.config.interleave_darks["n_darks"] == 2 + assert self.script.config.interleave_darks["dark_exp_times"] == [30, 30] + + async def test_invalid_electrometer_mode_config(self): + bad_config = { + "filter": "r_03", + "flats_exp_times": [7.25, 7.25, 0.75, 0.75], + "electrometer_scan": { + "index": 201, + "mode": "INVALID_MODE", + "range": -1, + "integration_time": 0.1, + }, + } + + async with self.make_script(): + with pytest.raises(salobj.ExpectedError): + await self.configure_script(**bad_config) + + async def test_take_ptc_flats(self): + config = { + "filter": "r_03", + "flats_exp_times": [7.25, 0.75, 3.5], + "electrometer_scan": { + "index": 201, + "mode": "CURRENT", + "range": -1, + "integration_time": 0.1, + }, + "interleave_darks": {"dark_exp_times": 30, "n_darks": 2}, + } + + async with self.make_script(): + await self.configure_script(**config) + await self.run_script() + + # 4 flats + 8 darks + assert self.script.comcam.take_flats.call_count == 6 + assert self.script.comcam.take_darks.call_count == 12 + + # Check if the electrometer scan was called + assert self.script.electrometer.cmd_startScanDt.set_start.call_count == 6 + + async def test_executable(self): + scripts_dir = externalscripts.get_scripts_dir() + script_path = scripts_dir / "maintel" / "take_ptc_flats_comcam.py" + await self.check_executable(script_path)