Skip to content

Commit

Permalink
Spawn tasks for claim queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Will Nelson committed Jul 26, 2022
1 parent f6cce1f commit 6d652b7
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 13 deletions.
29 changes: 19 additions & 10 deletions brokers/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use redust::{
resp::from_data,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::{net::ToSocketAddrs, time::sleep};
use tokio::{net::ToSocketAddrs, spawn, time::sleep};
use tracing::{debug, instrument};

use crate::{
Expand All @@ -46,7 +46,7 @@ const STREAM_TIMEOUT_KEY: Field<'static> = Field(Cow::Borrowed(b"timeout_at"));
#[derive(Clone)]
pub struct RedisBroker<A>
where
A: ToSocketAddrs + Clone + Send + Sync + Debug,
A: ToSocketAddrs + Clone + Send + Sync + Debug + 'static,
{
/// The consumer name of this broker. Should be unique to the container/machine consuming
/// messages.
Expand Down Expand Up @@ -209,22 +209,25 @@ where
let this = this.clone();
let events = events.clone();

async move { Some(this.get_messages(&events).await) }
async move { Some(this.get_messages(events).await) }
};

repeat_fn(fut_fn).try_flatten()
}

async fn get_messages<V>(
&self,
events: &[Bytes],
events: Vec<Bytes>,
) -> Result<impl TryStream<Ok = Message<A, V>, Error = Error>>
where
V: DeserializeOwned,
{
let this = self.clone();
let read = self.xreadgroup(events).await?;
let read = spawn(async move { this.xreadgroup(&events).await })
.await
.unwrap()?;

let this = self.clone();
let messages = read.0.into_iter().flat_map(move |(event, entries)| {
let this = this.clone();
entries.0.into_iter().map(move |(id, entry)| {
Expand Down Expand Up @@ -322,7 +325,7 @@ where
let this = self.clone();
let event = event.clone();

let messages = async move {
let messages = spawn(async move {
sleep(DEFAULT_BLOCK_DURATION).await;

let messages = this
Expand All @@ -335,9 +338,9 @@ where
});

Ok::<_, Error>(iter(messages))
};
});

async move { Some(messages.await) }
async move { Some(messages.await.unwrap()) }
}
}

Expand All @@ -363,7 +366,10 @@ mod test {

let events = [Bytes::from("abc")];

broker.ensure_events(events.iter()).await.expect("subscribed");
broker
.ensure_events(events.iter())
.await
.expect("subscribed");
broker
.publish("abc", &[1u8, 2, 3])
.await
Expand Down Expand Up @@ -392,7 +398,10 @@ mod test {
let broker1 = RedisBroker::new(group, pool);
let broker2 = broker1.clone();

broker1.ensure_events(events.iter()).await.expect("subscribed");
broker1
.ensure_events(events.iter())
.await
.expect("subscribed");

let timeout = Some(SystemTime::now() + Duration::from_millis(500));

Expand Down
2 changes: 1 addition & 1 deletion brokers/src/redis/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::{RedisBroker, STREAM_DATA_KEY, STREAM_TIMEOUT_KEY};
#[derive(Debug, Clone)]
pub struct Message<A, V>
where
A: ToSocketAddrs + Clone + Send + Sync + Debug,
A: ToSocketAddrs + Clone + Send + Sync + Debug + 'static,
{
/// The group this message belongs to.
pub group: Bytes,
Expand Down
4 changes: 2 additions & 2 deletions brokers/src/redis/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use serde::de::DeserializeOwned;
use tokio::net::ToSocketAddrs;
use tracing::instrument;

use crate::{error::Result, common};
use crate::{common, error::Result};

use super::RedisBroker;

/// A Remote Procedure Call. Poll the future returned by `response` to get the response value.
#[derive(Debug, Clone)]
pub struct Rpc<A>
where
A: ToSocketAddrs + Clone + Send + Sync + Debug,
A: ToSocketAddrs + Clone + Send + Sync + Debug + 'static,
{
pub(crate) name: String,
pub(crate) broker: RedisBroker<A>,
Expand Down

0 comments on commit 6d652b7

Please sign in to comment.