Skip to content

Commit

Permalink
Add Redis broker
Browse files Browse the repository at this point in the history
  • Loading branch information
Will Nelson committed Aug 21, 2021
1 parent 556911f commit 93196d8
Show file tree
Hide file tree
Showing 7 changed files with 507 additions and 7 deletions.
149 changes: 146 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion brokers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
lapin = "1.1"
deadpool-redis = { version = "0.9", optional = true }
lapin = { version = "1.1", optional = true }
env_logger = "0.7"
futures = "0.3"
log = "0.4"
nanoid = "0.3"
redis = { version = "0.21", optional = true, default-features = false, features = ["streams"] }
rmp-serde = "0.15"
serde = "1.0"
thiserror = "1.0"
tokio-stream = "0.1"

Expand All @@ -25,3 +30,23 @@ features = ["rt-multi-thread"]
[dev-dependencies.tokio]
version = "1.0"
features = ["rt-multi-thread", "macros"]

[features]
amqp-broker = ["lapin"]
redis-broker = ["deadpool-redis", "redis"]

[[example]]
name = "amqp_consumer"
required-features = ["amqp-broker"]

[[example]]
name = "amqp_producer"
required-features = ["amqp-broker"]

[[example]]
name = "amqp_rpc_consumer"
required-features = ["amqp-broker"]

[[example]]
name = "amqp_rpc_producer"
required-features = ["amqp-broker"]
21 changes: 18 additions & 3 deletions brokers/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#[cfg(feature = "redis-broker")]
use deadpool_redis::{redis::RedisError, PoolError};
#[cfg(feature = "amqp-broker")]
use lapin::Error as LapinError;

use std::{io::Error as IoError, result::Result as StdResult};
use thiserror::Error;
use tokio::sync::oneshot::error::RecvError;
use tokio::{sync::oneshot::error::RecvError, task::JoinError};

pub type Result<T> = StdResult<T, Error>;
pub type Result<T, E = Error> = StdResult<T, E>;

#[derive(Error, Debug)]
pub enum Error {
#[cfg(feature = "amqp-broker")]
#[error("Lapin error")]
Lapin(#[from] LapinError),
#[error("IO error")]
Expand All @@ -16,4 +19,16 @@ pub enum Error {
Recv(#[from] RecvError),
#[error("Reply error")]
Reply(String),
#[cfg(feature = "redis-broker")]
#[error("Redis error")]
Redis(#[from] RedisError),
#[cfg(feature = "redis-broker")]
#[error("Pool error")]
Deadpool(#[from] PoolError),
#[error("Join error")]
Join(#[from] JoinError),
#[error("MessagePack encode error")]
MsgpackEncode(#[from] rmp_serde::encode::Error),
#[error("MessagePack decode error")]
MsgpackDecode(#[from] rmp_serde::decode::Error),
}
4 changes: 4 additions & 0 deletions brokers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
#[cfg(feature = "amqp-broker")]
pub mod amqp;
pub mod error;
#[cfg(feature = "redis-broker")]
pub mod redis;
mod util;
Loading

0 comments on commit 93196d8

Please sign in to comment.