Refactor for RPC call timeouts
Will Nelson committed Feb 5, 2022
1 parent 3aea4b8 commit 523947b
Showing 7 changed files with 371 additions and 115 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

4 changes: 3 additions & 1 deletion brokers/Cargo.toml
@@ -17,6 +17,7 @@ env_logger = "0.7"
futures = "0.3"
log = "0.4"
nanoid = "0.3"
pin-project = "1.0"
redis = { version = "0.21", optional = true, default-features = false, features = ["streams"] }
redis-subscribe = { git = "https://github.com/appellation/redis-subscribe", branch = "feat/impl-error", optional = true }
rmp-serde = "0.15"
@@ -27,13 +28,14 @@ tokio-stream = { version = "0.1", features = ["sync"] }
[dependencies.tokio]
version = "1.0"
features = ["rt-multi-thread"]
optional = true

[dev-dependencies.tokio]
version = "1.0"
features = ["rt-multi-thread", "macros"]

[features]
amqp-broker = ["lapin"]
amqp-broker = ["lapin", "tokio"]
redis-broker = ["deadpool-redis", "redis", "redis-subscribe"]

[[example]]
2 changes: 1 addition & 1 deletion brokers/src/error.rs
@@ -2,11 +2,11 @@
use deadpool_redis::{redis::RedisError, PoolError};
#[cfg(feature = "amqp-broker")]
use lapin::Error as LapinError;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use std::{io::Error as IoError, result::Result as StdResult};
use thiserror::Error;
#[cfg(feature = "amqp-broker")]
use tokio::sync::oneshot::error::RecvError;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

pub type Result<T, E = Error> = StdResult<T, E>;

252 changes: 139 additions & 113 deletions brokers/src/redis.rs
@@ -1,91 +1,41 @@
use std::{borrow::Cow, sync::Arc};
use std::{
borrow::Cow,
fmt::{self, Debug},
time::{SystemTime, UNIX_EPOCH},
};

pub use deadpool_redis;
use deadpool_redis::{
redis::{
streams::{StreamId, StreamRangeReply, StreamReadOptions, StreamReadReply},
streams::{StreamRangeReply, StreamReadOptions, StreamReadReply},
AsyncCommands, FromRedisValue, RedisError, Value,
},
Connection, Pool,
};
use futures::{
stream::{iter, select_all},
stream_select, StreamExt, TryStream, TryStreamExt,
stream_select, TryStream, TryStreamExt,
};
use nanoid::nanoid;
use redis_subscribe::RedisSub;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{spawn, sync::broadcast};
use tokio_stream::wrappers::BroadcastStream;

use crate::{
error::{Error, Result},
util::stream::repeat_fn,
};

use self::{message::Message, pubsub::BroadcastSub, rpc::Rpc};

pub mod message;
pub mod pubsub;
pub mod rpc;

/// Maximum number of stream entries to claim per read.
const DEFAULT_MAX_CHUNK: usize = 10;
/// How long (in milliseconds) a blocking stream read waits before returning.
const DEFAULT_BLOCK_INTERVAL: usize = 5000;
const STREAM_DATA_KEY: &'static str = "data";
const STREAM_TIMEOUT_KEY: &'static str = "timeout_at";

/// A message received from the broker.
#[derive(Clone)]
pub struct Message<'a, V> {
/// The group this message belongs to.
pub group: &'a str,
/// The event this message signals.
pub event: Cow<'a, str>,
/// The ID of this message (generated by Redis).
pub id: String,
/// The data of this message. Always present unless there is a bug with a client implementation.
pub data: Option<V>,
pool: &'a Pool,
}

impl<'a, V> Message<'a, V>
where
V: DeserializeOwned,
{
fn new(id: StreamId, group: &'a str, event: Cow<'a, str>, pool: &'a Pool) -> Self {
let data = id
.get(STREAM_DATA_KEY)
.and_then(|data: Vec<u8>| rmp_serde::from_read_ref(&data).ok());

Message {
group,
event,
id: id.id,
pool,
data,
}
}
}

impl<'a, V> Message<'a, V> {
/// Acknowledge receipt of the message. This should always be called, since un-acked messages
/// will be reclaimed by other clients.
pub async fn ack(&self) -> Result<()> {
self.pool
.get()
.await?
.xack(&*self.event, self.group, &[&self.id])
.await?;

Ok(())
}

/// Reply to this message.
pub async fn reply(&self, data: &impl Serialize) -> Result<()> {
let key = format!("{}:{}", self.event, self.id);
let serialized = rmp_serde::to_vec(data)?;
self.pool.get().await?.publish(key, serialized).await?;

Ok(())
}
}
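
Taken together, the consumer side of this API reads roughly as follows — a minimal sketch, assuming a broker already subscribed to an illustrative "def" event (consume and the ack/reply semantics appear later in this diff):

// Sketch only; needs `futures::TryStreamExt` in scope for `try_next`.
let mut consumer = broker.consume::<Vec<u8>>(&["def"]);
while let Some(msg) = consumer.try_next().await? {
    // Ack promptly: un-acked messages are reclaimed by other clients.
    msg.ack().await?;
    // Reply over pubsub on "{event}:{id}", which the caller listens on.
    msg.reply(&"done").await?;
}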

pub struct Rpc;

// #[derive(Debug)]
/// RedisBroker is internally reference counted and can be safely cloned.
pub struct RedisBroker<'a> {
/// The consumer name of this broker. Should be unique to the container/machine consuming
/// messages.
@@ -99,32 +49,52 @@ pub struct RedisBroker<'a> {
/// time period will be reclaimed by other clients.
pub max_operation_time: usize,
pool: Pool,
pubsub: Arc<RedisSub>,
pubsub_msgs: broadcast::Sender<Arc<redis_subscribe::Message>>,
pubsub: BroadcastSub,
read_opts: StreamReadOptions,
}

impl<'a> Clone for RedisBroker<'a> {
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
group: self.group.clone(),
max_chunk: self.max_chunk,
max_operation_time: self.max_operation_time,
pool: self.pool.clone(),
pubsub: self.pubsub.clone(),
read_opts: Self::make_read_opts(&*self.group, &*self.name),
}
}
}

impl<'a> Debug for RedisBroker<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RedisBroker")
.field("name", &self.name)
.field("group", &self.group)
.field("max_chunk", &self.max_chunk)
.field("max_operation_time", &self.max_operation_time)
.field("pubsub", &self.pubsub)
.field("read_opts", &self.read_opts)
.finish_non_exhaustive()
}
}
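
Every field is either a plain integer, a Cow, or a handle that is reference counted internally (the pool and pubsub), so a clone is cheap and shares the same connections; the manual Clone impl rebuilds read_opts, presumably because StreamReadOptions does not implement Clone in this version of the redis crate. A minimal sketch of handing a clone to a task, mirroring the rpc_timeout test below:

// Sketch: both clones share the same underlying pool and pubsub.
let broker2 = broker.clone();
tokio::spawn(async move {
    broker2.publish("def", &[1u8, 2, 3]).await.expect("published");
});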

impl<'a> RedisBroker<'a> {
fn make_read_opts(group: &str, name: &str) -> StreamReadOptions {
StreamReadOptions::default()
.group(group, name)
.count(DEFAULT_MAX_CHUNK)
.block(DEFAULT_BLOCK_INTERVAL)
}

/// Creates a new broker with sensible defaults.
pub fn new(group: impl Into<Cow<'a, str>>, pool: Pool, address: &str) -> RedisBroker<'a> {
let group = group.into();
let name = nanoid!();
let read_opts = StreamReadOptions::default()
.group(&*group, &name)
.count(DEFAULT_MAX_CHUNK)
.block(DEFAULT_BLOCK_INTERVAL);
let read_opts = RedisBroker::make_read_opts(&*group, &name);

let pubsub = Arc::new(RedisSub::new(&address));

let (tx, _) = broadcast::channel(1);
let task_pubsub = Arc::clone(&pubsub);
let task_tx = tx.clone();
spawn(async move {
let mut stream = task_pubsub.listen().await.unwrap();
while let Some(msg) = stream.next().await {
task_tx.send(Arc::new(msg)).unwrap();
}
});
let pubsub = BroadcastSub::new(address);

Self {
name: Cow::Owned(name),
@@ -133,47 +103,56 @@ impl<'a> RedisBroker<'a> {
max_operation_time: DEFAULT_BLOCK_INTERVAL,
pool,
pubsub,
pubsub_msgs: tx,
read_opts,
}
}
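
BroadcastSub comes from the new pubsub module, which this section of the diff does not show. Judging by the hand-rolled forwarding task it replaces (the deleted spawn above), it presumably bundles the RedisSub client and a tokio broadcast sender behind one cloneable, Debug-able handle — a rough sketch under that assumption:

// Hypothetical shape, inferred from the code this commit removes; the
// real definition lives in pubsub.rs, which this diff section omits.
#[derive(Debug, Clone)]
pub struct BroadcastSub {
    sub: Arc<RedisSub>,
    msgs: broadcast::Sender<Arc<redis_subscribe::Message>>,
}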

/// Publishes an event to the broker. Returned value is the ID of the message.
pub async fn publish(&self, event: &str, data: &impl Serialize) -> Result<String> {
let serialized = rmp_serde::to_vec(data)?;
Ok(self
.get_conn()
.await?
.xadd(event, "*", &[(STREAM_DATA_KEY, serialized)])
.await?)
self.publish_timeout(event, data, None).await
}

pub async fn call<V>(&self, event: &str, data: &impl Serialize) -> Result<Option<V>>
where
V: DeserializeOwned,
{
let id = self.publish(event, data).await?;
pub async fn call(
&self,
event: &str,
data: &impl Serialize,
timeout: Option<SystemTime>,
) -> Result<Rpc<'_>> {
let id = self.publish_timeout(event, data, timeout).await?;
let name = format!("{}:{}", event, id);

self.pubsub.subscribe(name.clone()).await?;
Ok(Rpc {
name,
broker: &self,
})
}
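
The returned Rpc handle carries the reply channel name ("{event}:{id}") and a reference back to the broker; its methods live in the new rpc module, which this section of the diff does not show. A hedged caller-side sketch, mirroring the rpc_timeout test below:

// Sketch: issue an RPC that expires 500ms from now. Awaiting the reply
// goes through the returned `Rpc` handle (defined in rpc.rs, not shown here).
let timeout = Some(SystemTime::now() + Duration::from_millis(500));
let rpc = broker.call("def", &[1u8, 2, 3], timeout).await?;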

let data = BroadcastStream::new(self.pubsub_msgs.subscribe())
.err_into::<Error>()
.try_filter_map(|msg| async move {
match &*msg {
redis_subscribe::Message::Message { message, .. } => {
Ok(rmp_serde::from_read(message.as_bytes())?)
}
_ => Ok(None),
}
})
.boxed()
.next()
.await
.transpose()?;
async fn publish_timeout(
&self,
event: &str,
data: &impl Serialize,
maybe_timeout: Option<SystemTime>,
) -> Result<String> {
let serialized_data = rmp_serde::to_vec(data)?;
let mut conn = self.get_conn().await?;

let args = match maybe_timeout {
Some(timeout) => vec![
(STREAM_DATA_KEY, serialized_data),
(
STREAM_TIMEOUT_KEY,
timeout
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos()
.to_string()
.into_bytes(),
),
],
None => vec![(STREAM_DATA_KEY, serialized_data)],
};

self.pubsub.unsubscribe(name).await?;
Ok(data)
Ok(conn.xadd(event, "*", &args).await?)
}
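
When a deadline is supplied, it is stored next to the payload as a stringified nanosecond timestamp under the "timeout_at" key. A small worked example of that encoding:

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// A deadline 500ms from now serializes to ASCII digits, e.g.
// b"1644019200000000000" (nanoseconds since the Unix epoch).
let deadline = SystemTime::now() + Duration::from_millis(500);
let timeout_at = deadline
    .duration_since(UNIX_EPOCH)
    .unwrap()
    .as_nanos()
    .to_string()
    .into_bytes();
assert!(timeout_at.iter().all(u8::is_ascii_digit));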

pub async fn subscribe(&self, events: &[&str]) -> Result<()> {
@@ -230,7 +209,7 @@ impl<'a> RedisBroker<'a> {
id,
&group,
Cow::Borrowed(event),
pool,
self,
))
});

@@ -257,7 +236,7 @@ impl<'a> RedisBroker<'a> {
move |event| {
let key = Cow::from(event.key);
event.ids.into_iter().map(move |id| {
Ok(Message::<V>::new(id, group, key.clone(), pool))
Ok(Message::<V>::new(id, group, key.clone(), self))
})
},
);
@@ -278,8 +257,12 @@ impl<'a> RedisBroker<'a> {

#[cfg(test)]
mod test {
use std::time::{Duration, SystemTime};

use deadpool_redis::{Manager, Pool};
use futures::TryStreamExt;
use redis::cmd;
use tokio::{spawn, try_join};

use super::RedisBroker;

@@ -308,4 +291,47 @@

assert_eq!(msg.data.expect("data"), vec![1, 2, 3]);
}

#[tokio::test]
async fn rpc_timeout() {
let group = "foo";
let manager = Manager::new("redis://localhost:6379").expect("create manager");
let pool = Pool::new(manager, 32);

let _: () = cmd("FLUSHDB")
.query_async(&mut pool.get().await.expect("redis connection"))
.await
.expect("flush db");

let broker1 = RedisBroker::new(group, pool, "localhost:6379");
let broker2 = broker1.clone();

let events = ["def"];
broker1.subscribe(&events).await.expect("subscribed");

let timeout = Some(SystemTime::now() + Duration::from_millis(500));

let call_fut = spawn(async move {
broker2
.call("def", &[1u8, 2, 3], timeout)
.await
.expect("published");
});

let consume_fut = spawn(async move {
let mut consumer = broker1.consume::<Vec<u8>>(&events);
let msg = consumer
.try_next()
.await
.expect("message")
.expect("message");

msg.ack().await.expect("ack");

assert_eq!(msg.data.as_ref().expect("data"), &[1, 2, 3]);
assert_eq!(msg.timeout_at, timeout);
});

try_join!(consume_fut, call_fut).expect("cancelation futures");
}
}
