
Commit

Merge pull request #90 from ibm-granite/pipeline_enhancement
Pipeline enhancement
wgifford authored Jul 26, 2024
2 parents e8e8a22 + 8176f85 commit 5493539
Showing 1 changed file with 46 additions and 46 deletions.

tsfm_public/toolkit/time_series_forecasting_pipeline.py
@@ -52,6 +52,7 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_params):
dataset = self.preprocess(inputs, **preprocess_params)

batch_size = forward_params["batch_size"]
num_workers = forward_params["num_workers"]
signature = inspect.signature(self.model.forward)
signature_columns = list(signature.parameters.keys())

@@ -64,10 +65,10 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_params):
signature_columns=signature_columns,
logger=None,
description=None,
model_name=self.model.__class__.__name__,
model_name=None,
)
dataloader = DataLoader(
dataset, num_workers=1, batch_size=batch_size, collate_fn=remove_columns_collator, shuffle=False
dataset, batch_size=batch_size, num_workers=num_workers, collate_fn=remove_columns_collator, shuffle=False
)

# iterate over dataloader
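The iteration itself is elided at this hunk boundary. A minimal sketch of the pattern, assuming PyTorch tensors and a model output exposing a `prediction_outputs` field (that attribute name is an assumption, not confirmed by this diff):

```python
import torch

# Sketch only: accumulate per-batch predictions without tracking gradients.
collected = []
with torch.no_grad():
    for batch in dataloader:
        # each batch holds only the model's signature columns, thanks to the collator
        outputs = self.model(**batch)
        collected.append(outputs.prediction_outputs)  # hypothetical output attribute
predictions = torch.cat(collected, dim=0)  # (num_samples, prediction_length, num_targets)
```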
@@ -103,11 +104,10 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_params):
build_pipeline_init_args(has_tokenizer=False, has_feature_extractor=True, has_image_processor=False)
)
class TimeSeriesForecastingPipeline(TimeSeriesPipeline):
"""Hugging Face Pipeline for Time Series Forecasting
"""
Time Series Forecasting using HF time series forecasting models. This pipeline consumes a `pandas.DataFrame`
containing the time series data and produces a new `pandas.DataFrame` containing the forecasts.

feature_extractor (TimeSeriesPreprocessor): A time series preprocessor object that specifies how the time
series should be prepared. If this is provided, any of the other options below will be automatically
populated from this instance.
"""

def __init__(
@@ -116,11 +116,13 @@ def __init__(
freq: Optional[str] = None,
explode_forecasts: bool = False,
inverse_scale_outputs: bool = True,
add_known_ground_truth: bool = True,
**kwargs,
):
kwargs["freq"] = freq
kwargs["explode_forecasts"] = explode_forecasts
kwargs["inverse_scale_outputs"] = inverse_scale_outputs
kwargs["add_known_ground_truth"] = add_known_ground_truth
super().__init__(*args, **kwargs)

if self.framework == "tf":
@@ -132,14 +134,30 @@ def _sanitize_parameters(
self,
**kwargs,
):
"""Assign parameters to the different parts of the process.
"""Assigns parameters to the different steps of the process. If context_length and prediction_length
are not provided they are taken from the model config.
For expected parameters see the call method below.
"""

context_length = kwargs.get("context_length", self.model.config.context_length)
prediction_length = kwargs.get("prediction_length", self.model.config.prediction_length)

# autopopulate from feature extractor
if self.feature_extractor:
for p in [
"id_columns",
"timestamp_column",
"target_columns",
"observable_columns",
"control_columns",
"conditional_columns",
"static_categorical_columns",
"freq",
]:
if p not in kwargs:
kwargs[p] = getattr(self.feature_extractor, p)
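Because `kwargs` is only filled in when a key is absent, anything passed explicitly at call time takes precedence over the feature extractor's stored value. A tiny illustration, reusing the hypothetical `pipe` and `df` from the sketch above:

```python
# tsp.target_columns is ["value"], but the explicit argument wins:
forecasts = pipe(df, target_columns=["value", "other_target"])
```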

preprocess_kwargs = {
"prediction_length": prediction_length,
"context_length": context_length,
@@ -170,6 +188,7 @@ def _sanitize_parameters(
"freq",
"explode_forecasts",
"inverse_scale_outputs",
"add_known_ground_truth",
]

for c in preprocess_params:
@@ -188,7 +207,14 @@ def _sanitize_parameters(
else:
batch_size = self._batch_size

forward_kwargs = {"batch_size": batch_size}
num_workers = kwargs.get("num_workers", self._num_workers)
if num_workers is None:
if self._num_workers is None:
num_workers = 0
else:
num_workers = self._num_workers

forward_kwargs = {"batch_size": batch_size, "num_workers": num_workers}
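Both values can be supplied at call time and flow through to the `DataLoader` constructed in `run_single`. A hypothetical call:

```python
forecasts = pipe(
    df,
    batch_size=64,   # rows per forward pass
    num_workers=4,   # DataLoader worker processes; falls back to 0 when unset
)
```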

# if "id_columns" in kwargs:
# preprocess_kwargs["id_columns"] = kwargs["id_columns"]
@@ -225,7 +251,7 @@ def __call__(
i.e., exogenous or supporting features which are known in advance.
feature_extractor (TimeSeriesPreprocessor): A time series preprocessor object that specifies how the time
series should be prepared. If this is provided, and of the other options below will be automatically
series should be prepared. If this is provided, any of the other options below will be automatically
populated from this instance.
timestamp_column (str): The name of the column containing the timestamp of the time series.
@@ -266,6 +292,10 @@ def __call__(
inverse_scale_outputs (bool): If True and a valid feature extractor is provided, the outputs will be inverse scaled.
add_known_ground_truth (bool): If True, add columns containing the ground truth data. Prediction columns will have a
suffix of "_prediction". Default True. If False, only columns containing predictions are produced, and no suffix is
added.
Return (pandas dataframe):
A new pandas dataframe containing the forecasts. Each row will contain the id, timestamp, the original
input feature values, and the output forecast for each input column. The output forecast is a list containing
@@ -285,6 +315,8 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, List[Any]]]:
id_columns = kwargs.get("id_columns")
# context_length = kwargs.get("context_length")

# use the feature extractor here

if isinstance(time_series, str):
time_series = pd.read_csv(
time_series,
@@ -326,17 +358,6 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, List[Any]]]:
**kwargs,
)

# # stack all the outputs
# # torch tensors are stacked, but other values are passed through as a list
# first = dataset[0]
# full_output = {}
# for k, v in first.items():
# if isinstance(v, torch.Tensor):
# full_output[k] = torch.stack(tuple(r[k] for r in dataset))
# else:
# full_output[k] = [r[k] for r in dataset]

# return full_output
return dataset

def _forward(self, model_inputs, **kwargs):
@@ -350,29 +371,6 @@ def _forward(self, model_inputs, **kwargs):
original input keys.
"""

# Eventually we should use inspection somehow
# inspect.signature(model_forward).parameters.keys()
# model_input_keys = {
# "past_values",
# "static_categorical_values",
# "freq_token",
# } # todo: this should not be hardcoded

# signature = inspect.signature(self.model.forward)
# model_input_keys = list(signature.parameters.keys())

# model_inputs_only = {}
# for k in model_input_keys:
# if k in model_inputs:
# model_inputs_only[k] = model_inputs[k]

# model_outputs = self.model(**model_inputs_only)

# # copy the other inputs
# copy_inputs = True
# for k in [akey for akey in model_inputs.keys() if (akey not in model_input_keys) or copy_inputs]:
# model_outputs[k] = model_inputs[k]

model_outputs = self.model(**model_inputs)

return model_outputs
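The filtering sketched in the removed comments is now handled upstream: `run_single` derives `signature_columns` with `inspect.signature` and the remove-columns collator drops everything else before batching. A condensed sketch of that idea, for reference only:

```python
import inspect

# Keep only the keys that the model's forward() actually accepts
# (assumes model_inputs is a dict mixing tensors and bookkeeping columns).
signature_columns = list(inspect.signature(self.model.forward).parameters.keys())
model_inputs_only = {k: v for k, v in model_inputs.items() if k in signature_columns}
model_outputs = self.model(**model_inputs_only)
```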
@@ -392,14 +390,16 @@ def postprocess(self, input, **kwargs):

# name the predictions of target columns
# outputs should only have size equal to target columns

prediction_columns = []
for i, c in enumerate(kwargs["target_columns"]):
prediction_columns.append(f"{c}_prediction")
prediction_columns.append(f"{c}_prediction" if kwargs["add_known_ground_truth"] else c)
out[prediction_columns[-1]] = input[model_output_key][:, :, i].numpy().tolist()
# provide the ground truth values for the targets
# when future is unknown, we will have augmented the provided dataframe with NaN values to cover the future
for i, c in enumerate(kwargs["target_columns"]):
out[c] = input["future_values"][:, :, i].numpy().tolist()
if kwargs["add_known_ground_truth"]:
for i, c in enumerate(kwargs["target_columns"]):
out[c] = input["future_values"][:, :, i].numpy().tolist()
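A self-contained illustration of the resulting column layout (shapes and names are assumed): with `target_columns=["value"]` and `add_known_ground_truth=True`, each target yields a `"value_prediction"` column of forecasts plus a `"value"` column of ground truth; with `False`, a single `"value"` column holds the forecasts.

```python
import numpy as np
import pandas as pd

preds = np.random.rand(2, 3, 1)      # (batch, prediction_length, num_targets)
future = np.full((2, 3, 1), np.nan)  # NaN padding where the future is unknown

out = pd.DataFrame()
for i, c in enumerate(["value"]):
    out[f"{c}_prediction"] = preds[:, :, i].tolist()  # forecasts as list-valued cells
    out[c] = future[:, :, i].tolist()                 # ground truth (True case only)
```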

if "timestamp_column" in kwargs:
out[kwargs["timestamp_column"]] = input["timestamp"]
