diff --git a/rtrec/models/__init__.py b/rtrec/models/__init__.py
index cd4604f..91bef0c 100644
--- a/rtrec/models/__init__.py
+++ b/rtrec/models/__init__.py
@@ -1,5 +1,6 @@
 from rtrec._lowlevel import SlimMSE as Fast_SLIM_MSE
 from .slim import SLIM_MSE
 from .bprslim import BPR_SLIM
+from .fm import FactorizationMachines

-__all__ = ["Fast_SLIM_MSE", "SLIM_MSE", "BPR_SLIM"]
+__all__ = ["Fast_SLIM_MSE", "SLIM_MSE", "BPR_SLIM", "FactorizationMachines"]
diff --git a/rtrec/models/base.py b/rtrec/models/base.py
index d50f740..78faf39 100644
--- a/rtrec/models/base.py
+++ b/rtrec/models/base.py
@@ -250,3 +250,6 @@ def _predict_rating(self, user_id: int, item_id: int, bypass_prediction: bool=Fa
         :param bypass_prediction: Flag to bypass prediction if user has only interacted with the item (default: False)
         """
         raise NotImplementedError("The _predict_rating method must be implemented by the subclass.")
+
+def inv_scaling(alpha: float, step: int, power_t: float) -> float:
+    return alpha / pow(step, power_t)
diff --git a/rtrec/models/fm.py b/rtrec/models/fm.py
index 0dae133..36963f8 100644
--- a/rtrec/models/fm.py
+++ b/rtrec/models/fm.py
@@ -1,24 +1,33 @@
+from collections import defaultdict
 import numpy as np
 from typing import Dict, Any, Iterable, Tuple, List

-from .base import ExplicitFeedbackRecommender
+from .base import ExplicitFeedbackRecommender, inv_scaling

 class FactorizationMachines(ExplicitFeedbackRecommender):

     def __init__(self, n_factors: int, **kwargs: Any) -> None:
         super().__init__(**kwargs)
-        self.ftrl = FTRL(**kwargs)
+        self.ftrl = AdaGrad(**kwargs)

         # Initialize parameters
+        self.alpha = kwargs.get('alpha', 0.01)  # Learning rate
+        self.power_t = kwargs.get('power_t', 0.1)  # Power for inv-scaling learning rate
+
         self.n_factors: int = n_factors  # Number of latent factors
-        self.feature_map: Dict[Any, int] = {}  # Maps feature keys to indices
+        self.feature_map: Dict[str, int] = {}  # Maps feature keys to indices
         self.w: List[float] = [0.0]  # Linear weights with w[0] as global bias
         self.V: List[np.ndarray] = []  # Factor matrix (list of arrays)

         self.cumulative_loss = 0.0
         self.steps = 0

-    def _get_or_create_index(self, key: Any) -> int:
+    def get_empirical_error(self) -> float:
+        if self.steps == 0:
+            return 0.0
+        return self.cumulative_loss / self.steps
+
+    def _get_or_create_index(self, key: str) -> int:
         """Get the index for a feature key, creating a new one if it doesn't exist."""
         index = self.feature_map.get(key, None)
@@ -27,19 +36,16 @@ def _get_or_create_index(self, key: Any) -> int:
         self.feature_map[key] = index

         # Ensure the list is long enough to accommodate this new index
-        while len(self.w) <= index:
-            self.w.append(0.0)  # Initialize linear weights as 0.0
+        self.w.append(0.0)  # Initialize linear weights as 0.0

         # Initialize the factor vector for the new feature
-        while len(self.V) <= index:  # Factor matrix index matches directly
-            self.V.append(np.random.normal(0, 0.1, self.n_factors))  # Random factor initialization
+        self.V.append(np.random.normal(0, 0.1, self.n_factors))  # Random factor initialization

         return index

-    def predict_rating(self, user: int, item: int) -> float:
-        """Predict the rating for a user-item pair."""
-        user_idx = self._get_or_create_index(f'u{user}')
-        item_idx = self._get_or_create_index(f'i{item}')
+    def _predict_rating(self, user_id: int, item_id: int, bypass_prediction: bool=False) -> float:
+        user_idx = self._get_or_create_index(f'u{user_id}')
+        item_idx = self._get_or_create_index(f'i{item_id}')

         # Linear term (includes global bias and feature-specific biases)
         linear_term: float = self.w[0]  # Start with the global bias
@@ -55,18 +61,19 @@
             Vi = self.V[idx]
             if Vi is None:
                 continue
-            Vif = self.Vi[f]
+            Vif = Vi[f]
             sum_vx += Vif
             sum_vx_sq += Vif * Vif
         interaction_term += 0.5 * (sum_vx ** 2 - sum_vx_sq)

         return linear_term + interaction_term

-    def _update(self, user: int, item: int) -> None:
+    def _update(self, user_id: int, item_id: int) -> None:
         """Perform a single update for the given user-item pair."""
-        y = self._get_rating(user, item)  # True rating
-        y_pred = self.predict_rating(user, item)  # Predicted rating
-        dloss = y - y_pred  # Prediction error
+        # Update linear terms for non-zero features
+        y = self._get_rating(user_id, item_id)  # True rating
+        y_pred = self._predict_rating(user_id, item_id)  # Predicted rating
+        dloss = y_pred - y  # Prediction error
         self.steps += 1
         self.cumulative_loss += abs(dloss)

@@ -74,43 +81,74 @@
         if abs(dloss) <= 1e-6:
             return

-        # Update linear terms for non-zero features
-        user_idx = self._get_or_create_index(f'u{user}')
-        item_idx = self._get_or_create_index(f'i{item}')
         grad = dloss  # Gradient is the error for this simple regression task as feature value is 1.0
-        self.w[0] += self.ftrl.update(0, grad)  # Update global bias
-        self.w[user_idx + 1] += self.ftrl.update(user_idx + 1, grad)  # Update user bias
-        self.w[item_idx + 1] += self.ftrl.update(item_idx + 1, grad)  # Update item bias
+
+        user_idx = self._get_or_create_index(f'u{user_id}')
+        item_idx = self._get_or_create_index(f'i{item_id}')
+
+        adjusted_learning_rate = inv_scaling(self.alpha, self.steps, self.power_t)
+        self.w[0] -= adjusted_learning_rate * grad  # Update global bias
+        self.w[user_idx + 1] -= adjusted_learning_rate * grad  # Update user bias
+        self.w[item_idx + 1] -= adjusted_learning_rate * grad  # Update item bias

         # Update interaction factors (latent factors for user-item pair)
         for f in range(self.n_factors):
             sum_vx = self.V[user_idx][f] + self.V[item_idx][f]
             for idx in [user_idx, item_idx]:
-                v_i_f = self.V[idx][f]
-                gradient = dloss * (sum_vx - v_i_f)
+                v_if = self.V[idx][f]
+                gradient = dloss * (sum_vx - v_if)
                 if abs(gradient) <= 1e-6:
                     continue
-                self.V[idx][f] += self.ftrl.update((idx, f), gradient)
-
-class FTRL:
-    def __init__(self, alpha: float = 0.1, beta: float = 1.0, L1: float = 0.1, L2: float = 1.0, decay_rate: float = 0.9) -> None:
-        self.alpha = alpha  # learning rate
-        self.beta = beta  # scaling factor for regularization
-        self.L1 = L1  # L1 regularization
-        self.L2 = L2  # L2 regularization
-        self.decay_rate = decay_rate
-
-        self.z: Dict[Any, float] = {}  # accumulation of gradients
-        self.n: Dict[Any, float] = {}  # accumulation of squared gradients
-
-    def update(self, feature: Any, gradient: float) -> float:
-        """Update weights with FTRL update rule."""
-        old_z = self.z.get(feature, 0.0)
-        self.z[feature] = self.z.get(feature, 0.0) + gradient - (self.L1 * np.sign(self.z.get(feature, 0.0)) + self.L2 * self.z.get(feature, 0.0))
-        self.n[feature] = self.n.get(feature, 0.0) + gradient ** 2
-
-        if np.abs(self.z[feature]) < self.L1:
-            self.z[feature] = 0.0
-
-        return (self.z[feature] - old_z) / (self.beta + np.sqrt(self.n[feature]))
+                self.ftrl.update(idx, f, gradient, self.V)
+
+    def _get_similarity(self, target_item_id: int, base_item_id: int) -> float:
+        """Compute the cosine similarity between two items."""
+        target_item_idx = self._get_or_create_index(f'i{target_item_id}')
+        base_item_idx = self._get_or_create_index(f'i{base_item_id}')
+
+        target_item_factors = self.V[target_item_idx]
+        base_item_factors = self.V[base_item_idx]
+
+        dot_product = np.dot(target_item_factors, base_item_factors)
+        target_norm = np.linalg.norm(target_item_factors)
+        base_norm = np.linalg.norm(base_item_factors)
+
+        # Avoid division by zero
+        return dot_product / (target_norm * base_norm + 1e-6)  # cosine similarity
+
+class AdaGrad:
+
+    def __init__(self, alpha: float = 0.01, lambda1: float = 0.0002, lambda2: float = 0.0001, epsilon: float = 1e-6, **kwargs: Any) -> None:
+        """
+        AdaGrad Constructor
+        :param alpha: Learning rate
+        :param epsilon: Small constant to avoid division by zero
+        """
+        self.alpha = alpha
+        self.epsilon = epsilon
+        self.lambda1 = lambda1
+        self.lambda2 = lambda2
+        self.G = defaultdict(lambda: defaultdict(float))
+
+    def update(self, feature_idx: int, factor_idx: int, grad: float, V: List[np.ndarray]) -> None:
+        """
+        Apply one AdaGrad step to V[feature_idx][factor_idx] in place (no return value).
+        :param feature_idx: Feature index into the factor matrix V.
+        :param factor_idx: Latent factor index within that feature's vector.
+        :param grad: Gradient for this (feature, factor) entry.
+        """
+        # Update the sum of squared gradients
+        G_val = self.G[feature_idx][factor_idx]
+        G_new = G_val + np.clip(grad ** 2, 1e-8, 1e8)
+        self.G[feature_idx][factor_idx] = G_new
+
+        # Update the weight with an adaptive learning rate plus L1/L2 penalties
+        current_v = V[feature_idx][factor_idx]
+        adaptive_lr = self.alpha / (np.sqrt(G_new) + self.epsilon)
+        l1_penalty = np.sign(current_v) * self.lambda1
+        l2_penalty = current_v * self.lambda2
+        V[feature_idx][factor_idx] -= adaptive_lr * (grad + l1_penalty + l2_penalty)
+
+        # Ensure the updated weight is finite
+        if not np.isfinite(V[feature_idx][factor_idx]):
+            raise ValueError(f"Weight update is not finite: {V[feature_idx][factor_idx]}")
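Usage sketch (illustrative, not part of the patch): a minimal example of how the new AdaGrad helper applies one adaptive step to a single (feature, factor) entry of the factor matrix, mirroring the call made from FactorizationMachines._update above. It assumes the patched rtrec/models/fm.py is importable; the vector sizes and the gradient value are arbitrary toy numbers.

    import numpy as np
    from rtrec.models.fm import AdaGrad

    # Factor matrix laid out as in FactorizationMachines.V: one latent vector per feature
    V = [np.random.normal(0, 0.1, 8) for _ in range(2)]

    opt = AdaGrad(alpha=0.01, lambda1=0.0002, lambda2=0.0001)

    # One AdaGrad step for feature 0, factor 3: V is mutated in place and the squared
    # gradient is accumulated in opt.G to shrink that entry's effective learning rate.
    opt.update(feature_idx=0, factor_idx=3, grad=0.25, V=V)
    print(V[0][3])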