From 4ae29b980850ea915d1b48439384b42eefb2f02d Mon Sep 17 00:00:00 2001
From: Ellie Huxtable
Date: Thu, 19 Oct 2023 13:10:23 +0100
Subject: [PATCH] Add redis read timeout

---
 capture/src/billing_limits.rs | 22 ++++++++++++----------
 capture/src/redis.rs          | 11 +++++++++--
 2 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/capture/src/billing_limits.rs b/capture/src/billing_limits.rs
index 5d873e0..c8057a0 100644
--- a/capture/src/billing_limits.rs
+++ b/capture/src/billing_limits.rs
@@ -7,9 +7,6 @@ use crate::redis::RedisClient;
 /// We have an async celery worker that regularly checks on accounts + assesses if they are beyond
 /// a billing limit. If this is the case, a key is set in redis.
 ///
-/// On the capture side, we should check if a request should be limited. If so, we can respond with
-/// a 429.
-///
 /// Requirements
 ///
 /// 1. Updates from the celery worker should be reflected in capture within a short period of time
@@ -90,7 +87,6 @@ impl BillingLimiter {
     ) -> anyhow::Result<Vec<String>> {
         let now = time::OffsetDateTime::now_utc().unix_timestamp();
 
-        // todo: timeout on external calls
         client
             .zrangebyscore(
                 format!("{QUOTA_LIMITER_CACHE_KEY}{}", resource.as_str()),
@@ -118,14 +114,17 @@
         //
         // This update will block readers! Keep it fast.
         if since_update > self.interval {
+            let span = tracing::debug_span!("updating billing cache from redis");
+            let _span = span.enter();
+
+            // a few requests might end up in here concurrently, but I don't think a few extra will
+            // be a big problem. If it is, we can rework the concurrency a bit.
+            // On prod atm we call this around 15 times per second at peak times, and it usually
+            // completes in <1ms.
+
             let set = Self::fetch_limited(&self.redis, resource).await;
 
-            // Update regardless of success here. It does mean we will keep trying to hit redis on
-            // our interval, but that's probably OK for now.
-            {
-                let mut updated = self.updated.write().await;
-                *updated = now;
-            }
+            tracing::debug!("fetched set from redis, caching");
 
             if let Ok(set) = set {
                 let set = HashSet::from_iter(set.iter().cloned());
@@ -133,8 +132,11 @@
                 let mut limited = self.limited.write().await;
                 *limited = set;
 
+                tracing::debug!("updated cache from redis");
+
                 limited.contains(key)
             } else {
+                tracing::error!("failed to fetch from redis in time, failing open");
                 // If we fail to fetch the set, something really wrong is happening. To avoid
                 // dropping events that we don't mean to drop, fail open and accept data. Better
                 // than angry customers :)
diff --git a/capture/src/redis.rs b/capture/src/redis.rs
index 1feb01a..209887b 100644
--- a/capture/src/redis.rs
+++ b/capture/src/redis.rs
@@ -1,6 +1,12 @@
+use std::time::Duration;
+
 use anyhow::Result;
 use async_trait::async_trait;
 use redis::{cluster::ClusterClient, AsyncCommands};
+use tokio::time::timeout;
+
+// average for all commands is <10ms, check grafana
+const REDIS_TIMEOUT_MILLISECS: u64 = 10;
 
 /// A simple redis wrapper
 /// I'm currently just exposing the commands we use, for ease of implementation
@@ -32,9 +38,10 @@
 impl RedisClient for RedisClusterClient {
     async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>> {
         let mut conn = self.client.get_async_connection().await?;
 
-        let results = conn.zrangebyscore(k, min, max).await?;
+        let results = conn.zrangebyscore(k, min, max);
+        let fut = timeout(Duration::from_millis(REDIS_TIMEOUT_MILLISECS), results).await?;
 
-        Ok(results)
+        Ok(fut?)
     }
 }
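
A note for reviewers on the new `zrangebyscore` body: `tokio::time::timeout` wraps the command future in its own `Result`, which is why the value gets unwrapped twice (`.await?` and then `fut?`). A minimal standalone sketch of the same pattern, with a made-up `slow_lookup` future standing in for the real redis call (names and durations are illustrative, not from this patch):

    use std::time::Duration;
    use tokio::time::{sleep, timeout};

    // Hypothetical stand-in for a redis command future.
    async fn slow_lookup() -> anyhow::Result<Vec<String>> {
        sleep(Duration::from_millis(50)).await;
        Ok(vec!["org:1".to_string()])
    }

    async fn lookup_with_budget() -> anyhow::Result<Vec<String>> {
        // Outer `?` converts the Elapsed error from the timeout into anyhow::Error,
        // inner `?` propagates any error from the wrapped future itself.
        let results = timeout(Duration::from_millis(10), slow_lookup()).await??;
        Ok(results)
    }

    #[tokio::main]
    async fn main() {
        // With a 10ms budget and a 50ms lookup, this takes the error path.
        match lookup_with_budget().await {
            Ok(keys) => println!("got {keys:?}"),
            Err(e) => println!("lookup failed or timed out: {e}"),
        }
    }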
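
The fail-open branch is the behaviour this patch is most careful about. A rough sketch of the caching shape in isolation, reduced to a free-standing function so the trade-off is easier to see; `check_limited` and its parameters are invented for illustration and are not the API of `billing_limits.rs`:

    use std::collections::HashSet;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Illustrative only: a cached set of limited tokens, refreshed from an external
    // source (redis in the real code) and consulted on the hot path.
    async fn check_limited(
        cache: Arc<RwLock<HashSet<String>>>,
        fetch: impl std::future::Future<Output = anyhow::Result<Vec<String>>>,
        key: &str,
        needs_refresh: bool,
    ) -> bool {
        if needs_refresh {
            match fetch.await {
                Ok(fresh) => {
                    // The writer blocks readers, so keep the critical section small:
                    // build the new set first, then swap it in.
                    let fresh: HashSet<String> = fresh.into_iter().collect();
                    let mut cache = cache.write().await;
                    *cache = fresh;
                    cache.contains(key)
                }
                // Fail open: if the source errored or timed out, accept the event
                // rather than dropping data we did not mean to drop.
                Err(_) => false,
            }
        } else {
            cache.read().await.contains(key)
        }
    }

The write lock is held only for the swap and a single `contains` check, which is what the "This update will block readers! Keep it fast." comment in the hunk above is pointing at.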
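
One thing worth double-checking in the billing_limits.rs hunk: the guard from `span.enter()` is held across `fetch_limited(...).await`, and the tracing documentation warns that holding an entered guard over an await point can attach the span to the wrong task. A hedged alternative, shown on a made-up async fn rather than the real one, is to instrument the future instead:

    use tracing::Instrument;

    // Illustrative stand-in; not the function from the patch.
    async fn fetch_limited_stub() -> anyhow::Result<Vec<String>> {
        Ok(vec![])
    }

    async fn refresh_cache() -> anyhow::Result<Vec<String>> {
        let span = tracing::debug_span!("updating billing cache from redis");
        // `.instrument(span)` keeps the span attached to the future across awaits,
        // instead of relying on a guard held in the enclosing scope.
        fetch_limited_stub().instrument(span).await
    }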