Skip to content

Commit

Permalink
Merge pull request #6 from spec-tacles/feat/redis-broker
Browse files Browse the repository at this point in the history
Add Redis broker
  • Loading branch information
appellation authored Dec 30, 2021
2 parents 556911f + dce83f1 commit ff0b9b8
Show file tree
Hide file tree
Showing 8 changed files with 674 additions and 244 deletions.
561 changes: 323 additions & 238 deletions Cargo.lock

Large diffs are not rendered by default.

29 changes: 27 additions & 2 deletions brokers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,42 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
lapin = "1.1"
deadpool-redis = { version = "0.9", optional = true }
lapin = { version = "1.1", optional = true }
env_logger = "0.7"
futures = "0.3"
log = "0.4"
nanoid = "0.3"
redis = { version = "0.21", optional = true, default-features = false, features = ["streams"] }
rmp-serde = "0.15"
serde = "1.0"
thiserror = "1.0"
tokio-stream = "0.1"

[dependencies.tokio]
version = "1.0"
features = ["rt-multi-thread"]
optional = true

[dev-dependencies.tokio]
version = "1.0"
features = ["rt-multi-thread", "macros"]

[features]
amqp-broker = ["lapin", "tokio"]
redis-broker = ["deadpool-redis", "redis"]

[[example]]
name = "amqp_consumer"
required-features = ["amqp-broker"]

[[example]]
name = "amqp_producer"
required-features = ["amqp-broker"]

[[example]]
name = "amqp_rpc_consumer"
required-features = ["amqp-broker"]

[[example]]
name = "amqp_rpc_producer"
required-features = ["amqp-broker"]
2 changes: 1 addition & 1 deletion brokers/src/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::{
sync::{Arc, Weak},
};

use futures::StreamExt;
use lapin::{
message::Delivery, options::*, types::FieldTable, BasicProperties, Channel, Connection,
ConnectionProperties, ExchangeKind,
};
use log::debug;
use nanoid::nanoid;
pub use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_stream::StreamExt;

use crate::error::*;

Expand Down
21 changes: 18 additions & 3 deletions brokers/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
#[cfg(feature = "redis-broker")]
use deadpool_redis::{redis::RedisError, PoolError};
#[cfg(feature = "amqp-broker")]
use lapin::Error as LapinError;

#[cfg(feature = "amqp-broker")]
use tokio::sync::oneshot::error::RecvError;
use std::{io::Error as IoError, result::Result as StdResult};
use thiserror::Error;
use tokio::sync::oneshot::error::RecvError;

pub type Result<T> = StdResult<T, Error>;
pub type Result<T, E = Error> = StdResult<T, E>;

/// Unified error type for all broker backends; variants are feature-gated to
/// match the backend that can produce them.
#[derive(Error, Debug)]
pub enum Error {
/// Transport/protocol error from the AMQP (lapin) client.
#[cfg(feature = "amqp-broker")]
#[error("Lapin error")]
Lapin(#[from] LapinError),
/// Underlying I/O failure.
#[error("IO error")]
Io(#[from] IoError),
/// A tokio oneshot reply channel closed before a response arrived.
#[cfg(feature = "amqp-broker")]
#[error("Async receive error")]
Recv(#[from] RecvError),
/// NOTE(review): appears to carry an error string received in place of an
/// RPC reply — confirm against the amqp module's reply handling.
#[error("Reply error")]
Reply(String),
/// Error returned by the Redis client.
#[cfg(feature = "redis-broker")]
#[error("Redis error")]
Redis(#[from] RedisError),
/// Failed to check a connection out of the deadpool pool.
#[cfg(feature = "redis-broker")]
#[error("Pool error")]
Deadpool(#[from] PoolError),
/// MessagePack serialization failure (outgoing payloads).
#[error("MessagePack encode error")]
MsgpackEncode(#[from] rmp_serde::encode::Error),
/// MessagePack deserialization failure (incoming payloads).
#[error("MessagePack decode error")]
MsgpackDecode(#[from] rmp_serde::decode::Error),
}
4 changes: 4 additions & 0 deletions brokers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
// AMQP (lapin-backed) broker; compiled only with the `amqp-broker` feature.
#[cfg(feature = "amqp-broker")]
pub mod amqp;
// Error and Result types shared by every broker backend.
pub mod error;
// Redis-streams broker; compiled only with the `redis-broker` feature.
#[cfg(feature = "redis-broker")]
pub mod redis;
// Internal helpers (stream utilities); not part of the public API.
mod util;
280 changes: 280 additions & 0 deletions brokers/src/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
use std::borrow::Cow;

pub use deadpool_redis;
use deadpool_redis::{
redis::{
streams::{StreamId, StreamRangeReply, StreamReadOptions, StreamReadReply},
AsyncCommands, FromRedisValue, RedisError, Value,
},
Connection, Pool,
};
use futures::{
stream::{iter, select_all},
stream_select, StreamExt, TryStream, TryStreamExt,
};
use nanoid::nanoid;
use serde::{de::DeserializeOwned, Serialize};

use crate::{
error::{Error, Result},
util::stream::repeat_fn,
};

/// Largest number of entries requested from Redis per XREAD/XAUTOCLAIM call.
const DEFAULT_MAX_CHUNK: usize = 10;
/// Default XREAD block interval and assumed max message-processing time (ms).
const DEFAULT_BLOCK_INTERVAL: usize = 5000;
/// Field name under which the MessagePack payload is stored in a stream entry.
// `'static` is implicit for string-slice consts (clippy: redundant_static_lifetimes).
const STREAM_DATA_KEY: &str = "data";

/// A message received from the broker.
#[derive(Clone)]
pub struct Message<'a, V> {
/// The group this message belongs to.
pub group: &'a str,
/// The event this message signals.
pub event: Cow<'a, str>,
/// The ID of this message (generated by Redis).
pub id: String,
/// The data of this message. Always present unless there is a bug with a client implementation.
pub data: Option<V>,
// Pool handle retained so `ack`/`reply` can check out a connection later.
pool: &'a Pool,
}

impl<'a, V> Message<'a, V>
where
    V: DeserializeOwned,
{
    /// Builds a [`Message`] from a raw stream entry.
    ///
    /// The payload is read from the entry's `data` field and decoded from
    /// MessagePack; a missing or undecodable payload leaves `data` as `None`.
    fn new(id: StreamId, group: &'a str, event: Cow<'a, str>, pool: &'a Pool) -> Self {
        let raw: Option<Vec<u8>> = id.get(STREAM_DATA_KEY);
        let data = raw.and_then(|bytes| rmp_serde::from_read_ref(&bytes).ok());

        Message {
            group,
            event,
            id: id.id,
            pool,
            data,
        }
    }
}

impl<'a, V> Message<'a, V> {
    /// Acknowledge receipt of the message. This should always be called, since un-acked messages
    /// will be reclaimed by other clients.
    pub async fn ack(&self) -> Result<()> {
        let mut conn = self.pool.get().await?;
        conn.xack(&*self.event, self.group, &[&self.id]).await?;

        Ok(())
    }

    /// Reply to this message.
    pub async fn reply(&self, data: &impl Serialize) -> Result<()> {
        let payload = rmp_serde::to_vec(data)?;
        // Replies travel over a pub/sub channel named `{event}:{id}`.
        let channel = format!("{}:{}", self.event, self.id);
        self.pool.get().await?.publish(channel, payload).await?;

        Ok(())
    }
}

/// A message broker backed by Redis streams with consumer groups.
///
/// Messages are published with XADD, consumed via XREADGROUP, and stale
/// pending entries are reclaimed from dead consumers with XAUTOCLAIM.
pub struct RedisBroker<'a> {
    /// The consumer name of this broker. Should be unique to the container/machine consuming
    /// messages.
    pub name: Cow<'a, str>,
    /// The consumer group name.
    pub group: Cow<'a, str>,
    /// The largest chunk to consume from Redis. This is only exposed for tuning purposes and
    /// doesn't affect the public API at all.
    pub max_chunk: usize,
    /// The maximum time that a broker is assumed to be alive (ms). Messages pending after this
    /// time period will be reclaimed by other clients.
    pub max_operation_time: usize,
    // Connection pool shared with the `Message`s this broker produces.
    pool: Pool,
    // Pre-built XREAD options (group/consumer/count/block); built once in `new`.
    read_opts: StreamReadOptions,
}

impl<'a> RedisBroker<'a> {
/// Creates a new broker with sensible defaults.
///
/// The consumer name is a random nanoid, so several brokers in the same
/// group (one per container/machine) will not collide. The XREAD options
/// (group, consumer, COUNT, BLOCK) are built once here and reused by
/// [`Self::consume`].
pub fn new(group: impl Into<Cow<'a, str>>, pool: Pool) -> RedisBroker<'a> {
let group = group.into();
let name = nanoid!();
let read_opts = StreamReadOptions::default()
.group(&*group, &name)
.count(DEFAULT_MAX_CHUNK)
.block(DEFAULT_BLOCK_INTERVAL);

Self {
name: Cow::Owned(name),
group,
max_chunk: DEFAULT_MAX_CHUNK,
max_operation_time: DEFAULT_BLOCK_INTERVAL,
pool,
read_opts,
}
}

/// Publishes an event to the broker. Returned value is the ID of the message.
///
/// The payload is MessagePack-encoded and stored under the `data` field of
/// a new entry XADDed to the stream named after `event` (ID `*` lets Redis
/// assign the entry ID).
pub async fn publish(&self, event: &str, data: &impl Serialize) -> Result<String> {
let serialized = rmp_serde::to_vec(data)?;
Ok(self
.get_conn()
.await?
.xadd(event, "*", &[(STREAM_DATA_KEY, serialized)])
.await?)
}

/// Publishes an event and awaits a single RPC-style reply.
///
/// Subscribes to the pub/sub channel `{event}:{id}` (the channel
/// [`Message::reply`] publishes to), then resolves with the first message
/// decoded from MessagePack. Returns `Ok(None)` if the subscription stream
/// ends before any reply arrives.
/// NOTE(review): there is no timeout here — if no consumer ever replies,
/// this future waits as long as the subscription stays open. Confirm
/// callers wrap this in their own timeout.
pub async fn call<V>(&self, event: &str, data: &impl Serialize) -> Result<Option<V>>
where
V: DeserializeOwned,
{
let id = self.publish(event, data).await?;
let name = format!("{}:{}", event, id);

// `Connection::take` detaches the underlying connection from the pool so
// it can be converted into a dedicated pub/sub connection.
let mut conn = Connection::take(self.get_conn().await?).into_pubsub();
conn.subscribe(&name).await?;

let mut stream = conn.on_message();
Ok(stream
.next()
.await
.map(|msg| rmp_serde::from_read_ref(msg.get_payload_bytes()))
.transpose()?)
}

/// Registers this broker's consumer group on each event stream.
///
/// Uses XGROUP CREATE … MKSTREAM starting from ID 0 so the stream is
/// created if missing. The result is deliberately discarded: the command
/// errors when the group already exists (BUSYGROUP), which is harmless on
/// re-subscription.
pub async fn subscribe(&self, events: &[&str]) -> Result<()> {
for event in events {
let _: Result<Value, RedisError> = self
.get_conn()
.await?
.xgroup_create_mkstream(*event, &*self.group, 0)
.await;
}

Ok(())
}

/// Checks a connection out of the pool, mapping pool errors into [`Error`].
async fn get_conn(&self) -> Result<Connection> {
Ok(self.pool.get().await?)
}

/// Consume events from the broker.
///
/// Returns a merged stream of two sources per subscription:
/// 1. an XAUTOCLAIM loop per event that reclaims entries left pending
///    longer than `max_operation_time` by dead consumers, and
/// 2. a blocking XREADGROUP loop (`>` ID = only never-delivered entries)
///    across all `events` using the options prepared in [`Self::new`].
/// Each yielded [`Message`] must be [`Message::ack`]ed by the caller.
pub fn consume<'consume, V>(
&'consume self,
events: &'consume [&str],
) -> impl TryStream<Ok = Message<'consume, V>, Error = Error>
where
V: DeserializeOwned,
{
// `>` asks XREADGROUP for entries never delivered to any consumer.
let ids = vec![">"; events.len()];

let pool = &self.pool;
let group = &self.group;
let name = &self.name;
let time = self.max_operation_time;

// One endlessly-repeated XAUTOCLAIM future per event.
// NOTE(review): unlike XREAD below, XAUTOCLAIM has no BLOCK option, so
// this repeat loop re-issues the command immediately on every poll —
// effectively a busy loop against Redis between deliveries. Consider a
// sleep between iterations.
let autoclaim_futs = events
.iter()
.copied()
.map(|event| {
move || async move {
let messages = async move {
let mut conn = pool.get().await?;
// XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start>
// (requires Redis >= 6.2); typed command built manually because
// the redis crate has no high-level wrapper for it here.
let mut cmd = redis::cmd("xautoclaim");

cmd.arg(event)
.arg(&**group)
.arg(&**name)
.arg(time)
.arg("0-0");

// Reply shape: [next-cursor, claimed-entries, ...]; element 1 holds
// the claimed entries.
// NOTE(review): `res[1]` panics if the server returns fewer than two
// elements — prefer `res.get(1)` with an error.
let res: Vec<Value> = cmd.query_async(&mut conn).await?;
let read = StreamRangeReply::from_redis_value(&res[1])?;

let messages = read.ids.into_iter().map(move |id| {
Ok::<_, Error>(Message::<V>::new(
id,
&group,
Cow::Borrowed(event),
pool,
))
});

Ok::<_, Error>(iter(messages))
};

// `Some(..)` keeps `repeat_fn` producing forever.
Some(messages.await)
}
})
.map(repeat_fn)
.map(|iter| iter.try_flatten());

// Endlessly-repeated blocking XREADGROUP over all events at once.
let claim_fut = move || {
let opts = &self.read_opts;
let ids = ids.clone();

async move {
let messages =
async move {
let read: Option<StreamReadReply> =
pool.get().await?.xread_options(&events, &ids, opts).await?;

// `None` (BLOCK timeout) flattens to an empty batch and the loop
// simply reissues the read.
let messages = read.map(|reply| reply.keys).into_iter().flatten().flat_map(
move |event| {
let key = Cow::from(event.key);
event.ids.into_iter().map(move |id| {
Ok(Message::<V>::new(id, group, key.clone(), pool))
})
},
);

Ok::<_, Error>(iter(messages))
};

Some(messages.await)
}
};

// Merge all reclaim streams with the fresh-delivery stream.
let autoclaim = select_all(autoclaim_futs);
let claim = repeat_fn(claim_fut).try_flatten();

stream_select!(autoclaim, claim)
}
}

#[cfg(test)]
mod test {
    use deadpool_redis::{Manager, Pool};
    use futures::TryStreamExt;

    use super::RedisBroker;

    /// Round-trip smoke test: publish one message, consume it, ack it.
    /// Requires a Redis server reachable at localhost:6379.
    #[tokio::test]
    async fn consumes_messages() {
        let manager = Manager::new("redis://localhost:6379").expect("create manager");
        let broker = RedisBroker::new("foo", Pool::new(manager, 32));

        let events = ["abc"];
        broker.subscribe(&events).await.expect("subscribed");

        let payload = [1u8, 2, 3];
        broker.publish("abc", &payload).await.expect("published");

        let mut consumer = broker.consume::<Vec<u8>>(&events);
        let msg = consumer
            .try_next()
            .await
            .expect("message")
            .expect("message");
        msg.ack().await.expect("ack");

        assert_eq!(msg.data.expect("data"), vec![1, 2, 3]);
    }
}
1 change: 1 addition & 0 deletions brokers/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod stream;
Loading

0 comments on commit ff0b9b8

Please sign in to comment.