From 9589518a4070b59b1c12a709710d797f1a4d1f91 Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Wed, 14 Aug 2024 21:21:12 +0300 Subject: [PATCH] Redis pubsub --- rust/Cargo.toml | 2 +- rust/bitbazaar/misc/retry.rs | 6 + rust/bitbazaar/redis/batch.rs | 50 +- rust/bitbazaar/redis/conn.rs | 94 ++- rust/bitbazaar/redis/mod.rs | 3 + .../redis/pubsub/channel_listener.rs | 46 ++ rust/bitbazaar/redis/pubsub/mod.rs | 4 + rust/bitbazaar/redis/pubsub/pubsub_global.rs | 640 ++++++++++++++++++ rust/bitbazaar/redis/redis_retry.rs | 25 + rust/bitbazaar/redis/wrapper.rs | 23 +- 10 files changed, 801 insertions(+), 92 deletions(-) create mode 100644 rust/bitbazaar/redis/pubsub/channel_listener.rs create mode 100644 rust/bitbazaar/redis/pubsub/mod.rs create mode 100644 rust/bitbazaar/redis/pubsub/pubsub_global.rs create mode 100644 rust/bitbazaar/redis/redis_retry.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 5711f382..9959a973 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -127,7 +127,7 @@ features = ["sync"] # These are included on top of above features when not wasm: [target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] version = "1" -features = ["time", "fs", "process", "rt", "io-util"] +features = ["time", "fs", "process", "rt", "io-util", "macros"] [target.'cfg(target_arch = "wasm32")'.dependencies] tracing-subscriber-wasm = "0.1.0" diff --git a/rust/bitbazaar/misc/retry.rs b/rust/bitbazaar/misc/retry.rs index 510ae4e8..eb7ac8be 100644 --- a/rust/bitbazaar/misc/retry.rs +++ b/rust/bitbazaar/misc/retry.rs @@ -84,6 +84,12 @@ impl<'a, E> Retry<'a, E> { self } + /// Never stop retrying. + pub fn until_forever(mut self) -> Self { + self.until = RetryUntil::TotalAttempts(usize::MAX); + self + } + /// Stop retrying after the total delay reaches the given duration. pub fn until_total_delay(mut self, max_total_delay: Duration) -> Self { self.until = RetryUntil::TotalDelay(max_total_delay); diff --git a/rust/bitbazaar/redis/batch.rs b/rust/bitbazaar/redis/batch.rs index 851ea369..69975817 100644 --- a/rust/bitbazaar/redis/batch.rs +++ b/rust/bitbazaar/redis/batch.rs @@ -4,11 +4,12 @@ use std::{collections::HashSet, marker::PhantomData, sync::LazyLock}; use deadpool_redis::redis::{FromRedisValue, Pipeline, ToRedisArgs}; -use crate::{log::record_exception, misc::Retry, retry_flexi}; +use crate::{log::record_exception, retry_flexi}; use super::{ conn::RedisConnLike, fuzzy::{RedisFuzzy, RedisFuzzyUnwrap}, + redis_retry::redis_retry_config, RedisScript, RedisScriptInvoker, }; @@ -46,29 +47,9 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R async fn inner_fire(&self) -> Option { if let Some(mut conn) = self.redis_conn.get_inner_conn().await { // Handling retryable errors internally: - let retry = Retry::::fibonacci(chrono::Duration::milliseconds(10)) - // Will cumulatively delay for up to 6 seconds. - // SHOULDN'T BE LONGER, considering outer code may then handle the redis failure, - // in e.g. web request, any longer would then be harmful. - .until_total_delay(chrono::Duration::seconds(5)) - .on_retry(move |info| match info.last_error.kind() { - // These should all be automatically retried: - redis::ErrorKind::BusyLoadingError - | redis::ErrorKind::TryAgain - | redis::ErrorKind::MasterDown => { - tracing::warn!( - "Redis command failed with retryable error, retrying in {}. Last attempt no: '{}'.\nErr:\n{:?}.", - info.delay_till_next_attempt, - info.last_attempt_no, - info.last_error - ); - None - }, - // Everything else should just exit straight away, no point retrying internally. - _ => Some(info.last_error), - }); - - match retry_flexi!(retry.clone(), { self.pipe.query_async(&mut conn).await }) { + match retry_flexi!(redis_retry_config(), { + self.pipe.query_async(&mut conn).await + }) { Ok(v) => Some(v), Err(e) => { // Load the scripts into Redis if the any of the scripts weren't there before. @@ -89,7 +70,7 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R for script in &self.used_scripts { load_pipe.add_command(script.load_cmd()); } - match retry_flexi!(retry, { + match retry_flexi!(redis_retry_config(), { load_pipe.query_async::(&mut conn).await }) { // Now loaded the scripts, rerun the batch: @@ -138,12 +119,7 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R /// E.g. `batch.custom_no_return("SET").custom_arg("key").custom_arg("value").fire().await;` pub fn custom_no_return(mut self, cmd: &str) -> Self { self.pipe.cmd(cmd).ignore(); - RedisBatch { - _returns: PhantomData, - redis_conn: self.redis_conn, - pipe: self.pipe, - used_scripts: self.used_scripts, - } + self } /// Low-level backdoor. Add a custom argument to the last custom command added with either `custom_no_return()` or `custom()`. @@ -152,6 +128,18 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R self } + /// Publish a message to a pubsub channel. + /// Use [`RedisConnLike::pubsub_listen`] to listen for messages. + pub fn publish(mut self, namespace: &str, channel: &str, message: impl ToRedisArgs) -> Self { + self.pipe + .publish( + self.redis_conn.final_key(namespace, channel.into()), + message, + ) + .ignore(); + self + } + /// Expire an existing key with a new/updated ttl. /// /// diff --git a/rust/bitbazaar/redis/conn.rs b/rust/bitbazaar/redis/conn.rs index 8b02867a..81a863d3 100644 --- a/rust/bitbazaar/redis/conn.rs +++ b/rust/bitbazaar/redis/conn.rs @@ -11,13 +11,17 @@ use deadpool_redis::redis::{FromRedisValue, ToRedisArgs}; use super::{ batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps}, fuzzy::RedisFuzzy, + pubsub::{pubsub_global::RedisPubSubGlobal, RedisChannelListener}, }; use crate::{errors::prelude::*, log::record_exception, redis::RedisScript}; /// A lazy redis connection. +#[derive(Debug, Clone)] pub struct RedisConn<'a> { pub(crate) prefix: &'a str, pool: &'a deadpool_redis::Pool, + // It uses it's own global connection so needed down here too, to abstract away and use from the same higher-order connection: + pubsub_global: &'a Arc, // We used to cache the [`deadpool_redis::Connection`] conn in here, // but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally. // so getting for each usage is fine, given: @@ -28,27 +32,28 @@ pub struct RedisConn<'a> { } impl<'a> RedisConn<'a> { - pub(crate) fn new(pool: &'a deadpool_redis::Pool, prefix: &'a str) -> Self { - Self { pool, prefix } - } -} - -// Cloning is still technically heavy for the un-owned, as the active connection can't be reused. -impl<'a> Clone for RedisConn<'a> { - fn clone(&self) -> Self { + pub(crate) fn new( + pool: &'a deadpool_redis::Pool, + prefix: &'a str, + pubsub_global: &'a Arc, + ) -> Self { Self { - prefix: self.prefix, - pool: self.pool, + pool, + prefix, + pubsub_global, } } } /// An owned variant of [`RedisConn`]. /// Just requires a couple of Arc clones, so still quite lightweight. +#[derive(Debug, Clone)] pub struct RedisConnOwned { // Prefix and pool both Arced now at top level for easy cloning. pub(crate) prefix: Arc, pool: deadpool_redis::Pool, + // It uses it's own global connection so needed down here too, to abstract away and use from the same higher-order connection: + pubsub_global: Arc, // We used to cache the [`deadpool_redis::Connection`] conn in here, // but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally. // so getting for each usage is fine, given: @@ -58,31 +63,6 @@ pub struct RedisConnOwned { // conn: Option, } -impl Clone for RedisConnOwned { - fn clone(&self) -> Self { - Self { - prefix: self.prefix.clone(), - pool: self.pool.clone(), - } - } -} - -macro_rules! impl_debug_for_conn { - ($conn_type:ty, $name:literal) => { - impl std::fmt::Debug for $conn_type { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct($name) - .field("prefix", &self.prefix) - .field("pool", &self.pool) - .finish() - } - } - }; -} - -impl_debug_for_conn!(RedisConn<'_>, "RedisConn"); -impl_debug_for_conn!(RedisConnOwned, "RedisConnOwned"); - /// Generic methods over the RedisConn and RedisConnOwned types. pub trait RedisConnLike: std::fmt::Debug + Send + Sized { /// Get an internal connection from the pool. @@ -94,6 +74,9 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { /// Get the redis configured prefix. fn prefix(&self) -> &str; + /// Get the redis pubsub global manager. + fn _pubsub_global(&self) -> &Arc; + /// Convert to the owned variant. fn to_owned(&self) -> RedisConnOwned; @@ -107,23 +90,23 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { .is_some() } - // async fn pubsub(self) { - // if let Some(conn) = self.get_inner_conn().await { - // let mut pubsub = deadpool_redis::Connection::take(conn).into_pubsub(); - // let mut pubsub = conn.publish().await; - // conn.subscribe(channel_name); - // conn.into_pubsub(); - // loop { - // let msg = pubsub.get_message().await.unwrap(); - // let payload: String = msg.get_payload().unwrap(); - // println!("channel '{}': {}", msg.get_channel_name(), payload); - // if payload == "exit" { - // break; - // } - // } - // } - // } + /// Subscribe to a channel via pubsub, receiving messages through the returned receiver. + /// The subscription will be dropped when the receiver is dropped. + /// + /// Sending can be done via normal batches using [`RedisBatch::publish`]. + /// + /// Returns None when redis unavailable for some reason, after a few seconds of trying to connect. + async fn subscribe( + &self, + namespace: &str, + channel: &str, + ) -> Option> { + self._pubsub_global() + .subscribe(self.final_key(namespace, channel.into())) + .await + } + /// TODONOW update and test. // Commented out as untested, not sure if works. // /// Get all data from redis, only really useful during testing. // /// @@ -283,10 +266,15 @@ impl<'a> RedisConnLike for RedisConn<'a> { self.prefix } + fn _pubsub_global(&self) -> &Arc { + self.pubsub_global + } + fn to_owned(&self) -> RedisConnOwned { RedisConnOwned { prefix: Arc::new(self.prefix.to_string()), pool: self.pool.clone(), + pubsub_global: self.pubsub_global.clone(), } } } @@ -306,6 +294,10 @@ impl RedisConnLike for RedisConnOwned { &self.prefix } + fn _pubsub_global(&self) -> &Arc { + &self.pubsub_global + } + fn to_owned(&self) -> RedisConnOwned { self.clone() } diff --git a/rust/bitbazaar/redis/mod.rs b/rust/bitbazaar/redis/mod.rs index 5094fc5f..167249c9 100644 --- a/rust/bitbazaar/redis/mod.rs +++ b/rust/bitbazaar/redis/mod.rs @@ -3,6 +3,8 @@ mod conn; mod dlock; mod fuzzy; mod json; +mod pubsub; +mod redis_retry; mod script; mod temp_list; mod wrapper; @@ -15,6 +17,7 @@ pub use batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps}; pub use conn::{RedisConn, RedisConnLike}; pub use dlock::{RedisLock, RedisLockErr}; pub use json::{RedisJson, RedisJsonBorrowed}; +pub use pubsub::RedisChannelListener; // Re-exporting redis and deadpool_redis to be used outside if needed: pub use deadpool_redis; pub use redis; diff --git a/rust/bitbazaar/redis/pubsub/channel_listener.rs b/rust/bitbazaar/redis/pubsub/channel_listener.rs new file mode 100644 index 00000000..987c9301 --- /dev/null +++ b/rust/bitbazaar/redis/pubsub/channel_listener.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use redis::{from_owned_redis_value, FromRedisValue, ToRedisArgs}; + +use crate::log::record_exception; + +/// A listener to receive messages from a redis channel via pubsub. +pub struct RedisChannelListener { + pub(crate) on_drop_tx: Arc>, + pub(crate) key: u64, + pub(crate) channel: String, + pub(crate) rx: tokio::sync::mpsc::UnboundedReceiver, + pub(crate) _t: std::marker::PhantomData, +} + +impl RedisChannelListener { + /// Get a new message from the channel. + /// The outer None indicates the channel has been closed erroneously, or the internal data could not be coerced to the target type. + /// In either case, something's gone wrong, an exception will probably have been recorded too. + pub async fn recv(&mut self) -> Option { + if let Some(v) = self.rx.recv().await { + match from_owned_redis_value(v) { + Ok(v) => Some(v), + Err(e) => { + record_exception( + format!( + "Failed to convert redis value to target type '{}'", + std::any::type_name::() + ), + format!("{:?}", e), + ); + None + } + } + } else { + None + } + } +} + +/// Tell the global pubsub manager this listener is being dropped. +impl Drop for RedisChannelListener { + fn drop(&mut self) { + let _ = self.on_drop_tx.send((self.channel.clone(), self.key)); + } +} diff --git a/rust/bitbazaar/redis/pubsub/mod.rs b/rust/bitbazaar/redis/pubsub/mod.rs new file mode 100644 index 00000000..80da002e --- /dev/null +++ b/rust/bitbazaar/redis/pubsub/mod.rs @@ -0,0 +1,4 @@ +mod channel_listener; +pub(crate) mod pubsub_global; + +pub use channel_listener::*; diff --git a/rust/bitbazaar/redis/pubsub/pubsub_global.rs b/rust/bitbazaar/redis/pubsub/pubsub_global.rs new file mode 100644 index 00000000..10c6d339 --- /dev/null +++ b/rust/bitbazaar/redis/pubsub/pubsub_global.rs @@ -0,0 +1,640 @@ +use std::{ + collections::HashMap, + fmt::Debug, + sync::{atomic::AtomicBool, Arc}, +}; + +use chrono::TimeDelta; +use dashmap::DashMap; +use redis::{aio::MultiplexedConnection, from_owned_redis_value, FromRedisValue, ToRedisArgs}; + +use crate::{ + log::record_exception, + misc::{random_u64_rolling, Retry}, + prelude::*, + redis::redis_retry::redis_retry_config, +}; + +use super::RedisChannelListener; + +/// The lazy pubsub manager. +pub struct RedisPubSubGlobal { + client: redis::Client, + config: redis::AsyncConnectionConfig, + /// Unlike deadpool these aren't pooled, so definitely need to store and reuse until it becomes invalid, only then get a new one. + active_conn: tokio::sync::RwLock>, + /// The downstream configured listeners for different channels, messages will be pushed to all active listeners. + /// Putting in a nested hashmap for easy cleanup when listeners are dropped. + pub(crate) listeners: + DashMap>>, + + /// The global receiver of messages hooked directly into the redis connection. + /// This will be taken when the main listener is spawned. + rx: tokio::sync::Mutex>>, + /// Below used to trigger unsubscriptions and listeners dashmap cleanup when listeners are dropped. + /// (The tx is called when a listener is dropped, and the spawned process listens for these and does the cleanup.) + listener_drop_tx: Arc>, + listener_drop_rx: + tokio::sync::Mutex>>, + spawned: AtomicBool, +} + +impl Debug for RedisPubSubGlobal { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RedisPubSubGlobal") + .field("client", &self.client) + // .field("config", &self.config) + .field("active_conn", &self.active_conn) + .field("listeners", &self.listeners) + .field("rx", &self.rx) + .field("spawned", &self.spawned) + .finish() + } +} + +impl RedisPubSubGlobal { + pub(crate) fn new(redis_conn_str: impl Into) -> RResult { + let client = redis::Client::open(format!("{}?protocol=resp3", redis_conn_str.into())) + .change_context(AnyErr)?; + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (listener_drop_tx, listener_drop_rx) = tokio::sync::mpsc::unbounded_channel(); + let config = redis::AsyncConnectionConfig::new().set_push_sender(tx); + Ok(Self { + client, + config, + active_conn: tokio::sync::RwLock::new(None), + listeners: DashMap::new(), + rx: tokio::sync::Mutex::new(Some(rx)), + listener_drop_tx: Arc::new(listener_drop_tx), + listener_drop_rx: tokio::sync::Mutex::new(Some(listener_drop_rx)), + spawned: AtomicBool::new(false), + }) + } + + pub(crate) async fn unsubscribe(&self, channel: impl Into) { + let channel = channel.into(); + self.listeners.remove(&channel); + + let force_new_connection = AtomicBool::new(false); + match redis_retry_config() + .call(|| async { + if let Some(mut conn) = self + .get_conn( + // Means on second attempt onwards, will always get new connections. + force_new_connection.swap(true, std::sync::atomic::Ordering::Relaxed), + ) + .await + { + conn.unsubscribe(&channel).await + } else { + // Doing nothing when None as that'll have been logged lower down. + Ok(()) + } + }) + .await + { + Ok(()) => {} + Err(e) => { + record_exception( + "Pubsub: failed to unsubscribe from channel.", + format!("{:?}", e), + ); + } + } + } + + /// Returns None when redis down/acting up and couldn't get over a few seconds. + pub(crate) async fn subscribe( + self: &Arc, + channel: impl Into, + ) -> Option> { + let channel = channel.into(); + + let force_new_connection = AtomicBool::new(false); + match redis_retry_config() + .call(|| async { + if let Some(mut conn) = self + .get_conn( + // Means on second attempt onwards, will always get new connections. + force_new_connection.swap(true, std::sync::atomic::Ordering::Relaxed), + ) + .await + { + conn.subscribe(&channel).await + } else { + // Doing nothing when None as that'll have been logged lower down. + Err(redis::RedisError::from(std::io::Error::new( + std::io::ErrorKind::Other, + "Couldn't get a connection to redis.", + ))) + } + }) + .await + { + Ok(()) => {} + Err(e) => { + record_exception( + "Pubsub: failed to subscribe to channel.", + format!("{:?}", e), + ); + return None; + } + } + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let listener_key = random_u64_rolling(); + self.listeners + .entry(channel.clone()) + .or_default() + .insert(listener_key, tx); + + if !self + .spawned + .swap(true, std::sync::atomic::Ordering::Relaxed) + { + let arc_self = self.clone(); + let mut rx = self + .rx + .lock() + .await + .take() + .expect("rx should only be taken once"); + let mut listener_drop_rx = self + .listener_drop_rx + .lock() + .await + .take() + .expect("listener_drop_rx should only be taken once"); + + tokio::spawn(async move { + loop { + tokio::select! { + // Adding this means the listener fut will always be polled first, i.e. has higher priority. + // This is what we want as it cleans up dead listeners, so avoids the second fut ideally hitting any dead listeners. + biased; + + result = listener_drop_rx.recv() => { + arc_self.spawned_handle_listener_dropped(result).await; + } + result = rx.recv() => { + arc_self.spawned_handle_message(result).await; + } + } + } + }); + } + + Some(RedisChannelListener { + key: listener_key, + on_drop_tx: self.listener_drop_tx.clone(), + channel, + rx, + _t: std::marker::PhantomData, + }) + } + + /// None returned when redis seemingly down/erroring and can't get a connection. + async fn get_conn(&self, force_new_connection: bool) -> Option { + // Inside rwlock so read only if already in there and not forcing new, to avoid getting a write lock when not needed: + if !force_new_connection { + if let Some(conn) = self.active_conn.read().await.as_ref() { + return Some(conn.clone()); + } + } + + // If couldn't return above, we need a new conn: + let mut maybe_conn = self.active_conn.write().await; + match redis_retry_config() + .call(move || { + // WARNING: unlike deadpool for the rest of redis, this is very heavy as it's not pooled. + self.client + .get_multiplexed_async_connection_with_config(&self.config) + }) + .await + { + Ok(mut conn) => { + // Need to re-subscribe to all actively listened to channels for the new connection: + for entry in self.listeners.iter() { + let channel = entry.key(); + match conn.subscribe(channel).await { + Ok(()) => {} + Err(e) => { + record_exception( + format!("Pubsub: failed to re-subscribe to channel '{}' with newly acquired connection, discarding.", channel), + format!("{:?}", e), + ); + *maybe_conn = None; + return None; + } + } + } + *maybe_conn = Some(conn); + } + Err(e) => { + record_exception( + "Pubsub: creation of a new Redis connection failed.", + format!("{:?}", e), + ); + *maybe_conn = None; + return None; + } + } + + let conn = maybe_conn + .as_ref() + .expect("conn should be Some given just created if needed."); + + Some(conn.clone()) + } + + /// Handle cleaning up the listeners dashmap, and calling redis's unsubscribe method when no more listeners for a given channel. + /// The cleanup channel gets called in the drop fn of each [`RedisChannelListener`]. + async fn spawned_handle_listener_dropped( + self: &Arc, + channel_and_key: Option<(String, u64)>, + ) { + match channel_and_key { + Some((channel, key)) => { + let unsub = if let Some(mut listeners) = self.listeners.get_mut(&channel) { + listeners.remove(&key); + listeners.is_empty() + } else { + true + }; + // Need to come after otherwise dashmap could deadlock. + if unsub { + self.unsubscribe(&channel).await; + } + } + None => { + record_exception( + "Pubsub: redis cleanup channel died. Tx sender supposedly dropped.", + "", + ); + } + } + } + + /// Handle redis pubsub messages coming into subscriptions. + async fn spawned_handle_message(self: &Arc, message: Option) { + match message { + Some(push_info) => { + match push_info.kind.clone() { + redis::PushKind::Subscribe => { + // Example received: + // PushInfo { kind: Subscribe, data: [bulk-string('"foo"'), int(1)] } + + // Don't actually need to do anything for these methods: + + // match from_owned_redis_value::<(String, i64)>( + // redis::Value::Array(push_info.data), + // ) { + // Ok((channel, sub_count)) => { + // tracing::info!( + // "Subscribed to channel: '{}', sub_count: {}", + // channel, + // sub_count + // ); + // } + // Err(e) => { + // record_exception( + // "Pubsub: failed to decode redis::PushKind::Subscribe.", + // format!("{:?}", e), + // ); + // } + // } + } + redis::PushKind::Unsubscribe => { + // Example received: + // PushInfo { kind: Unsubscribe, data: [bulk-string('"49878c28-c7ef-4f4c-b196-9956942bbe95:n1:foo"'), int(1)] } + + // Don't actually need to do anything for these methods: + + // match from_owned_redis_value::<(String, i64)>( + // redis::Value::Array(push_info.data), + // ) { + // Ok((client_and_channel, sub_count)) => { + // tracing::info!( + // "Client unsubscribed from channel: '{}', sub_count: {}", + // client_and_channel, + // sub_count + // ); + // } + // Err(e) => { + // record_exception( + // "Pubsub: failed to decode redis::PushKind::Unsubscribe.", + // format!("{:?}", e), + // ); + // } + // } + } + redis::PushKind::Disconnection => { + tracing::warn!( + "Pubsub: redis disconnected, attempting to get new connection, retrying every 100ms until success..." + ); + let result = Retry::fixed(TimeDelta::milliseconds(100)) + .until_forever() + .call(|| async { + match self.get_conn(true).await { + Some(_) => { + tracing::info!("Pubsub: redis reconnected."); + Ok(()) + } + None => Err(()), + } + }) + .await; + if result.is_err() { + panic!("Should be impossible, above retry loop should go infinitely until success"); + } + } + redis::PushKind::Message => { + // Example received: + // PushInfo { kind: Message, data: [bulk-string('"foo"'), bulk-string('"bar"')] } + + match from_owned_redis_value::<(String, redis::Value)>(redis::Value::Array( + push_info.data, + )) { + Ok((channel, msg)) => { + if let Some(listeners) = self.listeners.get(&channel) { + for tx in listeners.values() { + // Given we have a separate future for cleaning up, + // this shouldn't be a big issue if this ever errors with dead listeners, + // as they should immediately be cleaned up by the cleanup future. + let _ = tx.send(msg.clone()); + } + } + } + Err(e) => { + record_exception( + "Pubsub: failed to decode redis::PushKind::Message.", + format!("{:?}", e), + ); + } + } + } + _ => { + record_exception( + "Pubsub: unsupported/unexpected message received by global listener.", + format!("{:?}", push_info), + ); + } + } + } + None => { + record_exception( + "Pubsub: redis listener channel died. Tx sender supposedly dropped.", + "", + ); + } + } + } +} + +// TESTS: +// - redis prefix still used. +// - DONE: sub to same channel twice with same name but 2 different fns, each should be called once, not first twice, second twice, first dropped etc. +// - Redis down then backup: +// - If just during listening, msgs, shld come in after back up. +// - if happened before subscribe, subscribe still recorded and applied after redis back up +// - After lots of random channels, lots of random listeners, once all dropped the hashmap should be empty. + +// Redis server can't be run on windows: +#[cfg(not(target_os = "windows"))] +#[cfg(test)] +mod tests { + + use chrono::TimeDelta; + + use crate::misc::with_timeout; + use crate::redis::{Redis, RedisBatchFire, RedisConnLike, RedisStandalone}; + use crate::testing::prelude::*; + + use super::*; + + async fn setup_conns() -> RResult<(RedisStandalone, Redis, Redis), AnyErr> { + let server = RedisStandalone::new_no_persistence().await?; + let work_r = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; + // Also create a fake version on a random port, this will be used to check failure cases. + let fail_r = Redis::new( + "redis://FAKKEEEE:6372", + format!("test_{}", uuid::Uuid::new_v4()), + )?; + Ok((server, work_r, fail_r)) + } + + // The basics: + // - Listeners receive messages. + // - Listeners receive only their own messages. + // - Listeners clean themselves up. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_simple( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + for (mut rx, namespace) in [ + ( + work_conn.subscribe::("n1", "foo").await.unwrap(), + "n1", + ), + ( + work_conn.subscribe::("n2", "foo").await.unwrap(), + "n2", + ), + ] { + assert!(work_conn + .batch() + .publish(namespace, "foo", format!("{}_first_msg", namespace)) + .publish(namespace, "foo", format!("{}_second_msg", namespace)) + .fire() + .await + .is_some()); + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some(format!("{}_first_msg", namespace)), rx.recv().await); + assert_eq!(Some(format!("{}_second_msg", namespace)), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + } + + // Given everything's dropped now we're out of the loop, internals should've been cleaned up after a short delay: + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + assert_eq!(work_r.pubsub_listener.listeners.len(), 0); + + Ok(()) + } + + // Multiple listeners on the same channel: + // - Each gets data + // - Each gets data only once + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_single_channel_multiple_listeners( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + let rx1 = work_conn.subscribe::("n1", "foo").await.unwrap(); + let rx2 = work_conn.subscribe::("n1", "foo").await.unwrap(); + let rx3 = work_conn.subscribe::("n1", "foo").await.unwrap(); + + // All 3 receivers should receive these messages: + assert!(work_conn + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + for mut rx in [rx1, rx2, rx3] { + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("first_msg".to_string()), rx.recv().await); + assert_eq!(Some("second_msg".to_string()), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + } + + // Given everything's dropped now we're out of the loop, internals should've been cleaned up after a short delay: + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + assert_eq!(work_r.pubsub_listener.listeners.len(), 0); + + Ok(()) + } + + /// - pubsub should be able to continue after redis goes down and back up. + /// - subscribe() and publish() should work even if redis literally just coming back alive. + /// - subscriptions should automatically resubscribe when the connection has to be restarted on new redis. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_redis_sketchiness( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + // Start a server to get a static port then instantly shutdown but keep the redis instance (client): + let (client, port) = { + let server = RedisStandalone::new_no_persistence().await?; + let client = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4().to_string())?; + (client, server.port) + }; + + let restart_server = move || { + // Same as new_no_persistence, but have to use underlying for port: + RedisStandalone::new_with_opts(port, Some(&["--appendonly", "no", "--save", "\"\""])) + }; + + // subscribe() should work even if redis is justttt coming back up, i.e. it should wait around for a connection. + let mut rx = { + let _server = restart_server().await?; + client + .conn() + .subscribe::("n1", "foo") + .await + .unwrap() + }; + + // publish() should work even if redis is justttt coming back up, i.e. it should wait around for a connection. + let _server = restart_server().await?; + // This is separate, just confirming publish works straight away, + // slight delay needed for actual publish as redis needs time to resubscribe to the channel on the new connection, + // otherwise won't see the published event. + assert!(client + .conn() + .batch() + .publish("lah", "loo", "baz") + .fire() + .await + .is_some()); + + // Short delay, see above comment for redis to resubscribe before proper publish to check: + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + assert!(client + .conn() + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + // Despite all the madness messages should come through: + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("first_msg".to_string()), rx.recv().await); + assert_eq!(Some("second_msg".to_string()), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + + Ok(()) + } + + // Nothing should break when no ones subscribed to a channel when a message is published. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_no_listener( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + assert!(work_conn + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + Ok(()) + } +} diff --git a/rust/bitbazaar/redis/redis_retry.rs b/rust/bitbazaar/redis/redis_retry.rs new file mode 100644 index 00000000..3e0c8306 --- /dev/null +++ b/rust/bitbazaar/redis/redis_retry.rs @@ -0,0 +1,25 @@ +use crate::misc::Retry; + +pub(crate) fn redis_retry_config() -> Retry<'static, redis::RedisError> { + Retry::::fibonacci(chrono::Duration::milliseconds(10)) + // Will cumulatively delay for up to about 5ish seconds. + // SHOULDN'T BE LONGER, considering downstream user code may then handle the redis failure, + // in e.g. web request, any longer would then be harmful. + .until_total_delay(chrono::Duration::seconds(5)) + .on_retry(move |info| match info.last_error.kind() { + // These should all be automatically retried: + redis::ErrorKind::BusyLoadingError + | redis::ErrorKind::TryAgain + | redis::ErrorKind::MasterDown => { + tracing::warn!( + "Redis action failed with retryable error, retrying in {}. Last attempt no: '{}'.\nErr:\n{:?}.", + info.delay_till_next_attempt, + info.last_attempt_no, + info.last_error + ); + None + }, + // Everything else should just exit straight away, no point retrying internally. + _ => Some(info.last_error), + }) +} diff --git a/rust/bitbazaar/redis/wrapper.rs b/rust/bitbazaar/redis/wrapper.rs index bef6f812..a5123cee 100644 --- a/rust/bitbazaar/redis/wrapper.rs +++ b/rust/bitbazaar/redis/wrapper.rs @@ -1,9 +1,9 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use deadpool_redis::{Config, Runtime}; use futures::Future; -use super::{RedisConn, RedisLock, RedisLockErr}; +use super::{pubsub::pubsub_global::RedisPubSubGlobal, RedisConn, RedisLock, RedisLockErr}; use crate::errors::prelude::*; /// A wrapper around redis to make it more concise to use and not need redis in the downstream Cargo.toml. @@ -12,7 +12,9 @@ use crate::errors::prelude::*; /// All redis errors (availability, unexpected content) will be logged as errors and results returned as `None` (or similar) where possible. #[derive(Debug, Clone)] pub struct Redis { + // deadpool arced internally. pool: deadpool_redis::Pool, + pub(crate) pubsub_listener: Arc, prefix: String, } @@ -20,24 +22,27 @@ impl Redis { /// Create a new global redis wrapper from the given Redis URL (like `redis://127.0.0.1`). /// /// Note this should only be done once at startup. - pub fn new, B: Into>( - redis_conn_str: A, - prefix: B, + pub fn new( + redis_conn_str: impl Into, + prefix: impl Into, ) -> RResult { - let cfg = Config::from_url(redis_conn_str); + let redis_conn_str = redis_conn_str.into(); + let prefix = prefix.into(); + let cfg = Config::from_url(&redis_conn_str); let pool = cfg .create_pool(Some(Runtime::Tokio1)) .change_context(AnyErr)?; - + let pubsub_listener = Arc::new(RedisPubSubGlobal::new(&redis_conn_str)?); Ok(Self { pool, - prefix: prefix.into(), + prefix, + pubsub_listener, }) } /// Get a [`RedisConn`] redis can be called with. pub fn conn(&self) -> RedisConn<'_> { - RedisConn::new(&self.pool, &self.prefix) + RedisConn::new(&self.pool, &self.prefix, &self.pubsub_listener) } /// Get a distributed redis lock.