From 1d70242dca4e24b015f714660dd459261bfdbe80 Mon Sep 17 00:00:00 2001
From: "Wesley M. Gifford"
Date: Mon, 1 Apr 2024 17:03:01 -0400
Subject: [PATCH 1/5] add scaling_id_columns and test

Signed-off-by: Wesley M. Gifford
---
 tests/toolkit/conftest.py                     | 13 +++++++++++++
 .../toolkit/test_time_series_preprocessor.py  | 19 +++++++++++++++++++
 .../toolkit/time_series_preprocessor.py       | 18 ++++++++++++------
 3 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/tests/toolkit/conftest.py b/tests/toolkit/conftest.py
index aeb7c883..fc5a0960 100644
--- a/tests/toolkit/conftest.py
+++ b/tests/toolkit/conftest.py
@@ -35,3 +35,16 @@ def sample_data():
         }
     )
     return df
+
+
+@pytest.fixture(scope="module")
+def ts_data_runs():
+    df = pd.DataFrame(
+        {
+            "run_id": nreps(["1", "2", "3", "4"], 50),
+            "asset_id": nreps(["foo", "bar", "foo", "bar"], 50),
+            "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 4,
+            "value1": range(200),
+        }
+    )
+    return df
diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py
index 86c33d55..6a113dfe 100644
--- a/tests/toolkit/test_time_series_preprocessor.py
+++ b/tests/toolkit/test_time_series_preprocessor.py
@@ -297,3 +297,22 @@ def test_get_datasets_without_targets(ts_data):
     train, _, _ = tsp.get_datasets(ts_data, split_config={"train": 0.7, "test": 0.2})
 
     train.datasets[0].target_columns == ["value1", "value2"]
+
+
+def test_id_columns_and_scaling_id_columns(ts_data_runs):
+    df = ts_data_runs
+
+    tsp = TimeSeriesPreprocessor(
+        timestamp_column="timestamp",
+        prediction_length=2,
+        context_length=5,
+        id_columns=["asset_id", "run_id"],
+        scaling_id_columns=["asset_id"],
+        target_columns=["value1"],
+        scaling=True,
+    )
+
+    ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})
+
+    assert len(tsp.target_scaler_dict) == 2
+    assert len(ds_train.datasets) == 4
diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py
index 6f5451a6..e12bc1cb 100644
--- a/tsfm_public/toolkit/time_series_preprocessor.py
+++ b/tsfm_public/toolkit/time_series_preprocessor.py
@@ -116,6 +116,7 @@ def __init__(
         scaling: bool = False,
         # scale_outputs: bool = False,
         scaler_type: ScalerType = ScalerType.STANDARD.value,
+        scaling_id_columns: Optional[List[str]] = None,
         encode_categorical: bool = True,
         time_series_task: str = TimeSeriesTask.FORECASTING.value,
         frequency_mapping: Dict[str, int] = DEFAULT_FREQUENCY_MAPPING,
@@ -145,6 +146,7 @@ def __init__(
         self.time_series_task = time_series_task
         # self.scale_outputs = scale_outputs
         self.scaler_type = scaler_type
+        self.scaling_id_columns = scaling_id_columns if scaling_id_columns is not None else copy.copy(id_columns)
 
         # we maintain two scalers per time series to facilitate inverse scaling of the targets
         self.scaler_dict = {}
@@ -354,8 +356,10 @@ def _get_groups(
         Yields:
             Generator[Any, pd.DataFrame]: Group name and resulting pandas dataframe for the group.
        """
-        if self.id_columns:
-            group_by_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0]
+        if self.scaling_id_columns:
+            group_by_columns = (
+                self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0]
+            )
         else:
             group_by_columns = INTERNAL_ID_COLUMN
 
@@ -552,8 +556,8 @@ def inverse_scale_func(grp, id_columns):
             grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale])
             return grp
 
-        if self.id_columns:
-            id_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0]
+        if self.scaling_id_columns:
+            id_columns = self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0]
         else:
             id_columns = INTERNAL_ID_COLUMN
 
@@ -597,8 +601,10 @@ def scale_func(grp, id_columns):
 
             return grp
 
-        if self.id_columns:
-            id_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0]
+        if self.scaling_id_columns:
+            id_columns = (
+                self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0]
+            )
         else:
             id_columns = INTERNAL_ID_COLUMN
 

From 4f209380a0b1b17e9dd4764cf76c23fc6d2b536d Mon Sep 17 00:00:00 2001
From: "Wesley M. Gifford"
Date: Mon, 1 Apr 2024 17:03:30 -0400
Subject: [PATCH 2/5] fix unzip bug

Signed-off-by: Wesley M. Gifford
---
 tsfm_public/toolkit/util.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tsfm_public/toolkit/util.py b/tsfm_public/toolkit/util.py
index 643b16df..57b2e844 100644
--- a/tsfm_public/toolkit/util.py
+++ b/tsfm_public/toolkit/util.py
@@ -231,7 +231,7 @@ def train_test_split(
             )
         )
 
-    result_train, result_valid, result_test = zip(**result)
+    result_train, result_valid, result_test = zip(*result)
 
     return pd.concat(result_train), pd.concat(result_valid), pd.concat(result_test)
 

From f94dbcf3518ac65428b52394d2077be5c2b6618d Mon Sep 17 00:00:00 2001
From: "Wesley M. Gifford"
Date: Tue, 2 Apr 2024 09:13:55 -0400
Subject: [PATCH 3/5] allow suffix during inverse scaling, add tests

Signed-off-by: Wesley M. Gifford
---
 .../toolkit/test_time_series_preprocessor.py | 42 +++++++++++++++++++
 .../toolkit/time_series_preprocessor.py      |  8 +++-
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py
index 6a113dfe..3d56e400 100644
--- a/tests/toolkit/test_time_series_preprocessor.py
+++ b/tests/toolkit/test_time_series_preprocessor.py
@@ -84,6 +84,48 @@ def test_time_series_preprocessor_encodes(sample_data):
         assert sample_prep[c].dtype == float
 
 
+def test_time_series_preprocessor_scales(ts_data):
+    df = ts_data
+
+    tsp = TimeSeriesPreprocessor(
+        timestamp_column="timestamp",
+        prediction_length=2,
+        context_length=5,
+        id_columns=["id", "id2"],
+        target_columns=["value1", "value2"],
+        scaling=True,
+    )
+
+    tsp.train(df)
+
+    # check scaled result
+    out = tsp.preprocess(df)
+    assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0)
+    assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0)
+
+    # check inverse scale result
+    out_inv = tsp.inverse_scale_targets(out)
+    assert np.all(
+        out_inv.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x))
+        == df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x))
+    )
+    assert np.all(
+        out_inv.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x))
+        == df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x))
+    )
+
+    # check inverse scale result, with suffix
+
+    suffix = "_foo"
+    targets_suffix = [f"{c}{suffix}" for c in tsp.target_columns]
+    out.columns = [f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns]
+    out_inv = tsp.inverse_scale_targets(out, suffix=suffix)
+    assert np.all(
+        out_inv.groupby(tsp.id_columns)[targets_suffix].apply(lambda x: np.mean(x))
+        == df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x))
+    )
+
+
 def test_augment_time_series(ts_data):
     periods = 5
     a = extend_time_series(ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods)
diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py
index e12bc1cb..e31ca5a8 100644
--- a/tsfm_public/toolkit/time_series_preprocessor.py
+++ b/tsfm_public/toolkit/time_series_preprocessor.py
@@ -533,12 +533,14 @@ def train(
 
         return self
 
-    def inverse_scale_targets(self, dataset: Union[Dataset, pd.DataFrame]) -> Union[Dataset, pd.DataFrame]:
+    def inverse_scale_targets(
+        self, dataset: Union[Dataset, pd.DataFrame], suffix: Optional[str] = None
+    ) -> Union[Dataset, pd.DataFrame]:
         self._check_dataset(dataset)
         df = self._standardize_dataframe(dataset)
 
         if not self.scaling:
-            return df
+            return dataset
 
         if len(self.target_scaler_dict) == 0:
             # trying to inverse scale but this preprocessor is not set up for scaling
@@ -547,6 +549,8 @@ def inverse_scale_targets(self, dataset: Union[Dataset, pd.DataFrame]) -> Union[
             )
 
         cols_to_scale = self.target_columns
+        if suffix is not None:
+            cols_to_scale = [f"{c}{suffix}" for c in cols_to_scale]
 
         def inverse_scale_func(grp, id_columns):
             if isinstance(id_columns, list):

From 2c147ec0e9cf6ede82bd14e7be1938e80d2648ed Mon Sep 17 00:00:00 2001
From: "Wesley M. Gifford"
Date: Tue, 2 Apr 2024 18:45:06 -0400
Subject: [PATCH 4/5] improve inverse scaling, wip

Signed-off-by: Wesley M.
Gifford --- tests/toolkit/conftest.py | 8 +- .../toolkit/test_time_series_preprocessor.py | 69 +++++++-- .../time_series_forecasting_pipeline.py | 5 + .../toolkit/time_series_preprocessor.py | 131 ++++++++++++++---- 4 files changed, 170 insertions(+), 43 deletions(-) diff --git a/tests/toolkit/conftest.py b/tests/toolkit/conftest.py index fc5a0960..7a143e5b 100644 --- a/tests/toolkit/conftest.py +++ b/tests/toolkit/conftest.py @@ -16,9 +16,10 @@ def ts_data(): { "id": nreps(["A", "B", "C"], 50), "id2": nreps(["XX", "YY", "ZZ"], 50), - "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 3, + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] + * 3, "value1": range(150), - "value2": np.arange(150) / 3 + 10, + "value2": np.arange(150) ** 2 / 3 + 10, } ) return df @@ -43,7 +44,8 @@ def ts_data_runs(): { "run_id": nreps(["1", "2", "3", "4"], 50), "asset_id": nreps(["foo", "bar", "foo", "bar"], 50), - "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 4, + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] + * 4, "value1": range(200), } ) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index 3d56e400..c6439e16 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -27,9 +27,9 @@ def test_standard_scaler(sample_data): # check shape preserved result = scaler.fit_transform(sample_data[columns]) assert result.shape == sample_data[columns].shape - expected = (sample_data[columns].values - np.mean(sample_data[columns].values, axis=0)) / np.std( - sample_data[columns].values, axis=0 - ) + expected = ( + sample_data[columns].values - np.mean(sample_data[columns].values, axis=0) + ) / np.std(sample_data[columns].values, axis=0) np.testing.assert_allclose(result, expected) # check serialization @@ -100,8 +100,12 @@ def test_time_series_preprocessor_scales(ts_data): # check scaled result out = tsp.preprocess(df) - assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0) - assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0) + assert np.allclose( + out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0 + ) + assert np.allclose( + out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0 + ) # check inverse scale result out_inv = tsp.inverse_scale_targets(out) @@ -118,7 +122,9 @@ def test_time_series_preprocessor_scales(ts_data): suffix = "_foo" targets_suffix = [f"{c}{suffix}" for c in tsp.target_columns] - out.columns = [f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns] + out.columns = [ + f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns + ] out_inv = tsp.inverse_scale_targets(out, suffix=suffix) assert np.all( out_inv.groupby(tsp.id_columns)[targets_suffix].apply(lambda x: np.mean(x)) @@ -126,9 +132,37 @@ def test_time_series_preprocessor_scales(ts_data): ) +def test_time_series_preprocessor_inv_scales_lists(ts_data): + df = ts_data + + tsp = TimeSeriesPreprocessor( + timestamp_column="timestamp", + prediction_length=2, + context_length=5, + id_columns=["id", "id2"], + target_columns=["value1", "value2"], + scaling=True, + ) + + tsp.train(df) + + # check scaled result + out = tsp.preprocess(df) + + # construct artificial result + out["value1"] = out["value1"].apply(lambda x: np.array([x, x])) + out["value2"] = 
out["value2"].apply(lambda x: np.array([x, x])) + + out_inv = tsp.inverse_scale_targets(out) + + 1 + + def test_augment_time_series(ts_data): periods = 5 - a = extend_time_series(ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods) + a = extend_time_series( + ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods + ) # check that length increases by periods for each id assert a.shape[0] == ts_data.shape[0] + 3 * periods @@ -215,7 +249,9 @@ def test_get_datasets(ts_data): ) # 3 time series of length 50 - assert len(train) == 3 * (int((1 / 3) * 50) - (tsp.context_length + tsp.prediction_length) + 1) + assert len(train) == 3 * ( + int((1 / 3) * 50) - (tsp.context_length + tsp.prediction_length) + 1 + ) assert len(valid) == len(test) @@ -240,7 +276,10 @@ def test_get_datasets(ts_data): # new train length should be 20% of 100, minus the usual for context length and prediction length fewshot_train_size = ( - int((100 - tsp.context_length) * 0.2) + tsp.context_length - (tsp.context_length + tsp.prediction_length) + 1 + int((100 - tsp.context_length) * 0.2) + + tsp.context_length + - (tsp.context_length + tsp.prediction_length) + + 1 ) assert len(train) == fewshot_train_size @@ -287,11 +326,15 @@ def test_get_datasets(ts_data): }, ) - assert len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 + assert ( + len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 + ) assert len(test) == int(150 * 0.2) - tsp.prediction_length + 1 - assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 + assert ( + len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 + ) def test_train_without_targets(ts_data): @@ -354,7 +397,9 @@ def test_id_columns_and_scaling_id_columns(ts_data_runs): scaling=True, ) - ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2}) + ds_train, ds_valid, ds_test = tsp.get_datasets( + df, split_config={"train": 0.7, "test": 0.2} + ) assert len(tsp.target_scaler_dict) == 2 assert len(ds_train.datasets) == 4 diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py index 7c010f88..04a758cc 100644 --- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py +++ b/tsfm_public/toolkit/time_series_forecasting_pipeline.py @@ -326,4 +326,9 @@ def postprocess(self, input, **kwargs): cols_ordered.extend([c for c in cols if c not in cols_ordered]) out = out[cols_ordered] + + # inverse scale if we have a feature extractor + if self.feature_extractor is not None: + out = self.feature_extractor.inverse_scale_targets(out, column_suffix="_prediction") + return out diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index e31ca5a8..db6c8f82 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -56,7 +56,9 @@ def to_json(self) -> str: return json.dumps(self.to_dict()) @classmethod - def from_dict(cls, feature_extractor_dict: Dict[str, Any], **kwargs) -> "SKLearnFeatureExtractionBase": + def from_dict( + cls, feature_extractor_dict: Dict[str, Any], **kwargs + ) -> "SKLearnFeatureExtractionBase": """ """ t = cls() @@ -126,10 +128,14 @@ def __init__( # note base class __init__ methods sets all arguments as attributes if not isinstance(id_columns, list): - raise ValueError(f"Invalid argument provided for `id_columns`: 
{id_columns}") + raise ValueError( + f"Invalid argument provided for `id_columns`: {id_columns}" + ) if isinstance(timestamp_column, list): - raise ValueError(f"`timestamp_column` should not be a list, received: {timestamp_column}") + raise ValueError( + f"`timestamp_column` should not be a list, received: {timestamp_column}" + ) self.id_columns = id_columns self.timestamp_column = timestamp_column @@ -146,7 +152,11 @@ def __init__( self.time_series_task = time_series_task # self.scale_outputs = scale_outputs self.scaler_type = scaler_type - self.scaling_id_columns = scaling_id_columns if scaling_id_columns is not None else copy.copy(id_columns) + self.scaling_id_columns = ( + scaling_id_columns + if scaling_id_columns is not None + else copy.copy(id_columns) + ) # we maintain two scalers per time series to facilitate inverse scaling of the targets self.scaler_dict = {} @@ -223,7 +233,10 @@ def recursive_check_ndarray(dictionary): elif isinstance(value, np.int64): dictionary[key] = int(value) elif isinstance(value, list): - dictionary[key] = [vv.tolist() if isinstance(vv, np.ndarray) else vv for vv in value] + dictionary[key] = [ + vv.tolist() if isinstance(vv, np.ndarray) else vv + for vv in value + ] elif isinstance(value, dict): dictionary[key] = recursive_check_ndarray(value) return dictionary @@ -239,7 +252,9 @@ def recursive_check_ndarray(dictionary): return json.dumps(dictionary, indent=2, sort_keys=True) + "\n" @classmethod - def from_dict(cls, feature_extractor_dict: Dict[str, Any], **kwargs) -> "PreTrainedFeatureExtractor": + def from_dict( + cls, feature_extractor_dict: Dict[str, Any], **kwargs + ) -> "PreTrainedFeatureExtractor": """ Instantiates a type of [`~feature_extraction_utils.FeatureExtractionMixin`] from a Python dictionary of parameters. @@ -358,7 +373,9 @@ def _get_groups( """ if self.scaling_id_columns: group_by_columns = ( - self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] + self.scaling_id_columns + if len(self.scaling_id_columns) > 1 + else self.scaling_id_columns[0] ) else: group_by_columns = INTERNAL_ID_COLUMN @@ -423,7 +440,9 @@ def get_frequency_token(self, token_name: str): token = self.frequency_mapping.get(token_name, None) if token is None: - warn(f"Frequency token {token_name} was not found in the frequncy token mapping.") + warn( + f"Frequency token {token_name} was not found in the frequncy token mapping." + ) token = self.frequency_mapping["oov"] return token @@ -456,7 +475,11 @@ def exogenous_channel_indices(self) -> List[int]: @property def prediction_channel_indices(self) -> List[int]: - return [i for i, c in enumerate(self._get_real_valued_dynamic_channels()) if c in self.target_columns] + return [ + i + for i, c in enumerate(self._get_real_valued_dynamic_channels()) + if c in self.target_columns + ] def _check_dataset(self, dataset: Union[Dataset, pd.DataFrame]): """Basic checks for input dataset. 
@@ -481,7 +504,9 @@ def _set_targets(self, dataset: pd.DataFrame) -> None: skip_columns.extend(self.conditional_columns) skip_columns.extend(self.static_categorical_columns) - self.target_columns = [c for c in dataset.columns.to_list() if c not in skip_columns] + self.target_columns = [ + c for c in dataset.columns.to_list() if c not in skip_columns + ] def _estimate_frequency(self, df: pd.DataFrame): if self.timestamp_column: @@ -493,7 +518,10 @@ def _estimate_frequency(self, df: pd.DataFrame): df_subset = df # to do: make more robust - self.freq = df_subset[self.timestamp_column].iloc[-1] - df_subset[self.timestamp_column].iloc[-2] + self.freq = ( + df_subset[self.timestamp_column].iloc[-1] + - df_subset[self.timestamp_column].iloc[-2] + ) if not isinstance(self.freq, (str, int)): self.freq = str(self.freq) @@ -552,16 +580,33 @@ def inverse_scale_targets( if suffix is not None: cols_to_scale = [f"{c}{suffix}" for c in cols_to_scale] + col_has_list = [df[c].dtype == np.dtype("O") for c in cols_to_scale] + def inverse_scale_func(grp, id_columns): if isinstance(id_columns, list): name = tuple(grp.iloc[0][id_columns].tolist()) else: name = grp.iloc[0][id_columns] - grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale]) + + if not np.any(col_has_list): + grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform( + grp[cols_to_scale] + ) + else: + # resort to rowwise transformation + # need converstion to array of list to single 2D array, then back to array of list + # at this point, all column elements should have same length + grp[cols_to_scale] = grp[cols_to_scale].apply( + lambda x: self.target_scaler_dict[name].inverse_transform(x) + ) return grp if self.scaling_id_columns: - id_columns = self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] + id_columns = ( + self.scaling_id_columns + if len(self.scaling_id_columns) > 1 + else self.scaling_id_columns[0] + ) else: id_columns = INTERNAL_ID_COLUMN @@ -599,15 +644,21 @@ def scale_func(grp, id_columns): name = tuple(grp.iloc[0][id_columns].tolist()) else: name = grp.iloc[0][id_columns] - grp[self.target_columns] = self.target_scaler_dict[name].transform(grp[self.target_columns]) + grp[self.target_columns] = self.target_scaler_dict[name].transform( + grp[self.target_columns] + ) if other_cols_to_scale: - grp[other_cols_to_scale] = self.scaler_dict[name].transform(grp[other_cols_to_scale]) + grp[other_cols_to_scale] = self.scaler_dict[name].transform( + grp[other_cols_to_scale] + ) return grp if self.scaling_id_columns: id_columns = ( - self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] + self.scaling_id_columns + if len(self.scaling_id_columns) > 1 + else self.scaling_id_columns[0] ) else: id_columns = INTERNAL_ID_COLUMN @@ -621,7 +672,9 @@ def scale_func(grp, id_columns): cols_to_encode = self._get_columns_to_encode() if self.encode_categorical and cols_to_encode: if not self.categorical_encoder: - raise RuntimeError("Attempt to encode categorical columns, but the encoder has not been trained yet.") + raise RuntimeError( + "Attempt to encode categorical columns, but the encoder has not been trained yet." 
+ ) df[cols_to_encode] = self.categorical_encoder.transform(df[cols_to_encode]) return df @@ -667,20 +720,34 @@ def get_datasets( data = self._standardize_dataframe(dataset) if not self.context_length: - raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length") + raise ValueError( + "TimeSeriesPreprocessor must be instantiated with non-null context_length" + ) if not self.prediction_length: - raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length") + raise ValueError( + "TimeSeriesPreprocessor must be instantiated with non-null prediction_length" + ) # get split_params - split_params, split_function = get_split_params(split_config, context_length=self.context_length) + split_params, split_function = get_split_params( + split_config, context_length=self.context_length + ) # split data if isinstance(split_function, dict): - train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) - valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"]) - test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"]) + train_data = split_function["train"]( + data, id_columns=self.id_columns, **split_params["train"] + ) + valid_data = split_function["valid"]( + data, id_columns=self.id_columns, **split_params["valid"] + ) + test_data = split_function["test"]( + data, id_columns=self.id_columns, **split_params["test"] + ) else: - train_data, valid_data, test_data = split_function(data, id_columns=self.id_columns, **split_params) + train_data, valid_data, test_data = split_function( + data, id_columns=self.id_columns, **split_params + ) # data preprocessing self.train(train_data) @@ -699,7 +766,9 @@ def get_datasets( # handle fewshot operation if fewshot_fraction is not None: if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)): - raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}") + raise ValueError( + f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}" + ) train_data = select_by_fixed_fraction( train_data, @@ -729,13 +798,17 @@ def get_datasets( def create_timestamps( last_timestamp: Union[datetime.datetime, pd.Timestamp], freq: Optional[Union[int, float, datetime.timedelta, pd.Timedelta, str]] = None, - time_sequence: Optional[Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]]] = None, + time_sequence: Optional[ + Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]] + ] = None, periods: int = 1, ): """Simple utility to create a list of timestamps based on start, delta and number of periods""" if freq is None and time_sequence is None: - raise ValueError("Neither `freq` nor `time_sequence` provided, cannot determine frequency.") + raise ValueError( + "Neither `freq` nor `time_sequence` provided, cannot determine frequency." 
+ ) if freq is None: # to do: make more robust @@ -801,7 +874,9 @@ def augment_one_series(group: Union[pd.Series, pd.DataFrame]): if grouping_columns == []: new_time_series = augment_one_series(time_series) else: - new_time_series = time_series.groupby(grouping_columns).apply(augment_one_series, include_groups=False) + new_time_series = time_series.groupby(grouping_columns).apply( + augment_one_series, include_groups=False + ) idx_names = list(new_time_series.index.names) idx_names[-1] = "__delete" new_time_series = new_time_series.reset_index(names=idx_names) From f10005cfc1511bf40d0bf8f2639503446357fbb6 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Tue, 2 Apr 2024 20:13:41 -0400 Subject: [PATCH 5/5] handle scaling of lists, update tests Signed-off-by: Wesley M. Gifford --- tests/toolkit/conftest.py | 6 +- .../toolkit/test_time_series_preprocessor.py | 50 +++---- .../toolkit/time_series_preprocessor.py | 133 +++++------------- 3 files changed, 55 insertions(+), 134 deletions(-) diff --git a/tests/toolkit/conftest.py b/tests/toolkit/conftest.py index 7a143e5b..1ae61a6a 100644 --- a/tests/toolkit/conftest.py +++ b/tests/toolkit/conftest.py @@ -16,8 +16,7 @@ def ts_data(): { "id": nreps(["A", "B", "C"], 50), "id2": nreps(["XX", "YY", "ZZ"], 50), - "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] - * 3, + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 3, "value1": range(150), "value2": np.arange(150) ** 2 / 3 + 10, } @@ -44,8 +43,7 @@ def ts_data_runs(): { "run_id": nreps(["1", "2", "3", "4"], 50), "asset_id": nreps(["foo", "bar", "foo", "bar"], 50), - "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] - * 4, + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 4, "value1": range(200), } ) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index c6439e16..9c052fec 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -27,9 +27,9 @@ def test_standard_scaler(sample_data): # check shape preserved result = scaler.fit_transform(sample_data[columns]) assert result.shape == sample_data[columns].shape - expected = ( - sample_data[columns].values - np.mean(sample_data[columns].values, axis=0) - ) / np.std(sample_data[columns].values, axis=0) + expected = (sample_data[columns].values - np.mean(sample_data[columns].values, axis=0)) / np.std( + sample_data[columns].values, axis=0 + ) np.testing.assert_allclose(result, expected) # check serialization @@ -100,12 +100,8 @@ def test_time_series_preprocessor_scales(ts_data): # check scaled result out = tsp.preprocess(df) - assert np.allclose( - out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0 - ) - assert np.allclose( - out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0 - ) + assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0) + assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0) # check inverse scale result out_inv = tsp.inverse_scale_targets(out) @@ -122,9 +118,7 @@ def test_time_series_preprocessor_scales(ts_data): suffix = "_foo" targets_suffix = [f"{c}{suffix}" for c in tsp.target_columns] - out.columns = [ - f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns - ] + out.columns = [f"{c}{suffix}" if c in tsp.target_columns else c for c in 
out.columns] out_inv = tsp.inverse_scale_targets(out, suffix=suffix) assert np.all( out_inv.groupby(tsp.id_columns)[targets_suffix].apply(lambda x: np.mean(x)) @@ -150,19 +144,18 @@ def test_time_series_preprocessor_inv_scales_lists(ts_data): out = tsp.preprocess(df) # construct artificial result - out["value1"] = out["value1"].apply(lambda x: np.array([x, x])) - out["value2"] = out["value2"].apply(lambda x: np.array([x, x])) + out["value1"] = out["value1"].apply(lambda x: np.array([x] * 3)) + out["value2"] = out["value2"].apply(lambda x: np.array([x] * 3)) out_inv = tsp.inverse_scale_targets(out) - 1 + assert out_inv["value1"].mean()[0] == df["value1"].mean() + assert out_inv["value2"].mean()[0] == df["value2"].mean() def test_augment_time_series(ts_data): periods = 5 - a = extend_time_series( - ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods - ) + a = extend_time_series(ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods) # check that length increases by periods for each id assert a.shape[0] == ts_data.shape[0] + 3 * periods @@ -249,9 +242,7 @@ def test_get_datasets(ts_data): ) # 3 time series of length 50 - assert len(train) == 3 * ( - int((1 / 3) * 50) - (tsp.context_length + tsp.prediction_length) + 1 - ) + assert len(train) == 3 * (int((1 / 3) * 50) - (tsp.context_length + tsp.prediction_length) + 1) assert len(valid) == len(test) @@ -276,10 +267,7 @@ def test_get_datasets(ts_data): # new train length should be 20% of 100, minus the usual for context length and prediction length fewshot_train_size = ( - int((100 - tsp.context_length) * 0.2) - + tsp.context_length - - (tsp.context_length + tsp.prediction_length) - + 1 + int((100 - tsp.context_length) * 0.2) + tsp.context_length - (tsp.context_length + tsp.prediction_length) + 1 ) assert len(train) == fewshot_train_size @@ -326,15 +314,11 @@ def test_get_datasets(ts_data): }, ) - assert ( - len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 - ) + assert len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 assert len(test) == int(150 * 0.2) - tsp.prediction_length + 1 - assert ( - len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 - ) + assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 def test_train_without_targets(ts_data): @@ -397,9 +381,7 @@ def test_id_columns_and_scaling_id_columns(ts_data_runs): scaling=True, ) - ds_train, ds_valid, ds_test = tsp.get_datasets( - df, split_config={"train": 0.7, "test": 0.2} - ) + ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2}) assert len(tsp.target_scaler_dict) == 2 assert len(ds_train.datasets) == 4 diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index db6c8f82..b88dcad7 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -56,9 +56,7 @@ def to_json(self) -> str: return json.dumps(self.to_dict()) @classmethod - def from_dict( - cls, feature_extractor_dict: Dict[str, Any], **kwargs - ) -> "SKLearnFeatureExtractionBase": + def from_dict(cls, feature_extractor_dict: Dict[str, Any], **kwargs) -> "SKLearnFeatureExtractionBase": """ """ t = cls() @@ -128,14 +126,10 @@ def __init__( # note base class __init__ methods sets all arguments as attributes if not isinstance(id_columns, list): - raise ValueError( - f"Invalid argument provided for `id_columns`: 
{id_columns}" - ) + raise ValueError(f"Invalid argument provided for `id_columns`: {id_columns}") if isinstance(timestamp_column, list): - raise ValueError( - f"`timestamp_column` should not be a list, received: {timestamp_column}" - ) + raise ValueError(f"`timestamp_column` should not be a list, received: {timestamp_column}") self.id_columns = id_columns self.timestamp_column = timestamp_column @@ -152,11 +146,7 @@ def __init__( self.time_series_task = time_series_task # self.scale_outputs = scale_outputs self.scaler_type = scaler_type - self.scaling_id_columns = ( - scaling_id_columns - if scaling_id_columns is not None - else copy.copy(id_columns) - ) + self.scaling_id_columns = scaling_id_columns if scaling_id_columns is not None else copy.copy(id_columns) # we maintain two scalers per time series to facilitate inverse scaling of the targets self.scaler_dict = {} @@ -233,10 +223,7 @@ def recursive_check_ndarray(dictionary): elif isinstance(value, np.int64): dictionary[key] = int(value) elif isinstance(value, list): - dictionary[key] = [ - vv.tolist() if isinstance(vv, np.ndarray) else vv - for vv in value - ] + dictionary[key] = [vv.tolist() if isinstance(vv, np.ndarray) else vv for vv in value] elif isinstance(value, dict): dictionary[key] = recursive_check_ndarray(value) return dictionary @@ -252,9 +239,7 @@ def recursive_check_ndarray(dictionary): return json.dumps(dictionary, indent=2, sort_keys=True) + "\n" @classmethod - def from_dict( - cls, feature_extractor_dict: Dict[str, Any], **kwargs - ) -> "PreTrainedFeatureExtractor": + def from_dict(cls, feature_extractor_dict: Dict[str, Any], **kwargs) -> "PreTrainedFeatureExtractor": """ Instantiates a type of [`~feature_extraction_utils.FeatureExtractionMixin`] from a Python dictionary of parameters. @@ -373,9 +358,7 @@ def _get_groups( """ if self.scaling_id_columns: group_by_columns = ( - self.scaling_id_columns - if len(self.scaling_id_columns) > 1 - else self.scaling_id_columns[0] + self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] ) else: group_by_columns = INTERNAL_ID_COLUMN @@ -440,9 +423,7 @@ def get_frequency_token(self, token_name: str): token = self.frequency_mapping.get(token_name, None) if token is None: - warn( - f"Frequency token {token_name} was not found in the frequncy token mapping." - ) + warn(f"Frequency token {token_name} was not found in the frequncy token mapping.") token = self.frequency_mapping["oov"] return token @@ -475,11 +456,7 @@ def exogenous_channel_indices(self) -> List[int]: @property def prediction_channel_indices(self) -> List[int]: - return [ - i - for i, c in enumerate(self._get_real_valued_dynamic_channels()) - if c in self.target_columns - ] + return [i for i, c in enumerate(self._get_real_valued_dynamic_channels()) if c in self.target_columns] def _check_dataset(self, dataset: Union[Dataset, pd.DataFrame]): """Basic checks for input dataset. 
@@ -504,9 +481,7 @@ def _set_targets(self, dataset: pd.DataFrame) -> None: skip_columns.extend(self.conditional_columns) skip_columns.extend(self.static_categorical_columns) - self.target_columns = [ - c for c in dataset.columns.to_list() if c not in skip_columns - ] + self.target_columns = [c for c in dataset.columns.to_list() if c not in skip_columns] def _estimate_frequency(self, df: pd.DataFrame): if self.timestamp_column: @@ -518,10 +493,7 @@ def _estimate_frequency(self, df: pd.DataFrame): df_subset = df # to do: make more robust - self.freq = ( - df_subset[self.timestamp_column].iloc[-1] - - df_subset[self.timestamp_column].iloc[-2] - ) + self.freq = df_subset[self.timestamp_column].iloc[-1] - df_subset[self.timestamp_column].iloc[-2] if not isinstance(self.freq, (str, int)): self.freq = str(self.freq) @@ -582,6 +554,14 @@ def inverse_scale_targets( col_has_list = [df[c].dtype == np.dtype("O") for c in cols_to_scale] + def explode_row(df_row, name, columns): + df = pd.DataFrame(df_row[columns].to_dict()) + inv_scale = self.target_scaler_dict[name].inverse_transform(df) + df_out = df_row.copy() + for idx, c in enumerate(columns): + df_out[c] = inv_scale[:, idx] + return df_out + def inverse_scale_func(grp, id_columns): if isinstance(id_columns, list): name = tuple(grp.iloc[0][id_columns].tolist()) @@ -589,24 +569,15 @@ def inverse_scale_func(grp, id_columns): name = grp.iloc[0][id_columns] if not np.any(col_has_list): - grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform( - grp[cols_to_scale] - ) + grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale]) else: - # resort to rowwise transformation - # need converstion to array of list to single 2D array, then back to array of list - # at this point, all column elements should have same length grp[cols_to_scale] = grp[cols_to_scale].apply( - lambda x: self.target_scaler_dict[name].inverse_transform(x) + lambda x: explode_row(x, name, cols_to_scale), axis="columns" ) return grp if self.scaling_id_columns: - id_columns = ( - self.scaling_id_columns - if len(self.scaling_id_columns) > 1 - else self.scaling_id_columns[0] - ) + id_columns = self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] else: id_columns = INTERNAL_ID_COLUMN @@ -644,21 +615,15 @@ def scale_func(grp, id_columns): name = tuple(grp.iloc[0][id_columns].tolist()) else: name = grp.iloc[0][id_columns] - grp[self.target_columns] = self.target_scaler_dict[name].transform( - grp[self.target_columns] - ) + grp[self.target_columns] = self.target_scaler_dict[name].transform(grp[self.target_columns]) if other_cols_to_scale: - grp[other_cols_to_scale] = self.scaler_dict[name].transform( - grp[other_cols_to_scale] - ) + grp[other_cols_to_scale] = self.scaler_dict[name].transform(grp[other_cols_to_scale]) return grp if self.scaling_id_columns: id_columns = ( - self.scaling_id_columns - if len(self.scaling_id_columns) > 1 - else self.scaling_id_columns[0] + self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] ) else: id_columns = INTERNAL_ID_COLUMN @@ -672,9 +637,7 @@ def scale_func(grp, id_columns): cols_to_encode = self._get_columns_to_encode() if self.encode_categorical and cols_to_encode: if not self.categorical_encoder: - raise RuntimeError( - "Attempt to encode categorical columns, but the encoder has not been trained yet." 
- ) + raise RuntimeError("Attempt to encode categorical columns, but the encoder has not been trained yet.") df[cols_to_encode] = self.categorical_encoder.transform(df[cols_to_encode]) return df @@ -720,34 +683,20 @@ def get_datasets( data = self._standardize_dataframe(dataset) if not self.context_length: - raise ValueError( - "TimeSeriesPreprocessor must be instantiated with non-null context_length" - ) + raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length") if not self.prediction_length: - raise ValueError( - "TimeSeriesPreprocessor must be instantiated with non-null prediction_length" - ) + raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length") # get split_params - split_params, split_function = get_split_params( - split_config, context_length=self.context_length - ) + split_params, split_function = get_split_params(split_config, context_length=self.context_length) # split data if isinstance(split_function, dict): - train_data = split_function["train"]( - data, id_columns=self.id_columns, **split_params["train"] - ) - valid_data = split_function["valid"]( - data, id_columns=self.id_columns, **split_params["valid"] - ) - test_data = split_function["test"]( - data, id_columns=self.id_columns, **split_params["test"] - ) + train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) + valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"]) + test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"]) else: - train_data, valid_data, test_data = split_function( - data, id_columns=self.id_columns, **split_params - ) + train_data, valid_data, test_data = split_function(data, id_columns=self.id_columns, **split_params) # data preprocessing self.train(train_data) @@ -766,9 +715,7 @@ def get_datasets( # handle fewshot operation if fewshot_fraction is not None: if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)): - raise ValueError( - f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}" - ) + raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}") train_data = select_by_fixed_fraction( train_data, @@ -798,17 +745,13 @@ def get_datasets( def create_timestamps( last_timestamp: Union[datetime.datetime, pd.Timestamp], freq: Optional[Union[int, float, datetime.timedelta, pd.Timedelta, str]] = None, - time_sequence: Optional[ - Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]] - ] = None, + time_sequence: Optional[Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]]] = None, periods: int = 1, ): """Simple utility to create a list of timestamps based on start, delta and number of periods""" if freq is None and time_sequence is None: - raise ValueError( - "Neither `freq` nor `time_sequence` provided, cannot determine frequency." 
- ) + raise ValueError("Neither `freq` nor `time_sequence` provided, cannot determine frequency.") if freq is None: # to do: make more robust @@ -874,9 +817,7 @@ def augment_one_series(group: Union[pd.Series, pd.DataFrame]): if grouping_columns == []: new_time_series = augment_one_series(time_series) else: - new_time_series = time_series.groupby(grouping_columns).apply( - augment_one_series, include_groups=False - ) + new_time_series = time_series.groupby(grouping_columns).apply(augment_one_series, include_groups=False) idx_names = list(new_time_series.index.names) idx_names[-1] = "__delete" new_time_series = new_time_series.reset_index(names=idx_names)
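Usage sketch (reviewer note, not part of the patches): the snippet below exercises the behavior this series introduces, the `scaling_id_columns` constructor argument and the suffix-aware `inverse_scale_targets`. The data and column names are invented to mirror the `ts_data_runs` fixture from PATCH 1/5, and the import path is taken from the files touched above; treat it as an illustrative sketch under those assumptions rather than shipped code.

# Illustrative sketch only -- assumes tsfm_public is installed.
from datetime import datetime, timedelta

import pandas as pd

from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor

# Four runs recorded on two assets: series are keyed by (asset_id, run_id).
df = pd.DataFrame(
    {
        "run_id": ["1"] * 50 + ["2"] * 50 + ["3"] * 50 + ["4"] * 50,
        "asset_id": ["foo"] * 50 + ["bar"] * 50 + ["foo"] * 50 + ["bar"] * 50,
        "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 4,
        "value1": range(200),
    }
)

tsp = TimeSeriesPreprocessor(
    timestamp_column="timestamp",
    id_columns=["asset_id", "run_id"],  # grain of the individual series
    scaling_id_columns=["asset_id"],    # grain at which scalers are fit
    target_columns=["value1"],
    context_length=5,
    prediction_length=2,
    scaling=True,
)

train, valid, test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})
print(len(tsp.target_scaler_dict))  # 2 scalers (one per asset) across 4 series

# Inverse scaling of suffixed columns, e.g. model output written to "<target>_prediction":
scaled = tsp.preprocess(df).rename(columns={"value1": "value1_prediction"})
unscaled = tsp.inverse_scale_targets(scaled, suffix="_prediction")

The point of separating `scaling_id_columns` from `id_columns` is that datasets can be keyed at a finer grain (asset, run) than the grain at which scaling statistics are shared (asset), which is what the new test asserts with two scalers and four per-series datasets.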