From b030ef9ceae9cef9ad60d8d41a6c29f7be7e7437 Mon Sep 17 00:00:00 2001 From: cirias Date: Fri, 11 Aug 2023 19:51:33 +0800 Subject: [PATCH] chore: Implement Sync for Consumer (#291) Co-authored-by: Sirius --- src/connection.rs | 9 +++++---- src/consumer/multi.rs | 7 ++++--- src/error.rs | 2 +- src/executor.rs | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 2faae5c..2e82439 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -871,10 +871,11 @@ impl Connection { match auth { Some(m_auth) => { let mut auth_guard = m_auth.lock().await; - Ok(Some(Authentication { - name: auth_guard.auth_method_name(), - data: auth_guard.auth_data().await?, - })) + let name = auth_guard.auth_method_name(); + // wrap the future of auth_data() with Shared so that it implements Sync + let data_fut = auth_guard.auth_data().shared(); + let data = data_fut.await?; + Ok(Some(Authentication { name, data })) } None => Ok(None), } diff --git a/src/consumer/multi.rs b/src/consumer/multi.rs index fad4c75..cd8e0bd 100644 --- a/src/consumer/multi.rs +++ b/src/consumer/multi.rs @@ -30,9 +30,10 @@ pub struct MultiTopicConsumer { pub(super) topics: VecDeque, pub(super) existing_topics: VecDeque, #[allow(clippy::type_complexity)] - pub(super) new_consumers: - Option>, Error>> + Send>>>, - pub(super) refresh: Pin + Send>>, + pub(super) new_consumers: Option< + Pin>, Error>> + Send + Sync>>, + >, + pub(super) refresh: Pin + Send + Sync>>, pub(super) config: ConsumerConfig, // Stats on disconnected consumers to keep metrics correct pub(super) disc_messages_received: u64, diff --git a/src/error.rs b/src/error.rs index 31ed7fb..182dbf5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -406,7 +406,7 @@ impl std::error::Error for ServiceDiscoveryError { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum AuthenticationError { Custom(String), } diff --git a/src/executor.rs b/src/executor.rs index b980986..c51797d 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -252,7 +252,7 @@ pub enum Delay { Tokio(tokio::time::Sleep), /// wrapper around async-std's `Delay` #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] - AsyncStd(Pin + Send>>), + AsyncStd(Pin + Send + Sync>>), } impl Future for Delay {