From b04ca821deb06c5ba730f8a9f26110b7038397d0 Mon Sep 17 00:00:00 2001 From: zakstucke <44890343+zakstucke@users.noreply.github.com> Date: Mon, 19 Aug 2024 15:51:04 +0300 Subject: [PATCH] Redis PubSub, RedisConn methods no longer require mutable, custom backdoor to underlying redis commands in a batch (#57) * No more mutable requirement for redis conns, much more ergonomic * Custom fallbacks to underlying redis interface without having to leave higher level interface * Redis pubsub * Error resistant, more tests, lazy clone. --- rust/Cargo.toml | 10 +- rust/benches/bench_default.rs | 56 +++ rust/benches/bench_tester.rs | 25 -- rust/bitbazaar/log/mod.rs | 2 +- rust/bitbazaar/misc/lazy_clone.rs | 98 ++++ rust/bitbazaar/misc/mod.rs | 2 + rust/bitbazaar/misc/retry.rs | 9 + rust/bitbazaar/redis/batch.rs | 87 ++-- rust/bitbazaar/redis/conn.rs | 206 ++++----- rust/bitbazaar/redis/dlock.rs | 10 +- rust/bitbazaar/redis/mod.rs | 20 +- .../redis/pubsub/channel_listener.rs | 46 ++ rust/bitbazaar/redis/pubsub/mod.rs | 303 +++++++++++++ rust/bitbazaar/redis/pubsub/pubsub_global.rs | 421 ++++++++++++++++++ rust/bitbazaar/redis/redis_retry.rs | 25 ++ rust/bitbazaar/redis/temp_list.rs | 26 +- rust/bitbazaar/redis/wrapper.rs | 23 +- 17 files changed, 1164 insertions(+), 205 deletions(-) create mode 100644 rust/benches/bench_default.rs delete mode 100644 rust/benches/bench_tester.rs create mode 100644 rust/bitbazaar/misc/lazy_clone.rs create mode 100644 rust/bitbazaar/redis/pubsub/channel_listener.rs create mode 100644 rust/bitbazaar/redis/pubsub/mod.rs create mode 100644 rust/bitbazaar/redis/pubsub/pubsub_global.rs create mode 100644 rust/bitbazaar/redis/redis_retry.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 39835505..9959a973 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -127,7 +127,7 @@ features = ["sync"] # These are included on top of above features when not wasm: [target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] version = "1" -features = ["time", "fs", "process", "rt", "io-util"] +features = ["time", "fs", "process", "rt", "io-util", "macros"] [target.'cfg(target_arch = "wasm32")'.dependencies] tracing-subscriber-wasm = "0.1.0" @@ -144,10 +144,10 @@ rstest = "0.18" criterion = { version = "0.3", features = ["html_reports", "async_tokio"] } tokio = { version = '1', features = ["full"] } -# When adding new benches, they should be added like this with the name of the file in benches/: (obviously uncommented) -# [[bench]] -# name = "bench_tester" -# harness = false +# When adding new benches, they should be added like this with the name of the file in benches/: +[[bench]] +name = "bench_default" +harness = false [features] collector = ["dep:reqwest", "dep:tempfile", "tarball"] diff --git a/rust/benches/bench_default.rs b/rust/benches/bench_default.rs new file mode 100644 index 00000000..17b267f6 --- /dev/null +++ b/rust/benches/bench_default.rs @@ -0,0 +1,56 @@ +#![allow(unused_imports)] +use criterion::{black_box, criterion_group, criterion_main, Criterion}; + +// <--- EXAMPLE: + +fn fibonacci(n: u64) -> u64 { + let mut a = 0; + let mut b = 1; + + match n { + 0 => b, + _ => { + for _ in 0..n { + let c = a + b; + a = b; + b = c; + } + b + } + } +} + +async fn async_fibonacci(n: u64) -> u64 { + fibonacci(n) +} + +// SYNC EXAMPLE +pub fn bench_sync(c: &mut Criterion) { + c.bench_function("sync: fib 20", |b| b.iter(|| fibonacci(black_box(20)))); +} + +// ASYNC EXAMPLE +pub fn bench_async(c: &mut Criterion) { + c.bench_function("async: fib 20", |b| { + 
b.to_async(&get_tokio_rt()) + .iter(|| async_fibonacci(black_box(20))) + }); +} + +// CUSTOM CONFIG EXAMPLE +pub fn bench_config(c: &mut Criterion) { + let mut group = c.benchmark_group("small-sample-size"); + group.sample_size(10).significance_level(0.01); + group.bench_function("config: fib 20", |b| b.iter(|| fibonacci(black_box(20)))); + group.finish(); +} + +criterion_group!(benches, bench_sync, bench_async, bench_config); +criterion_main!(benches); + +fn get_tokio_rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} diff --git a/rust/benches/bench_tester.rs b/rust/benches/bench_tester.rs deleted file mode 100644 index 44289562..00000000 --- a/rust/benches/bench_tester.rs +++ /dev/null @@ -1,25 +0,0 @@ -use criterion::{black_box, criterion_group, criterion_main, Criterion}; - -fn fibonacci(n: u64) -> u64 { - let mut a = 0; - let mut b = 1; - - match n { - 0 => b, - _ => { - for _ in 0..n { - let c = a + b; - a = b; - b = c; - } - b - } - } -} - -pub fn criterion_benchmark(c: &mut Criterion) { - c.bench_function("fib 20", |b| b.iter(|| fibonacci(black_box(20)))); -} - -criterion_group!(benches, criterion_benchmark); -criterion_main!(benches); diff --git a/rust/bitbazaar/log/mod.rs b/rust/bitbazaar/log/mod.rs index a3b9eac4..d7b79350 100644 --- a/rust/bitbazaar/log/mod.rs +++ b/rust/bitbazaar/log/mod.rs @@ -407,7 +407,7 @@ mod tests { // Sleeping after each, to try and ensure the correct debug output: log.with_tmp_global(|| { // On windows this needs to be really long to get static record ordering for testing: - let delay = if cfg!(windows) { 100 } else { 10 }; + let delay = if cfg!(windows) { 100 } else { 30 }; debug!("BEFORE"); std::thread::sleep(std::time::Duration::from_millis(delay)); diff --git a/rust/bitbazaar/misc/lazy_clone.rs b/rust/bitbazaar/misc/lazy_clone.rs new file mode 100644 index 00000000..769b5a08 --- /dev/null +++ b/rust/bitbazaar/misc/lazy_clone.rs @@ -0,0 +1,98 @@ +/// Efficient way to clone an item for each element in an iterator. +/// The final iteration will consume the original item, so no unnecessary clones are made. +pub trait IterWithCloneLazy { + /// The return type of the iterator. + type IterT; + + /// Efficient way to pass an owned clone of an item to each element in an iterator. + /// Will pass the final item by value without cloning, so no unnecessary clones are made. + fn with_clone_lazy( + self, + item: ItemT, + ) -> impl Iterator + where + Self: Sized; +} + +impl> IterWithCloneLazy for I { + type IterT = IterT; + + fn with_clone_lazy( + self, + item: ItemT, + ) -> impl Iterator + where + Self: Sized, + { + let mut iter = self.into_iter(); + LazyCloneIter { + item: Some(item), + next_in_iter: iter.next(), + iter, + } + } +} + +struct LazyCloneIter { + // Will consume when next_in_iter is None, as on last iteration. 
+ item: Option, + iter: I, + next_in_iter: Option, +} + +impl Iterator for LazyCloneIter { + type Item = (ItemT, I::Item); + + fn next(&mut self) -> Option { + self.next_in_iter.take().map(|next| { + self.next_in_iter = self.iter.next(); + if self.next_in_iter.is_none() { + (self.item.take().unwrap(), next) + } else { + (self.item.clone().unwrap(), next) + } + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::{atomic::AtomicUsize, Arc}; + + use super::*; + + #[test] + fn test_lazy_clone_with_clone_lazy() { + struct Test { + tot_clones: Arc, + } + impl Clone for Test { + fn clone(&self) -> Self { + self.tot_clones + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + Test { + tot_clones: self.tot_clones.clone(), + } + } + } + + // Try for 0..10 iterator length, main things to check are 0, 1 and >1. + // For all but final iteration, should clone, then pass by value. + for count in 0..10 { + let tot_clones = Arc::new(AtomicUsize::new(0)); + let test = Test { + tot_clones: tot_clones.clone(), + }; + for (t, index) in (0..count).with_clone_lazy(test) { + assert_eq!( + t.tot_clones.load(std::sync::atomic::Ordering::Relaxed), + if index < count - 1 { index + 1 } else { index } + ); + } + assert_eq!( + tot_clones.load(std::sync::atomic::Ordering::Relaxed), + count.max(1) - 1 + ); + } + } +} diff --git a/rust/bitbazaar/misc/mod.rs b/rust/bitbazaar/misc/mod.rs index 70dddcda..ae7266ea 100644 --- a/rust/bitbazaar/misc/mod.rs +++ b/rust/bitbazaar/misc/mod.rs @@ -8,6 +8,7 @@ mod binary_search; mod flexi_logger; mod global_lock; mod is_tcp_port_listening; +mod lazy_clone; mod looper; mod main_wrapper; mod periodic_updater; @@ -26,6 +27,7 @@ pub use binary_search::*; pub use flexi_logger::*; pub use global_lock::*; pub use is_tcp_port_listening::is_tcp_port_listening; +pub use lazy_clone::*; pub use looper::*; pub use main_wrapper::*; pub use periodic_updater::*; diff --git a/rust/bitbazaar/misc/retry.rs b/rust/bitbazaar/misc/retry.rs index 510ae4e8..01cb601f 100644 --- a/rust/bitbazaar/misc/retry.rs +++ b/rust/bitbazaar/misc/retry.rs @@ -84,6 +84,12 @@ impl<'a, E> Retry<'a, E> { self } + /// Never stop retrying. + pub fn until_forever(mut self) -> Self { + self.until = RetryUntil::Forever; + self + } + /// Stop retrying after the total delay reaches the given duration. pub fn until_total_delay(mut self, max_total_delay: Duration) -> Self { self.until = RetryUntil::TotalDelay(max_total_delay); @@ -193,6 +199,8 @@ pub enum RetryUntil { TotalDelay(Duration), /// UNSTABLE: ONLY PUBLIC FOR MACRO USE. Delay(Duration), + /// UNSTABLE: ONLY PUBLIC FOR MACRO USE. + Forever, } impl RetryUntil { @@ -223,6 +231,7 @@ impl RetryUntil { return true; } } + RetryUntil::Forever => return false, } false } diff --git a/rust/bitbazaar/redis/batch.rs b/rust/bitbazaar/redis/batch.rs index a9be6b71..69975817 100644 --- a/rust/bitbazaar/redis/batch.rs +++ b/rust/bitbazaar/redis/batch.rs @@ -4,11 +4,12 @@ use std::{collections::HashSet, marker::PhantomData, sync::LazyLock}; use deadpool_redis::redis::{FromRedisValue, Pipeline, ToRedisArgs}; -use crate::{log::record_exception, misc::Retry, retry_flexi}; +use crate::{log::record_exception, retry_flexi}; use super::{ conn::RedisConnLike, fuzzy::{RedisFuzzy, RedisFuzzyUnwrap}, + redis_retry::redis_retry_config, RedisScript, RedisScriptInvoker, }; @@ -27,14 +28,14 @@ static MSET_WITH_EXPIRY_SCRIPT: LazyLock = /// Note each command may be run twice, if scripts needed caching to redis. 
pub struct RedisBatch<'a, 'c, ConnType: RedisConnLike, ReturnType> { _returns: PhantomData, - redis_conn: &'a mut ConnType, + redis_conn: &'a ConnType, pipe: Pipeline, /// Need to keep a reference to used scripts, these will all be reloaded to redis errors because one wasn't cached on the server. used_scripts: HashSet<&'c RedisScript>, } impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, ReturnType> { - pub(crate) fn new(redis_conn: &'a mut ConnType) -> Self { + pub(crate) fn new(redis_conn: &'a ConnType) -> Self { Self { _returns: PhantomData, redis_conn, @@ -43,32 +44,12 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R } } - async fn inner_fire(&mut self) -> Option { - if let Some(conn) = self.redis_conn.get_inner_conn().await { + async fn inner_fire(&self) -> Option { + if let Some(mut conn) = self.redis_conn.get_inner_conn().await { // Handling retryable errors internally: - let retry = Retry::::fibonacci(chrono::Duration::milliseconds(10)) - // Will cumulatively delay for up to 6 seconds. - // SHOULDN'T BE LONGER, considering outer code may then handle the redis failure, - // in e.g. web request, any longer would then be harmful. - .until_total_delay(chrono::Duration::seconds(5)) - .on_retry(move |info| match info.last_error.kind() { - // These should all be automatically retried: - redis::ErrorKind::BusyLoadingError - | redis::ErrorKind::TryAgain - | redis::ErrorKind::MasterDown => { - tracing::warn!( - "Redis command failed with retryable error, retrying in {}. Last attempt no: '{}'.\nErr:\n{:?}.", - info.delay_till_next_attempt, - info.last_attempt_no, - info.last_error - ); - None - }, - // Everything else should just exit straight away, no point retrying internally. - _ => Some(info.last_error), - }); - - match retry_flexi!(retry.clone(), { self.pipe.query_async(conn).await }) { + match retry_flexi!(redis_retry_config(), { + self.pipe.query_async(&mut conn).await + }) { Ok(v) => Some(v), Err(e) => { // Load the scripts into Redis if the any of the scripts weren't there before. @@ -89,11 +70,11 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R for script in &self.used_scripts { load_pipe.add_command(script.load_cmd()); } - match retry_flexi!(retry, { - load_pipe.query_async::(conn).await + match retry_flexi!(redis_retry_config(), { + load_pipe.query_async::(&mut conn).await }) { // Now loaded the scripts, rerun the batch: - Ok(_) => match self.pipe.query_async(conn).await { + Ok(_) => match self.pipe.query_async(&mut conn).await { Ok(result) => Some(result), Err(err) => { record_exception("Redis batch failed. Pipe returned NoScriptError, but we've just loaded all scripts.", format!("{:?}", err)); @@ -132,6 +113,33 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R } } + /// Low-level backdoor. Pass in a custom redis command to run, but don't expect a return value. + /// After calling this, custom_arg() can be used to add arguments. + /// + /// E.g. `batch.custom_no_return("SET").custom_arg("key").custom_arg("value").fire().await;` + pub fn custom_no_return(mut self, cmd: &str) -> Self { + self.pipe.cmd(cmd).ignore(); + self + } + + /// Low-level backdoor. Add a custom argument to the last custom command added with either `custom_no_return()` or `custom()`. + pub fn custom_arg(mut self, arg: impl ToRedisArgs) -> Self { + self.pipe.arg(arg); + self + } + + /// Publish a message to a pubsub channel. 
+ /// Use [`RedisConnLike::pubsub_listen`] to listen for messages. + pub fn publish(mut self, namespace: &str, channel: &str, message: impl ToRedisArgs) -> Self { + self.pipe + .publish( + self.redis_conn.final_key(namespace, channel.into()), + message, + ) + .ignore(); + self + } + /// Expire an existing key with a new/updated ttl. /// /// @@ -411,7 +419,7 @@ impl<'a, 'c, ConnType: RedisConnLike, R: FromRedisValue + RedisFuzzyUnwrap> Redi { type ReturnType = R; - async fn fire(mut self) -> as RedisFuzzyUnwrap>::Output { + async fn fire(self) -> as RedisFuzzyUnwrap>::Output { self.inner_fire::<(R,)>().await.map(|(r,)| r).fuzzy_unwrap() } } @@ -421,7 +429,7 @@ macro_rules! impl_batch_fire { impl<'a, 'c, ConnType: RedisConnLike, $($tup_item: FromRedisValue + RedisFuzzyUnwrap),*> RedisBatchFire for RedisBatch<'a, 'c, ConnType, ($($tup_item,)*)> { type ReturnType = ($($tup_item,)*); - async fn fire(mut self) -> as RedisFuzzyUnwrap>::Output { + async fn fire(self) -> as RedisFuzzyUnwrap>::Output { self.inner_fire::<($($tup_item,)*)>().await.fuzzy_unwrap() } } @@ -447,6 +455,9 @@ pub trait RedisBatchReturningOps<'c> { script_invokation: RedisScriptInvoker<'c>, ) -> Self::NextType>; + /// Low-level backdoor. Pass in a custom redis command to run, specifying the return value to coerce to. + fn custom(self, cmd: &str) -> Self::NextType; + /// Check if a key exists. fn exists(self, namespace: &str, key: &str) -> Self::NextType; @@ -543,6 +554,16 @@ macro_rules! impl_batch_ops { self.script_no_decode_protection(script_invokation) } + fn custom(mut self, cmd: &str) -> Self::NextType { + self.pipe.cmd(cmd); + RedisBatch { + _returns: PhantomData, + redis_conn: self.redis_conn, + pipe: self.pipe, + used_scripts: self.used_scripts + } + } + fn exists(mut self, namespace: &str, key: &str) -> Self::NextType { self.pipe.exists(self.redis_conn.final_key(namespace, key.into())); RedisBatch { diff --git a/rust/bitbazaar/redis/conn.rs b/rust/bitbazaar/redis/conn.rs index bdd08657..7e8409e7 100644 --- a/rust/bitbazaar/redis/conn.rs +++ b/rust/bitbazaar/redis/conn.rs @@ -8,110 +8,117 @@ use std::{ use deadpool_redis::redis::{FromRedisValue, ToRedisArgs}; -use super::batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps}; +use super::{ + batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps}, + fuzzy::RedisFuzzy, + pubsub::{pubsub_global::RedisPubSubGlobal, RedisChannelListener}, +}; use crate::{errors::prelude::*, log::record_exception, redis::RedisScript}; -/// Wrapper around a lazy redis connection. +/// A lazy redis connection. +#[derive(Debug, Clone)] pub struct RedisConn<'a> { pub(crate) prefix: &'a str, pool: &'a deadpool_redis::Pool, - conn: Option, + // It uses it's own global connection so needed down here too, to abstract away and use from the same higher-order connection: + pubsub_global: &'a Arc, + // We used to cache the [`deadpool_redis::Connection`] conn in here, + // but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally. + // so getting for each usage is fine, given: + // - most conns will probably be used once anyway, e.g. get a conn in a handler and do some caching or whatever in a batch(). + // - prevents needing mutable references to the conn anymore, much nicer ergonomics. + // - prevents the chance of a stale cached connection, had issues before with this, deadpool handles internally. 
+ // conn: Option, } impl<'a> RedisConn<'a> { - pub(crate) fn new(pool: &'a deadpool_redis::Pool, prefix: &'a str) -> Self { + pub(crate) fn new( + pool: &'a deadpool_redis::Pool, + prefix: &'a str, + pubsub_global: &'a Arc, + ) -> Self { Self { pool, prefix, - conn: None, - } - } -} - -// Cloning is still technically heavy for the un-owned, as the active connection can't be reused. -impl<'a> Clone for RedisConn<'a> { - fn clone(&self) -> Self { - Self { - prefix: self.prefix, - pool: self.pool, - conn: None, + pubsub_global, } } } -/// An owned variant of [`RedisConn`]. Useful when parent struct lifetimes get out of hand. -/// [`RedisConn`] is better, so keeping local in crate until a real need for it outside. -/// (requires pool arc cloning, and prefix string cloning, so slightly less efficient). +/// An owned variant of [`RedisConn`]. +/// Just requires a couple of Arc clones, so still quite lightweight. +#[derive(Debug, Clone)] pub struct RedisConnOwned { // Prefix and pool both Arced now at top level for easy cloning. - // The conn will be reset to None on each clone, so it's a very heavy object I think. pub(crate) prefix: Arc, pool: deadpool_redis::Pool, - conn: Option, + // It uses it's own global connection so needed down here too, to abstract away and use from the same higher-order connection: + pubsub_global: Arc, + // We used to cache the [`deadpool_redis::Connection`] conn in here, + // but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally. + // so getting for each usage is fine, given: + // - most conns will probably be used once anyway, e.g. get a conn in a handler and do some caching or whatever in a batch(). + // - prevents needing mutable references to the conn anymore, much nicer ergonomics. + // - prevents the chance of a stale cached connection, had issues before with this, deadpool handles internally. + // conn: Option, } -impl Clone for RedisConnOwned { - fn clone(&self) -> Self { - Self { - prefix: self.prefix.clone(), - pool: self.pool.clone(), - conn: None, - } - } -} - -macro_rules! impl_debug_for_conn { - ($conn_type:ty, $name:literal) => { - impl std::fmt::Debug for $conn_type { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct($name) - .field("prefix", &self.prefix) - .field("pool", &self.pool) - .field("conn", &self.conn.is_some()) - .finish() - } - } - }; -} - -impl_debug_for_conn!(RedisConn<'_>, "RedisConn"); -impl_debug_for_conn!(RedisConnOwned, "RedisConnOwned"); - /// Generic methods over the RedisConn and RedisConnOwned types. pub trait RedisConnLike: std::fmt::Debug + Send + Sized { - /// Get an internal connection from the pool, connections are kept in the pool for reuse. + /// Get an internal connection from the pool. + /// Despite returning an owned object, the underlying real redis connection will be reused after this user drops it. /// If redis is acting up and unavailable, this will return None. /// NOTE: this mainly is used internally, but provides a fallback to the underlying connection, if the exposed interface does not provide options that fit an external user need (which could definitely happen). - async fn get_inner_conn(&mut self) -> Option<&mut deadpool_redis::Connection>; + async fn get_inner_conn(&self) -> Option; /// Get the redis configured prefix. fn prefix(&self) -> &str; + /// Get the redis pubsub global manager. + fn _pubsub_global(&self) -> &Arc; + /// Convert to the owned variant. 
- fn into_owned(self) -> RedisConnOwned; + fn to_owned(&self) -> RedisConnOwned; - /// Ping redis, returning true if it's up. - async fn ping(&mut self) -> bool { - if let Some(conn) = self.get_inner_conn().await { - redis::cmd("PING").query_async::(conn).await.is_ok() - } else { - false - } + /// Ping redis, returning true if it's up and responsive. + async fn ping(&self) -> bool { + self.batch() + .custom::>("PING") + .fire() + .await + .flatten() + .is_some() + } + + /// Subscribe to a channel via pubsub, receiving messages through the returned receiver. + /// The subscription will be dropped when the receiver is dropped. + /// + /// Sending can be done via normal batches using [`RedisBatch::publish`]. + /// + /// Returns None when redis unavailable for some reason, after a few seconds of trying to connect. + async fn subscribe( + &self, + namespace: &str, + channel: &str, + ) -> Option> { + self._pubsub_global() + .subscribe(self.final_key(namespace, channel.into())) + .await } // Commented out as untested, not sure if works. // /// Get all data from redis, only really useful during testing. // /// - // async fn dev_all_data(&mut self) -> HashMap { - // if let Some(conn) = self.get_inner_conn().await { + // async fn dev_all_data(&self) -> HashMap { + // if let Some(mut conn) = self.get_inner_conn().await { // let mut cmd = redis::cmd("SCAN"); // cmd.arg(0); // let mut data = HashMap::new(); // loop { - // let (next_cursor, keys): (i64, Vec) = cmd.query_async(conn).await.unwrap(); + // let (next_cursor, keys): (i64, Vec) = cmd.query_async(&mut conn).await.unwrap(); // for key in keys { // let val: redis::Value = - // redis::cmd("GET").arg(&key).query_async(conn).await.unwrap(); + // redis::cmd("GET").arg(&key).query_async(&mut conn).await.unwrap(); // data.insert(key, val); // } // if next_cursor == 0 { @@ -126,24 +133,15 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { // } /// Flush the whole redis cache, will delete all data. - async fn dev_flushall(&mut self, sync: bool) -> Option { - if let Some(conn) = self.get_inner_conn().await { - let mut cmd = redis::cmd("FLUSHALL"); - if sync { - cmd.arg("SYNC"); - } else { - cmd.arg("ASYNC"); - } - match cmd.query_async::(conn).await { - Ok(s) => Some(s), - Err(e) => { - record_exception("Failed to reset redis cache.", format!("{:?}", e)); - None - } - } + /// Returns the resulting string from the command, or None if failed for some reason. + async fn dev_flushall(&self, sync: bool) -> Option { + let mut batch = self.batch().custom::>("FLUSHALL"); + if sync { + batch = batch.custom_arg("SYNC"); } else { - None + batch = batch.custom_arg("ASYNC"); } + batch.fire().await.flatten() } /// A simple rate_limiter/backoff helper. @@ -162,7 +160,7 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { /// - `None`: Continue with the operation. /// - `Some`: Retry after the duration. async fn rate_limiter( - &mut self, + &self, namespace: &str, caller_identifier: &str, start_delaying_after_attempt: usize, @@ -199,7 +197,7 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { } /// Get a new [`RedisBatch`] for this connection that commands can be piped together with. - fn batch(&mut self) -> RedisBatch<'_, '_, Self, ()> { + fn batch(&self) -> RedisBatch<'_, '_, Self, ()> { RedisBatch::new(self) } @@ -224,7 +222,7 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { /// Expiry accurate to a millisecond. 
#[inline] async fn cached_fn<'b, T, Fut, K: Into>>( - &mut self, + &self, namespace: &str, key: K, expiry: Option, @@ -253,51 +251,53 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { } impl<'a> RedisConnLike for RedisConn<'a> { - async fn get_inner_conn(&mut self) -> Option<&mut deadpool_redis::Connection> { - if self.conn.is_none() { - match self.pool.get().await { - Ok(conn) => self.conn = Some(conn), - Err(e) => { - record_exception("Failed to get redis connection.", format!("{:?}", e)); - return None; - } + async fn get_inner_conn(&self) -> Option { + match self.pool.get().await { + Ok(conn) => Some(conn), + Err(e) => { + record_exception("Failed to get redis connection.", format!("{:?}", e)); + None } } - self.conn.as_mut() } fn prefix(&self) -> &str { self.prefix } - fn into_owned(self) -> RedisConnOwned { + fn _pubsub_global(&self) -> &Arc { + self.pubsub_global + } + + fn to_owned(&self) -> RedisConnOwned { RedisConnOwned { prefix: Arc::new(self.prefix.to_string()), pool: self.pool.clone(), - conn: self.conn, + pubsub_global: self.pubsub_global.clone(), } } } impl RedisConnLike for RedisConnOwned { - async fn get_inner_conn(&mut self) -> Option<&mut deadpool_redis::Connection> { - if self.conn.is_none() { - match self.pool.get().await { - Ok(conn) => self.conn = Some(conn), - Err(e) => { - record_exception("Failed to get redis connection.", format!("{:?}", e)); - return None; - } + async fn get_inner_conn(&self) -> Option { + match self.pool.get().await { + Ok(conn) => Some(conn), + Err(e) => { + record_exception("Failed to get redis connection.", format!("{:?}", e)); + None } } - self.conn.as_mut() } fn prefix(&self) -> &str { &self.prefix } - fn into_owned(self) -> RedisConnOwned { - self + fn _pubsub_global(&self) -> &Arc { + &self.pubsub_global + } + + fn to_owned(&self) -> RedisConnOwned { + self.clone() } } diff --git a/rust/bitbazaar/redis/dlock.rs b/rust/bitbazaar/redis/dlock.rs index 7aacf154..aca9da23 100644 --- a/rust/bitbazaar/redis/dlock.rs +++ b/rust/bitbazaar/redis/dlock.rs @@ -111,18 +111,18 @@ impl<'a> RedisLock<'a> { // Need to actually lock for the first time: let lock_id = lock.lock_id.clone(); let val = lock.val.clone(); - lock.exec_or_retry(ttl, move |mut conn| { + lock.exec_or_retry(ttl, move |conn| { let lock_id = lock_id.clone(); let val = val.clone(); async move { - if let Some(conn) = conn.get_inner_conn().await { + if let Some(mut conn) = conn.get_inner_conn().await { let result: RedisResult = redis::cmd("SET") .arg(lock_id) .arg(val) .arg("NX") .arg("PX") .arg(ttl.as_millis() as usize) - .query_async(conn) + .query_async(&mut conn) .await; match result { @@ -225,7 +225,7 @@ impl<'a> RedisLock<'a> { let lock_id = self.lock_id.clone(); let val = self.val.clone(); - self.exec_or_retry(new_ttl, move |mut conn| { + self.exec_or_retry(new_ttl, move |conn| { let lock_id = lock_id.clone(); let val = val.clone(); async move { @@ -260,7 +260,7 @@ impl<'a> RedisLock<'a> { pub async fn unlock(&mut self) -> bool { let result = futures::future::join_all(self.redis.get_conn_to_each_server().into_iter().map( - |mut conn| { + |conn| { let lock_id = self.lock_id.clone(); let val = self.val.clone(); async move { diff --git a/rust/bitbazaar/redis/mod.rs b/rust/bitbazaar/redis/mod.rs index 07ad94e0..167249c9 100644 --- a/rust/bitbazaar/redis/mod.rs +++ b/rust/bitbazaar/redis/mod.rs @@ -3,6 +3,8 @@ mod conn; mod dlock; mod fuzzy; mod json; +mod pubsub; +mod redis_retry; mod script; mod temp_list; mod wrapper; @@ -15,6 +17,7 @@ pub use batch::{RedisBatch, 
RedisBatchFire, RedisBatchReturningOps}; pub use conn::{RedisConn, RedisConnLike}; pub use dlock::{RedisLock, RedisLockErr}; pub use json::{RedisJson, RedisJsonBorrowed}; +pub use pubsub::RedisChannelListener; // Re-exporting redis and deadpool_redis to be used outside if needed: pub use deadpool_redis; pub use redis; @@ -57,6 +60,19 @@ mod tests { Ok((server, work_r, fail_r)) } + #[rstest] + #[tokio::test] + async fn test_redis_ping(#[allow(unused_variables)] logging: ()) -> RResult<(), AnyErr> { + let (_server, work_r, fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + let fail_conn = fail_r.conn(); + + assert!(work_conn.ping().await); + assert!(!(fail_conn.ping().await)); + + Ok(()) + } + #[rstest] #[tokio::test] async fn test_redis_scripts(#[allow(unused_variables)] logging: ()) -> RResult<(), AnyErr> { @@ -460,7 +476,7 @@ mod tests { ) -> RResult<(), AnyErr> { let server = RedisStandalone::new_no_persistence().await?; let r = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; - let mut rconn = r.conn(); + let rconn = r.conn(); // THIS ONLY ACTUALLY WORKS FOR STRINGS and false, OTHERS ARE NONE, DUE TO REDIS LIMITATION OF RETURNING NIL FOR EMPTY ARRS AND NONE ETC. // None::, empty vec![] and "" should all work fine as real stored values, @@ -585,7 +601,7 @@ mod tests { let server = RedisStandalone::new_no_persistence().await?; let r = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; - let mut rconn = r.conn(); + let rconn = r.conn(); macro_rules! call { () => { diff --git a/rust/bitbazaar/redis/pubsub/channel_listener.rs b/rust/bitbazaar/redis/pubsub/channel_listener.rs new file mode 100644 index 00000000..987c9301 --- /dev/null +++ b/rust/bitbazaar/redis/pubsub/channel_listener.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use redis::{from_owned_redis_value, FromRedisValue, ToRedisArgs}; + +use crate::log::record_exception; + +/// A listener to receive messages from a redis channel via pubsub. +pub struct RedisChannelListener { + pub(crate) on_drop_tx: Arc>, + pub(crate) key: u64, + pub(crate) channel: String, + pub(crate) rx: tokio::sync::mpsc::UnboundedReceiver, + pub(crate) _t: std::marker::PhantomData, +} + +impl RedisChannelListener { + /// Get a new message from the channel. + /// The outer None indicates the channel has been closed erroneously, or the internal data could not be coerced to the target type. + /// In either case, something's gone wrong, an exception will probably have been recorded too. + pub async fn recv(&mut self) -> Option { + if let Some(v) = self.rx.recv().await { + match from_owned_redis_value(v) { + Ok(v) => Some(v), + Err(e) => { + record_exception( + format!( + "Failed to convert redis value to target type '{}'", + std::any::type_name::() + ), + format!("{:?}", e), + ); + None + } + } + } else { + None + } + } +} + +/// Tell the global pubsub manager this listener is being dropped. 
+impl Drop for RedisChannelListener { + fn drop(&mut self) { + let _ = self.on_drop_tx.send((self.channel.clone(), self.key)); + } +} diff --git a/rust/bitbazaar/redis/pubsub/mod.rs b/rust/bitbazaar/redis/pubsub/mod.rs new file mode 100644 index 00000000..47d1db89 --- /dev/null +++ b/rust/bitbazaar/redis/pubsub/mod.rs @@ -0,0 +1,303 @@ +mod channel_listener; +pub(crate) mod pubsub_global; + +pub use channel_listener::*; + +// Redis server can't be run on windows: +#[cfg(not(target_os = "windows"))] +#[cfg(test)] +mod tests { + + use chrono::TimeDelta; + + use crate::misc::with_timeout; + use crate::redis::{Redis, RedisBatchFire, RedisConnLike, RedisStandalone}; + use crate::testing::prelude::*; + + async fn setup_conns() -> RResult<(RedisStandalone, Redis, Redis), AnyErr> { + let server = RedisStandalone::new_no_persistence().await?; + let work_r = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; + // Also create a fake version on a random port, this will be used to check failure cases. + let fail_r = Redis::new( + "redis://FAKE:6372", + format!("test_{}", uuid::Uuid::new_v4()), + )?; + Ok((server, work_r, fail_r)) + } + + // The basics: + // - Listeners receive messages. + // - Listeners receive only their own messages. + // - Listeners clean themselves up. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_simple( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + for (mut rx, namespace) in [ + ( + work_conn.subscribe::("n1", "foo").await.unwrap(), + "n1", + ), + ( + work_conn.subscribe::("n2", "foo").await.unwrap(), + "n2", + ), + ] { + assert!(work_conn + .batch() + .publish(namespace, "foo", format!("{}_first_msg", namespace)) + .publish(namespace, "foo", format!("{}_second_msg", namespace)) + .fire() + .await + .is_some()); + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some(format!("{}_first_msg", namespace)), rx.recv().await); + assert_eq!(Some(format!("{}_second_msg", namespace)), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + } + + // Given everything's dropped now we're out of the loop, internals should've been cleaned up after a short delay: + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + assert_eq!(work_r.pubsub_listener.listeners.len(), 0); + + Ok(()) + } + + // The redis prefix should be respected for channels + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_prefix_respected( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let server = RedisStandalone::new_no_persistence().await?; + let work_r_1 = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; + let work_r_2 = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; + let work_conn_1 = work_r_1.conn(); + let work_conn_2 = work_r_2.conn(); + + // Given we're using 2 different prefixes, each should not be impacted by the other: + let mut rx1 = work_conn_1.subscribe::("", "foo").await.unwrap(); + let mut rx2 = work_conn_2.subscribe::("", "foo").await.unwrap(); + + assert!(work_conn_1 + .batch() + .publish("", "foo", "conn_1_msg") + .fire() + .await + .is_some()); + assert!(work_conn_2 + .batch() + .publish("", "foo", 
"conn_2_msg") + .fire() + .await + .is_some()); + + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("conn_1_msg".to_string()), rx1.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx1.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + assert_eq!(Some("conn_2_msg".to_string()), rx2.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx2.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + Ok(()) + } + + // Multiple listeners on the same channel: + // - Each gets data + // - Each gets data only once + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_single_channel_multiple_listeners( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + let rx1 = work_conn.subscribe::("n1", "foo").await.unwrap(); + let rx2 = work_conn.subscribe::("n1", "foo").await.unwrap(); + let rx3 = work_conn.subscribe::("n1", "foo").await.unwrap(); + + // All 3 receivers should receive these messages: + assert!(work_conn + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + for mut rx in [rx1, rx2, rx3] { + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("first_msg".to_string()), rx.recv().await); + assert_eq!(Some("second_msg".to_string()), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + } + + // Given everything's dropped now we're out of the loop, internals should've been cleaned up after a short delay: + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + assert_eq!(work_r.pubsub_listener.listeners.len(), 0); + + Ok(()) + } + + /// - pubsub should be able to continue after redis goes down and back up. + /// - subscribe() and publish() should work even if redis literally just coming back alive. + /// - subscriptions should automatically resubscribe when the connection has to be restarted on new redis. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_redis_sketchiness( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + // Start a server to get a static port then instantly shutdown but keep the redis instance (client): + let (client, port) = { + let server = RedisStandalone::new_no_persistence().await?; + let client = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4().to_string())?; + (client, server.port) + }; + + let restart_server = move || { + // Same as new_no_persistence, but have to use underlying for port: + RedisStandalone::new_with_opts(port, Some(&["--appendonly", "no", "--save", "\"\""])) + }; + + // subscribe() should work even if redis is justttt coming back up, i.e. it should wait around for a connection. + let mut rx = { + let _server = restart_server().await?; + client + .conn() + .subscribe::("n1", "foo") + .await + .unwrap() + }; + + // publish() should work even if redis is justttt coming back up, i.e. 
it should wait around for a connection. + let _server = restart_server().await?; + // This is separate, just confirming publish works straight away, + // slight delay needed for actual publish as redis needs time to resubscribe to the channel on the new connection, + // otherwise won't see the published event. + assert!(client + .conn() + .batch() + .publish("lah", "loo", "baz") + .fire() + .await + .is_some()); + + // Short delay, see above comment for redis to resubscribe before proper publish to check: + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + assert!(client + .conn() + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + // Despite all the madness messages should come through: + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("first_msg".to_string()), rx.recv().await); + assert_eq!(Some("second_msg".to_string()), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + + Ok(()) + } + + // Nothing should break when no ones subscribed to a channel when a message is published. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_no_listener( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + assert!(work_conn + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + Ok(()) + } +} diff --git a/rust/bitbazaar/redis/pubsub/pubsub_global.rs b/rust/bitbazaar/redis/pubsub/pubsub_global.rs new file mode 100644 index 00000000..6350b242 --- /dev/null +++ b/rust/bitbazaar/redis/pubsub/pubsub_global.rs @@ -0,0 +1,421 @@ +use std::{ + collections::HashMap, + fmt::Debug, + sync::{atomic::AtomicBool, Arc}, +}; + +use chrono::TimeDelta; +use dashmap::DashMap; +use redis::{aio::MultiplexedConnection, from_owned_redis_value, FromRedisValue, ToRedisArgs}; + +use crate::{ + log::record_exception, + misc::{random_u64_rolling, IterWithCloneLazy, Retry}, + prelude::*, + redis::redis_retry::redis_retry_config, +}; + +use super::RedisChannelListener; + +/// The lazy pubsub manager. +pub struct RedisPubSubGlobal { + client: redis::Client, + config: redis::AsyncConnectionConfig, + /// Unlike deadpool these aren't pooled, so definitely need to store and reuse until it becomes invalid, only then get a new one. + active_conn: tokio::sync::RwLock>, + /// The downstream configured listeners for different channels, messages will be pushed to all active listeners. + /// Putting in a nested hashmap for easy cleanup when listeners are dropped. + pub(crate) listeners: + DashMap>>, + + /// Below used to trigger unsubscriptions and listeners dashmap cleanup when listeners are dropped. + /// (The tx is called when a listener is dropped, and the spawned process listens for these and does the cleanup.) + listener_drop_tx: Arc>, + + /// Will be taken when the listener is lazily spawned. + spawn_init: tokio::sync::Mutex>, + spawned: AtomicBool, + + /// Will be sent on Redis drop to kill the spawned listener. 
+ on_drop_tx: Option>, +} + +impl Drop for RedisPubSubGlobal { + fn drop(&mut self) { + // This will kill the spawned listener, which will in turn kill the spawned process. + if let Some(on_drop_tx) = self.on_drop_tx.take() { + let _ = on_drop_tx.send(()); + }; + } +} + +#[derive(Debug)] +struct SpawnInit { + /// The global receiver of messages hooked directly into the redis connection. + /// This will be taken when the main listener is spawned. + rx: tokio::sync::mpsc::UnboundedReceiver, + + // Will receive whenever a listener is dropped: + listener_drop_rx: tokio::sync::mpsc::UnboundedReceiver<(String, u64)>, + + // Received when the redis instance dropped, meaning the spawned listener should shutdown. + on_drop_rx: tokio::sync::oneshot::Receiver<()>, +} + +impl Debug for RedisPubSubGlobal { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RedisPubSubGlobal") + .field("client", &self.client) + // .field("config", &self.config) + .field("active_conn", &self.active_conn) + .field("listeners", &self.listeners) + .field("listener_drop_tx", &self.listener_drop_tx) + .field("spawn_init", &self.spawn_init) + .field("spawned", &self.spawned) + .finish() + } +} + +impl RedisPubSubGlobal { + pub(crate) fn new(redis_conn_str: impl Into) -> RResult { + let client = redis::Client::open(format!("{}?protocol=resp3", redis_conn_str.into())) + .change_context(AnyErr)?; + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (listener_drop_tx, listener_drop_rx) = tokio::sync::mpsc::unbounded_channel(); + let config = redis::AsyncConnectionConfig::new().set_push_sender(tx); + let (on_drop_tx, on_drop_rx) = tokio::sync::oneshot::channel(); + Ok(Self { + client, + config, + active_conn: tokio::sync::RwLock::new(None), + listeners: DashMap::new(), + listener_drop_tx: Arc::new(listener_drop_tx), + spawn_init: tokio::sync::Mutex::new(Some(SpawnInit { + rx, + listener_drop_rx, + on_drop_rx, + })), + spawned: AtomicBool::new(false), + on_drop_tx: Some(on_drop_tx), + }) + } + + pub(crate) async fn unsubscribe(&self, channel: impl Into) { + let channel = channel.into(); + self.listeners.remove(&channel); + + let force_new_connection = AtomicBool::new(false); + match redis_retry_config() + .call(|| async { + if let Some(mut conn) = self + .get_conn( + // Means on second attempt onwards, will always get new connections. + force_new_connection.swap(true, std::sync::atomic::Ordering::Relaxed), + ) + .await + { + conn.unsubscribe(&channel).await + } else { + // Doing nothing when None as that'll have been logged lower down. + Ok(()) + } + }) + .await + { + Ok(()) => {} + Err(e) => { + record_exception( + "Pubsub: failed to unsubscribe from channel.", + format!("{:?}", e), + ); + } + } + } + + /// Returns None when redis down/acting up and couldn't get over a few seconds. + pub(crate) async fn subscribe( + self: &Arc, + channel: impl Into, + ) -> Option> { + let channel = channel.into(); + + let force_new_connection = AtomicBool::new(false); + match redis_retry_config() + .call(|| async { + if let Some(mut conn) = self + .get_conn( + // Means on second attempt onwards, will always get new connections. + force_new_connection.swap(true, std::sync::atomic::Ordering::Relaxed), + ) + .await + { + conn.subscribe(&channel).await + } else { + // Doing nothing when None as that'll have been logged lower down. 
+ Err(redis::RedisError::from(std::io::Error::new( + std::io::ErrorKind::Other, + "Couldn't get a connection to redis.", + ))) + } + }) + .await + { + Ok(()) => {} + Err(e) => { + record_exception( + "Pubsub: failed to subscribe to channel.", + format!("{:?}", e), + ); + return None; + } + } + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let listener_key = random_u64_rolling(); + self.listeners + .entry(channel.clone()) + .or_default() + .insert(listener_key, tx); + + if !self + .spawned + .swap(true, std::sync::atomic::Ordering::Relaxed) + { + let arc_self = self.clone(); + let mut init = self + .spawn_init + .lock() + .await + .take() + .expect("init should only be taken once"); + + tokio::spawn(async move { + // Spawned task will exit only when the on_drop_rx is sent, i.e. when the redis instance is dropped. + tokio::select! { + _ = init.on_drop_rx => {} + _ = async { + loop { + tokio::select! { + // Adding this means the listener fut will always be polled first, i.e. has higher priority. + // This is what we want as it cleans up dead listeners, so avoids the second fut ideally hitting any dead listeners. + biased; + result = init.listener_drop_rx.recv() => { + arc_self.spawned_handle_listener_dropped(result).await; + } + result = init.rx.recv() => { + arc_self.spawned_handle_message(result).await; + } + } + } + } => {} + } + }); + } + + Some(RedisChannelListener { + key: listener_key, + on_drop_tx: self.listener_drop_tx.clone(), + channel, + rx, + _t: std::marker::PhantomData, + }) + } + + /// None returned when redis seemingly down/erroring and can't get a connection. + async fn get_conn(&self, force_new_connection: bool) -> Option { + // Inside rwlock so read only if already in there and not forcing new, to avoid getting a write lock when not needed: + if !force_new_connection { + if let Some(conn) = self.active_conn.read().await.as_ref() { + return Some(conn.clone()); + } + } + + // If couldn't return above, we need a new conn: + let mut maybe_conn = self.active_conn.write().await; + match redis_retry_config() + .call(move || { + // WARNING: unlike deadpool for the rest of redis, this is very heavy as it's not pooled. + self.client + .get_multiplexed_async_connection_with_config(&self.config) + }) + .await + { + Ok(mut conn) => { + // Need to re-subscribe to all actively listened to channels for the new connection: + for entry in self.listeners.iter() { + let channel = entry.key(); + match conn.subscribe(channel).await { + Ok(()) => {} + Err(e) => { + record_exception( + format!("Pubsub: failed to re-subscribe to channel '{}' with newly acquired connection, discarding.", channel), + format!("{:?}", e), + ); + *maybe_conn = None; + return None; + } + } + } + *maybe_conn = Some(conn); + } + Err(e) => { + record_exception( + "Pubsub: creation of a new Redis connection failed.", + format!("{:?}", e), + ); + *maybe_conn = None; + return None; + } + } + + let conn = maybe_conn + .as_ref() + .expect("conn should be Some given just created if needed."); + + Some(conn.clone()) + } + + /// Handle cleaning up the listeners dashmap, and calling redis's unsubscribe method when no more listeners for a given channel. + /// The cleanup channel gets called in the drop fn of each [`RedisChannelListener`]. 
+ async fn spawned_handle_listener_dropped( + self: &Arc, + channel_and_key: Option<(String, u64)>, + ) { + match channel_and_key { + Some((channel, key)) => { + let unsub = if let Some(mut listeners) = self.listeners.get_mut(&channel) { + listeners.remove(&key); + listeners.is_empty() + } else { + true + }; + // Need to come after otherwise dashmap could deadlock. + if unsub { + self.unsubscribe(&channel).await; + } + } + None => { + record_exception( + "Pubsub: redis cleanup channel died. Tx sender supposedly dropped.", + "", + ); + } + } + } + + /// Handle redis pubsub messages coming into subscriptions. + async fn spawned_handle_message(self: &Arc, message: Option) { + match message { + Some(push_info) => { + match push_info.kind.clone() { + redis::PushKind::Subscribe => { + // Example received: + // PushInfo { kind: Subscribe, data: [bulk-string('"foo"'), int(1)] } + + // Don't actually need to do anything for these methods: + + // match from_owned_redis_value::<(String, i64)>( + // redis::Value::Array(push_info.data), + // ) { + // Ok((channel, sub_count)) => { + // tracing::info!( + // "Subscribed to channel: '{}', sub_count: {}", + // channel, + // sub_count + // ); + // } + // Err(e) => { + // record_exception( + // "Pubsub: failed to decode redis::PushKind::Subscribe.", + // format!("{:?}", e), + // ); + // } + // } + } + redis::PushKind::Unsubscribe => { + // Example received: + // PushInfo { kind: Unsubscribe, data: [bulk-string('"49878c28-c7ef-4f4c-b196-9956942bbe95:n1:foo"'), int(1)] } + + // Don't actually need to do anything for these methods: + + // match from_owned_redis_value::<(String, i64)>( + // redis::Value::Array(push_info.data), + // ) { + // Ok((client_and_channel, sub_count)) => { + // tracing::info!( + // "Client unsubscribed from channel: '{}', sub_count: {}", + // client_and_channel, + // sub_count + // ); + // } + // Err(e) => { + // record_exception( + // "Pubsub: failed to decode redis::PushKind::Unsubscribe.", + // format!("{:?}", e), + // ); + // } + // } + } + redis::PushKind::Disconnection => { + tracing::warn!( + "Pubsub: redis disconnected, attempting to get new connection, retrying every 100ms until success..." + ); + let result = Retry::fixed(TimeDelta::milliseconds(100)) + .until_forever() + .call(|| async { + match self.get_conn(true).await { + Some(_) => { + tracing::info!("Pubsub: redis reconnected."); + Ok(()) + } + None => Err(()), + } + }) + .await; + if result.is_err() { + panic!("Should be impossible, above retry loop should go infinitely until success"); + } + } + redis::PushKind::Message => { + // Example received: + // PushInfo { kind: Message, data: [bulk-string('"foo"'), bulk-string('"bar"')] } + + match from_owned_redis_value::<(String, redis::Value)>(redis::Value::Array( + push_info.data, + )) { + Ok((channel, msg)) => { + if let Some(listeners) = self.listeners.get(&channel) { + for (msg, tx) in listeners.values().with_clone_lazy(msg) { + // Given we have a separate future for cleaning up, + // this shouldn't be a big issue if this ever errors with dead listeners, + // as they should immediately be cleaned up by the cleanup future. + let _ = tx.send(msg); + } + } + } + Err(e) => { + record_exception( + "Pubsub: failed to decode redis::PushKind::Message.", + format!("{:?}", e), + ); + } + } + } + _ => { + record_exception( + "Pubsub: unsupported/unexpected message received by global listener.", + format!("{:?}", push_info), + ); + } + } + } + None => { + record_exception( + "Pubsub: redis listener channel died. 
Tx sender supposedly dropped.", + "", + ); + } + } + } +} diff --git a/rust/bitbazaar/redis/redis_retry.rs b/rust/bitbazaar/redis/redis_retry.rs new file mode 100644 index 00000000..3e0c8306 --- /dev/null +++ b/rust/bitbazaar/redis/redis_retry.rs @@ -0,0 +1,25 @@ +use crate::misc::Retry; + +pub(crate) fn redis_retry_config() -> Retry<'static, redis::RedisError> { + Retry::::fibonacci(chrono::Duration::milliseconds(10)) + // Will cumulatively delay for up to about 5ish seconds. + // SHOULDN'T BE LONGER, considering downstream user code may then handle the redis failure, + // in e.g. web request, any longer would then be harmful. + .until_total_delay(chrono::Duration::seconds(5)) + .on_retry(move |info| match info.last_error.kind() { + // These should all be automatically retried: + redis::ErrorKind::BusyLoadingError + | redis::ErrorKind::TryAgain + | redis::ErrorKind::MasterDown => { + tracing::warn!( + "Redis action failed with retryable error, retrying in {}. Last attempt no: '{}'.\nErr:\n{:?}.", + info.delay_till_next_attempt, + info.last_attempt_no, + info.last_error + ); + None + }, + // Everything else should just exit straight away, no point retrying internally. + _ => Some(info.last_error), + }) +} diff --git a/rust/bitbazaar/redis/temp_list.rs b/rust/bitbazaar/redis/temp_list.rs index 7c60c8f0..06c50dae 100644 --- a/rust/bitbazaar/redis/temp_list.rs +++ b/rust/bitbazaar/redis/temp_list.rs @@ -130,7 +130,7 @@ impl serde::Deserialize<'de>> RedisTempListItem RedisTempListItemWithConn { RedisTempListItemWithConn { item: Mutex::new(self), - conn: Mutex::new(conn.into_owned()), + conn: Mutex::new(conn.to_owned()), } } @@ -382,9 +382,6 @@ impl RedisTempList { Some(self.item_inactive_ttl_chrono()), ) // Cleanup old members that have now expired: - // (set member expiry is a logical process, not currently part of redis but could be soon) - // https://github.com/redis/redis/issues/135#issuecomment-2361996 - // https://github.com/redis/redis/pull/13172 .zremrangebyscore( &self.namespace, &self.key, @@ -480,11 +477,7 @@ impl RedisTempList { let item_info = conn .batch() - // NOTE: cleaning up first as don't want these to be included in the read. - // Cleanup old members that have now expired: - // (member expiry is a logical process, not currently part of redis but could be soon) - // https://github.com/redis/redis/issues/135#issuecomment-2361996 - // https://github.com/redis/redis/pull/13172 + // Cleanup old members that have now expired first because don't want these to be included in the read: .zremrangebyscore( &self.namespace, &self.key, @@ -582,11 +575,7 @@ impl RedisTempList { ) -> RedisTempListItem { let item = conn .batch() - // NOTE: cleaning up first as don't want these to be included in the read. - // Cleanup old members that have now expired: - // (member expiry is a logical process, not currently part of redis but could be soon) - // https://github.com/redis/redis/issues/135#issuecomment-2361996 - // https://github.com/redis/redis/pull/13172 + // Cleanup old members that have now expired first because don't want these to be included in the read: .zremrangebyscore( &self.namespace, &self.key, @@ -627,11 +616,7 @@ impl RedisTempList { uids: impl IntoIterator, ) { conn.batch() - // NOTE: cleaning up first as don't want these to be included in the read. 
- // Cleanup old members that have now expired: - // (member expiry is a logical process, not currently part of redis but could be soon) - // https://github.com/redis/redis/issues/135#issuecomment-2361996 - // https://github.com/redis/redis/pull/13172 + // Cleanup old members that have now expired first because don't want these to be included in the read: .zremrangebyscore( &self.namespace, &self.key, @@ -684,9 +669,6 @@ impl RedisTempList { Some(self.item_inactive_ttl_chrono()), ) // Cleanup old members that have now expired: - // (member expiry is a logical process, not currently part of redis but could be soon) - // https://github.com/redis/redis/issues/135#issuecomment-2361996 - // https://github.com/redis/redis/pull/13172 .zremrangebyscore( &self.namespace, &self.key, diff --git a/rust/bitbazaar/redis/wrapper.rs b/rust/bitbazaar/redis/wrapper.rs index bef6f812..a5123cee 100644 --- a/rust/bitbazaar/redis/wrapper.rs +++ b/rust/bitbazaar/redis/wrapper.rs @@ -1,9 +1,9 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use deadpool_redis::{Config, Runtime}; use futures::Future; -use super::{RedisConn, RedisLock, RedisLockErr}; +use super::{pubsub::pubsub_global::RedisPubSubGlobal, RedisConn, RedisLock, RedisLockErr}; use crate::errors::prelude::*; /// A wrapper around redis to make it more concise to use and not need redis in the downstream Cargo.toml. @@ -12,7 +12,9 @@ use crate::errors::prelude::*; /// All redis errors (availability, unexpected content) will be logged as errors and results returned as `None` (or similar) where possible. #[derive(Debug, Clone)] pub struct Redis { + // deadpool arced internally. pool: deadpool_redis::Pool, + pub(crate) pubsub_listener: Arc, prefix: String, } @@ -20,24 +22,27 @@ impl Redis { /// Create a new global redis wrapper from the given Redis URL (like `redis://127.0.0.1`). /// /// Note this should only be done once at startup. - pub fn new, B: Into>( - redis_conn_str: A, - prefix: B, + pub fn new( + redis_conn_str: impl Into, + prefix: impl Into, ) -> RResult { - let cfg = Config::from_url(redis_conn_str); + let redis_conn_str = redis_conn_str.into(); + let prefix = prefix.into(); + let cfg = Config::from_url(&redis_conn_str); let pool = cfg .create_pool(Some(Runtime::Tokio1)) .change_context(AnyErr)?; - + let pubsub_listener = Arc::new(RedisPubSubGlobal::new(&redis_conn_str)?); Ok(Self { pool, - prefix: prefix.into(), + prefix, + pubsub_listener, }) } /// Get a [`RedisConn`] redis can be called with. pub fn conn(&self) -> RedisConn<'_> { - RedisConn::new(&self.pool, &self.prefix) + RedisConn::new(&self.pool, &self.prefix, &self.pubsub_listener) } /// Get a distributed redis lock.
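
For reviewers, a minimal usage sketch of the new surface this patch adds (non-mutable connection handles, pubsub via `subscribe`/`publish`, and the low-level `custom_no_return`/`custom_arg` batch backdoor), pieced together from the tests and doc comments above. The crate path, connection string, prefix, namespace and channel names are placeholder assumptions rather than anything taken from the diff:

```rust
use bitbazaar::redis::{Redis, RedisBatchFire, RedisConnLike};

async fn pubsub_roundtrip() {
    // One wrapper per process; conn() handles are cheap and no longer need `mut`.
    let redis = Redis::new("redis://127.0.0.1:6379", "my_prefix").expect("invalid redis url");
    let conn = redis.conn();

    // subscribe() retries internally and returns None if redis stays unreachable.
    let mut rx = match conn.subscribe::<String>("my_ns", "my_channel").await {
        Some(rx) => rx,
        None => return,
    };

    // Publishing is just another batch command; fire() returns None if redis errored.
    conn.batch()
        .publish("my_ns", "my_channel", "hello")
        .fire()
        .await;

    // Messages arrive until the listener is dropped, which also unsubscribes the channel.
    if let Some(msg) = rx.recv().await {
        assert_eq!(msg, "hello");
    }

    // Low-level backdoor for commands the typed interface doesn't cover yet:
    conn.batch()
        .custom_no_return("SET")
        .custom_arg("raw_key")
        .custom_arg("raw_value")
        .fire()
        .await;
}
```

The design change backing this is that `RedisConn`/`RedisConnOwned` no longer cache a `deadpool_redis::Connection`: per the comments added in `conn.rs`, checking a connection out of the pool costs on the order of 20µs, so fetching per use removes the `&mut self` requirement everywhere and avoids stale cached connections.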
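The patch also introduces the `IterWithCloneLazy` helper in `misc/lazy_clone.rs`, used by the pubsub manager to fan a message out to listeners without a redundant final clone. A short sketch of how it reads at a call site (the values here are made up):

```rust
use bitbazaar::misc::IterWithCloneLazy;

fn main() {
    let tag = String::from("shared");
    // `tag` is cloned for every element except the last, which takes it by value,
    // so iterating a single-element collection performs zero clones.
    for (tag, n) in [1, 2, 3].with_clone_lazy(tag) {
        println!("{tag}: {n}");
    }
}
```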