Skip to content

Commit

Permalink
pubsub channel pattern support (#58)
Browse files Browse the repository at this point in the history
* No more mutable requirement for redis conns, much more ergonomic

* Custom fallbacks to underlying redis interface without having to leave higher level interface

* Redis pubsub

* Error resistant, more tests, lazy clone.

* Pubsub pattern/psubscribe support
  • Loading branch information
zakstucke authored Aug 19, 2024
1 parent b04ca82 commit c3069ab
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 32 deletions.
22 changes: 22 additions & 0 deletions rust/bitbazaar/redis/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized {
.await
}

/// Subscribe to a channel pattern via pubsub, receiving messages through the returned receiver.
/// The subscription will be dropped when the receiver is dropped.
///
/// Sending can be done via normal batches using [`RedisBatch::publish`].
///
/// Returns None when redis unavailable for some reason, after a few seconds of trying to connect.
///
/// According to redis (<https://redis.io/docs/latest/commands/psubscribe/>):
/// Supported glob-style patterns:
/// - h?llo subscribes to hello, hallo and hxllo
/// - h*llo subscribes to hllo and heeeello
/// - h[ae]llo subscribes to hello and hallo, but not hillo
async fn psubscribe<T: ToRedisArgs + FromRedisValue>(
&self,
namespace: &str,
channel_pattern: &str,
) -> Option<RedisChannelListener<T>> {
self._pubsub_global()
.psubscribe(self.final_key(namespace, channel_pattern.into()))
.await
}

// Commented out as untested, not sure if works.
// /// Get all data from redis, only really useful during testing.
// ///
Expand Down
8 changes: 5 additions & 3 deletions rust/bitbazaar/redis/pubsub/channel_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use redis::{from_owned_redis_value, FromRedisValue, ToRedisArgs};

use crate::log::record_exception;

use super::pubsub_global::ChannelSubscription;

/// A listener to receive messages from a redis channel via pubsub.
pub struct RedisChannelListener<T> {
pub(crate) on_drop_tx: Arc<tokio::sync::mpsc::UnboundedSender<(String, u64)>>,
pub(crate) on_drop_tx: Arc<tokio::sync::mpsc::UnboundedSender<(ChannelSubscription, u64)>>,
pub(crate) key: u64,
pub(crate) channel: String,
pub(crate) channel_sub: ChannelSubscription,
pub(crate) rx: tokio::sync::mpsc::UnboundedReceiver<redis::Value>,
pub(crate) _t: std::marker::PhantomData<T>,
}
Expand Down Expand Up @@ -41,6 +43,6 @@ impl<T: ToRedisArgs + FromRedisValue> RedisChannelListener<T> {
/// Tell the global pubsub manager this listener is being dropped.
impl<T> Drop for RedisChannelListener<T> {
fn drop(&mut self) {
let _ = self.on_drop_tx.send((self.channel.clone(), self.key));
let _ = self.on_drop_tx.send((self.channel_sub.clone(), self.key));
}
}
54 changes: 54 additions & 0 deletions rust/bitbazaar/redis/pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,60 @@ mod tests {
Ok(())
}

// Patterns should work with conn.psubscribe(), confirm patterns match correctly, but don't if pattern passed through normal subscribe.
#[rstest]
#[tokio::test]
async fn test_redis_pubsub_pattern(
#[allow(unused_variables)] logging: (),
) -> RResult<(), AnyErr> {
let (_server, work_r, _fail_r) = setup_conns().await?;
let work_conn = work_r.conn();

let mut rx_normal = work_conn.subscribe::<String>("n1", "f*o").await.unwrap();
let mut rx_pattern = work_conn.psubscribe::<String>("n1", "f*o").await.unwrap();

assert!(work_conn
.batch()
.publish("n1", "foo", "only_pattern")
.publish("n1", "f*o", "both")
.fire()
.await
.is_some());
with_timeout(
TimeDelta::seconds(3),
|| {
panic!("Timeout waiting for pubsub message");
},
async move {
assert_eq!(Some("both".to_string()), rx_normal.recv().await);
with_timeout(
TimeDelta::milliseconds(100),
|| Ok::<_, Report<AnyErr>>(()),
async {
let msg = rx_normal.recv().await;
panic!("Shouldn't have received any more messages, got: {:?}", msg);
},
)
.await?;
assert_eq!(Some("only_pattern".to_string()), rx_pattern.recv().await);
assert_eq!(Some("both".to_string()), rx_pattern.recv().await);
with_timeout(
TimeDelta::milliseconds(100),
|| Ok::<_, Report<AnyErr>>(()),
async {
let msg = rx_pattern.recv().await;
panic!("Shouldn't have received any more messages, got: {:?}", msg);
},
)
.await?;
Ok::<_, Report<AnyErr>>(())
},
)
.await?;

Ok(())
}

// Nothing should break when no ones subscribed to a channel when a message is published.
#[rstest]
#[tokio::test]
Expand Down
150 changes: 121 additions & 29 deletions rust/bitbazaar/redis/pubsub/pubsub_global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,28 @@ use crate::{

use super::RedisChannelListener;

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub(crate) enum ChannelSubscription {
Concrete(String),
Pattern(String),
}

impl ChannelSubscription {
fn is_pattern(&self) -> bool {
match self {
ChannelSubscription::Concrete(_) => false,
ChannelSubscription::Pattern(_) => true,
}
}

fn as_str(&self) -> &str {
match self {
ChannelSubscription::Concrete(s) => s,
ChannelSubscription::Pattern(s) => s,
}
}
}

/// The lazy pubsub manager.
pub struct RedisPubSubGlobal {
client: redis::Client,
Expand All @@ -25,12 +47,14 @@ pub struct RedisPubSubGlobal {
active_conn: tokio::sync::RwLock<Option<MultiplexedConnection>>,
/// The downstream configured listeners for different channels, messages will be pushed to all active listeners.
/// Putting in a nested hashmap for easy cleanup when listeners are dropped.
pub(crate) listeners:
DashMap<String, HashMap<u64, tokio::sync::mpsc::UnboundedSender<redis::Value>>>,
pub(crate) listeners: DashMap<
ChannelSubscription,
HashMap<u64, tokio::sync::mpsc::UnboundedSender<redis::Value>>,
>,

/// Below used to trigger unsubscriptions and listeners dashmap cleanup when listeners are dropped.
/// (The tx is called when a listener is dropped, and the spawned process listens for these and does the cleanup.)
listener_drop_tx: Arc<tokio::sync::mpsc::UnboundedSender<(String, u64)>>,
listener_drop_tx: Arc<tokio::sync::mpsc::UnboundedSender<(ChannelSubscription, u64)>>,

/// Will be taken when the listener is lazily spawned.
spawn_init: tokio::sync::Mutex<Option<SpawnInit>>,
Expand All @@ -56,7 +80,7 @@ struct SpawnInit {
rx: tokio::sync::mpsc::UnboundedReceiver<redis::PushInfo>,

// Will receive whenever a listener is dropped:
listener_drop_rx: tokio::sync::mpsc::UnboundedReceiver<(String, u64)>,
listener_drop_rx: tokio::sync::mpsc::UnboundedReceiver<(ChannelSubscription, u64)>,

// Received when the redis instance dropped, meaning the spawned listener should shutdown.
on_drop_rx: tokio::sync::oneshot::Receiver<()>,
Expand Down Expand Up @@ -100,9 +124,8 @@ impl RedisPubSubGlobal {
})
}

pub(crate) async fn unsubscribe(&self, channel: impl Into<String>) {
let channel = channel.into();
self.listeners.remove(&channel);
pub(crate) async fn unsubscribe(&self, channel_sub: &ChannelSubscription) {
self.listeners.remove(channel_sub);

let force_new_connection = AtomicBool::new(false);
match redis_retry_config()
Expand All @@ -114,7 +137,12 @@ impl RedisPubSubGlobal {
)
.await
{
conn.unsubscribe(&channel).await
match &channel_sub {
ChannelSubscription::Concrete(channel) => conn.unsubscribe(&channel).await,
ChannelSubscription::Pattern(channel_pattern) => {
conn.punsubscribe(&channel_pattern).await
}
}
} else {
// Doing nothing when None as that'll have been logged lower down.
Ok(())
Expand All @@ -137,8 +165,24 @@ impl RedisPubSubGlobal {
self: &Arc<Self>,
channel: impl Into<String>,
) -> Option<RedisChannelListener<T>> {
let channel = channel.into();
self._subscribe_inner(ChannelSubscription::Concrete(channel.into()))
.await
}

/// Returns None when redis down/acting up and couldn't get over a few seconds.
pub(crate) async fn psubscribe<T: ToRedisArgs + FromRedisValue>(
self: &Arc<Self>,
channel_pattern: impl Into<String>,
) -> Option<RedisChannelListener<T>> {
self._subscribe_inner(ChannelSubscription::Pattern(channel_pattern.into()))
.await
}

/// Returns None when redis down/acting up and couldn't get over a few seconds.
pub(crate) async fn _subscribe_inner<T: ToRedisArgs + FromRedisValue>(
self: &Arc<Self>,
channel_sub: ChannelSubscription,
) -> Option<RedisChannelListener<T>> {
let force_new_connection = AtomicBool::new(false);
match redis_retry_config()
.call(|| async {
Expand All @@ -149,7 +193,12 @@ impl RedisPubSubGlobal {
)
.await
{
conn.subscribe(&channel).await
match &channel_sub {
ChannelSubscription::Concrete(channel) => conn.subscribe(channel).await,
ChannelSubscription::Pattern(channel_pattern) => {
conn.psubscribe(channel_pattern).await
}
}
} else {
// Doing nothing when None as that'll have been logged lower down.
Err(redis::RedisError::from(std::io::Error::new(
Expand All @@ -163,7 +212,14 @@ impl RedisPubSubGlobal {
Ok(()) => {}
Err(e) => {
record_exception(
"Pubsub: failed to subscribe to channel.",
format!(
"Pubsub: failed to subscribe to channel {}.",
if channel_sub.is_pattern() {
format!("pattern '{}'", channel_sub.as_str())
} else {
format!("'{}'", channel_sub.as_str())
}
),
format!("{:?}", e),
);
return None;
Expand All @@ -174,7 +230,7 @@ impl RedisPubSubGlobal {

let listener_key = random_u64_rolling();
self.listeners
.entry(channel.clone())
.entry(channel_sub.clone())
.or_default()
.insert(listener_key, tx);

Expand Down Expand Up @@ -216,7 +272,7 @@ impl RedisPubSubGlobal {
Some(RedisChannelListener {
key: listener_key,
on_drop_tx: self.listener_drop_tx.clone(),
channel,
channel_sub,
rx,
_t: std::marker::PhantomData,
})
Expand Down Expand Up @@ -244,12 +300,22 @@ impl RedisPubSubGlobal {
Ok(mut conn) => {
// Need to re-subscribe to all actively listened to channels for the new connection:
for entry in self.listeners.iter() {
let channel = entry.key();
match conn.subscribe(channel).await {
let channel_sub = entry.key();
let sub_result = match channel_sub {
ChannelSubscription::Concrete(channel) => conn.subscribe(channel).await,
ChannelSubscription::Pattern(channel_pattern) => {
conn.psubscribe(channel_pattern).await
}
};
match sub_result {
Ok(()) => {}
Err(e) => {
record_exception(
format!("Pubsub: failed to re-subscribe to channel '{}' with newly acquired connection, discarding.", channel),
format!("Pubsub: failed to re-subscribe to channel {} with newly acquired connection, discarding.", if channel_sub.is_pattern() {
format!("pattern '{}'", channel_sub.as_str())
} else {
format!("'{}'", channel_sub.as_str())
}),
format!("{:?}", e),
);
*maybe_conn = None;
Expand Down Expand Up @@ -280,19 +346,19 @@ impl RedisPubSubGlobal {
/// The cleanup channel gets called in the drop fn of each [`RedisChannelListener`].
async fn spawned_handle_listener_dropped(
self: &Arc<Self>,
channel_and_key: Option<(String, u64)>,
channel_sub_and_key: Option<(ChannelSubscription, u64)>,
) {
match channel_and_key {
Some((channel, key)) => {
let unsub = if let Some(mut listeners) = self.listeners.get_mut(&channel) {
match channel_sub_and_key {
Some((channel_sub, key)) => {
let unsub = if let Some(mut listeners) = self.listeners.get_mut(&channel_sub) {
listeners.remove(&key);
listeners.is_empty()
} else {
true
};
// Need to come after otherwise dashmap could deadlock.
if unsub {
self.unsubscribe(&channel).await;
self.unsubscribe(&channel_sub).await;
}
}
None => {
Expand All @@ -309,6 +375,7 @@ impl RedisPubSubGlobal {
match message {
Some(push_info) => {
match push_info.kind.clone() {
redis::PushKind::PSubscribe | redis::PushKind::PUnsubscribe => {}
redis::PushKind::Subscribe => {
// Example received:
// PushInfo { kind: Subscribe, data: [bulk-string('"foo"'), int(1)] }
Expand Down Expand Up @@ -385,14 +452,8 @@ impl RedisPubSubGlobal {
push_info.data,
)) {
Ok((channel, msg)) => {
if let Some(listeners) = self.listeners.get(&channel) {
for (msg, tx) in listeners.values().with_clone_lazy(msg) {
// Given we have a separate future for cleaning up,
// this shouldn't be a big issue if this ever errors with dead listeners,
// as they should immediately be cleaned up by the cleanup future.
let _ = tx.send(msg);
}
}
self.handle_msg(ChannelSubscription::Concrete(channel), msg)
.await;
}
Err(e) => {
record_exception(
Expand All @@ -402,6 +463,26 @@ impl RedisPubSubGlobal {
}
}
}
// Patterns come in separately.
redis::PushKind::PMessage => {
// Example received:
// PushInfo { kind: PMessage, data: [bulk-string('"f*o"'), bulk-string('"foo"'), bulk-string('"only_pattern"')] }

match from_owned_redis_value::<(String, redis::Value, redis::Value)>(
redis::Value::Array(push_info.data),
) {
Ok((channel_pattern, _concrete_channel, msg)) => {
self.handle_msg(ChannelSubscription::Pattern(channel_pattern), msg)
.await;
}
Err(e) => {
record_exception(
"Pubsub: failed to decode redis::PushKind::PMessage.",
format!("{:?}", e),
);
}
}
}
_ => {
record_exception(
"Pubsub: unsupported/unexpected message received by global listener.",
Expand All @@ -418,4 +499,15 @@ impl RedisPubSubGlobal {
}
}
}

async fn handle_msg(&self, channel_sub: ChannelSubscription, msg: redis::Value) {
if let Some(listeners) = self.listeners.get(&channel_sub) {
for (msg, tx) in listeners.values().with_clone_lazy(msg) {
// Given we have a separate future for cleaning up,
// this shouldn't be a big issue if this ever errors with dead listeners,
// as they should immediately be cleaned up by the cleanup future.
let _ = tx.send(msg);
}
}
}
}

0 comments on commit c3069ab

Please sign in to comment.