Skip to content

Commit

Permalink
Useless Option<HeaderMap>
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Sep 20, 2023
1 parent 98fd93a commit 17dfb59
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 56 deletions.
47 changes: 14 additions & 33 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,8 @@ impl Client {
/// # }
/// ```
pub async fn publish(&self, subject: String, payload: Bytes) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: None,
headers: None,
})
.await?;
Ok(())
self.publish_with_headers(subject, HeaderMap::new(), payload)
.await
}

/// Publish a [Message] with headers to a given subject.
Expand Down Expand Up @@ -191,7 +184,7 @@ impl Client {
subject,
payload,
respond: None,
headers: Some(headers),
headers,
})
.await?;
Ok(())
Expand Down Expand Up @@ -223,15 +216,8 @@ impl Client {
reply: String,
payload: Bytes,
) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: Some(reply),
headers: None,
})
.await?;
Ok(())
self.publish_with_reply_and_headers(subject, reply, HeaderMap::new(), payload)
.await
}

/// Publish a [Message] to a given subject with headers and specified response subject
Expand Down Expand Up @@ -269,7 +255,7 @@ impl Client {
subject,
payload,
respond: Some(reply),
headers: Some(headers),
headers,
})
.await?;
Ok(())
Expand Down Expand Up @@ -338,14 +324,9 @@ impl Client {
if let Some(inbox) = request.inbox {
let timeout = request.timeout.unwrap_or(self.request_timeout);
let mut sub = self.subscribe(inbox.clone()).await?;
let payload: Bytes = request.payload.unwrap_or_else(Bytes::new);
match request.headers {
Some(headers) => {
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
self.publish_with_reply_and_headers(subject, inbox, request.headers, request.payload)
.await?;

let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
Expand All @@ -372,9 +353,9 @@ impl Client {
} else {
let (sender, receiver) = oneshot::channel();

let payload = request.payload.unwrap_or_else(Bytes::new);
let respond = self.new_inbox();
let headers = request.headers;
let payload = request.payload;

self.sender
.send(Command::Request {
Expand Down Expand Up @@ -542,8 +523,8 @@ impl Client {
/// Used for building customized requests.
#[derive(Default)]
pub struct Request {
payload: Option<Bytes>,
headers: Option<HeaderMap>,
payload: Bytes,
headers: HeaderMap,
timeout: Option<Option<Duration>>,
inbox: Option<String>,
}
Expand All @@ -566,7 +547,7 @@ impl Request {
/// # }
/// ```
pub fn payload(mut self, payload: Bytes) -> Request {
self.payload = Some(payload);
self.payload = payload;
self
}

Expand All @@ -591,7 +572,7 @@ impl Request {
/// # }
/// ```
pub fn headers(mut self, headers: HeaderMap) -> Request {
self.headers = Some(headers);
self.headers = headers;
self
}

Expand Down
34 changes: 14 additions & 20 deletions async-nats/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,30 +431,24 @@ impl Connection {
respond,
headers,
} => {
let verb = match headers.as_ref() {
Some(headers) if !headers.is_empty() => "HPUB",
_ => "PUB",
};
let verb = if !headers.is_empty() { "HPUB" } else { "PUB" };

small_write!("{verb} {subject} ");

if let Some(respond) = respond {
small_write!("{respond} ");
}

match headers {
Some(headers) if !headers.is_empty() => {
let headers = headers.to_bytes();
if !headers.is_empty() {
let headers = headers.to_bytes();

let headers_len = headers.len();
let total_len = headers_len + payload.len();
small_write!("{headers_len} {total_len}\r\n");
self.write(headers);
}
_ => {
let payload_len = payload.len();
small_write!("{payload_len}\r\n");
}
let headers_len = headers.len();
let total_len = headers_len + payload.len();
small_write!("{headers_len} {total_len}\r\n");
self.write(headers);
} else {
let payload_len = payload.len();
small_write!("{payload_len}\r\n");
}

self.write(Bytes::clone(payload));
Expand Down Expand Up @@ -958,7 +952,7 @@ mod write_op {
subject: "FOO.BAR".into(),
payload: "Hello World".into(),
respond: None,
headers: None,
headers: HeaderMap::new(),
}]
.iter(),
)
Expand All @@ -977,7 +971,7 @@ mod write_op {
subject: "FOO.BAR".into(),
payload: "Hello World".into(),
respond: Some("INBOX.67".into()),
headers: None,
headers: HeaderMap::new(),
}]
.iter(),
)
Expand All @@ -995,10 +989,10 @@ mod write_op {
subject: "FOO.BAR".into(),
payload: "Hello World".into(),
respond: Some("INBOX.67".into()),
headers: Some(HeaderMap::from_iter([(
headers: HeaderMap::from_iter([(
"Header".parse().unwrap(),
"X".parse().unwrap(),
)])),
)]),
}]
.iter(),
)
Expand Down
6 changes: 3 additions & 3 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,13 @@ pub(crate) enum Command {
subject: String,
payload: Bytes,
respond: Option<String>,
headers: Option<HeaderMap>,
headers: HeaderMap,
},
Request {
subject: String,
payload: Bytes,
respond: String,
headers: Option<HeaderMap>,
headers: HeaderMap,
sender: oneshot::Sender<Message>,
},
Subscribe {
Expand All @@ -299,7 +299,7 @@ pub(crate) enum ClientOp {
subject: String,
payload: Bytes,
respond: Option<String>,
headers: Option<HeaderMap>,
headers: HeaderMap,
},
Subscribe {
sid: u64,
Expand Down

0 comments on commit 17dfb59

Please sign in to comment.