diff --git a/python/lsst/pipe/tasks/coaddBase.py b/python/lsst/pipe/tasks/coaddBase.py
index a689812f2..3179e5930 100644
--- a/python/lsst/pipe/tasks/coaddBase.py
+++ b/python/lsst/pipe/tasks/coaddBase.py
@@ -26,6 +26,7 @@
import lsst.pipe.base as pipeBase
import lsst.geom as geom
+from lsst.afw.geom import Polygon
from lsst.meas.algorithms import ScaleVarianceTask
from .selectImages import PsfWcsSelectImagesTask
from .coaddInputRecorder import CoaddInputRecorderTask
@@ -251,3 +252,37 @@ def subBBoxIter(bbox, subregionSize):
"colShift=%s, rowShift=%s" %
(bbox, subregionSize, colShift, rowShift))
yield subBBox
+# Note that this is implemented as a free-floating function to enable reuse in
+# lsst.pipe.tasks.makeWarp and in lsst.drp.tasks.make_psf_matched_warp
+# without creating any relationships between the two classes.
+# This may be converted to a method after makeWarp.py is removed altogether in
+# DM-47916.
+def growValidPolygons(coaddInputs, growBy: int) -> None:
+ """Grow coaddInputs' ccds' ValidPolygons in place.
+ Either modify each ccd's validPolygon in place, or if CoaddInputs
+ does not have a validPolygon, create one from its bbox.
+ Parameters
+ ----------
+ coaddInputs : `lsst.afw.image.coaddInputs`
+ CoaddInputs object containing the ccds to grow the valid polygons of.
+ growBy : `int`
+ The value to grow the valid polygons by.
+ Notes
+ -----
+ Negative values for ``growBy`` can shrink the polygons.
+ """
+ for ccd in coaddInputs.ccds:
+ polyOrig = ccd.getValidPolygon()
+ validPolyBBox = polyOrig.getBBox() if polyOrig else ccd.getBBox()
+ validPolyBBox.grow(growBy)
+ if polyOrig:
+ validPolygon = polyOrig.intersectionSingle(validPolyBBox)
+ else:
+ validPolygon = Polygon(geom.Box2D(validPolyBBox))
+ ccd.validPolygon = validPolygon
diff --git a/python/lsst/pipe/tasks/makeWarp.py b/python/lsst/pipe/tasks/makeWarp.py
index 2177f4d35..f5baa6588 100644
--- a/python/lsst/pipe/tasks/makeWarp.py
+++ b/python/lsst/pipe/tasks/makeWarp.py
@@ -36,8 +36,7 @@
from lsst.meas.algorithms import CoaddPsf, CoaddPsfConfig, GaussianPsfFactory
from lsst.skymap import BaseSkyMap
from lsst.utils.timer import timeMethod
-from .coaddBase import CoaddBaseTask, makeSkyInfo, reorderAndPadList
-from .make_psf_matched_warp import growValidPolygons
+from .coaddBase import CoaddBaseTask, growValidPolygons, makeSkyInfo, reorderAndPadList
from .warpAndPsfMatch import WarpAndPsfMatchTask
from collections.abc import Iterable
diff --git a/python/lsst/pipe/tasks/make_direct_warp.py b/python/lsst/pipe/tasks/make_direct_warp.py
index 2ba6d0ddf..2bb06c487 100644
--- a/python/lsst/pipe/tasks/make_direct_warp.py
+++ b/python/lsst/pipe/tasks/make_direct_warp.py
@@ -19,956 +19,19 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-from __future__ import annotations
-from collections.abc import Mapping
-import dataclasses
-from typing import TYPE_CHECKING, Iterable
-import numpy as np
-from lsst.afw.image import ExposureF, Mask
-from lsst.afw.math import BackgroundList, Warper
-from lsst.coadd.utils import copyGoodPixels
-from lsst.daf.butler import DataCoordinate, DeferredDatasetHandle
-from lsst.geom import Box2D
-from lsst.meas.algorithms import CoaddPsf, CoaddPsfConfig
-from lsst.meas.algorithms.cloughTocher2DInterpolator import (
- CloughTocher2DInterpolateTask,
-from lsst.meas.base import DetectorVisitIdGeneratorConfig
-from lsst.pex.config import (
- ConfigField,
- ConfigurableField,
- Field,
- RangeField,
-from lsst.pipe.base import (
- NoWorkFound,
- PipelineTask,
- PipelineTaskConfig,
- PipelineTaskConnections,
- Struct,
-from lsst.pipe.base.connectionTypes import Input, Output
-from lsst.pipe.tasks.coaddBase import makeSkyInfo
-from lsst.pipe.tasks.selectImages import PsfWcsSelectImagesTask
-from lsst.skymap import BaseSkyMap
-from .coaddInputRecorder import CoaddInputRecorderTask
- from lsst.afw.image import MaskedImage
- from lsst.afw.table import ExposureCatalog
- from lsst.pipe.base import InMemoryDatasetHandle
-__all__ = (
- "MakeDirectWarpConfig",
- "MakeDirectWarpTask",
-class WarpDetectorInputs:
- """Inputs passed to `MakeDirectWarpTask.run` for a single detector.
- """
- exposure_or_handle: ExposureF | DeferredDatasetHandle | InMemoryDatasetHandle
- """Detector image with initial calibration objects, or a deferred-load
- handle for one.
- """
- data_id: DataCoordinate
- """Butler data ID for this detector."""
- background_revert: BackgroundList | None = None
- """Background model to restore in (i.e. add to) the image."""
- background_apply: BackgroundList | None = None
- """Background model to apply to (i.e. subtract from) the image."""
- @property
- def exposure(self) -> ExposureF:
- """Get the exposure object, loading it if necessary."""
- if not isinstance(self.exposure_or_handle, ExposureF):
- self.exposure_or_handle = self.exposure_or_handle.get()
- return self.exposure_or_handle
- def apply_background(self) -> None:
- """Apply (subtract) the `background_apply` to the exposure in-place.
- Raises
- ------
- RuntimeError
- Raised if `background_apply` is None.
- """
- if self.background_apply is None:
- raise RuntimeError("No background to apply")
- if self.background_apply:
- # Subtract only if `background_apply` is not a trivial background.
- self.exposure.maskedImage -= self.background_apply.getImage()
- def revert_background(self) -> None:
- """Revert (add) the `background_revert` from the exposure in-place.
- Raises
- ------
- RuntimeError
- Raised if `background_revert` is None.
- """
- if self.background_revert is None:
- raise RuntimeError("No background to revert")
- if self.background_revert:
- # Add only if `background_revert` is not a trivial background.
- self.exposure.maskedImage += self.background_revert.getImage()
-class MakeDirectWarpConnections(
- PipelineTaskConnections,
- dimensions=("tract", "patch", "skymap", "instrument", "visit"),
- defaultTemplates={
- "coaddName": "deep",
- },
- """Connections for MakeWarpTask"""
- calexp_list = Input(
- doc="Input exposures to be interpolated and resampled onto a SkyMap "
- "projection/patch.",
- name="calexp",
- storageClass="ExposureF",
- dimensions=("instrument", "visit", "detector"),
- multiple=True,
- deferLoad=True,
- )
- background_revert_list = Input(
- doc="Background to be reverted (i.e., added back to the calexp). "
- "This connection is used only if doRevertOldBackground=False.",
- name="calexpBackground",
- storageClass="Background",
- dimensions=("instrument", "visit", "detector"),
- multiple=True,
- )
- background_apply_list = Input(
- doc="Background to be applied (subtracted from the calexp). "
- "This is used only if doApplyNewBackground=True.",
- name="skyCorr",
- storageClass="Background",
- dimensions=("instrument", "visit", "detector"),
- multiple=True,
- )
- visit_summary = Input(
- doc="Input visit-summary catalog with updated calibration objects.",
- name="finalVisitSummary",
- storageClass="ExposureCatalog",
- dimensions=("instrument", "visit"),
- )
- sky_map = Input(
- doc="Input definition of geometry/bbox and projection/wcs for warps.",
- storageClass="SkyMap",
- dimensions=("skymap",),
- )
- # Declare all possible outputs (except noise, which is configurable)
- warp = Output(
- doc="Output direct warped exposure produced by resampling calexps "
- "onto the skyMap patch geometry.",
- name="{coaddName}Coadd_directWarp",
- storageClass="ExposureF",
- dimensions=("tract", "patch", "skymap", "instrument", "visit"),
- )
- masked_fraction_warp = Output(
- doc="Output masked fraction warped exposure.",
- name="{coaddName}Coadd_directWarp_maskedFraction",
- storageClass="ImageF",
- dimensions=("tract", "patch", "skymap", "instrument", "visit"),
- )
- def __init__(self, *, config=None):
- super().__init__(config=config)
- if not config:
- return
- if not config.doRevertOldBackground:
- del self.background_revert_list
- if not config.doApplyNewBackground:
- del self.background_apply_list
- if not config.doWarpMaskedFraction:
- del self.masked_fraction_warp
- # Dynamically set output connections for noise images, depending on the
- # number of noise realization specified in the config.
- for n in range(config.numberOfNoiseRealizations):
- noise_warp = Output(
- doc=f"Output direct warped noise exposure ({n})",
- name=f"{config.connections.coaddName}Coadd_directWarp_noise{n}",
- # Store it as a MaskedImage to preserve the variance plane.
- storageClass="MaskedImageF",
- dimensions=("tract", "patch", "skymap", "instrument", "visit"),
- )
- setattr(self, f"noise_warp{n}", noise_warp)
-class MakeDirectWarpConfig(
- PipelineTaskConfig,
- pipelineConnections=MakeDirectWarpConnections,
- """Configuration for the MakeDirectWarpTask.
- The config fields are as similar as possible to the corresponding fields in
- MakeWarpConfig.
- Notes
- -----
- The config fields are in camelCase to match the fields in the earlier
- version of the makeWarp task as closely as possible.
- """
- """
- numberOfNoiseRealizations is defined as a RangeField to prevent from
- making multiple output connections and blowing up the memory usage by
- accident. An upper bound of 3 is based on the best guess of the maximum
- number of noise realizations that will be used for metadetection.
- """
- numberOfNoiseRealizations = RangeField[int](
- doc="Number of noise realizations to simulate and persist.",
- default=0,
- min=0,
- inclusiveMax=True,
- )
- seedOffset = Field[int](
- doc="Offset to the seed used for the noise realization. This can be "
- "used to create a different noise realization if the default ones "
- "are catastrophic, or for testing sensitivity to the noise.",
- default=0,
- )
- useMedianVariance = Field[bool](
- doc="Use the median of variance plane in the input calexp to generate "
- "noise realizations? If False, per-pixel variance will be used.",
- default=True,
- )
- doRevertOldBackground = Field[bool](
- doc="Revert the old backgrounds from the `background_revert_list` "
- "connection?",
- default=False,
+# TODO: Remove this entire file in DM-47521.
+import warnings
+ from lsst.drp.tasks.make_direct_warp import MakeDirectWarpConfig, MakeDirectWarpTask # noqa: F401
+except ImportError as error:
+ error.msg += ". Please import the warping tasks from drp_tasks package."
+ raise error
+ warnings.warn(
+ "lsst.pipe.tasks.make_direct_warp is deprecated and will be removed after v29; "
+ "Please use lsst.drp.tasks.make_direct_warp instead.",
+ DeprecationWarning,
+ stacklevel=2,
- doApplyNewBackground = Field[bool](
- doc="Apply the new backgrounds from the `background_apply_list` "
- "connection?",
- default=False,
- )
- useVisitSummaryPsf = Field[bool](
- doc="If True, use the PSF model and aperture corrections from the "
- "'visit_summary' connection to make the warp. If False, use the "
- "PSF model and aperture corrections from the 'calexp' connection.",
- default=True,
- )
- doSelectPreWarp = Field[bool](
- doc="Select ccds before warping?",
- default=True,
- )
- select = ConfigurableField(
- doc="Image selection subtask.",
- target=PsfWcsSelectImagesTask,
- )
- doPreWarpInterpolation = Field[bool](
- doc="Interpolate over bad pixels before warping?",
- default=False,
- )
- preWarpInterpolation = ConfigurableField(
- doc="Interpolation task to use for pre-warping interpolation",
- target=CloughTocher2DInterpolateTask,
- )
- inputRecorder = ConfigurableField(
- doc="Subtask that helps fill CoaddInputs catalogs added to the final "
- "coadd",
- target=CoaddInputRecorderTask,
- )
- includeCalibVar = Field[bool](
- doc="Add photometric calibration variance to warp variance plane?",
- default=False,
- )
- border = Field[int](
- doc="Pad the patch boundary of the warp by these many pixels, so as to allow for PSF-matching later",
- default=256,
- )
- warper = ConfigField(
- doc="Configuration for the warper that warps the image and noise",
- dtype=Warper.ConfigClass,
- )
- doWarpMaskedFraction = Field[bool](
- doc="Warp the masked fraction image?",
- default=False,
- )
- maskedFractionWarper = ConfigField(
- doc="Configuration for the warp that warps the mask fraction image",
- dtype=Warper.ConfigClass,
- )
- coaddPsf = ConfigField(
- doc="Configuration for CoaddPsf",
- dtype=CoaddPsfConfig,
- )
- idGenerator = DetectorVisitIdGeneratorConfig.make_field()
- # Use bgSubtracted and doApplySkyCorr to match the old MakeWarpConfig,
- # but as properties instead of config fields.
- @property
- def bgSubtracted(self) -> bool:
- return not self.doRevertOldBackground
- @bgSubtracted.setter
- def bgSubtracted(self, value: bool) -> None:
- self.doRevertOldBackground = not value
- @property
- def doApplySkyCorr(self) -> bool:
- return self.doApplyNewBackground
- @doApplySkyCorr.setter
- def doApplySkyCorr(self, value: bool) -> None:
- self.doApplyNewBackground = value
- def setDefaults(self) -> None:
- super().setDefaults()
- self.warper.warpingKernelName = "lanczos3"
- self.warper.cacheSize = 0
- self.maskedFractionWarper.warpingKernelName = "bilinear"
-class MakeDirectWarpTask(PipelineTask):
- """Warp single-detector images onto a common projection.
- This task iterates over multiple images (corresponding to different
- detectors) from a single visit that overlap the target patch. Pixels that
- receive no input from any detector are set to NaN in the output image, and
- NO_DATA bit is set in the mask plane.
- This differs from the standard `MakeWarp` Task in the following
- ways:
- 1. No selection on ccds at the time of warping. This is done later during
- the coaddition stage.
- 2. Interpolate over a set of masked pixels before warping.
- 3. Generate an image where each pixel denotes how much of the pixel is
- masked.
- 4. Generate multiple noise warps with the same interpolation applied.
- 5. No option to produce a PSF-matched warp.
- """
- ConfigClass = MakeDirectWarpConfig
- _DefaultName = "makeDirectWarp"
- def __init__(self, **kwargs):
- super().__init__(**kwargs)
- self.makeSubtask("inputRecorder")
- self.makeSubtask("preWarpInterpolation")
- if self.config.doSelectPreWarp:
- self.makeSubtask("select")
- self.warper = Warper.fromConfig(self.config.warper)
- if self.config.doWarpMaskedFraction:
- self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper)
- def runQuantum(self, butlerQC, inputRefs, outputRefs):
- # Docstring inherited.
- inputs: Mapping[int, WarpDetectorInputs] = {} # Detector ID -> WarpDetectorInputs
- for handle in butlerQC.get(inputRefs.calexp_list):
- inputs[handle.dataId["detector"]] = WarpDetectorInputs(
- exposure_or_handle=handle,
- data_id=handle.dataId,
- )
- if not inputs:
- raise NoWorkFound("No input warps provided for co-addition")
- # Add backgrounds to the inputs struct, being careful not to assume
- # they're present for the same detectors we got image handles for, in
- # case of upstream errors.
- for ref in getattr(inputRefs, "background_revert_list", []):
- inputs[ref.dataId["detector"]].background_revert = butlerQC.get(ref)
- for ref in getattr(inputRefs, "background_apply_list", []):
- inputs[ref.dataId["detector"]].background_apply = butlerQC.get(ref)
- visit_summary = butlerQC.get(inputRefs.visit_summary)
- sky_map = butlerQC.get(inputRefs.sky_map)
- quantumDataId = butlerQC.quantum.dataId
- sky_info = makeSkyInfo(
- sky_map,
- tractId=quantumDataId["tract"],
- patchId=quantumDataId["patch"],
- )
- results = self.run(inputs, sky_info, visit_summary=visit_summary)
- butlerQC.put(results, outputRefs)
- def _preselect_inputs(
- self,
- inputs: Mapping[int, WarpDetectorInputs],
- sky_info: Struct,
- visit_summary: ExposureCatalog,
- ) -> dict[int, WarpDetectorInputs]:
- """Filter the list of inputs via a 'select' subtask.
- Parameters
- ----------
- inputs : ``~collections.abc.Mapping` [ `int`, `WarpDetectorInputs` ]
- Per-detector input structs.
- sky_info : `lsst.pipe.base.Struct`
- Structure with information about the tract and patch.
- visit_summary : `~lsst.afw.table.ExposureCatalog`
- Record with updated calibration information for the full visit.
- Returns
- -------
- filtered_inputs : `dict` [ `int`, `WarpDetectorInputs` ]
- Like ``inputs``, with rejected detectors dropped.
- """
- data_id_list, bbox_list, wcs_list = [], [], []
- for detector_id, detector_inputs in inputs.items():
- row = visit_summary.find(detector_id)
- if row is None:
- raise RuntimeError(f"Unexpectedly incomplete visit_summary: {detector_id=} is missing.")
- data_id_list.append(detector_inputs.data_id)
- bbox_list.append(row.getBBox())
- wcs_list.append(row.getWcs())
- cornerPosList = Box2D(sky_info.bbox).getCorners()
- coordList = [sky_info.wcs.pixelToSky(pos) for pos in cornerPosList]
- good_indices = self.select.run(
- bboxList=bbox_list,
- wcsList=wcs_list,
- visitSummary=visit_summary,
- coordList=coordList,
- dataIds=data_id_list,
- )
- detector_ids = list(inputs.keys())
- good_detector_ids = [detector_ids[i] for i in good_indices]
- return {detector_id: inputs[detector_id] for detector_id in good_detector_ids}
- def run(self, inputs: Mapping[int, WarpDetectorInputs], sky_info, visit_summary) -> Struct:
- """Create a Warp dataset from inputs.
- Parameters
- ----------
- inputs : `Mapping` [ `int`, `WarpDetectorInputs` ]
- Dictionary of input datasets, with the detector id being the key.
- sky_info : `~lsst.pipe.base.Struct`
- A Struct object containing wcs, bounding box, and other information
- about the patches within the tract.
- visit_summary : `~lsst.afw.table.ExposureCatalog` | None
- Table of visit summary information. If provided, the visit summary
- information will be used to update the calibration of the input
- exposures. If None, the input exposures will be used as-is.
- Returns
- -------
- results : `~lsst.pipe.base.Struct`
- A Struct object containing the warped exposure, noise exposure(s),
- and masked fraction image.
- """
- if self.config.doSelectPreWarp:
- inputs = self._preselect_inputs(inputs, sky_info, visit_summary=visit_summary)
- if not inputs:
- raise NoWorkFound("No input warps remain after selection for co-addition")
- sky_info.bbox.grow(self.config.border)
- target_bbox, target_wcs = sky_info.bbox, sky_info.wcs
- # Initialize the objects that will hold the warp.
- final_warp = ExposureF(target_bbox, target_wcs)
- for _, warp_detector_input in inputs.items():
- visit_id = warp_detector_input.data_id["visit"]
- break # Just need the visit id from any one of the inputs.
- # The warpExposure routine is expensive, and we do not want to call
- # it twice (i.e., a second time for PSF-matched warps). We do not
- # want to hold all the warped exposures in memory at once either.
- # So we create empty exposure(s) to accumulate the warps of each type,
- # and then process each detector serially.
- final_warp = self._prepareEmptyExposure(sky_info)
- final_masked_fraction_warp = self._prepareEmptyExposure(sky_info)
- final_noise_warps = {
- n_noise: self._prepareEmptyExposure(sky_info)
- for n_noise in range(self.config.numberOfNoiseRealizations)
- }
- # We need a few bookkeeping variables only for the science coadd.
- totalGoodPixels = 0
- inputRecorder = self.inputRecorder.makeCoaddTempExpRecorder(
- visit_id,
- len(inputs),
- )
- for index, detector_inputs in enumerate(inputs.values()):
- self.log.debug(
- "Warping exposure %d/%d for id=%s",
- index + 1,
- len(inputs),
- detector_inputs.data_id,
- )
- input_exposure = detector_inputs.exposure
- # Generate noise image(s) in-situ.
- seed = self.get_seed_from_data_id(detector_inputs.data_id)
- rng = np.random.RandomState(seed + self.config.seedOffset)
- # Generate noise images in-situ.
- noise_calexps = self.make_noise_exposures(input_exposure, rng)
- warpedExposure = self.process(
- detector_inputs,
- target_wcs,
- self.warper,
- visit_summary,
- destBBox=target_bbox,
- )
- if warpedExposure is None:
- self.log.debug(
- "Skipping exposure %s because it could not be warped.", detector_inputs.data_id
- )
- continue
- if final_warp.photoCalib is not None:
- ratio = (
- final_warp.photoCalib.getInstFluxAtZeroMagnitude()
- / warpedExposure.photoCalib.getInstFluxAtZeroMagnitude()
- )
- else:
- ratio = 1
- self.log.debug("Scaling exposure %s by %f", detector_inputs.data_id, ratio)
- warpedExposure.maskedImage *= ratio
- # Accumulate the partial warps in an online fashion.
- nGood = copyGoodPixels(
- final_warp.maskedImage,
- warpedExposure.maskedImage,
- final_warp.mask.getPlaneBitMask(["NO_DATA"]),
- )
- if final_warp.photoCalib is None and nGood > 0:
- final_warp.setPhotoCalib(warpedExposure.photoCalib)
- ccdId = self.config.idGenerator.apply(detector_inputs.data_id).catalog_id
- inputRecorder.addCalExp(input_exposure, ccdId, nGood)
- totalGoodPixels += nGood
- if self.config.doWarpMaskedFraction:
- # Obtain the masked fraction exposure and warp it.
- if self.config.doPreWarpInterpolation:
- badMaskPlanes = self.preWarpInterpolation.config.badMaskPlanes
- else:
- badMaskPlanes = []
- masked_fraction_exp = self._get_bad_mask(input_exposure, badMaskPlanes)
- masked_fraction_warp = self.maskedFractionWarper.warpExposure(
- target_wcs, masked_fraction_exp, destBBox=target_bbox
- )
- copyGoodPixels(
- final_masked_fraction_warp.maskedImage,
- masked_fraction_warp.maskedImage,
- final_masked_fraction_warp.mask.getPlaneBitMask(["NO_DATA"]),
- )
- # Process and accumulate noise images.
- for n_noise in range(self.config.numberOfNoiseRealizations):
- noise_calexp = noise_calexps[n_noise]
- noise_pseudo_inputs = dataclasses.replace(
- detector_inputs,
- exposure_or_handle=noise_calexp,
- background_revert=BackgroundList(),
- background_apply=BackgroundList(),
- )
- warpedNoise = self.process(
- noise_pseudo_inputs,
- target_wcs,
- self.warper,
- visit_summary,
- destBBox=target_bbox,
- )
- warpedNoise.maskedImage *= ratio
- copyGoodPixels(
- final_noise_warps[n_noise].maskedImage,
- warpedNoise.maskedImage,
- final_noise_warps[n_noise].mask.getPlaneBitMask(["NO_DATA"]),
- )
- # If there are no good pixels, return a Struct filled with None.
- if totalGoodPixels == 0:
- results = Struct(
- warp=None,
- masked_fraction_warp=None,
- )
- for noise_index in range(self.config.numberOfNoiseRealizations):
- setattr(results, f"noise_warp{noise_index}", None)
- return results
- # Finish the inputRecorder and add the coaddPsf to the final warp.
- inputRecorder.finish(final_warp, totalGoodPixels)
- coaddPsf = CoaddPsf(
- inputRecorder.coaddInputs.ccds,
- sky_info.wcs,
- self.config.coaddPsf.makeControl(),
- )
- final_warp.setPsf(coaddPsf)
- for _, warp_detector_input in inputs.items():
- final_warp.setFilter(warp_detector_input.exposure.getFilter())
- final_warp.getInfo().setVisitInfo(warp_detector_input.exposure.getInfo().getVisitInfo())
- break # Just need the filter and visit info from any one of the inputs.
- results = Struct(
- warp=final_warp,
- )
- if self.config.doWarpMaskedFraction:
- results.masked_fraction_warp = final_masked_fraction_warp.image
- for noise_index, noise_exposure in final_noise_warps.items():
- setattr(results, f"noise_warp{noise_index}", noise_exposure.maskedImage)
- return results
- def process(
- self,
- detector_inputs: WarpDetectorInputs,
- target_wcs,
- warper,
- visit_summary=None,
- maxBBox=None,
- destBBox=None,
- ) -> ExposureF | None:
- """Process an exposure.
- There are three processing steps that are applied to the input:
- 1. Interpolate over bad pixels before warping.
- 2. Apply all calibrations from visit_summary to the exposure.
- 3. Warp the exposure to the target coordinate system.
- Parameters
- ----------
- detector_inputs : `WarpDetectorInputs`
- The input exposure to be processed, along with any other
- per-detector modifications.
- target_wcs : `~lsst.afw.geom.SkyWcs`
- The WCS of the target patch.
- warper : `~lsst.afw.math.Warper`
- The warper to use for warping the input exposure.
- visit_summary : `~lsst.afw.table.ExposureCatalog` | None
- Table of visit summary information. If not None, the visit_summary
- information will be used to update the calibration of the input
- exposures. Otherwise, the input exposures will be used as-is.
- maxBBox : `~lsst.geom.Box2I` | None
- Maximum bounding box of the warped exposure. If None, this is
- determined automatically.
- destBBox : `~lsst.geom.Box2I` | None
- Exact bounding box of the warped exposure. If None, this is
- determined automatically.
- Returns
- -------
- warped_exposure : `~lsst.afw.image.Exposure` | None
- The processed and warped exposure, if all the calibrations could
- be applied successfully. Otherwise, None.
- """
- input_exposure = detector_inputs.exposure
- if self.config.doPreWarpInterpolation:
- self.preWarpInterpolation.run(input_exposure.maskedImage)
- success = self._apply_all_calibrations(
- detector_inputs,
- visit_summary=visit_summary,
- includeScaleUncertainty=self.config.includeCalibVar,
- )
- if not success:
- return None
- with self.timer("warp"):
- warped_exposure = warper.warpExposure(
- target_wcs,
- input_exposure,
- maxBBox=maxBBox,
- destBBox=destBBox,
- )
- # Potentially a post-warp interpolation here? Relies on DM-38630.
- return warped_exposure
- def _apply_all_calibrations(
- self,
- detector_inputs: WarpDetectorInputs,
- *,
- visit_summary: ExposureCatalog | None = None,
- includeScaleUncertainty: bool = False,
- ) -> bool:
- """Apply all of the calibrations from visit_summary to the exposure.
- Specifically, this method updates the following (if available) to the
- input exposure in place from ``visit_summary``:
- - Aperture correction map
- - Photometric calibration
- - PSF
- - WCS
- It also reverts and applies backgrounds in ``detector_inputs``.
- Parameters
- ----------
- detector_inputs : `WarpDetectorInputs`
- The input exposure to be processed, along with any other
- per-detector modifications.
- visit_summary : `~lsst.afw.table.ExposureCatalog` | None
- Table of visit summary information. If not None, the visit summary
- information will be used to update the calibration of the input
- exposures. Otherwise, the input exposures will be used as-is.
- includeScaleUncertainty : bool
- Whether to include the uncertainty on the calibration in the
- resulting variance? Passed onto the `calibrateImage` method of the
- PhotoCalib object attached to ``exp``.
- Returns
- -------
- success : `bool`
- True if all calibrations were successfully applied, False otherwise.
- Raises
- ------
- RuntimeError
- Raised if ``visit_summary`` is provided but does not contain a
- record corresponding to ``detector_inputs``, or if one of the
- background fields in ``detector_inputs`` is inconsistent with the
- task configuration.
- """
- if self.config.doRevertOldBackground:
- detector_inputs.revert_background()
- elif detector_inputs.background_revert:
- # This could get trigged only if runQuantum is skipped and run is
- # called directly.
- raise RuntimeError(
- f"doRevertOldBackground is False, but {detector_inputs.data_id} has a background_revert."
- )
- input_exposure = detector_inputs.exposure
- if visit_summary is not None:
- detector = input_exposure.info.getDetector().getId()
- row = visit_summary.find(detector)
- if row is None:
- self.log.info(
- "Unexpectedly incomplete visit_summary: detector = %s is missing. Skipping it.",
- detector,
- )
- return False
- if photo_calib := row.getPhotoCalib():
- input_exposure.setPhotoCalib(photo_calib)
- else:
- self.log.info(
- "No photometric calibration found in visit summary for detector = %s. Skipping it.",
- detector,
- )
- return False
- if wcs := row.getWcs():
- input_exposure.setWcs(wcs)
- else:
- self.log.info("No WCS found in visit summary for detector = %s. Skipping it.", detector)
- return False
- if self.config.useVisitSummaryPsf:
- if psf := row.getPsf():
- input_exposure.setPsf(psf)
- else:
- self.log.info("No PSF found in visit summary for detector = %s. Skipping it.", detector)
- return False
- if apcorr_map := row.getApCorrMap():
- input_exposure.setApCorrMap(apcorr_map)
- else:
- self.log.info(
- "No aperture correction map found in visit summary for detector = %s. Skipping it",
- detector,
- )
- return False
- elif visit_summary is not None:
- # We can only get here by calling `run`, not `runQuantum`.
- raise RuntimeError(
- "useVisitSummaryPsf=True, but visit_summary is provided. "
- )
- if self.config.doApplyNewBackground:
- detector_inputs.apply_background()
- elif detector_inputs.background_apply:
- # This could get trigged only if runQuantum is skipped and run is
- # called directly.
- raise RuntimeError(
- f"doApplyNewBackground is False, but {detector_inputs.data_id} has a background_apply."
- )
- # Calibrate the (masked) image.
- # This should likely happen even if visit_summary is None.
- photo_calib = input_exposure.photoCalib
- input_exposure.maskedImage = photo_calib.calibrateImage(
- input_exposure.maskedImage,
- includeScaleUncertainty=includeScaleUncertainty,
- )
- input_exposure.maskedImage /= photo_calib.getCalibrationMean()
- return True
- # This method is copied from makeWarp.py
- @classmethod
- def _prepareEmptyExposure(cls, sky_info):
- """Produce an empty exposure for a given patch.
- Parameters
- ----------
- sky_info : `lsst.pipe.base.Struct`
- Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo` with
- geometric information about the patch.
- Returns
- -------
- exp : `lsst.afw.image.exposure.ExposureF`
- An empty exposure for a given patch.
- """
- exp = ExposureF(sky_info.bbox, sky_info.wcs)
- exp.getMaskedImage().set(np.nan, Mask.getPlaneBitMask("NO_DATA"), np.inf)
- return exp
- @staticmethod
- def compute_median_variance(mi: MaskedImage) -> float:
- """Compute the median variance across the good pixels of a MaskedImage.
- Parameters
- ----------
- mi : `~lsst.afw.image.MaskedImage`
- The input image on which to compute the median variance.
- Returns
- -------
- median_variance : `float`
- Median variance of the input calexp.
- """
- # Shouldn't this exclude pixels that are masked, to be safe?
- # This is implemented as it was in descwl_coadd.
- return np.median(mi.variance.array[np.isfinite(mi.variance.array) & np.isfinite(mi.image.array)])
- def get_seed_from_data_id(self, data_id) -> int:
- """Get a seed value given a data_id.
- This method generates a unique, reproducible pseudo-random number for
- a data id. This is not affected by ordering of the input, or what
- set of visits, ccds etc. are given.
- This is implemented as a public method, so that simulations that
- don't necessary deal with the middleware can mock up a ``data_id``
- instance, or override this method with a different one to obtain a
- seed value consistent with the pipeline task.
- Parameters
- ----------
- data_id : `~lsst.daf.butler.DataCoordinate`
- Data identifier dictionary.
- Returns
- -------
- seed : `int`
- A unique seed for this data_id to seed a random number generator.
- """
- return self.config.idGenerator.apply(data_id).catalog_id
- def make_noise_exposures(self, calexp: ExposureF, rng) -> dict[int, ExposureF]:
- """Make pure noise realizations based on ``calexp``.
- Parameters
- ----------
- calexp : `~lsst.afw.image.ExposureF`
- The input exposure on which to base the noise realizations.
- rng : `np.random.RandomState`
- Random number generator to use for the noise realizations.
- Returns
- -------
- noise_calexps : `dict` [`int`, `~lsst.afw.image.ExposureF`]
- A mapping of integers ranging from 0 up to
- config.numberOfNoiseRealizations to the corresponding
- noise realization exposures.
- """
- noise_calexps = {}
- # If no noise exposures are requested, return the empty dictionary
- # without any further computations.
- if self.config.numberOfNoiseRealizations == 0:
- return noise_calexps
- if self.config.useMedianVariance:
- variance = self.compute_median_variance(calexp.maskedImage)
- else:
- variance = calexp.variance.array
- for n_noise in range(self.config.numberOfNoiseRealizations):
- noise_calexp = calexp.clone()
- noise_calexp.image.array[:, :] = rng.normal(
- scale=np.sqrt(variance),
- size=noise_calexp.image.array.shape,
- )
- noise_calexp.variance.array[:, :] = variance
- noise_calexps[n_noise] = noise_calexp
- return noise_calexps
- @classmethod
- def _get_bad_mask(cls, exp: ExposureF, badMaskPlanes: Iterable[str]) -> ExposureF:
- """Get an Exposure of bad mask
- Parameters
- ----------
- exp: `lsst.afw.image.Exposure`
- The exposure data.
- badMaskPlanes: `list` [`str`]
- List of mask planes to be considered as bad.
- Returns
- -------
- bad_mask: `~lsst.afw.image.Exposure`
- An Exposure with boolean array with True if inverse variance <= 0
- or if any of the badMaskPlanes bits are set, and False otherwise.
- """
- bad_mask = exp.clone()
- var = exp.variance.array
- mask = exp.mask.array
- bitMask = exp.mask.getPlaneBitMask(badMaskPlanes)
- bad_mask.image.array[:, :] = (var < 0) | np.isinf(var) | ((mask & bitMask) != 0)
- bad_mask.variance.array *= 0.0
- return bad_mask
diff --git a/python/lsst/pipe/tasks/make_psf_matched_warp.py b/python/lsst/pipe/tasks/make_psf_matched_warp.py
index e997afe3b..e2fec1746 100644
--- a/python/lsst/pipe/tasks/make_psf_matched_warp.py
+++ b/python/lsst/pipe/tasks/make_psf_matched_warp.py
@@ -19,281 +19,36 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-from __future__ import annotations
+# TODO: Remove this entire file in DM-47521.
-__all__ = (
- "growValidPolygons",
- "MakePsfMatchedWarpConfig",
- "MakePsfMatchedWarpConnections",
- "MakePsfMatchedWarpTask",
-from typing import TYPE_CHECKING
-import lsst.geom as geom
-import numpy as np
import warnings
-from lsst.afw.geom import Polygon, makeWcsPairTransform, SinglePolygonException
-from lsst.coadd.utils import copyGoodPixels
-from lsst.ip.diffim import ModelPsfMatchTask
-from lsst.meas.algorithms import GaussianPsfFactory, WarpedPsf
-from lsst.pex.config import ConfigurableField
-from lsst.pipe.base import (
- PipelineTask,
- PipelineTaskConfig,
- PipelineTaskConnections,
- Struct,
-from lsst.pipe.base.connectionTypes import Input, Output
-from lsst.pipe.tasks.coaddBase import makeSkyInfo
-from lsst.skymap import BaseSkyMap
-from lsst.utils.timer import timeMethod
+from deprecated.sphinx import deprecated
+from .coaddBase import growValidPolygons as growValidPolygonsNew
- from lsst.afw.image import Exposure
+ reason="Please use growValidPolygons from lsst.pipe.tasks.coaddBase instead.",
+ category=FutureWarning,
+ )
+def growValidPolygons(*args, **kwargs):
+ """Deprecated alias to lsst.pipe.tasks.coaddBase.growValidPolygons."""
+ return growValidPolygonsNew(*args, **kwargs)
-class MakePsfMatchedWarpConnections(
- PipelineTaskConnections,
- dimensions=("tract", "patch", "skymap", "instrument", "visit"),
- defaultTemplates={
- "coaddName": "deep",
- },
- """Connections for MakePsfMatchedWarpTask"""
- sky_map = Input(
- doc="Input definition of geometry/bbox and projection/wcs for warps.",
- storageClass="SkyMap",
- dimensions=("skymap",),
- )
- direct_warp = Input(
- doc="Direct warped exposure produced by resampling calexps onto the skyMap patch geometry",
- name="{coaddName}Coadd_directWarp",
- storageClass="ExposureF",
- dimensions=("tract", "patch", "skymap", "instrument", "visit"),
+ from lsst.drp.tasks.make_psf_matched_warp import ( # noqa: F401
+ MakePsfMatchedWarpConfig,
+ MakePsfMatchedWarpConnections,
+ MakePsfMatchedWarpTask,
- psf_matched_warp = Output(
- doc=(
- "Output PSF-Matched warped exposure, produced by resampling ",
- "calexps onto the skyMap patch geometry and PSF-matching to a model PSF.",
- ),
- name="{coaddName}Coadd_psfMatchedWarp",
- storageClass="ExposureF",
- dimensions=("tract", "patch", "skymap", "visit", "instrument"),
+except ImportError as error:
+ error.msg += ". Please import the warping tasks from drp_tasks package."
+ raise error
+ warnings.warn(
+ "lsst.pipe.tasks.make_psf_matched_warp is deprecated and will be removed after v29; "
+ "Please use lsst.drp.tasks.make_psf_matched_warp instead.",
+ DeprecationWarning,
+ stacklevel=2,
-class MakePsfMatchedWarpConfig(
- PipelineTaskConfig,
- pipelineConnections=MakePsfMatchedWarpConnections,
- """Config for MakePsfMatchedWarpTask."""
- modelPsf = GaussianPsfFactory.makeField(doc="Model Psf factory")
- psfMatch = ConfigurableField(
- target=ModelPsfMatchTask,
- doc="Task to warp and PSF-match calexp",
- )
- def setDefaults(self):
- super().setDefaults()
- self.psfMatch.kernel["AL"].alardSigGauss = [1.0, 2.0, 4.5]
- self.modelPsf.defaultFwhm = 7.7
-class MakePsfMatchedWarpTask(PipelineTask):
- ConfigClass = MakePsfMatchedWarpConfig
- _DefaultName = "makePsfMatchedWarp"
- def __init__(self, **kwargs):
- super().__init__(**kwargs)
- self.makeSubtask("psfMatch")
- def runQuantum(self, butlerQC, inputRefs, outputRefs):
- # Docstring inherited.
- # Read in all inputs.
- inputs = butlerQC.get(inputRefs)
- sky_map = inputs.pop("sky_map")
- quantumDataId = butlerQC.quantum.dataId
- sky_info = makeSkyInfo(
- sky_map,
- tractId=quantumDataId["tract"],
- patchId=quantumDataId["patch"],
- )
- results = self.run(inputs["direct_warp"], sky_info.bbox)
- butlerQC.put(results, outputRefs)
- @timeMethod
- def run(self, direct_warp: Exposure, bbox: geom.Box2I):
- """Make a PSF-matched warp from a direct warp.
- Each individual detector from the direct warp is isolated, one at a
- time, and PSF-matched to the same model PSF. The PSF-matched images are
- then added back together to form the final PSF-matched warp. The bulk
- of the work is done by the `psfMatchTask`.
- Notes
- -----
- Pixels that receive no inputs are set to NaN, for e.g, chip gaps. This
- violates LSST algorithms group policy.
- Parameters
- ----------
- direct_warp : `lsst.afw.image.Exposure`
- Direct warp to be PSF-matched.
- Returns
- -------
- struct : `lsst.pipe.base.Struct`
- Struct containing the PSF-matched warp under the attribute
- `psf_matched_warp`.
- """
- modelPsf = self.config.modelPsf.apply()
- # Prepare the output exposure. We clone the input image to keep the
- # metadata, but reset the image and variance plans.
- exposure_psf_matched = direct_warp[bbox].clone()
- exposure_psf_matched.image.array[:, :] = np.nan
- exposure_psf_matched.variance.array[:, :] = np.inf
- exposure_psf_matched.setPsf(modelPsf)
- bit_mask = direct_warp.mask.getPlaneBitMask("NO_DATA")
- total_good_pixels = 0 # Total number of pixels copied to output.
- for row in direct_warp.info.getCoaddInputs().ccds:
- transform = makeWcsPairTransform(row.wcs, direct_warp.wcs)
- warp_psf = WarpedPsf(row.getPsf(), transform)
- if (src_polygon := row.validPolygon) is None:
- # Calculate the polygon for this detector.
- src_polygon = Polygon(
- [geom.Point2D(corner) for corner in row.getBBox().getCorners()]
- )
- self.log.debug("Polygon for detector=%d is calculated as %s",
- row["ccd"],
- src_polygon
- )
- else:
- self.log.debug("Polygon for detector=%d is read from the input calexp as %s",
- row["ccd"],
- src_polygon
- )
- try:
- destination_polygon = src_polygon.transform(transform).intersectionSingle(
- geom.Box2D(direct_warp.getBBox())
- )
- except SinglePolygonException:
- self.log.info(
- "Skipping CCD %d as its polygon does not intersect the direct warp",
- row["ccd"],
- )
- continue
- # Compute the minimum possible bounding box that overlaps the CCD.
- # First find the intersection polygon between the per-detector warp
- # and the warp bounding box.
- bbox = geom.Box2I()
- for corner in destination_polygon.getVertices():
- bbox.include(geom.Point2I(corner))
- bbox.clip(direct_warp.getBBox()) # Additional safeguard
- # Because the direct warps are larger, it is possible that after
- # clipping, `bbox` lies outside PSF-matched warp's bounding box.
- if not bbox.overlaps(exposure_psf_matched.getBBox()):
- self.log.debug(
- "Skipping CCD %d as its bbox %s does not overlap the PSF-matched warp",
- row["ccd"],
- bbox,
- )
- continue
- self.log.debug("PSF-matching CCD %d with bbox %s", row["ccd"], bbox)
- ccd_mask_array = ~(destination_polygon.createImage(bbox).array <= 0)
- # Clone the subimage, set the PSF to the model and reset the planes
- # outside the detector.
- temp_warp = direct_warp[bbox].clone()
- temp_warp.setPsf(warp_psf)
- temp_warp.image.array *= ccd_mask_array
- temp_warp.mask.array |= (~ccd_mask_array) * bit_mask
- # We intend to divide by zero outside the detector to set the
- # per-pixel variance values to infinity. Suppress the warning.
- with warnings.catch_warnings():
- warnings.filterwarnings("ignore", message="divide by zero", category=RuntimeWarning)
- temp_warp.variance.array /= ccd_mask_array
- temp_psf_matched = self.psfMatch.run(temp_warp, modelPsf).psfMatchedExposure
- del temp_warp
- # Set pixels outside the intersection polygon to NO_DATA.
- temp_psf_matched.maskedImage[bbox].mask.array |= (~ccd_mask_array) * bit_mask
- # Clip the bbox to the PSF-matched warp bounding box.
- bbox.clip(exposure_psf_matched.getBBox())
- num_good_pixels = copyGoodPixels(
- exposure_psf_matched.maskedImage[bbox],
- temp_psf_matched.maskedImage[bbox],
- bit_mask,
- )
- del temp_psf_matched
- self.log.info(
- "Copied %d pixels from CCD %d to exposure_psf_matched", num_good_pixels, row["ccd"],
- )
- total_good_pixels += num_good_pixels
- self.log.info("Total number of good pixels = %d", total_good_pixels)
- if total_good_pixels > 0:
- growValidPolygons(
- exposure_psf_matched.info.getCoaddInputs(),
- -self.config.psfMatch.kernel.active.kernelSize // 2
- )
- return Struct(psf_matched_warp=exposure_psf_matched)
- else:
- return Struct(psf_matched_warp=None)
-# Note that this is implemented as a free-floating function to enable reuse in
-# makeWarp.py without creating any relationships between the two classes.
-# This may be converted to a method after makeWarp.py is removed altogether.
-def growValidPolygons(coaddInputs, growBy: int) -> None:
- """Grow coaddInputs' ccds' ValidPolygons in place.
- Either modify each ccd's validPolygon in place, or if CoaddInputs
- does not have a validPolygon, create one from its bbox.
- Parameters
- ----------
- coaddInputs : `lsst.afw.image.coaddInputs`
- CoaddInputs object containing the ccds to grow the valid polygons of.
- growBy : `int`
- The value to grow the valid polygons by.
- Notes
- -----
- Negative values for ``growBy`` can shrink the polygons.
- """
- for ccd in coaddInputs.ccds:
- polyOrig = ccd.getValidPolygon()
- validPolyBBox = polyOrig.getBBox() if polyOrig else ccd.getBBox()
- validPolyBBox.grow(growBy)
- if polyOrig:
- validPolygon = polyOrig.intersectionSingle(validPolyBBox)
- else:
- validPolygon = Polygon(geom.Box2D(validPolyBBox))
- ccd.validPolygon = validPolygon
diff --git a/tests/test_import.py b/tests/test_import.py
index 031a333e1..3ccc9ef1a 100644
--- a/tests/test_import.py
+++ b/tests/test_import.py
@@ -37,6 +37,13 @@ class PipeTasksImportTestCase(ImportTestCase):
+ "lsst.pipe.tasks": {
+ "make_direct_warp.py", # TODO: Remove in DM-47521.
+ "make_psf_matched_warp.py", # TODO: Remove in DM-47521.
+ }
+ }
if __name__ == "__main__":
diff --git a/tests/test_makeWarp.py b/tests/test_makeWarp.py
index b7b0c56c1..2e1142e8d 100644
--- a/tests/test_makeWarp.py
+++ b/tests/test_makeWarp.py
@@ -29,8 +29,6 @@
import lsst.afw.image
from lsst.daf.butler import DataCoordinate, DimensionUniverse
-from lsst.pipe.base import InMemoryDatasetHandle
-from lsst.pipe.tasks.make_direct_warp import MakeDirectWarpTask, WarpDetectorInputs
from lsst.pipe.tasks.makeWarp import (MakeWarpTask, MakeWarpConfig)
from lsst.pipe.tasks.coaddBase import makeSkyInfo
import lsst.skymap as skyMap
@@ -237,53 +235,6 @@ def test_psfMatched(self):
- def test_compare_warps(self):
- """Test that the warp from MakeWarpTask and MakeDirectWarpTask agree.
- """
- dataIdList = [{'visit_id': self.visit, 'detector_id': self.detector, "band": "i"}]
- dataRefs = [
- InMemoryDatasetHandle(self.exposure, dataId=self.generate_data_id(**dataId))
- for dataId in dataIdList
- ]
- for dataId in dataIdList:
- dataId["detector"] = dataId.pop("detector_id")
- dataId["visit"] = dataId.pop("visit_id")
- self.config.makePsfMatched = True
- makeWarp = MakeWarpTask(config=self.config)
- result0 = makeWarp.run(
- calExpList=[self.exposure],
- ccdIdList=[self.detector],
- skyInfo=self.skyInfo,
- visitId=self.visit,
- dataIdList=dataIdList,
- )
- config = MakeDirectWarpTask.ConfigClass()
- config.doPreWarpInterpolation = False
- config.doSelectPreWarp = False
- config.useVisitSummaryPsf = False
- task = MakeDirectWarpTask(config=config)
- warp_detector_inputs = {
- dataRef.dataId.detector.id: WarpDetectorInputs(
- exposure_or_handle=self.exposure,
- data_id=dataRef.dataId,
- )
- for dataRef in dataRefs
- }
- result1 = task.run(
- warp_detector_inputs,
- sky_info=self.skyInfo,
- visit_summary=None
- )
- warp0 = result0.exposures["direct"]
- warp1 = result1.warp[warp0.getBBox()]
- self.assertMaskedImagesAlmostEqual(warp0.maskedImage, warp1.maskedImage, rtol=3e-7, atol=6e-6)
def setup_module(module):
diff --git a/tests/test_make_direct_warp.py b/tests/test_make_direct_warp.py
deleted file mode 100644
index fbf2a453b..000000000
--- a/tests/test_make_direct_warp.py
+++ /dev/null
@@ -1,391 +0,0 @@
-# This file is part of pipe_tasks.
-# Developed for the LSST Data Management System.
-# This product includes software developed by the LSST Project
-# (https://www.lsst.org).
-# See the COPYRIGHT file at the top-level directory of this distribution
-# for details of code ownership.
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# GNU General Public License for more details.
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-from __future__ import annotations
-from typing import Self, Type
-import unittest
-import numpy as np
-import lsst.utils.tests
-import lsst.afw.image
-import lsst.afw.math as afwMath
-from lsst.daf.butler import DataCoordinate, DimensionUniverse
-from lsst.pipe.base import InMemoryDatasetHandle
-from lsst.pipe.tasks.make_direct_warp import (MakeDirectWarpConfig, MakeDirectWarpTask, WarpDetectorInputs)
-from lsst.pipe.tasks.coaddBase import makeSkyInfo
-import lsst.skymap as skyMap
-from lsst.afw.detection import GaussianPsf
-import lsst.afw.cameraGeom.testUtils
-class MakeWarpTestCase(lsst.utils.tests.TestCase):
- def setUp(self):
- np.random.seed(12345)
- self.config = MakeDirectWarpConfig()
- self.config.useVisitSummaryPsf = False
- self.config.doSelectPreWarp = False
- self.config.doWarpMaskedFraction = True
- self.config.numberOfNoiseRealizations = 1
- meanCalibration = 1e-4
- calibrationErr = 1e-5
- self.exposurePhotoCalib = lsst.afw.image.PhotoCalib(meanCalibration, calibrationErr)
- # An external photoCalib calibration to return
- self.externalPhotoCalib = lsst.afw.image.PhotoCalib(1e-6, 1e-8)
- crpix = lsst.geom.Point2D(0, 0)
- crval = lsst.geom.SpherePoint(0, 45, lsst.geom.degrees)
- cdMatrix = lsst.afw.geom.makeCdMatrix(scale=1.0*lsst.geom.arcseconds)
- self.skyWcs = lsst.afw.geom.makeSkyWcs(crpix, crval, cdMatrix)
- externalCdMatrix = lsst.afw.geom.makeCdMatrix(scale=0.9*lsst.geom.arcseconds)
- # An external skyWcs to return
- self.externalSkyWcs = lsst.afw.geom.makeSkyWcs(crpix, crval, externalCdMatrix)
- self.exposure = lsst.afw.image.ExposureF(100, 150)
- self.exposure.maskedImage.image.array = np.random.random((150, 100)).astype(np.float32) * 1000
- self.exposure.maskedImage.variance.array = np.random.random((150, 100)).astype(np.float32)
- # mask at least one pixel
- self.exposure.maskedImage.mask[5, 5] = 3
- # set the PhotoCalib and Wcs objects of this exposure.
- self.exposure.setPhotoCalib(lsst.afw.image.PhotoCalib(meanCalibration, calibrationErr))
- self.exposure.setWcs(self.skyWcs)
- self.exposure.setPsf(GaussianPsf(5, 5, 2.5))
- self.exposure.setFilter(lsst.afw.image.FilterLabel(physical="fakeFilter", band="fake"))
- self.visit = 100
- self.detector = 5
- detectorName = f"detector {self.detector}"
- detector = lsst.afw.cameraGeom.testUtils.DetectorWrapper(name=detectorName, id=self.detector).detector
- self.exposure.setDetector(detector)
- dataId_dict = {"detector_id": self.detector, "visit_id": 1248, "band": "i"}
- dataId = self.generate_data_id(**dataId_dict)
- self.dataRef = InMemoryDatasetHandle(self.exposure, dataId=dataId)
- simpleMapConfig = skyMap.discreteSkyMap.DiscreteSkyMapConfig()
- simpleMapConfig.raList = [crval.getRa().asDegrees()]
- simpleMapConfig.decList = [crval.getDec().asDegrees()]
- simpleMapConfig.radiusList = [0.1]
- self.simpleMap = skyMap.DiscreteSkyMap(simpleMapConfig)
- self.tractId = 0
- self.patchId = self.simpleMap[0].findPatch(crval).sequential_index
- self.skyInfo = makeSkyInfo(self.simpleMap, self.tractId, self.patchId)
- @classmethod
- def generate_data_id(
- cls: Type[Self],
- *,
- tract: int = 9813,
- patch: int = 42,
- band: str = "r",
- detector_id: int = 9,
- visit_id: int = 1234,
- detector_max: int = 109,
- visit_max: int = 10000
- ) -> DataCoordinate:
- """Generate a DataCoordinate instance to use as data_id.
- Parameters
- ----------
- tract : `int`, optional
- Tract ID for the data_id
- patch : `int`, optional
- Patch ID for the data_id
- band : `str`, optional
- Band for the data_id
- detector_id : `int`, optional
- Detector ID for the data_id
- visit_id : `int`, optional
- Visit ID for the data_id
- detector_max : `int`, optional
- Maximum detector ID for the data_id
- visit_max : `int`, optional
- Maximum visit ID for the data_id
- Returns
- -------
- data_id : `lsst.daf.butler.DataCoordinate`
- An expanded data_id instance.
- """
- universe = DimensionUniverse()
- instrument = universe["instrument"]
- instrument_record = instrument.RecordClass(
- name="DummyCam",
- class_name="lsst.obs.base.instrument_tests.DummyCam",
- detector_max=detector_max,
- visit_max=visit_max,
- )
- skymap = universe["skymap"]
- skymap_record = skymap.RecordClass(name="test_skymap")
- band_element = universe["band"]
- band_record = band_element.RecordClass(name=band)
- visit = universe["visit"]
- visit_record = visit.RecordClass(id=visit_id, instrument="test")
- detector = universe["detector"]
- detector_record = detector.RecordClass(id=detector_id, instrument="test")
- physical_filter = universe["physical_filter"]
- physical_filter_record = physical_filter.RecordClass(name=band, instrument="test", band=band)
- patch_element = universe["patch"]
- patch_record = patch_element.RecordClass(
- skymap="test_skymap", tract=tract, patch=patch,
- )
- if "day_obs" in universe:
- day_obs_element = universe["day_obs"]
- day_obs_record = day_obs_element.RecordClass(id=20240201, instrument="test")
- else:
- day_obs_record = None
- # A dictionary with all the relevant records.
- record = {
- "instrument": instrument_record,
- "visit": visit_record,
- "detector": detector_record,
- "patch": patch_record,
- "tract": 9813,
- "band": band_record.name,
- "skymap": skymap_record.name,
- "physical_filter": physical_filter_record,
- }
- if day_obs_record:
- record["day_obs"] = day_obs_record
- # A dictionary with all the relevant recordIds.
- record_id = record.copy()
- for key in ("visit", "detector"):
- record_id[key] = record_id[key].id
- # TODO: Catching mypy failures on Github Actions should be made easier,
- # perhaps in DM-36873. Igroring these for now.
- data_id = DataCoordinate.standardize(record_id, universe=universe)
- return data_id.expanded(record)
- def test_makeWarp(self):
- """Test basic MakeDirectWarpTask."""
- makeWarp = MakeDirectWarpTask(config=self.config)
- warp_detector_inputs = {
- self.dataRef.dataId.detector.id: WarpDetectorInputs(
- exposure_or_handle=self.dataRef,
- data_id=self.dataRef.dataId,
- )
- }
- result = makeWarp.run(
- warp_detector_inputs,
- sky_info=self.skyInfo,
- visit_summary=None
- )
- warp = result.warp
- mfrac = result.masked_fraction_warp
- noise = result.noise_warp0
- # Ensure we got an exposure out
- self.assertIsInstance(warp, lsst.afw.image.ExposureF)
- # Ensure that masked fraction is an ImageF object.
- self.assertIsInstance(mfrac, lsst.afw.image.ImageF)
- # Ensure that the noise image is a MaskedImageF object.
- self.assertIsInstance(noise, lsst.afw.image.MaskedImageF)
- # Check that the noise image is not accidentally the same as the image.
- with self.assertRaises(AssertionError):
- self.assertImagesAlmostEqual(noise.image, warp.image)
- # Ensure the warp has valid pixels
- self.assertGreater(np.isfinite(warp.image.array.ravel()).sum(), 0)
- # Ensure the warp has the correct WCS
- self.assertEqual(warp.getWcs(), self.skyInfo.wcs)
- # Ensure that mfrac has pixels between 0 and 1
- self.assertTrue(np.nanmax(mfrac.array) <= 1)
- self.assertTrue(np.nanmin(mfrac.array) >= 0)
- def make_backgroundList(self):
- """Obtain a BackgroundList for the image."""
- bgCtrl = afwMath.BackgroundControl(10, 10)
- interpStyle = afwMath.Interpolate.AKIMA_SPLINE
- undersampleStyle = afwMath.REDUCE_INTERP_ORDER
- approxStyle = afwMath.ApproximateControl.UNKNOWN
- approxOrderX = 0
- approxOrderY = 0
- approxWeighting = False
- backgroundList = afwMath.BackgroundList()
- bkgd = afwMath.makeBackground(self.exposure.image, bgCtrl)
- backgroundList.append(
- (bkgd, interpStyle, undersampleStyle, approxStyle, approxOrderX, approxOrderY, approxWeighting)
- )
- return backgroundList
- @lsst.utils.tests.methodParametersProduct(
- doApplyNewBackground=[True, False], doRevertOldBackground=[True, False]
- )
- def test_backgrounds(self, doApplyNewBackground, doRevertOldBackground):
- """Test that applying and reverting backgrounds runs without errors,
- especially on noise images.
- """
- backgroundList = self.make_backgroundList()
- warp_detector_inputs = {
- self.dataRef.dataId.detector.id: WarpDetectorInputs(
- exposure_or_handle=self.dataRef,
- data_id=self.dataRef.dataId,
- background_apply=backgroundList if doApplyNewBackground else None,
- background_revert=backgroundList if doRevertOldBackground else None,
- )
- }
- self.config.numberOfNoiseRealizations = 1
- self.config.doApplyNewBackground = doApplyNewBackground
- self.config.doRevertOldBackground = doRevertOldBackground
- makeWarp = MakeDirectWarpTask(config=self.config)
- makeWarp.run(
- warp_detector_inputs,
- sky_info=self.skyInfo,
- visit_summary=None
- )
- def test_background_errors(self):
- """Test that MakeDirectWarpTask raises errors when backgrounds are not
- set correctly.
- """
- backgroundList = self.make_backgroundList()
- self.config.numberOfNoiseRealizations = 1
- warp_detector_inputs = {
- self.dataRef.dataId.detector.id: WarpDetectorInputs(
- exposure_or_handle=self.dataRef,
- data_id=self.dataRef.dataId,
- background_apply=backgroundList,
- )
- }
- makeWarp = MakeDirectWarpTask(config=self.config)
- with self.assertRaises(RuntimeError, msg="doApplyNewBackground is False, but"):
- makeWarp.run(
- warp_detector_inputs,
- sky_info=self.skyInfo,
- visit_summary=None
- )
- warp_detector_inputs = {
- self.dataRef.dataId.detector.id: WarpDetectorInputs(
- exposure_or_handle=self.dataRef,
- data_id=self.dataRef.dataId,
- background_apply=None,
- )
- }
- self.config.doApplyNewBackground = True
- makeWarp = MakeDirectWarpTask(config=self.config)
- with self.assertRaises(RuntimeError, msg="No background to apply"):
- makeWarp.run(
- warp_detector_inputs,
- sky_info=self.skyInfo,
- visit_summary=None
- )
- warp_detector_inputs = {
- self.dataRef.dataId.detector.id: WarpDetectorInputs(
- exposure_or_handle=self.dataRef,
- data_id=self.dataRef.dataId,
- background_apply=backgroundList,
- background_revert=backgroundList,
- )
- }
- makeWarp = MakeDirectWarpTask(config=self.config)
- with self.assertRaises(RuntimeError, msg="doRevertOldBackground is False, but"):
- makeWarp.run(
- warp_detector_inputs,
- sky_info=self.skyInfo,
- visit_summary=None
- )
- warp_detector_inputs = {
- self.dataRef.dataId.detector.id: WarpDetectorInputs(
- exposure_or_handle=self.dataRef,
- data_id=self.dataRef.dataId,
- background_apply=backgroundList,
- background_revert=None,
- )
- }
- self.config.doRevertOldBackground = True
- makeWarp = MakeDirectWarpTask(config=self.config)
- with self.assertRaises(RuntimeError, msg="No background to revert"):
- makeWarp.run(
- warp_detector_inputs,
- sky_info=self.skyInfo,
- visit_summary=None
- )
-class MakeWarpNoGoodPixelsTestCase(MakeWarpTestCase):
- def setUp(self):
- super().setUp()
- self.exposure.mask.array |= self.exposure.mask.getPlaneBitMask("NO_DATA")
- self.dataRef = InMemoryDatasetHandle(self.exposure, dataId=self.dataRef.dataId)
- def test_makeWarp(self):
- """Test MakeDirectWarpTask fails gracefully with no good pixels.
- It should return an empty exposure, with no PSF.
- """
- makeWarp = MakeDirectWarpTask(config=self.config)
- warp_detector_inputs = {
- self.dataRef.dataId.detector.id: WarpDetectorInputs(
- exposure_or_handle=self.dataRef, data_id=self.dataRef.dataId
- )
- }
- result = makeWarp.run(
- warp_detector_inputs,
- sky_info=self.skyInfo,
- visit_summary=None
- )
- # Ensure we got None
- self.assertIsNone(result.warp)
- self.assertIsNone(result.masked_fraction_warp)
- self.assertIsNone(result.noise_warp0)
-def setup_module(module):
- lsst.utils.tests.init()
-class MatchMemoryTestCase(lsst.utils.tests.MemoryTestCase):
- pass
-if __name__ == "__main__":
- lsst.utils.tests.init()
- unittest.main()
diff --git a/tests/test_make_psf_matched_warp.py b/tests/test_make_psf_matched_warp.py
deleted file mode 100644
index 825498861..000000000
--- a/tests/test_make_psf_matched_warp.py
+++ /dev/null
@@ -1,88 +0,0 @@
-# This file is part of pipe_tasks.
-# Developed for the LSST Data Management System.
-# This product includes software developed by the LSST Project
-# (https://www.lsst.org).
-# See the COPYRIGHT file at the top-level directory of this distribution
-# for details of code ownership.
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# GNU General Public License for more details.
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-from __future__ import annotations
-import unittest
-import numpy as np
-import lsst.utils.tests
-import lsst.afw.image
-from lsst.pipe.tasks.make_direct_warp import MakeDirectWarpTask, WarpDetectorInputs
-from lsst.pipe.tasks.make_psf_matched_warp import MakePsfMatchedWarpTask
-import lsst.afw.cameraGeom.testUtils
-from test_make_direct_warp import MakeWarpTestCase
-class MakePsfMatchedWarpTestCase(MakeWarpTestCase):
- def test_makeWarp(self):
- """Test basic MakePsfMatchedWarpTask
- This constructs a direct_warp using `MakeDirectWarpTask` and then
- runs `MakePsfMatchedWarpTask` on it.
- """
- makeWarp = MakeDirectWarpTask(config=self.config)
- warp_detector_inputs = {
- self.dataRef.dataId.detector.id: WarpDetectorInputs(
- exposure_or_handle=self.dataRef, data_id=self.dataRef.dataId
- )
- }
- result = makeWarp.run(
- warp_detector_inputs,
- sky_info=self.skyInfo,
- visit_summary=None
- )
- warp = result.warp
- config = MakePsfMatchedWarpTask.ConfigClass()
- makePsfMatchedWarp = MakePsfMatchedWarpTask(config=config)
- result = makePsfMatchedWarp.run(
- warp,
- bbox=self.skyInfo.bbox,
- )
- psf_matched_warp = result.psf_matched_warp
- # Ensure we got an exposure out
- self.assertIsInstance(psf_matched_warp, lsst.afw.image.ExposureF)
- # Check that the PSF is not None.
- psf = psf_matched_warp.getPsf()
- assert psf is not None
- # Ensure the warp has valid pixels
- self.assertGreater(np.isfinite(psf_matched_warp.image.array.ravel()).sum(), 0)
-def setup_module(module):
- lsst.utils.tests.init()
-class MatchMemoryTestCase(lsst.utils.tests.MemoryTestCase):
- pass
-if __name__ == "__main__":
- lsst.utils.tests.init()
- unittest.main()