diff --git a/brokers/src/redis.rs b/brokers/src/redis.rs index d1c3468..9edb23f 100644 --- a/brokers/src/redis.rs +++ b/brokers/src/redis.rs @@ -23,7 +23,7 @@ use redust::{ }; use serde::{de::DeserializeOwned, Serialize}; use tokio::{net::ToSocketAddrs, time::sleep}; -use tracing::{instrument, debug}; +use tracing::{debug, instrument}; use crate::{ error::{Error, Result}, @@ -76,7 +76,11 @@ where /// Publishes an event to the broker. Returned value is the ID of the message. #[instrument(ret, err)] - pub async fn publish(&self, event: impl AsRef<[u8]> + Debug, data: &(impl Serialize + Debug)) -> Result { + pub async fn publish( + &self, + event: impl AsRef<[u8]> + Debug, + data: &(impl Serialize + Debug), + ) -> Result { let serialized_data = rmp_serde::to_vec(data)?; let mut conn = self.pool.get().await?; @@ -248,11 +252,7 @@ where #[instrument(ret, err)] async fn xautoclaim(&self, event: &[u8]) -> Result, Error> { - let id = self - .last_autoclaim - .read() - .unwrap() - .to_string(); + let id = self.last_autoclaim.read().unwrap().to_string(); let cmd = [ b"XAUTOCLAIM", diff --git a/brokers/src/redis/message.rs b/brokers/src/redis/message.rs index 3f7ebb7..a5df180 100644 --- a/brokers/src/redis/message.rs +++ b/brokers/src/redis/message.rs @@ -95,8 +95,7 @@ where /// Reply to this message. pub async fn reply(&self, data: &impl Serialize) -> Result<()> { - let mut key = Vec::new(); - key.copy_from_slice(&self.event); + let mut key = self.event.to_vec(); write!(key, ":{}", self.id)?; let serialized = rmp_serde::to_vec(data)?; diff --git a/brokers/src/redis/rpc.rs b/brokers/src/redis/rpc.rs index a546feb..6389abc 100644 --- a/brokers/src/redis/rpc.rs +++ b/brokers/src/redis/rpc.rs @@ -1,8 +1,7 @@ use std::fmt::Debug; -use redust::resp::from_data; +use redust::{model::pubsub, resp::from_data}; use serde::de::DeserializeOwned; -use serde_bytes::Bytes; use tokio::net::ToSocketAddrs; use crate::error::Result; @@ -39,11 +38,15 @@ where V: DeserializeOwned, { let mut conn = self.broker.pool.get().await?; + conn.cmd(["SUBSCRIBE", &self.name]).await?; - conn.cmd(["subscribe", &self.name]).await?; - let data = conn.read_cmd().await?; + loop { + let response = from_data::(conn.read_cmd().await?)?; - let bytes = from_data::<&Bytes>(data)?; - Ok(rmp_serde::from_read_ref(bytes)?) + if let pubsub::Response::Message(msg) = response { + conn.cmd(["UNSUBSCRIBE", &self.name]).await?; + break Ok(rmp_serde::from_read_ref(&msg.data)?); + } + } } }