Skip to content

Commit

Permalink
Speed up apply funcs (#6)
Browse files Browse the repository at this point in the history
* optimize performance

* upgraded numpy version & froze reqs

* pip._internal usage fixes
  • Loading branch information
saninstein authored May 7, 2020
1 parent f58f59e commit b932d80
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 12 deletions.
41 changes: 32 additions & 9 deletions numpy_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
---------
"""
from functools import partial
from typing import Callable, Any, Union, Generator, Tuple, List

import numpy as np
Expand Down Expand Up @@ -342,11 +343,14 @@ def rows_gen():
if not skip_na:
prepend_func = prepend_na
if np.issubdtype(array.dtype, np.datetime64):
prepend_func = lambda arr, n: np.hstack((np.repeat(np.datetime64('NaT'), n), arr))

def prepend_func(arr, n):
return np.hstack((np.repeat(np.datetime64('NaT'), n), arr))

yield from (prepend_func(array[:i + 1], (window - 1) - i) for i in np.arange(window - 1))

yield from (array[i:i + window] for i in np.arange(array.size - (window - 1)))
starts = np.arange(array.size - (window - 1))
yield from (array[start:end] for start, end in zip(starts, starts + window))

return np.array([row for row in rows_gen()]) if as_array else rows_gen()

Expand Down Expand Up @@ -395,14 +399,23 @@ def rolling_apply(func: Callable, window: int, *arrays: np.ndarray, n_jobs: int
raise ValueError('Arrays must be the same length')

def _apply_func_to_arrays(idxs):
return func(*[array[idxs.astype(np.int)] for array in arrays], **kwargs)
return func(*[array[idxs[0]:idxs[-1] + 1] for array in arrays], **kwargs)

rolls = rolling(np.arange(len(arrays[0])), window, skip_na=True)
array = arrays[0]
rolls = rolling(
array if len(arrays) == n_jobs == 1 else np.arange(len(array)),
window=window,
skip_na=True
)

if n_jobs == 1:
arr = [_apply_func_to_arrays(idxs) for idxs in rolls]
if len(arrays) == 1:
arr = list(map(partial(func, **kwargs), rolls))
else:
arr = list(map(_apply_func_to_arrays, rolls))
else:
arr = Parallel(n_jobs=n_jobs)(delayed(_apply_func_to_arrays)(idxs) for idxs in rolls)
f = delayed(_apply_func_to_arrays)
arr = Parallel(n_jobs=n_jobs)(f(idxs[[0, -1]]) for idxs in rolls)

return prepend_na(arr, n=window - 1)

Expand Down Expand Up @@ -502,10 +515,20 @@ def expanding_apply(func: Callable, min_periods: int, *arrays: np.ndarray, n_job
def _apply_func_to_arrays(idxs):
return func(*[array[idxs.astype(np.int)] for array in arrays], **kwargs)

rolls = expanding(np.arange(len(arrays[0])), min_periods, skip_na=True)
array = arrays[0]
rolls = expanding(
array if len(arrays) == n_jobs == 1 else np.arange(len(array)),
min_periods=min_periods,
skip_na=True
)

if n_jobs == 1:
arr = [_apply_func_to_arrays(idxs) for idxs in rolls]
if len(arrays) == 1:
arr = list(map(partial(func, **kwargs), rolls))
else:
arr = list(map(_apply_func_to_arrays, rolls))
else:
arr = Parallel(n_jobs=n_jobs)(delayed(_apply_func_to_arrays)(idxs) for idxs in rolls)
f = delayed(_apply_func_to_arrays)
arr = Parallel(n_jobs=n_jobs)(map(f, rolls))

return prepend_na(arr, n=min_periods - 1)
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
numpy~=1.16.4
joblib~=0.14.1
numpy==1.18.3
joblib==0.14.1
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


def parse_reqs(path):
return [str(r.req) for r in parse_requirements(path, session=pip_session)]
return [r.requirement for r in parse_requirements(path, session=pip_session)]


with open('README.md') as f:
Expand Down

0 comments on commit b932d80

Please sign in to comment.