Skip to content

Commit

Permalink
code style: parallel_ops
Browse files Browse the repository at this point in the history
  • Loading branch information
leoschwarz committed Jul 1, 2024
1 parent ad388a7 commit 50a1456
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 33 deletions.
8 changes: 5 additions & 3 deletions src/depiction/parallel_ops/read_spectra_parallel.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations

from typing import (
Any,
Callable,
TypeVar,
TYPE_CHECKING,
TypedDict,
)

import numpy as np
Expand All @@ -28,11 +30,11 @@ def __init__(self, config: ParallelConfig) -> None:
self._config = config

@classmethod
def from_config(cls, config: ParallelConfig):
def from_config(cls, config: ParallelConfig) -> ReadSpectraParallel:
return cls(config=config)

@classmethod
def from_params(cls, n_jobs: int, task_size: int | None, verbose: int = 1):
def from_params(cls, n_jobs: int, task_size: int | None, verbose: int = 1) -> ReadSpectraParallel:
"""In general, try to use from_config and pass the configuration throughout the application as appropriate."""
return cls(config=ParallelConfig(n_jobs=n_jobs, task_size=task_size, verbose=verbose))

Expand Down Expand Up @@ -76,7 +78,7 @@ def map_chunked(
for task_index, task in enumerate(self._config.get_task_splits(item_indices=spectra_indices))
]

def execute_task(args: list[Any], **kwargs: Any) -> list[T]:
    """Run ``operation`` for a single task with a reader opened for that task.

    ``args`` is unpacked into the positional arguments that follow the reader,
    and ``kwargs`` is forwarded unchanged. The reader comes from
    ``read_file.reader()`` and is used as a context manager for the call.
    """
    # A ``**kwargs`` annotation describes the type of each keyword *value*,
    # so ``Any`` is the right spelling; ``TypedDict[str, Any]`` is not a valid
    # type expression (TypedDict is a class factory, not a generic mapping).
    with read_file.reader() as reader:
        return operation(reader, *args, **kwargs)

Expand Down
47 changes: 26 additions & 21 deletions src/depiction/parallel_ops/write_spectra_parallel.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from __future__ import annotations
import contextlib
import functools
import os
from typing import Optional, Callable, Any, Union
from typing import Callable, Any, TYPE_CHECKING
from tempfile import TemporaryDirectory
from numpy.typing import NDArray

from depiction.parallel_ops import ReadSpectraParallel
from depiction.parallel_ops.parallel_config import ParallelConfig
from depiction.persistence import (
ImzmlReadFile,
ImzmlWriteFile,
Expand All @@ -16,25 +15,29 @@
)
from depiction.tools.merge_imzml import MergeImzml

if TYPE_CHECKING:
from numpy.typing import NDArray
from depiction.parallel_ops.parallel_config import ParallelConfig


class WriteSpectraParallel:
def __init__(self, config: ParallelConfig) -> None:
self._config = config

@classmethod
def from_config(cls, config: ParallelConfig):
def from_config(cls, config: ParallelConfig) -> WriteSpectraParallel:
return cls(config)

def map_chunked_to_files(
self,
read_file: ImzmlReadFile,
write_files: list[ImzmlWriteFile],
operation: Union[
Callable[[ImzmlReader, list[int], list[ImzmlWriter], ...], None],
Callable[[ImzmlReader, list[int], list[ImzmlWriteFile], ...], None],
],
spectra_indices: Optional[NDArray[int]] = None,
bind_args: Optional[dict[str, Any]] = None,
operation: (
Callable[[ImzmlReader, list[int], list[ImzmlWriter], ...], None]
| Callable[[ImzmlReader, list[int], list[ImzmlWriteFile], ...], None]
),
spectra_indices: NDArray[int] | None = None,
bind_args: dict[str, Any] | None = None,
open_write_files: bool = True,
) -> None:
"""Maps an operation over a file, in chunks, writing the results to a list of files.
Expand Down Expand Up @@ -81,14 +84,14 @@ def map_chunked_external_to_files(
read_file: ImzmlReadFile,
write_files: list[ImzmlWriteFile],
operation: Callable[[str, list[str]], None],
spectra_indices: Optional[NDArray[int]] = None,
bind_args: Optional[dict[str, Any]] = None,
spectra_indices: NDArray[int] | None = None,
bind_args: dict[str, Any] | None = None,
) -> None:
def op(
reader: ImzmlReader,
spectra_ids: list[int],
write_files: list[ImzmlWriteFile],
**kwargs,
**kwargs: dict[str, Any],
) -> None:
# TODO maybe kwarg handling could be done a bit more cleanly here in the future
# TODO also it's currently untested
Expand Down Expand Up @@ -120,7 +123,7 @@ def _get_split_modes_and_paths(
work_directory: str,
read_file: ImzmlReadFile,
write_files: list[ImzmlWriteFile],
spectra_indices: Optional[NDArray[int]],
spectra_indices: NDArray[int] | None,
) -> list[tuple[ImzmlModeEnum, list[str]]]:
# determine the number of tasks
if spectra_indices is not None:
Expand All @@ -142,10 +145,10 @@ def _write_transformed_chunked_operation(
reader: ImzmlReader,
spectra_indices: list[int],
task_index: int,
operation: Union[
Callable[[ImzmlReader, list[int], list[ImzmlWriter], ...], None],
Callable[[ImzmlReader, list[int], list[ImzmlWriteFile], ...], None],
],
operation: (
Callable[[ImzmlReader, list[int], list[ImzmlWriter], ...], None]
| Callable[[ImzmlReader, list[int], list[ImzmlWriteFile], ...], None]
),
open_write_files: bool,
split_modes_and_paths: list[tuple[ImzmlModeEnum, list[str]]],
) -> None:
Expand Down Expand Up @@ -192,10 +195,12 @@ def map_chunked_to_file(
read_file: ImzmlReadFile,
write_file: ImzmlWriteFile,
operation: Callable[[ImzmlReader, list[int], ImzmlWriter], None],
spectra_indices: Optional[NDArray[int]] = None,
bind_args: Optional[dict[str, Any]] = None,
spectra_indices: NDArray[int] | None = None,
bind_args: dict[str, Any] | None = None,
):
def wrap_operation(reader: ImzmlReader, spectra_ids: list[int], writers: list[ImzmlWriter], **kwargs):
def wrap_operation(
reader: ImzmlReader, spectra_ids: list[int], writers: list[ImzmlWriter], **kwargs: dict[str, Any]
):
return operation(reader, spectra_ids, writers[0], **kwargs)

return self.map_chunked_to_files(
Expand Down
6 changes: 4 additions & 2 deletions src/depiction/tools/cli/correct_baseline.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from __future__ import annotations

import shutil
from pathlib import Path
from typing import Annotated, Literal
from typing import Annotated, Literal, TYPE_CHECKING

import typer
from loguru import logger
Expand All @@ -12,6 +11,9 @@
from depiction.persistence import ImzmlReadFile, ImzmlWriteFile
from depiction.tools.correct_baseline import BaselineVariants, CorrectBaseline

if TYPE_CHECKING:
from pathlib import Path


def correct_baseline(
input_imzml: Annotated[Path, Argument()],
Expand Down
12 changes: 6 additions & 6 deletions src/depiction/tools/correct_baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@

from depiction.parallel_ops.parallel_config import ParallelConfig
from depiction.parallel_ops.write_spectra_parallel import WriteSpectraParallel
from depiction.persistence import (
ImzmlReadFile,
ImzmlWriteFile,
ImzmlWriter,
ImzmlReader,
)
from depiction.spectrum.baseline.local_medians_baseline import LocalMediansBaseline
from depiction.spectrum.baseline.tophat_baseline import TophatBaseline

if TYPE_CHECKING:
from depiction.persistence import (
ImzmlReadFile,
ImzmlWriteFile,
ImzmlWriter,
ImzmlReader,
)
from numpy.typing import NDArray
from depiction.spectrum.baseline.baseline import Baseline

Expand Down
2 changes: 1 addition & 1 deletion src/depiction/tools/create_imzml_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def _write_imzml_file(self, write_file: ImzmlWriteFile) -> None:
with write_file.writer() as writer:
    for imzml_file in tqdm(self._source_files, desc="Copying file"):
        with imzml_file.reader() as reader:
            # This binding must stay: the query string below looks up the local
            # variable by name through "@abs_path". Linters cannot see that use
            # and report the assignment as unused, hence the noqa.
            abs_path = str(imzml_file.imzml_file.absolute())  # noqa: F841
            spectrum_ids = self.pool_source_df.query("abs_path == @abs_path").iloc[0]["source_spectrum_id"]
            writer.copy_spectra(reader, spectrum_ids)

Expand Down
1 change: 1 addition & 0 deletions src/depiction/tools/split_imzml.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import os
from pathlib import Path
from typing import Optional

import numpy as np
Expand Down

0 comments on commit 50a1456

Please sign in to comment.