From f10005cfc1511bf40d0bf8f2639503446357fbb6 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Tue, 2 Apr 2024 20:13:41 -0400 Subject: [PATCH] handle scaling of lists, update tests Signed-off-by: Wesley M. Gifford --- tests/toolkit/conftest.py | 6 +- .../toolkit/test_time_series_preprocessor.py | 50 +++---- .../toolkit/time_series_preprocessor.py | 133 +++++------------- 3 files changed, 55 insertions(+), 134 deletions(-) diff --git a/tests/toolkit/conftest.py b/tests/toolkit/conftest.py index 7a143e5b..1ae61a6a 100644 --- a/tests/toolkit/conftest.py +++ b/tests/toolkit/conftest.py @@ -16,8 +16,7 @@ def ts_data(): { "id": nreps(["A", "B", "C"], 50), "id2": nreps(["XX", "YY", "ZZ"], 50), - "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] - * 3, + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 3, "value1": range(150), "value2": np.arange(150) ** 2 / 3 + 10, } @@ -44,8 +43,7 @@ def ts_data_runs(): { "run_id": nreps(["1", "2", "3", "4"], 50), "asset_id": nreps(["foo", "bar", "foo", "bar"], 50), - "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] - * 4, + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 4, "value1": range(200), } ) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index c6439e16..9c052fec 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -27,9 +27,9 @@ def test_standard_scaler(sample_data): # check shape preserved result = scaler.fit_transform(sample_data[columns]) assert result.shape == sample_data[columns].shape - expected = ( - sample_data[columns].values - np.mean(sample_data[columns].values, axis=0) - ) / np.std(sample_data[columns].values, axis=0) + expected = (sample_data[columns].values - np.mean(sample_data[columns].values, axis=0)) / np.std( + sample_data[columns].values, axis=0 + ) np.testing.assert_allclose(result, expected) # check serialization @@ -100,12 +100,8 @@ def test_time_series_preprocessor_scales(ts_data): # check scaled result out = tsp.preprocess(df) - assert np.allclose( - out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0 - ) - assert np.allclose( - out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0 - ) + assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0) + assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0) # check inverse scale result out_inv = tsp.inverse_scale_targets(out) @@ -122,9 +118,7 @@ def test_time_series_preprocessor_scales(ts_data): suffix = "_foo" targets_suffix = [f"{c}{suffix}" for c in tsp.target_columns] - out.columns = [ - f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns - ] + out.columns = [f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns] out_inv = tsp.inverse_scale_targets(out, suffix=suffix) assert np.all( out_inv.groupby(tsp.id_columns)[targets_suffix].apply(lambda x: np.mean(x)) @@ -150,19 +144,18 @@ def test_time_series_preprocessor_inv_scales_lists(ts_data): out = tsp.preprocess(df) # construct artificial result - out["value1"] = out["value1"].apply(lambda x: np.array([x, x])) - out["value2"] = out["value2"].apply(lambda x: np.array([x, x])) + out["value1"] = out["value1"].apply(lambda x: np.array([x] * 3)) + out["value2"] = out["value2"].apply(lambda x: np.array([x] * 3)) out_inv = tsp.inverse_scale_targets(out) - 1 + assert out_inv["value1"].mean()[0] == df["value1"].mean() + assert out_inv["value2"].mean()[0] == df["value2"].mean() def test_augment_time_series(ts_data): periods = 5 - a = extend_time_series( - ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods - ) + a = extend_time_series(ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods) # check that length increases by periods for each id assert a.shape[0] == ts_data.shape[0] + 3 * periods @@ -249,9 +242,7 @@ def test_get_datasets(ts_data): ) # 3 time series of length 50 - assert len(train) == 3 * ( - int((1 / 3) * 50) - (tsp.context_length + tsp.prediction_length) + 1 - ) + assert len(train) == 3 * (int((1 / 3) * 50) - (tsp.context_length + tsp.prediction_length) + 1) assert len(valid) == len(test) @@ -276,10 +267,7 @@ def test_get_datasets(ts_data): # new train length should be 20% of 100, minus the usual for context length and prediction length fewshot_train_size = ( - int((100 - tsp.context_length) * 0.2) - + tsp.context_length - - (tsp.context_length + tsp.prediction_length) - + 1 + int((100 - tsp.context_length) * 0.2) + tsp.context_length - (tsp.context_length + tsp.prediction_length) + 1 ) assert len(train) == fewshot_train_size @@ -326,15 +314,11 @@ def test_get_datasets(ts_data): }, ) - assert ( - len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 - ) + assert len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 assert len(test) == int(150 * 0.2) - tsp.prediction_length + 1 - assert ( - len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 - ) + assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 def test_train_without_targets(ts_data): @@ -397,9 +381,7 @@ def test_id_columns_and_scaling_id_columns(ts_data_runs): scaling=True, ) - ds_train, ds_valid, ds_test = tsp.get_datasets( - df, split_config={"train": 0.7, "test": 0.2} - ) + ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2}) assert len(tsp.target_scaler_dict) == 2 assert len(ds_train.datasets) == 4 diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index db6c8f82..b88dcad7 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -56,9 +56,7 @@ def to_json(self) -> str: return json.dumps(self.to_dict()) @classmethod - def from_dict( - cls, feature_extractor_dict: Dict[str, Any], **kwargs - ) -> "SKLearnFeatureExtractionBase": + def from_dict(cls, feature_extractor_dict: Dict[str, Any], **kwargs) -> "SKLearnFeatureExtractionBase": """ """ t = cls() @@ -128,14 +126,10 @@ def __init__( # note base class __init__ methods sets all arguments as attributes if not isinstance(id_columns, list): - raise ValueError( - f"Invalid argument provided for `id_columns`: {id_columns}" - ) + raise ValueError(f"Invalid argument provided for `id_columns`: {id_columns}") if isinstance(timestamp_column, list): - raise ValueError( - f"`timestamp_column` should not be a list, received: {timestamp_column}" - ) + raise ValueError(f"`timestamp_column` should not be a list, received: {timestamp_column}") self.id_columns = id_columns self.timestamp_column = timestamp_column @@ -152,11 +146,7 @@ def __init__( self.time_series_task = time_series_task # self.scale_outputs = scale_outputs self.scaler_type = scaler_type - self.scaling_id_columns = ( - scaling_id_columns - if scaling_id_columns is not None - else copy.copy(id_columns) - ) + self.scaling_id_columns = scaling_id_columns if scaling_id_columns is not None else copy.copy(id_columns) # we maintain two scalers per time series to facilitate inverse scaling of the targets self.scaler_dict = {} @@ -233,10 +223,7 @@ def recursive_check_ndarray(dictionary): elif isinstance(value, np.int64): dictionary[key] = int(value) elif isinstance(value, list): - dictionary[key] = [ - vv.tolist() if isinstance(vv, np.ndarray) else vv - for vv in value - ] + dictionary[key] = [vv.tolist() if isinstance(vv, np.ndarray) else vv for vv in value] elif isinstance(value, dict): dictionary[key] = recursive_check_ndarray(value) return dictionary @@ -252,9 +239,7 @@ def recursive_check_ndarray(dictionary): return json.dumps(dictionary, indent=2, sort_keys=True) + "\n" @classmethod - def from_dict( - cls, feature_extractor_dict: Dict[str, Any], **kwargs - ) -> "PreTrainedFeatureExtractor": + def from_dict(cls, feature_extractor_dict: Dict[str, Any], **kwargs) -> "PreTrainedFeatureExtractor": """ Instantiates a type of [`~feature_extraction_utils.FeatureExtractionMixin`] from a Python dictionary of parameters. @@ -373,9 +358,7 @@ def _get_groups( """ if self.scaling_id_columns: group_by_columns = ( - self.scaling_id_columns - if len(self.scaling_id_columns) > 1 - else self.scaling_id_columns[0] + self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] ) else: group_by_columns = INTERNAL_ID_COLUMN @@ -440,9 +423,7 @@ def get_frequency_token(self, token_name: str): token = self.frequency_mapping.get(token_name, None) if token is None: - warn( - f"Frequency token {token_name} was not found in the frequncy token mapping." - ) + warn(f"Frequency token {token_name} was not found in the frequncy token mapping.") token = self.frequency_mapping["oov"] return token @@ -475,11 +456,7 @@ def exogenous_channel_indices(self) -> List[int]: @property def prediction_channel_indices(self) -> List[int]: - return [ - i - for i, c in enumerate(self._get_real_valued_dynamic_channels()) - if c in self.target_columns - ] + return [i for i, c in enumerate(self._get_real_valued_dynamic_channels()) if c in self.target_columns] def _check_dataset(self, dataset: Union[Dataset, pd.DataFrame]): """Basic checks for input dataset. @@ -504,9 +481,7 @@ def _set_targets(self, dataset: pd.DataFrame) -> None: skip_columns.extend(self.conditional_columns) skip_columns.extend(self.static_categorical_columns) - self.target_columns = [ - c for c in dataset.columns.to_list() if c not in skip_columns - ] + self.target_columns = [c for c in dataset.columns.to_list() if c not in skip_columns] def _estimate_frequency(self, df: pd.DataFrame): if self.timestamp_column: @@ -518,10 +493,7 @@ def _estimate_frequency(self, df: pd.DataFrame): df_subset = df # to do: make more robust - self.freq = ( - df_subset[self.timestamp_column].iloc[-1] - - df_subset[self.timestamp_column].iloc[-2] - ) + self.freq = df_subset[self.timestamp_column].iloc[-1] - df_subset[self.timestamp_column].iloc[-2] if not isinstance(self.freq, (str, int)): self.freq = str(self.freq) @@ -582,6 +554,14 @@ def inverse_scale_targets( col_has_list = [df[c].dtype == np.dtype("O") for c in cols_to_scale] + def explode_row(df_row, name, columns): + df = pd.DataFrame(df_row[columns].to_dict()) + inv_scale = self.target_scaler_dict[name].inverse_transform(df) + df_out = df_row.copy() + for idx, c in enumerate(columns): + df_out[c] = inv_scale[:, idx] + return df_out + def inverse_scale_func(grp, id_columns): if isinstance(id_columns, list): name = tuple(grp.iloc[0][id_columns].tolist()) @@ -589,24 +569,15 @@ def inverse_scale_func(grp, id_columns): name = grp.iloc[0][id_columns] if not np.any(col_has_list): - grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform( - grp[cols_to_scale] - ) + grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale]) else: - # resort to rowwise transformation - # need converstion to array of list to single 2D array, then back to array of list - # at this point, all column elements should have same length grp[cols_to_scale] = grp[cols_to_scale].apply( - lambda x: self.target_scaler_dict[name].inverse_transform(x) + lambda x: explode_row(x, name, cols_to_scale), axis="columns" ) return grp if self.scaling_id_columns: - id_columns = ( - self.scaling_id_columns - if len(self.scaling_id_columns) > 1 - else self.scaling_id_columns[0] - ) + id_columns = self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] else: id_columns = INTERNAL_ID_COLUMN @@ -644,21 +615,15 @@ def scale_func(grp, id_columns): name = tuple(grp.iloc[0][id_columns].tolist()) else: name = grp.iloc[0][id_columns] - grp[self.target_columns] = self.target_scaler_dict[name].transform( - grp[self.target_columns] - ) + grp[self.target_columns] = self.target_scaler_dict[name].transform(grp[self.target_columns]) if other_cols_to_scale: - grp[other_cols_to_scale] = self.scaler_dict[name].transform( - grp[other_cols_to_scale] - ) + grp[other_cols_to_scale] = self.scaler_dict[name].transform(grp[other_cols_to_scale]) return grp if self.scaling_id_columns: id_columns = ( - self.scaling_id_columns - if len(self.scaling_id_columns) > 1 - else self.scaling_id_columns[0] + self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] ) else: id_columns = INTERNAL_ID_COLUMN @@ -672,9 +637,7 @@ def scale_func(grp, id_columns): cols_to_encode = self._get_columns_to_encode() if self.encode_categorical and cols_to_encode: if not self.categorical_encoder: - raise RuntimeError( - "Attempt to encode categorical columns, but the encoder has not been trained yet." - ) + raise RuntimeError("Attempt to encode categorical columns, but the encoder has not been trained yet.") df[cols_to_encode] = self.categorical_encoder.transform(df[cols_to_encode]) return df @@ -720,34 +683,20 @@ def get_datasets( data = self._standardize_dataframe(dataset) if not self.context_length: - raise ValueError( - "TimeSeriesPreprocessor must be instantiated with non-null context_length" - ) + raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length") if not self.prediction_length: - raise ValueError( - "TimeSeriesPreprocessor must be instantiated with non-null prediction_length" - ) + raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length") # get split_params - split_params, split_function = get_split_params( - split_config, context_length=self.context_length - ) + split_params, split_function = get_split_params(split_config, context_length=self.context_length) # split data if isinstance(split_function, dict): - train_data = split_function["train"]( - data, id_columns=self.id_columns, **split_params["train"] - ) - valid_data = split_function["valid"]( - data, id_columns=self.id_columns, **split_params["valid"] - ) - test_data = split_function["test"]( - data, id_columns=self.id_columns, **split_params["test"] - ) + train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) + valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"]) + test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"]) else: - train_data, valid_data, test_data = split_function( - data, id_columns=self.id_columns, **split_params - ) + train_data, valid_data, test_data = split_function(data, id_columns=self.id_columns, **split_params) # data preprocessing self.train(train_data) @@ -766,9 +715,7 @@ def get_datasets( # handle fewshot operation if fewshot_fraction is not None: if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)): - raise ValueError( - f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}" - ) + raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}") train_data = select_by_fixed_fraction( train_data, @@ -798,17 +745,13 @@ def get_datasets( def create_timestamps( last_timestamp: Union[datetime.datetime, pd.Timestamp], freq: Optional[Union[int, float, datetime.timedelta, pd.Timedelta, str]] = None, - time_sequence: Optional[ - Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]] - ] = None, + time_sequence: Optional[Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]]] = None, periods: int = 1, ): """Simple utility to create a list of timestamps based on start, delta and number of periods""" if freq is None and time_sequence is None: - raise ValueError( - "Neither `freq` nor `time_sequence` provided, cannot determine frequency." - ) + raise ValueError("Neither `freq` nor `time_sequence` provided, cannot determine frequency.") if freq is None: # to do: make more robust @@ -874,9 +817,7 @@ def augment_one_series(group: Union[pd.Series, pd.DataFrame]): if grouping_columns == []: new_time_series = augment_one_series(time_series) else: - new_time_series = time_series.groupby(grouping_columns).apply( - augment_one_series, include_groups=False - ) + new_time_series = time_series.groupby(grouping_columns).apply(augment_one_series, include_groups=False) idx_names = list(new_time_series.index.names) idx_names[-1] = "__delete" new_time_series = new_time_series.reset_index(names=idx_names)