From f85bb32f6d9802ac09e0f47b848a439d11f875ca Mon Sep 17 00:00:00 2001
From: Leonardo Schwarz
Date: Fri, 25 Oct 2024 16:59:57 +0200
Subject: [PATCH] basic implementation

---

Notes:
    What this patch does
    * evaluators.py: imports ProcessSpectraConfig and adds
      get_combined_evaluator(), which calls get_evaluator() once per entry
      of config.steps and wraps the results in a CombinedEvaluator.
      CombinedEvaluator.evaluate() runs the wrapped evaluators in list
      order, passing each evaluator's (mz_arr, int_arr) result to the next
      one, and returns the final pair.
    * process.py: process_spectra() now creates a WriteSpectraParallel from
      the ParallelConfig and calls map_chunked_to_file() with
      _process_chunk as the operation and config passed via bind_args.
      _process_chunk() builds the combined evaluator once per call, then
      for every index in spectra_indices reads the spectrum with its
      coordinates, evaluates it and hands the result plus the unchanged
      coordinates to writer.add_spectrum().

    Review note
    * This file was recovered from a copy in which all line breaks and
      leading indentation had been collapsed. The +/- payload and the hunk
      line counts are unchanged; indentation was reconstructed as 4-space
      Python indentation. The depth of the "raise ValueError(...)" context
      line in evaluators.py is an assumption - TODO confirm against the
      target tree, or apply with "git apply --ignore-whitespace".

 .../tools/process_spectra/evaluators.py       | 16 ++++++++++++++++
 .../tools/process_spectra/process.py          | 19 +++++++++++++++++--
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/src/depiction/tools/process_spectra/evaluators.py b/src/depiction/tools/process_spectra/evaluators.py
index 1129b28..0181f2b 100644
--- a/src/depiction/tools/process_spectra/evaluators.py
+++ b/src/depiction/tools/process_spectra/evaluators.py
@@ -14,6 +14,7 @@
     ProcessSpectraStepPickPeaks,
     ProcessSpectraStepRemoveBaseline,
     ProcessSpectraStepFilterPeaks,
+    ProcessSpectraConfig,
 )
 
 
@@ -34,6 +35,21 @@ def get_evaluator(step_config) -> Evaluator:
     raise ValueError(f"Unsupported step config: {step_config}")
 
 
+def get_combined_evaluator(config: ProcessSpectraConfig) -> Evaluator:
+    evaluators = [get_evaluator(step_config) for step_config in config.steps]
+    return CombinedEvaluator(evaluators)
+
+
+class CombinedEvaluator(Evaluator):
+    def __init__(self, evaluators) -> None:
+        self._evaluators = evaluators
+
+    def evaluate(self, mz_arr, int_arr):
+        for evaluator in self._evaluators:
+            mz_arr, int_arr = evaluator.evaluate(mz_arr, int_arr)
+        return mz_arr, int_arr
+
+
 class EvaluatePickPeaks(Evaluator):
     def __init__(self, config: PickPeaksConfig) -> None:
         self._config = config
diff --git a/src/depiction/tools/process_spectra/process.py b/src/depiction/tools/process_spectra/process.py
index 92c8fad..5251b0c 100644
--- a/src/depiction/tools/process_spectra/process.py
+++ b/src/depiction/tools/process_spectra/process.py
@@ -1,7 +1,22 @@
-from depiction.parallel_ops import ParallelConfig
-from depiction.persistence.types import GenericReadFile, GenericWriteFile
+from depiction.parallel_ops import ParallelConfig, WriteSpectraParallel
+from depiction.persistence.types import GenericReadFile, GenericWriteFile, GenericReader, GenericWriter
 from depiction.tools.process_spectra.config import ProcessSpectraConfig
+from depiction.tools.process_spectra.evaluators import get_combined_evaluator
 
 
 def process_spectra(read_file: GenericReadFile, write_file: GenericWriteFile, config: ProcessSpectraConfig) -> None:
     parallel_config = ParallelConfig(n_jobs=config.n_jobs)
+    write_parallel = WriteSpectraParallel.from_config(parallel_config)
+    write_parallel.map_chunked_to_file(
+        read_file=read_file, write_file=write_file, operation=_process_chunk, bind_args={"config": config}
+    )
+
+
+def _process_chunk(
+    reader: GenericReader, spectra_indices: list[int], writer: GenericWriter, config: ProcessSpectraConfig
+) -> None:
+    evaluator = get_combined_evaluator(config=config)
+    for spectrum_index in spectra_indices:
+        mz_arr, int_arr, coords = reader.get_spectrum_with_coords(spectrum_index)
+        mz_arr, int_arr = evaluator.evaluate(mz_arr, int_arr)
+        writer.add_spectrum(mz_arr, int_arr, coords)