Skip to content

Commit

Permalink
Merge Redis streams into single stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Will Nelson committed Dec 30, 2021
1 parent 2914704 commit dce83f1
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 37 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions brokers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ redis = { version = "0.21", optional = true, default-features = false, features
rmp-serde = "0.15"
serde = "1.0"
thiserror = "1.0"
tokio-stream = "0.1"

[dependencies.tokio]
version = "1.0"
features = ["rt-multi-thread"]
optional = true

[dev-dependencies.tokio]
version = "1.0"
features = ["rt-multi-thread", "macros"]

[features]
amqp-broker = ["lapin"]
amqp-broker = ["lapin", "tokio"]
redis-broker = ["deadpool-redis", "redis"]

[[example]]
Expand Down
2 changes: 1 addition & 1 deletion brokers/src/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::{
sync::{Arc, Weak},
};

use futures::StreamExt;
use lapin::{
message::Delivery, options::*, types::FieldTable, BasicProperties, Channel, Connection,
ConnectionProperties, ExchangeKind,
};
use log::debug;
use nanoid::nanoid;
pub use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_stream::StreamExt;

use crate::error::*;

Expand Down
6 changes: 3 additions & 3 deletions brokers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
use deadpool_redis::{redis::RedisError, PoolError};
#[cfg(feature = "amqp-broker")]
use lapin::Error as LapinError;
#[cfg(feature = "amqp-broker")]
use tokio::sync::oneshot::error::RecvError;
use std::{io::Error as IoError, result::Result as StdResult};
use thiserror::Error;
use tokio::{sync::oneshot::error::RecvError, task::JoinError};

pub type Result<T, E = Error> = StdResult<T, E>;

Expand All @@ -15,6 +16,7 @@ pub enum Error {
Lapin(#[from] LapinError),
#[error("IO error")]
Io(#[from] IoError),
#[cfg(feature = "amqp-broker")]
#[error("Async receive error")]
Recv(#[from] RecvError),
#[error("Reply error")]
Expand All @@ -25,8 +27,6 @@ pub enum Error {
#[cfg(feature = "redis-broker")]
#[error("Pool error")]
Deadpool(#[from] PoolError),
#[error("Join error")]
Join(#[from] JoinError),
#[error("MessagePack encode error")]
MsgpackEncode(#[from] rmp_serde::encode::Error),
#[error("MessagePack decode error")]
Expand Down
23 changes: 4 additions & 19 deletions brokers/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use deadpool_redis::{
};
use futures::{
stream::{iter, select_all},
StreamExt, TryStream, TryStreamExt,
stream_select, StreamExt, TryStream, TryStreamExt,
};
use nanoid::nanoid;
use serde::{de::DeserializeOwned, Serialize};
Expand Down Expand Up @@ -61,8 +61,7 @@ impl<'a, V> Message<'a, V> {
/// Acknowledge receipt of the message. This should always be called, since un-acked messages
/// will be reclaimed by other clients.
pub async fn ack(&self) -> Result<()> {
self
.pool
self.pool
.get()
.await?
.xack(&*self.event, self.group, &[&self.id])
Expand All @@ -81,16 +80,6 @@ impl<'a, V> Message<'a, V> {
}
}

/// The result of consuming events.
#[derive(Debug, Clone)]
pub struct ConsumeResult<T, U> {
/// A stream that, when polled, claims messages from other clients that have failed to ack
/// within the configured time period.
pub autoclaim: T,
/// A stream that, when polled, consumes messages from the group for this client.
pub claim: U,
}

// #[derive(Debug)]
pub struct RedisBroker<'a> {
/// The consumer name of this broker. Should be unique to the container/machine consuming
Expand Down Expand Up @@ -176,10 +165,7 @@ impl<'a> RedisBroker<'a> {
pub fn consume<'consume, V>(
&'consume self,
events: &'consume [&str],
) -> ConsumeResult<
impl TryStream<Ok = Message<'consume, V>, Error = Error>,
impl TryStream<Ok = Message<'consume, V>, Error = Error>,
>
) -> impl TryStream<Ok = Message<'consume, V>, Error = Error>
where
V: DeserializeOwned,
{
Expand Down Expand Up @@ -255,7 +241,7 @@ impl<'a> RedisBroker<'a> {
let autoclaim = select_all(autoclaim_futs);
let claim = repeat_fn(claim_fut).try_flatten();

ConsumeResult { autoclaim, claim }
stream_select!(autoclaim, claim)
}
}

Expand Down Expand Up @@ -283,7 +269,6 @@ mod test {

let mut consumer = broker.consume::<Vec<u8>>(&events);
let msg = consumer
.claim
.try_next()
.await
.expect("message")
Expand Down

0 comments on commit dce83f1

Please sign in to comment.