♻️ get dataset now standalone
wgifford committed May 31, 2024
1 parent 02dc9dc commit 9abbe49
Showing 3 changed files with 173 additions and 70 deletions.
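The core change: `get_datasets` moves off `TimeSeriesPreprocessor` and becomes a standalone function that takes the preprocessor as its first argument. A minimal before/after sketch, assuming a dataframe `df` with the corresponding columns (the preprocessor arguments and column names here are illustrative, not taken from this commit):

from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor, get_datasets

tsp = TimeSeriesPreprocessor(
    timestamp_column="timestamp",
    target_columns=["value1"],
    context_length=10,
    prediction_length=5,
)

# before: method call (now deprecated)
train, valid, test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})

# after: standalone function, preprocessor passed explicitly
train, valid, test = get_datasets(tsp, df, split_config={"train": 0.7, "test": 0.2})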
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -16,7 +16,8 @@ dependencies = [
"pandas>=2.2.0",
"scikit-learn",
"transformers[torch]>=4.36.1",
"datasets"
"datasets",
"deprecated"
]

[tool.setuptools]
17 changes: 11 additions & 6 deletions tests/toolkit/test_time_series_preprocessor.py
@@ -15,6 +15,7 @@
TimeSeriesPreprocessor,
create_timestamps,
extend_time_series,
get_datasets,
)
from tsfm_public.toolkit.util import FractionLocation

@@ -249,7 +250,8 @@ def test_get_datasets(ts_data):
context_length=10,
)

train, valid, test = tsp.get_datasets(
train, valid, test = get_datasets(
tsp,
ts_data,
split_config={"train": [0, 1 / 3], "valid": [1 / 3, 2 / 3], "test": [2 / 3, 1]},
)
@@ -267,7 +269,8 @@ def test_get_datasets(ts_data):
context_length=10,
)

train, valid, test = tsp.get_datasets(
train, valid, test = get_datasets(
tsp,
ts_data,
split_config={
"train": [0, 100],
@@ -294,7 +297,8 @@ def test_get_datasets(ts_data):
context_length=10,
)

train, valid, test = tsp.get_datasets(
train, valid, test = get_datasets(
tsp,
ts_data,
split_config={
"train": [0, 100],
@@ -319,7 +323,8 @@ def test_get_datasets(ts_data):
context_length=10,
)

train, valid, test = tsp.get_datasets(
train, valid, test = get_datasets(
tsp,
ts_data,
split_config={
"train": 0.7,
@@ -376,7 +381,7 @@ def test_get_datasets_without_targets(ts_data):
context_length=5,
)

train, _, _ = tsp.get_datasets(ts_data, split_config={"train": 0.7, "test": 0.2})
train, _, _ = get_datasets(tsp, ts_data, split_config={"train": 0.7, "test": 0.2})

assert train.datasets[0].target_columns == ["value1", "value2"]

@@ -394,7 +399,7 @@ def test_id_columns_and_scaling_id_columns(ts_data_runs):
scaling=True,
)

ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})
ds_train, ds_valid, ds_test = get_datasets(tsp, df, split_config={"train": 0.7, "test": 0.2})

assert len(tsp.target_scaler_dict) == 2
assert len(ds_train.datasets) == 4
223 changes: 160 additions & 63 deletions tsfm_public/toolkit/time_series_preprocessor.py
@@ -13,6 +13,7 @@
import numpy as np
import pandas as pd
from datasets import Dataset
from deprecated import deprecated
from sklearn.preprocessing import MinMaxScaler as MinMaxScaler_
from sklearn.preprocessing import OrdinalEncoder as OrdinalEncoder_
from sklearn.preprocessing import StandardScaler as StandardScaler_
@@ -123,7 +124,31 @@ def __init__(
freq: Optional[Union[int, str]] = None,
**kwargs,
):
# note base class __init__ methods sets all arguments as attributes
"""_summary_
Args:
id_columns (List[str]): List of column names which identify different time series in a multi-time series input. Defaults to [].
timestamp_column (Optional[str], optional): _description_. Defaults to None.
target_columns (List[str], optional): _description_. Defaults to [].
observable_columns (List[str], optional): _description_. Defaults to [].
control_columns (List[str], optional): _description_. Defaults to [].
conditional_columns (List[str], optional): _description_. Defaults to [].
static_categorical_columns (List[str], optional): _description_. Defaults to [].
context_length (int, optional): _description_. Defaults to 64.
prediction_length (Optional[int], optional): _description_. Defaults to None.
scaling (bool, optional): _description_. Defaults to False.
scaler_type (ScalerType, optional): _description_. Defaults to ScalerType.STANDARD.value.
scaling_id_columns (Optional[List[str]], optional): _description_. Defaults to None.
encode_categorical (bool, optional): _description_. Defaults to True.
time_series_task (str, optional): _description_. Defaults to TimeSeriesTask.FORECASTING.value.
frequency_mapping (Dict[str, int], optional): _description_. Defaults to DEFAULT_FREQUENCY_MAPPING.
freq (Optional[Union[int, str]], optional): _description_. Defaults to None.
Raises:
ValueError: _description_
ValueError: _description_
"""
# note base class __init__ method sets all arguments as attributes

if not isinstance(id_columns, list):
raise ValueError(f"Invalid argument provided for `id_columns`: {id_columns}")
@@ -664,13 +689,13 @@ def scale_func(grp, id_columns):
self._clean_up_dataframe(df)
return df

@deprecated(version="0.1.1", reason="Please use the standalone function `get_datasets()`.")
def get_datasets(
self,
dataset: Union[Dataset, pd.DataFrame],
split_config: Dict[str, Union[List[Union[int, float]], float]],
fewshot_fraction: Optional[float] = None,
fewshot_location: str = FractionLocation.LAST.value,
return_dataframe: bool = False,
) -> Tuple[Any]:
"""Creates the preprocessed pytorch datasets needed for training and evaluation
using the HuggingFace trainer
@@ -698,80 +723,152 @@ def get_datasets(
fewshot_location (str): Determines where the fewshot data is chosen. Valid options are "first" and "last"
as described in the enum FewshotLocation. Default is to choose the fewshot data at the end
of the training dataset (i.e., "last").
return_dataframe: Instead of returning a pytorch dataset, return tuples of pandas dataframes, after any
preprocessing.
Returns:
Tuple of pytorch datasets, including: train, validation, test.
"""

data = self._standardize_dataframe(dataset)
return get_datasets(
self,
dataset,
split_config=split_config,
fewshot_fraction=fewshot_fraction,
fewshot_location=fewshot_location,
)

if not self.context_length:
raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length")
if not self.prediction_length:
raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length")

# get split_params
split_params, split_function = get_split_params(split_config, context_length=self.context_length)
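Because the method now delegates through the `deprecated` package (hence the new dependency in pyproject.toml), the old call style still works but emits a DeprecationWarning at call time. A quick sanity check, assuming the `tsp` and `df` from the sketch above:

import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    train, valid, test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})

# the `deprecated` decorator issues DeprecationWarning by default
assert any(issubclass(w.category, DeprecationWarning) for w in caught)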
def prepare_data_splits(
data: pd.DataFrame,
id_columns: List[str] = [],
context_length: int = 64,
split_config: Dict[str, Union[List[Union[int, float]], float]] = {"train": 0.7, "test": 0.2},
) -> Tuple[pd.DataFrame]:
"""Splits the input dataframe according to the split_config.
Args:
data (pd.DataFrame): Input dataframe.
id_columns (List[str]): List of column names which identify different time series in a multi-time series input. Defaults to [].
context_length (int, optional): Specifies the length of the context windows extracted from the historical data for feeding into
the model. Defaults to 64.
split_config (Dict[str, Union[List[Union[int, float]], float]]): Dictionary containing
split parameters. Defaults to {"train": 0.7, "test": 0.2}. Two configurations are possible:
1. Specify train/valid/test indices or relative fractions
{
train: [0, 50],
valid: [50, 70],
test: [70, 100]
}
end value is not inclusive
2. Specify train/test fractions:
{
train: 0.7
test: 0.2
}
A valid split should not be specified directly; the above implies valid = 0.1
Returns:
Tuple of pandas dataframes, including: train, validation, test.
"""
# get split_params
split_params, split_function = get_split_params(split_config, context_length=context_length)

# split data
if isinstance(split_function, dict):
train_data = split_function["train"](data, id_columns=id_columns, **split_params["train"])
valid_data = split_function["valid"](data, id_columns=id_columns, **split_params["valid"])
test_data = split_function["test"](data, id_columns=id_columns, **split_params["test"])
else:
train_data, valid_data, test_data = split_function(data, id_columns=id_columns, **split_params)

return train_data, valid_data, test_data
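Both split_config shapes accepted by `prepare_data_splits` can be exercised directly. A small sketch with a synthetic dataframe (names and values are illustrative):

import pandas as pd
from tsfm_public.toolkit.time_series_preprocessor import prepare_data_splits

df = pd.DataFrame(
    {
        "timestamp": pd.date_range("2024-01-01", periods=100, freq="h"),
        "value1": range(100),
    }
)

# 1. absolute indices (end value not inclusive)
train, valid, test = prepare_data_splits(
    df, context_length=10, split_config={"train": [0, 50], "valid": [50, 70], "test": [70, 100]}
)

# 2. fractions; valid is implied: 1 - 0.7 - 0.2 = 0.1
train, valid, test = prepare_data_splits(
    df, context_length=10, split_config={"train": 0.7, "test": 0.2}
)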

# split data
if isinstance(split_function, dict):
train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"])
valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"])
test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"])
else:
train_data, valid_data, test_data = split_function(data, id_columns=self.id_columns, **split_params)

# data preprocessing
self.train(train_data)

# specify columns
column_specifiers = {
"id_columns": self.id_columns,
"timestamp_column": self.timestamp_column,
"target_columns": self.target_columns,
"observable_columns": self.observable_columns,
"control_columns": self.control_columns,
"conditional_columns": self.conditional_columns,
"static_categorical_columns": self.static_categorical_columns,
}

# handle fewshot operation
if fewshot_fraction is not None:
if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)):
raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}")

train_data = select_by_fixed_fraction(
train_data,
id_columns=self.id_columns,
fraction=fewshot_fraction,
location=fewshot_location,
minimum_size=self.context_length,
)

params = column_specifiers
params["context_length"] = self.context_length
params["prediction_length"] = self.prediction_length
def get_datasets(
ts_preprocessor: TimeSeriesPreprocessor,
dataset: Union[Dataset, pd.DataFrame],
split_config: Dict[str, Union[List[Union[int, float]], float]] = {"train": 0.7, "test": 0.2},
fewshot_fraction: Optional[float] = None,
fewshot_location: str = FractionLocation.LAST.value,
) -> Tuple[Any]:
"""Creates the preprocessed pytorch datasets needed for training and evaluation
using the HuggingFace trainer
Args:
    ts_preprocessor (TimeSeriesPreprocessor): The preprocessor to use for scaling, encoding, and column selection.
    dataset (Union[Dataset, pd.DataFrame]): Loaded dataset or pandas dataframe.
    split_config (Dict[str, Union[List[Union[int, float]], float]]): Dictionary containing
        split parameters. Defaults to {"train": 0.7, "test": 0.2}. Two configurations are possible:
1. Specify train/valid/test indices or relative fractions
{
train: [0, 50],
valid: [50, 70],
test: [70, 100]
}
end value is not inclusive
2. Specify train/test fractions:
{
train: 0.7
test: 0.2
}
A valid split should not be specified directly; the above implies valid = 0.1
fewshot_fraction (float, optional): When non-null, return this percent of the original training
dataset. This is done to support fewshot fine-tuning.
fewshot_location (str): Determines where the fewshot data is chosen. Valid options are "first" and "last"
as described in the enum FewshotLocation. Default is to choose the fewshot data at the end
of the training dataset (i.e., "last").
Returns:
Tuple of pytorch datasets, including: train, validation, test.
"""

if not ts_preprocessor.context_length:
raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length")
if not ts_preprocessor.prediction_length:
raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length")

data = ts_preprocessor._standardize_dataframe(dataset)

train_data, valid_data, test_data = prepare_data_splits(
data,
id_columns=ts_preprocessor.id_columns,
context_length=ts_preprocessor.context_length,
split_config=split_config,
)

# data preprocessing
ts_preprocessor.train(train_data)

# specify columns
column_specifiers = {
"id_columns": ts_preprocessor.id_columns,
"timestamp_column": ts_preprocessor.timestamp_column,
"target_columns": ts_preprocessor.target_columns,
"observable_columns": ts_preprocessor.observable_columns,
"control_columns": ts_preprocessor.control_columns,
"conditional_columns": ts_preprocessor.conditional_columns,
"static_categorical_columns": ts_preprocessor.static_categorical_columns,
}

# handle fewshot operation
if fewshot_fraction is not None:
if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)):
raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}")

train_data = select_by_fixed_fraction(
train_data,
id_columns=ts_preprocessor.id_columns,
fraction=fewshot_fraction,
location=fewshot_location,
minimum_size=ts_preprocessor.context_length,
)

if return_dataframe:
return tuple(train_valid_test)
params = column_specifiers
params["context_length"] = ts_preprocessor.context_length
params["prediction_length"] = ts_preprocessor.prediction_length

return tuple([ForecastDFDataset(self.preprocess(d), **params) for d in train_valid_test])
# get torch datasets
train_valid_test = [train_data, valid_data, test_data]

return tuple([ForecastDFDataset(ts_preprocessor.preprocess(d), **params) for d in train_valid_test])
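End to end, the standalone function trains the preprocessor on the train split and returns one ForecastDFDataset per split, ready for the HuggingFace Trainer. A sketch that also exercises the fewshot path (fractions and column names are illustrative):

from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor, get_datasets

tsp = TimeSeriesPreprocessor(
    timestamp_column="timestamp",
    target_columns=["value1"],
    context_length=10,
    prediction_length=5,
    scaling=True,
)

# keep only the last 20% of the training split, but never fewer than context_length rows
train, valid, test = get_datasets(
    tsp,
    df,
    split_config={"train": 0.7, "test": 0.2},
    fewshot_fraction=0.2,
    fewshot_location="last",
)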


def create_timestamps(
