Skip to content

Commit

Permalink
Redis pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
zakstucke committed Aug 14, 2024
1 parent a91dd21 commit 9589518
Show file tree
Hide file tree
Showing 10 changed files with 801 additions and 92 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ features = ["sync"]
# These are included on top of above features when not wasm:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
version = "1"
features = ["time", "fs", "process", "rt", "io-util"]
features = ["time", "fs", "process", "rt", "io-util", "macros"]

[target.'cfg(target_arch = "wasm32")'.dependencies]
tracing-subscriber-wasm = "0.1.0"
Expand Down
6 changes: 6 additions & 0 deletions rust/bitbazaar/misc/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ impl<'a, E> Retry<'a, E> {
self
}

/// Never stop retrying.
pub fn until_forever(mut self) -> Self {
self.until = RetryUntil::TotalAttempts(usize::MAX);
self
}

/// Stop retrying after the total delay reaches the given duration.
pub fn until_total_delay(mut self, max_total_delay: Duration) -> Self {
self.until = RetryUntil::TotalDelay(max_total_delay);
Expand Down
50 changes: 19 additions & 31 deletions rust/bitbazaar/redis/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use std::{collections::HashSet, marker::PhantomData, sync::LazyLock};

use deadpool_redis::redis::{FromRedisValue, Pipeline, ToRedisArgs};

use crate::{log::record_exception, misc::Retry, retry_flexi};
use crate::{log::record_exception, retry_flexi};

use super::{
conn::RedisConnLike,
fuzzy::{RedisFuzzy, RedisFuzzyUnwrap},
redis_retry::redis_retry_config,
RedisScript, RedisScriptInvoker,
};

Expand Down Expand Up @@ -46,29 +47,9 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R
async fn inner_fire<R: FromRedisValue>(&self) -> Option<R> {
if let Some(mut conn) = self.redis_conn.get_inner_conn().await {
// Handling retryable errors internally:
let retry = Retry::<redis::RedisError>::fibonacci(chrono::Duration::milliseconds(10))
// Will cumulatively delay for up to 6 seconds.
// SHOULDN'T BE LONGER, considering outer code may then handle the redis failure,
// in e.g. web request, any longer would then be harmful.
.until_total_delay(chrono::Duration::seconds(5))
.on_retry(move |info| match info.last_error.kind() {
// These should all be automatically retried:
redis::ErrorKind::BusyLoadingError
| redis::ErrorKind::TryAgain
| redis::ErrorKind::MasterDown => {
tracing::warn!(
"Redis command failed with retryable error, retrying in {}. Last attempt no: '{}'.\nErr:\n{:?}.",
info.delay_till_next_attempt,
info.last_attempt_no,
info.last_error
);
None
},
// Everything else should just exit straight away, no point retrying internally.
_ => Some(info.last_error),
});

match retry_flexi!(retry.clone(), { self.pipe.query_async(&mut conn).await }) {
match retry_flexi!(redis_retry_config(), {
self.pipe.query_async(&mut conn).await
}) {
Ok(v) => Some(v),
Err(e) => {
// Load the scripts into Redis if the any of the scripts weren't there before.
Expand All @@ -89,7 +70,7 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R
for script in &self.used_scripts {
load_pipe.add_command(script.load_cmd());
}
match retry_flexi!(retry, {
match retry_flexi!(redis_retry_config(), {
load_pipe.query_async::<redis::Value>(&mut conn).await
}) {
// Now loaded the scripts, rerun the batch:
Expand Down Expand Up @@ -138,12 +119,7 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R
/// E.g. `batch.custom_no_return("SET").custom_arg("key").custom_arg("value").fire().await;`
pub fn custom_no_return(mut self, cmd: &str) -> Self {
self.pipe.cmd(cmd).ignore();
RedisBatch {
_returns: PhantomData,
redis_conn: self.redis_conn,
pipe: self.pipe,
used_scripts: self.used_scripts,
}
self
}

/// Low-level backdoor. Add a custom argument to the last custom command added with either `custom_no_return()` or `custom()`.
Expand All @@ -152,6 +128,18 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R
self
}

/// Publish a message to a pubsub channel.
/// Use [`RedisConnLike::pubsub_listen`] to listen for messages.
pub fn publish(mut self, namespace: &str, channel: &str, message: impl ToRedisArgs) -> Self {
self.pipe
.publish(
self.redis_conn.final_key(namespace, channel.into()),
message,
)
.ignore();
self
}

/// Expire an existing key with a new/updated ttl.
///
/// <https://redis.io/commands/pexpire/>
Expand Down
94 changes: 43 additions & 51 deletions rust/bitbazaar/redis/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ use deadpool_redis::redis::{FromRedisValue, ToRedisArgs};
use super::{
batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps},
fuzzy::RedisFuzzy,
pubsub::{pubsub_global::RedisPubSubGlobal, RedisChannelListener},
};
use crate::{errors::prelude::*, log::record_exception, redis::RedisScript};

/// A lazy redis connection.
#[derive(Debug, Clone)]
pub struct RedisConn<'a> {
pub(crate) prefix: &'a str,
pool: &'a deadpool_redis::Pool,
// It uses it's own global connection so needed down here too, to abstract away and use from the same higher-order connection:
pubsub_global: &'a Arc<RedisPubSubGlobal>,
// We used to cache the [`deadpool_redis::Connection`] conn in here,
// but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally.
// so getting for each usage is fine, given:
Expand All @@ -28,27 +32,28 @@ pub struct RedisConn<'a> {
}

impl<'a> RedisConn<'a> {
pub(crate) fn new(pool: &'a deadpool_redis::Pool, prefix: &'a str) -> Self {
Self { pool, prefix }
}
}

// Cloning is still technically heavy for the un-owned, as the active connection can't be reused.
impl<'a> Clone for RedisConn<'a> {
fn clone(&self) -> Self {
pub(crate) fn new(
pool: &'a deadpool_redis::Pool,
prefix: &'a str,
pubsub_global: &'a Arc<RedisPubSubGlobal>,
) -> Self {
Self {
prefix: self.prefix,
pool: self.pool,
pool,
prefix,
pubsub_global,
}
}
}

/// An owned variant of [`RedisConn`].
/// Just requires a couple of Arc clones, so still quite lightweight.
#[derive(Debug, Clone)]
pub struct RedisConnOwned {
// Prefix and pool both Arced now at top level for easy cloning.
pub(crate) prefix: Arc<String>,
pool: deadpool_redis::Pool,
// It uses it's own global connection so needed down here too, to abstract away and use from the same higher-order connection:
pubsub_global: Arc<RedisPubSubGlobal>,
// We used to cache the [`deadpool_redis::Connection`] conn in here,
// but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally.
// so getting for each usage is fine, given:
Expand All @@ -58,31 +63,6 @@ pub struct RedisConnOwned {
// conn: Option<deadpool_redis::Connection>,
}

impl Clone for RedisConnOwned {
fn clone(&self) -> Self {
Self {
prefix: self.prefix.clone(),
pool: self.pool.clone(),
}
}
}

macro_rules! impl_debug_for_conn {
($conn_type:ty, $name:literal) => {
impl std::fmt::Debug for $conn_type {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct($name)
.field("prefix", &self.prefix)
.field("pool", &self.pool)
.finish()
}
}
};
}

impl_debug_for_conn!(RedisConn<'_>, "RedisConn");
impl_debug_for_conn!(RedisConnOwned, "RedisConnOwned");

/// Generic methods over the RedisConn and RedisConnOwned types.
pub trait RedisConnLike: std::fmt::Debug + Send + Sized {
/// Get an internal connection from the pool.
Expand All @@ -94,6 +74,9 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized {
/// Get the redis configured prefix.
fn prefix(&self) -> &str;

/// Get the redis pubsub global manager.
fn _pubsub_global(&self) -> &Arc<RedisPubSubGlobal>;

/// Convert to the owned variant.
fn to_owned(&self) -> RedisConnOwned;

Expand All @@ -107,23 +90,23 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized {
.is_some()
}

// async fn pubsub(self) {
// if let Some(conn) = self.get_inner_conn().await {
// let mut pubsub = deadpool_redis::Connection::take(conn).into_pubsub();
// let mut pubsub = conn.publish().await;
// conn.subscribe(channel_name);
// conn.into_pubsub();
// loop {
// let msg = pubsub.get_message().await.unwrap();
// let payload: String = msg.get_payload().unwrap();
// println!("channel '{}': {}", msg.get_channel_name(), payload);
// if payload == "exit" {
// break;
// }
// }
// }
// }
/// Subscribe to a channel via pubsub, receiving messages through the returned receiver.
/// The subscription will be dropped when the receiver is dropped.
///
/// Sending can be done via normal batches using [`RedisBatch::publish`].
///
/// Returns None when redis unavailable for some reason, after a few seconds of trying to connect.
async fn subscribe<T: ToRedisArgs + FromRedisValue>(
&self,
namespace: &str,
channel: &str,
) -> Option<RedisChannelListener<T>> {
self._pubsub_global()
.subscribe(self.final_key(namespace, channel.into()))
.await
}

/// TODONOW update and test.
// Commented out as untested, not sure if works.
// /// Get all data from redis, only really useful during testing.
// ///
Expand Down Expand Up @@ -283,10 +266,15 @@ impl<'a> RedisConnLike for RedisConn<'a> {
self.prefix
}

fn _pubsub_global(&self) -> &Arc<RedisPubSubGlobal> {
self.pubsub_global
}

fn to_owned(&self) -> RedisConnOwned {
RedisConnOwned {
prefix: Arc::new(self.prefix.to_string()),
pool: self.pool.clone(),
pubsub_global: self.pubsub_global.clone(),
}
}
}
Expand All @@ -306,6 +294,10 @@ impl RedisConnLike for RedisConnOwned {
&self.prefix
}

fn _pubsub_global(&self) -> &Arc<RedisPubSubGlobal> {
&self.pubsub_global
}

fn to_owned(&self) -> RedisConnOwned {
self.clone()
}
Expand Down
3 changes: 3 additions & 0 deletions rust/bitbazaar/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ mod conn;
mod dlock;
mod fuzzy;
mod json;
mod pubsub;
mod redis_retry;
mod script;
mod temp_list;
mod wrapper;
Expand All @@ -15,6 +17,7 @@ pub use batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps};
pub use conn::{RedisConn, RedisConnLike};
pub use dlock::{RedisLock, RedisLockErr};
pub use json::{RedisJson, RedisJsonBorrowed};
pub use pubsub::RedisChannelListener;
// Re-exporting redis and deadpool_redis to be used outside if needed:
pub use deadpool_redis;
pub use redis;
Expand Down
46 changes: 46 additions & 0 deletions rust/bitbazaar/redis/pubsub/channel_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::sync::Arc;

use redis::{from_owned_redis_value, FromRedisValue, ToRedisArgs};

use crate::log::record_exception;

/// A listener to receive messages from a redis channel via pubsub.
pub struct RedisChannelListener<T> {
pub(crate) on_drop_tx: Arc<tokio::sync::mpsc::UnboundedSender<(String, u64)>>,
pub(crate) key: u64,
pub(crate) channel: String,
pub(crate) rx: tokio::sync::mpsc::UnboundedReceiver<redis::Value>,
pub(crate) _t: std::marker::PhantomData<T>,
}

impl<T: ToRedisArgs + FromRedisValue> RedisChannelListener<T> {
/// Get a new message from the channel.
/// The outer None indicates the channel has been closed erroneously, or the internal data could not be coerced to the target type.
/// In either case, something's gone wrong, an exception will probably have been recorded too.
pub async fn recv(&mut self) -> Option<T> {
if let Some(v) = self.rx.recv().await {
match from_owned_redis_value(v) {
Ok(v) => Some(v),
Err(e) => {
record_exception(
format!(
"Failed to convert redis value to target type '{}'",
std::any::type_name::<T>()
),
format!("{:?}", e),
);
None
}
}
} else {
None
}
}
}

/// Tell the global pubsub manager this listener is being dropped.
impl<T> Drop for RedisChannelListener<T> {
fn drop(&mut self) {
let _ = self.on_drop_tx.send((self.channel.clone(), self.key));
}
}
4 changes: 4 additions & 0 deletions rust/bitbazaar/redis/pubsub/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod channel_listener;
pub(crate) mod pubsub_global;

pub use channel_listener::*;
Loading

0 comments on commit 9589518

Please sign in to comment.