diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py index 35bb2ec9..ef53a67e 100644 --- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py +++ b/tsfm_public/toolkit/time_series_forecasting_pipeline.py @@ -52,6 +52,7 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para dataset = self.preprocess(inputs, **preprocess_params) batch_size = forward_params["batch_size"] + num_workers = forward_params["num_workers"] signature = inspect.signature(self.model.forward) signature_columns = list(signature.parameters.keys()) @@ -64,10 +65,10 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para signature_columns=signature_columns, logger=None, description=None, - model_name=self.model.__class__.__name__, + model_name=None, ) dataloader = DataLoader( - dataset, num_workers=1, batch_size=batch_size, collate_fn=remove_columns_collator, shuffle=False + dataset, batch_size=batch_size, num_workers=num_workers, collate_fn=remove_columns_collator, shuffle=False ) # iterate over dataloader @@ -103,11 +104,10 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para build_pipeline_init_args(has_tokenizer=False, has_feature_extractor=True, has_image_processor=False) ) class TimeSeriesForecastingPipeline(TimeSeriesPipeline): - """Hugging Face Pipeline for Time Series Forecasting + """ + Time Series Forecasting using HF time series forecasting models. This pipeline consumes a `pandas.DataFrame` + containing the time series data and produces a new `pandas.DataFrame` containing the forecasts. - feature_extractor (TimeSeriesPreprocessor): A time series preprpocessor object that specifies how the time - series should be prepared. If this is provided, and of the other options below will be automatically - populated from this instance. """ def __init__( @@ -116,11 +116,13 @@ def __init__( freq: Optional[str] = None, explode_forecasts: bool = False, inverse_scale_outputs: bool = True, + add_known_ground_truth: bool = True, **kwargs, ): kwargs["freq"] = freq kwargs["explode_forecasts"] = explode_forecasts kwargs["inverse_scale_outputs"] = inverse_scale_outputs + kwargs["add_known_ground_truth"] = add_known_ground_truth super().__init__(*args, **kwargs) if self.framework == "tf": @@ -132,7 +134,8 @@ def _sanitize_parameters( self, **kwargs, ): - """Assign parameters to the different parts of the process. + """Assigns parameters to the different steps of the process. If context_length and prediction_length + are not provided they are taken from the model config. For expected parameters see the call method below. """ @@ -140,6 +143,21 @@ def _sanitize_parameters( context_length = kwargs.get("context_length", self.model.config.context_length) prediction_length = kwargs.get("prediction_length", self.model.config.prediction_length) + # autopopulate from feature extractor + if self.feature_extractor: + for p in [ + "id_columns", + "timestamp_column", + "target_columns", + "observable_columns", + "control_columns", + "conditional_columns", + "static_categorical_columns", + "freq", + ]: + if p not in kwargs: + kwargs[p] = getattr(self.feature_extractor, p) + preprocess_kwargs = { "prediction_length": prediction_length, "context_length": context_length, @@ -170,6 +188,7 @@ def _sanitize_parameters( "freq", "explode_forecasts", "inverse_scale_outputs", + "add_known_ground_truth", ] for c in preprocess_params: @@ -188,7 +207,14 @@ def _sanitize_parameters( else: batch_size = self._batch_size - forward_kwargs = {"batch_size": batch_size} + num_workers = kwargs.get("num_workers", self._num_workers) + if num_workers is None: + if self._num_workers is None: + num_workers = 0 + else: + num_workers = self._num_workers + + forward_kwargs = {"batch_size": batch_size, "num_workers": num_workers} # if "id_columns" in kwargs: # preprocess_kwargs["id_columns"] = kwargs["id_columns"] @@ -225,7 +251,7 @@ def __call__( i.e., exogenous or supporting features which are known in advance. feature_extractor (TimeSeriesPreprocessor): A time series preprpocessor object that specifies how the time - series should be prepared. If this is provided, and of the other options below will be automatically + series should be prepared. If this is provided, any of the other options below will be automatically populated from this instance. timestamp_column (str): The name of the column containing the timestamp of the time series. @@ -266,6 +292,10 @@ def __call__( inverse_scale_outputs (bool): If true and a valid feature extractor is provided, the outputs will be inverse scaled. + add_known_ground_truth (bool): If True add columns containing the ground truth data. Prediction columns will have a + suffix of "_prediction". Default True. If false, on columns containing predictions are produced, no suffix is + added. + Return (pandas dataframe): A new pandas dataframe containing the forecasts. Each row will contain the id, timestamp, the original input feature values and the output forecast for each input column. The output forecast is a list containing @@ -285,6 +315,8 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, Li id_columns = kwargs.get("id_columns") # context_length = kwargs.get("context_length") + # use the feature extractor here + if isinstance(time_series, str): time_series = pd.read_csv( time_series, @@ -326,17 +358,6 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, Li **kwargs, ) - # # stack all the outputs - # # torch tensors are stacked, but other values are passed through as a list - # first = dataset[0] - # full_output = {} - # for k, v in first.items(): - # if isinstance(v, torch.Tensor): - # full_output[k] = torch.stack(tuple(r[k] for r in dataset)) - # else: - # full_output[k] = [r[k] for r in dataset] - - # return full_output return dataset def _forward(self, model_inputs, **kwargs): @@ -350,29 +371,6 @@ def _forward(self, model_inputs, **kwargs): original input keys. """ - # Eventually we should use inspection somehow - # inspect.signature(model_forward).parameters.keys() - # model_input_keys = { - # "past_values", - # "static_categorical_values", - # "freq_token", - # } # todo: this should not be hardcoded - - # signature = inspect.signature(self.model.forward) - # model_input_keys = list(signature.parameters.keys()) - - # model_inputs_only = {} - # for k in model_input_keys: - # if k in model_inputs: - # model_inputs_only[k] = model_inputs[k] - - # model_outputs = self.model(**model_inputs_only) - - # # copy the other inputs - # copy_inputs = True - # for k in [akey for akey in model_inputs.keys() if (akey not in model_input_keys) or copy_inputs]: - # model_outputs[k] = model_inputs[k] - model_outputs = self.model(**model_inputs) return model_outputs @@ -392,14 +390,16 @@ def postprocess(self, input, **kwargs): # name the predictions of target columns # outputs should only have size equal to target columns + prediction_columns = [] for i, c in enumerate(kwargs["target_columns"]): - prediction_columns.append(f"{c}_prediction") + prediction_columns.append(f"{c}_prediction" if kwargs["add_known_ground_truth"] else c) out[prediction_columns[-1]] = input[model_output_key][:, :, i].numpy().tolist() # provide the ground truth values for the targets # when future is unknown, we will have augmented the provided dataframe with NaN values to cover the future - for i, c in enumerate(kwargs["target_columns"]): - out[c] = input["future_values"][:, :, i].numpy().tolist() + if kwargs["add_known_ground_truth"]: + for i, c in enumerate(kwargs["target_columns"]): + out[c] = input["future_values"][:, :, i].numpy().tolist() if "timestamp_column" in kwargs: out[kwargs["timestamp_column"]] = input["timestamp"]