From 7c5406f02ac2d66bbfb7de86cc144dab02ff1404 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Morales?= Date: Tue, 15 Aug 2023 10:38:29 -0600 Subject: [PATCH] support predicting a subset of series (#183) --- mlforecast/_modidx.py | 2 + mlforecast/core.py | 40 ++++++-- mlforecast/forecast.py | 20 ++-- mlforecast/grouped_array.py | 8 ++ mlforecast/target_transforms.py | 21 ++-- nbs/core.ipynb | 73 +++++++++++--- nbs/forecast.ipynb | 173 +++++++++++++++++++++++++------- nbs/grouped_array.ipynb | 29 +++++- nbs/target_transforms.ipynb | 31 ++++-- 9 files changed, 306 insertions(+), 91 deletions(-) diff --git a/mlforecast/_modidx.py b/mlforecast/_modidx.py index 37447c4c..6b52a178 100644 --- a/mlforecast/_modidx.py +++ b/mlforecast/_modidx.py @@ -147,6 +147,8 @@ 'mlforecast/grouped_array.py'), 'mlforecast.grouped_array.GroupedArray.restore_difference': ( 'grouped_array.html#groupedarray.restore_difference', 'mlforecast/grouped_array.py'), + 'mlforecast.grouped_array.GroupedArray.take': ( 'grouped_array.html#groupedarray.take', + 'mlforecast/grouped_array.py'), 'mlforecast.grouped_array.GroupedArray.take_from_groups': ( 'grouped_array.html#groupedarray.take_from_groups', 'mlforecast/grouped_array.py'), 'mlforecast.grouped_array.GroupedArray.transform_series': ( 'grouped_array.html#groupedarray.transform_series', diff --git a/mlforecast/core.py b/mlforecast/core.py index f6b0a2ec..e1ed5d66 100644 --- a/mlforecast/core.py +++ b/mlforecast/core.py @@ -425,7 +425,7 @@ def _update_features(self) -> pd.DataFrame: features[feat_name] = feat_vals features_df = pd.DataFrame(features, columns=self.features) - features_df[self.id_col] = self.uids + features_df[self.id_col] = self._uids features_df[self.time_col] = self.curr_dates return self.static_features_.merge(features_df, on=self.id_col) @@ -436,7 +436,7 @@ def _get_predictions(self) -> pd.DataFrame: """Get all the predicted values with their corresponding ids and datestamps.""" n_preds = len(self.y_pred) uids = pd.Series( - np.repeat(self.uids, n_preds), name=self.id_col, dtype=self.uids.dtype + np.repeat(self._uids, n_preds), name=self.id_col, dtype=self.uids.dtype ) df = pd.DataFrame( { @@ -448,10 +448,13 @@ def _get_predictions(self) -> pd.DataFrame: return df def _predict_setup(self) -> None: + self.ga = GroupedArray(self._ga.data, self._ga.indptr) self.curr_dates = self.last_dates.copy() + if self._idxs is not None: + self.ga = self.ga.take(self._idxs) + self.curr_dates = self.curr_dates[self._idxs] self.test_dates = [] self.y_pred = [] - self.ga = GroupedArray(self._ga.data, self._ga.indptr) if self.keep_last_n is not None: self.ga = self.ga.take_from_groups(slice(-self.keep_last_n, None)) self._h = 0 @@ -463,7 +466,7 @@ def _get_features_for_next_step(self, dynamic_dfs, X_df=None): new_x = new_x.merge(df, how="left") new_x = new_x.sort_values(self.id_col) if X_df is not None: - n_series = self.uids.size + n_series = len(self._uids) X = X_df.iloc[self._h * n_series : (self._h + 1) * n_series] new_x = pd.concat([new_x, X.reset_index(drop=True)], axis=1) nulls = new_x.isnull().any() @@ -492,7 +495,7 @@ def _predict_recursive( new_x = before_predict_callback(new_x) predictions = model.predict(new_x) if after_predict_callback is not None: - predictions_serie = pd.Series(predictions, index=self.uids) + predictions_serie = pd.Series(predictions, index=self._uids) predictions = after_predict_callback(predictions_serie).values self._update_y(predictions) if i == 0: @@ -519,7 +522,7 @@ def _predict_multi( ) if dynamic_dfs is None: dynamic_dfs = [] - 
uids = np.repeat(self.uids, horizon) + uids = np.repeat(self._uids, horizon) dates = np.hstack( [ date + (i + 1) * self.freq @@ -548,7 +551,21 @@ def predict( before_predict_callback: Optional[Callable] = None, after_predict_callback: Optional[Callable] = None, X_df: Optional[pd.DataFrame] = None, + ids: Optional[List[str]] = None, ) -> pd.DataFrame: + if ids is not None: + unseen = set(ids) - set(self.uids) + if unseen: + raise ValueError( + f"The following ids weren't seen during training and thus can't be forecasted: {unseen}" + ) + self._uids = self.uids[self.uids.isin(ids)] + self._idxs: Optional[np.ndarray] = np.where(self.uids.isin(self._uids))[0] + last_dates = self.last_dates[self._idxs] + else: + self._uids = self.uids + self._idxs = None + last_dates = self.last_dates if X_df is not None: if self.id_col not in X_df or self.time_col not in X_df: raise ValueError( @@ -567,14 +584,14 @@ def predict( ) dates_validation = pd.DataFrame( { - self.id_col: self.uids, - "_start": self.last_dates + self.freq, - "_end": self.last_dates + horizon * self.freq, + self.id_col: self._uids, + "_start": last_dates + self.freq, + "_end": last_dates + horizon * self.freq, } ) X_df = X_df.merge(dates_validation, on=[self.id_col]) X_df = X_df[X_df[self.time_col].between(X_df["_start"], X_df["_end"])] - if X_df.shape[0] != self.uids.size * horizon: + if X_df.shape[0] != len(self._uids) * horizon: raise ValueError( "Found missing inputs in X_df. " "It should have one row per id and date for the complete forecasting horizon" @@ -601,7 +618,10 @@ def predict( ) if self.target_transforms is not None: for tfm in self.target_transforms[::-1]: + tfm.idxs = self._idxs preds = tfm.inverse_transform(preds) + tfm.idxs = None + del self._uids, self._idxs return preds def update(self, df: pd.DataFrame) -> None: diff --git a/mlforecast/forecast.py b/mlforecast/forecast.py index ca042488..faaa9a7e 100644 --- a/mlforecast/forecast.py +++ b/mlforecast/forecast.py @@ -129,7 +129,7 @@ def __init__( num_threads: int = 1, target_transforms: Optional[List[BaseTargetTransform]] = None, ): - """Create forecast object + """Forecasting pipeline Parameters ---------- @@ -409,11 +409,12 @@ def predict( new_df: Optional[pd.DataFrame] = None, level: Optional[List[Union[int, float]]] = None, X_df: Optional[pd.DataFrame] = None, + ids: Optional[List[str]] = None, *, horizon: Optional[int] = None, # noqa: ARG002 new_data: Optional[pd.DataFrame] = None, # noqa: ARG002 ) -> pd.DataFrame: - """Compute the predictions for the next `horizon` steps. + """Compute the predictions for the next `h` steps. Parameters ---------- @@ -437,6 +438,8 @@ def predict( Confidence levels between 0 and 100 for prediction intervals. X_df : pandas DataFrame, optional (default=None) Dataframe with the future exogenous features. Should have the id column and the time column. + ids : list of str, optional (default=None) + List with subset of ids seen during training for which the forecasts should be computed. horizon : int Number of periods to predict. This argument has been replaced by h and will be removed in a later release. 
new_data : pandas DataFrame, optional (default=None) @@ -488,12 +491,13 @@ def predict( ts = self.ts forecasts = ts.predict( - self.models_, - h, - dynamic_dfs, - before_predict_callback, - after_predict_callback, - X_df, + models=self.models_, + horizon=h, + dynamic_dfs=dynamic_dfs, + before_predict_callback=before_predict_callback, + after_predict_callback=after_predict_callback, + X_df=X_df, + ids=ids, ) if level is not None: if self._cs_df is None: diff --git a/mlforecast/grouped_array.py b/mlforecast/grouped_array.py index 65c49919..b71e6ae8 100644 --- a/mlforecast/grouped_array.py +++ b/mlforecast/grouped_array.py @@ -153,6 +153,14 @@ def __setitem__(self, idx: int, vals: np.ndarray): raise ValueError(f"vals must be of size {self[idx].size}") self[idx][:] = vals + def take(self, idxs: np.ndarray) -> "GroupedArray": + ranges = [range(self.indptr[i], self.indptr[i + 1]) for i in idxs] + items = [self.data[rng] for rng in ranges] + sizes = np.array([item.size for item in items]) + data = np.hstack(items) + indptr = np.append(0, sizes.cumsum()) + return GroupedArray(data, indptr) + @classmethod def from_sorted_df( cls, df: "pd.DataFrame", id_col: str, target_col: str diff --git a/mlforecast/target_transforms.py b/mlforecast/target_transforms.py index 13d9d49a..929862e1 100644 --- a/mlforecast/target_transforms.py +++ b/mlforecast/target_transforms.py @@ -6,7 +6,7 @@ # %% ../nbs/target_transforms.ipynb 2 import abc import reprlib -from typing import TYPE_CHECKING, Iterable +from typing import TYPE_CHECKING, Iterable, Optional if TYPE_CHECKING: import pandas as pd @@ -17,6 +17,8 @@ # %% ../nbs/target_transforms.ipynb 3 class BaseTargetTransform(abc.ABC): + idxs: Optional[np.ndarray] = None + def set_column_names(self, id_col: str, time_col: str, target_col: str): self.id_col = id_col self.time_col = time_col @@ -53,18 +55,20 @@ def fit_transform(self, df: "pd.DataFrame") -> "pd.DataFrame": new_indptr = d * np.arange(n_series + 1, dtype=np.int32) _apply_difference(ga.data, ga.indptr, new_data, new_indptr, d) self.original_values_.append(GroupedArray(new_data, new_indptr)) - df = df.copy() + df = df.copy(deep=False) df[self.target_col] = ga.data return df def inverse_transform(self, df: "pd.DataFrame") -> "pd.DataFrame": model_cols = df.columns.drop([self.id_col, self.time_col]) - df = df.copy() + df = df.copy(deep=False) for model in model_cols: model_preds = df[model].values.copy() for d, ga in zip( reversed(self.differences), reversed(self.original_values_) ): + if self.idxs is not None: + ga = ga.take(self.idxs) ga.restore_difference(model_preds, d) df[model] = model_preds return df @@ -76,8 +80,8 @@ def _standard_scaler_transform(data, indptr, stats, out): for i in range(n_series): sl = slice(indptr[i], indptr[i + 1]) subs = data[sl] - mean_ = subs.mean() - std_ = subs.std() + mean_ = np.nanmean(subs) + std_ = np.nanstd(subs) stats[i] = mean_, std_ out[sl] = (data[sl] - mean_) / std_ @@ -102,15 +106,16 @@ def fit_transform(self, df: "pd.DataFrame") -> "pd.DataFrame": self.stats_ = np.empty((len(ga.indptr) - 1, 2)) out = np.empty_like(ga.data) _standard_scaler_transform(ga.data, ga.indptr, self.stats_, out) - df = df.copy() + df = df.copy(deep=False) df[self.target_col] = out return df def inverse_transform(self, df: "pd.DataFrame") -> "pd.DataFrame": - df = df.copy() + df = df.copy(deep=False) model_cols = df.columns.drop([self.id_col, self.time_col]) + stats = self.stats_ if self.idxs is None else self.stats_[self.idxs] for model in model_cols: model_preds = df[model].values - 
_standard_scaler_inverse_transform(model_preds, self.stats_) + _standard_scaler_inverse_transform(model_preds, stats) df[model] = model_preds return df diff --git a/nbs/core.ipynb b/nbs/core.ipynb index c9a21e20..fffa83f8 100644 --- a/nbs/core.ipynb +++ b/nbs/core.ipynb @@ -874,7 +874,7 @@ " features[feat_name] = feat_vals\n", "\n", " features_df = pd.DataFrame(features, columns=self.features)\n", - " features_df[self.id_col] = self.uids\n", + " features_df[self.id_col] = self._uids\n", " features_df[self.time_col] = self.curr_dates \n", " return self.static_features_.merge(features_df, on=self.id_col)\n", " \n", @@ -885,7 +885,7 @@ " \"\"\"Get all the predicted values with their corresponding ids and datestamps.\"\"\"\n", " n_preds = len(self.y_pred)\n", " uids = pd.Series(\n", - " np.repeat(self.uids, n_preds), name=self.id_col, dtype=self.uids.dtype\n", + " np.repeat(self._uids, n_preds), name=self.id_col, dtype=self.uids.dtype\n", " )\n", " df = pd.DataFrame(\n", " {\n", @@ -897,10 +897,13 @@ " return df\n", "\n", " def _predict_setup(self) -> None:\n", + " self.ga = GroupedArray(self._ga.data, self._ga.indptr) \n", " self.curr_dates = self.last_dates.copy()\n", + " if self._idxs is not None:\n", + " self.ga = self.ga.take(self._idxs)\n", + " self.curr_dates = self.curr_dates[self._idxs]\n", " self.test_dates = []\n", " self.y_pred = []\n", - " self.ga = GroupedArray(self._ga.data, self._ga.indptr)\n", " if self.keep_last_n is not None:\n", " self.ga = self.ga.take_from_groups(slice(-self.keep_last_n, None))\n", " self._h = 0\n", @@ -912,7 +915,7 @@ " new_x = new_x.merge(df, how='left')\n", " new_x = new_x.sort_values(self.id_col)\n", " if X_df is not None:\n", - " n_series = self.uids.size\n", + " n_series = len(self._uids)\n", " X = X_df.iloc[self._h * n_series : (self._h + 1) * n_series]\n", " new_x = pd.concat([new_x, X.reset_index(drop=True)], axis=1)\n", " nulls = new_x.isnull().any()\n", @@ -943,7 +946,7 @@ " new_x = before_predict_callback(new_x)\n", " predictions = model.predict(new_x)\n", " if after_predict_callback is not None:\n", - " predictions_serie = pd.Series(predictions, index=self.uids)\n", + " predictions_serie = pd.Series(predictions, index=self._uids)\n", " predictions = after_predict_callback(predictions_serie).values\n", " self._update_y(predictions)\n", " if i == 0:\n", @@ -966,7 +969,7 @@ " raise ValueError(f'horizon must be at most max_horizon ({self.max_horizon})')\n", " if dynamic_dfs is None:\n", " dynamic_dfs = []\n", - " uids = np.repeat(self.uids, horizon)\n", + " uids = np.repeat(self._uids, horizon)\n", " dates = np.hstack(\n", " [\n", " date + (i+1) * self.freq\n", @@ -995,7 +998,19 @@ " before_predict_callback: Optional[Callable] = None,\n", " after_predict_callback: Optional[Callable] = None,\n", " X_df: Optional[pd.DataFrame] = None,\n", + " ids: Optional[List[str]] = None,\n", " ) -> pd.DataFrame:\n", + " if ids is not None:\n", + " unseen = set(ids) - set(self.uids)\n", + " if unseen:\n", + " raise ValueError(f\"The following ids weren't seen during training and thus can't be forecasted: {unseen}\")\n", + " self._uids = self.uids[self.uids.isin(ids)]\n", + " self._idxs: Optional[np.ndarray] = np.where(self.uids.isin(self._uids))[0]\n", + " last_dates = self.last_dates[self._idxs]\n", + " else:\n", + " self._uids = self.uids\n", + " self._idxs = None\n", + " last_dates = self.last_dates\n", " if X_df is not None:\n", " if self.id_col not in X_df or self.time_col not in X_df:\n", " raise ValueError(f\"X_df must have '{self.id_col}' and 
'{self.time_col}' columns.\")\n", @@ -1012,14 +1027,14 @@ " )\n", " dates_validation = pd.DataFrame(\n", " {\n", - " self.id_col: self.uids,\n", - " '_start': self.last_dates + self.freq,\n", - " '_end': self.last_dates + horizon * self.freq\n", + " self.id_col: self._uids,\n", + " '_start': last_dates + self.freq,\n", + " '_end': last_dates + horizon * self.freq\n", " }\n", " )\n", " X_df = X_df.merge(dates_validation, on=[self.id_col])\n", " X_df = X_df[X_df[self.time_col].between(X_df['_start'], X_df['_end'])]\n", - " if X_df.shape[0] != self.uids.size * horizon:\n", + " if X_df.shape[0] != len(self._uids) * horizon:\n", " raise ValueError(\n", " \"Found missing inputs in X_df. \"\n", " \"It should have one row per id and date for the complete forecasting horizon\"\n", @@ -1048,9 +1063,12 @@ " )\n", " if self.target_transforms is not None:\n", " for tfm in self.target_transforms[::-1]:\n", + " tfm.idxs = self._idxs\n", " preds = tfm.inverse_transform(preds)\n", + " tfm.idxs = None\n", + " del self._uids, self._idxs\n", " return preds\n", - " \n", + "\n", " def update(self, df: pd.DataFrame) -> None:\n", " \"\"\"Update the values of the stored series.\"\"\"\n", " df = df.sort_values([self.id_col, self.time_col])\n", @@ -1309,6 +1327,9 @@ "# _update_features\n", "ts = TimeSeries(**flow_config)\n", "ts._fit(serie, id_col='unique_id', time_col='ds', target_col='y')\n", + "ts._uids = ts.uids\n", + "ts._idxs = np.arange(len(ts.ga))\n", + "ts._predict_setup()\n", "updates = ts._update_features().drop(columns='ds')\n", "\n", "last_date = serie['ds'].max()\n", @@ -1341,6 +1362,9 @@ "# _get_predictions\n", "ts = TimeSeries(freq='D', lags=[1])\n", "ts._fit(serie, id_col='unique_id', time_col='ds', target_col='y')\n", + "ts._uids = ts.uids\n", + "ts._idxs = np.arange(len(ts.ga))\n", + "ts._predict_setup()\n", "ts._update_features()\n", "ts._update_y([1.])\n", "preds = ts._get_predictions()\n", @@ -1535,6 +1559,8 @@ "\n", "ts = TimeSeries(**flow_config)\n", "df = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', keep_last_n=keep_last_n)\n", + "ts._uids = ts.uids.tolist()\n", + "ts._idxs = np.arange(len(ts.ga))\n", "ts._predict_setup()\n", "\n", "expected_lags = ['lag7', 'lag14']\n", @@ -1650,7 +1676,8 @@ "> dfs:Optional[List[pandas.core.frame.DataFrame]]=None,\n", "> before_predict_callback:Optional[Callable]=None,\n", "> after_predict_callback:Optional[Callable]=None,\n", - "> X_df:Optional[pandas.core.frame.DataFrame]=None)" + "> X_df:Optional[pandas.core.frame.DataFrame]=None,\n", + "> ids:Optional[List[str]]=None)" ], "text/plain": [ "---\n", @@ -1662,7 +1689,8 @@ "> dfs:Optional[List[pandas.core.frame.DataFrame]]=None,\n", "> before_predict_callback:Optional[Callable]=None,\n", "> after_predict_callback:Optional[Callable]=None,\n", - "> X_df:Optional[pandas.core.frame.DataFrame]=None)" + "> X_df:Optional[pandas.core.frame.DataFrame]=None,\n", + "> ids:Optional[List[str]]=None)" ] }, "execution_count": null, @@ -1730,7 +1758,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "If we have dynamic features we can pass them to `dynamic_dfs`." + "If we have dynamic features we can pass them to `X_df`." 
] }, { @@ -1764,6 +1792,23 @@ ")" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| hide\n", + "# predicting for a subset\n", + "sample_ids = ['id_00', 'id_16']\n", + "sample_preds = ts.predict({'price': model}, 1, X_df=prices_catalog, ids=sample_ids)\n", + "pd.testing.assert_frame_equal(\n", + " sample_preds, \n", + " prices_catalog.merge(predictions[predictions['unique_id'].isin(sample_ids)][['unique_id', 'ds']])[['unique_id', 'ds', 'price']]\n", + ")\n", + "test_fail(lambda: ts.predict({'y': model}, 1, ids=['bonjour']), contains=\"{'bonjour'}\")" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/nbs/forecast.ipynb b/nbs/forecast.ipynb index 09c2b690..1f06dc19 100644 --- a/nbs/forecast.ipynb +++ b/nbs/forecast.ipynb @@ -232,7 +232,7 @@ " num_threads: int = 1,\n", " target_transforms: Optional[List[BaseTargetTransform]] = None,\n", " ):\n", - " \"\"\"Create forecast object\n", + " \"\"\"Forecasting pipeline\n", "\n", " Parameters\n", " ----------\n", @@ -507,11 +507,12 @@ " new_df: Optional[pd.DataFrame] = None,\n", " level: Optional[List[Union[int, float]]] = None,\n", " X_df: Optional[pd.DataFrame] = None,\n", + " ids: Optional[List[str]] = None,\n", " *,\n", " horizon: Optional[int] = None, # noqa: ARG002\n", - " new_data: Optional[pd.DataFrame] = None, # noqa: ARG002 \n", + " new_data: Optional[pd.DataFrame] = None, # noqa: ARG002\n", " ) -> pd.DataFrame:\n", - " \"\"\"Compute the predictions for the next `horizon` steps.\n", + " \"\"\"Compute the predictions for the next `h` steps.\n", " \n", " Parameters\n", " ----------\n", @@ -535,6 +536,8 @@ " Confidence levels between 0 and 100 for prediction intervals.\n", " X_df : pandas DataFrame, optional (default=None)\n", " Dataframe with the future exogenous features. Should have the id column and the time column.\n", + " ids : list of str, optional (default=None)\n", + " List with subset of ids seen during training for which the forecasts should be computed.\n", " horizon : int\n", " Number of periods to predict. This argument has been replaced by h and will be removed in a later release.\n", " new_data : pandas DataFrame, optional (default=None)\n", @@ -578,7 +581,13 @@ " ts = self.ts\n", " \n", " forecasts = ts.predict(\n", - " self.models_, h, dynamic_dfs, before_predict_callback, after_predict_callback, X_df\n", + " models=self.models_,\n", + " horizon=h,\n", + " dynamic_dfs=dynamic_dfs,\n", + " before_predict_callback=before_predict_callback,\n", + " after_predict_callback=after_predict_callback,\n", + " X_df=X_df,\n", + " ids=ids,\n", " )\n", " if level is not None:\n", " if self._cs_df is None:\n", @@ -814,7 +823,7 @@ "> target_transforms:Optional[List[mlforecast.target_transforms.\n", "> BaseTargetTransform]]=None)\n", "\n", - "Create forecast object\n", + "Forecasting pipeline\n", "\n", "| | **Type** | **Default** | **Details** |\n", "| -- | -------- | ----------- | ----------- |\n", @@ -842,7 +851,7 @@ "> target_transforms:Optional[List[mlforecast.target_transforms.\n", "> BaseTargetTransform]]=None)\n", "\n", - "Create forecast object\n", + "Forecasting pipeline\n", "\n", "| | **Type** | **Default** | **Details** |\n", "| -- | -------- | ----------- | ----------- |\n", @@ -878,7 +887,7 @@ "id": "fb5ec811-8876-4daa-84a2-2ebe0559a02b", "metadata": {}, "source": [ - "## Example\n", + "### Data\n", "This shows an example with just 4 series of the M4 dataset. 
If you want to run it yourself on all of them, you can refer to [this notebook](https://www.kaggle.com/code/lemuz90/m4-competition)." ] }, @@ -1121,7 +1130,7 @@ ], "source": [ "fcst = MLForecast(\n", - " models=lgb.LGBMRegressor(random_state=0),\n", + " models=lgb.LGBMRegressor(random_state=0, verbosity=-1),\n", " lags=[24 * (i+1) for i in range(7)],\n", " lag_transforms={\n", " 48: [(ewm_mean, 0.3)],\n", @@ -1255,10 +1264,11 @@ "> new_df:Optional[pandas.core.frame.DataFrame]=None,\n", "> level:Optional[List[Union[int,float]]]=None,\n", "> X_df:Optional[pandas.core.frame.DataFrame]=None,\n", + "> ids:Optional[List[str]]=None,\n", "> horizon:Optional[int]=None,\n", "> new_data:Optional[pandas.core.frame.DataFrame]=None)\n", "\n", - "Compute the predictions for the next `horizon` steps.\n", + "Compute the predictions for the next `h` steps.\n", "\n", "| | **Type** | **Default** | **Details** |\n", "| -- | -------- | ----------- | ----------- |\n", @@ -1269,9 +1279,10 @@ "| new_df | Optional | None | Series data of new observations for which forecasts are to be generated.
This dataframe should have the same structure as the one used to fit the model, including any features and time series data.
If `new_df` is not None, the method will generate forecasts for the new observations. |\n", "| level | Optional | None | Confidence levels between 0 and 100 for prediction intervals. |\n", "| X_df | Optional | None | Dataframe with the future exogenous features. Should have the id column and the time column. |\n", + "| ids | Optional | None | List with subset of ids seen during training for which the forecasts should be computed. |\n", "| horizon | Optional | None | Number of periods to predict. This argument has been replaced by h and will be removed in a later release. |\n", "| new_data | Optional | None | Series data of new observations for which forecasts are to be generated.
This dataframe should have the same structure as the one used to fit the model, including any features and time series data.
If `new_data` is not None, the method will generate forecasts for the new observations. |\n", - "| **Returns** | **DataFrame** | | **noqa: ARG002
noqa: ARG002 ** |" + "| **Returns** | **DataFrame** | | **noqa: ARG002
noqa: ARG002** |" ], "text/plain": [ "---\n", @@ -1286,10 +1297,11 @@ "> new_df:Optional[pandas.core.frame.DataFrame]=None,\n", "> level:Optional[List[Union[int,float]]]=None,\n", "> X_df:Optional[pandas.core.frame.DataFrame]=None,\n", + "> ids:Optional[List[str]]=None,\n", "> horizon:Optional[int]=None,\n", "> new_data:Optional[pandas.core.frame.DataFrame]=None)\n", "\n", - "Compute the predictions for the next `horizon` steps.\n", + "Compute the predictions for the next `h` steps.\n", "\n", "| | **Type** | **Default** | **Details** |\n", "| -- | -------- | ----------- | ----------- |\n", @@ -1300,9 +1312,10 @@ "| new_df | Optional | None | Series data of new observations for which forecasts are to be generated.
This dataframe should have the same structure as the one used to fit the model, including any features and time series data.
If `new_df` is not None, the method will generate forecasts for the new observations. |\n", "| level | Optional | None | Confidence levels between 0 and 100 for prediction intervals. |\n", "| X_df | Optional | None | Dataframe with the future exogenous features. Should have the id column and the time column. |\n", + "| ids | Optional | None | List with subset of ids seen during training for which the forecasts should be computed. |\n", "| horizon | Optional | None | Number of periods to predict. This argument has been replaced by h and will be removed in a later release. |\n", "| new_data | Optional | None | Series data of new observations for which forecasts are to be generated.
This dataframe should have the same structure as the one used to fit the model, including any features and time series data.
If `new_data` is not None, the method will generate forecasts for the new observations. |\n", - "| **Returns** | **DataFrame** | | **noqa: ARG002
noqa: ARG002 ** |" + "| **Returns** | **DataFrame** | | **noqa: ARG002
noqa: ARG002** |" ] }, "execution_count": null, @@ -1370,6 +1383,86 @@ ")" ] }, + { + "cell_type": "markdown", + "id": "de5e75f6-d509-428b-bb60-60c1efcb08bc", + "metadata": {}, + "source": [ + "#### Predicting a subset of the training series" + ] + }, + { + "cell_type": "markdown", + "id": "97e0e418-662f-4a19-b93b-7c94606f1023", + "metadata": {}, + "source": [ + "By default all series seen during training will be forecasted with the predict method. If you're only interested in predicting a couple of them you can use the `ids` argument." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6ac6a73e-9df5-4ea5-9142-5c8acbe151be", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
unique_iddsLGBMRegressor
0H38196153.462100
1H41396125.206026
\n", + "
" + ], + "text/plain": [ + " unique_id ds LGBMRegressor\n", + "0 H381 961 53.462100\n", + "1 H413 961 25.206026" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "fcst.predict(1, ids=sample_ids[:2])" + ] + }, { "cell_type": "markdown", "id": "adfb0f7f-ea89-44a6-b557-ac12a278f111", @@ -1768,7 +1861,7 @@ "#| hide\n", "# test indexed data, datetime ds\n", "fcst_test = MLForecast(\n", - " models=lgb.LGBMRegressor(random_state=0),\n", + " models=lgb.LGBMRegressor(random_state=0, verbosity=-1),\n", " lags=[1],\n", " num_threads=1,\n", " freq='D'\n", @@ -2296,7 +2389,7 @@ "outputs": [], "source": [ "fcst = MLForecast(\n", - " models=lgb.LGBMRegressor(random_state=0),\n", + " models=lgb.LGBMRegressor(random_state=0, verbosity=-1),\n", " lags=[24 * (i+1) for i in range(7)],\n", " lag_transforms={\n", " 1: [(rolling_mean, 24)],\n", @@ -2748,7 +2841,7 @@ ], "source": [ "fcst = MLForecast(\n", - " models=lgb.LGBMRegressor(random_state=0),\n", + " models=lgb.LGBMRegressor(random_state=0, verbosity=-1),\n", " lags=[24 * (i+1) for i in range(7)],\n", " lag_transforms={\n", " 1: [(rolling_mean, 24)],\n", @@ -4070,7 +4163,7 @@ " return dates.day % 2 == 0\n", "\n", "fcst = MLForecast(\n", - " models=lgb.LGBMRegressor(n_jobs=1, random_state=0),\n", + " models=lgb.LGBMRegressor(n_jobs=1, random_state=0, verbosity=-1),\n", " freq='D',\n", " lags=[7],\n", " lag_transforms={\n", @@ -4165,31 +4258,31 @@ " 0\n", " id_00\n", " 2001-05-15\n", - " 42.382022\n", + " 42.382751\n", " \n", " \n", " 1\n", " id_00\n", " 2001-05-16\n", - " 50.049521\n", + " 50.069369\n", " \n", " \n", " 2\n", " id_00\n", " 2001-05-17\n", - " 1.990593\n", + " 1.928786\n", " \n", " \n", " 3\n", " id_00\n", " 2001-05-18\n", - " 10.312788\n", + " 10.315261\n", " \n", " \n", " 4\n", " id_00\n", " 2001-05-19\n", - " 18.834164\n", + " 18.833788\n", " \n", " \n", " ...\n", @@ -4201,31 +4294,31 @@ " 695\n", " id_99\n", " 2001-05-17\n", - " 44.440773\n", + " 44.484574\n", " \n", " \n", " 696\n", " id_99\n", " 2001-05-18\n", - " 1.909664\n", + " 1.882713\n", " \n", " \n", " 697\n", " id_99\n", " 2001-05-19\n", - " 9.055328\n", + " 9.057168\n", " \n", " \n", " 698\n", " id_99\n", " 2001-05-20\n", - " 15.153067\n", + " 15.155541\n", " \n", " \n", " 699\n", " id_99\n", " 2001-05-21\n", - " 22.910254\n", + " 22.912041\n", " \n", " \n", "\n", @@ -4234,17 +4327,17 @@ ], "text/plain": [ " unique_id ds LGBMRegressor\n", - "0 id_00 2001-05-15 42.382022\n", - "1 id_00 2001-05-16 50.049521\n", - "2 id_00 2001-05-17 1.990593\n", - "3 id_00 2001-05-18 10.312788\n", - "4 id_00 2001-05-19 18.834164\n", + "0 id_00 2001-05-15 42.382751\n", + "1 id_00 2001-05-16 50.069369\n", + "2 id_00 2001-05-17 1.928786\n", + "3 id_00 2001-05-18 10.315261\n", + "4 id_00 2001-05-19 18.833788\n", ".. ... ... 
...\n", - "695 id_99 2001-05-17 44.440773\n", - "696 id_99 2001-05-18 1.909664\n", - "697 id_99 2001-05-19 9.055328\n", - "698 id_99 2001-05-20 15.153067\n", - "699 id_99 2001-05-21 22.910254\n", + "695 id_99 2001-05-17 44.484574\n", + "696 id_99 2001-05-18 1.882713\n", + "697 id_99 2001-05-19 9.057168\n", + "698 id_99 2001-05-20 15.155541\n", + "699 id_99 2001-05-21 22.912041\n", "\n", "[700 rows x 3 columns]" ] @@ -4305,7 +4398,7 @@ "non_std_series['ds'] = non_std_series.groupby('unique_id').cumcount()\n", "non_std_series = non_std_series.rename(columns={'unique_id': 'some_id', 'ds': 'time', 'y': 'value'})\n", "models = [\n", - " lgb.LGBMRegressor(n_jobs=1, random_state=0),\n", + " lgb.LGBMRegressor(n_jobs=1, random_state=0, verbosity=-1),\n", " xgb.XGBRegressor(n_jobs=1, random_state=0),\n", "]\n", "flow_params = dict(\n", @@ -4620,7 +4713,7 @@ } ], "source": [ - "fcst = MLForecast(lgb.LGBMRegressor(), freq='D', lags=[1])\n", + "fcst = MLForecast(lgb.LGBMRegressor(verbosity=-1), freq='D', lags=[1])\n", "fcst.fit(series)\n", "\n", "preds = fcst.predict(2, before_predict_callback=inspect_input, after_predict_callback=increase_predictions)\n", @@ -4635,6 +4728,8 @@ "outputs": [], "source": [ "#|hide\n", + "fcst.ts._uids = fcst.ts.uids\n", + "fcst.ts._idxs = None\n", "fcst.ts._predict_setup()\n", "\n", "for attr in ('head', 'tail'):\n", @@ -4659,7 +4754,7 @@ "def test_cross_validation(data=non_std_series, add_exogenous=False):\n", " n_windows = 2\n", " h = 14\n", - " fcst = MLForecast(lgb.LGBMRegressor(), freq='D', lags=[7, 14])\n", + " fcst = MLForecast(lgb.LGBMRegressor(verbosity=-1), freq='D', lags=[7, 14])\n", " if add_exogenous:\n", " data = data.assign(ex1 = lambda x: np.arange(0, len(x)))\n", " with warnings.catch_warnings():\n", diff --git a/nbs/grouped_array.ipynb b/nbs/grouped_array.ipynb index dbbf3793..3286880c 100644 --- a/nbs/grouped_array.ipynb +++ b/nbs/grouped_array.ipynb @@ -192,13 +192,23 @@ " return self.ngroups\n", " \n", " def __getitem__(self, idx: int) -> np.ndarray:\n", - " return self.data[self.indptr[idx] : self.indptr[idx + 1]]\n", - " \n", + " return self.data[self.indptr[idx] : self.indptr[idx + 1]] \n", + "\n", " def __setitem__(self, idx: int, vals: np.ndarray):\n", " if self[idx].size != vals.size:\n", " raise ValueError(f'vals must be of size {self[idx].size}')\n", " self[idx][:] = vals\n", " \n", + " def take(self, idxs: np.ndarray) -> 'GroupedArray':\n", + " ranges = [\n", + " range(self.indptr[i], self.indptr[i + 1]) for i in idxs\n", + " ]\n", + " items = [self.data[rng] for rng in ranges]\n", + " sizes = np.array([item.size for item in items])\n", + " data = np.hstack(items)\n", + " indptr = np.append(0, sizes.cumsum())\n", + " return GroupedArray(data, indptr)\n", + " \n", " @classmethod\n", " def from_sorted_df(cls, df: 'pd.DataFrame', id_col: str, target_col: str) -> 'GroupedArray':\n", " sizes = df.groupby(id_col, observed=True).size().values\n", @@ -318,6 +328,21 @@ "np.testing.assert_equal(last_4.indptr, np.array([0, 2, 6]))" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "9b952d65-0afb-49b4-b1b1-f79c2b2599e6", + "metadata": {}, + "outputs": [], + "source": [ + "# Select a specific subset of groups\n", + "indptr = np.array([0, 2, 4, 7, 10])\n", + "ga2 = GroupedArray(data, indptr)\n", + "subset = ga2.take([0, 2])\n", + "np.testing.assert_allclose(subset[0].data, ga2[0].data)\n", + "np.testing.assert_allclose(subset[1].data, ga2[2].data)" + ] + }, { "cell_type": "code", "execution_count": null, diff --git 
a/nbs/target_transforms.ipynb b/nbs/target_transforms.ipynb index de4f322c..3e0f5968 100644 --- a/nbs/target_transforms.ipynb +++ b/nbs/target_transforms.ipynb @@ -32,7 +32,7 @@ "#| export\n", "import abc\n", "import reprlib\n", - "from typing import TYPE_CHECKING, Iterable\n", + "from typing import TYPE_CHECKING, Iterable, Optional\n", "\n", "if TYPE_CHECKING:\n", " import pandas as pd\n", @@ -51,11 +51,13 @@ "source": [ "#| export\n", "class BaseTargetTransform(abc.ABC):\n", + " idxs: Optional[np.ndarray] = None\n", + " \n", " def set_column_names(self, id_col: str, time_col: str, target_col: str):\n", " self.id_col = id_col\n", " self.time_col = time_col\n", " self.target_col = target_col\n", - " \n", + "\n", " @abc.abstractmethod\n", " def fit_transform(self, df: 'pd.DataFrame') -> 'pd.DataFrame':\n", " raise NotImplementedError\n", @@ -93,16 +95,18 @@ " new_indptr = d * np.arange(n_series + 1, dtype=np.int32)\n", " _apply_difference(ga.data, ga.indptr, new_data, new_indptr, d)\n", " self.original_values_.append(GroupedArray(new_data, new_indptr))\n", - " df = df.copy()\n", + " df = df.copy(deep=False)\n", " df[self.target_col] = ga.data\n", " return df\n", "\n", " def inverse_transform(self, df: 'pd.DataFrame') -> 'pd.DataFrame':\n", " model_cols = df.columns.drop([self.id_col, self.time_col])\n", - " df = df.copy()\n", + " df = df.copy(deep=False)\n", " for model in model_cols:\n", " model_preds = df[model].values.copy()\n", " for d, ga in zip(reversed(self.differences), reversed(self.original_values_)):\n", + " if self.idxs is not None:\n", + " ga = ga.take(self.idxs)\n", " ga.restore_difference(model_preds, d)\n", " df[model] = model_preds\n", " return df" @@ -122,8 +126,8 @@ " for i in range(n_series):\n", " sl = slice(indptr[i], indptr[i + 1])\n", " subs = data[sl]\n", - " mean_ = subs.mean()\n", - " std_ = subs.std()\n", + " mean_ = np.nanmean(subs)\n", + " std_ = np.nanstd(subs)\n", " stats[i] = mean_, std_\n", " out[sl] = (data[sl] - mean_) / std_\n", "\n", @@ -155,16 +159,17 @@ " self.stats_ = np.empty((len(ga.indptr) - 1, 2)) \n", " out = np.empty_like(ga.data)\n", " _standard_scaler_transform(ga.data, ga.indptr, self.stats_, out)\n", - " df = df.copy()\n", + " df = df.copy(deep=False)\n", " df[self.target_col] = out\n", " return df\n", "\n", " def inverse_transform(self, df: 'pd.DataFrame') -> 'pd.DataFrame': \n", - " df = df.copy()\n", - " model_cols = df.columns.drop([self.id_col, self.time_col]) \n", + " df = df.copy(deep=False)\n", + " model_cols = df.columns.drop([self.id_col, self.time_col])\n", + " stats = self.stats_ if self.idxs is None else self.stats_[self.idxs]\n", " for model in model_cols:\n", " model_preds = df[model].values\n", - " _standard_scaler_inverse_transform(model_preds, self.stats_)\n", + " _standard_scaler_inverse_transform(model_preds, stats)\n", " df[model] = model_preds\n", " return df" ] @@ -186,6 +191,12 @@ "pd.testing.assert_frame_equal(\n", " sc.inverse_transform(sc.fit_transform(series)),\n", " series,\n", + ")\n", + "subset = series[series['unique_id'].isin(['id_0', 'id_7'])]\n", + "sc.idxs = [0, 7]\n", + "pd.testing.assert_frame_equal(\n", + " sc.inverse_transform(subset),\n", + " subset\n", ")" ] }
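
---

A minimal usage sketch of the new `ids` argument (not part of the diff above). It assumes the `generate_daily_series` helper from `mlforecast.utils`; the model, lags, and horizon are illustrative only. The `fcst.predict(h, ids=...)` call and the ValueError for unseen ids are the behavior introduced by this patch.

    import lightgbm as lgb
    from mlforecast import MLForecast
    from mlforecast.utils import generate_daily_series

    # small synthetic panel with the default unique_id/ds/y column names
    series = generate_daily_series(10)

    fcst = MLForecast(
        models=lgb.LGBMRegressor(verbosity=-1),  # silenced, as in the notebooks
        freq='D',
        lags=[7],
    )
    fcst.fit(series)

    # default behavior: forecasts for every series seen during training
    all_preds = fcst.predict(5)

    # new in this patch: restrict the forecasts to a subset of the training ids
    sample_ids = series['unique_id'].unique()[:2].tolist()
    subset_preds = fcst.predict(5, ids=sample_ids)
    assert sorted(subset_preds['unique_id'].unique()) == sorted(sample_ids)

    # ids that weren't seen during training raise a ValueError
    try:
        fcst.predict(5, ids=['bonjour'])
    except ValueError as err:
        print(err)  # message lists the unseen ids, e.g. {'bonjour'}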