Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
Add redis read timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Ellie Huxtable committed Oct 19, 2023
1 parent 33fb9f0 commit 4ae29b9
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
22 changes: 12 additions & 10 deletions capture/src/billing_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ use crate::redis::RedisClient;
/// We have an async celery worker that regularly checks on accounts + assesses if they are beyond
/// a billing limit. If this is the case, a key is set in redis.
///
/// On the capture side, we should check if a request should be limited. If so, we can respond with
/// a 429.
///
/// Requirements
///
/// 1. Updates from the celery worker should be reflected in capture within a short period of time
Expand Down Expand Up @@ -90,7 +87,6 @@ impl BillingLimiter {
) -> anyhow::Result<Vec<String>> {
let now = time::OffsetDateTime::now_utc().unix_timestamp();

// todo: timeout on external calls
client
.zrangebyscore(
format!("{QUOTA_LIMITER_CACHE_KEY}{}", resource.as_str()),
Expand Down Expand Up @@ -118,23 +114,29 @@ impl BillingLimiter {
//
// This update will block readers! Keep it fast.
if since_update > self.interval {
let span = tracing::debug_span!("updating billing cache from redis");
let _span = span.enter();

// a few requests might end up in here concurrently, but I don't think a few extra will
// be a big problem. If it is, we can rework the concurrency a bit.
// On prod atm we call this around 15 times per second at peak times, and it usually
// completes in <1ms.

let set = Self::fetch_limited(&self.redis, resource).await;

// Update regardless of success here. It does mean we will keep trying to hit redis on
// our interval, but that's probably OK for now.
{
let mut updated = self.updated.write().await;
*updated = now;
}
tracing::debug!("fetched set from redis, caching");

if let Ok(set) = set {
let set = HashSet::from_iter(set.iter().cloned());

let mut limited = self.limited.write().await;
*limited = set;

tracing::debug!("updated cache from redis");

limited.contains(key)
} else {
tracing::error!("failed to fetch from redis in time, failing open");
// If we fail to fetch the set, something really wrong is happening. To avoid
// dropping events that we don't mean to drop, fail open and accept data. Better
// than angry customers :)
Expand Down
11 changes: 9 additions & 2 deletions capture/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use redis::{cluster::ClusterClient, AsyncCommands};
use tokio::time::timeout;

// average for all commands is <10ms, check grafana
/// Per-command redis deadline, expressed in **milliseconds** (the name and the
/// comment above both fix the unit — callers must pair this with
/// `Duration::from_millis`, not `from_secs`).
const REDIS_TIMEOUT_MILLISECS: u64 = 10;

/// A simple redis wrapper
/// I'm currently just exposing the commands we use, for ease of implementation
Expand Down Expand Up @@ -32,9 +38,10 @@ impl RedisClient for RedisClusterClient {
/// Fetch the members of sorted set `k` whose scores fall within `[min, max]`,
/// bounding the round-trip with a hard deadline so a slow/unreachable redis
/// cannot stall the caller (capture fails open upstream).
///
/// # Errors
/// Returns an error if the connection cannot be established, if the deadline
/// elapses (`tokio::time::error::Elapsed` via `?`), or if redis itself
/// returns an error for the command.
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>> {
    let mut conn = self.client.get_async_connection().await?;

    // Build the command future without awaiting it, so the whole execution
    // can be wrapped in a deadline.
    let results = conn.zrangebyscore(k, min, max);
    // BUG FIX: the constant is in milliseconds (value 10, "average for all
    // commands is <10ms"), so the deadline must be built with `from_millis`.
    // `from_secs` silently turned the intended 10ms budget into 10 seconds.
    let reply = timeout(Duration::from_millis(REDIS_TIMEOUT_MILLISECS), results).await?;

    // Outer `?` above handled the timeout; this `?` propagates the redis error.
    Ok(reply?)
}
}

Expand Down

0 comments on commit 4ae29b9

Please sign in to comment.