Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub channel pattern support #58

Merged
merged 6 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions rust/bitbazaar/redis/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized {
.await
}

/// Subscribe to a channel pattern via pubsub, receiving messages through the returned receiver.
/// The subscription will be dropped when the receiver is dropped.
///
/// Sending can be done via normal batches using [`RedisBatch::publish`].
///
/// Returns None when redis unavailable for some reason, after a few seconds of trying to connect.
///
/// According to redis (<https://redis.io/docs/latest/commands/psubscribe/>):
/// Supported glob-style patterns:
/// - h?llo subscribes to hello, hallo and hxllo
/// - h*llo subscribes to hllo and heeeello
/// - h[ae]llo subscribes to hello and hallo, but not hillo
async fn psubscribe<T: ToRedisArgs + FromRedisValue>(
&self,
namespace: &str,
channel_pattern: &str,
) -> Option<RedisChannelListener<T>> {
self._pubsub_global()
.psubscribe(self.final_key(namespace, channel_pattern.into()))
.await
}

// Commented out as untested, not sure if works.
// /// Get all data from redis, only really useful during testing.
// ///
Expand Down
8 changes: 5 additions & 3 deletions rust/bitbazaar/redis/pubsub/channel_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use redis::{from_owned_redis_value, FromRedisValue, ToRedisArgs};

use crate::log::record_exception;

use super::pubsub_global::ChannelSubscription;

/// A listener to receive messages from a redis channel via pubsub.
pub struct RedisChannelListener<T> {
pub(crate) on_drop_tx: Arc<tokio::sync::mpsc::UnboundedSender<(String, u64)>>,
pub(crate) on_drop_tx: Arc<tokio::sync::mpsc::UnboundedSender<(ChannelSubscription, u64)>>,
pub(crate) key: u64,
pub(crate) channel: String,
pub(crate) channel_sub: ChannelSubscription,
pub(crate) rx: tokio::sync::mpsc::UnboundedReceiver<redis::Value>,
pub(crate) _t: std::marker::PhantomData<T>,
}
Expand Down Expand Up @@ -41,6 +43,6 @@ impl<T: ToRedisArgs + FromRedisValue> RedisChannelListener<T> {
/// Tell the global pubsub manager this listener is being dropped.
impl<T> Drop for RedisChannelListener<T> {
fn drop(&mut self) {
let _ = self.on_drop_tx.send((self.channel.clone(), self.key));
let _ = self.on_drop_tx.send((self.channel_sub.clone(), self.key));
}
}
54 changes: 54 additions & 0 deletions rust/bitbazaar/redis/pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,60 @@ mod tests {
Ok(())
}

// Patterns should work with conn.psubscribe(), confirm patterns match correctly, but don't if pattern passed through normal subscribe.
#[rstest]
#[tokio::test]
async fn test_redis_pubsub_pattern(
#[allow(unused_variables)] logging: (),
) -> RResult<(), AnyErr> {
let (_server, work_r, _fail_r) = setup_conns().await?;
let work_conn = work_r.conn();

let mut rx_normal = work_conn.subscribe::<String>("n1", "f*o").await.unwrap();
let mut rx_pattern = work_conn.psubscribe::<String>("n1", "f*o").await.unwrap();

assert!(work_conn
.batch()
.publish("n1", "foo", "only_pattern")
.publish("n1", "f*o", "both")
.fire()
.await
.is_some());
with_timeout(
TimeDelta::seconds(3),
|| {
panic!("Timeout waiting for pubsub message");
},
async move {
assert_eq!(Some("both".to_string()), rx_normal.recv().await);
with_timeout(
TimeDelta::milliseconds(100),
|| Ok::<_, Report<AnyErr>>(()),
async {
let msg = rx_normal.recv().await;
panic!("Shouldn't have received any more messages, got: {:?}", msg);
},
)
.await?;
assert_eq!(Some("only_pattern".to_string()), rx_pattern.recv().await);
assert_eq!(Some("both".to_string()), rx_pattern.recv().await);
with_timeout(
TimeDelta::milliseconds(100),
|| Ok::<_, Report<AnyErr>>(()),
async {
let msg = rx_pattern.recv().await;
panic!("Shouldn't have received any more messages, got: {:?}", msg);
},
)
.await?;
Ok::<_, Report<AnyErr>>(())
},
)
.await?;

Ok(())
}

// Nothing should break when no ones subscribed to a channel when a message is published.
#[rstest]
#[tokio::test]
Expand Down
150 changes: 121 additions & 29 deletions rust/bitbazaar/redis/pubsub/pubsub_global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,28 @@ use crate::{

use super::RedisChannelListener;

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub(crate) enum ChannelSubscription {
Concrete(String),
Pattern(String),
}

impl ChannelSubscription {
fn is_pattern(&self) -> bool {
match self {
ChannelSubscription::Concrete(_) => false,
ChannelSubscription::Pattern(_) => true,
}
}

fn as_str(&self) -> &str {
match self {
ChannelSubscription::Concrete(s) => s,
ChannelSubscription::Pattern(s) => s,
}
}
}

/// The lazy pubsub manager.
pub struct RedisPubSubGlobal {
client: redis::Client,
Expand All @@ -25,12 +47,14 @@ pub struct RedisPubSubGlobal {
active_conn: tokio::sync::RwLock<Option<MultiplexedConnection>>,
/// The downstream configured listeners for different channels, messages will be pushed to all active listeners.
/// Putting in a nested hashmap for easy cleanup when listeners are dropped.
pub(crate) listeners:
DashMap<String, HashMap<u64, tokio::sync::mpsc::UnboundedSender<redis::Value>>>,
pub(crate) listeners: DashMap<
ChannelSubscription,
HashMap<u64, tokio::sync::mpsc::UnboundedSender<redis::Value>>,
>,

/// Below used to trigger unsubscriptions and listeners dashmap cleanup when listeners are dropped.
/// (The tx is called when a listener is dropped, and the spawned process listens for these and does the cleanup.)
listener_drop_tx: Arc<tokio::sync::mpsc::UnboundedSender<(String, u64)>>,
listener_drop_tx: Arc<tokio::sync::mpsc::UnboundedSender<(ChannelSubscription, u64)>>,

/// Will be taken when the listener is lazily spawned.
spawn_init: tokio::sync::Mutex<Option<SpawnInit>>,
Expand All @@ -56,7 +80,7 @@ struct SpawnInit {
rx: tokio::sync::mpsc::UnboundedReceiver<redis::PushInfo>,

// Will receive whenever a listener is dropped:
listener_drop_rx: tokio::sync::mpsc::UnboundedReceiver<(String, u64)>,
listener_drop_rx: tokio::sync::mpsc::UnboundedReceiver<(ChannelSubscription, u64)>,

// Received when the redis instance dropped, meaning the spawned listener should shutdown.
on_drop_rx: tokio::sync::oneshot::Receiver<()>,
Expand Down Expand Up @@ -100,9 +124,8 @@ impl RedisPubSubGlobal {
})
}

pub(crate) async fn unsubscribe(&self, channel: impl Into<String>) {
let channel = channel.into();
self.listeners.remove(&channel);
pub(crate) async fn unsubscribe(&self, channel_sub: &ChannelSubscription) {
self.listeners.remove(channel_sub);

let force_new_connection = AtomicBool::new(false);
match redis_retry_config()
Expand All @@ -114,7 +137,12 @@ impl RedisPubSubGlobal {
)
.await
{
conn.unsubscribe(&channel).await
match &channel_sub {
ChannelSubscription::Concrete(channel) => conn.unsubscribe(&channel).await,
ChannelSubscription::Pattern(channel_pattern) => {
conn.punsubscribe(&channel_pattern).await
}
}
} else {
// Doing nothing when None as that'll have been logged lower down.
Ok(())
Expand All @@ -137,8 +165,24 @@ impl RedisPubSubGlobal {
self: &Arc<Self>,
channel: impl Into<String>,
) -> Option<RedisChannelListener<T>> {
let channel = channel.into();
self._subscribe_inner(ChannelSubscription::Concrete(channel.into()))
.await
}

/// Returns None when redis down/acting up and couldn't get over a few seconds.
pub(crate) async fn psubscribe<T: ToRedisArgs + FromRedisValue>(
self: &Arc<Self>,
channel_pattern: impl Into<String>,
) -> Option<RedisChannelListener<T>> {
self._subscribe_inner(ChannelSubscription::Pattern(channel_pattern.into()))
.await
}

/// Returns None when redis down/acting up and couldn't get over a few seconds.
pub(crate) async fn _subscribe_inner<T: ToRedisArgs + FromRedisValue>(
self: &Arc<Self>,
channel_sub: ChannelSubscription,
) -> Option<RedisChannelListener<T>> {
let force_new_connection = AtomicBool::new(false);
match redis_retry_config()
.call(|| async {
Expand All @@ -149,7 +193,12 @@ impl RedisPubSubGlobal {
)
.await
{
conn.subscribe(&channel).await
match &channel_sub {
ChannelSubscription::Concrete(channel) => conn.subscribe(channel).await,
ChannelSubscription::Pattern(channel_pattern) => {
conn.psubscribe(channel_pattern).await
}
}
} else {
// Doing nothing when None as that'll have been logged lower down.
Err(redis::RedisError::from(std::io::Error::new(
Expand All @@ -163,7 +212,14 @@ impl RedisPubSubGlobal {
Ok(()) => {}
Err(e) => {
record_exception(
"Pubsub: failed to subscribe to channel.",
format!(
"Pubsub: failed to subscribe to channel {}.",
if channel_sub.is_pattern() {
format!("pattern '{}'", channel_sub.as_str())
} else {
format!("'{}'", channel_sub.as_str())
}
),
format!("{:?}", e),
);
return None;
Expand All @@ -174,7 +230,7 @@ impl RedisPubSubGlobal {

let listener_key = random_u64_rolling();
self.listeners
.entry(channel.clone())
.entry(channel_sub.clone())
.or_default()
.insert(listener_key, tx);

Expand Down Expand Up @@ -216,7 +272,7 @@ impl RedisPubSubGlobal {
Some(RedisChannelListener {
key: listener_key,
on_drop_tx: self.listener_drop_tx.clone(),
channel,
channel_sub,
rx,
_t: std::marker::PhantomData,
})
Expand Down Expand Up @@ -244,12 +300,22 @@ impl RedisPubSubGlobal {
Ok(mut conn) => {
// Need to re-subscribe to all actively listened to channels for the new connection:
for entry in self.listeners.iter() {
let channel = entry.key();
match conn.subscribe(channel).await {
let channel_sub = entry.key();
let sub_result = match channel_sub {
ChannelSubscription::Concrete(channel) => conn.subscribe(channel).await,
ChannelSubscription::Pattern(channel_pattern) => {
conn.psubscribe(channel_pattern).await
}
};
match sub_result {
Ok(()) => {}
Err(e) => {
record_exception(
format!("Pubsub: failed to re-subscribe to channel '{}' with newly acquired connection, discarding.", channel),
format!("Pubsub: failed to re-subscribe to channel {} with newly acquired connection, discarding.", if channel_sub.is_pattern() {
format!("pattern '{}'", channel_sub.as_str())
} else {
format!("'{}'", channel_sub.as_str())
}),
format!("{:?}", e),
);
*maybe_conn = None;
Expand Down Expand Up @@ -280,19 +346,19 @@ impl RedisPubSubGlobal {
/// The cleanup channel gets called in the drop fn of each [`RedisChannelListener`].
async fn spawned_handle_listener_dropped(
self: &Arc<Self>,
channel_and_key: Option<(String, u64)>,
channel_sub_and_key: Option<(ChannelSubscription, u64)>,
) {
match channel_and_key {
Some((channel, key)) => {
let unsub = if let Some(mut listeners) = self.listeners.get_mut(&channel) {
match channel_sub_and_key {
Some((channel_sub, key)) => {
let unsub = if let Some(mut listeners) = self.listeners.get_mut(&channel_sub) {
listeners.remove(&key);
listeners.is_empty()
} else {
true
};
// Need to come after otherwise dashmap could deadlock.
if unsub {
self.unsubscribe(&channel).await;
self.unsubscribe(&channel_sub).await;
}
}
None => {
Expand All @@ -309,6 +375,7 @@ impl RedisPubSubGlobal {
match message {
Some(push_info) => {
match push_info.kind.clone() {
redis::PushKind::PSubscribe | redis::PushKind::PUnsubscribe => {}
redis::PushKind::Subscribe => {
// Example received:
// PushInfo { kind: Subscribe, data: [bulk-string('"foo"'), int(1)] }
Expand Down Expand Up @@ -385,14 +452,8 @@ impl RedisPubSubGlobal {
push_info.data,
)) {
Ok((channel, msg)) => {
if let Some(listeners) = self.listeners.get(&channel) {
for (msg, tx) in listeners.values().with_clone_lazy(msg) {
// Given we have a separate future for cleaning up,
// this shouldn't be a big issue if this ever errors with dead listeners,
// as they should immediately be cleaned up by the cleanup future.
let _ = tx.send(msg);
}
}
self.handle_msg(ChannelSubscription::Concrete(channel), msg)
.await;
}
Err(e) => {
record_exception(
Expand All @@ -402,6 +463,26 @@ impl RedisPubSubGlobal {
}
}
}
// Patterns come in separately.
redis::PushKind::PMessage => {
// Example received:
// PushInfo { kind: PMessage, data: [bulk-string('"f*o"'), bulk-string('"foo"'), bulk-string('"only_pattern"')] }

match from_owned_redis_value::<(String, redis::Value, redis::Value)>(
redis::Value::Array(push_info.data),
) {
Ok((channel_pattern, _concrete_channel, msg)) => {
self.handle_msg(ChannelSubscription::Pattern(channel_pattern), msg)
.await;
}
Err(e) => {
record_exception(
"Pubsub: failed to decode redis::PushKind::PMessage.",
format!("{:?}", e),
);
}
}
}
_ => {
record_exception(
"Pubsub: unsupported/unexpected message received by global listener.",
Expand All @@ -418,4 +499,15 @@ impl RedisPubSubGlobal {
}
}
}

async fn handle_msg(&self, channel_sub: ChannelSubscription, msg: redis::Value) {
if let Some(listeners) = self.listeners.get(&channel_sub) {
for (msg, tx) in listeners.values().with_clone_lazy(msg) {
// Given we have a separate future for cleaning up,
// this shouldn't be a big issue if this ever errors with dead listeners,
// as they should immediately be cleaned up by the cleanup future.
let _ = tx.send(msg);
}
}
}
}
Loading