From 6de9a8481a6810db8859fdcefbf65b755c3f4edf Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Thu, 1 Feb 2024 22:20:43 -0500 Subject: [PATCH 1/5] add filling missing and observed mask for forecasting --- tsfm_public/toolkit/dataset.py | 52 ++++++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/tsfm_public/toolkit/dataset.py b/tsfm_public/toolkit/dataset.py index e4222328..143b0377 100644 --- a/tsfm_public/toolkit/dataset.py +++ b/tsfm_public/toolkit/dataset.py @@ -42,7 +42,11 @@ def __init__( context_length: int = 1, prediction_length: int = 0, zero_padding: bool = True, +<<<<<<< HEAD stride: int = 1, +======= + fill_value: Union[float, int] = 0.0, +>>>>>>> f75400d (add filling missing and observed mask for forecasting) ): super().__init__() if not isinstance(x_cols, list): @@ -71,6 +75,7 @@ def __init__( self.context_length = context_length self.prediction_length = prediction_length self.zero_padding = zero_padding + self.fill_value = fill_value self.timestamps = None self.group_id = group_id self.stride = stride @@ -154,6 +159,11 @@ def __init__( context_length: int = 1, prediction_length: int = 1, num_workers: int = 1, +<<<<<<< HEAD +======= + pred_len: int = 0, + fill_value: Union[float, int] = 0.0, +>>>>>>> f75400d (add filling missing and observed mask for forecasting) cls=BaseDFDataset, stride: int = 1, **kwargs, @@ -167,10 +177,16 @@ def __init__( # self.y_cols = y_cols self.context_length = context_length self.num_workers = num_workers +<<<<<<< HEAD self.cls = cls self.prediction_length = prediction_length self.stride = stride self.extra_kwargs = kwargs +======= + self.pred_len = pred_len + self.fill_value = fill_value + self.cls = cls +>>>>>>> f75400d (add filling missing and observed mask for forecasting) # create groupby object if len(id_columns) == 1: @@ -212,8 +228,14 @@ def concat_dataset(self): self.context_length, self.prediction_length, self.drop_cols, +<<<<<<< HEAD self.stride, self.extra_kwargs, +======= + self.seq_len, + self.pred_len, + self.fill_value, +>>>>>>> f75400d (add filling missing and observed mask for forecasting) ) for group_id, group in group_df ], @@ -228,6 +250,7 @@ def get_group_data( cls, group, group_id, +<<<<<<< HEAD id_columns: List[str] = [], timestamp_column: Optional[str] = None, context_length: int = 1, @@ -235,6 +258,16 @@ def get_group_data( drop_cols: Optional[List[str]] = None, stride: int = 1, extra_kwargs: Dict[str, Any] = {}, +======= + datetime_col: str, + id_columns: List[str], + x_cols: list, + y_cols: list, + drop_cols: list, + seq_len: int, + pred_len: int, + fill_value: Union[float, int], +>>>>>>> f75400d (add filling missing and observed mask for forecasting) ): return cls( data_df=group, @@ -244,8 +277,14 @@ def get_group_data( context_length=context_length, prediction_length=prediction_length, drop_cols=drop_cols, +<<<<<<< HEAD stride=stride, **extra_kwargs, +======= + seq_len=seq_len, + pred_len=pred_len, + fill_value=fill_value, +>>>>>>> f75400d (add filling missing and observed mask for forecasting) ) @@ -359,6 +398,7 @@ def __init__( frequency_token: Optional[int] = None, autoregressive_modeling: bool = True, stride: int = 1, + fill_value: Union[float, int] = 0.0, ): # output_columns_tmp = input_columns if output_columns == [] else output_columns @@ -369,6 +409,7 @@ def __init__( num_workers=num_workers, context_length=context_length, prediction_length=prediction_length, + fill_value=fill_value, cls=self.BaseForecastDFDataset, stride=stride, # extra_args @@ -406,6 +447,7 @@ def __init__( frequency_token: Optional[int] = None, autoregressive_modeling: bool = True, stride: int = 1, + fill_value: Union[float, int] = 0.0, ): self.frequency_token = frequency_token self.target_columns = target_columns @@ -446,6 +488,7 @@ def __init__( group_id=group_id, drop_cols=drop_cols, stride=stride, + fill_value=fill_value, ) def __getitem__(self, index): @@ -465,9 +508,12 @@ def __getitem__(self, index): seq_y[:, self.y_mask_conditional] = 0 ret = { - "past_values": np_to_torch(seq_x), - "future_values": np_to_torch(seq_y), + "past_values": np_to_torch(np.nan_to_num(seq_x, nan=self.fill_value)), + "future_values": np_to_torch(np.nan_to_num(seq_y, nan=self.fill_value)), + "past_observed_mask": np_to_torch(~np.isnan(seq_x)), + "future_observed_mask": np_to_torch(~np.isnan(seq_y)), } + if self.datetime_col: ret["timestamp"] = self.timestamps[time_id + self.context_length - 1] @@ -599,6 +645,8 @@ def np_to_torch(data: np.array, float_type=np.float32): return torch.from_numpy(data.astype(float_type)) elif data.dtype == "int": return torch.from_numpy(data) + elif data.dtype == "bool": + return torch.from_numpy(data) return torch.from_numpy(data) From 31b41409a70b6f788d2beb0e811a47fb8dafd866 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Wed, 12 Jun 2024 13:32:22 -0400 Subject: [PATCH 2/5] complete merge --- tsfm_public/toolkit/dataset.py | 37 +++------------------------------- 1 file changed, 3 insertions(+), 34 deletions(-) diff --git a/tsfm_public/toolkit/dataset.py b/tsfm_public/toolkit/dataset.py index 143b0377..a3debd8c 100644 --- a/tsfm_public/toolkit/dataset.py +++ b/tsfm_public/toolkit/dataset.py @@ -42,11 +42,8 @@ def __init__( context_length: int = 1, prediction_length: int = 0, zero_padding: bool = True, -<<<<<<< HEAD stride: int = 1, -======= fill_value: Union[float, int] = 0.0, ->>>>>>> f75400d (add filling missing and observed mask for forecasting) ): super().__init__() if not isinstance(x_cols, list): @@ -159,11 +156,7 @@ def __init__( context_length: int = 1, prediction_length: int = 1, num_workers: int = 1, -<<<<<<< HEAD -======= - pred_len: int = 0, fill_value: Union[float, int] = 0.0, ->>>>>>> f75400d (add filling missing and observed mask for forecasting) cls=BaseDFDataset, stride: int = 1, **kwargs, @@ -177,16 +170,12 @@ def __init__( # self.y_cols = y_cols self.context_length = context_length self.num_workers = num_workers -<<<<<<< HEAD self.cls = cls self.prediction_length = prediction_length self.stride = stride self.extra_kwargs = kwargs -======= - self.pred_len = pred_len self.fill_value = fill_value self.cls = cls ->>>>>>> f75400d (add filling missing and observed mask for forecasting) # create groupby object if len(id_columns) == 1: @@ -228,14 +217,9 @@ def concat_dataset(self): self.context_length, self.prediction_length, self.drop_cols, -<<<<<<< HEAD self.stride, - self.extra_kwargs, -======= - self.seq_len, - self.pred_len, self.fill_value, ->>>>>>> f75400d (add filling missing and observed mask for forecasting) + self.extra_kwargs, ) for group_id, group in group_df ], @@ -250,24 +234,14 @@ def get_group_data( cls, group, group_id, -<<<<<<< HEAD id_columns: List[str] = [], timestamp_column: Optional[str] = None, context_length: int = 1, prediction_length: int = 1, drop_cols: Optional[List[str]] = None, stride: int = 1, + fill_value: Union[float, int] = 0.0, extra_kwargs: Dict[str, Any] = {}, -======= - datetime_col: str, - id_columns: List[str], - x_cols: list, - y_cols: list, - drop_cols: list, - seq_len: int, - pred_len: int, - fill_value: Union[float, int], ->>>>>>> f75400d (add filling missing and observed mask for forecasting) ): return cls( data_df=group, @@ -277,14 +251,9 @@ def get_group_data( context_length=context_length, prediction_length=prediction_length, drop_cols=drop_cols, -<<<<<<< HEAD stride=stride, - **extra_kwargs, -======= - seq_len=seq_len, - pred_len=pred_len, fill_value=fill_value, ->>>>>>> f75400d (add filling missing and observed mask for forecasting) + **extra_kwargs, ) From cd7de3b2d21b525a614c80d78b83fe12c3b85261 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Wed, 12 Jun 2024 14:00:45 -0400 Subject: [PATCH 3/5] fix issue with fill_value --- tsfm_public/toolkit/dataset.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tsfm_public/toolkit/dataset.py b/tsfm_public/toolkit/dataset.py index a3debd8c..438d8571 100644 --- a/tsfm_public/toolkit/dataset.py +++ b/tsfm_public/toolkit/dataset.py @@ -280,6 +280,7 @@ def __init__( context_length: int = 1, num_workers: int = 1, stride: int = 1, + fill_value: Union[float, int] = 0.0, ): super().__init__( data_df=data, @@ -291,6 +292,7 @@ def __init__( cls=self.BasePretrainDFDataset, target_columns=target_columns, stride=stride, + fill_value=fill_value, ) self.n_inp = 1 @@ -306,6 +308,7 @@ def __init__( timestamp_column: Optional[str] = None, target_columns: List[str] = [], stride: int = 1, + fill_value: Union[float, int] = 0.0, ): self.target_columns = target_columns @@ -323,12 +326,16 @@ def __init__( group_id=group_id, drop_cols=drop_cols, stride=stride, + fill_value=fill_value, ) def __getitem__(self, index): time_id = index * self.stride seq_x = self.X[time_id : time_id + self.context_length].values - ret = {"past_values": np_to_torch(seq_x)} + ret = { + "past_values": np_to_torch(seq_x), + "past_observed_mask": np_to_torch(~np.isnan(seq_x)), + } if self.datetime_col: ret["timestamp"] = self.timestamps[time_id + self.context_length - 1] if self.group_id: From 23dcd60e15bef136afcf5863540929349fd65b8e Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Wed, 12 Jun 2024 14:01:06 -0400 Subject: [PATCH 4/5] add test --- tests/toolkit/test_dataset.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/toolkit/test_dataset.py b/tests/toolkit/test_dataset.py index 1fc452f0..8486396d 100644 --- a/tests/toolkit/test_dataset.py +++ b/tests/toolkit/test_dataset.py @@ -193,6 +193,32 @@ def test_forecasting_df_dataset_stride(ts_data_with_categorical): np.testing.assert_allclose(ds_past_np, ds_past_np_expected) +def test_forecasting_observed_mask(ts_data_with_categorical): + prediction_length = 2 + context_length = 5 + target_columns = ["value2", "value3"] + + df = ts_data_with_categorical.copy() + df.loc[10, "value3"] = np.nan + + ds = ForecastDFDataset( + df, + timestamp_column="timestamp", + id_columns=["id"], + target_columns=target_columns, + context_length=context_length, + prediction_length=prediction_length, + ) + + # check matching size + assert ds[0]["past_observed_mask"].shape == ds[0]["past_values"].shape + assert ds[0]["future_observed_mask"].shape == ds[0]["future_values"].shape + + # Check mask is correct + np.testing.assert_allclose(ds[4]["future_observed_mask"], np.array([[True, True], [True, False]])) + np.testing.assert_allclose(ds[6]["past_observed_mask"][-1, :], np.array([True, False])) + + def test_forecasting_df_dataset_non_autoregressive(ts_data_with_categorical): prediction_length = 2 target_columns = ["value1"] From ec29125d04c688340fb93b110ff76c9371af87ac Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Wed, 12 Jun 2024 15:03:08 -0400 Subject: [PATCH 5/5] check fill value --- tests/toolkit/test_dataset.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/toolkit/test_dataset.py b/tests/toolkit/test_dataset.py index 8486396d..29e2a337 100644 --- a/tests/toolkit/test_dataset.py +++ b/tests/toolkit/test_dataset.py @@ -196,6 +196,7 @@ def test_forecasting_df_dataset_stride(ts_data_with_categorical): def test_forecasting_observed_mask(ts_data_with_categorical): prediction_length = 2 context_length = 5 + fill_value = 0.0 target_columns = ["value2", "value3"] df = ts_data_with_categorical.copy() @@ -208,6 +209,7 @@ def test_forecasting_observed_mask(ts_data_with_categorical): target_columns=target_columns, context_length=context_length, prediction_length=prediction_length, + fill_value=fill_value, ) # check matching size @@ -218,6 +220,23 @@ def test_forecasting_observed_mask(ts_data_with_categorical): np.testing.assert_allclose(ds[4]["future_observed_mask"], np.array([[True, True], [True, False]])) np.testing.assert_allclose(ds[6]["past_observed_mask"][-1, :], np.array([True, False])) + # Check mask value is correct + ds[4]["future_values"][1, 1] == fill_value + + # Check mask value is correct again + fill_value = -100.0 + ds = ForecastDFDataset( + df, + timestamp_column="timestamp", + id_columns=["id"], + target_columns=target_columns, + context_length=context_length, + prediction_length=prediction_length, + fill_value=fill_value, + ) + + ds[4]["future_values"][1, 1] == fill_value + def test_forecasting_df_dataset_non_autoregressive(ts_data_with_categorical): prediction_length = 2