fix(conn): ConsumerEngine::reconnect loop (#271)
* Fix ConsumerEngine::reconnect loop

- the drop of the old message source no longer triggers a reconnect
- call send_flow after a successful subscribe_topic
- the subscribe_topic retry loop keeps track of elapsed time (see the
  sketch below)
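
A rough sketch of that flow (a minimal sketch with hypothetical names
and simplified types, not the crate's actual signatures): retry the
subscribe, send flow permits only once it succeeds, and measure elapsed
time across retries instead of resetting it on every attempt.

    use std::time::{Duration, Instant};

    // Stand-in for the real connection sender; the actual subscribe and
    // send_flow methods live on ConnectionSender and differ in signature.
    struct Conn;
    impl Conn {
        async fn subscribe(&self, _topic: &str) -> std::io::Result<()> { Ok(()) }
        async fn send_flow(&self, _permits: u32) -> std::io::Result<()> { Ok(()) }
    }

    async fn resubscribe(conn: &Conn, topic: &str, max_retries: u32) -> std::io::Result<()> {
        let start = Instant::now();
        let mut retries = 0u32;
        loop {
            match conn.subscribe(topic).await {
                Ok(()) => {
                    // Flow permits are requested only after a successful subscribe.
                    conn.send_flow(1000).await?;
                    println!("resubscribed after {retries} retries in {:?}", start.elapsed());
                    return Ok(());
                }
                Err(e) if retries < max_retries => {
                    retries += 1;
                    eprintln!("subscribe failed ({e}), retry #{retries}");
                    tokio::time::sleep(Duration::from_millis(500)).await;
                }
                Err(e) => return Err(e),
            }
        }
    }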

Signed-off-by: Eloi DEMOLIS <[email protected]>

* Remove drop_signal from consumer and producer

- close_consumer and close_producer are synchronously called at the
  beginning of reconnect and asynchronously upon drop
- close_consumer sends a Register::RemoveConsumer on registrations to
  remove the old resolver from the consumers BTreeMap, which cleans up
  the old "message source thread" (sketched below)

note: it is not yet clear when close_consumer and close_producer should
be called, whether all cases are covered, and how to properly handle
errors in each case
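
A condensed illustration of that registration path (simplified types
standing in for the real Register enum and Receiver in
src/connection.rs; variant and field names here are only indicative):

    use std::collections::BTreeMap;
    use futures::channel::mpsc;

    // Simplified message type; the real resolver carries crate::message::Message.
    type Message = String;

    enum Register {
        AddConsumer { consumer_id: u64, resolver: mpsc::UnboundedSender<Message> },
        RemoveConsumer { consumer_id: u64 },
    }

    fn apply(consumers: &mut BTreeMap<u64, mpsc::UnboundedSender<Message>>, reg: Register) {
        match reg {
            Register::AddConsumer { consumer_id, resolver } => {
                consumers.insert(consumer_id, resolver);
            }
            Register::RemoveConsumer { consumer_id } => {
                // Dropping the stored sender closes the consumer's message
                // stream, so the old "message source thread" terminates.
                consumers.remove(&consumer_id);
            }
        }
    }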

Signed-off-by: Eloi DEMOLIS <[email protected]>

* Retryable operations refactor:

- merge connection and reconnection code for TopicConsumer in
  retry_subscribe_consumer
- merge connection and reconnection code for TopicProducer in
  retry_create_producer
- merge consumer and producer (re)connection error handling in
  handle_retry_error (retryability check sketched below)
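
In the same spirit, a sketch of the shared retryability decision (names
and signature are illustrative, not the crate's API; only the IO error
kinds mirror the list used by the (re)connection code in this commit):

    use std::io::ErrorKind;
    use std::time::Duration;

    enum RetryDecision {
        RetryAfter(Duration),
        GiveUp,
    }

    fn classify_io_error(
        kind: ErrorKind,
        retries: u32,
        max_retries: Option<u32>,
        delay: Duration,
    ) -> RetryDecision {
        // IO errors considered transient enough to retry the whole
        // lookup + connect + subscribe/create sequence.
        let retryable = matches!(
            kind,
            ErrorKind::BrokenPipe
                | ErrorKind::ConnectionAborted
                | ErrorKind::ConnectionReset
                | ErrorKind::Interrupted
                | ErrorKind::NotConnected
                | ErrorKind::TimedOut
                | ErrorKind::UnexpectedEof
        );
        match (retryable, max_retries) {
            // Still within the retry budget: wait, then try again.
            (true, Some(max)) if retries < max => RetryDecision::RetryAfter(delay),
            // No retry limit configured: always retry transient errors.
            (true, None) => RetryDecision::RetryAfter(delay),
            // Non-retryable error or budget exhausted: give up.
            _ => RetryDecision::GiveUp,
        }
    }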

Signed-off-by: Eloi DEMOLIS <[email protected]>

---------

Signed-off-by: Eloi DEMOLIS <[email protected]>
Wonshtrum authored Apr 11, 2023
1 parent 63e178e commit edf1922
Showing 9 changed files with 354 additions and 764 deletions.
18 changes: 17 additions & 1 deletion src/connection.rs
@@ -46,6 +46,9 @@ pub(crate) enum Register {
consumer_id: u64,
resolver: mpsc::UnboundedSender<Message>,
},
RemoveConsumer {
consumer_id: u64,
},
Ping {
resolver: oneshot::Sender<()>,
},
@@ -157,6 +160,9 @@ impl<S: Stream<Item = Result<Message, ConnectionError>>> Future for Receiver<S>
})) => {
self.consumers.insert(consumer_id, resolver);
}
Poll::Ready(Some(Register::RemoveConsumer { consumer_id })) => {
self.consumers.remove(&consumer_id);
}
Poll::Ready(Some(Register::Ping { resolver })) => {
self.ping = Some(resolver);
}
@@ -549,6 +555,16 @@ impl<Exe: Executor> ConnectionSender<Exe> {
) -> Result<proto::CommandSuccess, ConnectionError> {
let request_id = self.request_id.get();
let msg = messages::close_consumer(consumer_id, request_id);
match self
.registrations
.unbounded_send(Register::RemoveConsumer { consumer_id })
{
Ok(_) => {}
Err(_) => {
self.error.set(ConnectionError::Disconnected);
return Err(ConnectionError::Disconnected);
}
}
self.send_message(msg, RequestKey::RequestId(request_id), |resp| {
resp.command.success
})
@@ -1114,7 +1130,7 @@ impl<Exe: Executor> Connection<Exe> {
impl<Exe: Executor> Drop for Connection<Exe> {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn drop(&mut self) {
trace!("dropping connection {} for {}", self.id, self.url);
debug!("dropping connection {} for {}", self.id, self.url);
if let Some(shutdown) = self.sender.receiver_shutdown.take() {
let _ = shutdown.send(());
}
2 changes: 1 addition & 1 deletion src/connection_manager.rs
@@ -448,7 +448,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
}
}
Some(ConnectionStatus::Connected(_)) => {
//info!("removing old connection");
info!("removing old connection");
}
None => {
//info!("setting up new connection");
276 changes: 44 additions & 232 deletions src/consumer/engine.rs
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use futures::{
channel::{mpsc, mpsc::UnboundedSender, oneshot},
channel::{mpsc, mpsc::UnboundedSender},
SinkExt, StreamExt,
};

@@ -25,7 +25,8 @@ use crate::{
},
proto,
proto::{BaseCommand, CommandCloseConsumer, CommandMessage},
BrokerAddress, Error, Executor, Payload, Pulsar,
retry_op::retry_subscribe_consumer,
Error, Executor, Payload, Pulsar,
};

pub struct ConsumerEngine<Exe: Executor> {
@@ -47,7 +48,6 @@ pub struct ConsumerEngine<Exe: Executor> {
unacked_messages: HashMap<MessageIdData, Instant>,
dead_letter_policy: Option<DeadLetterPolicy>,
options: ConsumerOptions,
drop_signal: Option<oneshot::Sender<()>>,
}

impl<Exe: Executor> ConsumerEngine<Exe> {
@@ -67,7 +67,6 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
unacked_message_redelivery_delay: Option<Duration>,
dead_letter_policy: Option<DeadLetterPolicy>,
options: ConsumerOptions,
drop_signal: oneshot::Sender<()>,
) -> ConsumerEngine<Exe> {
let (event_tx, event_rx) = mpsc::unbounded();
ConsumerEngine {
@@ -89,7 +88,6 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
unacked_messages: HashMap::new(),
dead_letter_policy,
options,
drop_signal: Some(drop_signal),
}
}

@@ -191,13 +189,8 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
) -> Option<Result<(), Error>> {
match message_opt {
None => {
error!("Consumer: messages::next: returning Disconnected");
if let Err(err) = self.reconnect().await {
Some(Err(err))
} else {
None
}
//return Err(Error::Consumer(ConsumerError::Connection(ConnectionError::Disconnected)).into());
debug!("Consumer: old message source terminated");
None
}
Some(message) => {
self.remaining_messages -= message
@@ -543,237 +536,39 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
Ok(())
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn retry_with_delay(
&mut self,
current_retries: &mut u32,
error: &str,
topic: &String,
text: &Option<String>,
broker_address: &BrokerAddress,
) -> Result<Arc<Connection<Exe>>, Error> {
warn!(
"subscribing({}) answered {}, retrying request after {}ms (max_retries = {:?}): {}",
topic,
error,
self.client.operation_retry_options.retry_delay.as_millis(),
self.client.operation_retry_options.max_retries,
text.as_deref().unwrap_or_default()
);

*current_retries += 1;
self.client
.executor
.delay(self.client.operation_retry_options.retry_delay)
.await;

let addr = self.client.lookup_topic(topic).await?;
let connection = self.client.manager.get_connection(&addr).await?;
self.connection = connection.clone();

warn!(
"Retry #{} -> reconnecting consumer {:#} using connection {:#} to broker {:#} to topic {:#}",
current_retries,
self.id,
self.connection.id(),
broker_address.url,
self.topic
);

Ok(connection)
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn subscribe_topic(
&mut self,
resolver: UnboundedSender<crate::message::Message>,
topic: &String,
current_retries: &mut u32,
broker_address: &BrokerAddress,
) -> Result<(), Result<(), Error>> {
let start = Instant::now();
let operation_retry_options = self.client.operation_retry_options.clone();

match self
.connection
.sender()
.subscribe(
resolver.clone(),
topic.clone(),
self.subscription.clone(),
self.sub_type,
self.id,
self.name.clone(),
self.options.clone(),
)
.await
{
Ok(_success) => {
if *current_retries > 0 {
let dur = (Instant::now() - start).as_secs();
log::info!(
"subscribing({}) success after {} retries over {} seconds",
topic,
*current_retries + 1,
dur
);
}
}
Err(ConnectionError::PulsarError(Some(err), text)) => {
return match err {
proto::ServerError::ServiceNotReady | proto::ServerError::ConsumerBusy => {
match operation_retry_options.max_retries {
Some(max_retries) if *current_retries < max_retries => {
self.retry_with_delay(
current_retries,
err.as_str_name(),
topic,
&text,
broker_address,
)
.await
.map_err(Err)?;

Err(Ok(()))
}
_ => {
error!("subscribe topic({}) reached max retries", topic);

Err(Err(ConnectionError::PulsarError(Some(err), text).into()))
}
}
}
_ => Err(Err(Error::Connection(ConnectionError::PulsarError(
Some(err),
text,
)))),
}
}
Err(ConnectionError::Io(e))
// Retryable IO Error
if matches!(
e.kind(),
ErrorKind::BrokenPipe
| ErrorKind::ConnectionAborted
| ErrorKind::ConnectionReset
| ErrorKind::Interrupted
| ErrorKind::NotConnected
| ErrorKind::TimedOut
| ErrorKind::UnexpectedEof
) =>
{
return match operation_retry_options.max_retries {
Some(max_retries) if *current_retries < max_retries => {
self.retry_with_delay(
current_retries,
e.kind().to_string().as_str(),
topic,
&None,
broker_address,
)
.await
.map_err(Err)?;

Err(Ok(()))
}
_ => {
error!("subscribing({}) reached max retries", topic);
Err(Err(Error::Connection(ConnectionError::Io(e))))
}
}
}
Err(e) => {
error!("reconnect error [{:?}]: {:?}", line!(), e);
return Err(Err(Error::Connection(e)));
}
}

Ok(())
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn reconnect(&mut self) -> Result<(), Error> {
debug!("reconnecting consumer for topic: {}", self.topic);
if let Some(prev_single) = std::mem::replace(&mut self.drop_signal, None) {
// kill the previous errored consumer
drop(prev_single);

// send CloseConsumer to server
// remove our resolver from the Connection consumers BTreeMap,
// stopping the current Message source
if let Err(e) = self.connection.sender().close_consumer(self.id).await {
error!(
"could not close consumer {:?}({}) for topic {}: {:?}",
self.name, self.id, self.topic, e
);
}
// should have logged "rx terminated" by now

let broker_address = self.client.lookup_topic(&self.topic).await?;
let conn = self.client.manager.get_connection(&broker_address).await?;

self.connection = conn;

warn!(
"Retry -> reconnecting consumer {:#} using connection {:#} to broker {:#} to topic {:#}",
let messages = retry_subscribe_consumer(
&self.client,
&mut self.connection,
broker_address,
&self.topic,
&self.subscription,
self.sub_type,
self.id,
self.connection.id(),
broker_address.url,
self.topic
);

let topic = self.topic.clone();

let mut current_retries = 0u32;
let (resolver, messages) = mpsc::unbounded();

// Reconnection loop
// If the error is recoverable
// Try to reconnect in the bounds of retry limits
loop {
match self
.subscribe_topic(
resolver.clone(),
&topic,
&mut current_retries,
&broker_address,
)
.await
{
// Reconnection went well
Ok(()) => break,
// An retryable error occurs
Err(Ok(())) => continue,
// An non-retryable error happens, the connection must die !
Err(Err(e)) => return Err(e),
}
}
&self.name,
&self.options,
self.batch_size,
)
.await?;

self.messages_rx = Some(messages);

// drop_signal will be dropped when Consumer is dropped, then
// drop_receiver will return, and we can close the consumer
let (drop_signal, drop_receiver) = oneshot::channel::<()>();
let conn = Arc::downgrade(&self.connection);
let name = self.name.clone();
let id = self.id;
let topic = self.topic.clone();
let _ = self.client.executor.spawn(Box::pin(async move {
let _res = drop_receiver.await;
// if we receive a message, it indicates we want to stop this task

match conn.upgrade() {
None => {
debug!("Connection already dropped, no weak reference remaining")
}
Some(connection) => {
debug!("Closing producers of connection {}", connection.id());
let res = connection.sender().close_consumer(id).await;

if let Err(e) = res {
error!(
"could not close consumer {:?}({}) for topic {}: {:?}",
name, id, topic, e
);
}
}
}
}));

if let Some(prev_single) = std::mem::replace(&mut self.drop_signal, Some(drop_signal)) {
drop(prev_single);
}

Ok(())
}

@@ -809,3 +604,20 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
tokio::time::timeout_at(tokio::time::Instant::now() + dur, fut).await
}
}

impl<Exe: Executor> std::ops::Drop for ConsumerEngine<Exe> {
fn drop(&mut self) {
let conn = self.connection.clone();
let id = self.id;
let name = self.name.clone();
let topic = self.topic.clone();
let _ = self.client.executor.spawn(Box::pin(async move {
if let Err(e) = conn.sender().close_consumer(id).await {
error!(
"could not close consumer {:?}({}) for topic {}: {:?}",
name, id, topic, e
);
}
}));
}
}
