From 6c52329476c65d7a463c8a9b14de1b0444be5ba4 Mon Sep 17 00:00:00 2001 From: Witold Wolski Date: Tue, 4 Jun 2024 15:13:49 +0200 Subject: [PATCH] mccm (mass cluster center model) calibration --- examples/spectra_calibration.py | 79 ++++++++++++++++++++++++++++++--- 1 file changed, 73 insertions(+), 6 deletions(-) diff --git a/examples/spectra_calibration.py b/examples/spectra_calibration.py index 4e08782..bbe5a1a 100644 --- a/examples/spectra_calibration.py +++ b/examples/spectra_calibration.py @@ -1,22 +1,89 @@ from pathlib import Path +from typing import Tuple, Any from matplotlib import pyplot as plt from numpy.typing import NDArray from xarray import DataArray - +import numpy as np +import scipy +import statsmodels.api as sm +from statsmodels.robust.norms import HuberT +import pandas as pd +from depiction.calibration.models.fit_model import fit_model from depiction.persistence import ImzmlReadFile, ImzmlWriteFile +from depiction.spectrum.peak_filtering import FilterNHighestIntensityPartitioned from depiction.spectrum.peak_picking import BasicInterpolatedPeakPicker - +import scipy.stats as stats class NewCalibrationMethod: def extract_spectrum_features(self, peak_mz_arr: NDArray[float], peak_int_arr: NDArray[float]) -> DataArray: - return DataArray([1, 2, 3], dims=["c"]) + l_none = 1.000482 + #c_0 = 0.029 + #c_1 = 4.98*10e-4 + + plt.scatter(peak_mz_arr, peak_int_arr, color='blue', alpha=0.5, marker='x', label='delta_lambda vs x', s=0.3) + plt.xlim(1300, 1400) + plt.show() + + # compute all differences for elements in peak_mz_arr amd store in a DataArray + #delta = scipy.spatial.distance_matrix(np.expand_dims(peak_mz_arr,1), np.expand_dims(peak_mz_arr,1), p = 1) + delta = scipy.spatial.distance.pdist(np.expand_dims(peak_mz_arr, 1), metric='cityblock') + # get all distances smaller then 500 + delta = delta[delta < 500] + # for each x compute + # Compute delta_lambda for each x + delta_lambda = np.zeros_like(delta) + for i, mi in enumerate(delta): + term1 = (mi) % l_none + if term1 < 0.5: + delta_lambda[i] = term1 + else: + delta_lambda[i] = -1 + term1 + # create a scatterplot of delte_lambda vs x + + + sorted_indices = np.argsort(delta) + delta = delta[sorted_indices] + delta_lambda = delta_lambda[sorted_indices] + + # Add a constant term with the intercept set to zero + X = delta.reshape(-1, 1) + + + # Fit the model + robust_model = sm.RLM(delta_lambda, X, M = HuberT()) + results = robust_model.fit() + + plt.scatter(delta, delta_lambda, color='blue', alpha=0.5, marker='x', label='delta_lambda vs x', s = 0.3) + plt.plot(delta, results.predict(X) , color='red', label='fit') + plt.show() + slope = results.params[0] + peak_mz_corrected = peak_mz_arr + results.predict(np.expand_dims(peak_mz_arr, 1)) + + peak_mz_corrected2 = peak_mz_arr * ( 1 - slope) + + delta_intercept = np.zeros_like(peak_mz_corrected2) + for i, mi in enumerate(peak_mz_corrected2): + term1 = (mi) % l_none + if term1 < 0.5: + delta_intercept[i] = term1 + else: + delta_intercept[i] = -1 + term1 + #intercept_coef = np.mean(delta_intercept) + intercept_coef = stats.trim_mean(delta_intercept, 0.3) + # add histogram of delta_intercept + plt.hist(delta_intercept, bins=100) + # add vertical abline at intercept_coef + plt.axvline(intercept_coef, color='red') + plt.show() + + return DataArray([intercept_coef, slope], dims=["c"]) def apply_spectrum_model( self, spectrum_mz_arr: NDArray[float], spectrum_int_arr: NDArray[float], model_coef: DataArray ) -> tuple[NDArray[float], NDArray[float]]: - return spectrum_mz_arr + 0.05, spectrum_int_arr - + spectrum_corrected = spectrum_mz_arr * (1 - model_coef.values[1]) + model_coef.values[0] + return spectrum_corrected def main() -> None: with ImzmlReadFile("sample.imzML").reader() as reader: @@ -26,7 +93,7 @@ def main() -> None: spectra_original.append((mz_arr, int_arr)) # TODO adjust params? - peak_picker = BasicInterpolatedPeakPicker(min_prominence=0.5, min_distance=0.9, min_distance_unit="mz") + peak_picker = BasicInterpolatedPeakPicker(min_prominence=2, min_distance=0.9, min_distance_unit="mz", peak_filtering=FilterNHighestIntensityPartitioned(max_count=200, n_partitions=8),) spectra_picked = [ peak_picker.pick_peaks(mz_arr, int_arr) for mz_arr, int_arr in spectra_original ]