Skip to content

Commit

Permalink
feat: Change send to not block on waiting receipt (#298)
Browse files Browse the repository at this point in the history
Co-authored-by: Sirius Fang <[email protected]>
  • Loading branch information
cirias and Sirius Fang authored Oct 22, 2023
1 parent cdb1fdf commit 5c245cd
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 56 deletions.
91 changes: 56 additions & 35 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,20 +323,22 @@ impl<Exe: Executor> ConnectionSender<Exe> {
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub(crate) async fn send(
pub(crate) fn send(
&self,
producer_id: u64,
producer_name: String,
sequence_id: u64,
message: producer::ProducerMessage,
) -> Result<proto::CommandSendReceipt, ConnectionError> {
) -> Result<
impl Future<Output = Result<proto::CommandSendReceipt, ConnectionError>>,
ConnectionError,
> {
let key = RequestKey::ProducerSend {
producer_id,
sequence_id,
};
let msg = messages::send(producer_id, producer_name, sequence_id, message);
self.send_message(msg, key, |resp| resp.command.send_receipt)
.await
self.send_message_non_blocking(msg, key, |resp| resp.command.send_receipt)
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
Expand Down Expand Up @@ -623,17 +625,31 @@ impl<Exe: Executor> ConnectionSender<Exe> {
extract: F,
) -> Result<R, ConnectionError>
where
F: FnOnce(Message) -> Option<R>,
F: FnOnce(Message) -> Option<R> + 'static,
{
self.send_message_non_blocking(msg, key, extract)?.await
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn send_message_non_blocking<R: Debug, F>(
&self,
msg: Message,
key: RequestKey,
extract: F,
) -> Result<impl Future<Output = Result<R, ConnectionError>>, ConnectionError>
where
F: FnOnce(Message) -> Option<R> + 'static,
{
let (resolver, response) = oneshot::channel();
trace!("sending message(key = {:?}): {:?}", key, msg);

let k = key.clone();
let response = async {
let error = self.error.clone();
let response = async move {
response
.await
.map_err(|oneshot::Canceled| {
self.error.set(ConnectionError::Disconnected);
error.set(ConnectionError::Disconnected);
ConnectionError::Disconnected
})
.map(move |message: Message| {
Expand All @@ -648,36 +664,41 @@ impl<Exe: Executor> ConnectionSender<Exe> {
self.tx.unbounded_send(msg),
) {
(Ok(_), Ok(_)) => {
let connection_id = self.connection_id;
let error = self.error.clone();
let delay_f = self.executor.delay(self.operation_timeout);
pin_mut!(response);
pin_mut!(delay_f);

match select(response, delay_f).await {
Either::Left((res, _)) => {
// println!("recv msg: {:?}", res);
res
}
Either::Right(_) => {
warn!(
"connection {} timedout sending message to the Pulsar server",
self.connection_id
);
self.error.set(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
" connection {} timedout sending message to the Pulsar server",
self.connection_id
),
)));
Err(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
" connection {} timedout sending message to the Pulsar server",
self.connection_id
),
)))
let fut = async move {
pin_mut!(response);
pin_mut!(delay_f);
match select(response, delay_f).await {
Either::Left((res, _)) => {
// println!("recv msg: {:?}", res);
res
}
Either::Right(_) => {
warn!(
"connection {} timedout sending message to the Pulsar server",
connection_id
);
error.set(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
" connection {} timedout sending message to the Pulsar server",
connection_id
),
)));
Err(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
" connection {} timedout sending message to the Pulsar server",
connection_id
),
)))
}
}
}
};

Ok(fut)
}
_ => {
warn!(
Expand Down
58 changes: 37 additions & 21 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ impl<Exe: Executor> TopicProducer<Exe> {
};

trace!("sending a batched message of size {}", message_count);
let send_receipt = self.send_compress(message).await.map_err(Arc::new);
let send_receipt = self.send_compress(message).await?.await.map_err(Arc::new);
for resolver in receipts {
let _ = resolver.send(
send_receipt
Expand All @@ -557,8 +557,13 @@ impl<Exe: Executor> TopicProducer<Exe> {
let (tx, rx) = oneshot::channel();
match self.batch.as_ref() {
None => {
let receipt = self.send_compress(message).await?;
let _ = tx.send(Ok(receipt));
let fut = self.send_compress(message).await?;
self.client
.executor
.spawn(Box::pin(async move {
let _ = tx.send(fut.await);
}))
.map_err(|_| Error::Executor)?;
Ok(SendFuture(rx))
}
Some(batch) => {
Expand Down Expand Up @@ -586,16 +591,18 @@ impl<Exe: Executor> TopicProducer<Exe> {
..Default::default()
};

let send_receipt = self.send_compress(message).await.map_err(Arc::new);

trace!("sending a batched message of size {}", counter);
for tx in receipts.drain(..) {
let _ = tx.send(
send_receipt
.clone()
.map_err(|e| ProducerError::Batch(e).into()),
);
}
let receipt_fut = self.send_compress(message).await?;
self.client
.executor
.spawn(Box::pin(async move {
let res = receipt_fut.await.map_err(Arc::new);
for tx in receipts.drain(..) {
let _ = tx
.send(res.clone().map_err(|e| ProducerError::Batch(e).into()));
}
}))
.map_err(|_| Error::Executor)?;
}

Ok(SendFuture(rx))
Expand All @@ -607,7 +614,7 @@ impl<Exe: Executor> TopicProducer<Exe> {
async fn send_compress(
&mut self,
mut message: ProducerMessage,
) -> Result<proto::CommandSendReceipt, Error> {
) -> Result<impl Future<Output = Result<CommandSendReceipt, Error>>, Error> {
let compressed_message = match self.compression.clone() {
None | Some(Compression::None) => message,
#[cfg(feature = "lz4")]
Expand Down Expand Up @@ -674,16 +681,25 @@ impl<Exe: Executor> TopicProducer<Exe> {
async fn send_inner(
&mut self,
message: ProducerMessage,
) -> Result<proto::CommandSendReceipt, Error> {
) -> Result<impl Future<Output = Result<CommandSendReceipt, Error>>, Error> {
loop {
let msg = message.clone();
match self
.connection
.sender()
.send(self.id, self.name.clone(), self.message_id.get(), msg)
.await
{
Ok(receipt) => return Ok(receipt),
match self.connection.sender().send(
self.id,
self.name.clone(),
self.message_id.get(),
msg,
) {
Ok(fut) => {
let fut = async move {
let res = fut.await;
res.map_err(|e| {
error!("wait send receipt got error: {:?}", e);
Error::Producer(ProducerError::Connection(e))
})
};
return Ok(fut);
}
Err(ConnectionError::Disconnected) => {}
Err(ConnectionError::Io(e)) => {
if e.kind() != std::io::ErrorKind::TimedOut {
Expand Down

0 comments on commit 5c245cd

Please sign in to comment.