diff --git a/src/depiction/calibration/deprecated/__init__.py b/src/depiction/calibration/deprecated/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/src/depiction/calibration/deprecated/adjust_median_shift.py b/src/depiction/calibration/deprecated/adjust_median_shift.py
deleted file mode 100644
index bc8624b..0000000
--- a/src/depiction/calibration/deprecated/adjust_median_shift.py
+++ /dev/null
@@ -1,137 +0,0 @@
-from __future__ import annotations
-
-from dataclasses import dataclass
-from typing import TYPE_CHECKING
-
-import numpy as np
-
-from depiction.calibration.deprecated.reference_distance_estimator import (
-    ReferenceDistanceEstimator,
-)
-from depiction.parallel_ops import (
-    ParallelConfig,
-    ReadSpectraParallel,
-    WriteSpectraParallel,
-)
-from depiction.image.spatial_smoothing import SpatialSmoothing
-
-if TYPE_CHECKING:
-    from depiction.spectrum.peak_picking.basic_peak_picker import BasicPeakPicker
-    from numpy.typing import NDArray
-    from depiction.persistence import (
-        ImzmlReadFile,
-        ImzmlReader,
-        ImzmlWriteFile,
-        ImzmlWriter,
-    )
-
-
-@dataclass(frozen=True)
-class AdjustMedianShift:
-    """This class computes the median shift for each spectrum in an image, smooths the values, and then allows
-    applying the smoothed shifts to obtain spectra that can be calibrated further. This is necessary to deal with
-    acquisitions which have mass errors above roughly 0.5 Da.
-    """
-
-    peak_picker: BasicPeakPicker
-    ref_mz_arr: NDArray[float]
-    parallel_config: ParallelConfig
-    max_distance: float = 2.0
-    smooth_sigma: float = 10.0
-
-    # @cached_property
-    # def _ref_distance_estimator(self) -> ReferenceDistanceEstimator:
-    #     return ReferenceDistanceEstimator(reference_mz=self.ref_mz_arr, n_candidates=3)
-
-    def compute_shifts_ppm(self, mz_arr: NDArray[float], int_arr: NDArray[float]) -> NDArray[float]:
-        # TODO having to create an instance for every spectrum is a design flaw; the problem is that cached_property
-        # creates an RLock which prevents the instance from being used in parallel (unpickleable)
-        peak_idx = self.peak_picker.pick_peaks_index(mz_arr=mz_arr, int_arr=int_arr)
-        if len(peak_idx) == 0:
-            return np.array([])
-        (
-            indices,
-            signed_distances_mz,
-        ) = ReferenceDistanceEstimator(reference_mz=self.ref_mz_arr, n_candidates=3).compute_max_peak_within_distance(
-            mz_peaks=mz_arr[peak_idx],
-            int_peaks=int_arr[peak_idx],
-            max_distance=self.max_distance,
-            keep_missing=True,
-        )
-        signed_distances_ppm = signed_distances_mz / self.ref_mz_arr * 1e6
-        signed_distances_ppm = signed_distances_ppm[indices != -1]
-        return signed_distances_ppm
-
-    def compute_median_shift_ppm(self, mz_arr: NDArray[float], int_arr: NDArray[float]) -> float:
-        """Computes the median shift for the specified mass spectrum in ppm."""
-        signed_distances_ppm = self.compute_shifts_ppm(mz_arr=mz_arr, int_arr=int_arr)
-        return np.median(signed_distances_ppm)
-
-    def compute_median_shifts(
-        self, read_file: ImzmlReadFile, spectra_indices: list[int] | None = None
-    ) -> NDArray[float]:
-        parallelize = ReadSpectraParallel.from_config(self.parallel_config)
-        values = parallelize.map_chunked(
-            read_file=read_file,
-            operation=self._compute_median_shifts_operation,
-            bind_args={
-                "self_copy": self,
-            },
-            reduce_fn=parallelize.reduce_concat,
-            spectra_indices=spectra_indices,
-        )
-        return np.asarray(values)
-
-    @classmethod
-    def _compute_median_shifts_operation(
-        cls,
-        reader: ImzmlReader,
-        spectra_ids: list[int],
-        self_copy: AdjustMedianShift,
-    ) -> list[float]:
-        values = []
-        for spectrum_id in spectra_ids:
-            mz_arr, int_arr = reader.get_spectrum(spectrum_id)
-            values.append(self_copy.compute_median_shift_ppm(mz_arr=mz_arr, int_arr=int_arr))
-        return values
-
-    def smooth_median_shifts(
-        self,
-        median_shifts: NDArray[float],
-        coordinates_2d: NDArray[int],
-    ) -> NDArray[float]:
-        return SpatialSmoothing(
-            sigma=self.smooth_sigma,
-            background_fill_mode="nearest",
-            background_value=np.nan,
-        ).smooth_sparse(sparse_values=median_shifts, coordinates=coordinates_2d)
-
-    def apply_correction(
-        self, read_file: ImzmlReadFile, write_file: ImzmlWriteFile
-    ) -> tuple[NDArray[float], NDArray[float]]:
-        median_shifts = self.compute_median_shifts(read_file=read_file)
-        smooth_median_shifts = self.smooth_median_shifts(
-            median_shifts=median_shifts, coordinates_2d=read_file.coordinates_2d
-        )
-        parallelize = WriteSpectraParallel.from_config(self.parallel_config)
-        parallelize.map_chunked_to_file(
-            read_file=read_file,
-            write_file=write_file,
-            operation=self._apply_correction_operation,
-            bind_args={"median_shifts": smooth_median_shifts},
-        )
-        return median_shifts, smooth_median_shifts
-
-    @classmethod
-    def _apply_correction_operation(
-        cls,
-        reader: ImzmlReader,
-        spectra_ids: list[int],
-        writer: ImzmlWriter,
-        median_shifts: NDArray[float],
-    ) -> None:
-        for spectrum_id in spectra_ids:
-            mz_arr, int_arr = reader.get_spectrum(spectrum_id)
-            shift_ppm = median_shifts[spectrum_id]
-            mz_arr_corrected = mz_arr - (mz_arr * shift_ppm / 1e6)
-            writer.add_spectrum(mz_arr_corrected, int_arr, reader.coordinates[spectrum_id])
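The correction removed above boils down to plain proportional (ppm) arithmetic. A minimal standalone sketch of the same math, with invented values (numpy only, not the depiction API):

```python
import numpy as np

# Observed peaks vs. reference masses; shift in ppm is (observed - ref) / ref * 1e6.
ref_mz = np.array([100.0, 200.0, 300.0])
observed_mz = ref_mz * (1 + 200e-6)                  # every peak 200 ppm too high
shifts_ppm = (observed_mz - ref_mz) / ref_mz * 1e6   # -> [200., 200., 200.]
median_shift_ppm = np.median(shifts_ppm)

# Correction as in _apply_correction_operation: subtract the proportional error.
mz_corrected = observed_mz - observed_mz * median_shift_ppm / 1e6
np.testing.assert_allclose(mz_corrected, ref_mz, rtol=1e-3)
```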
diff --git a/src/depiction/calibration/deprecated/calibrate_image.py b/src/depiction/calibration/deprecated/calibrate_image.py
deleted file mode 100644
index 919d65b..0000000
--- a/src/depiction/calibration/deprecated/calibrate_image.py
+++ /dev/null
@@ -1,139 +0,0 @@
-from typing import Optional
-
-import h5py
-import numpy as np
-from numpy.typing import NDArray
-
-from depiction.spectrum.peak_picking.basic_peak_picker import BasicPeakPicker
-from depiction.calibration.deprecated.calibrate_spectrum import CalibrateSpectrum
-from depiction.calibration.deprecated.model_smoothing import ModelSmoothing
-from depiction.calibration.models.linear_model import LinearModel
-from depiction.calibration.models.polynomial_model import PolynomialModel
-from depiction.parallel_ops.parallel_config import ParallelConfig
-from depiction.parallel_ops.read_spectra_parallel import ReadSpectraParallel
-from depiction.parallel_ops.write_spectra_parallel import WriteSpectraParallel
-from depiction.persistence import (
-    ImzmlReadFile,
-    ImzmlWriteFile,
-    ImzmlReader,
-    ImzmlWriter,
-)
-
-
-class CalibrateImage:
-    def __init__(
-        self,
-        reference_mz: NDArray[float],
-        model_type: str,
-        parallel_config: ParallelConfig,
-        output_store: Optional[h5py.Group] = None,
-    ) -> None:
-        self._reference_mz = reference_mz
-        self._model_type = model_type
-        self._parallel_config = parallel_config
-        self._output_store = output_store
-
-    def calibrate_image(self, read_file: ImzmlReadFile, write_file: ImzmlWriteFile) -> None:
-        print("Computing models...")
-        models_original = self._compute_models(read_file=read_file)
-        print("Smoothing models...")
-        smoothed_models = self._smooth_models(models_original)
-        print("Applying models...")
-        self._apply_models(read_file, smoothed_models, write_file)
-
-    def _compute_models(self, read_file: ImzmlReadFile) -> list[LinearModel | PolynomialModel]:
-        parallelize = ReadSpectraParallel.from_config(self._parallel_config)
-        models_original = parallelize.map_chunked(
-            read_file=read_file,
-            operation=self._compute_models_operation,
-            bind_args=dict(
-                reference_mz=self._reference_mz,
-                model_type=self._model_type,
-            ),
-            reduce_fn=parallelize.reduce_concat,
-        )
-        self._save_results(
-            model_coefs=[model.coef for model in models_original],
-            coordinates=read_file.coordinates,
-            reference_mz=self._reference_mz,
-        )
-        return models_original
-
-    @staticmethod
-    def _compute_models_operation(
-        reader: ImzmlReader,
-        spectra_indices: NDArray[int],
-        reference_mz: NDArray[float],
-        model_type: str,
-    ) -> list[LinearModel] | list[PolynomialModel]:
-        # TODO hardcoded parameters
-        peak_picker = BasicPeakPicker(smooth_sigma=0.1, min_prominence=0.05)
-        calibrate = CalibrateSpectrum(reference_mz=reference_mz, peak_picker=peak_picker, model_type=model_type)
-        results = []
-        for spectrum_id in spectra_indices:
-            mz_arr, int_arr = reader.get_spectrum(spectrum_id)
-            model = calibrate.calibrate_spectrum(mz_arr=mz_arr, int_arr=int_arr)
-            results.append(model)
-        return results
-
-    def _smooth_models(
-        self, models_original: list[LinearModel | PolynomialModel]
-    ) -> list[LinearModel | PolynomialModel]:
-        smoother = ModelSmoothing(sigma=1.0)
-        smoothed_models = smoother.smooth_sequential(models=models_original)
-        self._save_results(smoothed_models=[model.coef for model in smoothed_models])
-        return smoothed_models
-
-    def _apply_models(
-        self,
-        read_file: ImzmlReadFile,
-        smoothed_models: list[LinearModel | PolynomialModel],
-        write_file: ImzmlWriteFile,
-        mz_center: float = 0,
-    ) -> None:
-        # TODO full support for mz_center in this class
-        parallelize = WriteSpectraParallel.from_config(self._parallel_config)
-        parallelize.map_chunked_to_file(
-            read_file=read_file,
-            write_file=write_file,
-            operation=self._apply_models_operation,
-            bind_args=dict(smoothed_models=smoothed_models, mz_center=mz_center),
-        )
-
-    @staticmethod
-    def _apply_models_operation(
-        reader: ImzmlReader,
-        spectra_indices: list[int],
-        writer: ImzmlWriter,
-        smoothed_models: list[LinearModel | PolynomialModel],
-        mz_center: float,
-    ) -> None:
-        for spectrum_id in spectra_indices:
-            mz_arr, int_arr = reader.get_spectrum(spectrum_id)
-            mz_arr = mz_arr - mz_center
-            coordinates = reader.coordinates[spectrum_id]
-            model = smoothed_models[spectrum_id]
-
-            mz_error = model.predict(x=mz_arr)
-            calibrated_mz = mz_arr - mz_error
-
-            # Remove m/z values outside the range of the input spectrum
-            tolerance = 1.0  # TODO!!! customizable
-            is_valid = (np.min(mz_arr) - tolerance < calibrated_mz) & (calibrated_mz < np.max(mz_arr) + tolerance)
-            calibrated_mz = calibrated_mz[is_valid]
-            calibrated_int = int_arr[is_valid]
-
-            if len(calibrated_mz) > 10:
-                writer.add_spectrum(calibrated_mz, calibrated_int, coordinates)
-            else:
-                print(
-                    "WARNING: too few data points after calibration, "
-                    "returning the original mz array without calibration."
-                )
-                writer.add_spectrum(mz_arr, int_arr, coordinates)
-
-    def _save_results(self, **kwargs) -> None:
-        """Write datasets according to the given kwargs to the output HDF5 store."""
-        if self._output_store is not None:
-            for key, value in kwargs.items():
-                self._output_store.create_dataset(key, data=value)
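For context, the class was driven roughly as follows (condensed from the integration test near the end of this diff; the paths and reference masses are placeholders):

```python
import numpy as np
from depiction.calibration.deprecated.calibrate_image import CalibrateImage
from depiction.parallel_ops.parallel_config import ParallelConfig
from depiction.persistence import ImzmlModeEnum, ImzmlReadFile, ImzmlWriteFile

read_file = ImzmlReadFile("input.imzML")
write_file = ImzmlWriteFile("output.imzML", imzml_mode=ImzmlModeEnum.PROCESSED)

calibrate = CalibrateImage(
    reference_mz=np.array([100.2, 102.4, 104.5]),  # placeholder reference masses
    model_type="linear",
    parallel_config=ParallelConfig(n_jobs=2, task_size=None),
    output_store=None,
)
calibrate.calibrate_image(read_file=read_file, write_file=write_file)
```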
diff --git a/src/depiction/calibration/deprecated/calibrate_image_targeted.py b/src/depiction/calibration/deprecated/calibrate_image_targeted.py
deleted file mode 100644
index 3049964..0000000
--- a/src/depiction/calibration/deprecated/calibrate_image_targeted.py
+++ /dev/null
@@ -1,173 +0,0 @@
-from typing import Optional
-
-import h5py
-import numpy as np
-from numpy.typing import NDArray
-
-from depiction.spectrum.peak_picking.basic_peak_picker import BasicPeakPicker
-from depiction.calibration.deprecated.calibrate_spectrum import CalibrateSpectrum
-from depiction.calibration.models.linear_model import LinearModel
-from depiction.calibration.models.polynomial_model import PolynomialModel
-from depiction.parallel_ops.parallel_config import ParallelConfig
-from depiction.parallel_ops.read_spectra_parallel import ReadSpectraParallel
-from depiction.parallel_ops.write_spectra_parallel import WriteSpectraParallel
-from depiction.spectrum.peak_filtering import FilterByIntensity
-from depiction.persistence import (
-    ImzmlReadFile,
-    ImzmlWriteFile,
-    ImzmlReader,
-)
-from depiction.calibration.deprecated.adjust_median_shift import AdjustMedianShift
-
-
-class CalibrateImageTargeted:
-    """Calibrates an imzML file using a set of reference m/z values.
-
-    The approach consists of the following steps:
-    - Compute the median shift in ppm (per-spectrum)
-    - Smooth the median shifts (per-image)
-    - Fit a calibration model, after removing the median shift (per-spectrum)
-    - Apply the calibration model to the m/z values (per-spectrum)
-    """
-
-    def __init__(
-        self,
-        reference_mz: NDArray[float],
-        model_type: str,
-        parallel_config: ParallelConfig,
-        output_store: Optional[h5py.Group] = None,
-    ) -> None:
-        self._reference_mz = reference_mz
-        self._model_type = model_type
-        self._parallel_config = parallel_config
-        self._output_store = output_store
-
-    def calibrate_image(self, read_file: ImzmlReadFile, write_file: ImzmlWriteFile) -> None:
-        print("Compute median shift per spectrum...")
-        median_shifts_ppm, smooth_median_shifts_ppm = self._compute_median_shifts(read_file=read_file)
-        self._save_attrs(model_type=self._model_type)
-        self._save_results(
-            median_shifts_ppm=median_shifts_ppm,
-            smooth_median_shifts_ppm=smooth_median_shifts_ppm,
-        )
-
-        print("Compute models...")
-        models = self._compute_models(read_file=read_file, median_shifts_ppm=smooth_median_shifts_ppm)
-        self._save_results(
-            smoothed_models=[model.coef for model in models],
-            coordinates=read_file.coordinates_2d,
-            reference_mz=self._reference_mz,
-        )
-
-        # TODO smoothing etc
-
-        print("Applying models...")
-        self._apply_models(read_file, models, write_file)
-
-    def _compute_median_shifts(self, read_file: ImzmlReadFile) -> tuple[NDArray[float], NDArray[float]]:
-        peak_picker = BasicPeakPicker(smooth_sigma=1, min_prominence=0.002)
-        adjust_median_shift = AdjustMedianShift(
-            peak_picker=peak_picker,
-            ref_mz_arr=self._reference_mz,
-            parallel_config=self._parallel_config,
-            max_distance=2.0,
-            smooth_sigma=10.0,
-        )
-        median_shifts = adjust_median_shift.compute_median_shifts(read_file=read_file)
-        smooth_median_shifts = adjust_median_shift.smooth_median_shifts(
-            median_shifts, coordinates_2d=read_file.coordinates_2d
-        )
-        return median_shifts, smooth_median_shifts
-
-    def _compute_models(
-        self,
-        read_file: ImzmlReadFile,
-        median_shifts_ppm: NDArray[float],
-    ) -> list[LinearModel | PolynomialModel]:
-        parallelize = ReadSpectraParallel.from_config(self._parallel_config)
-        models = parallelize.map_chunked(
-            read_file=read_file,
-            operation=self._compute_models_operation,
-            bind_args=dict(
-                reference_mz=self._reference_mz,
-                model_type=self._model_type,
-                median_shifts_ppm=median_shifts_ppm,
-            ),
-            reduce_fn=parallelize.reduce_concat,
-        )
-        return models
-
-    @classmethod
-    def _compute_models_operation(
-        cls,
-        reader: ImzmlReader,
-        spectra_indices: NDArray[int],
-        reference_mz: NDArray[float],
-        model_type: str,
-        median_shifts_ppm: NDArray[float],
-    ) -> list[LinearModel] | list[PolynomialModel]:
-        peak_filter = FilterByIntensity(min_intensity=0.0005, normalization="tic")
-        peak_picker = BasicPeakPicker(smooth_sigma=1e-6, min_prominence=0.01)
-
-        models = []
-        for spectrum_id in spectra_indices:
-            mz_arr, int_arr = reader.get_spectrum(spectrum_id)
-            problem_mz, problem_dist_mz = CalibrateSpectrum.get_matches_from_config(
-                peak_picker=peak_picker,
-                peak_filtering=peak_filter,
-                mz_arr=cls._apply_ppm_shift(mz_arr, -median_shifts_ppm[spectrum_id]),
-                int_arr=int_arr,
-                reference_mz_arr=reference_mz,
-                distance_limit=2.0,
-            )
-            # the problem_dist returned by get_matches_from_config is in terms of m/z, not ppm
-            # so we need to convert it to ppm (TODO methods should always indicate the unit)
-            problem_dist_ppm = problem_dist_mz / problem_mz * 1e6 + median_shifts_ppm[spectrum_id]
-            model, _ = CalibrateSpectrum.fit_calibration_model_for_problem(
-                model_type=model_type,
-                problem_mz=problem_mz,
-                problem_distance=problem_dist_ppm,
-                mz_arr=mz_arr,
-                # TODO this prune_bad_limit can be problematic sometimes; while it makes sense as a safeguard,
-                # there should be more functionality that can react appropriately to a bad value here (e.g.
-                # replace the model by its neighbours)
-                prune_bad_limit=3000,
-            )
-            models.append(model)
-        return models
-
-    @staticmethod
-    def _apply_ppm_shift(mz_arr: NDArray[float], ppm_shift: float) -> NDArray[float]:
-        return mz_arr * (1 + ppm_shift / 1e6)
-
-    def _apply_models(
-        self,
-        read_file: ImzmlReadFile,
-        models: list[LinearModel | PolynomialModel],
-        write_file: ImzmlWriteFile,
-    ) -> None:
-        parallelize = WriteSpectraParallel.from_config(self._parallel_config)
-
-        def chunk_operation(reader, spectra_indices, writer) -> None:
-            for spectrum_id in spectra_indices:
-                mz_arr, int_arr = reader.get_spectrum(spectrum_id)
-                model = models[spectrum_id]
-                mz_arr = np.array([mz * (1 - model.predict(mz) / 1e6) for mz in mz_arr])
-                writer.add_spectrum(mz_arr, int_arr, reader.coordinates[spectrum_id])
-
-        parallelize.map_chunked_to_file(
-            read_file=read_file,
-            write_file=write_file,
-            operation=chunk_operation,
-        )
-
-    def _save_attrs(self, **kwargs) -> None:
-        if self._output_store is not None:
-            for key, value in kwargs.items():
-                self._output_store.attrs[key] = value
-
-    def _save_results(self, **kwargs) -> None:
-        """Write datasets according to the given kwargs to the output HDF5 store."""
-        if self._output_store is not None:
-            for key, value in kwargs.items():
-                self._output_store.create_dataset(key, data=value)
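The `chunk_operation` above evaluates the ppm-valued model one element at a time. Assuming `model.predict` accepts arrays (which is how `CalibrateImage._apply_models_operation` above calls it), the same transform can be sketched in vectorized form:

```python
import numpy as np

def apply_ppm_model(mz_arr: np.ndarray, model) -> np.ndarray:
    """Shift each m/z down by its predicted mass error in ppm."""
    return mz_arr * (1 - model.predict(mz_arr) / 1e6)
```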
- """ - - def __init__( - self, - reference_mz: NDArray[float], - model_type: str, - parallel_config: ParallelConfig, - peak_picker: BasicPeakPicker, - output_store: Optional[h5py.Group] = None, - input_smoothing_activated: bool = True, - input_smoothing_kernel_size: int = 27, - input_smoothing_kernel_std: float = 10.0, - distance_unit: str = "mz", - max_distance: float = 2.0, - accept_processed_data: bool = False, - ) -> None: - self._reference_mz = np.asarray(reference_mz) - self._model_type = model_type - self._parallel_config = parallel_config - self._basic_peak_picker = peak_picker - self._output_store = output_store - self._input_smoothing_activated = input_smoothing_activated - self._input_smoothing_kernel_size = input_smoothing_kernel_size - self._input_smoothing_kernel_std = input_smoothing_kernel_std - self._distance_unit = distance_unit - self._max_distance = max_distance - self._accept_processed_data = accept_processed_data - - def calibrate_image(self, read_file: ImzmlReadFile, write_file: ImzmlWriteFile) -> None: - print("Computing distance vectors...") - signed_distance_vectors, median_shifts = self._compute_reference_distance_vectors_for_file(read_file=read_file) - self._save_results(signed_distance_vectors=signed_distance_vectors, median_shifts=median_shifts) - - if self._input_smoothing_activated: - print("Smoothing distance vectors...") - smoother = SpatialSmoothingSparseAware( - kernel_size=self._input_smoothing_kernel_size, - kernel_std=self._input_smoothing_kernel_std, - ) - smoothed_distance_vectors = smoother.smooth_sparse_multi_channel( - sparse_values=signed_distance_vectors, - coordinates=read_file.coordinates_2d, - ) - # TODO one might also consider interpolation here, but for now i won't add it - self._save_results(smoothed_distance_vectors=smoothed_distance_vectors) - else: - print("Smoothing distance vectors skipped") - smoothed_distance_vectors = signed_distance_vectors - - print("Compute models...") - models = self._compute_models( - distance_vectors=smoothed_distance_vectors, - mz_refs=self._reference_mz, - model_type=self._model_type, - ) - self._save_results( - models=[model.coef for model in models], - coordinates=read_file.coordinates_2d, - reference_mz=self._reference_mz, - ) - self._save_attrs(model_type=self._model_type, distance_unit=self._distance_unit) - - print("Applying models...") - self._apply_models(read_file, models, write_file) - - def _compute_reference_distance_vectors_for_file( - self, read_file: ImzmlReadFile - ) -> tuple[NDArray[float], NDArray[float]]: - parallel = ReadSpectraParallel.from_config(config=self._parallel_config) - - return parallel.map_chunked( - read_file=read_file, - operation=self.process_chunk, - reduce_fn=lambda chunks: ( - np.concatenate([c[0] for c in chunks], axis=0), - np.concatenate([c[1] for c in chunks], axis=0), - ), - bind_args=dict( - peak_picker=self._basic_peak_picker, - distance_unit=self._distance_unit, - max_distance=self._max_distance, - reference_mz=self._reference_mz, - pick_peaks=not self._accept_processed_data or read_file.imzml_mode == ImzmlModeEnum.CONTINUOUS, - ), - ) - - @classmethod - def process_chunk( - cls, - reader: ImzmlReader, - spectra_ids: list[int], - peak_picker: BasicPeakPicker, - distance_unit: str, - max_distance: float, - reference_mz: NDArray[float], - pick_peaks: bool, - ) -> tuple[NDArray[float], NDArray[float]]: - distance_vectors = np.zeros((len(spectra_ids), len(reference_mz))) - median_shifts = np.zeros(len(spectra_ids)) - for i_result, spectrum_id in 
enumerate(spectra_ids): - mz_arr, int_arr = reader.get_spectrum(spectrum_id) - vector, med_shift = cls._compute_reference_distance_vector( - mz_arr=mz_arr, - int_arr=int_arr, - unit=distance_unit, - max_distance=max_distance, - peak_picker=peak_picker, - mz_ref_arr=reference_mz, - pick_peaks=pick_peaks, - ) - distance_vectors[i_result] = vector - median_shifts[i_result] = med_shift - return distance_vectors, median_shifts - - @classmethod - def _compute_reference_distance_vector( - cls, - mz_arr: NDArray[float], - int_arr: NDArray[float], - unit: str, - max_distance: float, - peak_picker: BasicPeakPicker, - mz_ref_arr: NDArray[float], - pick_peaks: bool, - ) -> tuple[NDArray[float], float]: - # 1.a. Pick the peaks. - if pick_peaks: - mz_peak_arr, int_peak_arr = peak_picker.pick_peaks(mz_arr=mz_arr, int_arr=int_arr) - else: - mz_peak_arr, int_peak_arr = mz_arr, int_arr - - # 1.b. For every reference: - # 1.b.a. Select a window of nearby peaks. - # 1.b.b. Pick the strongest signal peak, or, give up if there is no suitable peak. - strongest_peak_distances = ReferencePeakDistances.get_distances_max_peak_in_window( - peak_mz_arr=mz_peak_arr, - peak_int_arr=int_peak_arr, - ref_mz_arr=mz_ref_arr, - max_distance=max_distance, - max_distance_unit=unit, - ) - - # 1.b.c. Compute the median shift of these peaks. - if np.all(np.isnan(strongest_peak_distances)): - # there are no peaks in the window around the reference, so there is also no model to fit - return np.full(len(mz_ref_arr), np.nan), np.nan - else: - median_shift = np.nanmedian(strongest_peak_distances) - - # 1.b.d. After removing the median shift from the observed masses, - # pick for every reference the nearest observed peak. - # 1.c. This yields a vector of distances for every reference, with some values missing. - signed_distances = ReferencePeakDistances.get_distances_nearest( - peak_mz_arr=mz_peak_arr - median_shift, - ref_mz_arr=mz_ref_arr, - max_distance=max_distance, - max_distance_unit=unit, - ) - signed_distances += median_shift - - # Also return the median shift for traceability... 
- return signed_distances, median_shift - - def _compute_models( - self, - distance_vectors: NDArray[float], - mz_refs: NDArray[float], - model_type: str, - ) -> list[LinearModel | PolynomialModel]: - parallel_map = ParallelMap(config=self._parallel_config) - return parallel_map( - operation=self._compute_models_for_chunk, - tasks=[ - distance_vectors[spectra_indices, :] - for spectra_indices in self._parallel_config.get_task_splits(n_items=distance_vectors.shape[0]) - ], - bind_kwargs=dict(mz_refs=mz_refs, model_type=model_type), - reduce_fn=ParallelMap.reduce_concat, - ) - - @classmethod - def _compute_models_for_chunk( - cls, - distance_vectors: NDArray[float], - mz_refs: NDArray[float], - model_type: str, - ) -> list[LinearModel] | list[PolynomialModel]: - models = [] - for i in range(distance_vectors.shape[0]): - x = mz_refs - y = distance_vectors[i, :] - - # remove nan values (only checking the distance for nans) - x = x[~np.isnan(y)] - y = y[~np.isnan(y)] - - models.append(CalibrateSpectrum.fit_model(x=x, y=y, model_type=model_type)) - return models - - def _apply_models( - self, - read_file: ImzmlReadFile, - models: list[LinearModel | PolynomialModel], - write_file: ImzmlWriteFile, - ) -> None: - parallelize = WriteSpectraParallel.from_config(self._parallel_config) - unit = self._distance_unit - - def chunk_operation(reader: ImzmlReader, spectra_indices: list[int], writer: ImzmlWriter) -> None: - for spectrum_id in spectra_indices: - mz_arr, int_arr = reader.get_spectrum(spectrum_id) - model = models[spectrum_id] - if unit == "mz": - mz_arr = mz_arr - model.predict(mz_arr) - elif unit == "ppm": - mz_arr = np.array([mz * (1 - model.predict(mz) / 1e6) for mz in mz_arr]) - else: - raise ValueError(f"Unknown unit={unit}") - writer.add_spectrum(mz_arr, int_arr, reader.coordinates[spectrum_id]) - - parallelize.map_chunked_to_file( - read_file=read_file, - write_file=write_file, - operation=chunk_operation, - ) - - def _save_attrs(self, **kwargs) -> None: - if self._output_store is not None: - for key, value in kwargs.items(): - self._output_store.attrs[key] = value - - def _save_results(self, **kwargs) -> None: - """Write datasets according to the given kwargs to the output HDF5 store.""" - if self._output_store is not None: - for key, value in kwargs.items(): - self._output_store.create_dataset(key, data=value) diff --git a/src/depiction/calibration/deprecated/calibrate_spectrum.py b/src/depiction/calibration/deprecated/calibrate_spectrum.py deleted file mode 100644 index ad84dca..0000000 --- a/src/depiction/calibration/deprecated/calibrate_spectrum.py +++ /dev/null @@ -1,220 +0,0 @@ -# TODO superseded by CalibrateSpectrumRegressShift - -from typing import Any, Union - -import numpy as np -from numpy.typing import NDArray - -from depiction.spectrum.peak_picking.basic_peak_picker import BasicPeakPicker -from depiction.calibration.models.linear_model import LinearModel -from depiction.calibration.models.polynomial_model import PolynomialModel -from depiction.calibration.deprecated.reference_distance_estimator import ( - ReferenceDistanceEstimator, -) -from depiction.spectrum.peak_filtering import PeakFilteringType - - -# TODO refactor this code once development is more advanced -class CalibrateSpectrum: - """ - Computes the calibration model for a single spectrum. 
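Steps 1.a-1.c of the docstring above condense to a small amount of numpy. A hypothetical, self-contained helper illustrating the logic in m/z units only (simplified; the real implementation delegates to `ReferencePeakDistances` and also supports ppm units):

```python
import numpy as np

def reference_distance_vector(peak_mz, peak_int, ref_mz, max_distance=2.0):
    # 1.b.a / 1.b.b: strongest peak within the window around each reference
    strongest = np.full(len(ref_mz), np.nan)
    for i, ref in enumerate(ref_mz):
        window = np.flatnonzero(np.abs(peak_mz - ref) <= max_distance)
        if window.size:
            best = window[np.argmax(peak_int[window])]
            strongest[i] = peak_mz[best] - ref
    if np.all(np.isnan(strongest)):
        return strongest, np.nan  # no peaks near any reference, nothing to fit
    # 1.b.c: median shift over the matched strongest peaks
    median_shift = np.nanmedian(strongest)
    # 1.b.d / 1.c: remove the shift, then take the nearest peak per reference
    distances = np.full(len(ref_mz), np.nan)
    shifted_peaks = peak_mz - median_shift
    for i, ref in enumerate(ref_mz):
        d = shifted_peaks - ref
        j = int(np.argmin(np.abs(d)))
        if np.abs(d[j]) <= max_distance:
            distances[i] = d[j] + median_shift
    return distances, median_shift
```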
diff --git a/src/depiction/calibration/deprecated/calibrate_spectrum.py b/src/depiction/calibration/deprecated/calibrate_spectrum.py
deleted file mode 100644
index ad84dca..0000000
--- a/src/depiction/calibration/deprecated/calibrate_spectrum.py
+++ /dev/null
@@ -1,220 +0,0 @@
-# TODO superseded by CalibrateSpectrumRegressShift
-
-from typing import Any, Union
-
-import numpy as np
-from numpy.typing import NDArray
-
-from depiction.spectrum.peak_picking.basic_peak_picker import BasicPeakPicker
-from depiction.calibration.models.linear_model import LinearModel
-from depiction.calibration.models.polynomial_model import PolynomialModel
-from depiction.calibration.deprecated.reference_distance_estimator import (
-    ReferenceDistanceEstimator,
-)
-from depiction.spectrum.peak_filtering import PeakFilteringType
-
-
-# TODO refactor this code once development is more advanced
-class CalibrateSpectrum:
-    """
-    Computes the calibration model for a single spectrum.
-    :param reference_mz: m/z values of the reference spectrum
-    :param peak_picker: peak picker to use
-    :param n_candidates: number of candidate peaks to consider for each reference peak
-    :param model_type: either 'linear', 'poly_<degree>' (e.g. 'poly_2') or 'linear_siegelslopes'
-    :param distance_limit: maximum distance between a reference peak and a sample peak to be considered
-    """
-
-    def __init__(
-        self,
-        reference_mz: NDArray[float],
-        peak_picker: BasicPeakPicker,
-        n_candidates: int = 3,
-        model_type: str = "linear",
-        distance_limit: float = 2.0,
-    ) -> None:
-        self._ref_estimator = ReferenceDistanceEstimator(reference_mz=reference_mz, n_candidates=n_candidates)
-        self._model_type = model_type
-        self._model_class = PolynomialModel if model_type.startswith("poly_") else LinearModel
-        self._distance_limit = distance_limit
-        self._peak_picker = peak_picker
-
-    @staticmethod
-    def fit_model(x: NDArray[float], y: NDArray[float], model_type: str) -> Union[LinearModel, PolynomialModel]:
-        if len(x) < 3:
-            # If there are not enough points, return a zero model.
-            if model_type.startswith("poly_"):
-                model_class = PolynomialModel
-            elif model_type.startswith("linear"):
-                model_class = LinearModel
-            else:
-                raise ValueError(f"Unknown {model_type=}")
-            model = model_class.zero()
-        elif model_type == "linear":
-            model = LinearModel.fit_lsq(x_arr=x, y_arr=y)
-        elif model_type.startswith("poly_"):
-            degree = int(model_type.split("_")[1])
-            model = PolynomialModel.fit_lsq(x_arr=x, y_arr=y, degree=degree)
-        elif model_type == "linear_siegelslopes":
-            model = LinearModel.fit_siegelslopes(x_arr=x, y_arr=y)
-        else:
-            raise ValueError(f"Unknown {model_type=}")
-        return model
-
-    def calibrate_spectrum(self, mz_arr: NDArray[float], int_arr: NDArray[float]) -> LinearModel | PolynomialModel:
-        """Computes the calibration model for a single spectrum."""
-        # get the mz-distance pairs
-        problem_mz, problem_distance = self._get_mz_distance_pairs(mz_arr=mz_arr, int_arr=int_arr)
-
-        # Fit the model
-        return self.fit_model(x=problem_mz, y=problem_distance, model_type=self._model_type)
-
-    def _get_mz_distance_pairs(
-        self, mz_arr: NDArray[float], int_arr: NDArray[float]
-    ) -> tuple[NDArray[float], NDArray[float]]:
-        # TODO remove function later (if not used)
-        mz_peaks = self._peak_picker.pick_peaks_mz(mz_arr=mz_arr, int_arr=int_arr)
-        if len(mz_peaks) == 0:
-            return np.array([]), np.array([])
-        distances, closest_indices = self._ref_estimator.compute_distances_for_peaks(mz_peaks=mz_peaks)
-        median_distance = np.median(distances[:, self._ref_estimator.closest_index])
-
-        # Pick for every reference, the distance which is closest to the median distance.
-        problem_mz = self._ref_estimator.reference_mz
-        problem_distance = distances[
-            np.arange(distances.shape[0]),
-            np.argmin(np.abs(distances - median_distance), axis=1),
-        ]
-
-        # Remove all points that are too far away.
-        within_range = np.abs(problem_distance) <= self._distance_limit
-        return problem_mz[within_range], problem_distance[within_range]
-
-    @classmethod
-    def calibrate_from_config(
-        cls,
-        peak_picker: BasicPeakPicker,
-        peak_filtering,
-        mz_arr: NDArray[float],
-        int_arr: NDArray[float],
-        reference_mz_arr: NDArray[float],
-        model_type: str,
-        distance_limit: float,
-        return_info: bool = False,
-        prune_bad_limit: float = 5.0,
-    ) -> Union[
-        LinearModel,
-        PolynomialModel,
-        tuple[Union[LinearModel, PolynomialModel], dict[str, Any]],
-    ]:
-        # TODO experimental method, for quick testing/comparison
-        # needs to be properly integrated later (maybe it is also indicative of the changes which are still needed)
-        problem_mz, problem_distance = cls.get_matches_from_config(
-            peak_picker=peak_picker,
-            peak_filtering=peak_filtering,
-            mz_arr=mz_arr,
-            int_arr=int_arr,
-            reference_mz_arr=reference_mz_arr,
-            distance_limit=distance_limit,
-        )
-        model, pruned_limit = cls.fit_calibration_model_for_problem(
-            model_type=model_type,
-            problem_mz=problem_mz,
-            problem_distance=problem_distance,
-            mz_arr=mz_arr,
-            prune_bad_limit=prune_bad_limit,
-        )
-
-        # return the model
-        if return_info:
-            info = {"n_matches": len(problem_mz), "pruned_limit": pruned_limit}
-            return model, info
-        else:
-            return model
-
-    @classmethod
-    def fit_calibration_model_for_problem(
-        cls,
-        model_type: str,
-        problem_mz: NDArray[float],
-        problem_distance: NDArray[float],
-        mz_arr: NDArray[float],
-        prune_bad_limit,
-    ):
-        # Fit the model
-        model = cls.fit_model(x=problem_mz, y=problem_distance, model_type=model_type)
-        pruned_limit = None
-        model_class = PolynomialModel if model_type.startswith("poly_") else LinearModel
-        if prune_bad_limit:
-            # if this is not zero/None then we check 3 values for the model and the distance; if it's bigger than this
-            # limit the model will be rejected
-            mz_values = [mz_arr[0], mz_arr[len(mz_arr) // 2], mz_arr[-1]]
-            distance_values = [model.predict(mz) for mz in mz_values]
-            if np.any(np.abs(distance_values) > prune_bad_limit):
-                model = model_class.zero()
-                pruned_limit = np.max(np.abs(distance_values))
-        # TODO it would also make sense to reject models based on general goodness-of-fit
-        return model, pruned_limit
-
-    @staticmethod
-    def match_peaks_to_references(
-        spectrum_mz_arr: NDArray[float],
-        spectrum_int_arr: NDArray[float],
-        peak_idx_arr: NDArray[int],
-        reference_mz_arr: NDArray[float],
-        distance_limit: float,
-    ) -> tuple[NDArray[float], NDArray[float]]:
-        # match each reference to its closest peak
-        if len(peak_idx_arr) < 3:
-            return np.array([]), np.array([])
-
-        ref_estimator = ReferenceDistanceEstimator(reference_mz=reference_mz_arr, n_candidates=3)
-        distances, closest_indices = ref_estimator.compute_distances_for_peaks(mz_peaks=spectrum_mz_arr[peak_idx_arr])
-        median_distance = np.median(distances[:, ref_estimator.closest_index])
-        # Pick for every reference, the distance which is closest to the median distance.
-        problem_mz = ref_estimator.reference_mz
-        problem_distance = distances[
-            np.arange(distances.shape[0]),
-            np.argmin(np.abs(distances - median_distance), axis=1),
-        ]
-
-        # drop points that are too far away
-        within_range = np.abs(problem_distance) <= distance_limit
-        if not np.any(within_range):
-            return np.array([]), np.array([])
-        else:
-            return problem_mz[within_range], problem_distance[within_range]
-
-    @classmethod
-    def get_matches_from_config(
-        cls,
-        peak_picker: BasicPeakPicker,
-        peak_filtering: PeakFilteringType,
-        mz_arr: NDArray[float],
-        int_arr: NDArray[float],
-        reference_mz_arr: NDArray[float],
-        distance_limit: float,
-    ) -> tuple[NDArray[float], NDArray[float]]:
-        # TODO experimental method, for quick testing/comparison
-        # needs to be properly integrated later (maybe it is also indicative of the changes which are still needed)
-        # pick peaks
-        spectrum_mz_arr = mz_arr
-        spectrum_int_arr = int_arr
-        peak_idx_arr = peak_picker.pick_peaks_index(mz_arr=spectrum_mz_arr, int_arr=spectrum_int_arr)
-
-        # filter peaks
-        if len(peak_idx_arr) >= 3:
-            peak_idx_arr = peak_filtering.filter_index_peaks(
-                spectrum_mz_arr=spectrum_mz_arr,
-                spectrum_int_arr=spectrum_int_arr,
-                peak_idx_arr=peak_idx_arr,
-            )
-
-        # match each reference to its closest peak
-        return cls.match_peaks_to_references(
-            spectrum_mz_arr=spectrum_mz_arr,
-            spectrum_int_arr=spectrum_int_arr,
-            peak_idx_arr=peak_idx_arr,
-            reference_mz_arr=reference_mz_arr,
-            distance_limit=distance_limit,
-        )
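A sketch of how the `fit_model`/`predict` pair was typically used, with invented input values (`model.predict` is assumed to accept arrays, as `CalibrateImage._apply_models_operation` above calls it):

```python
import numpy as np
from depiction.calibration.deprecated.calibrate_spectrum import CalibrateSpectrum

# Matched reference masses and their signed errors (observed - reference).
problem_mz = np.array([100.0, 200.0, 300.0, 400.0])
problem_distance = np.array([0.010, 0.019, 0.031, 0.040])

# "linear" -> least-squares line, "poly_2" -> degree-2 polynomial,
# "linear_siegelslopes" -> robust line; fewer than 3 points yields a zero model.
model = CalibrateSpectrum.fit_model(x=problem_mz, y=problem_distance, model_type="linear")

# Calibrate by subtracting the predicted mass error.
mz_arr = np.linspace(100.0, 400.0, 1000)
calibrated_mz = mz_arr - model.predict(mz_arr)
```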
diff --git a/src/depiction/calibration/deprecated/model_smoothing.py b/src/depiction/calibration/deprecated/model_smoothing.py
deleted file mode 100644
index 6d5b276..0000000
--- a/src/depiction/calibration/deprecated/model_smoothing.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import numpy as np
-import scipy.ndimage
-from numpy.typing import NDArray
-
-from depiction.calibration.models.linear_model import LinearModel
-from depiction.calibration.models.polynomial_model import PolynomialModel
-
-
-class ModelSmoothing:
-    """
-    Performs some smoothing of the model parameters obtained for a full image.
-    """
-
-    def __init__(self, sigma: float) -> None:
-        self._sigma = sigma
-
-    def smooth_spatial(
-        self, coordinates: NDArray[int], models: list[LinearModel | PolynomialModel]
-    ) -> list[LinearModel | PolynomialModel]:
-        """
-        Smooths the values in ``values`` using a Gaussian kernel in a spatial neighborhood.
-        :param coordinates: shape (n_points, 2) the coordinates of the points
-        :param models: the models whose coefficients to smooth
-        :return: list of smoothed models, with the same type as the input models
-        """
-        model_type, values = self.get_model_type_and_values(models=models)
-
-        # input validation
-        if coordinates.shape[0] != values.shape[0]:
-            raise ValueError("coordinates and values must have the same number of rows")
-        if coordinates.shape[1] != 2:
-            raise ValueError("coordinates must have two columns (2d coordinates)")
-
-        # assign the values to a grid
-        value_grid = np.zeros(coordinates.max(axis=0) + 1, dtype=values.dtype)
-        value_grid[coordinates[:, 0], coordinates[:, 1]] = values
-
-        # smooth the grid
-        smoothed_grid = scipy.ndimage.gaussian_filter(value_grid, sigma=self._sigma, axes=(0, 1))
-
-        # assign the smoothed values back to the points
-        smoothed_values = smoothed_grid[coordinates[:, 0], coordinates[:, 1]]
-
-        return [model_type(coef=coef) for coef in smoothed_values]
-
-    def smooth_sequential(self, models: list[LinearModel | PolynomialModel]) -> list[LinearModel | PolynomialModel]:
-        """
-        Smooths the values in ``values`` using a Gaussian kernel in a sequential neighborhood.
-        :param models: the models whose coefficients to smooth
-        :return: list of smoothed models, with the same type as the input models
-        """
-        model_type, values = self.get_model_type_and_values(models=models)
-        smoothed_values = scipy.ndimage.gaussian_filter1d(values, sigma=self._sigma, axis=0)
-        return [model_type(coef=coef) for coef in smoothed_values]
-
-    def get_model_type_and_values(
-        self, models: list[LinearModel | PolynomialModel]
-    ) -> tuple[type[LinearModel | PolynomialModel], NDArray[float]]:
-        model_type = self._check_model_type(models=models)
-        return model_type, np.array([model.coef for model in models])
-
-    def _check_model_type(self, models: list[LinearModel | PolynomialModel]) -> type[LinearModel | PolynomialModel]:
-        model_types = {type(model) for model in models}
-        if len(model_types) != 1:
-            raise ValueError(f"all models must have the same type, found: {model_types}")
-        return list(model_types)[0]
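The scatter-blur-gather idea behind `smooth_spatial` in a standalone form, written with an explicit channel axis for multi-coefficient models (a sketch with made-up values; it assumes SciPy >= 1.11 for the `axes` argument, which the deleted code also relies on):

```python
import numpy as np
import scipy.ndimage

# Coefficients of four per-spectrum models, laid out on a 2x2 acquisition grid.
coordinates = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])            # (n_points, 2)
coefs = np.array([[0.0, 1.0], [0.0, 2.0], [0.0, 1.5], [0.0, 2.5]])  # (n_points, n_coef)

# Scatter onto a (height, width, n_coef) grid, blur spatially, gather back.
grid = np.zeros((*(coordinates.max(axis=0) + 1), coefs.shape[1]))
grid[coordinates[:, 0], coordinates[:, 1]] = coefs
smoothed = scipy.ndimage.gaussian_filter(grid, sigma=1.0, axes=(0, 1))
smoothed_coefs = smoothed[coordinates[:, 0], coordinates[:, 1]]
```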
diff --git a/src/depiction/calibration/deprecated/reference_distance_estimator.py b/src/depiction/calibration/deprecated/reference_distance_estimator.py
deleted file mode 100644
index fbf389d..0000000
--- a/src/depiction/calibration/deprecated/reference_distance_estimator.py
+++ /dev/null
@@ -1,134 +0,0 @@
-from dataclasses import dataclass
-
-import numpy as np
-from numpy.typing import NDArray
-
-
-@dataclass
-class ReferenceDistanceEstimator:
-    """
-    Estimates the distances between peaks in a sample spectrum and a reference spectrum.
-    For every reference peak, the nearest sample spectrum peak is identified and the distance to its immediate neighbours is computed.
-    The number of neighbours is determined by the parameter n_candidates, and has to be odd so the nearest peak's distance is in the middle.
-    :param reference_mz: m/z values of the reference spectrum
-    :param n_candidates: number of candidate peaks to consider for each reference peak
-    """
-
-    reference_mz: NDArray[float]
-    n_candidates: int
-
-    def __post_init__(self):
-        self.reference_mz = np.asarray(self.reference_mz)
-        if self.n_candidates % 2 == 0:
-            raise ValueError("n_candidates must be odd")
-
-    @property
-    def n_targets(self) -> int:
-        """Returns the number of reference peaks."""
-        return len(self.reference_mz)
-
-    @property
-    def closest_index(self) -> int:
-        """Returns the index of the closest peak."""
-        return self.n_candidates // 2
-
-    # TODO not fully sure if this is the correct place
-    # TODO also, it's not clear whether it would be better/possible to use a strategy that also takes into account outliers already here
-    def compute_max_peak_within_distance(
-        self,
-        mz_peaks: NDArray[float],
-        int_peaks: NDArray[float],
-        max_distance: float,
-        keep_missing: bool = False,
-    ) -> tuple[NDArray[int], NDArray[float]]:
-        """
-        For every reference peak, a nearby peak in ``mz_peaks`` is identified by finding the peak with the highest intensity.
-        If no peak is found within ``max_distance``, the entry is skipped, i.e. the list might have fewer entries than ``n_targets``.
-        :param mz_peaks: m/z values of the sample spectrum's peaks
-        :param int_peaks: intensities of the sample spectrum's peaks
-        :param max_distance: maximum distance to consider
-        :param keep_missing: if True, missing peaks are indicated by -1 in the indices and NaN in the distances
-        :return:
-            - indices: a numpy array containing the indices in mz_peaks/int_peaks of the chosen peaks
-            - signed_distances: a numpy array containing the signed distances to the reference peaks, i.e. the error
-        """
-        if len(mz_peaks) == 0:
-            raise ValueError("mz_peaks must not be empty")
-
-        indices = []
-        signed_distances = []
-        for mz_reference in self.reference_mz:
-            abs_distances = np.abs(mz_peaks - mz_reference)
-            idx_valid = np.where(abs_distances <= max_distance)[0]
-            if not idx_valid.size:
-                # TODO make keep_missing the default
-                if keep_missing:
-                    indices.append(-1)
-                    signed_distances.append(np.nan)
-                continue
-            idx_max = idx_valid[np.argmax(int_peaks[idx_valid])]
-            indices.append(idx_max)
-            # "value +0.2 means that the peak is 0.2 m/z higher than the reference"
-            signed_distances.append(mz_peaks[idx_max] - mz_reference)
-
-        return np.asarray(indices), np.asarray(signed_distances)
-
-    def compute_distances_for_peaks(self, mz_peaks: NDArray[float]) -> tuple[NDArray[float], NDArray[int]]:
-        """
-        For every reference peak, the closest correspondence in ``mz_peaks`` and its neighbours are identified
-        and their (signed) distances
-        $$mz_\\text{peaks} - mz_\\text{reference}$$
-        will be returned.
-        The method also returns the indices of the closest peaks, whose distance is found at ``distances[i, closest_indices[i]]``.
-        :param mz_peaks: m/z values of the sample spectrum's peaks
-        :return:
-            - distances: a matrix of shape (n_targets, n_candidates) where each row corresponds to a target and each column to a candidate,
-              the values indicate the signed distance to the target
-            - closest_indices: a vector of length n_targets containing the indices of the nearest candidates (in the
-              mz_peaks array) for each target (i.e. the index of the center element of the corresponding row in distances)
-        """
-        if len(mz_peaks) == 0:
-            raise ValueError("mz_peaks must not be empty")
-
-        nearest_indices = np.zeros(self.n_targets, dtype=int)
-        distances = np.zeros((self.n_targets, self.n_candidates))
-
-        for i_target, mz_reference in enumerate(self.reference_mz):
-            # "value +0.2 means that the peak is 0.2 m/z higher than the reference"
-            signed_distances = mz_peaks - mz_reference
-
-            # find the closest value
-            index_min = np.argmin(np.abs(signed_distances))
-            nearest_indices[i_target] = index_min
-
-            # now, computing the neighbourhood distances is easy, but it requires careful consideration of corner cases
-            distances[i_target] = self._convert_to_row(signed_distances, index_min)
-
-        return distances, nearest_indices
-
-    def _convert_to_row(self, signed_distances: NDArray[float], index_min: int) -> NDArray[float]:
-        """
-        Extracts the neighborhood of the closest peak from the signed distances.
-        This method also handles corner cases at the beginning and the end of the peak list,
-        in which case distance is set to -inf or +inf, respectively.
-        :param signed_distances: a vector of signed distances
-        :param index_min: the index of the closest peak
-        """
-        half_length = self.n_candidates // 2
-        i_left = index_min - half_length
-        i_right = index_min + half_length + 1
-
-        # NOTE: The code assumes that it's not possible to be out-of-bounds on both sides simultaneously
-        if i_left < 0:
-            # pad with -inf from the left
-            return np.array([*np.repeat([-np.inf], -i_left), *signed_distances[:i_right]])
-        elif i_right > len(signed_distances):
-            # pad with +inf from the right
-            return np.array(
-                [
-                    *signed_distances[i_left:],
-                    *np.repeat([np.inf], i_right - len(signed_distances)),
-                ]
-            )
-        else:
-            return signed_distances[i_left:i_right]
diff --git a/src/depiction/calibration/deprecated/tests/__init__.py b/src/depiction/calibration/deprecated/tests/__init__.py
deleted file mode 100644
index e69de29..0000000
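The neighbourhood extraction of `_convert_to_row`, including its padding with -inf/+inf at the boundaries, can be reproduced with a hypothetical standalone helper (values chosen to match the first assertion in test_reference_distance_estimator.py further below):

```python
import numpy as np

def candidate_distances(mz_peaks, mz_reference, n_candidates=3):
    """Signed distances of the n_candidates-wide neighbourhood around the nearest peak."""
    signed = mz_peaks - mz_reference       # +0.2 means the peak is 0.2 m/z high
    center = int(np.argmin(np.abs(signed)))
    half = n_candidates // 2
    lo, hi = center - half, center + half + 1
    row = np.full(n_candidates, np.inf)    # right out-of-bounds stays +inf
    src = signed[max(lo, 0):min(hi, len(signed))]
    offset = max(0, -lo)
    row[offset:offset + len(src)] = src
    if lo < 0:
        row[:offset] = -np.inf             # left out-of-bounds becomes -inf
    return row, center

row, center = candidate_distances(np.array([50.0, 75, 100, 110, 120]), 50.0)
# row -> [-inf, 0., 25.], center -> 0
```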
diff --git a/src/depiction/calibration/deprecated/tests/test_adjust_median_shift.py b/src/depiction/calibration/deprecated/tests/test_adjust_median_shift.py
deleted file mode 100644
index 4dd824f..0000000
--- a/src/depiction/calibration/deprecated/tests/test_adjust_median_shift.py
+++ /dev/null
@@ -1,52 +0,0 @@
-import unittest
-from functools import cached_property
-from unittest.mock import MagicMock
-
-import numpy as np
-
-from depiction.calibration.deprecated.adjust_median_shift import AdjustMedianShift
-from typing import NoReturn
-
-
-class TestAdjustMedianShift(unittest.TestCase):
-    def setUp(self) -> None:
-        self.mock_peak_picker = MagicMock(name="mock_peak_picker")
-        self.mock_ref_mz_arr = MagicMock(name="mock_ref_mz_arr")
-        self.mock_parallel_config = MagicMock(name="mock_parallel_config")
-
-    @cached_property
-    def instance(self) -> AdjustMedianShift:
-        return AdjustMedianShift(
-            peak_picker=self.mock_peak_picker,
-            ref_mz_arr=self.mock_ref_mz_arr,
-            parallel_config=self.mock_parallel_config,
-        )
-
-    def test_compute_median_shift_ppm(self) -> None:
-        self.mock_ref_mz_arr = np.array([100.0, 150.0, 200.0, 300, 400])
-        mock_int_arr = np.ones(5)
-        ppm_error_arr = np.array([200, 400, 200, 700, 700])
-        mock_mz_arr = np.array(
-            [ref_mz + ref_mz * ppm_error * 1e-6 for ref_mz, ppm_error in zip(self.mock_ref_mz_arr, ppm_error_arr)]
-        )
-        self.mock_peak_picker.pick_peaks_index.return_value = np.array([0, 1, 2])
-
-        median_shift = self.instance.compute_median_shift_ppm(mz_arr=mock_mz_arr, int_arr=mock_int_arr)
-
-        self.assertAlmostEqual(200.0, median_shift, places=7)
-
-    @unittest.skip
-    def test_compute_median_shifts(self) -> NoReturn:
-        raise NotImplementedError
-
-    @unittest.skip
-    def test_smooth_median_shifts(self) -> NoReturn:
-        raise NotImplementedError
-
-    @unittest.skip
-    def test_apply_correction(self) -> NoReturn:
-        raise NotImplementedError
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/src/depiction/calibration/deprecated/tests/test_calibrate_image_integration.py b/src/depiction/calibration/deprecated/tests/test_calibrate_image_integration.py
deleted file mode 100644
index c3719c2..0000000
--- a/src/depiction/calibration/deprecated/tests/test_calibrate_image_integration.py
+++ /dev/null
@@ -1,74 +0,0 @@
-import os
-import unittest
-from tempfile import TemporaryDirectory
-
-import numpy as np
-
-from depiction.calibration.deprecated.calibrate_image import CalibrateImage
-from tests.integration.integration_test_utils import IntegrationTestUtils
-from depiction.parallel_ops.parallel_config import ParallelConfig
-from depiction.persistence import ImzmlModeEnum, ImzmlReadFile, ImzmlWriteFile
-
-
-class TestCalibrateImageIntegration(unittest.TestCase):
-    def setUp(self):
-        self.tmp_dir = TemporaryDirectory()
-        self.addCleanup(self.tmp_dir.cleanup)
-        self.mock_input_file_path = os.path.join(self.tmp_dir.name, "input.imzML")
-        self.mock_output_file_path = os.path.join(self.tmp_dir.name, "output.imzML")
-
-    def test_calibrate_image(self):
-        mz_list = np.linspace(100, 110, 23)
-        mz_list_outlier = mz_list.copy()
-        mz_list_outlier[10] += 0.3
-        mz_ref = mz_list[2::5]
-
-        mz_arr_list = np.array([mz_list, mz_list + 0.1, mz_list - 0.2, mz_list_outlier])
-        int_arr_list = np.full_like(mz_arr_list, 1.0)
-        int_arr_list[..., 2::5] = 10.0
-
-        IntegrationTestUtils.populate_test_file(
-            path=self.mock_input_file_path,
-            mz_arr_list=mz_arr_list,
-            int_arr_list=int_arr_list,
-            imzml_mode=ImzmlModeEnum.PROCESSED,
-        )
-
-        read_file = ImzmlReadFile(self.mock_input_file_path)
-        write_file = ImzmlWriteFile(self.mock_output_file_path, imzml_mode=ImzmlModeEnum.PROCESSED)
-
-        calibrate = CalibrateImage(
-            reference_mz=mz_ref,
-            model_type="linear",
-            parallel_config=ParallelConfig(n_jobs=2, task_size=None),
-            output_store=None,
-        )
-        calibrate.calibrate_image(read_file=read_file, write_file=write_file)
-
-        # read the results
-        with ImzmlReadFile(self.mock_output_file_path).reader() as reader:
-            spectra = reader.get_spectra([0, 1, 2, 3])
-            self.assertEqual(ImzmlModeEnum.PROCESSED, reader.imzml_mode)
-
-        for i_spectrum in range(4):
-            mz_arr = spectra[0][i_spectrum]
-            int_arr = spectra[1][i_spectrum]
-
-            mz_peak = mz_arr[int_arr == 10.0]
-            # TODO ideally this would be as close to zero as possible:
-            print(mz_peak - mz_ref)
-            # but in practice it currently is not.
-            # it needs to be checked why this is the case...
-
-        print()
-        # check the delta from the input
-        print(spectra[0])
-        print()
-        print(spectra[1])
-        delta = mz_arr_list - spectra[0]
-        print(delta)
-        self.assertFalse(True)
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/src/depiction/calibration/deprecated/tests/test_calibrate_spectrum.py b/src/depiction/calibration/deprecated/tests/test_calibrate_spectrum.py
deleted file mode 100644
index e24e7a5..0000000
--- a/src/depiction/calibration/deprecated/tests/test_calibrate_spectrum.py
+++ /dev/null
@@ -1,135 +0,0 @@
-import unittest
-from functools import cached_property
-from unittest.mock import MagicMock, patch
-
-import numpy as np
-
-from depiction.spectrum.peak_picking.basic_peak_picker import BasicPeakPicker
-from depiction.calibration.deprecated.calibrate_spectrum import CalibrateSpectrum
-from depiction.calibration.models.linear_model import LinearModel
-from depiction.calibration.models.polynomial_model import PolynomialModel
-from depiction.calibration.deprecated.reference_distance_estimator import (
-    ReferenceDistanceEstimator,
-)
-from typing import NoReturn
-
-
-class TestCalibrateSpectrum(unittest.TestCase):
-    def setUp(self) -> None:
-        self.mock_reference_mz = np.array([100, 200, 300])
-        self.mock_detect_smooth_sigma = MagicMock(name="mock_detect_smooth_sigma")
-        self.mock_detect_prominence = MagicMock(name="mock_detect_prominence")
-        self.mock_n_candidates = MagicMock(name="mock_n_candidates")
-        self.mock_model_type = "linear"
-        self.mock_distance_limit = 2.0
-        self.mock_basic_peak_picker = MagicMock(name="mock_basic_peak_picker", spec=BasicPeakPicker)
-        self.mock_reference_distance_estimator = MagicMock(
-            name="mock_reference_distance_estimator",
-            spec=ReferenceDistanceEstimator,
-        )
-        self.mock_mz_arr = MagicMock(name="mock_mz_arr")
-        self.mock_int_arr = MagicMock(name="mock_int_arr")
-        self.mock_problem_mz = MagicMock(name="mock_problem_mz")
-        self.mock_problem_dist = MagicMock(name="mock_problem_dist")
-
-    @cached_property
-    @patch("depiction.calibration.deprecated.calibrate_spectrum.ReferenceDistanceEstimator")
-    @patch("depiction.calibration.deprecated.calibrate_spectrum.BasicPeakPicker")
-    def mock_compute(self, mock_basic_peak_picker, mock_reference_distance_estimator) -> CalibrateSpectrum:
-        mock_basic_peak_picker.return_value = self.mock_basic_peak_picker
-        mock_reference_distance_estimator.return_value = self.mock_reference_distance_estimator
-        return CalibrateSpectrum(
-            reference_mz=self.mock_reference_mz,
-            peak_picker=self.mock_basic_peak_picker,
-            n_candidates=self.mock_n_candidates,
-            model_type=self.mock_model_type,
-            distance_limit=self.mock_distance_limit,
-        )
-
-    @patch.object(LinearModel, "zero")
-    @patch.object(CalibrateSpectrum, "_get_mz_distance_pairs")
-    def test_calibrate_spectrum_when_zero_points(self, mock_get_mz_distance_pairs, mock_zero) -> None:
-        self.mock_problem_mz.__len__.return_value = 0
-        mock_get_mz_distance_pairs.return_value = (
-            self.mock_problem_mz,
-            self.mock_problem_dist,
-        )
-        model = self.mock_compute.calibrate_spectrum(mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr)
-        mock_get_mz_distance_pairs.assert_called_once_with(mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr)
-        mock_zero.assert_called_once_with()
-        self.assertEqual(mock_zero.return_value, model)
-
-    @patch.object(LinearModel, "zero")
-    @patch.object(CalibrateSpectrum, "_get_mz_distance_pairs")
-    def test_calibrate_spectrum_when_too_few_points(self, mock_get_mz_distance_pairs, mock_zero) -> None:
-        self.mock_problem_mz.__len__.return_value = 2
-        mock_get_mz_distance_pairs.return_value = (
-            self.mock_problem_mz,
-            self.mock_problem_dist,
-        )
-        model = self.mock_compute.calibrate_spectrum(mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr)
-        mock_get_mz_distance_pairs.assert_called_once_with(mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr)
-        mock_zero.assert_called_once_with()
-        self.assertEqual(mock_zero.return_value, model)
-
-    @patch.object(LinearModel, "fit_lsq")
-    @patch.object(CalibrateSpectrum, "_get_mz_distance_pairs")
-    def test_calibrate_spectrum_when_linear(self, mock_get_mz_distance_pairs, mock_fit_linear) -> None:
-        self.mock_problem_mz.__len__.return_value = 3
-        mock_get_mz_distance_pairs.return_value = (
-            self.mock_problem_mz,
-            self.mock_problem_dist,
-        )
-        model = self.mock_compute.calibrate_spectrum(mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr)
-        mock_get_mz_distance_pairs.assert_called_once_with(mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr)
-        mock_fit_linear.assert_called_once_with(x_arr=self.mock_problem_mz, y_arr=self.mock_problem_dist)
-        self.assertEqual(mock_fit_linear.return_value, model)
-
-    @patch.object(LinearModel, "fit_siegelslopes")
-    @patch.object(CalibrateSpectrum, "_get_mz_distance_pairs")
-    def test_calibrate_spectrum_when_linear_siegelslopes(
-        self, mock_get_mz_distance_pairs, mock_fit_linear_siegelslopes
-    ) -> None:
-        self.mock_problem_mz.__len__.return_value = 3
-        self.mock_compute._model_type = "linear_siegelslopes"
-        mock_get_mz_distance_pairs.return_value = (
-            self.mock_problem_mz,
-            self.mock_problem_dist,
-        )
-        model = self.mock_compute.calibrate_spectrum(mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr)
-        mock_get_mz_distance_pairs.assert_called_once_with(mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr)
-        mock_fit_linear_siegelslopes.assert_called_once_with(x_arr=self.mock_problem_mz, y_arr=self.mock_problem_dist)
-        self.assertEqual(mock_fit_linear_siegelslopes.return_value, model)
-
-    @patch.object(PolynomialModel, "fit_lsq")
-    @patch.object(CalibrateSpectrum, "_get_mz_distance_pairs")
-    def test_calibrate_spectrum_when_polynomial(self, mock_get_mz_distance_pairs, mock_fit_polynomial) -> None:
-        self.mock_problem_mz.__len__.return_value = 3
-        self.mock_compute._model_type = "poly_2"
-        mock_get_mz_distance_pairs.return_value = (
-            self.mock_problem_mz,
-            self.mock_problem_dist,
-        )
-        model = self.mock_compute.calibrate_spectrum(mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr)
-        mock_get_mz_distance_pairs.assert_called_once_with(mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr)
-        mock_fit_polynomial.assert_called_once_with(x_arr=self.mock_problem_mz, y_arr=self.mock_problem_dist, degree=2)
-        self.assertEqual(mock_fit_polynomial.return_value, model)
-
-    @unittest.skip
-    def test_get_mz_distance_pairs(self) -> NoReturn:
-        raise NotImplementedError()
-
-    def test_get_mz_distance_pairs_when_no_peaks(self) -> None:
-        self.mock_basic_peak_picker.pick_peaks_mz.return_value = np.array([])
-        problem_mz, problem_dist = self.mock_compute._get_mz_distance_pairs(
-            mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr
-        )
-        self.mock_basic_peak_picker.pick_peaks_mz.assert_called_once_with(
-            mz_arr=self.mock_mz_arr, int_arr=self.mock_int_arr
-        )
-        np.testing.assert_array_equal(np.array([]), problem_mz)
-        np.testing.assert_array_equal(np.array([]), problem_dist)
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/src/depiction/calibration/deprecated/tests/test_model_smoothing.py b/src/depiction/calibration/deprecated/tests/test_model_smoothing.py
deleted file mode 100644
index 0cfbdcf..0000000
--- a/src/depiction/calibration/deprecated/tests/test_model_smoothing.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import unittest
-from functools import cached_property
-from unittest.mock import MagicMock
-
-import numpy as np
-
-from depiction.calibration.deprecated.model_smoothing import ModelSmoothing
-from typing import NoReturn
-
-
-class TestModelSmoothing(unittest.TestCase):
-    def setUp(self) -> None:
-        self.mock_sigma = 1.0
-
-    @cached_property
-    def mock_smoothing(self) -> ModelSmoothing:
-        return ModelSmoothing(sigma=self.mock_sigma)
-
-    @unittest.skip
-    def test_smooth_spatial(self) -> NoReturn:
-        raise NotImplementedError()
-
-    @unittest.skip
-    def test_smooth_sequential(self) -> NoReturn:
-        raise NotImplementedError()
-
-    def test_get_model_type_and_values_when_valid(self) -> None:
-        class MockModel:
-            def __init__(self, coef) -> None:
-                self.coef = coef
-
-        mock_models = [MockModel(coef=[0, 1]), MockModel(coef=[0, 2])]
-        model_type, values = self.mock_smoothing.get_model_type_and_values(models=mock_models)
-        self.assertEqual(model_type, MockModel)
-        np.testing.assert_array_equal([[0, 1], [0, 2]], values)
-
-    def test_get_model_type_and_values_when_invalid(self) -> None:
-        mock_models = [MagicMock(), MagicMock()]
-        with self.assertRaises(ValueError) as error:
-            self.mock_smoothing.get_model_type_and_values(models=mock_models)
-        self.assertIn("all models must have the same type", error.exception.args[0])
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/src/depiction/calibration/deprecated/tests/test_reference_distance_estimator.py b/src/depiction/calibration/deprecated/tests/test_reference_distance_estimator.py
deleted file mode 100644
index ad0c3ec..0000000
--- a/src/depiction/calibration/deprecated/tests/test_reference_distance_estimator.py
+++ /dev/null
@@ -1,79 +0,0 @@
-import unittest
-from functools import cached_property
-
-import numpy as np
-
-from depiction.calibration.deprecated.reference_distance_estimator import (
-    ReferenceDistanceEstimator,
-)
-
-
-class TestReferenceDistanceEstimator(unittest.TestCase):
-    def setUp(self) -> None:
-        self.mock_reference_mz = np.array([50.0, 75, 100, 110])
-        self.mock_n_candidates = 3
-
-    @cached_property
-    def mock_estimator(self) -> ReferenceDistanceEstimator:
-        return ReferenceDistanceEstimator(
-            reference_mz=self.mock_reference_mz,
-            n_candidates=self.mock_n_candidates,
-        )
-
-    def test_n_targets(self) -> None:
-        self.assertEqual(4, self.mock_estimator.n_targets)
-
-    def test_n_candidates(self) -> None:
-        self.assertEqual(3, self.mock_estimator.n_candidates)
-
-    def test_reference_mz(self) -> None:
-        np.testing.assert_array_equal(
-            np.array([50.0, 75, 100, 110]),
-            self.mock_estimator.reference_mz,
-        )
-
-    def test_closest_index(self) -> None:
-        self.assertEqual(1, self.mock_estimator.closest_index)
-
-    def test_compute_distances_for_peaks(self) -> None:
-        mz_peaks = np.array([50.0, 75, 100, 110, 120, 130, 140])
-        distances, closest_indices = self.mock_estimator.compute_distances_for_peaks(mz_peaks)
-        np.testing.assert_array_equal(
-            np.array(
-                [
-                    [-np.inf, 0, 25],
-                    [-25, 0, 25],
-                    [-25, 0, 10],
-                    [-10, 0, 10],
-                ]
-            ),
-            distances,
-        )
-        np.testing.assert_array_equal(
-            np.array([0, 1, 2, 3]),
-            closest_indices,
-        )
-
-    def test_compute_distances_for_peaks_when_left_out_of_bounds(self) -> None:
-        self.mock_reference_mz = np.array([9.0, 10.0, 20.0, 20.5])
-        mz_peaks = np.array([9.0, 18.0, 18.5, 19.0, 21.0, 25.0])
-        distances, closest_indices = self.mock_estimator.compute_distances_for_peaks(mz_peaks)
-        np.testing.assert_array_equal([0, 0, 3, 4], closest_indices)
-        np.testing.assert_array_equal([-np.inf, 0, 9], distances[0])
-        np.testing.assert_array_equal([-np.inf, -1, 8], distances[1])
-        np.testing.assert_array_equal([-1.5, -1.0, 1], distances[2])
-        np.testing.assert_array_equal([-1.5, 0.5, 4.5], distances[3])
-
-    def test_compute_distances_for_peaks_when_right_out_of_bounds(self) -> None:
-        self.mock_reference_mz = np.array([10.0, 20.0, 29.0, 30.0])
-        mz_peaks = np.array([8.0, 9.0, 19, 28.0, 28.5, 29.0])
-        distances, closest_indices = self.mock_estimator.compute_distances_for_peaks(mz_peaks)
-        np.testing.assert_array_equal([1, 2, 5, 5], closest_indices)
-        np.testing.assert_array_equal([-2.0, -1, 9], distances[0])
-        np.testing.assert_array_equal([-11.0, -1, 8], distances[1])
-        np.testing.assert_array_equal([-0.5, 0, np.inf], distances[2])
-        np.testing.assert_array_equal([-1.5, -1, np.inf], distances[3])
-
-
-if __name__ == "__main__":
-    unittest.main()