diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 3849bd6cb..de6dab5f3 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -152,15 +152,8 @@ impl Client { /// # } /// ``` pub async fn publish(&self, subject: Subject, payload: Bytes) -> Result<(), PublishError> { - self.sender - .send(Command::Publish { - subject, - payload, - respond: None, - headers: None, - }) - .await?; - Ok(()) + self.publish_with_headers(subject, HeaderMap::new(), payload) + .await } /// Publish a [Message] with headers to a given subject. @@ -193,7 +186,7 @@ impl Client { subject, payload, respond: None, - headers: Some(headers), + headers, }) .await?; Ok(()) @@ -225,15 +218,8 @@ impl Client { reply: Subject, payload: Bytes, ) -> Result<(), PublishError> { - self.sender - .send(Command::Publish { - subject, - payload, - respond: Some(reply), - headers: None, - }) - .await?; - Ok(()) + self.publish_with_reply_and_headers(subject, reply, HeaderMap::new(), payload) + .await } /// Publish a [Message] to a given subject with headers and specified response subject @@ -271,7 +257,7 @@ impl Client { subject, payload, respond: Some(reply), - headers: Some(headers), + headers, }) .await?; Ok(()) @@ -344,17 +330,14 @@ impl Client { if let Some(inbox) = request.inbox { let timeout = request.timeout.unwrap_or(self.request_timeout); let mut subscriber = self.subscribe(inbox.clone().into()).await?; - let payload: Bytes = request.payload.unwrap_or_default(); - match request.headers { - Some(headers) => { - self.publish_with_reply_and_headers(subject, inbox.into(), headers, payload) - .await? - } - None => { - self.publish_with_reply(subject, inbox.into(), payload) - .await? - } - } + self.publish_with_reply_and_headers( + subject, + inbox.into(), + request.headers, + request.payload, + ) + .await?; + let request = match timeout { Some(timeout) => { tokio::time::timeout(timeout, subscriber.next()) @@ -381,9 +364,9 @@ impl Client { } else { let (sender, receiver) = oneshot::channel(); - let payload = request.payload.unwrap_or_default(); let respond = self.new_inbox().into(); let headers = request.headers; + let payload = request.payload; self.sender .send(Command::Request { @@ -551,8 +534,8 @@ impl Client { /// Used for building customized requests. #[derive(Default)] pub struct Request { - payload: Option, - headers: Option, + payload: Bytes, + headers: HeaderMap, timeout: Option>, inbox: Option, } @@ -575,7 +558,7 @@ impl Request { /// # } /// ``` pub fn payload(mut self, payload: Bytes) -> Request { - self.payload = Some(payload); + self.payload = payload; self } @@ -600,7 +583,7 @@ impl Request { /// # } /// ``` pub fn headers(mut self, headers: HeaderMap) -> Request { - self.headers = Some(headers); + self.headers = headers; self } diff --git a/async-nats/src/connection.rs b/async-nats/src/connection.rs index 756140481..d2778d8cc 100644 --- a/async-nats/src/connection.rs +++ b/async-nats/src/connection.rs @@ -431,10 +431,7 @@ impl Connection { respond, headers, } => { - let verb = match headers.as_ref() { - Some(headers) if !headers.is_empty() => "HPUB", - _ => "PUB", - }; + let verb = if !headers.is_empty() { "HPUB" } else { "PUB" }; small_write!("{verb} {subject} "); @@ -442,19 +439,16 @@ impl Connection { small_write!("{respond} "); } - match headers { - Some(headers) if !headers.is_empty() => { - let headers = headers.to_bytes(); + if !headers.is_empty() { + let headers = headers.to_bytes(); - let headers_len = headers.len(); - let total_len = headers_len + payload.len(); - small_write!("{headers_len} {total_len}\r\n"); - self.write(headers); - } - _ => { - let payload_len = payload.len(); - small_write!("{payload_len}\r\n"); - } + let headers_len = headers.len(); + let total_len = headers_len + payload.len(); + small_write!("{headers_len} {total_len}\r\n"); + self.write(headers); + } else { + let payload_len = payload.len(); + small_write!("{payload_len}\r\n"); } self.write(Bytes::clone(payload)); @@ -954,7 +948,7 @@ mod write_op { subject: "FOO.BAR".into(), payload: "Hello World".into(), respond: None, - headers: None, + headers: HeaderMap::new(), }] .iter(), ) @@ -973,7 +967,7 @@ mod write_op { subject: "FOO.BAR".into(), payload: "Hello World".into(), respond: Some("INBOX.67".into()), - headers: None, + headers: HeaderMap::new(), }] .iter(), ) @@ -991,10 +985,10 @@ mod write_op { subject: "FOO.BAR".into(), payload: "Hello World".into(), respond: Some("INBOX.67".into()), - headers: Some(HeaderMap::from_iter([( + headers: HeaderMap::from_iter([( "Header".parse().unwrap(), "X".parse().unwrap(), - )])), + )]), }] .iter(), ) diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index f2478c19b..a422c545a 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -1179,7 +1179,7 @@ impl futures::Stream for Streams<'_> { #[derive(Default, Clone, Debug)] pub struct Publish { payload: Bytes, - headers: Option, + headers: header::HeaderMap, } impl Publish { /// Creates a new custom Publish struct to be used with. @@ -1194,14 +1194,12 @@ impl Publish { } /// Adds headers to the message. pub fn headers(mut self, headers: HeaderMap) -> Self { - self.headers = Some(headers); + self.headers = headers; self } /// A shorthand to add a single header. pub fn header(mut self, name: N, value: V) -> Self { - self.headers - .get_or_insert(header::HeaderMap::new()) - .insert(name, value); + self.headers.insert(name, value); self } /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window. diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 982955aca..f248babd1 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -270,13 +270,13 @@ pub(crate) enum Command { subject: Subject, payload: Bytes, respond: Option, - headers: Option, + headers: HeaderMap, }, Request { subject: Subject, payload: Bytes, respond: Subject, - headers: Option, + headers: HeaderMap, sender: oneshot::Sender, }, Subscribe { @@ -301,7 +301,7 @@ pub(crate) enum ClientOp { subject: Subject, payload: Bytes, respond: Option, - headers: Option, + headers: HeaderMap, }, Subscribe { sid: u64,