From 93196d8d1eaf5c9fa11618b118f1dc14865c50f5 Mon Sep 17 00:00:00 2001 From: Will Nelson Date: Sat, 21 Aug 2021 15:28:36 -0700 Subject: [PATCH] Add Redis broker --- Cargo.lock | 149 ++++++++++++++++++- brokers/Cargo.toml | 27 +++- brokers/src/error.rs | 21 ++- brokers/src/lib.rs | 4 + brokers/src/redis.rs | 292 +++++++++++++++++++++++++++++++++++++ brokers/src/util/mod.rs | 1 + brokers/src/util/stream.rs | 20 +++ 7 files changed, 507 insertions(+), 7 deletions(-) create mode 100644 brokers/src/redis.rs create mode 100644 brokers/src/util/mod.rs create mode 100644 brokers/src/util/stream.rs diff --git a/Cargo.lock b/Cargo.lock index 746b632..098dfc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,7 +21,7 @@ dependencies = [ "amq-protocol-types", "amq-protocol-uri", "cookie-factory", - "nom", + "nom 6.1.2", ] [[package]] @@ -42,7 +42,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d198cde4bde0eadf1a946e4f0d113ff471bf8abe4ef728181d7320461da8a3e4" dependencies = [ "cookie-factory", - "nom", + "nom 6.1.2", "serde", "serde_json", ] @@ -121,6 +121,17 @@ version = "4.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" +[[package]] +name = "async-trait" +version = "0.1.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-tungstenite" version = "0.6.0" @@ -280,6 +291,19 @@ dependencies = [ "bitflags", ] +[[package]] +name = "combine" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2d47c1b11006b87e492b53b313bb699ce60e16613c4dddaa91f8f7c220ab2fa" +dependencies = [ + "bytes 1.0.1", + "futures-util", + "memchr", + "pin-project-lite 0.2.7", + "tokio 1.9.0", +] + [[package]] name = "concurrent-queue" version = "1.2.2" @@ -289,6 +313,17 @@ dependencies = [ "cache-padded", ] +[[package]] +name = "config" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1b9d958c2b1368a663f05538fc1b5975adce1e19f435acceae987aceeeb369" +dependencies = [ + "lazy_static", + "nom 5.1.2", + "serde", +] + [[package]] name = "cookie-factory" version = "0.3.2" @@ -388,6 +423,33 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "deadpool" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef82259c587bceda08349f28ff00f69ae4c897898f254140af6021eb218e8232" +dependencies = [ + "async-trait", + "config", + "num_cpus", + "serde", + "tokio 1.9.0", +] + +[[package]] +name = "deadpool-redis" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58f31ab753d882c470926de2b34f42ed03e6e0661b609586825391f6560113db" +dependencies = [ + "async-trait", + "config", + "deadpool", + "log", + "redis", + "serde", +] + [[package]] name = "digest" version = "0.9.0" @@ -403,6 +465,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "dtoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" + [[package]] name = "encoding_rs" version = "0.8.28" @@ -637,7 +705,7 @@ dependencies = [ "indexmap", "slab", "tokio 0.2.25", - "tokio-util", + "tokio-util 0.3.1", "tracing", "tracing-futures", ] @@ -1041,6 +1109,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "nom" +version = "5.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af" +dependencies = [ + "lexical-core", + "memchr", + "version_check", +] + [[package]] name = "nom" version = "6.1.2" @@ -1522,6 +1601,25 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "redis" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bbc1838d8d0b423f325d6fac80c5f19109c7d16c8c37c584893dc17cf71c63d" +dependencies = [ + "async-trait", + "bytes 1.0.1", + "combine", + "dtoa", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite 0.2.7", + "tokio 1.9.0", + "tokio-util 0.6.7", + "url", +] + [[package]] name = "redox_syscall" version = "0.1.57" @@ -1599,14 +1697,40 @@ dependencies = [ "winreg", ] +[[package]] +name = "rmp" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f55e5fa1446c4d5dd1f5daeed2a4fe193071771a2636274d0d7a3b082aa7ad6" +dependencies = [ + "byteorder", + "num-traits", +] + +[[package]] +name = "rmp-serde" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "723ecff9ad04f4ad92fe1c8ca6c20d2196d9286e9c60727c4cb5511629260e9d" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "rustacles-brokers" version = "0.2.0" dependencies = [ + "deadpool-redis", "env_logger", + "futures 0.3.16", "lapin", "log", "nanoid", + "redis", + "rmp-serde", + "serde", "thiserror", "tokio 1.9.0", "tokio-stream", @@ -1920,9 +2044,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b7b349f11a7047e6d1276853e612d152f5e8a352c61917887cc2169e2366b4c" dependencies = [ "autocfg 1.0.1", + "bytes 1.0.1", + "libc", + "memchr", + "mio 0.7.13", "num_cpus", "pin-project-lite 0.2.7", "tokio-macros 1.3.0", + "winapi 0.3.9", ] [[package]] @@ -2041,6 +2170,20 @@ dependencies = [ "tokio 0.2.25", ] +[[package]] +name = "tokio-util" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" +dependencies = [ + "bytes 1.0.1", + "futures-core", + "futures-sink", + "log", + "pin-project-lite 0.2.7", + "tokio 1.9.0", +] + [[package]] name = "tower-service" version = "0.3.1" diff --git a/brokers/Cargo.toml b/brokers/Cargo.toml index 7b3b7e2..7e8ada3 100644 --- a/brokers/Cargo.toml +++ b/brokers/Cargo.toml @@ -11,10 +11,15 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lapin = "1.1" +deadpool-redis = { version = "0.9", optional = true } +lapin = { version = "1.1", optional = true } env_logger = "0.7" +futures = "0.3" log = "0.4" nanoid = "0.3" +redis = { version = "0.21", optional = true, default-features = false, features = ["streams"] } +rmp-serde = "0.15" +serde = "1.0" thiserror = "1.0" tokio-stream = "0.1" @@ -25,3 +30,23 @@ features = ["rt-multi-thread"] [dev-dependencies.tokio] version = "1.0" features = ["rt-multi-thread", "macros"] + +[features] +amqp-broker = ["lapin"] +redis-broker = ["deadpool-redis", "redis"] + +[[example]] +name = "amqp_consumer" +required-features = ["amqp-broker"] + +[[example]] +name = "amqp_producer" +required-features = ["amqp-broker"] + +[[example]] +name = "amqp_rpc_consumer" +required-features = ["amqp-broker"] + +[[example]] +name = "amqp_rpc_producer" +required-features = ["amqp-broker"] diff --git a/brokers/src/error.rs b/brokers/src/error.rs index 3fb7f91..4319d39 100644 --- a/brokers/src/error.rs +++ b/brokers/src/error.rs @@ -1,13 +1,16 @@ +#[cfg(feature = "redis-broker")] +use deadpool_redis::{redis::RedisError, PoolError}; +#[cfg(feature = "amqp-broker")] use lapin::Error as LapinError; - use std::{io::Error as IoError, result::Result as StdResult}; use thiserror::Error; -use tokio::sync::oneshot::error::RecvError; +use tokio::{sync::oneshot::error::RecvError, task::JoinError}; -pub type Result = StdResult; +pub type Result = StdResult; #[derive(Error, Debug)] pub enum Error { + #[cfg(feature = "amqp-broker")] #[error("Lapin error")] Lapin(#[from] LapinError), #[error("IO error")] @@ -16,4 +19,16 @@ pub enum Error { Recv(#[from] RecvError), #[error("Reply error")] Reply(String), + #[cfg(feature = "redis-broker")] + #[error("Redis error")] + Redis(#[from] RedisError), + #[cfg(feature = "redis-broker")] + #[error("Pool error")] + Deadpool(#[from] PoolError), + #[error("Join error")] + Join(#[from] JoinError), + #[error("MessagePack encode error")] + MsgpackEncode(#[from] rmp_serde::encode::Error), + #[error("MessagePack decode error")] + MsgpackDecode(#[from] rmp_serde::decode::Error), } diff --git a/brokers/src/lib.rs b/brokers/src/lib.rs index a65f66d..0049013 100644 --- a/brokers/src/lib.rs +++ b/brokers/src/lib.rs @@ -1,2 +1,6 @@ +#[cfg(feature = "amqp-broker")] pub mod amqp; pub mod error; +#[cfg(feature = "redis-broker")] +pub mod redis; +mod util; diff --git a/brokers/src/redis.rs b/brokers/src/redis.rs new file mode 100644 index 0000000..0515f58 --- /dev/null +++ b/brokers/src/redis.rs @@ -0,0 +1,292 @@ +use std::{borrow::Cow, ops::DerefMut}; + +use deadpool_redis::{ + redis::{ + streams::{StreamId, StreamRangeReply, StreamReadOptions, StreamReadReply}, + AsyncCommands, FromRedisValue, RedisError, Value, + }, + Connection, Pool, +}; +use futures::{ + future::ready, + stream::{iter, select_all}, + StreamExt, TryStream, TryStreamExt, +}; +use nanoid::nanoid; +use serde::{de::DeserializeOwned, Serialize}; + +use crate::{ + error::{Error, Result}, + util::stream::repeat_fn, +}; + +const DEFAULT_MAX_CHUNK: usize = 10; +const DEFAULT_BLOCK_INTERVAL: usize = 5000; +const STREAM_DATA_KEY: &'static str = "data"; + +/// A message received from the broker. +#[derive(Clone)] +pub struct Message<'a, V> { + /// The group this message belongs to. + pub group: &'a str, + /// The event this message signals. + pub event: Cow<'a, str>, + /// The ID of this message (generated by Redis). + pub id: String, + /// The data of this message. Always present unless there is a bug with a client implementation. + pub data: Option, + pool: &'a Pool, +} + +impl<'a, V> Message<'a, V> +where + V: DeserializeOwned, +{ + fn new(id: StreamId, group: &'a str, event: Cow<'a, str>, pool: &'a Pool) -> Self { + let data = id + .get(STREAM_DATA_KEY) + .and_then(|data: Vec| rmp_serde::from_read_ref(&data).ok()); + + Message { + group, + event, + id: id.id, + pool, + data, + } + } +} + +impl<'a, V> Message<'a, V> { + /// Acknowledge receipt of the message. This should always be called, since un-acked messages + /// will be reclaimed by other clients. + pub async fn ack(&self) -> Result<()> { + let _: Value = self + .pool + .get() + .await? + .xack(&*self.event, self.group, &[&self.id]) + .await?; + + Ok(()) + } + + /// Reply to this message. + pub async fn reply(&self, data: &impl Serialize) -> Result<()> { + let key = format!("{}:{}", self.event, self.id); + let serialized = rmp_serde::to_vec(data)?; + let _: Value = self.pool.get().await?.publish(key, serialized).await?; + + Ok(()) + } +} + +/// The result of consuming events. +#[derive(Debug, Clone)] +pub struct ConsumeResult { + /// A stream that, when polled, claims messages from other clients that have failed to ack + /// within the configured time period. + pub autoclaim: T, + /// A stream that, when polled, consumes messages from the group for this client. + pub claim: U, +} + +// #[derive(Debug)] +pub struct RedisBroker { + /// The consumer name of this broker. Should be unique to the container/machine consuming + /// messages. + pub name: String, + /// The consumer group name. + pub group: String, + /// The largest chunk to consume from Redis. This is only exposed for tuning purposes and + /// doesn't affect the public API at all. + pub max_chunk: usize, + /// The maximum time that a broker is assumed to be alive (ms). Messages pending after this + /// time period will be reclaimed by other clients. + pub max_operation_time: usize, + pool: Pool, + read_opts: StreamReadOptions, +} + +impl RedisBroker { + /// Creates a new broker with sensible defaults. + pub fn new(group: String, pool: Pool) -> RedisBroker { + let name = nanoid!(); + let read_opts = StreamReadOptions::default() + .group(&group, &name) + .count(DEFAULT_MAX_CHUNK) + .block(DEFAULT_BLOCK_INTERVAL); + + Self { + name, + group, + max_chunk: DEFAULT_MAX_CHUNK, + max_operation_time: DEFAULT_BLOCK_INTERVAL, + pool, + read_opts, + } + } + + /// Publishes an event to the broker. Returned value is the ID of the message. + pub async fn publish(&self, event: &str, data: &impl Serialize) -> Result { + let serialized = rmp_serde::to_vec(data)?; + Ok(self + .get() + .await? + .xadd(event, "*", &[(STREAM_DATA_KEY, serialized)]) + .await?) + } + + pub async fn call(&self, event: &str, data: &impl Serialize) -> Result> + where + V: DeserializeOwned, + { + let id = self.publish(event, data).await?; + let name = format!("{}:{}", event, id); + + let mut conn = Connection::take(self.get().await?).into_pubsub(); + conn.subscribe(&name).await?; + + let mut stream = conn.on_message(); + Ok(stream + .next() + .await + .map(|msg| msg.get_payload::>()) + .transpose()? + .map(|payload| rmp_serde::from_read_ref(&payload)) + .transpose()?) + } + + pub async fn subscribe(&self, events: &[&str]) -> Result<()> { + for event in events { + let _: Result = self + .get() + .await? + .xgroup_create_mkstream(*event, &self.group, 0) + .await; + } + + Ok(()) + } + + async fn get(&self) -> Result { + Ok(self.pool.get().await?) + } + + /// Consume events from the broker. + pub fn consume<'a, V>( + &'a self, + events: &'a [&str], + ) -> ConsumeResult< + impl TryStream, Error = Error>, + impl TryStream, Error = Error>, + > + where + V: DeserializeOwned, + { + let ids = vec![">"; events.len()]; + + let pool = &self.pool; + let group = &self.group; + let name = &self.name; + let time = self.max_operation_time; + + let autoclaim_futs = events + .iter() + .map(|event| { + move || async move { + let messages = async move { + let mut conn = pool.get().await?; + let mut cmd = redis::cmd("xautoclaim"); + + cmd.arg(event).arg(group).arg(name).arg(time).arg("0-0"); + + let res: Vec = cmd.query_async(conn.deref_mut()).await?; + let read = StreamRangeReply::from_redis_value(&res[1])?; + + let messages = read.ids.into_iter().map(move |id| { + Ok::<_, Error>(Message::::new( + id, + &group, + Cow::Borrowed(event), + pool, + )) + }); + + Ok::<_, Error>(iter(messages)) + }; + + Some(messages.await) + } + }) + .map(repeat_fn) + .map(|iter| iter.try_flatten()); + + let claim_fut = move || { + let opts = &self.read_opts; + let ids = ids.clone(); + + async move { + let messages = + async move { + let read: Option = + pool.get().await?.xread_options(&events, &ids, opts).await?; + + dbg!(&read); + let messages = read.map(|reply| reply.keys).into_iter().flatten().flat_map( + move |event| { + let key = Cow::from(event.key); + event.ids.into_iter().map(move |id| { + Ok(Message::::new(id, group, key.clone(), pool)) + }) + }, + ); + + Ok::<_, Error>(iter(messages)) + }; + + Some(messages.await) + } + }; + + let autoclaim = select_all(autoclaim_futs); + let claim = repeat_fn(claim_fut).try_flatten(); + + ConsumeResult { autoclaim, claim } + } +} + +#[cfg(test)] +mod test { + use deadpool_redis::{Manager, Pool}; + use futures::TryStreamExt; + + use super::RedisBroker; + + #[tokio::test] + async fn consumes_messages() { + let group = "foo".to_string(); + let manager = Manager::new("redis://localhost:6379").expect("create manager"); + let pool = Pool::new(manager, 32); + let broker = RedisBroker::new(group, pool); + + let events = ["abc"]; + + broker.subscribe(&events).await.expect("subscribed"); + broker + .publish("abc", &[1u8, 2, 3]) + .await + .expect("published"); + + let mut consumer = broker.consume::>(&events); + let msg = consumer + .claim + .try_next() + .await + .expect("message") + .expect("message"); + msg.ack().await.expect("ack"); + + assert_eq!(msg.data.expect("data"), vec![1, 2, 3]); + } +} diff --git a/brokers/src/util/mod.rs b/brokers/src/util/mod.rs new file mode 100644 index 0000000..baf29e0 --- /dev/null +++ b/brokers/src/util/mod.rs @@ -0,0 +1 @@ +pub mod stream; diff --git a/brokers/src/util/stream.rs b/brokers/src/util/stream.rs new file mode 100644 index 0000000..51f83d1 --- /dev/null +++ b/brokers/src/util/stream.rs @@ -0,0 +1,20 @@ +use futures::Stream; +use std::{future::Future, task::Poll}; + +pub fn repeat_fn(mut func: F) -> impl Stream +where + R: Future>, + F: FnMut() -> R, +{ + let mut fut = Box::pin(func()); + + futures::stream::poll_fn(move |ctx| { + let out = fut.as_mut().poll(ctx); + + if let Poll::Ready(_) = out { + fut.set(func()); + } + + out + }) +}