From c3069ab73a4581a43b93673dde740ba1cf0168c3 Mon Sep 17 00:00:00 2001
From: zakstucke <44890343+zakstucke@users.noreply.github.com>
Date: Mon, 19 Aug 2024 16:52:55 +0300
Subject: [PATCH] pubsub channel pattern support (#58)
* No more mutable requirement for redis conns, much more ergonomic
* Custom fallbacks to underlying redis interface without having to leave higher level interface
* Redis pubsub
* Error resistant, more tests, lazy clone.
* Pubsub pattern/psubscribe support
---
rust/bitbazaar/redis/conn.rs | 22 +++
.../redis/pubsub/channel_listener.rs | 8 +-
rust/bitbazaar/redis/pubsub/mod.rs | 54 +++++++
rust/bitbazaar/redis/pubsub/pubsub_global.rs | 150 ++++++++++++++----
4 files changed, 202 insertions(+), 32 deletions(-)
diff --git a/rust/bitbazaar/redis/conn.rs b/rust/bitbazaar/redis/conn.rs
index 7e8409e7..042540aa 100644
--- a/rust/bitbazaar/redis/conn.rs
+++ b/rust/bitbazaar/redis/conn.rs
@@ -106,6 +106,28 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized {
.await
}
+ /// Subscribe to a channel pattern via pubsub, receiving messages through the returned receiver.
+ /// The subscription will be dropped when the receiver is dropped.
+ ///
+ /// Sending can be done via normal batches using [`RedisBatch::publish`].
+ ///
+ /// Returns None when redis unavailable for some reason, after a few seconds of trying to connect.
+ ///
+ /// According to redis ():
+ /// Supported glob-style patterns:
+ /// - h?llo subscribes to hello, hallo and hxllo
+ /// - h*llo subscribes to hllo and heeeello
+ /// - h[ae]llo subscribes to hello and hallo, but not hillo
+ async fn psubscribe(
+ &self,
+ namespace: &str,
+ channel_pattern: &str,
+ ) -> Option> {
+ self._pubsub_global()
+ .psubscribe(self.final_key(namespace, channel_pattern.into()))
+ .await
+ }
+
// Commented out as untested, not sure if works.
// /// Get all data from redis, only really useful during testing.
// ///
diff --git a/rust/bitbazaar/redis/pubsub/channel_listener.rs b/rust/bitbazaar/redis/pubsub/channel_listener.rs
index 987c9301..bdf58f9d 100644
--- a/rust/bitbazaar/redis/pubsub/channel_listener.rs
+++ b/rust/bitbazaar/redis/pubsub/channel_listener.rs
@@ -4,11 +4,13 @@ use redis::{from_owned_redis_value, FromRedisValue, ToRedisArgs};
use crate::log::record_exception;
+use super::pubsub_global::ChannelSubscription;
+
/// A listener to receive messages from a redis channel via pubsub.
pub struct RedisChannelListener {
- pub(crate) on_drop_tx: Arc>,
+ pub(crate) on_drop_tx: Arc>,
pub(crate) key: u64,
- pub(crate) channel: String,
+ pub(crate) channel_sub: ChannelSubscription,
pub(crate) rx: tokio::sync::mpsc::UnboundedReceiver,
pub(crate) _t: std::marker::PhantomData,
}
@@ -41,6 +43,6 @@ impl RedisChannelListener {
/// Tell the global pubsub manager this listener is being dropped.
impl Drop for RedisChannelListener {
fn drop(&mut self) {
- let _ = self.on_drop_tx.send((self.channel.clone(), self.key));
+ let _ = self.on_drop_tx.send((self.channel_sub.clone(), self.key));
}
}
diff --git a/rust/bitbazaar/redis/pubsub/mod.rs b/rust/bitbazaar/redis/pubsub/mod.rs
index 47d1db89..9bf6bd3f 100644
--- a/rust/bitbazaar/redis/pubsub/mod.rs
+++ b/rust/bitbazaar/redis/pubsub/mod.rs
@@ -281,6 +281,60 @@ mod tests {
Ok(())
}
+ // Patterns should work with conn.psubscribe(), confirm patterns match correctly, but don't if pattern passed through normal subscribe.
+ #[rstest]
+ #[tokio::test]
+ async fn test_redis_pubsub_pattern(
+ #[allow(unused_variables)] logging: (),
+ ) -> RResult<(), AnyErr> {
+ let (_server, work_r, _fail_r) = setup_conns().await?;
+ let work_conn = work_r.conn();
+
+ let mut rx_normal = work_conn.subscribe::("n1", "f*o").await.unwrap();
+ let mut rx_pattern = work_conn.psubscribe::("n1", "f*o").await.unwrap();
+
+ assert!(work_conn
+ .batch()
+ .publish("n1", "foo", "only_pattern")
+ .publish("n1", "f*o", "both")
+ .fire()
+ .await
+ .is_some());
+ with_timeout(
+ TimeDelta::seconds(3),
+ || {
+ panic!("Timeout waiting for pubsub message");
+ },
+ async move {
+ assert_eq!(Some("both".to_string()), rx_normal.recv().await);
+ with_timeout(
+ TimeDelta::milliseconds(100),
+ || Ok::<_, Report>(()),
+ async {
+ let msg = rx_normal.recv().await;
+ panic!("Shouldn't have received any more messages, got: {:?}", msg);
+ },
+ )
+ .await?;
+ assert_eq!(Some("only_pattern".to_string()), rx_pattern.recv().await);
+ assert_eq!(Some("both".to_string()), rx_pattern.recv().await);
+ with_timeout(
+ TimeDelta::milliseconds(100),
+ || Ok::<_, Report>(()),
+ async {
+ let msg = rx_pattern.recv().await;
+ panic!("Shouldn't have received any more messages, got: {:?}", msg);
+ },
+ )
+ .await?;
+ Ok::<_, Report>(())
+ },
+ )
+ .await?;
+
+ Ok(())
+ }
+
// Nothing should break when no ones subscribed to a channel when a message is published.
#[rstest]
#[tokio::test]
diff --git a/rust/bitbazaar/redis/pubsub/pubsub_global.rs b/rust/bitbazaar/redis/pubsub/pubsub_global.rs
index 6350b242..f8225c4c 100644
--- a/rust/bitbazaar/redis/pubsub/pubsub_global.rs
+++ b/rust/bitbazaar/redis/pubsub/pubsub_global.rs
@@ -17,6 +17,28 @@ use crate::{
use super::RedisChannelListener;
+#[derive(Debug, Clone, Hash, PartialEq, Eq)]
+pub(crate) enum ChannelSubscription {
+ Concrete(String),
+ Pattern(String),
+}
+
+impl ChannelSubscription {
+ fn is_pattern(&self) -> bool {
+ match self {
+ ChannelSubscription::Concrete(_) => false,
+ ChannelSubscription::Pattern(_) => true,
+ }
+ }
+
+ fn as_str(&self) -> &str {
+ match self {
+ ChannelSubscription::Concrete(s) => s,
+ ChannelSubscription::Pattern(s) => s,
+ }
+ }
+}
+
/// The lazy pubsub manager.
pub struct RedisPubSubGlobal {
client: redis::Client,
@@ -25,12 +47,14 @@ pub struct RedisPubSubGlobal {
active_conn: tokio::sync::RwLock