Skip to content

Commit

Permalink
Merge pull request #26 from IBM/scaling_updates
Browse files Browse the repository at this point in the history
Scaling updates
  • Loading branch information
wgifford authored Apr 3, 2024
2 parents e1f28a4 + f10005c commit f43506c
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 11 deletions.
15 changes: 14 additions & 1 deletion tests/toolkit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def ts_data():
"id2": nreps(["XX", "YY", "ZZ"], 50),
"timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 3,
"value1": range(150),
"value2": np.arange(150) / 3 + 10,
"value2": np.arange(150) ** 2 / 3 + 10,
}
)
return df
Expand All @@ -35,3 +35,16 @@ def sample_data():
}
)
return df


@pytest.fixture(scope="module")
def ts_data_runs():
    """Multi-run time series: four run_ids over two asset_ids, 50 daily points each.

    Intended for exercising scaling_id_columns, where scalers group by a
    subset (asset_id) of the full id_columns (asset_id, run_id).
    """
    start = datetime(2021, 1, 1)
    daily_stamps = [start + timedelta(days=offset) for offset in range(50)] * 4
    frame = {
        "run_id": nreps(["1", "2", "3", "4"], 50),
        "asset_id": nreps(["foo", "bar", "foo", "bar"], 50),
        "timestamp": daily_stamps,
        "value1": range(200),
    }
    return pd.DataFrame(frame)
88 changes: 88 additions & 0 deletions tests/toolkit/test_time_series_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,75 @@ def test_time_series_preprocessor_encodes(sample_data):
assert sample_prep[c].dtype == float


def test_time_series_preprocessor_scales(ts_data):
    """Scaling standardizes targets per id group; inverse scaling restores them exactly."""
    data = ts_data

    tsp = TimeSeriesPreprocessor(
        timestamp_column="timestamp",
        prediction_length=2,
        context_length=5,
        id_columns=["id", "id2"],
        target_columns=["value1", "value2"],
        scaling=True,
    )
    tsp.train(data)

    def group_stat(frame, columns, stat):
        # per-id-group statistic over the requested columns
        return frame.groupby(tsp.id_columns)[columns].apply(lambda x: stat(x))

    # scaled output is standardized (zero mean, unit std) within each group
    scaled = tsp.preprocess(data)
    assert np.allclose(group_stat(scaled, tsp.target_columns, np.mean), 0.0)
    assert np.allclose(group_stat(scaled, tsp.target_columns, np.std), 1.0)

    # inverse scaling recovers the original per-group mean and std exactly
    recovered = tsp.inverse_scale_targets(scaled)
    assert np.all(
        group_stat(recovered, tsp.target_columns, np.mean)
        == group_stat(data, tsp.target_columns, np.mean)
    )
    assert np.all(
        group_stat(recovered, tsp.target_columns, np.std)
        == group_stat(data, tsp.target_columns, np.std)
    )

    # inverse scaling also works when target columns carry a suffix

    suffix = "_foo"
    targets_suffix = [f"{c}{suffix}" for c in tsp.target_columns]
    scaled.columns = [f"{c}{suffix}" if c in tsp.target_columns else c for c in scaled.columns]
    recovered = tsp.inverse_scale_targets(scaled, suffix=suffix)
    assert np.all(
        group_stat(recovered, targets_suffix, np.mean)
        == group_stat(data, tsp.target_columns, np.mean)
    )


def test_time_series_preprocessor_inv_scales_lists(ts_data):
    """Inverse scaling should also handle target cells that contain arrays of values."""
    data = ts_data

    tsp = TimeSeriesPreprocessor(
        timestamp_column="timestamp",
        prediction_length=2,
        context_length=5,
        id_columns=["id", "id2"],
        target_columns=["value1", "value2"],
        scaling=True,
    )
    tsp.train(data)

    scaled = tsp.preprocess(data)

    # replace each scalar cell with a length-3 array, mimicking forecast output
    for column in ("value1", "value2"):
        scaled[column] = scaled[column].apply(lambda v: np.array([v] * 3))

    recovered = tsp.inverse_scale_targets(scaled)

    # the first position of each exploded array should average back to the original mean
    assert recovered["value1"].mean()[0] == data["value1"].mean()
    assert recovered["value2"].mean()[0] == data["value2"].mean()


def test_augment_time_series(ts_data):
periods = 5
a = extend_time_series(ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods)
Expand Down Expand Up @@ -297,3 +366,22 @@ def test_get_datasets_without_targets(ts_data):
train, _, _ = tsp.get_datasets(ts_data, split_config={"train": 0.7, "test": 0.2})

train.datasets[0].target_columns == ["value1", "value2"]


def test_id_columns_and_scaling_id_columns(ts_data_runs):
    """scaling_id_columns may be a strict subset of id_columns.

    Scalers are fit per scaling group (asset_id -> 2 scalers), while the
    resulting datasets are still split per full id tuple (asset_id, run_id
    -> 4 sub-datasets).
    """
    data = ts_data_runs

    tsp = TimeSeriesPreprocessor(
        timestamp_column="timestamp",
        prediction_length=2,
        context_length=5,
        id_columns=["asset_id", "run_id"],
        scaling_id_columns=["asset_id"],
        target_columns=["value1"],
        scaling=True,
    )

    train_ds, valid_ds, test_ds = tsp.get_datasets(data, split_config={"train": 0.7, "test": 0.2})

    # two distinct asset_ids -> two target scalers
    assert len(tsp.target_scaler_dict) == 2
    # four distinct (asset_id, run_id) pairs -> four concatenated sub-datasets
    assert len(train_ds.datasets) == 4
5 changes: 5 additions & 0 deletions tsfm_public/toolkit/time_series_forecasting_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,4 +326,9 @@ def postprocess(self, input, **kwargs):
cols_ordered.extend([c for c in cols if c not in cols_ordered])

out = out[cols_ordered]

# inverse scale if we have a feature extractor
if self.feature_extractor is not None:
out = self.feature_extractor.inverse_scale_targets(out, column_suffix="_prediction")

return out
44 changes: 35 additions & 9 deletions tsfm_public/toolkit/time_series_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def __init__(
scaling: bool = False,
# scale_outputs: bool = False,
scaler_type: ScalerType = ScalerType.STANDARD.value,
scaling_id_columns: Optional[List[str]] = None,
encode_categorical: bool = True,
time_series_task: str = TimeSeriesTask.FORECASTING.value,
frequency_mapping: Dict[str, int] = DEFAULT_FREQUENCY_MAPPING,
Expand Down Expand Up @@ -145,6 +146,7 @@ def __init__(
self.time_series_task = time_series_task
# self.scale_outputs = scale_outputs
self.scaler_type = scaler_type
self.scaling_id_columns = scaling_id_columns if scaling_id_columns is not None else copy.copy(id_columns)

# we maintain two scalers per time series to facilitate inverse scaling of the targets
self.scaler_dict = {}
Expand Down Expand Up @@ -354,8 +356,10 @@ def _get_groups(
Yields:
Generator[Any, pd.DataFrame]: Group name and resulting pandas dataframe for the group.
"""
if self.id_columns:
group_by_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0]
if self.scaling_id_columns:
group_by_columns = (
self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0]
)
else:
group_by_columns = INTERNAL_ID_COLUMN

Expand Down Expand Up @@ -529,12 +533,14 @@ def train(

return self

def inverse_scale_targets(self, dataset: Union[Dataset, pd.DataFrame]) -> Union[Dataset, pd.DataFrame]:
def inverse_scale_targets(
self, dataset: Union[Dataset, pd.DataFrame], suffix: Optional[str] = None
) -> Union[Dataset, pd.DataFrame]:
self._check_dataset(dataset)
df = self._standardize_dataframe(dataset)

if not self.scaling:
return df
return dataset

if len(self.target_scaler_dict) == 0:
# trying to inverse scale but this preprocessor is not set up for scaling
Expand All @@ -543,17 +549,35 @@ def inverse_scale_targets(self, dataset: Union[Dataset, pd.DataFrame]) -> Union[
)

cols_to_scale = self.target_columns
if suffix is not None:
cols_to_scale = [f"{c}{suffix}" for c in cols_to_scale]

col_has_list = [df[c].dtype == np.dtype("O") for c in cols_to_scale]

def explode_row(df_row, name, columns):
df = pd.DataFrame(df_row[columns].to_dict())
inv_scale = self.target_scaler_dict[name].inverse_transform(df)
df_out = df_row.copy()
for idx, c in enumerate(columns):
df_out[c] = inv_scale[:, idx]
return df_out

def inverse_scale_func(grp, id_columns):
if isinstance(id_columns, list):
name = tuple(grp.iloc[0][id_columns].tolist())
else:
name = grp.iloc[0][id_columns]
grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale])

if not np.any(col_has_list):
grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale])
else:
grp[cols_to_scale] = grp[cols_to_scale].apply(
lambda x: explode_row(x, name, cols_to_scale), axis="columns"
)
return grp

if self.id_columns:
id_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0]
if self.scaling_id_columns:
id_columns = self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0]
else:
id_columns = INTERNAL_ID_COLUMN

Expand Down Expand Up @@ -597,8 +621,10 @@ def scale_func(grp, id_columns):

return grp

if self.id_columns:
id_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0]
if self.scaling_id_columns:
id_columns = (
self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0]
)
else:
id_columns = INTERNAL_ID_COLUMN

Expand Down
2 changes: 1 addition & 1 deletion tsfm_public/toolkit/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def train_test_split(
)
)

result_train, result_valid, result_test = zip(**result)
result_train, result_valid, result_test = zip(*result)
return pd.concat(result_train), pd.concat(result_valid), pd.concat(result_test)


Expand Down

0 comments on commit f43506c

Please sign in to comment.