diff --git a/rust/feature-flags/src/cohort/cohort_cache_manager.rs b/rust/feature-flags/src/cohort/cohort_cache_manager.rs index d2e43f529dea6..c545321fe2542 100644 --- a/rust/feature-flags/src/cohort/cohort_cache_manager.rs +++ b/rust/feature-flags/src/cohort/cohort_cache_manager.rs @@ -1,6 +1,10 @@ use crate::api::errors::FlagError; use crate::cohort::cohort_models::Cohort; use crate::flags::flag_matching::{PostgresReader, TeamId}; +use crate::metrics::metrics_consts::{ + COHORT_CACHE_HIT_COUNTER, COHORT_CACHE_MISS_COUNTER, DB_COHORT_ERRORS_COUNTER, + DB_COHORT_READS_COUNTER, +}; use moka::future::Cache; use std::sync::Arc; use std::time::Duration; @@ -63,22 +67,56 @@ impl CohortCacheManager { /// If the cohorts are not present in the cache or have expired, it fetches them from the database, /// caches the result upon successful retrieval, and then returns it. pub async fn get_cohorts(&self, team_id: TeamId) -> Result, FlagError> { + // First check cache before acquiring lock if let Some(cached_cohorts) = self.cache.get(&team_id).await { + common_metrics::inc( + COHORT_CACHE_HIT_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); return Ok(cached_cohorts.clone()); } // Acquire the lock before fetching let _lock = self.fetch_lock.lock().await; - // Double-check the cache after acquiring the lock + // Double-check the cache after acquiring lock if let Some(cached_cohorts) = self.cache.get(&team_id).await { + common_metrics::inc( + COHORT_CACHE_HIT_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); return Ok(cached_cohorts.clone()); } - let fetched_cohorts = Cohort::list_from_pg(self.reader.clone(), team_id).await?; - self.cache.insert(team_id, fetched_cohorts.clone()).await; + // If we get here, we have a cache miss + common_metrics::inc( + COHORT_CACHE_MISS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); - Ok(fetched_cohorts) + // Attempt to fetch from DB + match Cohort::list_from_pg(self.reader.clone(), team_id).await { + Ok(fetched_cohorts) => { + common_metrics::inc( + DB_COHORT_READS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + self.cache.insert(team_id, fetched_cohorts.clone()).await; + Ok(fetched_cohorts) + } + Err(e) => { + common_metrics::inc( + DB_COHORT_ERRORS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + Err(e) + } + } } } diff --git a/rust/feature-flags/src/config.rs b/rust/feature-flags/src/config.rs index 1a549594dcda6..a57567429945a 100644 --- a/rust/feature-flags/src/config.rs +++ b/rust/feature-flags/src/config.rs @@ -99,8 +99,8 @@ pub struct Config { #[envconfig(from = "TEAM_IDS_TO_TRACK", default = "all")] pub team_ids_to_track: TeamIdsToTrack, - #[envconfig(from = "CACHE_MAX_ENTRIES", default = "100000")] - pub cache_max_entries: u64, + #[envconfig(from = "CACHE_MAX_COHORT_ENTRIES", default = "100000")] + pub cache_max_cohort_entries: u64, #[envconfig(from = "CACHE_TTL_SECONDS", default = "300")] pub cache_ttl_seconds: u64, @@ -120,7 +120,7 @@ impl Config { maxmind_db_path: "".to_string(), enable_metrics: false, team_ids_to_track: TeamIdsToTrack::All, - cache_max_entries: 100_000, + cache_max_cohort_entries: 100_000, cache_ttl_seconds: 300, } } diff --git a/rust/feature-flags/src/flags/flag_matching.rs b/rust/feature-flags/src/flags/flag_matching.rs index aeda22a1efeff..73666f02f74eb 100644 --- a/rust/feature-flags/src/flags/flag_matching.rs +++ b/rust/feature-flags/src/flags/flag_matching.rs @@ -5,7 +5,11 @@ use crate::cohort::cohort_cache_manager::CohortCacheManager; use crate::cohort::cohort_models::{Cohort, CohortId}; use crate::flags::flag_match_reason::FeatureFlagMatchReason; use crate::flags::flag_models::{FeatureFlag, FeatureFlagList, FlagGroupType}; -use crate::metrics::metrics_consts::{FLAG_EVALUATION_ERROR_COUNTER, FLAG_HASH_KEY_WRITES_COUNTER}; +use crate::metrics::metrics_consts::{ + DB_GROUP_PROPERTIES_READS_COUNTER, DB_PERSON_AND_GROUP_PROPERTIES_READS_COUNTER, + DB_PERSON_PROPERTIES_READS_COUNTER, FLAG_EVALUATION_ERROR_COUNTER, + FLAG_HASH_KEY_WRITES_COUNTER, PROPERTY_CACHE_HITS_COUNTER, PROPERTY_CACHE_MISSES_COUNTER, +}; use crate::metrics::metrics_utils::parse_exception_for_prometheus_label; use crate::properties::property_matching::match_property; use crate::properties::property_models::{OperatorType, PropertyFilter}; @@ -107,11 +111,22 @@ impl GroupTypeMappingCache { Ok(mapping) if !mapping.is_empty() => mapping, Ok(_) => { self.failed_to_fetch_flags = true; - // TODO add the `"Failed to fetch group"` type of lable. See posthog/models/feature_flag/flag_matching.py:parse_exception_for_error_message + let reason = "no_group_type_mappings"; + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); return Err(FlagError::NoGroupTypeMappings); } Err(e) => { self.failed_to_fetch_flags = true; + let reason = parse_exception_for_prometheus_label(&e); + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); return Err(e); } }; @@ -135,7 +150,12 @@ impl GroupTypeMappingCache { self.group_indexes_to_types.clone_from(&result); Ok(result) } else { - // TODO add the `"Failed to fetch group"` type of lable. See posthog/models/feature_flag/flag_matching.py:parse_exception_for_error_message + let reason = "no_group_type_mappings"; + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); Err(FlagError::NoGroupTypeMappings) } } @@ -164,7 +184,12 @@ impl GroupTypeMappingCache { .collect(); if mapping.is_empty() { - // TODO add the `"Failed to fetch group"` type of lable. See posthog/models/feature_flag/flag_matching.py:parse_exception_for_error_message + let reason = "no_group_type_mappings"; + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); Err(FlagError::NoGroupTypeMappings) } else { Ok(mapping) @@ -328,7 +353,6 @@ impl FeatureFlagMatcher { .await { error!("Failed to set feature flag hash key overrides: {:?}", e); - // Increment the counter for failed write let reason = parse_exception_for_prometheus_label(&e); inc( FLAG_EVALUATION_ERROR_COUNTER, @@ -340,7 +364,6 @@ impl FeatureFlagMatcher { writing_hash_key_override = true; } - // TODO I'm not sure if this is the right place to increment this counter inc( FLAG_HASH_KEY_WRITES_COUNTER, &[ @@ -443,7 +466,13 @@ impl FeatureFlagMatcher { ) .await { - Ok(_) => {} + Ok(_) => { + inc( + DB_PERSON_AND_GROUP_PROPERTIES_READS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + } Err(e) => { error_while_computing_flags = true; // TODO add sentry exception tracking @@ -806,7 +835,7 @@ impl FeatureFlagMatcher { } } - /// Get group properties from cache or database. + /// Get group properties from overrides, cache or database. /// /// This function attempts to retrieve group properties either from a cache or directly from the database. /// It first checks if there are any locally computable property overrides. If so, it returns those. @@ -832,9 +861,26 @@ impl FeatureFlagMatcher { /// and updates the cache accordingly. async fn get_person_id(&mut self) -> Result { match self.properties_cache.person_id { - Some(id) => Ok(id), + Some(id) => { + inc( + PROPERTY_CACHE_HITS_COUNTER, + &[("type".to_string(), "person_id".to_string())], + 1, + ); + Ok(id) + } None => { + inc( + PROPERTY_CACHE_MISSES_COUNTER, + &[("type".to_string(), "person_id".to_string())], + 1, + ); let id = self.get_person_id_from_db().await?; + inc( + DB_PERSON_PROPERTIES_READS_COUNTER, + &[("team_id".to_string(), self.team_id.to_string())], + 1, + ); self.properties_cache.person_id = Some(id); Ok(id) } @@ -852,7 +898,7 @@ impl FeatureFlagMatcher { .map(|(_, person_id)| person_id) } - /// Get person properties from cache or database. + /// Get person properties from overrides, cache or database. /// /// This function attempts to retrieve person properties either from a cache or directly from the database. /// It first checks if there are any locally computable property overrides. If so, it returns those. @@ -997,16 +1043,33 @@ impl FeatureFlagMatcher { .group_properties .get(&group_type_index) { + inc( + PROPERTY_CACHE_HITS_COUNTER, + &[("type".to_string(), "group_properties".to_string())], + 1, + ); let mut result = HashMap::new(); result.clone_from(properties); return Ok(result); } + inc( + PROPERTY_CACHE_MISSES_COUNTER, + &[("type".to_string(), "group_properties".to_string())], + 1, + ); + let reader = self.reader.clone(); let team_id = self.team_id; let db_properties = fetch_group_properties_from_db(reader, team_id, group_type_index).await?; + inc( + DB_GROUP_PROPERTIES_READS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + // once the properties are fetched, cache them so we don't need to fetch again in a given request self.properties_cache .group_properties @@ -1025,17 +1088,34 @@ impl FeatureFlagMatcher { ) -> Result, FlagError> { // check if the properties are already cached, if so return them if let Some(properties) = &self.properties_cache.person_properties { + inc( + PROPERTY_CACHE_HITS_COUNTER, + &[("type".to_string(), "person_properties".to_string())], + 1, + ); let mut result = HashMap::new(); result.clone_from(properties); return Ok(result); } + inc( + PROPERTY_CACHE_MISSES_COUNTER, + &[("type".to_string(), "person_properties".to_string())], + 1, + ); + let reader = self.reader.clone(); let distinct_id = self.distinct_id.clone(); let team_id = self.team_id; let (db_properties, person_id) = fetch_person_properties_from_db(reader, distinct_id, team_id).await?; + inc( + DB_PERSON_PROPERTIES_READS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + // once the properties and person ID are fetched, cache them so we don't need to fetch again in a given request self.properties_cache.person_properties = Some(db_properties.clone()); self.properties_cache.person_id = Some(person_id); @@ -1555,9 +1635,6 @@ fn locally_computable_property_overrides( property_filters: &[PropertyFilter], ) -> Option> { property_overrides.as_ref().and_then(|overrides| { - // TODO handle note from Neil: https://github.com/PostHog/posthog/pull/24589#discussion_r1735828561 - // TL;DR – we'll need to handle cohort properties at the DB level, i.e. we'll need to adjust the cohort query - // to account for if a given person is an element of the cohort X, Y, Z, etc let should_prefer_overrides = property_filters .iter() .all(|prop| overrides.contains_key(&prop.key) && prop.prop_type != "cohort"); diff --git a/rust/feature-flags/src/flags/flag_request.rs b/rust/feature-flags/src/flags/flag_request.rs index 89890505c6c4b..cab455e13bbbc 100644 --- a/rust/feature-flags/src/flags/flag_request.rs +++ b/rust/feature-flags/src/flags/flag_request.rs @@ -10,7 +10,11 @@ use crate::{ api::errors::FlagError, client::{database::Client as DatabaseClient, redis::Client as RedisClient}, flags::flag_models::FeatureFlagList, - metrics::metrics_consts::FLAG_CACHE_HIT_COUNTER, + metrics::metrics_consts::{ + DB_FLAG_READS_COUNTER, DB_TEAM_READS_COUNTER, FLAG_CACHE_ERRORS_COUNTER, + FLAG_CACHE_HIT_COUNTER, TEAM_CACHE_ERRORS_COUNTER, TEAM_CACHE_HIT_COUNTER, + TOKEN_VALIDATION_ERRORS_COUNTER, + }, team::team_models::Team, }; @@ -83,23 +87,50 @@ impl FlagRequest { _ => return Err(FlagError::NoTokenError), }; - match Team::from_redis(redis_client.clone(), token.clone()).await { - Ok(_) => Ok(token), + let (result, cache_hit) = match Team::from_redis(redis_client.clone(), token.clone()).await + { + Ok(_) => (Ok(token.clone()), true), Err(_) => { - // Fallback: Check PostgreSQL if not found in Redis match Team::from_pg(pg_client, token.clone()).await { Ok(team) => { + inc( + DB_TEAM_READS_COUNTER, + &[("token".to_string(), token.clone())], + 1, + ); // Token found in PostgreSQL, update Redis cache so that we can verify it from Redis next time if let Err(e) = Team::update_redis_cache(redis_client, &team).await { tracing::warn!("Failed to update Redis cache: {}", e); + inc( + TEAM_CACHE_ERRORS_COUNTER, + &[("reason".to_string(), "redis_update_failed".to_string())], + 1, + ); } - Ok(token) + (Ok(token.clone()), false) + } + Err(_) => { + inc( + TOKEN_VALIDATION_ERRORS_COUNTER, + &[("reason".to_string(), "token_not_found".to_string())], + 1, + ); + (Err(FlagError::TokenValidationError), false) } - // TODO do we need a custom error here to track the fallback - Err(_) => Err(FlagError::TokenValidationError), } } - } + }; + + inc( + TEAM_CACHE_HIT_COUNTER, + &[ + ("token".to_string(), token.clone()), + ("cache_hit".to_string(), cache_hit.to_string()), + ], + 1, + ); + + result } /// Fetches the team from the cache or the database. @@ -111,22 +142,42 @@ impl FlagRequest { redis_client: Arc, pg_client: Arc, ) -> Result { - match Team::from_redis(redis_client.clone(), token.to_owned()).await { - Ok(team) => Ok(team), - Err(_) => match Team::from_pg(pg_client, token.to_owned()).await { - Ok(team) => { - // If we have the team in postgres, but not redis, update redis so we're faster next time - // TODO: we have some counters in django for tracking these cache misses - // we should probably do the same here - if let Err(e) = Team::update_redis_cache(redis_client, &team).await { - tracing::warn!("Failed to update Redis cache: {}", e); + let (team_result, cache_hit) = + match Team::from_redis(redis_client.clone(), token.to_owned()).await { + Ok(team) => (Ok(team), true), + Err(_) => match Team::from_pg(pg_client, token.to_owned()).await { + Ok(team) => { + inc( + DB_TEAM_READS_COUNTER, + &[("token".to_string(), token.to_string())], + 1, + ); + // If we have the team in postgres, but not redis, update redis so we're faster next time + if let Err(e) = Team::update_redis_cache(redis_client, &team).await { + tracing::warn!("Failed to update Redis cache: {}", e); + inc( + TEAM_CACHE_ERRORS_COUNTER, + &[("reason".to_string(), "redis_update_failed".to_string())], + 1, + ); + } + (Ok(team), false) } - Ok(team) - } - // TODO what kind of error should we return here? - Err(e) => Err(e), - }, - } + // TODO what kind of error should we return here? + Err(e) => (Err(e), false), + }, + }; + + inc( + TEAM_CACHE_HIT_COUNTER, + &[ + ("token".to_string(), token.to_string()), + ("cache_hit".to_string(), cache_hit.to_string()), + ], + 1, + ); + + team_result } /// Extracts the distinct_id from the request. @@ -164,31 +215,37 @@ impl FlagRequest { redis_client: &Arc, pg_client: &Arc, ) -> Result { - let mut cache_hit = false; - let flags = match FeatureFlagList::from_redis(redis_client.clone(), team_id).await { - Ok(flags) => { - cache_hit = true; - Ok(flags) - } - Err(_) => match FeatureFlagList::from_pg(pg_client.clone(), team_id).await { - Ok(flags) => { - if let Err(e) = FeatureFlagList::update_flags_in_redis( - redis_client.clone(), - team_id, - &flags, - ) - .await - { - tracing::warn!("Failed to update Redis cache: {}", e); - // TODO add new metric category for this + let (flags_result, cache_hit) = + match FeatureFlagList::from_redis(redis_client.clone(), team_id).await { + Ok(flags) => (Ok(flags), true), + Err(_) => match FeatureFlagList::from_pg(pg_client.clone(), team_id).await { + Ok(flags) => { + inc( + DB_FLAG_READS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + if let Err(e) = FeatureFlagList::update_flags_in_redis( + redis_client.clone(), + team_id, + &flags, + ) + .await + { + tracing::warn!("Failed to update Redis cache: {}", e); + inc( + FLAG_CACHE_ERRORS_COUNTER, + &[("reason".to_string(), "redis_update_failed".to_string())], + 1, + ); + } + (Ok(flags), false) } - Ok(flags) - } - // TODO what kind of error should we return here? This should be postgres - // I guess it can be whatever the FlagError is - Err(e) => Err(e), - }, - }; + // TODO what kind of error should we return here? This should be postgres + // I guess it can be whatever the FlagError is + Err(e) => (Err(e), false), + }, + }; inc( FLAG_CACHE_HIT_COUNTER, @@ -199,7 +256,7 @@ impl FlagRequest { 1, ); - flags + flags_result } } diff --git a/rust/feature-flags/src/metrics/metrics_consts.rs b/rust/feature-flags/src/metrics/metrics_consts.rs index 5ece796159739..477b88175eb6b 100644 --- a/rust/feature-flags/src/metrics/metrics_consts.rs +++ b/rust/feature-flags/src/metrics/metrics_consts.rs @@ -1,5 +1,26 @@ pub const FLAG_EVALUATION_ERROR_COUNTER: &str = "flag_evaluation_error_total"; pub const FLAG_CACHE_HIT_COUNTER: &str = "flag_cache_hit_total"; +pub const FLAG_CACHE_ERRORS_COUNTER: &str = "flag_cache_errors_total"; pub const FLAG_HASH_KEY_WRITES_COUNTER: &str = "flag_hash_key_writes_total"; -// TODO add metrics for failing to update redis? Does that really happen? -// maybe worth adding for rollout, since writing to redis is a critical path thing +pub const TEAM_CACHE_HIT_COUNTER: &str = "team_cache_hit_total"; +pub const TEAM_CACHE_ERRORS_COUNTER: &str = "team_cache_errors_total"; +pub const DB_TEAM_READS_COUNTER: &str = "db_team_reads_total"; +pub const TOKEN_VALIDATION_ERRORS_COUNTER: &str = "token_validation_errors_total"; +pub const DB_FLAG_READS_COUNTER: &str = "db_flag_reads_total"; +pub const DB_FLAG_ERRORS_COUNTER: &str = "db_flag_errors_total"; +pub const DB_COHORT_READS_COUNTER: &str = "db_cohort_reads_total"; +pub const DB_COHORT_WRITES_COUNTER: &str = "db_cohort_writes_total"; +pub const DB_COHORT_ERRORS_COUNTER: &str = "db_cohort_errors_total"; +pub const COHORT_CACHE_HIT_COUNTER: &str = "cohort_cache_hit_total"; +pub const COHORT_CACHE_MISS_COUNTER: &str = "cohort_cache_miss_total"; +pub const COHORT_CACHE_ERRORS_COUNTER: &str = "cohort_cache_errors_total"; +pub const GROUP_TYPE_CACHE_HIT_COUNTER: &str = "group_type_cache_hit_total"; +pub const GROUP_TYPE_CACHE_MISS_COUNTER: &str = "group_type_cache_miss_total"; +pub const GROUP_TYPE_CACHE_ERRORS_COUNTER: &str = "group_type_cache_errors_total"; +pub const FAILED_TO_FETCH_GROUP_COUNTER: &str = "failed_to_fetch_group_total"; +pub const PROPERTY_CACHE_HITS_COUNTER: &str = "property_cache_hits_total"; +pub const PROPERTY_CACHE_MISSES_COUNTER: &str = "property_cache_misses_total"; +pub const DB_PERSON_AND_GROUP_PROPERTIES_READS_COUNTER: &str = + "db_person_and_group_properties_reads_total"; +pub const DB_PERSON_PROPERTIES_READS_COUNTER: &str = "db_person_properties_reads_total"; +pub const DB_GROUP_PROPERTIES_READS_COUNTER: &str = "db_group_properties_reads_total"; diff --git a/rust/feature-flags/src/server.rs b/rust/feature-flags/src/server.rs index e4943572afa6d..10f64960cd4f9 100644 --- a/rust/feature-flags/src/server.rs +++ b/rust/feature-flags/src/server.rs @@ -56,7 +56,7 @@ where let cohort_cache = Arc::new(CohortCacheManager::new( reader.clone(), - Some(config.cache_max_entries), + Some(config.cache_max_cohort_entries), Some(config.cache_ttl_seconds), ));