From edf1922881bfd739a0007e43ae31b2487affc0fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eloi=20D=C3=A9molis?= <43861898+Wonshtrum@users.noreply.github.com> Date: Tue, 11 Apr 2023 11:08:04 +0200 Subject: [PATCH] fix(conn): ConsumerEngine::reconnect loop (#271) * Fix ConsumerEngine::reconnect loop - dropping the old message source no longer triggers a reconnect - call send_flow after a successful subscribe_topic - the subscribe_topic loop keeps track of time Signed-off-by: Eloi DEMOLIS * Remove drop_signal from consumer and producer - close_consumer and close_producer are called synchronously at the beginning of reconnect and asynchronously upon drop - close_consumer sends a Register::RemoveConsumer on registrations to remove the old resolver from the consumers BTreeMap, which cleans up the old "message source thread" Note: it is not clear when close_consumer and close_producer should be called, whether all cases are covered, and how to properly handle errors in each case Signed-off-by: Eloi DEMOLIS * Retryable operations refactor: - merge connection and reconnection code for TopicConsumer in retry_subscribe_consumer - merge connection and reconnection code for TopicProducer in retry_create_producer - merge consumer and producer (re)connection error handling in handle_retry_error Signed-off-by: Eloi DEMOLIS --------- Signed-off-by: Eloi DEMOLIS --- src/connection.rs | 18 +- src/connection_manager.rs | 2 +- src/consumer/engine.rs | 276 ++++----------------- src/consumer/initial_position.rs | 10 +- src/consumer/topic.rs | 181 ++------------ src/lib.rs | 1 + src/message.rs | 1 - src/producer.rs | 410 ++++--------------------------- src/retry_op.rs | 219 +++++++++++++++++ 9 files changed, 354 insertions(+), 764 deletions(-) create mode 100644 src/retry_op.rs diff --git a/src/connection.rs b/src/connection.rs index 0e53841..e55c086 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -46,6 +46,9 @@ pub(crate) enum Register { consumer_id: u64, resolver: mpsc::UnboundedSender, }, + RemoveConsumer { + consumer_id: u64, + }, Ping { resolver: oneshot::Sender<()>, }, @@ -157,6 +160,9 @@ impl>> Future for Receiver })) => { self.consumers.insert(consumer_id, resolver); } + Poll::Ready(Some(Register::RemoveConsumer { consumer_id })) => { + self.consumers.remove(&consumer_id); + } Poll::Ready(Some(Register::Ping { resolver })) => { self.ping = Some(resolver); } @@ -549,6 +555,16 @@ impl ConnectionSender { ) -> Result { let request_id = self.request_id.get(); let msg = messages::close_consumer(consumer_id, request_id); + match self + .registrations + .unbounded_send(Register::RemoveConsumer { consumer_id }) + { + Ok(_) => {} + Err(_) => { + self.error.set(ConnectionError::Disconnected); + return Err(ConnectionError::Disconnected); + } + } self.send_message(msg, RequestKey::RequestId(request_id), |resp| { resp.command.success }) @@ -1114,7 +1130,7 @@ impl Connection { impl Drop for Connection { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn drop(&mut self) { - trace!("dropping connection {} for {}", self.id, self.url); + debug!("dropping connection {} for {}", self.id, self.url); if let Some(shutdown) = self.sender.receiver_shutdown.take() { let _ = shutdown.send(()); } diff --git a/src/connection_manager.rs b/src/connection_manager.rs index b686d88..73c516d 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -448,7 +448,7 @@ impl ConnectionManager { } } Some(ConnectionStatus::Connected(_)) => { - //info!("removing old connection"); + info!("removing old connection"); } None => { 
//info!("setting up new connection"); diff --git a/src/consumer/engine.rs b/src/consumer/engine.rs index c57efee..7f8f93d 100644 --- a/src/consumer/engine.rs +++ b/src/consumer/engine.rs @@ -7,7 +7,7 @@ use std::{ }; use futures::{ - channel::{mpsc, mpsc::UnboundedSender, oneshot}, + channel::{mpsc, mpsc::UnboundedSender}, SinkExt, StreamExt, }; @@ -25,7 +25,8 @@ use crate::{ }, proto, proto::{BaseCommand, CommandCloseConsumer, CommandMessage}, - BrokerAddress, Error, Executor, Payload, Pulsar, + retry_op::retry_subscribe_consumer, + Error, Executor, Payload, Pulsar, }; pub struct ConsumerEngine { @@ -47,7 +48,6 @@ pub struct ConsumerEngine { unacked_messages: HashMap, dead_letter_policy: Option, options: ConsumerOptions, - drop_signal: Option>, } impl ConsumerEngine { @@ -67,7 +67,6 @@ impl ConsumerEngine { unacked_message_redelivery_delay: Option, dead_letter_policy: Option, options: ConsumerOptions, - drop_signal: oneshot::Sender<()>, ) -> ConsumerEngine { let (event_tx, event_rx) = mpsc::unbounded(); ConsumerEngine { @@ -89,7 +88,6 @@ impl ConsumerEngine { unacked_messages: HashMap::new(), dead_letter_policy, options, - drop_signal: Some(drop_signal), } } @@ -191,13 +189,8 @@ impl ConsumerEngine { ) -> Option> { match message_opt { None => { - error!("Consumer: messages::next: returning Disconnected"); - if let Err(err) = self.reconnect().await { - Some(Err(err)) - } else { - None - } - //return Err(Error::Consumer(ConsumerError::Connection(ConnectionError::Disconnected)).into()); + debug!("Consumer: old message source terminated"); + None } Some(message) => { self.remaining_messages -= message @@ -543,237 +536,39 @@ impl ConsumerEngine { Ok(()) } - #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - async fn retry_with_delay( - &mut self, - current_retries: &mut u32, - error: &str, - topic: &String, - text: &Option, - broker_address: &BrokerAddress, - ) -> Result>, Error> { - warn!( - "subscribing({}) answered {}, retrying request after {}ms (max_retries = {:?}): {}", - topic, - error, - self.client.operation_retry_options.retry_delay.as_millis(), - self.client.operation_retry_options.max_retries, - text.as_deref().unwrap_or_default() - ); - - *current_retries += 1; - self.client - .executor - .delay(self.client.operation_retry_options.retry_delay) - .await; - - let addr = self.client.lookup_topic(topic).await?; - let connection = self.client.manager.get_connection(&addr).await?; - self.connection = connection.clone(); - - warn!( - "Retry #{} -> reconnecting consumer {:#} using connection {:#} to broker {:#} to topic {:#}", - current_retries, - self.id, - self.connection.id(), - broker_address.url, - self.topic - ); - - Ok(connection) - } - - #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - async fn subscribe_topic( - &mut self, - resolver: UnboundedSender, - topic: &String, - current_retries: &mut u32, - broker_address: &BrokerAddress, - ) -> Result<(), Result<(), Error>> { - let start = Instant::now(); - let operation_retry_options = self.client.operation_retry_options.clone(); - - match self - .connection - .sender() - .subscribe( - resolver.clone(), - topic.clone(), - self.subscription.clone(), - self.sub_type, - self.id, - self.name.clone(), - self.options.clone(), - ) - .await - { - Ok(_success) => { - if *current_retries > 0 { - let dur = (Instant::now() - start).as_secs(); - log::info!( - "subscribing({}) success after {} retries over {} seconds", - topic, - *current_retries + 1, - dur - ); - } - } - 
Err(ConnectionError::PulsarError(Some(err), text)) => { - return match err { - proto::ServerError::ServiceNotReady | proto::ServerError::ConsumerBusy => { - match operation_retry_options.max_retries { - Some(max_retries) if *current_retries < max_retries => { - self.retry_with_delay( - current_retries, - err.as_str_name(), - topic, - &text, - broker_address, - ) - .await - .map_err(Err)?; - - Err(Ok(())) - } - _ => { - error!("subscribe topic({}) reached max retries", topic); - - Err(Err(ConnectionError::PulsarError(Some(err), text).into())) - } - } - } - _ => Err(Err(Error::Connection(ConnectionError::PulsarError( - Some(err), - text, - )))), - } - } - Err(ConnectionError::Io(e)) - // Retryable IO Error - if matches!( - e.kind(), - ErrorKind::BrokenPipe - | ErrorKind::ConnectionAborted - | ErrorKind::ConnectionReset - | ErrorKind::Interrupted - | ErrorKind::NotConnected - | ErrorKind::TimedOut - | ErrorKind::UnexpectedEof - ) => - { - return match operation_retry_options.max_retries { - Some(max_retries) if *current_retries < max_retries => { - self.retry_with_delay( - current_retries, - e.kind().to_string().as_str(), - topic, - &None, - broker_address, - ) - .await - .map_err(Err)?; - - Err(Ok(())) - } - _ => { - error!("subscribing({}) reached max retries", topic); - Err(Err(Error::Connection(ConnectionError::Io(e)))) - } - } - } - Err(e) => { - error!("reconnect error [{:?}]: {:?}", line!(), e); - return Err(Err(Error::Connection(e))); - } - } - - Ok(()) - } - #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] async fn reconnect(&mut self) -> Result<(), Error> { debug!("reconnecting consumer for topic: {}", self.topic); - if let Some(prev_single) = std::mem::replace(&mut self.drop_signal, None) { - // kill the previous errored consumer - drop(prev_single); + + // send CloseConsumer to server + // remove our resolver from the Connection consumers BTreeMap, + // stopping the current Message source + if let Err(e) = self.connection.sender().close_consumer(self.id).await { + error!( + "could not close consumer {:?}({}) for topic {}: {:?}", + self.name, self.id, self.topic, e + ); } + // should have logged "rx terminated" by now let broker_address = self.client.lookup_topic(&self.topic).await?; - let conn = self.client.manager.get_connection(&broker_address).await?; - - self.connection = conn; - warn!( - "Retry -> reconnecting consumer {:#} using connection {:#} to broker {:#} to topic {:#}", + let messages = retry_subscribe_consumer( + &self.client, + &mut self.connection, + broker_address, + &self.topic, + &self.subscription, + self.sub_type, self.id, - self.connection.id(), - broker_address.url, - self.topic - ); - - let topic = self.topic.clone(); - - let mut current_retries = 0u32; - let (resolver, messages) = mpsc::unbounded(); - - // Reconnection loop - // If the error is recoverable - // Try to reconnect in the bounds of retry limits - loop { - match self - .subscribe_topic( - resolver.clone(), - &topic, - &mut current_retries, - &broker_address, - ) - .await - { - // Reconnection went well - Ok(()) => break, - // An retryable error occurs - Err(Ok(())) => continue, - // An non-retryable error happens, the connection must die ! 
- Err(Err(e)) => return Err(e), - } - } + &self.name, + &self.options, + self.batch_size, + ) + .await?; self.messages_rx = Some(messages); - // drop_signal will be dropped when Consumer is dropped, then - // drop_receiver will return, and we can close the consumer - let (drop_signal, drop_receiver) = oneshot::channel::<()>(); - let conn = Arc::downgrade(&self.connection); - let name = self.name.clone(); - let id = self.id; - let topic = self.topic.clone(); - let _ = self.client.executor.spawn(Box::pin(async move { - let _res = drop_receiver.await; - // if we receive a message, it indicates we want to stop this task - - match conn.upgrade() { - None => { - debug!("Connection already dropped, no weak reference remaining") - } - Some(connection) => { - debug!("Closing producers of connection {}", connection.id()); - let res = connection.sender().close_consumer(id).await; - - if let Err(e) = res { - error!( - "could not close consumer {:?}({}) for topic {}: {:?}", - name, id, topic, e - ); - } - } - } - })); - - if let Some(prev_single) = std::mem::replace(&mut self.drop_signal, Some(drop_signal)) { - drop(prev_single); - } - Ok(()) } @@ -809,3 +604,20 @@ impl ConsumerEngine { tokio::time::timeout_at(tokio::time::Instant::now() + dur, fut).await } } + +impl std::ops::Drop for ConsumerEngine { + fn drop(&mut self) { + let conn = self.connection.clone(); + let id = self.id; + let name = self.name.clone(); + let topic = self.topic.clone(); + let _ = self.client.executor.spawn(Box::pin(async move { + if let Err(e) = conn.sender().close_consumer(id).await { + error!( + "could not close consumer {:?}({}) for topic {}: {:?}", + name, id, topic, e + ); + } + })); + } +} diff --git a/src/consumer/initial_position.rs b/src/consumer/initial_position.rs index fc1b91d..822d994 100644 --- a/src/consumer/initial_position.rs +++ b/src/consumer/initial_position.rs @@ -1,19 +1,13 @@ /// position of the first message that will be consumed -#[derive(Clone, Debug)] +#[derive(Default, Clone, Debug)] pub enum InitialPosition { /// start at the oldest message Earliest, /// start at the most recent message + #[default] Latest, } -impl Default for InitialPosition { - #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - fn default() -> Self { - InitialPosition::Latest - } -} - impl From for i32 { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn from(i: InitialPosition) -> Self { diff --git a/src/consumer/topic.rs b/src/consumer/topic.rs index 84e2af2..db6ae35 100644 --- a/src/consumer/topic.rs +++ b/src/consumer/topic.rs @@ -1,5 +1,4 @@ use std::{ - io::ErrorKind, marker::PhantomData, pin::Pin, sync::{ @@ -7,7 +6,7 @@ use std::{ Arc, }, task::{Context, Poll}, - time::{Duration, Instant}, + time::Duration, }; use chrono::{DateTime, Utc}; @@ -26,8 +25,8 @@ use crate::{ }, error::{ConnectionError, ConsumerError}, message::proto::MessageIdData, - proto, proto::CommandConsumerStatsResponse, + retry_op::retry_subscribe_consumer, BrokerAddress, DeserializeMessage, Error, Executor, Payload, Pulsar, }; @@ -38,7 +37,6 @@ pub struct TopicConsumer { topic: String, messages: Pin>, engine_tx: mpsc::UnboundedSender>, - #[allow(unused)] data_type: PhantomData T::Output>, pub(crate) dead_letter_policy: Option, pub(super) last_message_received: Option>, @@ -50,7 +48,7 @@ impl TopicConsumer { pub(super) async fn new( client: Pulsar, topic: String, - mut addr: BrokerAddress, + addr: BrokerAddress, config: ConsumerConfig, ) -> Result, Error> { static CONSUMER_ID_GENERATOR: AtomicU64 = AtomicU64::new(0); 
@@ -67,162 +65,24 @@ impl TopicConsumer { } = config.clone(); let consumer_id = consumer_id.unwrap_or_else(|| CONSUMER_ID_GENERATOR.fetch_add(1, Ordering::SeqCst)); - let (resolver, messages) = mpsc::unbounded(); let batch_size = batch_size.unwrap_or(1000); - let mut connection = client.manager.get_connection(&addr).await?; - let mut current_retries = 0u32; - let start = Instant::now(); - let operation_retry_options = client.operation_retry_options.clone(); - - loop { - match connection - .sender() - .subscribe( - resolver.clone(), - topic.clone(), - subscription.clone(), - sub_type, - consumer_id, - consumer_name.clone(), - options.clone(), - ) - .await - { - Ok(_) => { - if current_retries > 0 { - let dur = (Instant::now() - start).as_secs(); - log::info!( - "subscribe({}) success after {} retries over {} seconds", - topic, - current_retries + 1, - dur - ); - } - break; - } - Err(ConnectionError::PulsarError(Some(err), text)) - if matches!( - err, - proto::ServerError::ServiceNotReady | proto::ServerError::ConsumerBusy - ) => - { - // Pulsar retryable error - match operation_retry_options.max_retries { - Some(max_retries) if current_retries < max_retries => { - error!("subscribe({}) answered {}, retrying request after {}ms (max_retries = {:?}): {}", - topic, err.as_str_name(), operation_retry_options.retry_delay.as_millis(), - operation_retry_options.max_retries, text.unwrap_or_default()); - - current_retries += 1; - client - .executor - .delay(operation_retry_options.retry_delay) - .await; - - // we need to look up again the topic's address - let prev = addr; - addr = client.lookup_topic(&topic).await?; - if prev != addr { - info!( - "topic {} moved: previous = {:?}, new = {:?}", - topic, prev, addr - ); - } - - connection = client.manager.get_connection(&addr).await?; - continue; - } - _ => { - error!("subscribe({}) reached max retries", topic); - - return Err(ConnectionError::PulsarError( - Some(proto::ServerError::ServiceNotReady), - text, - ) - .into()); - } - } - } - Err(ConnectionError::Io(e)) - if matches!( - e.kind(), - ErrorKind::ConnectionReset - | ErrorKind::ConnectionAborted - | ErrorKind::NotConnected - | ErrorKind::BrokenPipe - | ErrorKind::TimedOut - | ErrorKind::Interrupted - | ErrorKind::UnexpectedEof - ) => - { - match operation_retry_options.max_retries { - Some(max_retries) if current_retries < max_retries => { - error!( - "create consumer( {} {}, retrying request after {}ms (max_retries = {:?})", - topic, e.kind().to_string(), operation_retry_options.retry_delay.as_millis(), - operation_retry_options.max_retries - ); - - current_retries += 1; - client - .executor - .delay(operation_retry_options.retry_delay) - .await; - - let prev = addr.clone(); - let addr = client.lookup_topic(&topic).await?; - - if prev != addr { - info!( - "topic {} moved: previous = {:?}, new = {:?}", - topic, prev, addr - ); - } - - connection = client.manager.get_connection(&addr).await?; - continue; - } - _ => { - // The error was retryable but the number of overall retries - // is exhausted - return Err(ConsumerError::Io(e).into()); - } - } - } - Err(e) => return Err(Error::Connection(e)), - } - } - - connection - .sender() - .send_flow(consumer_id, batch_size) - .map_err(|e| { - error!("TopicConsumer::new error[{}]: {:?}", line!(), e); - e - }) - .map_err(|e| Error::Consumer(ConsumerError::Connection(e)))?; + let messages = retry_subscribe_consumer( + &client, + &mut connection, + addr, + &topic, + &subscription, + sub_type, + consumer_id, + &consumer_name, + &options, + batch_size, + ) + 
.await?; let (engine_tx, engine_rx) = mpsc::unbounded(); - // drop_signal will be dropped when Consumer is dropped, then - // drop_receiver will return, and we can close the consumer - let (drop_signal, drop_receiver) = oneshot::channel::<()>(); - let conn = connection.clone(); - let name = consumer_name.clone(); - let topic_name = topic.clone(); - let _ = client.executor.spawn(Box::pin(async move { - let _res = drop_receiver.await; - // if we receive a message, it indicates we want to stop this task - if _res.is_err() { - if let Err(e) = conn.sender().close_consumer(consumer_id).await { - error!( - "could not close consumer {:?}({}) for topic {}: {:?}", - consumer_name, consumer_id, topic_name, e - ); - } - } - })); if unacked_message_redelivery_delay.is_some() { let mut redelivery_tx = engine_tx.clone(); @@ -251,7 +111,7 @@ impl TopicConsumer { subscription.clone(), sub_type, consumer_id, - name, + consumer_name, tx, messages, engine_rx, @@ -259,16 +119,15 @@ impl TopicConsumer { unacked_message_redelivery_delay, dead_letter_policy.clone(), options.clone(), - drop_signal, ); - let f = async move { + let engine_task = client.executor.spawn(Box::pin(async move { c.engine() .map(|res| { debug!("consumer engine stopped: {:?}", res); }) .await; - }; - if client.executor.spawn(Box::pin(f)).is_err() { + })); + if engine_task.is_err() { return Err(Error::Executor); } diff --git a/src/lib.rs b/src/lib.rs index 5086e91..5841518 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -189,6 +189,7 @@ pub mod executor; pub mod message; pub mod producer; pub mod reader; +mod retry_op; mod service_discovery; #[cfg(test)] diff --git a/src/message.rs b/src/message.rs index ee5d4ee..75cb8b8 100644 --- a/src/message.rs +++ b/src/message.rs @@ -543,7 +543,6 @@ pub mod proto { //trait implementations used in Consumer::unacked_messages impl Eq for MessageIdData {} - #[allow(clippy::derived_hash_with_manual_eq)] impl std::hash::Hash for MessageIdData { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn hash(&self, state: &mut H) { diff --git a/src/producer.rs b/src/producer.rs index 5c338fc..bf2ffbd 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -28,7 +28,8 @@ use crate::{ proto::{self, CommandSendReceipt, EncryptionKeys, Schema}, BatchedMessage, }, - Error, Pulsar, + retry_op::retry_create_producer, + BrokerAddress, Error, Pulsar, }; type ProducerId = u64; @@ -428,15 +429,14 @@ struct TopicProducer { // while we might be pushing more messages from elsewhere batch: Option>, compression: Option, - drop_signal: oneshot::Sender<()>, options: ProducerOptions, } impl TopicProducer { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - pub(crate) async fn from_connection>( + pub(crate) async fn new>( client: Pulsar, - mut connection: Arc>, + addr: BrokerAddress, topic: S, name: Option, options: ProducerOptions, @@ -450,149 +450,18 @@ impl TopicProducer { let topic = topic.clone(); let batch_size = options.batch_size; let compression = options.compression.clone(); - - let producer_name: ProducerName; - let mut current_retries = 0u32; - let start = std::time::Instant::now(); - let operation_retry_options = client.operation_retry_options.clone(); - - loop { - let connection_sender = connection.sender(); - match connection_sender - .create_producer(topic.clone(), producer_id, name.clone(), options.clone()) - .await - .map_err(|e| { - error!("TopicProducer::from_connection error[{}]: {:?}", line!(), e); - e - }) { - Ok(partial_success) => { - // If producer is not "ready", the client will 
avoid to timeout the request - // for creating the producer. Instead it will wait indefinitely until it gets - // a subsequent `CommandProducerSuccess` with `producer_ready==true`. - if let Some(producer_ready) = partial_success.producer_ready { - if !producer_ready { - // wait until next commandproducersuccess message has been received - trace!("producer is still waiting for exclusive access"); - let result = connection_sender - .wait_for_exclusive_access(partial_success.request_id) - .await; - trace!("result is received: {:?}", result); - } - } - producer_name = partial_success.producer_name; - - if current_retries > 0 { - let dur = (std::time::Instant::now() - start).as_secs(); - log::info!( - "producer({}) success after {} retries over {} seconds", - topic, - current_retries + 1, - dur - ); - } - break; - } - Err(ConnectionError::PulsarError( - Some(proto::ServerError::ServiceNotReady), - text, - )) => { - if operation_retry_options.max_retries.is_none() - || operation_retry_options.max_retries.unwrap() > current_retries - { - error!("create_producer({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?}): {}", - topic, operation_retry_options.retry_delay.as_millis(), - operation_retry_options.max_retries, text.unwrap_or_default()); - - current_retries += 1; - client - .executor - .delay(operation_retry_options.retry_delay) - .await; - - let addr = client.lookup_topic(&topic).await?; - connection = client.manager.get_connection(&addr).await?; - - continue; - } else { - error!("create_producer({}) reached max retries", topic); - - return Err(ConnectionError::PulsarError( - Some(proto::ServerError::ServiceNotReady), - text, - ) - .into()); - } - } - Err(ConnectionError::PulsarError(Some(proto::ServerError::ProducerBusy), text)) => { - if operation_retry_options.max_retries.is_none() - || operation_retry_options.max_retries.unwrap() > current_retries - { - error!("create_producer({}) answered ProducerBusy, retrying request after {}ms (max_retries = {:?}): {}", - topic, operation_retry_options.retry_delay.as_millis(), - operation_retry_options.max_retries, text.unwrap_or_default()); - - current_retries += 1; - client - .executor - .delay(operation_retry_options.retry_delay) - .await; - - let addr = client.lookup_topic(&topic).await?; - connection = client.manager.get_connection(&addr).await?; - - continue; - } else { - error!("create_producer({}) reached max retries", topic); - - return Err(ConnectionError::PulsarError( - Some(proto::ServerError::ProducerBusy), - text, - ) - .into()); - } - } - Err(ConnectionError::Io(e)) => { - if e.kind() != std::io::ErrorKind::TimedOut { - warn!("send_inner got io error: {:?}", e); - return Err(ProducerError::Connection(ConnectionError::Io(e)).into()); - } else if operation_retry_options.max_retries.is_none() - || operation_retry_options.max_retries.unwrap() > current_retries - { - error!( - "create_producer({}) TimedOut, retrying request after {}ms (max_retries = {:?})", - topic, operation_retry_options.retry_delay.as_millis(), - operation_retry_options.max_retries - ); - - current_retries += 1; - client - .executor - .delay(operation_retry_options.retry_delay) - .await; - - let addr = client.lookup_topic(&topic).await?; - connection = client.manager.get_connection(&addr).await?; - - continue; - } else { - error!("create_producer({}) reached max retries", topic); - - return Err(ProducerError::Connection(ConnectionError::Io(e)).into()); - } - } - //this also captures producer fenced error - Err(e) => return 
Err(Error::Connection(e)), - } - } - - // drop_signal will be dropped when the TopicProducer is dropped, then - // drop_receiver will return, and we can close the producer - let (_drop_signal, drop_receiver) = oneshot::channel::<()>(); - let conn = connection.clone(); - let _ = client.executor.spawn(Box::pin(async move { - let _res = drop_receiver.await; - let _ = conn.sender().close_producer(producer_id).await; - })); + let mut connection = client.manager.get_connection(&addr).await?; + + let producer_name = retry_create_producer( + &client, + &mut connection, + addr, + &topic, + producer_id, + name, + &options, + ) + .await?; Ok(TopicProducer { client, @@ -603,7 +472,6 @@ impl TopicProducer { message_id: sequence_ids, batch: batch_size.map(Batch::new).map(Mutex::new), compression, - drop_signal: _drop_signal, options, }) } @@ -833,229 +701,53 @@ impl TopicProducer { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] async fn reconnect(&mut self) -> Result<(), Error> { debug!("reconnecting producer for topic: {}", self.topic); - // Sender::send() method consumes the sender - // as the sender is hold by the TopicProducer, there is no way to call send method - // The lines below take the pointed sender and replace it by a new one bound to nothing - // but as the TopicProducer sender is recreate below, there is no worry - let (drop_signal, _) = oneshot::channel::<()>(); - let old_signal = std::mem::replace(&mut self.drop_signal, drop_signal); - // This line ask for kill the previous errored producer - let _ = old_signal.send(()); + if let Err(e) = self.connection.sender().close_producer(self.id).await { + error!( + "could not close producer {:?}({}) for topic {}: {:?}", + self.name, self.id, self.topic, e + ); + } let broker_address = self.client.lookup_topic(&self.topic).await?; - let conn = self.client.manager.get_connection(&broker_address).await?; - self.connection = conn; - warn!( - "Retry #0 -> reconnecting producer {:#} using connection {:#} to broker {:#} to topic {:#}", + // should we ignore, test or use producer_name? 
+ let _producer_name = retry_create_producer( + &self.client, + &mut self.connection, + broker_address, + &self.topic, self.id, - self.connection.id(), - broker_address.url, - self.topic - ); - - let topic = self.topic.clone(); - let batch_size = self.options.batch_size; - - let mut current_retries = 0u32; - let start = std::time::Instant::now(); - let operation_retry_options = self.client.operation_retry_options.clone(); - - loop { - match self - .connection - .sender() - .create_producer( - topic.clone(), - self.id, - Some(self.name.clone()), - self.options.clone(), - ) - .await - .map_err(|e| { - error!("TopicProducer::create_producer error[{}]: {:?}", line!(), e); - e - }) { - Ok(_success) => { - if current_retries > 0 { - let dur = (std::time::Instant::now() - start).as_secs(); - log::info!( - "producer({}) success after {} retries over {} seconds", - topic, - current_retries + 1, - dur - ); - } - break; - } - Err(ConnectionError::PulsarError( - Some(proto::ServerError::ServiceNotReady), - text, - )) => { - if operation_retry_options.max_retries.is_none() - || operation_retry_options.max_retries.unwrap() > current_retries - { - warn!("create_producer({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?}): {}", - topic, operation_retry_options.retry_delay.as_millis(), - operation_retry_options.max_retries, text.unwrap_or_default()); - - current_retries += 1; - self.client - .executor - .delay(operation_retry_options.retry_delay) - .await; - - let addr = self.client.lookup_topic(&topic).await?; - self.connection = self.client.manager.get_connection(&addr).await?; - - warn!( - "Retry #{} -> reconnecting producer {:#} using connection {:#} to broker {:#} to topic {:#}", - current_retries, - self.id, - self.connection.id(), - broker_address.url, - self.topic - ); - - continue; - } else { - error!("create_producer({}) reached max retries", topic); - - return Err(ConnectionError::PulsarError( - Some(proto::ServerError::ServiceNotReady), - text, - ) - .into()); - } - } - Err(ConnectionError::PulsarError(Some(proto::ServerError::ProducerBusy), text)) => { - if operation_retry_options.max_retries.is_none() - || operation_retry_options.max_retries.unwrap() > current_retries - { - warn!("create_producer({}) answered ProducerBusy, retrying request after {}ms (max_retries = {:?}): {}", - topic, operation_retry_options.retry_delay.as_millis(), - operation_retry_options.max_retries, text.unwrap_or_default()); - - current_retries += 1; - self.client - .executor - .delay(operation_retry_options.retry_delay) - .await; - - let addr = self.client.lookup_topic(&topic).await?; - self.connection = self.client.manager.get_connection(&addr).await?; - - warn!( - "Retry #{} -> reconnecting producer {:#} using connection {:#} to broker {:#} to topic {:#}", - current_retries, - self.id, - self.connection.id(), - broker_address.url, - self.topic - ); - - continue; - } else { - error!("create_producer({}) reached max retries", topic); - - return Err(ConnectionError::PulsarError( - Some(proto::ServerError::ProducerBusy), - text, - ) - .into()); - } - } - Err(ConnectionError::Io(e)) => { - if e.kind() != std::io::ErrorKind::TimedOut { - error!("send_inner got io error: {:?}", e); - return Err(ProducerError::Connection(ConnectionError::Io(e)).into()); - } else if operation_retry_options.max_retries.is_none() - || operation_retry_options.max_retries.unwrap() > current_retries - { - warn!("create_producer({}) TimedOut, retrying request after {}ms (max_retries = {:?})", - topic, 
operation_retry_options.retry_delay.as_millis(), operation_retry_options.max_retries); - - current_retries += 1; - self.client - .executor - .delay(operation_retry_options.retry_delay) - .await; - - let addr = self.client.lookup_topic(&topic).await?; - self.connection = self.client.manager.get_connection(&addr).await?; - - warn!( - "Retry #{} -> reconnecting producer {:#} using connection {:#} to broker {:#} to topic {:#}", - current_retries, - self.id, - self.connection.id(), - broker_address.url, - self.topic - ); - - continue; - } else { - error!("create_producer({}) reached max retries", topic); - return Err(Error::Connection(ConnectionError::Io(e))); - } - } - Err(e) => { - error!("reconnect error[{:?}]: {:?}", line!(), e); - return Err(Error::Connection(e)); - } - } - } - - // drop_signal will be dropped when the TopicProducer is dropped, then - // drop_receiver will return, and we can close the producer - let (_drop_signal, drop_receiver) = oneshot::channel::<()>(); - let batch = batch_size.map(Batch::new).map(Mutex::new); - let conn = Arc::downgrade(&self.connection); - - let producer_id = self.id; - let _ = self.client.executor.spawn(Box::pin(async move { - let _res = drop_receiver.await; - - match conn.upgrade() { - None => { - debug!("Connection already dropped, no weak reference remaining") - } - Some(connection) => { - debug!("Closing producers of connection {}", connection.id()); - let _ = connection.sender().close_producer(producer_id).await; - } - } - })); + Some(self.name.clone()), + &self.options, + ) + .await?; - self.batch = batch; - self.drop_signal = _drop_signal; + self.batch = self.options.batch_size.map(Batch::new).map(Mutex::new); Ok(()) } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] pub async fn close(&self) -> Result<(), Error> { - let connection = Arc::downgrade(&self.connection); + self.connection.sender().close_producer(self.id).await?; + Ok(()) + } +} - match connection.upgrade() { - None => { - info!("Connection already gone"); - Ok(()) - } - Some(connection) => { - info!( - "Closing connection #{} of producer[{}]", - self.connection.id(), - self.name +impl std::ops::Drop for TopicProducer { + fn drop(&mut self) { + let conn = self.connection.clone(); + let id = self.id; + let name = self.name.clone(); + let topic = self.topic.clone(); + let _ = self.client.executor.spawn(Box::pin(async move { + if let Err(e) = conn.sender().close_producer(id).await { + error!( + "could not close producer {:?}({}) for topic {}: {:?}", + name, id, topic, e ); - connection - .sender() - .close_producer(self.id) - .await - .map(drop) - .map_err(Error::Connection) } - } + })); } } @@ -1125,10 +817,8 @@ impl ProducerBuilder { let options = options.clone(); let pulsar = pulsar.clone(); async move { - let conn = pulsar.manager.get_connection(&addr).await?; let producer = - TopicProducer::from_connection(pulsar, conn, topic, name, options) - .await?; + TopicProducer::new(pulsar, addr, topic, name, options).await?; Ok::, Error>(producer) } }), diff --git a/src/retry_op.rs b/src/retry_op.rs new file mode 100644 index 0000000..74f3f52 --- /dev/null +++ b/src/retry_op.rs @@ -0,0 +1,219 @@ +use std::{io::ErrorKind, sync::Arc, time::Instant}; + +use futures::channel::mpsc::{self, UnboundedReceiver}; + +use crate::{ + connection::Connection, + error::{ConnectionError, ConsumerError}, + message::Message, + proto, BrokerAddress, ConsumerOptions, Error, Executor, ProducerOptions, Pulsar, SubType, +}; + +pub async fn handle_retry_error( + client: &Pulsar, + connection: 
&mut Arc>, + addr: &mut BrokerAddress, + topic: &str, + operation_name: &str, + current_retries: u32, + err: ConnectionError, +) -> Result<(), Error> { + let operation_retry_options = &client.operation_retry_options; + let (kind, text) = match err { + ConnectionError::PulsarError(Some(kind), ref text) + if matches!( + kind, + proto::ServerError::ServiceNotReady + | proto::ServerError::ConsumerBusy + | proto::ServerError::ProducerBusy + ) => + { + ( + kind.as_str_name().to_owned(), + text.as_ref() + .map(|text| format!(" (\"{text}\")")) + .unwrap_or_default(), + ) + } + ConnectionError::Io(ref kind) + if matches!( + kind.kind(), + ErrorKind::ConnectionReset + | ErrorKind::ConnectionAborted + | ErrorKind::NotConnected + | ErrorKind::BrokenPipe + | ErrorKind::TimedOut + | ErrorKind::Interrupted + | ErrorKind::UnexpectedEof + ) => + { + (kind.kind().to_string(), "".to_owned()) + } + err => { + error!("{operation_name}({topic}) error: {err:?}"); + return Err(err.into()); + } + }; + match operation_retry_options.max_retries { + Some(max_retries) if current_retries < max_retries => { + error!( + "{operation_name}({topic}) answered {kind}{text}, retrying request after {:?} (max_retries = {max_retries})", + operation_retry_options.retry_delay + ); + client + .executor + .delay(operation_retry_options.retry_delay) + .await; + + *addr = client.lookup_topic(topic).await?; + *connection = client.manager.get_connection(addr).await?; + Ok(()) + } + _ => { + error!("{operation_name}({topic}) answered {kind}{text}, reached max retries"); + Err(err.into()) + } + } +} + +pub async fn retry_subscribe_consumer( + client: &Pulsar, + connection: &mut Arc>, + mut addr: BrokerAddress, + topic: &str, + subscription: &str, + sub_type: SubType, + consumer_id: u64, + consumer_name: &Option, + options: &ConsumerOptions, + batch_size: u32, +) -> Result, Error> { + *connection = client.manager.get_connection(&addr).await?; + let (resolver, messages) = mpsc::unbounded(); + let mut current_retries = 0u32; + let start = Instant::now(); + + loop { + warn!( + "Retry #{current_retries} -> connecting consumer {consumer_id} using connection {:#} to broker {:#} to topic {topic}", + connection.id(), + addr.url, + ); + match connection + .sender() + .subscribe( + resolver.clone(), + topic.to_owned(), + subscription.to_owned(), + sub_type, + consumer_id, + consumer_name.clone(), + options.clone(), + ) + .await + { + Ok(_) => { + if current_retries > 0 { + let dur = (Instant::now() - start).as_secs(); + info!( + "TopicConsumer::subscribe({topic}) success after {} retries over {dur} seconds", + current_retries + 1, + ); + } + break; + } + Err(err) => { + handle_retry_error( + client, + connection, + &mut addr, + topic, + "TopicConsumer::subscribe", + current_retries, + err, + ) + .await? 
+ } + } + current_retries += 1; + } + connection + .sender() + .send_flow(consumer_id, batch_size) + .map_err(|err| { + error!("TopicConsumer::send_flow({topic}) error: {err:?}"); + Error::Consumer(ConsumerError::Connection(err)) + })?; + + Ok(messages) +} + +pub async fn retry_create_producer( + client: &Pulsar, + connection: &mut Arc>, + mut addr: BrokerAddress, + topic: &String, + producer_id: u64, + producer_name: Option, + options: &ProducerOptions, +) -> Result { + *connection = client.manager.get_connection(&addr).await?; + let mut current_retries = 0u32; + let start = Instant::now(); + + loop { + warn!( + "Retry #{current_retries} -> connecting producer {producer_id} using connection {:#} to broker {:#} to topic {topic}", + connection.id(), + addr.url, + ); + match connection + .sender() + .create_producer( + topic.clone(), + producer_id, + producer_name.clone(), + options.clone(), + ) + .await + { + Ok(partial_success) => { + // If producer is not "ready", the client will avoid to timeout the request + // for creating the producer. Instead it will wait indefinitely until it gets + // a subsequent `CommandProducerSuccess` with `producer_ready==true`. + if let Some(producer_ready) = partial_success.producer_ready { + if !producer_ready { + // wait until next commandproducersuccess message has been received + trace!("TopicProducer::create({topic}) waiting for exclusive access"); + let result = connection + .sender() + .wait_for_exclusive_access(partial_success.request_id) + .await; + trace!("TopicProducer::create({topic}) received: {result:?}"); + } + } + if current_retries > 0 { + let dur = (std::time::Instant::now() - start).as_secs(); + log::info!( + "TopicProducer::create({topic}) success after {} retries over {dur} seconds", + current_retries + 1, + ); + } + return Ok(partial_success.producer_name); + } + Err(err) => { + handle_retry_error( + client, + connection, + &mut addr, + topic, + "TopicProducer::create", + current_retries, + err, + ) + .await? + } + } + current_retries += 1; + } +}
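The helpers introduced in src/retry_op.rs (retry_subscribe_consumer, retry_create_producer, handle_retry_error) all share one bounded-retry shape: attempt the operation, classify the failure as retryable (ServiceNotReady, ConsumerBusy, ProducerBusy, or a transient IO error) or fatal, wait retry_delay, re-resolve the topic and connection, and loop until max_retries is exhausted. The listing below is a minimal, self-contained sketch of that shape only; FakeError, RetryOptions, and the synchronous sleep are hypothetical stand-ins, not the pulsar crate's API (the real handle_retry_error is async and re-runs lookup_topic/get_connection where the comment indicates).

use std::{thread::sleep, time::Duration};

// Hypothetical stand-ins used only to illustrate the bounded-retry shape.
#[derive(Debug)]
#[allow(dead_code)]
enum FakeError {
    ServiceNotReady,  // retryable, like proto::ServerError::ServiceNotReady
    Fatal(String),    // non-retryable, surfaced to the caller immediately
}

struct RetryOptions {
    retry_delay: Duration,
    max_retries: Option<u32>,
}

// Classify the error, wait, and tell the caller whether to retry.
fn handle_retry_error(
    opts: &RetryOptions,
    current_retries: u32,
    err: FakeError,
) -> Result<(), FakeError> {
    match err {
        FakeError::ServiceNotReady => match opts.max_retries {
            Some(max) if current_retries < max => {
                eprintln!(
                    "retryable error, retry #{current_retries} after {:?}",
                    opts.retry_delay
                );
                sleep(opts.retry_delay);
                // The real helper re-resolves the broker address and refreshes
                // the connection here before returning Ok(()).
                Ok(())
            }
            _ => Err(FakeError::ServiceNotReady), // retry budget exhausted
        },
        fatal => Err(fatal),
    }
}

fn main() {
    let opts = RetryOptions {
        retry_delay: Duration::from_millis(100),
        max_retries: Some(3),
    };
    let mut current_retries = 0u32;
    let outcome = loop {
        // Stand-in for connection.sender().subscribe(..) or create_producer(..):
        // fails twice with a retryable error, then succeeds.
        let attempt: Result<&str, FakeError> = if current_retries < 2 {
            Err(FakeError::ServiceNotReady)
        } else {
            Ok("subscribed")
        };
        match attempt {
            Ok(ok) => break Ok(ok),
            Err(err) => {
                if let Err(fatal) = handle_retry_error(&opts, current_retries, err) {
                    break Err(fatal);
                }
            }
        }
        current_retries += 1;
    };
    println!("final outcome: {outcome:?}");
}

Running this prints two retry messages and then "final outcome: Ok(\"subscribed\")"; with max_retries set to Some(1) it instead breaks out with the error, which mirrors how the patch lets both the consumer and producer paths give up through the same shared helper.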