Skip to content

Commit

Permalink
Avoid spamming xautoclaim
Browse files Browse the repository at this point in the history
  • Loading branch information
Will Nelson committed Jun 18, 2022
1 parent 56bc44f commit edeae41
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 47 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 71 additions & 42 deletions brokers/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
use std::{
borrow::Cow,
time::{SystemTime, UNIX_EPOCH}, fmt::Debug,
fmt::Debug,
sync::{Arc, RwLock},
time::{Duration, SystemTime, UNIX_EPOCH},
};

use bytes::Bytes;
use futures::{
stream::{iter, select, select_all},
TryStream, TryStreamExt,
Future, TryStream, TryStreamExt,
};
use nanoid::nanoid;
pub use redust;
use redust::{
model::stream::{
claim::AutoclaimResponse,
read::{Field, ReadResponse},
read::{Entries, Field, ReadResponse},
Id,
},
pool::Pool,
resp::from_data,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::net::ToSocketAddrs;
use tokio::{net::ToSocketAddrs, time::sleep};

use crate::{
error::{Error, Result},
Expand All @@ -34,6 +36,8 @@ pub mod rpc;

const DEFAULT_MAX_CHUNK: &[u8] = b"10";
const DEFAULT_BLOCK_INTERVAL: &[u8] = b"5000";
const DEFAULT_BLOCK_DURATION: Duration = Duration::from_secs(5);
const DEFAULT_MIN_IDLE_TIME: &[u8] = b"10000";
const STREAM_DATA_KEY: Field<'static> = Field(Cow::Borrowed(b"data"));
const STREAM_TIMEOUT_KEY: Field<'static> = Field(Cow::Borrowed(b"timeout_at"));

Expand All @@ -49,6 +53,7 @@ where
/// The consumer group name.
pub group: Bytes,
pool: Pool<A>,
last_autoclaim: Arc<RwLock<Id>>,
}

impl<A> RedisBroker<A>
Expand All @@ -64,6 +69,7 @@ where
name: name.into(),
group,
pool,
last_autoclaim: Arc::default(),
}
}

Expand All @@ -74,7 +80,7 @@ where

let data = conn
.cmd([
b"xadd",
b"XADD",
event.as_ref(),
b"*",
&STREAM_DATA_KEY.0,
Expand Down Expand Up @@ -123,7 +129,7 @@ where

let data = conn
.cmd([
b"xadd",
b"XADD",
event.as_ref(),
b"*",
&STREAM_DATA_KEY.0,
Expand All @@ -141,12 +147,12 @@ where

for event in events {
let cmd: &[&[u8]] = &[
b"xgroup",
b"create",
b"XGROUP",
b"CREATE",
&*event,
&*self.group,
b"$",
b"mkstream",
b"MKSTREAM",
];

match conn.cmd(cmd).await {
Expand Down Expand Up @@ -216,15 +222,15 @@ where
async fn xreadgroup(&self, events: &[Bytes]) -> Result<ReadResponse<'static>, Error> {
let ids = vec![&b">"[..]; events.len()];
let mut cmd: Vec<&[u8]> = vec![
b"xreadgroup",
b"group",
b"XREADGROUP",
b"GROUP",
&*self.group,
&*self.name,
b"count",
b"COUNT",
DEFAULT_MAX_CHUNK,
b"block",
b"BLOCK",
DEFAULT_BLOCK_INTERVAL,
b"streams",
b"STREAMS",
];
cmd.extend(events.iter().map(|b| &b[..]));
cmd.extend_from_slice(&ids);
Expand All @@ -233,20 +239,29 @@ where
Ok(from_data(data)?)
}

async fn xautoclaim(&self, event: &[u8]) -> Result<AutoclaimResponse<'static>, Error> {
async fn xautoclaim(&self, event: &[u8]) -> Result<Entries<'static>, Error> {
let id = self
.last_autoclaim
.read()
.unwrap()
.to_string();

let cmd = [
b"xautoclaim",
b"XAUTOCLAIM",
event,
&*self.group,
&*self.name,
DEFAULT_BLOCK_INTERVAL,
b"0-0",
DEFAULT_MIN_IDLE_TIME,
id.as_bytes(),
b"COUNT",
DEFAULT_MAX_CHUNK,
];

let mut conn = self.pool.get().await?;

let res = conn.cmd(cmd).await?;
Ok(from_data(res)?)
let res = from_data::<AutoclaimResponse>(conn.cmd(cmd).await?)?;
*self.last_autoclaim.write().unwrap() = res.0;
Ok(res.1)
}

fn autoclaim_all<V>(
Expand All @@ -260,33 +275,47 @@ where
.into_iter()
.map(|event| {
let this = self.clone();
move || {
let this = this.clone();
let event = event.clone();

let messages = async move {
let read = this.xautoclaim(&event).await?;

let messages = read.1 .0.into_iter().map(move |(id, data)| {
Ok::<_, Error>(Message::<A, V>::new(
id,
data,
event.clone(),
this.clone(),
))
});

Ok::<_, Error>(iter(messages))
};

async move { Some(messages.await) }
}
move || this.autoclaim_event(event.clone())
})
.map(repeat_fn)
.map(|iter| iter.try_flatten());
.map(TryStreamExt::try_flatten);

select_all(futs)
}

/// Autoclaim an event and return a stream of messages found during the autoclaim. The returned
/// future output is always [`Some`], intended to improve ergonomics when used with
/// [`repeat_fn`].
///
/// Delays every invocation of `xautoclaim` by [`DEFAULT_BLOCK_DURATION`], since `xautoclaim`
/// does not support blocking.
fn autoclaim_event<V>(
&self,
event: Bytes,
) -> impl Future<Output = Option<Result<impl TryStream<Ok = Message<A, V>, Error = Error>>>>
where
V: DeserializeOwned,
{
let this = self.clone();
let event = event.clone();

let messages = async move {
sleep(DEFAULT_BLOCK_DURATION).await;

let messages = this
.xautoclaim(&event)
.await?
.0
.into_iter()
.map(move |(id, data)| {
Ok::<_, Error>(Message::<A, V>::new(id, data, event.clone(), this.clone()))
});

Ok::<_, Error>(iter(messages))
};

async move { Some(messages.await) }
}
}

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion brokers/src/redis/message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{
fmt::Debug,
io::Write,
str::from_utf8,
time::{Duration, SystemTime, UNIX_EPOCH}, fmt::Debug,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use bytes::Bytes;
Expand Down

0 comments on commit edeae41

Please sign in to comment.