Refactor get_datasets functionality #60

Merged · 2 commits · May 31, 2024
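This PR replaces the `TimeSeriesPreprocessor.get_datasets` method with a standalone `get_datasets` function that takes the preprocessor as its first argument. A minimal sketch of the call-site change, assuming a pandas dataframe `df` and illustrative column names (not taken from the diff):

from tsfm_public.toolkit.time_series_preprocessor import (
    TimeSeriesPreprocessor,
    get_datasets,
)

tsp = TimeSeriesPreprocessor(
    timestamp_column="timestamp",  # hypothetical column names
    target_columns=["value1"],
    context_length=10,
    prediction_length=2,
)

# Before this PR (method call; now deprecated and forwarding to the new function):
# train, valid, test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})

# After this PR (standalone function; the preprocessor is passed explicitly):
train, valid, test = get_datasets(tsp, df, split_config={"train": 0.7, "test": 0.2})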
pyproject.toml · 3 changes: 2 additions & 1 deletion
@@ -16,7 +16,8 @@ dependencies = [
"pandas>=2.2.0",
"scikit-learn",
"transformers[torch]>=4.36.1",
"datasets"
"datasets",
"deprecated"
]

[tool.setuptools]
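The new `deprecated` dependency provides the `@deprecated` decorator applied to the old method in this PR. A minimal sketch of its behavior (`old_api` and `new_api` are hypothetical names):

from deprecated import deprecated

@deprecated(version="0.1.1", reason="Please use `new_api()` instead.")
def old_api():
    return 42

old_api()  # still works, but emits a DeprecationWarning carrying the version and reason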
tests/toolkit/test_time_series_preprocessor.py · 17 changes: 11 additions & 6 deletions
@@ -15,6 +15,7 @@
TimeSeriesPreprocessor,
create_timestamps,
extend_time_series,
get_datasets,
)
from tsfm_public.toolkit.util import FractionLocation

@@ -249,7 +250,8 @@ def test_get_datasets(ts_data):
context_length=10,
)

train, valid, test = tsp.get_datasets(
train, valid, test = get_datasets(
tsp,
ts_data,
split_config={"train": [0, 1 / 3], "valid": [1 / 3, 2 / 3], "test": [2 / 3, 1]},
)
@@ -267,7 +269,8 @@ def test_get_datasets(ts_data):
context_length=10,
)

train, valid, test = tsp.get_datasets(
train, valid, test = get_datasets(
tsp,
ts_data,
split_config={
"train": [0, 100],
@@ -294,7 +297,8 @@ def test_get_datasets(ts_data):
context_length=10,
)

train, valid, test = tsp.get_datasets(
train, valid, test = get_datasets(
tsp,
ts_data,
split_config={
"train": [0, 100],
@@ -319,7 +323,8 @@ def test_get_datasets(ts_data):
context_length=10,
)

train, valid, test = tsp.get_datasets(
train, valid, test = get_datasets(
tsp,
ts_data,
split_config={
"train": 0.7,
@@ -376,7 +381,7 @@ def test_get_datasets_without_targets(ts_data):
context_length=5,
)

train, _, _ = tsp.get_datasets(ts_data, split_config={"train": 0.7, "test": 0.2})
train, _, _ = get_datasets(tsp, ts_data, split_config={"train": 0.7, "test": 0.2})

assert train.datasets[0].target_columns == ["value1", "value2"]

@@ -394,7 +399,7 @@ def test_id_columns_and_scaling_id_columns(ts_data_runs):
scaling=True,
)

ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})
ds_train, ds_valid, ds_test = get_datasets(tsp, df, split_config={"train": 0.7, "test": 0.2})

assert len(tsp.target_scaler_dict) == 2
assert len(ds_train.datasets) == 4
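The tests above exercise both accepted forms of `split_config`. A compact sketch, assuming `tsp` and `ts_data` are configured as in the tests:

# Form 1: explicit per-split boundaries, given as row indices or relative
# fractions; the end value of each range is exclusive.
train, valid, test = get_datasets(
    tsp,
    ts_data,
    split_config={"train": [0, 100], "valid": [100, 140], "test": [140, 200]},
)

# Form 2: train/test fractions only; the valid fraction is implied
# (here 1 - 0.7 - 0.2 = 0.1).
train, valid, test = get_datasets(tsp, ts_data, split_config={"train": 0.7, "test": 0.2})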
tsfm_public/toolkit/time_series_preprocessor.py · 237 changes: 174 additions & 63 deletions
@@ -13,6 +13,7 @@
import numpy as np
import pandas as pd
from datasets import Dataset
from deprecated import deprecated
from sklearn.preprocessing import MinMaxScaler as MinMaxScaler_
from sklearn.preprocessing import OrdinalEncoder as OrdinalEncoder_
from sklearn.preprocessing import StandardScaler as StandardScaler_
@@ -123,7 +124,45 @@ def __init__(
freq: Optional[Union[int, str]] = None,
**kwargs,
):
# note base class __init__ methods sets all arguments as attributes
"""Multi-time series aware data preprocessor. Provides functions for scaling data and facitilitates downstream
operations on time series data, including model training and inference.

Args:
id_columns (List[str]): List of column names which identify different time series in a multi-time series input. Defaults to [].
timestamp_column (Optional[str], optional): The name of the column containing the timestamp of the time series. Defaults to None.
target_columns (List[str], optional): List of column names which identify the target channels in the input; these are the
columns that will be forecasted. Defaults to [].
observable_columns (List[str], optional): List of column names which identify the observable channels in the input.
Observable channels are channels which we have knowledge about in the past and future. For example, weather
conditions such as temperature or precipitation may be known or estimated in the future, but cannot be
changed. Defaults to [].
control_columns (List[str], optional): List of column names which identify the control channels in the input. Control
channels are similar to observable channels, except that future values may be controlled. For example, discount
percentage of a particular product is known and controllable in the future. Defaults to [].
conditional_columns (List[str], optional): List of column names which identify the conditional channels in the input.
Conditional channels are channels which we know in the past, but do not know in the future. Defaults to [].
static_categorical_columns (List[str], optional): List of column names which identify categorical-valued channels in the input
which are fixed over time. Defaults to [].
context_length (int, optional): The length of the input context window. Defaults to 64.
prediction_length (Optional[int], optional): The length of the prediction window. Defaults to None.
scaling (bool, optional): If True, data is scaled. Defaults to False.
scaler_type (ScalerType, optional): The type of scaling to perform. See ScalerType for available scalers. Defaults to ScalerType.STANDARD.value.
scaling_id_columns (Optional[List[str]], optional): In some cases we need to separate data by a different set of id_columns
when determining scaling factors. For the purposes of determining scaling, data will be grouped by the provided columns.
If None, the `id_columns` will be used. Defaults to None.
encode_categorical (bool, optional): If True, any categorical columns will be encoded using ordinal encoding. Defaults to True.
time_series_task (str, optional): Reserved for future use. Defaults to TimeSeriesTask.FORECASTING.value.
frequency_mapping (Dict[str, int], optional): Mapping from frequency strings to integer values used by the model. Defaults to DEFAULT_FREQUENCY_MAPPING.
freq (Optional[Union[int, str]], optional): A frequency indicator for the given `timestamp_column`. See
https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#period-aliases for a description of the
allowed values. If not provided, frequency will be inferred from `timestamp_column`. Defaults to None.

Raises:
ValueError: Raised if `id_columns` is not a list.
ValueError: Raised if `timestamp_column` is not a scalar.
"""
# note base class __init__ method sets all arguments as attributes

if not isinstance(id_columns, list):
raise ValueError(f"Invalid argument provided for `id_columns`: {id_columns}")
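The docstring above distinguishes target, observable, control, conditional, and static categorical channels. A brief construction sketch with hypothetical column names:

tsp = TimeSeriesPreprocessor(
    id_columns=["store"],                   # identifies each individual series
    timestamp_column="date",
    target_columns=["sales"],               # channels to forecast
    observable_columns=["temperature"],     # known in past and future, not controllable
    control_columns=["discount_pct"],       # known and controllable in the future
    conditional_columns=["foot_traffic"],   # known only in the past
    static_categorical_columns=["region"],  # categorical, fixed over time
    context_length=64,
    prediction_length=24,
    scaling=True,
)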
@@ -664,13 +703,13 @@ def scale_func(grp, id_columns):
self._clean_up_dataframe(df)
return df

@deprecated(version="0.1.1", reason="Please use the standalone function `get_datasets()`.")
def get_datasets(
self,
dataset: Union[Dataset, pd.DataFrame],
split_config: Dict[str, Union[List[Union[int, float]], float]],
fewshot_fraction: Optional[float] = None,
fewshot_location: str = FractionLocation.LAST.value,
return_dataframe: bool = False,
) -> Tuple[Any]:
"""Creates the preprocessed pytorch datasets needed for training and evaluation
using the HuggingFace trainer
@@ -698,80 +737,152 @@ def get_datasets(
fewshot_location (str): Determines where the fewshot data is chosen. Valid options are "first" and "last"
as described in the enum FractionLocation. Default is to choose the fewshot data at the end
of the training dataset (i.e., "last").
return_dataframe: Instead of returning pytorch datasets, return a tuple of pandas dataframes, after any
preprocessing.

Returns:
Tuple of pytorch datasets: train, validation, test.
"""

data = self._standardize_dataframe(dataset)
return get_datasets(
self,
dataset,
split_config=split_config,
fewshot_fraction=fewshot_fraction,
fewshot_location=fewshot_location,
)

if not self.context_length:
raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length")
if not self.prediction_length:
raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length")

# get split_params
split_params, split_function = get_split_params(split_config, context_length=self.context_length)
def prepare_data_splits(
data: pd.DataFrame,
id_columns: List[str] = [],
context_length: int = 64,
split_config: Dict[str, Union[List[Union[int, float]], float]] = {"train": 0.7, "test": 0.2},
) -> Tuple[pd.DataFrame]:
"""Splits the input dataframe according to the split_config.

Args:
data (pd.DataFrame): Input dataframe.
id_columns (List[str]): List of column names which identify different time series in a multi-time series input. Defaults to [].
context_length (int, optional): Specifies the length of the context windows extracted from the historical data for feeding into
the model. Defaults to 64.
split_config (Dict[str, Union[List[Union[int, float]], float]]): Dictionary containing
split parameters. Defaults to {"train": 0.7, "test": 0.2}. Two configurations are possible:
1. Specify train/valid/test indices or relative fractions
{
train: [0, 50],
valid: [50, 70],
test: [70, 100]
}
end value is not inclusive
2. Specify train/test fractions:
{
train: 0.7
test: 0.2
}
The `valid` fraction should not be specified directly; the configuration above implies valid = 1 - 0.7 - 0.2 = 0.1
Returns:
Tuple of pandas dataframes: train, validation, test.
"""
# get split_params
split_params, split_function = get_split_params(split_config, context_length=context_length)

# split data
if isinstance(split_function, dict):
train_data = split_function["train"](data, id_columns=id_columns, **split_params["train"])
valid_data = split_function["valid"](data, id_columns=id_columns, **split_params["valid"])
test_data = split_function["test"](data, id_columns=id_columns, **split_params["test"])
else:
train_data, valid_data, test_data = split_function(data, id_columns=id_columns, **split_params)

return train_data, valid_data, test_data

# split data
if isinstance(split_function, dict):
train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"])
valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"])
test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"])
else:
train_data, valid_data, test_data = split_function(data, id_columns=self.id_columns, **split_params)

# data preprocessing
self.train(train_data)

# specify columns
column_specifiers = {
"id_columns": self.id_columns,
"timestamp_column": self.timestamp_column,
"target_columns": self.target_columns,
"observable_columns": self.observable_columns,
"control_columns": self.control_columns,
"conditional_columns": self.conditional_columns,
"static_categorical_columns": self.static_categorical_columns,
}

# handle fewshot operation
if fewshot_fraction is not None:
if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)):
raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}")

train_data = select_by_fixed_fraction(
train_data,
id_columns=self.id_columns,
fraction=fewshot_fraction,
location=fewshot_location,
minimum_size=self.context_length,
)

params = column_specifiers
params["context_length"] = self.context_length
params["prediction_length"] = self.prediction_length
def get_datasets(
ts_preprocessor: TimeSeriesPreprocessor,
dataset: Union[Dataset, pd.DataFrame],
split_config: Dict[str, Union[List[Union[int, float]], float]] = {"train": 0.7, "test": 0.2},
fewshot_fraction: Optional[float] = None,
fewshot_location: str = FractionLocation.LAST.value,
) -> Tuple[Any]:
"""Creates the preprocessed pytorch datasets needed for training and evaluation
using the HuggingFace trainer

# get torch datasets
train_valid_test = [train_data, valid_data, test_data]
Args:
ts_preprocessor (TimeSeriesPreprocessor): The preprocessor to use. It is trained on the resulting train split
and then used to preprocess all three splits.
dataset (Union[Dataset, pd.DataFrame]): Loaded dataset (a HuggingFace Dataset or a pandas dataframe).
split_config (Dict[str, Union[List[Union[int, float]], float]]): Dictionary containing
split parameters. Defaults to {"train": 0.7, "test": 0.2}. Two configurations are possible:
1. Specify train/valid/test indices or relative fractions
{
train: [0, 50],
valid: [50, 70],
test: [70, 100]
}
end value is not inclusive
2. Specify train/test fractions:
{
train: 0.7
test: 0.2
}
The `valid` fraction should not be specified directly; the configuration above implies valid = 1 - 0.7 - 0.2 = 0.1

fewshot_fraction (float, optional): When non-null, return this fraction of the original training
dataset. This is done to support fewshot fine-tuning.
fewshot_location (str): Determines where the fewshot data is chosen. Valid options are "first" and "last"
as described in the enum FractionLocation. Default is to choose the fewshot data at the end
of the training dataset (i.e., "last").

Returns:
Tuple of pytorch datasets: train, validation, test.
"""

if not ts_preprocessor.context_length:
raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length")
if not ts_preprocessor.prediction_length:
raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length")

data = ts_preprocessor._standardize_dataframe(dataset)

train_data, valid_data, test_data = prepare_data_splits(
data,
id_columns=ts_preprocessor.id_columns,
context_length=ts_preprocessor.context_length,
split_config=split_config,
)

# data preprocessing
ts_preprocessor.train(train_data)

# specify columns
column_specifiers = {
"id_columns": ts_preprocessor.id_columns,
"timestamp_column": ts_preprocessor.timestamp_column,
"target_columns": ts_preprocessor.target_columns,
"observable_columns": ts_preprocessor.observable_columns,
"control_columns": ts_preprocessor.control_columns,
"conditional_columns": ts_preprocessor.conditional_columns,
"static_categorical_columns": ts_preprocessor.static_categorical_columns,
}

# handle fewshot operation
if fewshot_fraction is not None:
if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)):
raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}")

train_data = select_by_fixed_fraction(
train_data,
id_columns=ts_preprocessor.id_columns,
fraction=fewshot_fraction,
location=fewshot_location,
minimum_size=ts_preprocessor.context_length,
)

if return_dataframe:
return tuple(train_valid_test)
params = column_specifiers
params["context_length"] = ts_preprocessor.context_length
params["prediction_length"] = ts_preprocessor.prediction_length

return tuple([ForecastDFDataset(self.preprocess(d), **params) for d in train_valid_test])
# get torch datasets
train_valid_test = [train_data, valid_data, test_data]

# test_dataset = ForecastDFDataset(
# self.preprocess(test_data),
# **params,
# )
# train_dataset = ForecastDFDataset(self.preprocess(train_data), **params)
# valid_dataset = ForecastDFDataset(
# self.preprocess(valid_data),
# **params,
# )
# return train_dataset, valid_dataset, test_dataset
return tuple([ForecastDFDataset(ts_preprocessor.preprocess(d), **params) for d in train_valid_test])


def create_timestamps(
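For reference, a sketch of the fewshot path through the new standalone function (argument values are illustrative): `fewshot_fraction` trims each training series via `select_by_fixed_fraction`, keeping at least `context_length` rows per series, while the valid and test splits are unaffected:

train, valid, test = get_datasets(
    tsp,
    df,  # hypothetical input dataframe
    split_config={"train": 0.7, "test": 0.2},
    fewshot_fraction=0.2,     # keep 20% of each training series
    fewshot_location="last",  # FractionLocation.LAST.value; "first" is also allowed
)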