diff --git a/src/depiction/parallel_ops/parallel_config.py b/src/depiction/parallel_ops/parallel_config.py index 5a0700d..8d4c69e 100644 --- a/src/depiction/parallel_ops/parallel_config.py +++ b/src/depiction/parallel_ops/parallel_config.py @@ -9,7 +9,7 @@ class ParallelConfig: """Encapsulates the configuration for parallel processing, and some common functionality to use this information. :param n_jobs: the number of parallel jobs to run :param task_size: the size of each task, if None, the number of tasks will be divided by the number of jobs - :param verbose: the verbosity level of the parallel processing (passed to joblib currently) + :param verbose: the verbosity level of the parallel processing (TODO currently unused) """ n_jobs: int diff --git a/src/depiction/parallel_ops/parallel_map.py b/src/depiction/parallel_ops/parallel_map.py index 22096a8..5b277ea 100644 --- a/src/depiction/parallel_ops/parallel_map.py +++ b/src/depiction/parallel_ops/parallel_map.py @@ -2,11 +2,10 @@ import functools import operator - -import joblib - from typing import TypeVar, TYPE_CHECKING, Callable, Any +import multiprocess.pool + if TYPE_CHECKING: from depiction.parallel_ops import ParallelConfig @@ -41,9 +40,8 @@ def __call__( reduce_fn: Callable[[list[T]], U] | None = None, ) -> list[T] | U: reduce_fn = reduce_fn if reduce_fn is not None else list - joblib_parallel = joblib.Parallel(n_jobs=self.config.n_jobs, verbose=self.config.verbose) - operation = self._bind(operation=operation, bind_kwargs=bind_kwargs) - return reduce_fn(joblib_parallel(joblib.delayed(operation)(task) for task in tasks)) + with multiprocess.pool.Pool(self.config.n_jobs) as pool: + return reduce_fn(pool.map(self._bind(operation=operation, bind_kwargs=bind_kwargs), tasks)) def _bind( self,