Update BaseMakeCalibrations to trigger cp_verify asynchronously
- Refactor run_block to handle calibration and verification concurrently
  using asyncio
- Add helper methods (process_images, process_verification,
  process_calibration) to reduce code duplication
- Manage background tasks with a list, including timeout handling and
  cancellation if they do not complete in time (see the sketch below)
- Add configuration option `background_task_timeout` to control
  background task timeouts
- Add a unit test for BaseMakeCalibrations
iglesu committed Nov 7, 2024
1 parent a621637 commit e679860
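The heart of this change is the pattern run_block now follows: each verification/calibration job is launched with asyncio.create_task, collected in self.background_tasks, and awaited at the end with asyncio.wait_for wrapped around an asyncio.gather, using a total timeout of background_task_timeout times the number of tasks. Below is a minimal, self-contained sketch of that pattern only, not the script itself; fake_verification_job, run_block_sketch, and the hard-coded timeout are placeholders standing in for process_images/process_calibration and the configured background_task_timeout.

import asyncio


async def fake_verification_job(name, duration):
    # Stand-in for process_images()/process_calibration(): work that may
    # outlive the data-taking loop.
    await asyncio.sleep(duration)
    return f"{name} done"


async def run_block_sketch():
    background_tasks = []
    background_task_timeout = 1.0  # placeholder for config.background_task_timeout

    # Data-taking loop: start verification in the background and move on
    # to the next image type without waiting for it.
    for name, duration in [("BIAS", 0.1), ("DARK", 0.2), ("FLAT", 5.0)]:
        task = asyncio.create_task(fake_verification_job(name, duration))
        background_tasks.append(task)

    # End of run_block: wait for everything with a total timeout scaled by
    # the number of tasks, and cancel whatever did not finish in time.
    try:
        results = await asyncio.wait_for(
            asyncio.gather(*background_tasks, return_exceptions=True),
            timeout=background_task_timeout * len(background_tasks),
        )
        print("all done:", results)
    except asyncio.TimeoutError:
        # wait_for cancels the gather on timeout; this loop is the same
        # defensive cleanup the script performs.
        for task in background_tasks:
            if not task.done():
                task.cancel()
        print("some background tasks timed out")


if __name__ == "__main__":
    asyncio.run(run_block_sketch())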
Showing 2 changed files with 512 additions and 114 deletions.
260 changes: 146 additions & 114 deletions python/lsst/ts/externalscripts/base_make_calibrations.py
@@ -72,6 +72,9 @@ def __init__(self, index, descr):
"PTC",
]

self.background_tasks = []
self.certify_calib_failed = False

# Pipetask methods to get parameters for calibrations generation
self.pipetask_parameters = dict(
BIAS=self.get_pipetask_parameters_bias,
@@ -347,6 +350,10 @@ def get_schema(cls):
type: integer
default: 120
descriptor: Timeout value, in seconds, for OODS.
background_task_timeout:
type: integer
default: 30
descriptor: Timeout value, in seconds, for background tasks
note:
description: A descriptive note about the images being taken.
type: string
@@ -1403,6 +1410,7 @@ async def certify_calib(self, image_type, job_id_calib):
process = await asyncio.create_subprocess_shell(cmd)
stdout, stderr = await process.communicate()
self.log.debug(f"Process returned: {process.returncode}")

if process.returncode != 0:
self.log.debug(stdout)
self.log.error(stderr)
@@ -1547,6 +1555,9 @@ async def run_block(self):
# Basic sets of calibrations first : biases, darks, and flats.
# After the loop is done, do defects and PTC.
for im_type in image_types:
if self.certify_calib_failed:
break

# 1. Take images with the instrument, only for "BIAS,
# "DARK", or "FLAT".
if im_type == "BIAS":
@@ -1557,8 +1568,8 @@
await self.checkpoint(f"Taking {self.config.n_flat} flats.")

# TODO: Before taking flats with LATISS (and also
# with LSSTComCam), check that the telescope is in
# position to do so. See DM-31496, DM-31497.
# with LSSTComCam), check that the telescope is in
# position to do so. See DM-31496, DM-31497.
exposure_ids_list = await self.take_images(im_type)

# Discard the first N exposures taken (DM-36422)
@@ -1570,100 +1581,9 @@
f"Images taken: {self.exposure_ids[im_type]}; type: {im_type}"
)

if self.config.generate_calibrations:
# 2. Call the calibration pipetask via the OCPS
# to make a combined calibration.
self.log.info(
"Generating calibration from the images taken "
"as part of this script."
)
response_ocps_calib_pipetask = await self.call_pipetask(im_type)
job_id_calib = response_ocps_calib_pipetask["jobId"]
else:
self.log.info(
f"A combined {im_type} will not be generated from the "
"images taken as part of this script. Any input "
"calibrations needed by the verification pipetasks will be "
"sought in their input collections."
)
job_id_calib = None

# 3. Verify the combined calibration (implemented so far for bias,
# dark, and flat), and certify it if the verification
# tests pass and it was generated.
if self.config.do_verify:
try:
if self.config.generate_calibrations:
response_ocps_verify_pipetask = await self.verify_calib(
im_type, job_id_calib
)
# Check that the task running cp_verify
# did not fail.
job_id_verify = response_ocps_verify_pipetask["jobId"]
# Check verification statistics
report_check_verify_stats = await self.check_verification_stats(
im_type, job_id_verify, job_id_calib
)
# Inform the user about the results from
# running cp_verify.
# TODO: If verification failed, issue an
# alarm in the watcher: DM-33898.
await self.analyze_report_check_verify_stats(
im_type,
report_check_verify_stats,
job_id_verify,
job_id_calib,
)
# If the verification tests passed,
# certify the combined calibrations.
if report_check_verify_stats["CERTIFY_CALIB"]:
await self.certify_calib(im_type, job_id_calib)
# If tests did not pass, end the loop, as
# certified calibrations are needed to
# construct subsequent calibrations
# (bias->dark->flat).
else:
break
else:
# If combined calibrations are not being generated
# from the individual images just taken, and if
# do_verify=True, the verification task
# will run the tests using calibrations in its
# input collections as reference.
# Note that there is no certification of combined
# calibrations here, because we are not generating
# them.
# job_id_calib should be None
assert job_id_calib is None, "'job_id_calib' is not 'None'."
response_ocps_verify_pipetask = await self.verify_calib(
im_type, job_id_calib
)
job_id_verify = response_ocps_verify_pipetask["jobId"]
# Check verification statistics
report_check_verify_stats = await self.check_verification_stats(
im_type, job_id_verify, job_id_calib
)
# Inform the user about the results from running
# cp_verify.
# TODO: If verification failed, issue an alarm
# in the watcher: DM-33898
await self.analyze_report_check_verify_stats(
im_type,
report_check_verify_stats,
job_id_verify,
job_id_calib,
)
except Exception:
self.log.exception("Error in do_verify. Ignoring...")
# do verify is False
else:
if self.config.generate_calibrations:
self.log.info(
"'do_verify' is set to 'False' and "
"'generate_calibrations' to 'True'. "
f"{im_type} will be automatically certified."
)
await self.certify_calib(im_type, job_id_calib)
# Create a task that processes calibration and verification
task = asyncio.create_task(self.process_images(im_type))
self.background_tasks.append(task)

# After taking the basic images (biases, darks, and flats) do
# defects and PTC if requested.
@@ -1683,26 +1603,138 @@

if len(calib_types):
for calib_type in calib_types:
try:
# Run the pipetask
response_ocps_calib_pipetask = await self.call_pipetask(calib_type)
job_id_calib = response_ocps_calib_pipetask["jobId"]
# Certify the calibrations in self.config.calib_collection
# The quick gain estimation does not need to be certified.
self.log.info(
f"Verification for {calib_type} is not implemented yet "
f"in this script. {calib_type} will be automatically certified."
)
if calib_type != "GAIN":
await self.certify_calib(calib_type, job_id_calib)
task = asyncio.create_task(self.process_calibration(calib_type))
self.background_tasks.append(task)

await self.checkpoint("Data-taking part completed.")

self.log.info(f"{calib_type} generation job ID: {job_id_calib}")
await self.wait_for_background_tasks()

# Report the estimated gain from each pair of flats
if calib_type in ["GAIN", "PTC"]:
await self.report_gains_from_flat_pairs(job_id_calib)
except Exception:
self.log.exception(f"Error processing {calib_type}. Ignoring...")
async def process_images(self, im_type):

if self.config.generate_calibrations:
# 2. Call the calibration pipetask via the OCPS
# to make a combined calibration.
self.log.info(
"Generating calibration from the images taken "
"as part of this script."
)
response_ocps_calib_pipetask = await self.call_pipetask(im_type)
job_id_calib = response_ocps_calib_pipetask["jobId"]
else:
self.log.info(
f"A combined {im_type} will not be generated from the "
"images taken as part of this script. Any input "
"calibrations needed by the verification pipetasks will be "
"sought in their input collections."
)
job_id_calib = None

# 3. Verify the combined calibration (implemented so far for bias,
# dark, and flat), and certify it if the verification
# tests pass and it was generated.
if self.config.do_verify:
try:
await self.process_verification(im_type, job_id_calib)
except Exception:
self.log.exception("Error in do_verify. Ignoring...")
# do verify is False
else:
if self.config.generate_calibrations:
self.log.info(
"'do_verify' is set to 'False' and "
"'generate_calibrations' to 'True'. "
f"{im_type} will be automatically certified."
)

await self.certify_calib(im_type, job_id_calib)

async def process_verification(self, im_type, job_id_calib):
try:
self.log.info(f"Starting verification for {im_type}.")

response_ocps_verify_pipetask = await self.verify_calib(
im_type, job_id_calib
)
# Check that the task running cp_verify
# did not fail.
job_id_verify = response_ocps_verify_pipetask["jobId"]
# Check verification statistics
report_check_verify_stats = await self.check_verification_stats(
im_type, job_id_verify, job_id_calib
)
# Inform the user about the results from
# running cp_verify.
# TODO: If verification failed, issue an
# alarm in the watcher: DM-33898.
await self.analyze_report_check_verify_stats(
im_type,
report_check_verify_stats,
job_id_verify,
job_id_calib,
)

if job_id_calib is not None and report_check_verify_stats["CERTIFY_CALIB"]:
await self.certify_calib(im_type, job_id_calib)
# If the tests did not pass, flag the failure so that
# run_block stops taking further image types, as
# certified calibrations are needed to construct
# subsequent calibrations (bias->dark->flat).
else:
self.certify_calib_failed = True

return report_check_verify_stats
# Note: when calibrations are not generated (job_id_calib is None),
# there is nothing to certify here.
except Exception as e:
self.log.exception(f"Error in processing verification for {im_type}: {e}")

async def process_calibration(self, calib_type):
try:
self.log.info(f"Starting calibration processing for {calib_type}.")
response_ocps_calib_pipetask = await self.call_pipetask(calib_type)
job_id_calib = response_ocps_calib_pipetask["jobId"]
# Certify the calibrations in self.config.calib_collection
# The quick gain estimation does not need to be certified.
self.log.info(
f"Verification for {calib_type} is not implemented yet "
f"in this script. {calib_type} will be automatically certified."
)
if calib_type != "GAIN":
await self.certify_calib(calib_type, job_id_calib)

self.log.info(f"{calib_type} generation job ID: {job_id_calib}")

# Report the estimated gain from each pair of flats
if calib_type in ["GAIN", "PTC"]:
await self.report_gains_from_flat_pairs(job_id_calib)
except Exception as e:
self.log.exception(f"Error processing {calib_type}: {e}")

async def wait_for_background_tasks(self):
self.log.info("Waiting for background tasks to complete.")
try:
# Note that when asyncio.wait_for times out, it cancels the task
# it is waiting on. If the task being waited on is an asyncio.gather
# instance, it propagates the cancellation to all the tasks it has
# gathered.
await asyncio.wait_for(
asyncio.gather(*self.background_tasks, return_exceptions=True),
timeout=self.config.background_task_timeout
* len(self.background_tasks),
)
self.log.info("All background tasks have completed.")
except asyncio.TimeoutError:
self.log.warning("Background tasks did not complete before timeout.")
for task in self.background_tasks:
# all tasks should be done/cancelled at this point
if not task.done():
# this code should never be reached.
self.log.warning(f"Cancelling task {task}")
task.cancel()
except Exception as e:
self.log.exception(f"Error in background tasks: {e}")
finally:
self.background_tasks = []

@staticmethod
def get_exposure_id(obsid):

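A side note on the comment in wait_for_background_tasks: when asyncio.wait_for times out, it cancels the gather it is awaiting, and the gather in turn propagates that cancellation to every task it wraps, which is why the explicit task.cancel() loop is only a defensive fallback. The small standalone snippet below (placeholder names, not part of the commit) illustrates that behaviour.

import asyncio


async def slow_job(name):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Each gathered task receives the cancellation propagated by gather.
        print(f"{name} was cancelled")
        raise


async def main():
    tasks = [asyncio.create_task(slow_job(f"job-{i}")) for i in range(3)]
    try:
        # wait_for cancels the gather when the timeout expires ...
        await asyncio.wait_for(asyncio.gather(*tasks), timeout=0.1)
    except asyncio.TimeoutError:
        # ... and by the time TimeoutError is raised here, every wrapped
        # task has already been cancelled.
        print("timed out; all tasks done:", all(t.done() for t in tasks))


asyncio.run(main())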