``` python
import copy

# numpy and pandas are used throughout the examples below;
# TimeSeries (and the private _identity helper checked in the tests) live in mlforecast.core
import numpy as np
import pandas as pd
from nbdev import show_doc
from fastcore.test import test_eq, test_fail, test_warns
from window_ops.expanding import expanding_mean
from window_ops.rolling import rolling_mean
from window_ops.shift import shift_array

from mlforecast.core import TimeSeries, _identity
from mlforecast.target_transforms import LocalStandardScaler
from mlforecast.utils import generate_daily_series, generate_prices_for_series
```
Core

Data format

The required input format is a dataframe with at least the following columns:

* `unique_id` with a unique identifier for each time series
* `ds` with the datestamp
* `y` with the values of the series

Every other column is considered a static feature unless stated otherwise in `TimeSeries.fit_transform`.
``` python
series = generate_daily_series(20, n_static_features=2)
series
```
| | unique_id | ds | y | static_0 | static_1 |
|---|---|---|---|---|---|
| 0 | id_00 | 2000-01-01 | 0.740453 | 27 | 53 |
| 1 | id_00 | 2000-01-02 | 3.595262 | 27 | 53 |
| 2 | id_00 | 2000-01-03 | 6.895835 | 27 | 53 |
| 3 | id_00 | 2000-01-04 | 8.499450 | 27 | 53 |
| 4 | id_00 | 2000-01-05 | 11.321981 | 27 | 53 |
| ... | ... | ... | ... | ... | ... |
| 4869 | id_19 | 2000-03-25 | 40.060681 | 97 | 45 |
| 4870 | id_19 | 2000-03-26 | 53.879482 | 97 | 45 |
| 4871 | id_19 | 2000-03-27 | 62.020210 | 97 | 45 |
| 4872 | id_19 | 2000-03-28 | 2.062543 | 97 | 45 |
| 4873 | id_19 | 2000-03-29 | 14.151317 | 97 | 45 |

4874 rows × 5 columns
For simplicity we'll just take one time series here.
``` python
uids = series['unique_id'].unique()
serie = series[series['unique_id'].eq(uids[0])]
serie
```
| | unique_id | ds | y | static_0 | static_1 |
|---|---|---|---|---|---|
| 0 | id_00 | 2000-01-01 | 0.740453 | 27 | 53 |
| 1 | id_00 | 2000-01-02 | 3.595262 | 27 | 53 |
| 2 | id_00 | 2000-01-03 | 6.895835 | 27 | 53 |
| 3 | id_00 | 2000-01-04 | 8.499450 | 27 | 53 |
| 4 | id_00 | 2000-01-05 | 11.321981 | 27 | 53 |
| ... | ... | ... | ... | ... | ... |
| 217 | id_00 | 2000-08-05 | 1.326319 | 27 | 53 |
| 218 | id_00 | 2000-08-06 | 3.823198 | 27 | 53 |
| 219 | id_00 | 2000-08-07 | 5.955518 | 27 | 53 |
| 220 | id_00 | 2000-08-08 | 8.698637 | 27 | 53 |
| 221 | id_00 | 2000-08-09 | 11.925481 | 27 | 53 |

222 rows × 5 columns
TimeSeries
```
TimeSeries (freq:Union[int,str,pandas._libs.tslibs.offsets.BaseOffset,NoneType]=None,
            lags:Optional[Iterable[int]]=None,
            lag_transforms:Optional[Dict[int,List[Union[Callable,Tuple[Callable,Any]]]]]=None,
            date_features:Optional[Iterable[Union[str,Callable]]]=None,
            differences:Optional[Iterable[int]]=None, num_threads:int=1,
            target_transforms:Optional[List[mlforecast.target_transforms.BaseTargetTransform]]=None)
```
Utility class for storing and transforming time series data.
The `TimeSeries` class takes care of defining the transformations to be performed (`lags`, `lag_transforms` and `date_features`). The transformations can be computed using multithreading if `num_threads > 1`.
``` python
def month_start_or_end(dates):
    return dates.is_month_start | dates.is_month_end

flow_config = dict(
    freq='W-THU',
    lags=[7],
    lag_transforms={
        1: [expanding_mean, (rolling_mean, 7)]
    },
    date_features=['dayofweek', 'week', month_start_or_end],
)

ts = TimeSeries(**flow_config)
ts
```

```
TimeSeries(freq=<Week: weekday=3>, transforms=['lag7', 'expanding_mean_lag1', 'rolling_mean_lag1_window_size7'], date_features=['dayofweek', 'week', 'month_start_or_end'], num_threads=1)
```
The frequency is converted to an offset.
``` python
test_eq(ts.freq, pd.tseries.frequencies.to_offset(flow_config['freq']))
```
The date features are stored as they were passed to the constructor.
``` python
test_eq(ts.date_features, flow_config['date_features'])
```
The transformations are stored as a dictionary, where the key is the name of the transformation (the name of the column in the dataframe with the computed features), built using `build_transform_name`, and the value is a tuple whose first element is the lag the transformation is applied to, followed by the function and its arguments.
``` python
test_eq(
    ts.transforms,
    {
        'lag7': (7, _identity),
        'expanding_mean_lag1': (1, expanding_mean),
        'rolling_mean_lag1_window_size7': (1, rolling_mean, 7),
    }
)
```
Note that for `lags` we define the transformation as the identity function applied to its corresponding lag. This is because `_transform_series` takes the lag as an argument and shifts the array before computing the transformation.
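To make that concrete, here is a minimal sketch of the shift-then-transform mechanism using `shift_array` (imported above); the array and lag are made up for illustration:

``` python
y = np.arange(10, dtype=np.float32)
lag = 7

# the array is first shifted by the lag (the first `lag` entries become np.nan)...
shifted = shift_array(y, lag)

# ...and the transformation then runs on the shifted array;
# for a plain lag that transformation is just the identity
np.testing.assert_equal(shifted[lag:], y[:-lag])
```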
TimeSeries.fit_transform
```
TimeSeries.fit_transform (data:pandas.core.frame.DataFrame, id_col:str,
                          time_col:str, target_col:str,
                          static_features:Optional[List[str]]=None,
                          dropna:bool=True, keep_last_n:Optional[int]=None,
                          max_horizon:Optional[int]=None,
                          return_X_y:bool=False)
```
Add the features to `data` and save the required information for the predictions step.

If not all features are static, specify which ones are in `static_features`. If you don't want to drop rows with null values after the transformations, set `dropna=False`. If `keep_last_n` is not None, then that number of observations is kept across all series for the updates.
``` python
flow_config = dict(
    freq='D',
    lags=[7, 14],
    lag_transforms={
        2: [
            (rolling_mean, 7),
            (rolling_mean, 14),
        ]
    },
    date_features=['dayofweek', 'month', 'year'],
    num_threads=2,
)

ts = TimeSeries(**flow_config)
_ = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
```
The series values are stored as a GroupedArray in an attribute `ga`. If the data type of the series values is an int, it is converted to `np.float32`; this is because lags generate `np.nan`s, so we need a float data type for them.
``` python
np.testing.assert_equal(ts.ga.data, series.y.values)
```
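The int-to-float conversion can be checked directly. This is a hedged sketch; `int_series` and `ts_int` are hypothetical names introduced here:

``` python
# hypothetical check: fit on an integer-valued target and verify the cast
int_series = series.copy()
int_series['y'] = int_series['y'].astype(int)
ts_int = TimeSeries(**flow_config)
ts_int.fit_transform(int_series, id_col='unique_id', time_col='ds', target_col='y')
assert ts_int.ga.data.dtype == np.float32
```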
The series ids are stored in an `uids` attribute.
``` python
test_eq(ts.uids, series['unique_id'].unique())
```
For each time series, the last observed date is stored, so that predictions start from the last date + the frequency.
``` python
test_eq(ts.last_dates, series.groupby('unique_id')['ds'].max().values)
```
The last row of every series, without the `y` and `ds` columns, is taken as the static features.
``` python
pd.testing.assert_frame_equal(
    ts.static_features_,
    series.groupby('unique_id').tail(1).drop(columns=['ds', 'y']).reset_index(drop=True),
)
```
If you pass `static_features` to `TimeSeries.fit_transform` then only those are kept.
``` python
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', static_features=['static_0'])

pd.testing.assert_frame_equal(
    ts.static_features_,
    series.groupby('unique_id').tail(1)[['unique_id', 'static_0']].reset_index(drop=True),
)
```
You can also specify `keep_last_n` in `TimeSeries.fit_transform`, which means that after computing the features for training we want to keep only the last n samples of each time series for computing the updates. This saves both memory and time, since the updates are performed by running the transformation functions on all time series again and keeping only the last value (the update).

If you have very long time series and your updates only require a small sample, it's recommended that you set `keep_last_n` to the minimum number of samples required to compute the updates, which in this case is 15, since we have a rolling mean of size 14 over the lag 2 and in the first update the lag 2 becomes the lag 1. This is because in the first update the lag 1 is the last value of the series (or the lag 0), the lag 2 is the lag 1, and so on.
``` python
keep_last_n = 15

ts = TimeSeries(**flow_config)
df = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', keep_last_n=keep_last_n)
ts._uids = ts.uids.tolist()
ts._idxs = np.arange(len(ts.ga))
ts._predict_setup()

expected_lags = ['lag7', 'lag14']
expected_transforms = ['rolling_mean_lag2_window_size7',
                       'rolling_mean_lag2_window_size14']
expected_date_features = ['dayofweek', 'month', 'year']

test_eq(ts.features, expected_lags + expected_transforms + expected_date_features)
test_eq(ts.static_features_.columns.tolist() + ts.features, df.columns.drop(['ds', 'y']).tolist())
# we dropped 2 rows because of the lag 2 and 13 more to have the window of size 14
test_eq(df.shape[0], series.shape[0] - (2 + 13) * ts.ga.ngroups)
test_eq(ts.ga.data.size, ts.ga.ngroups * keep_last_n)
```
`TimeSeries.fit_transform` requires that the y column doesn't have any null values. This is because the transformations could propagate them forward, so if you have null values in the y column you'll get an error.
``` python
series_with_nulls = series.copy()
series_with_nulls.loc[1, 'y'] = np.nan
test_fail(
    lambda: ts.fit_transform(series_with_nulls, id_col='unique_id', time_col='ds', target_col='y'),
    contains='y column contains null values',
)
```
TimeSeries.predict
```
TimeSeries.predict (models:Dict[str,Union[sklearn.base.BaseEstimator,List[sklearn.base.BaseEstimator]]],
                    horizon:int,
                    dynamic_dfs:Optional[List[pandas.core.frame.DataFrame]]=None,
                    before_predict_callback:Optional[Callable]=None,
                    after_predict_callback:Optional[Callable]=None,
                    X_df:Optional[pandas.core.frame.DataFrame]=None,
                    ids:Optional[List[str]]=None)
```
Once we have a trained model we can use `TimeSeries.predict`, passing the model and the horizon, to get the predictions back.
``` python
class DummyModel:
    def predict(self, X: pd.DataFrame) -> np.ndarray:
        return X['lag7'].values

horizon = 7
model = DummyModel()
ts = TimeSeries(**flow_config)
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
predictions = ts.predict({'DummyModel': model}, horizon)

grouped_series = series.groupby('unique_id')
expected_preds = grouped_series['y'].tail(7)  # the model predicts the lag-7
last_dates = grouped_series['ds'].max()
expected_dsmin = last_dates + ts.freq
expected_dsmax = last_dates + horizon * ts.freq
grouped_preds = predictions.groupby('unique_id')

np.testing.assert_allclose(predictions['DummyModel'], expected_preds)
pd.testing.assert_series_equal(grouped_preds['ds'].min(), expected_dsmin)
pd.testing.assert_series_equal(grouped_preds['ds'].max(), expected_dsmax)
```
If we have dynamic features we can pass them to `X_df`.
``` python
class PredictPrice:
    def predict(self, X):
        return X['price']

series = generate_daily_series(20, n_static_features=2, equal_ends=True)
dynamic_series = series.rename(columns={'static_1': 'product_id'})
prices_catalog = generate_prices_for_series(dynamic_series)
series_with_prices = dynamic_series.merge(prices_catalog, how='left')

model = PredictPrice()
ts = TimeSeries(**flow_config)
ts.fit_transform(
    series_with_prices,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
    static_features=['static_0', 'product_id'],
)
predictions = ts.predict({'PredictPrice': model}, horizon=1, X_df=prices_catalog)

pd.testing.assert_frame_equal(
    predictions.rename(columns={'PredictPrice': 'price'}),
    prices_catalog.merge(predictions[['unique_id', 'ds']])[['unique_id', 'ds', 'price']],
)
```
TimeSeries.update
```
TimeSeries.update (df:pandas.core.frame.DataFrame)
```
Update the values of the stored series.
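As a hedged sketch of how this could be used (the ids, dates and values below are hypothetical; the column names match the ones used in `fit_transform` above):

``` python
# hypothetical new observations for one of the stored series
new_obs = pd.DataFrame({
    'unique_id': ['id_00', 'id_00'],
    'ds': pd.to_datetime(['2001-05-15', '2001-05-16']),
    'y': [13.0, 14.0],
})
ts.update(new_obs)
# since predictions start from the last stored date plus the frequency,
# subsequent calls to predict continue after the newly appended dates
```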