Skip to content

Commit

Permalink
Add BetaGeoModel_extract_predictive_variables. Add TestBetaGeoModelWi…
Browse files Browse the repository at this point in the history
…thCovariates.test_extract_predictive_covariates
  • Loading branch information
PabloRoque committed Jan 20, 2025
1 parent e74745c commit ffced16
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 4 deletions.
51 changes: 48 additions & 3 deletions pymc_marketing/clv/models/beta_geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,13 @@ def build_model(self) -> None: # type: ignore[override]
"a",
a_scale
* pm.math.exp(-pm.math.dot(dropout_data, dropout_coefficient)),
dims="customer_id",
)
b = pm.Deterministic(
"b",
b_scale
* pm.math.exp(-pm.math.dot(dropout_data, dropout_coefficient)),
dims="customer_id",
)
else:
a = self.model_config["a_prior"].create_variable("a")
Expand Down Expand Up @@ -295,11 +297,13 @@ def build_model(self) -> None: # type: ignore[override]
"a",
a_scale
* pm.math.exp(-pm.math.dot(dropout_data, dropout_coefficient)),
dims="customer_id",
)
b = pm.Deterministic(
"b",
b_scale
* pm.math.exp(-pm.math.dot(dropout_data, dropout_coefficient)),
dims="customer_id",
)

else:
Expand Down Expand Up @@ -360,9 +364,50 @@ def _extract_predictive_variables(
must_be_unique=["customer_id"],
)

a = self.fit_result["a"]
b = self.fit_result["b"]
alpha = self.fit_result["alpha"]
customer_id = data["customer_id"]
model_coords = self.model.coords
if self.purchase_covariate_cols:
purchase_xarray = xarray.DataArray(
data[self.purchase_covariate_cols],
dims=["customer_id", "purchase_covariate"],
coords=[customer_id, list(model_coords["purchase_covariate"])],
)
alpha_scale = self.fit_result["alpha_scale"]
purchase_coefficient = self.fit_result["purchase_coefficient"]
alpha = alpha_scale * np.exp(
-xarray.dot(
purchase_coefficient, purchase_xarray, dims="purchase_covariate"
)
)
alpha.name = "alpha"
else:
alpha = self.fit_result["alpha"]

if self.dropout_covariate_cols:
dropout_xarray = xarray.DataArray(
data[self.dropout_covariate_cols],
dims=["customer_id", "dropout_covariate"],
coords=[customer_id, list(model_coords["dropout_covariate"])],
)
a_scale = self.fit_result["a_scale"]
dropout_coefficient = self.fit_result["dropout_coefficient"]
a = a_scale * np.exp(
-xarray.dot(
dropout_coefficient, dropout_xarray, dim="dropout_covariate"
)
)
a.name = "a"
b_scale = self.fit_result["b_scale"]
b = b_scale * np.exp(
-xarray.dot(
dropout_coefficient, dropout_xarray, dim="dropout_covariate"
)
)
b.name = "b"
else:
a = self.fit_result["a"]
b = self.fit_result["b"]

r = self.fit_result["r"]

customer_vars = to_xarray(
Expand Down
138 changes: 137 additions & 1 deletion tests/clv/models/test_beta_geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from pymc_marketing.clv.distributions import BetaGeoNBD
from pymc_marketing.clv.models.beta_geo import BetaGeoModel
from pymc_marketing.prior import Prior
from tests.conftest import create_mock_fit, mock_sample
from tests.conftest import create_mock_fit, mock_sample, set_model_fit


class TestBetaGeoModel:
Expand Down Expand Up @@ -675,3 +675,139 @@ def test_save_load(self):
assert self.model.sampler_config == model2.sampler_config
assert self.model.idata == model2.idata
os.remove("test_model")


class TestBetaGeoModelWithCovariates:
@classmethod
def setup_class(cls):
# Set random seed
cls.rng = rng = np.random.default_rng(34)

# parameters
cls.true_params = dict(
a_scale=0.793,
b_scale=2.426,
alpha_scale=4.414,
r=0.243,
purchase_coefficient=np.array([1.0, -2.0]),
dropout_coefficient=np.array([3.0]),
)

# Use Quickstart dataset (the CDNOW_sample research data) for testing
cls.data = data = pd.read_csv("data/clv_quickstart.csv")
data["customer_id"] = data.index

# Create two purchase covariates and one dropout covariate
# We standardize so that the coefficient * covariates have similar variance
N = data.shape[0]
data["purchase_cov1"] = rng.normal(size=N) / 2
data["purchase_cov2"] = rng.normal(size=N) / 4
data["dropout_cov"] = rng.normal(size=N) / 6

purchase_covariate_cols = ["purchase_cov1", "purchase_cov2"]
dropout_covariate_cols = ["dropout_cov"]
non_nested_priors = dict(
a_prior=Prior("Uniform", lower=0, upper=1),
b_prior=Prior("Uniform", lower=0, upper=1),
)
covariate_config = dict(
purchase_covariate_cols=purchase_covariate_cols,
dropout_covariate_cols=dropout_covariate_cols,
)
cls.model_with_covariates = BetaGeoModel(
cls.data,
model_config={**non_nested_priors, **covariate_config},
)

# Mock an idata object for tests requiring a fitted model
chains = 2
draws = 200
n_purchase_covariates = len(purchase_covariate_cols)
n_dropout_covariates = len(dropout_covariate_cols)
mock_fit_dict = {
"r": rng.normal(cls.true_params["r"], 1e-3, size=(chains, draws)),
"alpha_scale": rng.normal(
cls.true_params["alpha_scale"], 1e-3, size=(chains, draws)
),
"a_scale": rng.normal(
cls.true_params["a_scale"], 1e-3, size=(chains, draws)
),
"b_scale": rng.normal(
cls.true_params["b_scale"], 1e-3, size=(chains, draws)
),
"purchase_coefficient": rng.normal(
cls.true_params["purchase_coefficient"],
1e-3,
size=(chains, draws, n_purchase_covariates),
),
"dropout_coefficient": rng.normal(
cls.true_params["dropout_coefficient"],
1e-3,
size=(chains, draws, n_dropout_covariates),
),
}
mock_fit_with_covariates = az.from_dict(
mock_fit_dict,
dims={
"purchase_coefficient": ["purchase_covariate"],
"dropout_coefficient": ["dropout_covariate"],
},
coords={
"purchase_covariate": purchase_covariate_cols,
"dropout_covariate": dropout_covariate_cols,
},
)
set_model_fit(cls.model_with_covariates, mock_fit_with_covariates)

# Create a reference model without covariates
cls.model_without_covariates = BetaGeoModel(data)
mock_fit_without_covariates = az.from_dict(
{
"r": mock_fit_dict["r"],
"alpha": mock_fit_dict["alpha_scale"],
"a_scale": mock_fit_dict["a_scale"],
"b_scale": mock_fit_dict["b_scale"],
}
)
set_model_fit(cls.model_without_covariates, mock_fit_without_covariates)

def test_extract_predictive_covariates(self):
"""Test that alpha/beta computed from the model and helper match."""
model = self.model_with_covariates
with model.model:
trace = pm.sample_posterior_predictive(
model.idata, var_names=["alpha", "a", "b"]
).posterior_predictive
alpha_model = trace["alpha"]
a_model = trace["a"]
b_model = trace["b"]

variables = model._extract_predictive_variables(data=self.data)
alpha_helper = variables["alpha"]
a_helper = variables["a"]
b_helper = variables["b"]

np.testing.assert_allclose(alpha_model, alpha_helper)
np.testing.assert_allclose(a_model, a_helper)
np.testing.assert_allclose(b_model, b_helper)

new_data = self.data.assign(
purchase_cov1=1.0,
dropout_cov=1.0,
customer_id=self.data["customer_id"] + 1,
)
different_vars = model._extract_predictive_variables(data=new_data)

different_alpha = different_vars["alpha"]
assert np.all(
different_alpha.customer_id.values == alpha_model.customer_id.values + 1
)
assert not np.allclose(alpha_model, different_alpha)

different_a = different_vars["a"]
assert np.all(different_a.customer_id.values == a_model.customer_id.values + 1)
assert not np.allclose(a_model, different_a)

different_b = different_vars["b"]
assert np.all(different_b.customer_id.values == b_model.customer_id.values + 1)
assert not np.allclose(b_model, different_b)

0 comments on commit ffced16

Please sign in to comment.