Skip to content

Commit

Permalink
Fixes for large datasets (#229)
Browse the repository at this point in the history
  • Loading branch information
mcsqr authored Sep 7, 2023
1 parent 2ced129 commit af34f0e
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 23 deletions.
45 changes: 32 additions & 13 deletions hierarchicalforecast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,34 @@ def cov2corr(cov, return_std=False):
return corr

# %% ../nbs/utils.ipynb 9
def _to_summing_matrix(S_df: pd.DataFrame, sparse_s: bool = False):
    """Transforms the DataFrame `S_df` of hierarchies to a summing matrix S.

    **Parameters:**<br>
    `S_df`: pd.DataFrame with one column per hierarchy level; the column with
        the most unique values is taken to be the bottom level.<br>
    `sparse_s`: bool=False, whether the returned S should be a sparse DataFrame.<br>

    **Returns:**<br>
    `S, tags`: summing DataFrame `S` (rows indexed by all hierarchy categories,
        columns by the bottom-level categories) and dict `tags` mapping each
        level name to its categories.
    """
    categories = [S_df[col].unique() for col in S_df.columns]
    cat_sizes = [len(cats) for cats in categories]
    # The level with the most distinct categories is assumed to be the bottom level.
    idx_bottom = np.argmax(cat_sizes)
    cats_bottom = categories[idx_bottom]

    # sklearn >= 1.2 renamed the `sparse` argument to `sparse_output`;
    # fall back to the old keyword on older versions.
    try:
        encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
    except TypeError:  # sklearn < 1.2
        encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)

    S = encoder.fit_transform(S_df).T

    # A scipy sparse matrix requires pandas' dedicated sparse constructor.
    if sparse_s:
        df_constructor = pd.DataFrame.sparse.from_spmatrix
    else:
        df_constructor = pd.DataFrame
    S = df_constructor(S, index=chain(*categories), columns=cats_bottom)

    tags = dict(zip(S_df.columns, categories))
    return S, tags

# %% ../nbs/utils.ipynb 10
def aggregate_before(df: pd.DataFrame,
spec: List[List[str]],
agg_fn: Callable = np.sum):
agg_fn: Callable = np.sum,
sparse_s: bool = False):
"""Utils Aggregation Function.
Aggregates bottom level series contained in the pd.DataFrame `df` according
Expand All @@ -95,6 +107,8 @@ def aggregate_before(df: pd.DataFrame,
`df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.<br>
`spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.<br>
`agg_fn`: Function used to aggregate `'y'`.<br>
`sparse_s`: bool=False, whether the returned S should be a sparse DataFrame.<br>
**Returns:**<br>
`Y_df, S, tags`: tuple with hierarchically structured series `Y_df` ($\mathbf{y}_{[a,b]}$),
Expand All @@ -121,7 +135,7 @@ def aggregate_before(df: pd.DataFrame,
Y_df = df_hiers[['unique_id', 'ds', 'y']].set_index('unique_id')

# Aggregations constraints S definition
S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols])
S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols], sparse_s)
return Y_df, S, tags

# %% ../nbs/utils.ipynb 11
Expand Down Expand Up @@ -176,9 +190,11 @@ def _to_summing_dataframe(
tags = dict(zip(S_df.columns, categories))
tags[bottom_col] = bottom_ids

encoder = OneHotEncoder(
categories=categories, sparse=sparse_s, dtype=np.float32
)
try:
encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
except TypeError: # sklearn < 1.2
encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)

S = encoder.fit_transform(S_df).T

if sparse_s:
Expand Down Expand Up @@ -255,9 +271,12 @@ def aggregate(

#------------------------------- Aggregation -------------------------------#
n_agg = S_df.shape[0] - S_df.shape[1]
Agg = S_df.values[:n_agg, :]
y_bottom = balanced_df.y.values
if sparse_s:
Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :]
else:
Agg = S_df.values[:n_agg, :]

y_bottom = balanced_df.y.values
y_bottom = y_bottom.reshape(len(S_df.columns), len(dates))
y_bottom_mask = np.isnan(y_bottom)
y_agg = Agg @ np.nan_to_num(y_bottom)
Expand All @@ -274,7 +293,7 @@ def aggregate(
Y_df = Y_df.set_index('unique_id').dropna()
return Y_df, S_df, tags

# %% ../nbs/utils.ipynb 19
# %% ../nbs/utils.ipynb 20
class HierarchicalPlot:
""" Hierarchical Plot
Expand Down Expand Up @@ -468,7 +487,7 @@ def plot_hierarchical_predictions_gap(self,
plt.grid()
plt.show()

# %% ../nbs/utils.ipynb 34
# %% ../nbs/utils.ipynb 35
# convert levels to output quantile names
def level_to_outputs(level:Iterable[int]):
""" Converts list of levels into output names matching StatsForecast and NeuralForecast methods.
Expand Down Expand Up @@ -512,7 +531,7 @@ def quantiles_to_outputs(quantiles:Iterable[float]):
output_names.append('-median')
return quantiles, output_names

# %% ../nbs/utils.ipynb 35
# %% ../nbs/utils.ipynb 36
# given input array of sample forecasts and inptut quantiles/levels,
# output a Pandas Dataframe with columns of quantile predictions
def samples_to_quantiles_df(samples:np.ndarray,
Expand Down
78 changes: 68 additions & 10 deletions nbs/utils.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,26 @@
"outputs": [],
"source": [
"#| exporti\n",
"def _to_summing_matrix(S_df: pd.DataFrame):\n",
"def _to_summing_matrix(S_df: pd.DataFrame, sparse_s: bool = False):\n",
" \"\"\"Transforms the DataFrame `df` of hierarchies to a summing matrix S.\"\"\"\n",
" categories = [S_df[col].unique() for col in S_df.columns]\n",
" cat_sizes = [len(cats) for cats in categories]\n",
" idx_bottom = np.argmax(cat_sizes)\n",
" cats_bottom = categories[idx_bottom]\n",
" encoder = OneHotEncoder(categories=categories, sparse=False, dtype=np.float32)\n",
"\n",
" try:\n",
" encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)\n",
" except TypeError: # sklearn < 1.2\n",
" encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)\n",
"\n",
" S = encoder.fit_transform(S_df).T\n",
" S = pd.DataFrame(S, index=chain(*categories), columns=cats_bottom)\n",
"\n",
" if sparse_s:\n",
" df_constructor = pd.DataFrame.sparse.from_spmatrix\n",
" else:\n",
" df_constructor = pd.DataFrame\n",
" S = df_constructor(S, index=chain(*categories), columns=cats_bottom)\n",
"\n",
" tags = dict(zip(S_df.columns, categories))\n",
" return S, tags"
]
Expand All @@ -182,7 +193,8 @@
"#| exporti\n",
"def aggregate_before(df: pd.DataFrame,\n",
" spec: List[List[str]],\n",
" agg_fn: Callable = np.sum):\n",
" agg_fn: Callable = np.sum,\n",
" sparse_s: bool = False):\n",
" \"\"\"Utils Aggregation Function.\n",
"\n",
" Aggregates bottom level series contained in the pd.DataFrame `df` according \n",
Expand All @@ -192,6 +204,8 @@
" `df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.<br>\n",
" `spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.<br>\n",
" `agg_fn`: Function used to aggregate `'y'`.<br>\n",
" `sparse_s`: bool=False, whether the returned S should be a sparse DataFrame.<br>\n",
"\n",
"\n",
" **Returns:**<br>\n",
" `Y_df, S, tags`: tuple with hierarchically structured series `Y_df` ($\\mathbf{y}_{[a,b]}$),\n",
Expand All @@ -218,7 +232,7 @@
" Y_df = df_hiers[['unique_id', 'ds', 'y']].set_index('unique_id')\n",
" \n",
" # Aggregations constraints S definition\n",
" S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols])\n",
" S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols], sparse_s)\n",
" return Y_df, S, tags"
]
},
Expand Down Expand Up @@ -281,9 +295,11 @@
" tags = dict(zip(S_df.columns, categories))\n",
" tags[bottom_col] = bottom_ids\n",
"\n",
" encoder = OneHotEncoder(\n",
" categories=categories, sparse=sparse_s, dtype=np.float32\n",
" )\n",
" try:\n",
" encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)\n",
" except TypeError: # sklearn < 1.2\n",
" encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)\n",
"\n",
" S = encoder.fit_transform(S_df).T\n",
"\n",
" if sparse_s:\n",
Expand Down Expand Up @@ -367,9 +383,12 @@
"\n",
" #------------------------------- Aggregation -------------------------------#\n",
" n_agg = S_df.shape[0] - S_df.shape[1]\n",
" Agg = S_df.values[:n_agg, :]\n",
" y_bottom = balanced_df.y.values\n",
" if sparse_s:\n",
" Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :]\n",
" else:\n",
" Agg = S_df.values[:n_agg, :]\n",
"\n",
" y_bottom = balanced_df.y.values\n",
" y_bottom = y_bottom.reshape(len(S_df.columns), len(dates))\n",
" y_bottom_mask = np.isnan(y_bottom)\n",
" y_agg = Agg @ np.nan_to_num(y_bottom)\n",
Expand Down Expand Up @@ -558,6 +577,45 @@
"test_eq(Y_df.index, before_Y_df.index)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"# Test equality of sparse and non-sparse aggregation\n",
"with CodeTimer('strict non-sparse aggregate'):\n",
" Y_df, S_df, tags = aggregate(df=df, sparse_s=False, spec=hiers_strictly)\n",
"\n",
"with CodeTimer('strict sparse aggregate'):\n",
" Y_df_sparse, S_df_sparse, tags_sparse = aggregate(df=df, sparse_s=True, spec=hiers_strictly)\n",
"\n",
"test_close(Y_df.y.values, Y_df_sparse.y.values)\n",
"test_eq(S_df.values, S_df_sparse.values)\n",
"\n",
"test_eq(S_df.columns, S_df_sparse.columns)\n",
"test_eq(S_df.index, S_df_sparse.index)\n",
"\n",
"test_eq(Y_df.columns, Y_df_sparse.columns)\n",
"test_eq(Y_df.index, Y_df_sparse.index)\n",
"\n",
"with CodeTimer('grouped non-sparse aggregate'):\n",
" Y_df, S_df, tags = aggregate(df=df, sparse_s=False, spec=hiers_grouped)\n",
"\n",
"with CodeTimer('grouped sparse aggregate'):\n",
" Y_df_sparse, S_df_sparse, tags_sparse = aggregate(df=df, sparse_s=True, spec=hiers_grouped)\n",
"\n",
"test_close(Y_df.y.values, Y_df_sparse.y.values)\n",
"test_eq(S_df.values, S_df_sparse.values)\n",
"\n",
"test_eq(S_df.columns, S_df_sparse.columns)\n",
"test_eq(S_df.index, S_df_sparse.index)\n",
"\n",
"test_eq(Y_df.columns, Y_df_sparse.columns)\n",
"test_eq(Y_df.index, Y_df_sparse.index)"
]
},
{
"cell_type": "markdown",
"id": "22febc26-1901-4bef-a181-09ae2f52453b",
Expand Down

0 comments on commit af34f0e

Please sign in to comment.