diff --git a/src/connection.rs b/src/connection.rs index 0adcf10..6df11b7 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -323,20 +323,22 @@ impl ConnectionSender { } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - pub(crate) async fn send( + pub(crate) fn send( &self, producer_id: u64, producer_name: String, sequence_id: u64, message: producer::ProducerMessage, - ) -> Result { + ) -> Result< + impl Future>, + ConnectionError, + > { let key = RequestKey::ProducerSend { producer_id, sequence_id, }; let msg = messages::send(producer_id, producer_name, sequence_id, message); - self.send_message(msg, key, |resp| resp.command.send_receipt) - .await + self.send_message_non_blocking(msg, key, |resp| resp.command.send_receipt) } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] @@ -623,17 +625,31 @@ impl ConnectionSender { extract: F, ) -> Result where - F: FnOnce(Message) -> Option, + F: FnOnce(Message) -> Option + 'static, + { + self.send_message_non_blocking(msg, key, extract)?.await + } + + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + fn send_message_non_blocking( + &self, + msg: Message, + key: RequestKey, + extract: F, + ) -> Result>, ConnectionError> + where + F: FnOnce(Message) -> Option + 'static, { let (resolver, response) = oneshot::channel(); trace!("sending message(key = {:?}): {:?}", key, msg); let k = key.clone(); - let response = async { + let error = self.error.clone(); + let response = async move { response .await .map_err(|oneshot::Canceled| { - self.error.set(ConnectionError::Disconnected); + error.set(ConnectionError::Disconnected); ConnectionError::Disconnected }) .map(move |message: Message| { @@ -648,36 +664,41 @@ impl ConnectionSender { self.tx.unbounded_send(msg), ) { (Ok(_), Ok(_)) => { + let connection_id = self.connection_id; + let error = self.error.clone(); let delay_f = self.executor.delay(self.operation_timeout); - pin_mut!(response); - pin_mut!(delay_f); - - match select(response, delay_f).await { - Either::Left((res, _)) => { - // println!("recv msg: {:?}", res); - res - } - Either::Right(_) => { - warn!( - "connection {} timedout sending message to the Pulsar server", - self.connection_id - ); - self.error.set(ConnectionError::Io(std::io::Error::new( - std::io::ErrorKind::TimedOut, - format!( - " connection {} timedout sending message to the Pulsar server", - self.connection_id - ), - ))); - Err(ConnectionError::Io(std::io::Error::new( - std::io::ErrorKind::TimedOut, - format!( - " connection {} timedout sending message to the Pulsar server", - self.connection_id - ), - ))) + let fut = async move { + pin_mut!(response); + pin_mut!(delay_f); + match select(response, delay_f).await { + Either::Left((res, _)) => { + // println!("recv msg: {:?}", res); + res + } + Either::Right(_) => { + warn!( + "connection {} timedout sending message to the Pulsar server", + connection_id + ); + error.set(ConnectionError::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!( + " connection {} timedout sending message to the Pulsar server", + connection_id + ), + ))); + Err(ConnectionError::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!( + " connection {} timedout sending message to the Pulsar server", + connection_id + ), + ))) + } } - } + }; + + Ok(fut) } _ => { warn!( diff --git a/src/producer.rs b/src/producer.rs index bbb5dbc..26637fc 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -538,7 +538,7 @@ impl TopicProducer { }; trace!("sending a batched message of size {}", message_count); - let send_receipt = self.send_compress(message).await.map_err(Arc::new); + let send_receipt = self.send_compress(message).await?.await.map_err(Arc::new); for resolver in receipts { let _ = resolver.send( send_receipt @@ -557,8 +557,13 @@ impl TopicProducer { let (tx, rx) = oneshot::channel(); match self.batch.as_ref() { None => { - let receipt = self.send_compress(message).await?; - let _ = tx.send(Ok(receipt)); + let fut = self.send_compress(message).await?; + self.client + .executor + .spawn(Box::pin(async move { + let _ = tx.send(fut.await); + })) + .map_err(|_| Error::Executor)?; Ok(SendFuture(rx)) } Some(batch) => { @@ -586,16 +591,18 @@ impl TopicProducer { ..Default::default() }; - let send_receipt = self.send_compress(message).await.map_err(Arc::new); - trace!("sending a batched message of size {}", counter); - for tx in receipts.drain(..) { - let _ = tx.send( - send_receipt - .clone() - .map_err(|e| ProducerError::Batch(e).into()), - ); - } + let receipt_fut = self.send_compress(message).await?; + self.client + .executor + .spawn(Box::pin(async move { + let res = receipt_fut.await.map_err(Arc::new); + for tx in receipts.drain(..) { + let _ = tx + .send(res.clone().map_err(|e| ProducerError::Batch(e).into())); + } + })) + .map_err(|_| Error::Executor)?; } Ok(SendFuture(rx)) @@ -607,7 +614,7 @@ impl TopicProducer { async fn send_compress( &mut self, mut message: ProducerMessage, - ) -> Result { + ) -> Result>, Error> { let compressed_message = match self.compression.clone() { None | Some(Compression::None) => message, #[cfg(feature = "lz4")] @@ -674,16 +681,25 @@ impl TopicProducer { async fn send_inner( &mut self, message: ProducerMessage, - ) -> Result { + ) -> Result>, Error> { loop { let msg = message.clone(); - match self - .connection - .sender() - .send(self.id, self.name.clone(), self.message_id.get(), msg) - .await - { - Ok(receipt) => return Ok(receipt), + match self.connection.sender().send( + self.id, + self.name.clone(), + self.message_id.get(), + msg, + ) { + Ok(fut) => { + let fut = async move { + let res = fut.await; + res.map_err(|e| { + error!("wait send receipt got error: {:?}", e); + Error::Producer(ProducerError::Connection(e)) + }) + }; + return Ok(fut); + } Err(ConnectionError::Disconnected) => {} Err(ConnectionError::Io(e)) => { if e.kind() != std::io::ErrorKind::TimedOut {