diff --git a/rust/bitbazaar/redis/conn.rs b/rust/bitbazaar/redis/conn.rs index 7e8409e7..042540aa 100644 --- a/rust/bitbazaar/redis/conn.rs +++ b/rust/bitbazaar/redis/conn.rs @@ -106,6 +106,28 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { .await } + /// Subscribe to a channel pattern via pubsub, receiving messages through the returned receiver. + /// The subscription will be dropped when the receiver is dropped. + /// + /// Sending can be done via normal batches using [`RedisBatch::publish`]. + /// + /// Returns None when redis unavailable for some reason, after a few seconds of trying to connect. + /// + /// According to redis (): + /// Supported glob-style patterns: + /// - h?llo subscribes to hello, hallo and hxllo + /// - h*llo subscribes to hllo and heeeello + /// - h[ae]llo subscribes to hello and hallo, but not hillo + async fn psubscribe( + &self, + namespace: &str, + channel_pattern: &str, + ) -> Option> { + self._pubsub_global() + .psubscribe(self.final_key(namespace, channel_pattern.into())) + .await + } + // Commented out as untested, not sure if works. // /// Get all data from redis, only really useful during testing. // /// diff --git a/rust/bitbazaar/redis/pubsub/channel_listener.rs b/rust/bitbazaar/redis/pubsub/channel_listener.rs index 987c9301..bdf58f9d 100644 --- a/rust/bitbazaar/redis/pubsub/channel_listener.rs +++ b/rust/bitbazaar/redis/pubsub/channel_listener.rs @@ -4,11 +4,13 @@ use redis::{from_owned_redis_value, FromRedisValue, ToRedisArgs}; use crate::log::record_exception; +use super::pubsub_global::ChannelSubscription; + /// A listener to receive messages from a redis channel via pubsub. pub struct RedisChannelListener { - pub(crate) on_drop_tx: Arc>, + pub(crate) on_drop_tx: Arc>, pub(crate) key: u64, - pub(crate) channel: String, + pub(crate) channel_sub: ChannelSubscription, pub(crate) rx: tokio::sync::mpsc::UnboundedReceiver, pub(crate) _t: std::marker::PhantomData, } @@ -41,6 +43,6 @@ impl RedisChannelListener { /// Tell the global pubsub manager this listener is being dropped. impl Drop for RedisChannelListener { fn drop(&mut self) { - let _ = self.on_drop_tx.send((self.channel.clone(), self.key)); + let _ = self.on_drop_tx.send((self.channel_sub.clone(), self.key)); } } diff --git a/rust/bitbazaar/redis/pubsub/mod.rs b/rust/bitbazaar/redis/pubsub/mod.rs index 47d1db89..9bf6bd3f 100644 --- a/rust/bitbazaar/redis/pubsub/mod.rs +++ b/rust/bitbazaar/redis/pubsub/mod.rs @@ -281,6 +281,60 @@ mod tests { Ok(()) } + // Patterns should work with conn.psubscribe(), confirm patterns match correctly, but don't if pattern passed through normal subscribe. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_pattern( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + let mut rx_normal = work_conn.subscribe::("n1", "f*o").await.unwrap(); + let mut rx_pattern = work_conn.psubscribe::("n1", "f*o").await.unwrap(); + + assert!(work_conn + .batch() + .publish("n1", "foo", "only_pattern") + .publish("n1", "f*o", "both") + .fire() + .await + .is_some()); + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("both".to_string()), rx_normal.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx_normal.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + assert_eq!(Some("only_pattern".to_string()), rx_pattern.recv().await); + assert_eq!(Some("both".to_string()), rx_pattern.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx_pattern.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + + Ok(()) + } + // Nothing should break when no ones subscribed to a channel when a message is published. #[rstest] #[tokio::test] diff --git a/rust/bitbazaar/redis/pubsub/pubsub_global.rs b/rust/bitbazaar/redis/pubsub/pubsub_global.rs index 6350b242..f8225c4c 100644 --- a/rust/bitbazaar/redis/pubsub/pubsub_global.rs +++ b/rust/bitbazaar/redis/pubsub/pubsub_global.rs @@ -17,6 +17,28 @@ use crate::{ use super::RedisChannelListener; +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub(crate) enum ChannelSubscription { + Concrete(String), + Pattern(String), +} + +impl ChannelSubscription { + fn is_pattern(&self) -> bool { + match self { + ChannelSubscription::Concrete(_) => false, + ChannelSubscription::Pattern(_) => true, + } + } + + fn as_str(&self) -> &str { + match self { + ChannelSubscription::Concrete(s) => s, + ChannelSubscription::Pattern(s) => s, + } + } +} + /// The lazy pubsub manager. pub struct RedisPubSubGlobal { client: redis::Client, @@ -25,12 +47,14 @@ pub struct RedisPubSubGlobal { active_conn: tokio::sync::RwLock>, /// The downstream configured listeners for different channels, messages will be pushed to all active listeners. /// Putting in a nested hashmap for easy cleanup when listeners are dropped. - pub(crate) listeners: - DashMap>>, + pub(crate) listeners: DashMap< + ChannelSubscription, + HashMap>, + >, /// Below used to trigger unsubscriptions and listeners dashmap cleanup when listeners are dropped. /// (The tx is called when a listener is dropped, and the spawned process listens for these and does the cleanup.) - listener_drop_tx: Arc>, + listener_drop_tx: Arc>, /// Will be taken when the listener is lazily spawned. spawn_init: tokio::sync::Mutex>, @@ -56,7 +80,7 @@ struct SpawnInit { rx: tokio::sync::mpsc::UnboundedReceiver, // Will receive whenever a listener is dropped: - listener_drop_rx: tokio::sync::mpsc::UnboundedReceiver<(String, u64)>, + listener_drop_rx: tokio::sync::mpsc::UnboundedReceiver<(ChannelSubscription, u64)>, // Received when the redis instance dropped, meaning the spawned listener should shutdown. on_drop_rx: tokio::sync::oneshot::Receiver<()>, @@ -100,9 +124,8 @@ impl RedisPubSubGlobal { }) } - pub(crate) async fn unsubscribe(&self, channel: impl Into) { - let channel = channel.into(); - self.listeners.remove(&channel); + pub(crate) async fn unsubscribe(&self, channel_sub: &ChannelSubscription) { + self.listeners.remove(channel_sub); let force_new_connection = AtomicBool::new(false); match redis_retry_config() @@ -114,7 +137,12 @@ impl RedisPubSubGlobal { ) .await { - conn.unsubscribe(&channel).await + match &channel_sub { + ChannelSubscription::Concrete(channel) => conn.unsubscribe(&channel).await, + ChannelSubscription::Pattern(channel_pattern) => { + conn.punsubscribe(&channel_pattern).await + } + } } else { // Doing nothing when None as that'll have been logged lower down. Ok(()) @@ -137,8 +165,24 @@ impl RedisPubSubGlobal { self: &Arc, channel: impl Into, ) -> Option> { - let channel = channel.into(); + self._subscribe_inner(ChannelSubscription::Concrete(channel.into())) + .await + } + + /// Returns None when redis down/acting up and couldn't get over a few seconds. + pub(crate) async fn psubscribe( + self: &Arc, + channel_pattern: impl Into, + ) -> Option> { + self._subscribe_inner(ChannelSubscription::Pattern(channel_pattern.into())) + .await + } + /// Returns None when redis down/acting up and couldn't get over a few seconds. + pub(crate) async fn _subscribe_inner( + self: &Arc, + channel_sub: ChannelSubscription, + ) -> Option> { let force_new_connection = AtomicBool::new(false); match redis_retry_config() .call(|| async { @@ -149,7 +193,12 @@ impl RedisPubSubGlobal { ) .await { - conn.subscribe(&channel).await + match &channel_sub { + ChannelSubscription::Concrete(channel) => conn.subscribe(channel).await, + ChannelSubscription::Pattern(channel_pattern) => { + conn.psubscribe(channel_pattern).await + } + } } else { // Doing nothing when None as that'll have been logged lower down. Err(redis::RedisError::from(std::io::Error::new( @@ -163,7 +212,14 @@ impl RedisPubSubGlobal { Ok(()) => {} Err(e) => { record_exception( - "Pubsub: failed to subscribe to channel.", + format!( + "Pubsub: failed to subscribe to channel {}.", + if channel_sub.is_pattern() { + format!("pattern '{}'", channel_sub.as_str()) + } else { + format!("'{}'", channel_sub.as_str()) + } + ), format!("{:?}", e), ); return None; @@ -174,7 +230,7 @@ impl RedisPubSubGlobal { let listener_key = random_u64_rolling(); self.listeners - .entry(channel.clone()) + .entry(channel_sub.clone()) .or_default() .insert(listener_key, tx); @@ -216,7 +272,7 @@ impl RedisPubSubGlobal { Some(RedisChannelListener { key: listener_key, on_drop_tx: self.listener_drop_tx.clone(), - channel, + channel_sub, rx, _t: std::marker::PhantomData, }) @@ -244,12 +300,22 @@ impl RedisPubSubGlobal { Ok(mut conn) => { // Need to re-subscribe to all actively listened to channels for the new connection: for entry in self.listeners.iter() { - let channel = entry.key(); - match conn.subscribe(channel).await { + let channel_sub = entry.key(); + let sub_result = match channel_sub { + ChannelSubscription::Concrete(channel) => conn.subscribe(channel).await, + ChannelSubscription::Pattern(channel_pattern) => { + conn.psubscribe(channel_pattern).await + } + }; + match sub_result { Ok(()) => {} Err(e) => { record_exception( - format!("Pubsub: failed to re-subscribe to channel '{}' with newly acquired connection, discarding.", channel), + format!("Pubsub: failed to re-subscribe to channel {} with newly acquired connection, discarding.", if channel_sub.is_pattern() { + format!("pattern '{}'", channel_sub.as_str()) + } else { + format!("'{}'", channel_sub.as_str()) + }), format!("{:?}", e), ); *maybe_conn = None; @@ -280,11 +346,11 @@ impl RedisPubSubGlobal { /// The cleanup channel gets called in the drop fn of each [`RedisChannelListener`]. async fn spawned_handle_listener_dropped( self: &Arc, - channel_and_key: Option<(String, u64)>, + channel_sub_and_key: Option<(ChannelSubscription, u64)>, ) { - match channel_and_key { - Some((channel, key)) => { - let unsub = if let Some(mut listeners) = self.listeners.get_mut(&channel) { + match channel_sub_and_key { + Some((channel_sub, key)) => { + let unsub = if let Some(mut listeners) = self.listeners.get_mut(&channel_sub) { listeners.remove(&key); listeners.is_empty() } else { @@ -292,7 +358,7 @@ impl RedisPubSubGlobal { }; // Need to come after otherwise dashmap could deadlock. if unsub { - self.unsubscribe(&channel).await; + self.unsubscribe(&channel_sub).await; } } None => { @@ -309,6 +375,7 @@ impl RedisPubSubGlobal { match message { Some(push_info) => { match push_info.kind.clone() { + redis::PushKind::PSubscribe | redis::PushKind::PUnsubscribe => {} redis::PushKind::Subscribe => { // Example received: // PushInfo { kind: Subscribe, data: [bulk-string('"foo"'), int(1)] } @@ -385,14 +452,8 @@ impl RedisPubSubGlobal { push_info.data, )) { Ok((channel, msg)) => { - if let Some(listeners) = self.listeners.get(&channel) { - for (msg, tx) in listeners.values().with_clone_lazy(msg) { - // Given we have a separate future for cleaning up, - // this shouldn't be a big issue if this ever errors with dead listeners, - // as they should immediately be cleaned up by the cleanup future. - let _ = tx.send(msg); - } - } + self.handle_msg(ChannelSubscription::Concrete(channel), msg) + .await; } Err(e) => { record_exception( @@ -402,6 +463,26 @@ impl RedisPubSubGlobal { } } } + // Patterns come in separately. + redis::PushKind::PMessage => { + // Example received: + // PushInfo { kind: PMessage, data: [bulk-string('"f*o"'), bulk-string('"foo"'), bulk-string('"only_pattern"')] } + + match from_owned_redis_value::<(String, redis::Value, redis::Value)>( + redis::Value::Array(push_info.data), + ) { + Ok((channel_pattern, _concrete_channel, msg)) => { + self.handle_msg(ChannelSubscription::Pattern(channel_pattern), msg) + .await; + } + Err(e) => { + record_exception( + "Pubsub: failed to decode redis::PushKind::PMessage.", + format!("{:?}", e), + ); + } + } + } _ => { record_exception( "Pubsub: unsupported/unexpected message received by global listener.", @@ -418,4 +499,15 @@ impl RedisPubSubGlobal { } } } + + async fn handle_msg(&self, channel_sub: ChannelSubscription, msg: redis::Value) { + if let Some(listeners) = self.listeners.get(&channel_sub) { + for (msg, tx) in listeners.values().with_clone_lazy(msg) { + // Given we have a separate future for cleaning up, + // this shouldn't be a big issue if this ever errors with dead listeners, + // as they should immediately be cleaned up by the cleanup future. + let _ = tx.send(msg); + } + } + } }