Skip to content

Commit

Permalink
chore: Implement Sync for Consumer (#291)
Browse files Browse the repository at this point in the history
Co-authored-by: Sirius <sirius@x250>
  • Loading branch information
cirias and Sirius authored Aug 11, 2023
1 parent 7054b8a commit b030ef9
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
9 changes: 5 additions & 4 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,10 +871,11 @@ impl<Exe: Executor> Connection<Exe> {
match auth {
Some(m_auth) => {
let mut auth_guard = m_auth.lock().await;
Ok(Some(Authentication {
name: auth_guard.auth_method_name(),
data: auth_guard.auth_data().await?,
}))
let name = auth_guard.auth_method_name();
// wrap the future of auth_data() with Shared so that it implements Sync
let data_fut = auth_guard.auth_data().shared();
let data = data_fut.await?;
Ok(Some(Authentication { name, data }))
}
None => Ok(None),
}
Expand Down
7 changes: 4 additions & 3 deletions src/consumer/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ pub struct MultiTopicConsumer<T: DeserializeMessage, Exe: Executor> {
pub(super) topics: VecDeque<String>,
pub(super) existing_topics: VecDeque<String>,
#[allow(clippy::type_complexity)]
pub(super) new_consumers:
Option<Pin<Box<dyn Future<Output = Result<Vec<TopicConsumer<T, Exe>>, Error>> + Send>>>,
pub(super) refresh: Pin<Box<dyn Stream<Item = ()> + Send>>,
pub(super) new_consumers: Option<
Pin<Box<dyn Future<Output = Result<Vec<TopicConsumer<T, Exe>>, Error>> + Send + Sync>>,
>,
pub(super) refresh: Pin<Box<dyn Stream<Item = ()> + Send + Sync>>,
pub(super) config: ConsumerConfig,
// Stats on disconnected consumers to keep metrics correct
pub(super) disc_messages_received: u64,
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ impl std::error::Error for ServiceDiscoveryError {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum AuthenticationError {
Custom(String),
}
Expand Down
2 changes: 1 addition & 1 deletion src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ pub enum Delay {
Tokio(tokio::time::Sleep),
/// wrapper around async-std's `Delay`
#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))]
AsyncStd(Pin<Box<dyn Future<Output = ()> + Send>>),
AsyncStd(Pin<Box<dyn Future<Output = ()> + Send + Sync>>),
}

impl Future for Delay {
Expand Down

0 comments on commit b030ef9

Please sign in to comment.