diff --git a/pyproject.toml b/pyproject.toml
index 1d956be9..b95eb9b4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,7 +16,8 @@ dependencies = [
     "pandas>=2.2.0",
     "scikit-learn",
     "transformers[torch]>=4.36.1",
-    "datasets"
+    "datasets",
+    "deprecated"
 ]
 
 [tool.setuptools]
diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py
index 5a813c77..5d357964 100644
--- a/tests/toolkit/test_time_series_preprocessor.py
+++ b/tests/toolkit/test_time_series_preprocessor.py
@@ -15,6 +15,7 @@
     TimeSeriesPreprocessor,
     create_timestamps,
     extend_time_series,
+    get_datasets,
 )
 from tsfm_public.toolkit.util import FractionLocation
 
@@ -249,7 +250,8 @@ def test_get_datasets(ts_data):
         context_length=10,
     )
 
-    train, valid, test = tsp.get_datasets(
+    train, valid, test = get_datasets(
+        tsp,
         ts_data,
         split_config={"train": [0, 1 / 3], "valid": [1 / 3, 2 / 3], "test": [2 / 3, 1]},
     )
@@ -267,7 +269,8 @@ def test_get_datasets(ts_data):
         context_length=10,
     )
 
-    train, valid, test = tsp.get_datasets(
+    train, valid, test = get_datasets(
+        tsp,
         ts_data,
         split_config={
             "train": [0, 100],
@@ -294,7 +297,8 @@ def test_get_datasets(ts_data):
         context_length=10,
     )
 
-    train, valid, test = tsp.get_datasets(
+    train, valid, test = get_datasets(
+        tsp,
         ts_data,
         split_config={
             "train": [0, 100],
@@ -319,7 +323,8 @@ def test_get_datasets(ts_data):
         context_length=10,
     )
 
-    train, valid, test = tsp.get_datasets(
+    train, valid, test = get_datasets(
+        tsp,
         ts_data,
         split_config={
             "train": 0.7,
@@ -376,7 +381,7 @@ def test_get_datasets_without_targets(ts_data):
         context_length=5,
     )
 
-    train, _, _ = tsp.get_datasets(ts_data, split_config={"train": 0.7, "test": 0.2})
+    train, _, _ = get_datasets(tsp, ts_data, split_config={"train": 0.7, "test": 0.2})
 
     train.datasets[0].target_columns == ["value1", "value2"]
@@ -394,7 +399,7 @@ def test_id_columns_and_scaling_id_columns(ts_data_runs):
         scaling=True,
     )
 
-    ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})
+    ds_train, ds_valid, ds_test = get_datasets(tsp, df, split_config={"train": 0.7, "test": 0.2})
 
     assert len(tsp.target_scaler_dict) == 2
     assert len(ds_train.datasets) == 4
diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py
index 43a4745f..08e0e258 100644
--- a/tsfm_public/toolkit/time_series_preprocessor.py
+++ b/tsfm_public/toolkit/time_series_preprocessor.py
@@ -13,6 +13,7 @@
 import numpy as np
 import pandas as pd
 from datasets import Dataset
+from deprecated import deprecated
 from sklearn.preprocessing import MinMaxScaler as MinMaxScaler_
 from sklearn.preprocessing import OrdinalEncoder as OrdinalEncoder_
 from sklearn.preprocessing import StandardScaler as StandardScaler_
@@ -123,7 +124,45 @@ def __init__(
         freq: Optional[Union[int, str]] = None,
         **kwargs,
     ):
-        # note base class __init__ methods sets all arguments as attributes
+        """Multi-time series aware data preprocessor. Provides functions for scaling data and facilitates downstream
+        operations on time series data, including model training and inference.
+
+        Args:
+            id_columns (List[str]): List of column names which identify different time series in a multi-time series input. Defaults to [].
+            timestamp_column (Optional[str], optional): The name of the column containing the timestamp of the time series. Defaults to None.
+            target_columns (List[str], optional): List of column names which identify the target channels in the input; these are the
+                columns that will be forecasted. Defaults to [].
+            observable_columns (List[str], optional): List of column names which identify the observable channels in the input.
+                Observable channels are channels which we have knowledge about in the past and future. For example, weather
+                conditions such as temperature or precipitation may be known or estimated in the future, but cannot be
+                changed. Defaults to [].
+            control_columns (List[str], optional): List of column names which identify the control channels in the input. Control
+                channels are similar to observable channels, except that future values may be controlled. For example, the discount
+                percentage of a particular product is known and controllable in the future. Defaults to [].
+            conditional_columns (List[str], optional): List of column names which identify the conditional channels in the input.
+                Conditional channels are channels which we know in the past, but do not know in the future. Defaults to [].
+            static_categorical_columns (List[str], optional): List of column names which identify categorical-valued channels in the input
+                which are fixed over time. Defaults to [].
+            context_length (int, optional): The length of the input context window. Defaults to 64.
+            prediction_length (Optional[int], optional): The length of the prediction window. Defaults to None.
+            scaling (bool, optional): If True, data is scaled. Defaults to False.
+            scaler_type (ScalerType, optional): The type of scaling to perform. See ScalerType for available scalers. Defaults to ScalerType.STANDARD.value.
+            scaling_id_columns (Optional[List[str]], optional): In some cases we need to separate data by a different set of id_columns
+                when determining scaling factors. For the purposes of determining scaling, data will be grouped by the provided columns.
+                If None, the `id_columns` will be used. Defaults to None.
+            encode_categorical (bool, optional): If True, any categorical columns will be encoded using ordinal encoding. Defaults to True.
+            time_series_task (str, optional): Reserved for future use. Defaults to TimeSeriesTask.FORECASTING.value.
+            frequency_mapping (Dict[str, int], optional): A mapping from frequency strings to integer values. Defaults to DEFAULT_FREQUENCY_MAPPING.
+            freq (Optional[Union[int, str]], optional): A frequency indicator for the given `timestamp_column`. See
+                https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#period-aliases for a description of the
+                allowed values. If not provided, the frequency will be inferred from `timestamp_column`. Defaults to None.
+
+        Raises:
+            ValueError: Raised if `id_columns` is not a list.
+            ValueError: Raised if `timestamp_column` is not a scalar.
+        """
+        # note: base class __init__ method sets all arguments as attributes
         if not isinstance(id_columns, list):
             raise ValueError(f"Invalid argument provided for `id_columns`: {id_columns}")
 
@@ -664,13 +703,13 @@ def scale_func(grp, id_columns):
         self._clean_up_dataframe(df)
         return df
 
+    @deprecated(version="0.1.1", reason="Please use the standalone function `get_datasets()`.")
     def get_datasets(
         self,
         dataset: Union[Dataset, pd.DataFrame],
         split_config: Dict[str, Union[List[Union[int, float]], float]],
         fewshot_fraction: Optional[float] = None,
         fewshot_location: str = FractionLocation.LAST.value,
-        return_dataframe: bool = False,
     ) -> Tuple[Any]:
         """Creates the preprocessed pytorch datasets needed for training and evaluation
         using the HuggingFace trainer
@@ -698,80 +737,152 @@
         fewshot_location (str): Determines where the fewshot data is chosen.
Valid options are "first" and "last" as described in the enum FewshotLocation. Default is to choose the fewshot data at the end of the training dataset (i.e., "last"). - return_dataframe: Instead for returning a pytorch dataset, return tuples of pandas dataframes, after any - preprocessing. Returns: Tuple of pytorch datasets, including: train, validation, test. """ - data = self._standardize_dataframe(dataset) + return get_datasets( + self, + dataset, + split_config=split_config, + fewshot_fraction=fewshot_fraction, + fewshot_location=fewshot_location, + ) - if not self.context_length: - raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length") - if not self.prediction_length: - raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length") - # get split_params - split_params, split_function = get_split_params(split_config, context_length=self.context_length) +def prepare_data_splits( + data: pd.DataFrame, + id_columns: List[str] = [], + context_length: int = 64, + split_config: Dict[str, Union[List[Union[int, float]], float]] = {"train": 0.7, "test": 0.2}, +) -> Tuple[pd.DataFrame]: + """Splits the input dataframe according to the split_config. + + Args: + data (pd.DataFrame): Input dataframe. + id_columns (List[str]): List of column names which identify different time series in a multi-time series input. Defaults to []. + context_length (int, optional): Specifies the length of the context windows extracted from the historical data for feeding into + the model. Defaults to 64. + split_config (Dict[str, Union[List[Union[int, float]], float]]): Dictionary of dictionaries containing + split parameters. Defaults to {"train": 0.7, "test": 0.2}. Two configurations are possible: + 1. Specify train/valid/test indices or relative fractions + { + train: [0, 50], + valid: [50, 70], + test: [70, 100] + } + end value is not inclusive + 2. Specify train/test fractions: + { + train: 0.7 + test: 0.2 + } + A valid split should not be specified directly; the above implies valid = 0.1 + Returns: + Tuple of pandas dataframes, including: train, validation, test. 
+ """ + # get split_params + split_params, split_function = get_split_params(split_config, context_length=context_length) + + # split data + if isinstance(split_function, dict): + train_data = split_function["train"](data, id_columns=id_columns, **split_params["train"]) + valid_data = split_function["valid"](data, id_columns=id_columns, **split_params["valid"]) + test_data = split_function["test"](data, id_columns=id_columns, **split_params["test"]) + else: + train_data, valid_data, test_data = split_function(data, id_columns=id_columns, **split_params) + + return train_data, valid_data, test_data - # split data - if isinstance(split_function, dict): - train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) - valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"]) - test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"]) - else: - train_data, valid_data, test_data = split_function(data, id_columns=self.id_columns, **split_params) - - # data preprocessing - self.train(train_data) - - # specify columns - column_specifiers = { - "id_columns": self.id_columns, - "timestamp_column": self.timestamp_column, - "target_columns": self.target_columns, - "observable_columns": self.observable_columns, - "control_columns": self.control_columns, - "conditional_columns": self.conditional_columns, - "static_categorical_columns": self.static_categorical_columns, - } - - # handle fewshot operation - if fewshot_fraction is not None: - if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)): - raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}") - - train_data = select_by_fixed_fraction( - train_data, - id_columns=self.id_columns, - fraction=fewshot_fraction, - location=fewshot_location, - minimum_size=self.context_length, - ) - params = column_specifiers - params["context_length"] = self.context_length - params["prediction_length"] = self.prediction_length +def get_datasets( + ts_preprocessor: TimeSeriesPreprocessor, + dataset: Union[Dataset, pd.DataFrame], + split_config: Dict[str, Union[List[Union[int, float]], float]] = {"train": 0.7, "test": 0.2}, + fewshot_fraction: Optional[float] = None, + fewshot_location: str = FractionLocation.LAST.value, +) -> Tuple[Any]: + """Creates the preprocessed pytorch datasets needed for training and evaluation + using the HuggingFace trainer - # get torch datasets - train_valid_test = [train_data, valid_data, test_data] + Args: + dataset (Union[Dataset, pd.DataFrame]): Loaded pandas dataframe + split_config (Dict[str, Union[List[Union[int, float]], float]]): Dictionary of dictionaries containing + split parameters. Defaults to {"train": 0.7, "test": 0.2}. Two configurations are possible: + 1. Specify train/valid/test indices or relative fractions + { + train: [0, 50], + valid: [50, 70], + test: [70, 100] + } + end value is not inclusive + 2. Specify train/test fractions: + { + train: 0.7 + test: 0.2 + } + A valid split should not be specified directly; the above implies valid = 0.1 + + fewshot_fraction (float, optional): When non-null, return this percent of the original training + dataset. This is done to support fewshot fine-tuning. + fewshot_location (str): Determines where the fewshot data is chosen. Valid options are "first" and "last" + as described in the enum FewshotLocation. Default is to choose the fewshot data at the end + of the training dataset (i.e., "last"). 
+
+    Returns:
+        Tuple of pytorch datasets, including: train, validation, test.
+    """
+
+    if not ts_preprocessor.context_length:
+        raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length")
+    if not ts_preprocessor.prediction_length:
+        raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length")
+
+    data = ts_preprocessor._standardize_dataframe(dataset)
+
+    train_data, valid_data, test_data = prepare_data_splits(
+        data,
+        id_columns=ts_preprocessor.id_columns,
+        context_length=ts_preprocessor.context_length,
+        split_config=split_config,
+    )
+
+    # data preprocessing
+    ts_preprocessor.train(train_data)
+
+    # specify columns
+    column_specifiers = {
+        "id_columns": ts_preprocessor.id_columns,
+        "timestamp_column": ts_preprocessor.timestamp_column,
+        "target_columns": ts_preprocessor.target_columns,
+        "observable_columns": ts_preprocessor.observable_columns,
+        "control_columns": ts_preprocessor.control_columns,
+        "conditional_columns": ts_preprocessor.conditional_columns,
+        "static_categorical_columns": ts_preprocessor.static_categorical_columns,
+    }
+
+    # handle fewshot operation
+    if fewshot_fraction is not None:
+        if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)):
+            raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}")
+
+        train_data = select_by_fixed_fraction(
+            train_data,
+            id_columns=ts_preprocessor.id_columns,
+            fraction=fewshot_fraction,
+            location=fewshot_location,
+            minimum_size=ts_preprocessor.context_length,
+        )
-        if return_dataframe:
-            return tuple(train_valid_test)
+    params = column_specifiers
+    params["context_length"] = ts_preprocessor.context_length
+    params["prediction_length"] = ts_preprocessor.prediction_length
-        return tuple([ForecastDFDataset(self.preprocess(d), **params) for d in train_valid_test])
+    # get torch datasets
+    train_valid_test = [train_data, valid_data, test_data]
-        # test_dataset = ForecastDFDataset(
-        #     self.preprocess(test_data),
-        #     **params,
-        # )
-        # train_dataset = ForecastDFDataset(self.preprocess(train_data), **params)
-        # valid_dataset = ForecastDFDataset(
-        #     self.preprocess(valid_data),
-        #     **params,
-        # )
-        # return train_dataset, valid_dataset, test_dataset
+    return tuple([ForecastDFDataset(ts_preprocessor.preprocess(d), **params) for d in train_valid_test])
 
 
 def create_timestamps(
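
Migration note: an illustrative sketch of moving a caller from the deprecated method to the standalone
function (assumes `tsp` is a TimeSeriesPreprocessor and `df` a pandas dataframe; the import path follows
the module edited above):

    # before: method form, now deprecated (the `deprecated` decorator emits a DeprecationWarning on call)
    train, valid, test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})

    # after: standalone form, with the preprocessor as the first positional argument
    from tsfm_public.toolkit.time_series_preprocessor import get_datasets
    train, valid, test = get_datasets(tsp, df, split_config={"train": 0.7, "test": 0.2})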