diff --git a/numpy_ext.py b/numpy_ext.py index fd618ce..5da0a57 100644 --- a/numpy_ext.py +++ b/numpy_ext.py @@ -48,6 +48,7 @@ --------- """ +from functools import partial from typing import Callable, Any, Union, Generator, Tuple, List import numpy as np @@ -342,11 +343,14 @@ def rows_gen(): if not skip_na: prepend_func = prepend_na if np.issubdtype(array.dtype, np.datetime64): - prepend_func = lambda arr, n: np.hstack((np.repeat(np.datetime64('NaT'), n), arr)) + + def prepend_func(arr, n): + return np.hstack((np.repeat(np.datetime64('NaT'), n), arr)) yield from (prepend_func(array[:i + 1], (window - 1) - i) for i in np.arange(window - 1)) - yield from (array[i:i + window] for i in np.arange(array.size - (window - 1))) + starts = np.arange(array.size - (window - 1)) + yield from (array[start:end] for start, end in zip(starts, starts + window)) return np.array([row for row in rows_gen()]) if as_array else rows_gen() @@ -395,14 +399,23 @@ def rolling_apply(func: Callable, window: int, *arrays: np.ndarray, n_jobs: int raise ValueError('Arrays must be the same length') def _apply_func_to_arrays(idxs): - return func(*[array[idxs.astype(np.int)] for array in arrays], **kwargs) + return func(*[array[idxs[0]:idxs[-1] + 1] for array in arrays], **kwargs) - rolls = rolling(np.arange(len(arrays[0])), window, skip_na=True) + array = arrays[0] + rolls = rolling( + array if len(arrays) == n_jobs == 1 else np.arange(len(array)), + window=window, + skip_na=True + ) if n_jobs == 1: - arr = [_apply_func_to_arrays(idxs) for idxs in rolls] + if len(arrays) == 1: + arr = list(map(partial(func, **kwargs), rolls)) + else: + arr = list(map(_apply_func_to_arrays, rolls)) else: - arr = Parallel(n_jobs=n_jobs)(delayed(_apply_func_to_arrays)(idxs) for idxs in rolls) + f = delayed(_apply_func_to_arrays) + arr = Parallel(n_jobs=n_jobs)(f(idxs[[0, -1]]) for idxs in rolls) return prepend_na(arr, n=window - 1) @@ -502,10 +515,20 @@ def expanding_apply(func: Callable, min_periods: int, *arrays: np.ndarray, n_job def _apply_func_to_arrays(idxs): return func(*[array[idxs.astype(np.int)] for array in arrays], **kwargs) - rolls = expanding(np.arange(len(arrays[0])), min_periods, skip_na=True) + array = arrays[0] + rolls = expanding( + array if len(arrays) == n_jobs == 1 else np.arange(len(array)), + min_periods=min_periods, + skip_na=True + ) + if n_jobs == 1: - arr = [_apply_func_to_arrays(idxs) for idxs in rolls] + if len(arrays) == 1: + arr = list(map(partial(func, **kwargs), rolls)) + else: + arr = list(map(_apply_func_to_arrays, rolls)) else: - arr = Parallel(n_jobs=n_jobs)(delayed(_apply_func_to_arrays)(idxs) for idxs in rolls) + f = delayed(_apply_func_to_arrays) + arr = Parallel(n_jobs=n_jobs)(map(f, rolls)) return prepend_na(arr, n=min_periods - 1) diff --git a/requirements.txt b/requirements.txt index f5cee57..c9ebb99 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -numpy~=1.16.4 -joblib~=0.14.1 +numpy==1.18.3 +joblib==0.14.1 diff --git a/setup.py b/setup.py index a4018d7..1bf5a64 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ def parse_reqs(path): - return [str(r.req) for r in parse_requirements(path, session=pip_session)] + return [r.requirement for r in parse_requirements(path, session=pip_session)] with open('README.md') as f: