Skip to content

Commit

Permalink
adjust create_timestamps, extend_time_series from service_abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
wgifford committed Nov 11, 2024
1 parent 96bd0fa commit 1a67220
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 28 deletions.
1 change: 1 addition & 0 deletions tsfm_public/toolkit/time_series_forecasting_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, Li
time_series = extend_time_series(
time_series=time_series,
timestamp_column=timestamp_column,
freq=self.feature_extractor.freq if self.feature_extractor else None,
grouping_columns=id_columns,
periods=prediction_length,
)
Expand Down
105 changes: 77 additions & 28 deletions tsfm_public/toolkit/time_series_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,17 +932,19 @@ def get_datasets(


def create_timestamps(
last_timestamp: Union[datetime.datetime, pd.Timestamp],
freq: Optional[Union[int, float, datetime.timedelta, pd.Timedelta, str]] = None,
last_timestamp: Union[datetime.datetime, pd.Timestamp, np.datetime64, int, float, np.integer, np.floating],
freq: Optional[
Union[int, float, np.integer, np.floating, datetime.timedelta, pd.Timedelta, np.timedelta64, str]
] = None,
time_sequence: Optional[Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]]] = None,
periods: int = 1,
) -> List[pd.Timestamp]:
"""Simple utility to create a list of timestamps based on start, delta and number of periods
Args:
last_timestamp (Union[datetime.datetime, pd.Timestamp]): The last observed timestamp, new timestamps will be created
last_timestamp (Union[datetime.datetime, pd.Timestamp, int, float, np.integer, np.floating]): The last observed timestamp, new timestamps will be created
after this timestamp.
freq (Optional[Union[int, float, datetime.timedelta, pd.Timedelta, str]], optional): The frequency at which timestamps
freq (Optional[Union[int, float, np.integer, np.floating, datetime.timedelta, pd.Timedelta, np.timedelta, str]], optional): The frequency at which timestamps
should be generated. Defaults to None.
time_sequence (Optional[Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]]], optional): A time sequence
from which the frequency can be inferred. Defaults to None.
Expand All @@ -966,29 +968,61 @@ def create_timestamps(
"Could not extend time series because frequency was not provided and could not be estimated from the available data."
)

# more complex logic is required to support all edge cases
if isinstance(freq, (pd.Timedelta, datetime.timedelta, str)):
def convert_numeric(val: Any):
"""Helper function to convert strings to numerical values"""
if isinstance(val, str):
try:
return int(val)
except (ValueError, TypeError):
try:
return float(val)
except (ValueError, TypeError):
return val
return val

if isinstance(freq, str):
freq = convert_numeric(freq)

# freq is str, which is not convertible to numeric
if isinstance(freq, str):
# try to convert to pd.timedelta
try:
# try date range directly
freq = pd._libs.tslibs.timedeltas.Timedelta(freq)
except ValueError:
pass

return pd.date_range(
last_timestamp,
freq=freq,
periods=periods + 1,
).tolist()[1:]
# frequency is timedelta like object
elif isinstance(freq, (pd.Timedelta, np.timedelta64, datetime.timedelta)):
if isinstance(last_timestamp, (np.datetime64, pd.Timestamp, datetime.datetime)):
return pd.date_range(
last_timestamp,
freq=freq,
freq=pd.Timedelta(freq) if isinstance(freq, np.timedelta64) else freq,
periods=periods + 1,
).tolist()[1:]
except ValueError as e:
# if it fails, we can try to compute a timedelta from the provided string
if isinstance(freq, str):
freq = pd._libs.tslibs.timedeltas.Timedelta(freq)
return pd.date_range(
last_timestamp,
freq=freq,
periods=periods + 1,
).tolist()[1:]
else:
raise e
# [last_timestamp + i * freq for i in range(1, periods + 1)]
# last_timestamp is not date type, but freq is timedelta type -- ambiguous
else:
raise ValueError(
f"Ambiguous last_timestamp {last_timestamp} (type: {type(last_timestamp)}) with freq {freq}."
)
# frequency is numeric
elif isinstance(freq, (int, float, np.integer, np.floating)):
if isinstance(last_timestamp, (int, float, np.floating, np.integer)):
return [last_timestamp + i * freq for i in range(1, periods + 1)]
# last_timestamp is a date type, but freq is numeric -- ambiguous
else:
raise ValueError(
f"Ambiguous frequency {freq} with last_timestamp {last_timestamp} (type: {type(last_timestamp)})."
)
else:
# numerical timestamp column
return [last_timestamp + i * freq for i in range(1, periods + 1)]
raise ValueError(
f"Could not create timestamps, given the following inputs: last_timestamp={last_timestamp}, freq={freq}, periods={periods}."
)


def estimate_frequency(timestamp_data: Union[pd.Series, np.ndarray]):
Expand All @@ -1004,29 +1038,39 @@ def estimate_frequency(timestamp_data: Union[pd.Series, np.ndarray]):

def extend_time_series(
time_series: pd.DataFrame,
# last_known_timestamp,
timestamp_column: str,
grouping_columns: List[str],
freq: Optional[Union[int, float, datetime.timedelta, pd.Timedelta]] = None,
periods: int = 1,
# delta: datetime.timedelta = datetime.timedelta(days=1),
periods: int = None,
total_periods: Optional[int] = None,
):
"""Extends the provided time series with empty data for the number of periods specified. For each time series, based
on groups defined by grouping columns, adds emptry records following the last timestamp. The empty records contain
only timestamps and grouping indicators, remaining fields will be null.
One of periods or total_periods must be specified.
Args:
time_series (pd.DataFrame): _description_
start_timestamp (_type_): _description_
column_name (str): _description_
grouping_columns (List[str]): _description_
freq:
periods (int, optional): _description_. Defaults to 1.
delta (datetime.timedelta, optional): _description_. Defaults to datetime.timedelta(days=1).
total_periods (int, optional): total length of the series after extending. Defaults to None.
"""

def augment_one_series(group: Union[pd.Series, pd.DataFrame]):
def augment_one_series(
group: Union[pd.Series, pd.DataFrame], periods: Optional[int] = None, total_periods: Optional[int] = None
):
last_timestamp = group[timestamp_column].iloc[-1]

if periods is None:
periods = total_periods - len(group)

if periods < 1:
return group

new_data = pd.DataFrame(
{
timestamp_column: create_timestamps(
Expand All @@ -1044,10 +1088,15 @@ def augment_one_series(group: Union[pd.Series, pd.DataFrame]):
)
return df.reset_index(drop=True)

if (periods is None and total_periods is None) or (periods is not None and total_periods is not None):
raise ValueError("Exactly one of `periods` or `total_periods` must be specified")

if grouping_columns == []:
new_time_series = augment_one_series(time_series)
new_time_series = augment_one_series(time_series, periods=periods, total_periods=total_periods)
else:
new_time_series = time_series.groupby(grouping_columns).apply(augment_one_series, include_groups=False)
new_time_series = time_series.groupby(grouping_columns).apply(
augment_one_series, include_groups=False, periods=periods, total_periods=total_periods
)
idx_names = list(new_time_series.index.names)
idx_names[-1] = "__delete"
new_time_series = new_time_series.reset_index(names=idx_names)
Expand Down

0 comments on commit 1a67220

Please sign in to comment.