Skip to content

Commit

Permalink
Fix RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
Will Nelson committed Jun 18, 2022
1 parent 81e6b26 commit c4b3552
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
14 changes: 7 additions & 7 deletions brokers/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use redust::{
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::{net::ToSocketAddrs, time::sleep};
use tracing::{instrument, debug};
use tracing::{debug, instrument};

use crate::{
error::{Error, Result},
Expand Down Expand Up @@ -76,7 +76,11 @@ where

/// Publishes an event to the broker. Returned value is the ID of the message.
#[instrument(ret, err)]
pub async fn publish(&self, event: impl AsRef<[u8]> + Debug, data: &(impl Serialize + Debug)) -> Result<Id> {
pub async fn publish(
&self,
event: impl AsRef<[u8]> + Debug,
data: &(impl Serialize + Debug),
) -> Result<Id> {
let serialized_data = rmp_serde::to_vec(data)?;
let mut conn = self.pool.get().await?;

Expand Down Expand Up @@ -248,11 +252,7 @@ where

#[instrument(ret, err)]
async fn xautoclaim(&self, event: &[u8]) -> Result<Entries<'static>, Error> {
let id = self
.last_autoclaim
.read()
.unwrap()
.to_string();
let id = self.last_autoclaim.read().unwrap().to_string();

let cmd = [
b"XAUTOCLAIM",
Expand Down
3 changes: 1 addition & 2 deletions brokers/src/redis/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ where

/// Reply to this message.
pub async fn reply(&self, data: &impl Serialize) -> Result<()> {
let mut key = Vec::new();
key.copy_from_slice(&self.event);
let mut key = self.event.to_vec();
write!(key, ":{}", self.id)?;

let serialized = rmp_serde::to_vec(data)?;
Expand Down
15 changes: 9 additions & 6 deletions brokers/src/redis/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::fmt::Debug;

use redust::resp::from_data;
use redust::{model::pubsub, resp::from_data};
use serde::de::DeserializeOwned;
use serde_bytes::Bytes;
use tokio::net::ToSocketAddrs;

use crate::error::Result;
Expand Down Expand Up @@ -39,11 +38,15 @@ where
V: DeserializeOwned,
{
let mut conn = self.broker.pool.get().await?;
conn.cmd(["SUBSCRIBE", &self.name]).await?;

conn.cmd(["subscribe", &self.name]).await?;
let data = conn.read_cmd().await?;
loop {
let response = from_data::<pubsub::Response>(conn.read_cmd().await?)?;

let bytes = from_data::<&Bytes>(data)?;
Ok(rmp_serde::from_read_ref(bytes)?)
if let pubsub::Response::Message(msg) = response {
conn.cmd(["UNSUBSCRIBE", &self.name]).await?;
break Ok(rmp_serde::from_read_ref(&msg.data)?);
}
}
}
}

0 comments on commit c4b3552

Please sign in to comment.