chore: deprecate window_ops #410

Merged · 7 commits · Nov 21, 2024

Changes from all commits
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
@@ -18,7 +18,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.8", "3.9", "3.10", "3.11"]
+        python-version: ["3.9", "3.10", "3.11", "3.12"]
     env:
       AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID_NIXTLA_TMP }}
       AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY_NIXTLA_TMP }}
@@ -44,7 +44,7 @@ jobs:
       fail-fast: false
       matrix:
         os: [macos-13, macos-14, windows-latest]
-        python-version: ["3.8", "3.9", "3.10", "3.11"]
+        python-version: ["3.9", "3.10", "3.11", "3.12"]
     steps:
       - name: Clone repo
         uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
1 change: 1 addition & 0 deletions README.md
@@ -57,6 +57,7 @@ for best practices.**
 - [m4](https://www.kaggle.com/code/lemuz90/m4-competition)
 - [m4-cv](https://www.kaggle.com/code/lemuz90/m4-competition-cv)
 - [favorita](https://www.kaggle.com/code/lemuz90/mlforecast-favorita)
+- [VN1](https://colab.research.google.com/drive/1UdhCAk49k6HgMezG-U_1ETnAB5pYvZk9)

 ## Why?
8 changes: 0 additions & 8 deletions mlforecast/_modidx.py
@@ -236,18 +236,10 @@
                                                           'mlforecast/grouped_array.py'),
 'mlforecast.grouped_array.GroupedArray.expand_target': ( 'grouped_array.html#groupedarray.expand_target',
                                                          'mlforecast/grouped_array.py'),
-'mlforecast.grouped_array.GroupedArray.restore_fitted_difference': ( 'grouped_array.html#groupedarray.restore_fitted_difference',
-                                                                     'mlforecast/grouped_array.py'),
 'mlforecast.grouped_array.GroupedArray.take': ( 'grouped_array.html#groupedarray.take',
                                                 'mlforecast/grouped_array.py'),
 'mlforecast.grouped_array.GroupedArray.take_from_groups': ( 'grouped_array.html#groupedarray.take_from_groups',
                                                             'mlforecast/grouped_array.py'),
-'mlforecast.grouped_array._append_several': ( 'grouped_array.html#_append_several',
-                                              'mlforecast/grouped_array.py'),
-'mlforecast.grouped_array._expand_target': ( 'grouped_array.html#_expand_target',
-                                             'mlforecast/grouped_array.py'),
-'mlforecast.grouped_array._restore_fitted_difference': ( 'grouped_array.html#_restore_fitted_difference',
-                                                         'mlforecast/grouped_array.py'),
 'mlforecast.grouped_array._transform_series': ( 'grouped_array.html#_transform_series',
                                                 'mlforecast/grouped_array.py')},
 'mlforecast.lag_transforms': { 'mlforecast.lag_transforms.Combine': ( 'lag_transforms.html#combine',
11 changes: 10 additions & 1 deletion mlforecast/core.py
@@ -154,19 +154,28 @@ def _parse_transforms(
     namer = _build_transform_name
     for lag in lags:
         transforms[f"lag{lag}"] = Lag(lag)
+    has_fns = False
     for lag in lag_transforms.keys():
         for tfm in lag_transforms[lag]:
             if isinstance(tfm, _BaseLagTransform):
                 tfm_name = namer(tfm, lag)
                 transforms[tfm_name] = clone(tfm)._set_core_tfm(lag)
             else:
+                has_fns = True
                 tfm, *args = _as_tuple(tfm)
                 assert callable(tfm)
                 tfm_name = namer(tfm, lag, *args)
                 transforms[tfm_name] = (lag, tfm, *args)
+    if has_fns:
+        warnings.warn(
+            "The `window_ops` package (and thus `numba`) will no longer be "
+            "a dependency in a future version.\n"
+            "Please make sure to add it to your requirements to ensure compatibility.",
+            category=FutureWarning,
+        )
     return transforms

-# %% ../nbs/core.ipynb 21
+# %% ../nbs/core.ipynb 22
 class TimeSeries:
     """Utility class for storing and transforming time series data."""
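The warning fires when a plain function (e.g. a `window_ops` callable) appears in `lag_transforms`, which happens when the transforms are parsed at `MLForecast` construction. A minimal sketch of both paths — the `models=[]`, `freq=1`, and window sizes below are placeholder values, not part of this PR:

```python
import warnings

from window_ops.rolling import rolling_mean  # deprecated path, requires numba

from mlforecast import MLForecast
from mlforecast.lag_transforms import RollingMean  # forward-compatible path

# Function-based transform: still works, but now emits the FutureWarning above.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    MLForecast(models=[], freq=1, lag_transforms={1: [(rolling_mean, 7)]})
assert any(issubclass(w.category, FutureWarning) for w in caught)

# Class-based transform: backed by coreforecast, no window_ops/numba needed.
MLForecast(models=[], freq=1, lag_transforms={1: [RollingMean(window_size=7)]})
```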
147 changes: 52 additions & 95 deletions mlforecast/grouped_array.py
@@ -3,18 +3,18 @@
 # %% auto 0
 __all__ = ['GroupedArray']

-# %% ../nbs/grouped_array.ipynb 1
+# %% ../nbs/grouped_array.ipynb 2
 import concurrent.futures
 from typing import Any, Dict, Mapping, Tuple, Union

-from coreforecast.grouped_array import GroupedArray as CoreGroupedArray
 import numpy as np
+from coreforecast.grouped_array import GroupedArray as CoreGroupedArray
 from utilsforecast.compat import njit

 from .compat import shift_array
 from .lag_transforms import _BaseLagTransform

-# %% ../nbs/grouped_array.ipynb 2
+# %% ../nbs/grouped_array.ipynb 3
 @njit(nogil=True)
 def _transform_series(data, indptr, updates_only, lag, func, *args) -> np.ndarray:
     """Shifts every group in `data` by `lag` and computes `func(shifted, *args)`.
@@ -34,66 +34,6 @@ def _transform_series(data, indptr, updates_only, lag, func, *args) -> np.ndarray:
         out[indptr[i] : indptr[i + 1]] = func(lagged, *args)
     return out

-
-@njit
-def _restore_fitted_difference(diffs_data, diffs_indptr, data, indptr, d):
-    n_series = len(indptr) - 1
-    for i in range(n_series):
-        serie = data[indptr[i] : indptr[i + 1]]
-        diffs_size = diffs_indptr[i + 1] - diffs_indptr[i]
-        dropped_rows = diffs_size - serie.size
-        start_idx = max(0, d - dropped_rows)
-        for j in range(start_idx, serie.size):
-            serie[j] += diffs_data[diffs_indptr[i + 1] - serie.size - d + j]
-
-
-@njit
-def _expand_target(data, indptr, max_horizon):
-    out = np.empty((data.size, max_horizon), dtype=data.dtype)
-    n_series = len(indptr) - 1
-    n = 0
-    for i in range(n_series):
-        serie = data[indptr[i] : indptr[i + 1]]
-        for j in range(serie.size):
-            upper = min(serie.size - j, max_horizon)
-            for k in range(upper):
-                out[n, k] = serie[j + k]
-            for k in range(upper, max_horizon):
-                out[n, k] = np.nan
-            n += 1
-    return out
-
-
-@njit
-def _append_several(
-    data: np.ndarray,
-    indptr: np.ndarray,
-    new_sizes: np.ndarray,
-    new_values: np.ndarray,
-    new_groups: np.ndarray,
-) -> Tuple[np.ndarray, np.ndarray]:
-    new_data = np.empty(data.size + new_values.size, dtype=data.dtype)
-    new_indptr = np.empty(new_sizes.size + 1, dtype=indptr.dtype)
-    new_indptr[0] = 0
-    old_indptr_idx = 0
-    new_vals_idx = 0
-    for i, is_new in enumerate(new_groups):
-        new_size = new_sizes[i]
-        if is_new:
-            old_size = 0
-        else:
-            prev_slice = slice(indptr[old_indptr_idx], indptr[old_indptr_idx + 1])
-            old_indptr_idx += 1
-            old_size = prev_slice.stop - prev_slice.start
-            new_size += old_size
-            new_data[new_indptr[i] : new_indptr[i] + old_size] = data[prev_slice]
-        new_indptr[i + 1] = new_indptr[i] + new_size
-        new_data[new_indptr[i] + old_size : new_indptr[i + 1]] = new_values[
-            new_vals_idx : new_vals_idx + new_sizes[i]
-        ]
-        new_vals_idx += new_sizes[i]
-    return new_data, new_indptr
-
 # %% ../nbs/grouped_array.ipynb 4
 class GroupedArray:
     """Array made up of different groups. Can be thought of (and iterated) as a list of arrays.
@@ -175,21 +115,22 @@ def apply_multithreaded_transforms(
                 core_tfms[name] = tfm
             else:
                 numba_tfms[name] = tfm
-        with concurrent.futures.ThreadPoolExecutor(num_threads) as executor:
-            for tfm_name, (lag, tfm, *args) in numba_tfms.items():
-                future = executor.submit(
-                    _transform_series,
-                    self.data,
-                    self.indptr,
-                    updates_only,
-                    lag - offset,
-                    tfm,
-                    *args,
-                )
-                future_to_result[future] = tfm_name
-            for future in concurrent.futures.as_completed(future_to_result):
-                tfm_name = future_to_result[future]
-                results[tfm_name] = future.result()
+        if numba_tfms:
+            with concurrent.futures.ThreadPoolExecutor(num_threads) as executor:
+                for tfm_name, (lag, tfm, *args) in numba_tfms.items():
+                    future = executor.submit(
+                        _transform_series,
+                        self.data,
+                        self.indptr,
+                        updates_only,
+                        lag - offset,
+                        tfm,
+                        *args,
+                    )
+                    future_to_result[future] = tfm_name
+                for future in concurrent.futures.as_completed(future_to_result):
+                    tfm_name = future_to_result[future]
+                    results[tfm_name] = future.result()
         if core_tfms:
             core_ga = CoreGroupedArray(self.data, self.indptr, num_threads)
             for name, tfm in core_tfms.items():
@@ -199,21 +140,16 @@
                     results[name] = tfm.transform(core_ga)
         return results

-    def restore_fitted_difference(
-        self, series_data: np.ndarray, series_indptr: np.ndarray, d: int
-    ) -> None:
-        if len(self.indptr) != len(series_indptr):
-            raise ValueError("Found different number of groups in fitted differences.")
-        _restore_fitted_difference(
-            self.data,
-            self.indptr,
-            series_data,
-            series_indptr,
-            d,
-        )
-
     def expand_target(self, max_horizon: int) -> np.ndarray:
-        return _expand_target(self.data, self.indptr, max_horizon)
+        out = np.full_like(
+            self.data, np.nan, shape=(self.data.size, max_horizon), order="F"
+        )
+        for j in range(max_horizon):
+            for i in range(self.n_groups):
+                out[self.indptr[i] : self.indptr[i + 1] - j, j] = self.data[
+                    self.indptr[i] + j : self.indptr[i + 1]
+                ]
+        return out

     def take_from_groups(self, idx: Union[int, slice]) -> "GroupedArray":
         """Takes `idx` from each group in the array."""
@@ -240,9 +176,30 @@ def append(self, new_data: np.ndarray) -> "GroupedArray":
     def append_several(
         self, new_sizes: np.ndarray, new_values: np.ndarray, new_groups: np.ndarray
     ) -> "GroupedArray":
-        new_data, new_indptr = _append_several(
-            self.data, self.indptr, new_sizes, new_values, new_groups
-        )
+        new_data = np.empty(self.data.size + new_values.size, dtype=self.data.dtype)
+        new_indptr = np.empty(new_sizes.size + 1, dtype=self.indptr.dtype)
+        new_indptr[0] = 0
+        old_indptr_idx = 0
+        new_vals_idx = 0
+        for i, is_new in enumerate(new_groups):
+            new_size = new_sizes[i]
+            if is_new:
+                old_size = 0
+            else:
+                prev_slice = slice(
+                    self.indptr[old_indptr_idx], self.indptr[old_indptr_idx + 1]
+                )
+                old_indptr_idx += 1
+                old_size = prev_slice.stop - prev_slice.start
+                new_size += old_size
+                new_data[new_indptr[i] : new_indptr[i] + old_size] = self.data[
+                    prev_slice
+                ]
+            new_indptr[i + 1] = new_indptr[i] + new_size
+            new_data[new_indptr[i] + old_size : new_indptr[i + 1]] = new_values[
+                new_vals_idx : new_vals_idx + new_sizes[i]
+            ]
+            new_vals_idx += new_sizes[i]
         return GroupedArray(new_data, new_indptr)

     def __repr__(self) -> str:
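A quick usage sketch of the inlined method on made-up data: two existing groups each get one new value, and a third, brand-new group is created in the same call:

```python
import numpy as np

from mlforecast.grouped_array import GroupedArray

ga = GroupedArray(np.array([1.0, 2.0, 3.0]), np.array([0, 2, 3]))  # groups [1, 2] and [3]
new = ga.append_several(
    new_sizes=np.array([1, 1, 2]),
    new_values=np.array([9.0, 8.0, 7.0, 6.0]),
    new_groups=np.array([False, False, True]),  # third entry starts a new group
)
print(new.data)    # [1. 2. 9. 3. 8. 7. 6.]
print(new.indptr)  # [0 3 5 7]
```

Behavior matches the old numba `_append_several`; since the loop runs per group rather than per element, dropping `@njit` should cost little here.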
50 changes: 37 additions & 13 deletions mlforecast/target_transforms.py
@@ -86,21 +86,22 @@ def __init__(self, differences: Iterable[int]):
         self.differences = list(differences)

     def fit_transform(self, ga: GroupedArray) -> GroupedArray:
-        self.fitted_: List[GroupedArray] = []
+        self.fitted_: List[np.ndarray] = []
+        self.fitted_indptr_: Optional[np.ndarray] = None
         original_sizes = np.diff(ga.indptr)
         total_diffs = sum(self.differences)
         small_series = original_sizes < total_diffs
         if small_series.any():
-            raise _ShortSeriesException(np.arange(ga.n_groups)[small_series])
+            raise _ShortSeriesException(np.where(small_series)[0])
         self.scalers_ = []
         core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
         for d in self.differences:
             if self.store_fitted:
                 # these are saved in order to be able to perform a correct
                 # inverse transform when trying to retrieve the fitted values.
-                self.fitted_.append(
-                    GroupedArray(core_ga.data.copy(), core_ga.indptr.copy())
-                )
+                self.fitted_.append(core_ga.data.copy())
+                if self.fitted_indptr_ is None:
+                    self.fitted_indptr_ = core_ga.indptr.copy()
             scaler = core_scalers.Difference(d)
             transformed = scaler.fit_transform(core_ga)
             self.scalers_.append(scaler)
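For reference, a minimal sketch of the transform itself on made-up data; `store_fitted` and `num_threads` are set explicitly here since standalone use bypasses the `MLForecast` machinery that normally manages them:

```python
import numpy as np

from mlforecast.grouped_array import GroupedArray
from mlforecast.target_transforms import Differences

ga = GroupedArray(np.array([1.0, 3.0, 6.0, 10.0]), np.array([0, 4]))  # one series
diffs = Differences([1])
diffs.store_fitted = False
diffs.num_threads = 1
out = diffs.fit_transform(ga)
print(out.data)  # [nan  2.  3.  4.] — first differences, NaN where no lag exists
```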
@@ -122,14 +123,34 @@ def inverse_transform(self, ga: GroupedArray) -> GroupedArray:
         return GroupedArray(transformed, ga.indptr)

     def inverse_transform_fitted(self, ga: GroupedArray) -> GroupedArray:
-        ga = copy.copy(ga)
+        if self.fitted_[0].size < ga.data.size:
+            raise ValueError("fitted differences are smaller than provided target.")
+        transformed = ga.data
         for d, fitted in zip(reversed(self.differences), reversed(self.fitted_)):
-            fitted.restore_fitted_difference(ga.data, ga.indptr, d)
-        return ga
+            fitted_ga = CoreGroupedArray(fitted, self.fitted_indptr_)
+            adds = fitted_ga._lag(d)
+            if adds.size > ga.data.size:
+                adds = CoreGroupedArray(adds, self.fitted_indptr_)._tails(ga.indptr)
+            transformed = transformed + adds
+        return GroupedArray(transformed, ga.indptr)

     def take(self, idxs: np.ndarray) -> "Differences":
         out = Differences(self.differences)
-        out.fitted_ = [ga.take(idxs) for ga in self.fitted_]
+        if self.fitted_indptr_ is None:
+            out.fitted_ = []
+            out.fitted_indptr_ = None
+        else:
+            out.fitted_ = [
+                np.hstack(
+                    [
+                        data[self.fitted_indptr_[i] : self.fitted_indptr_[i + 1]]
+                        for i in idxs
+                    ]
+                )
+                for data in self.fitted_
+            ]
+            sizes = np.diff(self.fitted_indptr_)[idxs]
+            out.fitted_indptr_ = np.append(0, sizes.cumsum())
         out.scalers_ = [scaler.take(idxs) for scaler in self.scalers_]
         return out
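The restore logic is now plain arithmetic on the stored arrays: the model was fit on `y[t] - y[t-d]`, so adding back the stored series lagged by `d` recovers fitted values on the original scale. A hand-worked sketch with made-up numbers, where the `np.concatenate` line mimics what the private coreforecast `_lag` helper yields for a single group:

```python
import numpy as np

y = np.array([1.0, 3.0, 6.0, 10.0])             # stored pre-difference series (fitted_)
diffed_fit = np.array([np.nan, 2.1, 2.9, 4.2])  # model's fitted values on y[t] - y[t-1]

lagged = np.concatenate([[np.nan], y[:-1]])     # lag 1 within the single group
restored = diffed_fit + lagged
print(restored)  # [ nan  3.1  5.9 10.2] — back on the original scale
```

The `_tails` branch covers the case where the provided target has fewer rows than the stored series (e.g. leading rows dropped during preprocessing), keeping only the trailing values of each lagged group.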

Expand All @@ -140,10 +161,13 @@ def stack(scalers: Sequence["Differences"]) -> "Differences": # type: ignore[ov
diffs = first_scaler.differences
out = Differences(diffs)
out.fitted_ = []
for i in range(len(scalers[0].fitted_)):
data = np.hstack([sc.fitted_[i].data for sc in scalers])
sizes = np.hstack([np.diff(sc.fitted_[i].indptr) for sc in scalers])
out.fitted_.append(GroupedArray(data, np.append(0, sizes.cumsum())))
if first_scaler.fitted_indptr_ is None:
out.fitted_indptr_ = None
else:
for i in range(len(scalers[0].fitted_)):
out.fitted_.append(np.hstack([sc.fitted_[i] for sc in scalers]))
sizes = np.hstack([np.diff(sc.fitted_indptr_) for sc in scalers])
out.fitted_indptr_ = np.append(0, sizes.cumsum())
out.scalers_ = [
core_scaler.stack([sc.scalers_[i] for sc in scalers])
for i in range(len(diffs))