From 52214ca0aa44403d78d77a07c3df9bdc23e32c9e Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Thu, 25 Jul 2024 15:43:43 -0400 Subject: [PATCH 1/4] unset num_workers --- .../time_series_forecasting_pipeline.py | 29 ++----------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py index 35bb2ec9..9226eaa2 100644 --- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py +++ b/tsfm_public/toolkit/time_series_forecasting_pipeline.py @@ -64,11 +64,9 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para signature_columns=signature_columns, logger=None, description=None, - model_name=self.model.__class__.__name__, - ) - dataloader = DataLoader( - dataset, num_workers=1, batch_size=batch_size, collate_fn=remove_columns_collator, shuffle=False + model_name=None, ) + dataloader = DataLoader(dataset, batch_size=batch_size, collate_fn=remove_columns_collator, shuffle=False) # iterate over dataloader it = iter(dataloader) @@ -350,29 +348,6 @@ def _forward(self, model_inputs, **kwargs): original input keys. """ - # Eventually we should use inspection somehow - # inspect.signature(model_forward).parameters.keys() - # model_input_keys = { - # "past_values", - # "static_categorical_values", - # "freq_token", - # } # todo: this should not be hardcoded - - # signature = inspect.signature(self.model.forward) - # model_input_keys = list(signature.parameters.keys()) - - # model_inputs_only = {} - # for k in model_input_keys: - # if k in model_inputs: - # model_inputs_only[k] = model_inputs[k] - - # model_outputs = self.model(**model_inputs_only) - - # # copy the other inputs - # copy_inputs = True - # for k in [akey for akey in model_inputs.keys() if (akey not in model_input_keys) or copy_inputs]: - # model_outputs[k] = model_inputs[k] - model_outputs = self.model(**model_inputs) return model_outputs From e29b2573297b91056bf102c56e4448e646513bf6 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Thu, 25 Jul 2024 15:51:58 -0400 Subject: [PATCH 2/4] option to add ground truth data when available --- .../toolkit/time_series_forecasting_pipeline.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py index 9226eaa2..39e3e0e5 100644 --- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py +++ b/tsfm_public/toolkit/time_series_forecasting_pipeline.py @@ -114,11 +114,13 @@ def __init__( freq: Optional[str] = None, explode_forecasts: bool = False, inverse_scale_outputs: bool = True, + add_known_ground_truth: bool = True, **kwargs, ): kwargs["freq"] = freq kwargs["explode_forecasts"] = explode_forecasts kwargs["inverse_scale_outputs"] = inverse_scale_outputs + kwargs["add_known_ground_truth"] = add_known_ground_truth super().__init__(*args, **kwargs) if self.framework == "tf": @@ -168,6 +170,7 @@ def _sanitize_parameters( "freq", "explode_forecasts", "inverse_scale_outputs", + "add_known_ground_truth", ] for c in preprocess_params: @@ -367,14 +370,16 @@ def postprocess(self, input, **kwargs): # name the predictions of target columns # outputs should only have size equal to target columns + prediction_columns = [] for i, c in enumerate(kwargs["target_columns"]): - prediction_columns.append(f"{c}_prediction") + prediction_columns.append(f"{c}_prediction" if kwargs["add_known_ground_truth"] else c) out[prediction_columns[-1]] = input[model_output_key][:, :, i].numpy().tolist() # provide the ground truth values for the targets # when future is unknown, we will have augmented the provided dataframe with NaN values to cover the future - for i, c in enumerate(kwargs["target_columns"]): - out[c] = input["future_values"][:, :, i].numpy().tolist() + if kwargs["add_known_ground_truth"]: + for i, c in enumerate(kwargs["target_columns"]): + out[c] = input["future_values"][:, :, i].numpy().tolist() if "timestamp_column" in kwargs: out[kwargs["timestamp_column"]] = input["timestamp"] From 493a2f78512379f7f73da3f336180c758444bcd4 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Thu, 25 Jul 2024 22:38:28 -0400 Subject: [PATCH 3/4] auto-set some parameters when preprocessor is passed --- .../time_series_forecasting_pipeline.py | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py index 39e3e0e5..6f9961da 100644 --- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py +++ b/tsfm_public/toolkit/time_series_forecasting_pipeline.py @@ -101,11 +101,10 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para build_pipeline_init_args(has_tokenizer=False, has_feature_extractor=True, has_image_processor=False) ) class TimeSeriesForecastingPipeline(TimeSeriesPipeline): - """Hugging Face Pipeline for Time Series Forecasting + """ + Time Series Forecasting using HF time series forecasting models. This pipeline consumes a `pandas.DataFrame` + containing the time series data and produces a new `pandas.DataFrame` containing the forecasts. - feature_extractor (TimeSeriesPreprocessor): A time series preprpocessor object that specifies how the time - series should be prepared. If this is provided, and of the other options below will be automatically - populated from this instance. """ def __init__( @@ -132,7 +131,8 @@ def _sanitize_parameters( self, **kwargs, ): - """Assign parameters to the different parts of the process. + """Assigns parameters to the different steps of the process. If context_length and prediction_length + are not provided they are taken from the model config. For expected parameters see the call method below. """ @@ -140,6 +140,21 @@ def _sanitize_parameters( context_length = kwargs.get("context_length", self.model.config.context_length) prediction_length = kwargs.get("prediction_length", self.model.config.prediction_length) + # autopopulate from feature extractor + if self.feature_extractor: + for p in [ + "id_columns", + "timestamp_column", + "target_columns", + "observable_columns", + "control_columns", + "conditional_columns", + "static_categorical_columns", + "freq", + ]: + if p not in kwargs: + kwargs[p] = getattr(self.feature_extractor, p) + preprocess_kwargs = { "prediction_length": prediction_length, "context_length": context_length, @@ -226,7 +241,7 @@ def __call__( i.e., exogenous or supporting features which are known in advance. feature_extractor (TimeSeriesPreprocessor): A time series preprpocessor object that specifies how the time - series should be prepared. If this is provided, and of the other options below will be automatically + series should be prepared. If this is provided, any of the other options below will be automatically populated from this instance. timestamp_column (str): The name of the column containing the timestamp of the time series. @@ -267,6 +282,10 @@ def __call__( inverse_scale_outputs (bool): If true and a valid feature extractor is provided, the outputs will be inverse scaled. + add_known_ground_truth (bool): If True add columns containing the ground truth data. Prediction columns will have a + suffix of "_prediction". Default True. If false, on columns containing predictions are produced, no suffix is + added. + Return (pandas dataframe): A new pandas dataframe containing the forecasts. Each row will contain the id, timestamp, the original input feature values and the output forecast for each input column. The output forecast is a list containing @@ -286,6 +305,8 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, Li id_columns = kwargs.get("id_columns") # context_length = kwargs.get("context_length") + # use the feature extractor here + if isinstance(time_series, str): time_series = pd.read_csv( time_series, @@ -327,17 +348,6 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, Li **kwargs, ) - # # stack all the outputs - # # torch tensors are stacked, but other values are passed through as a list - # first = dataset[0] - # full_output = {} - # for k, v in first.items(): - # if isinstance(v, torch.Tensor): - # full_output[k] = torch.stack(tuple(r[k] for r in dataset)) - # else: - # full_output[k] = [r[k] for r in dataset] - - # return full_output return dataset def _forward(self, model_inputs, **kwargs): From 8176f8523c1c377e5452eb2fc04a4cf39fd53e8f Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Thu, 25 Jul 2024 23:01:38 -0400 Subject: [PATCH 4/4] allow batch_size, num_workers --- .../toolkit/time_series_forecasting_pipeline.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py index 6f9961da..ef53a67e 100644 --- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py +++ b/tsfm_public/toolkit/time_series_forecasting_pipeline.py @@ -52,6 +52,7 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para dataset = self.preprocess(inputs, **preprocess_params) batch_size = forward_params["batch_size"] + num_workers = forward_params["num_workers"] signature = inspect.signature(self.model.forward) signature_columns = list(signature.parameters.keys()) @@ -66,7 +67,9 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para description=None, model_name=None, ) - dataloader = DataLoader(dataset, batch_size=batch_size, collate_fn=remove_columns_collator, shuffle=False) + dataloader = DataLoader( + dataset, batch_size=batch_size, num_workers=num_workers, collate_fn=remove_columns_collator, shuffle=False + ) # iterate over dataloader it = iter(dataloader) @@ -204,7 +207,14 @@ def _sanitize_parameters( else: batch_size = self._batch_size - forward_kwargs = {"batch_size": batch_size} + num_workers = kwargs.get("num_workers", self._num_workers) + if num_workers is None: + if self._num_workers is None: + num_workers = 0 + else: + num_workers = self._num_workers + + forward_kwargs = {"batch_size": batch_size, "num_workers": num_workers} # if "id_columns" in kwargs: # preprocess_kwargs["id_columns"] = kwargs["id_columns"]