diff --git a/Cargo.lock b/Cargo.lock index c010e90..a042d71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -813,6 +813,7 @@ dependencies = [ "async-std", "cdn-proto", "clap", + "derive_more", "jf-signature", "parking_lot", "rand", @@ -845,6 +846,7 @@ dependencies = [ "capnpc", "criterion", "derivative", + "derive_more", "jf-signature", "kanal", "lazy_static", @@ -1235,6 +1237,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.76", +] + [[package]] name = "digest" version = "0.10.7" diff --git a/Cargo.toml b/Cargo.toml index 862eda6..21f79f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ async-std = { version = "1", default-features = false, features = [ rkyv = { version = "0.7", features = ["validation"] } derivative = "2" rcgen = { version = "0.13", features = ["x509-parser", "crypto"] } +derive_more = { version = "1", features = ["deref"] } # Dev dependencies (can't be defined explicitly in the workspace) # TODO: figure out if this actually builds on non-test targets diff --git a/cdn-client/Cargo.toml b/cdn-client/Cargo.toml index 8e63291..c0aeb76 100644 --- a/cdn-client/Cargo.toml +++ b/cdn-client/Cargo.toml @@ -33,3 +33,4 @@ rand.workspace = true tracing.workspace = true clap.workspace = true parking_lot.workspace = true +derive_more.workspace = true \ No newline at end of file diff --git a/cdn-client/src/binaries/bad-connector.rs b/cdn-client/src/binaries/bad-connector.rs index c2f4259..c5622ae 100644 --- a/cdn-client/src/binaries/bad-connector.rs +++ b/cdn-client/src/binaries/bad-connector.rs @@ -67,7 +67,7 @@ async fn main() { // and the `QUIC` protocol. let client = Client::::new(config); - client.ensure_initialized().await; + client.ensure_initialized().await.unwrap(); sleep(Duration::from_millis(200)).await; } } diff --git a/cdn-client/src/lib.rs b/cdn-client/src/lib.rs index 7703bfc..bbaa4f2 100644 --- a/cdn-client/src/lib.rs +++ b/cdn-client/src/lib.rs @@ -5,51 +5,331 @@ // along with the Push-CDN repository. If not, see . //! In here we define an API that is a little more higher-level and ergonomic -//! for end users. It is a light wrapper on top of a `Retry` connection. +//! for end users. #![forbid(unsafe_code)] - -pub mod reexports; -mod retry; +use std::{collections::HashSet, sync::Arc, time::Duration}; use cdn_proto::{ bail, - crypto::signature::Serializable, - def::{ConnectionDef, PublicKey}, + connection::{ + auth::user::UserAuth, + limiter::Limiter, + protocols::{Connection, Protocol as _}, + }, + crypto::signature::{KeyPair, Serializable}, + def::{ConnectionDef, Protocol, PublicKey, Scheme}, error::{Error, Result}, message::{Broadcast, Direct, Message, Topic}, + util::AbortOnDropHandle, +}; +use derive_more::derive::Deref; +use parking_lot::Mutex; +use tokio::{ + spawn, + sync::{RwLock, Semaphore, TryAcquireError}, + time::sleep, }; -use retry::Retry; +use tracing::{error, info}; + +pub mod reexports; + +/// `Client` is a wrapper around a fallible connection. +/// +/// It employs synchronization as well as Client logic. +/// Can be cloned to provide a handle to the same underlying elastic connection. +#[derive(Clone, Deref)] +pub struct Client(Arc>); + +/// `ClientRef` is held exclusively by `Client`, wherein an `Arc` is used +/// to facilitate interior mutability. +pub struct ClientRef { + /// This is the remote endpoint of the marshal that we authenticate with. + endpoint: String, + + /// Whether or not to use the trust the local, pinned CA. It is insecure to use this in + /// a production environment. + use_local_authority: bool, + + /// The underlying connection + connection: Arc>>, + + /// The semaphore to ensure only one reconnection is happening at a time + connecting_guard: Arc, + + /// The (optional) task that is responsible for reconnecting + reconnection_task: Arc>>>, + + /// The keypair to use when authenticating + pub keypair: KeyPair>, + + /// The topics we're currently subscribed to. We need this so we can send our subscriptions + /// when we connect to a new server. + pub subscribed_topics: RwLock>, +} + +impl ClientRef { + /// Attempt a reconnection to the remote marshal endpoint. + /// Returns the connection verbatim without updating any internal + /// structs. + /// + /// # Errors + /// - If the connection failed + /// - If authentication failed + async fn connect(self: &Arc) -> Result { + // If the connecting guard is closed, the client has been manually closed + if self.connecting_guard.is_closed() { + return Err(Error::Connection( + "client has been manually closed".to_string(), + )); + } + + // Create the limiter we will use for all connections + let limiter = Limiter::new(None, Some(1)); + + // Make the connection to the marshal + let connection = bail!( + Protocol::::connect(&self.endpoint, self.use_local_authority, limiter.clone()).await, + Connection, + "failed to connect to endpoint" + ); + + // Authenticate the connection to the marshal (if not provided) + let (broker_endpoint, permit) = bail!( + UserAuth::::authenticate_with_marshal(&connection, &self.keypair).await, + Authentication, + "failed to authenticate to marshal" + ); + + // Make the connection to the broker + let connection = bail!( + Protocol::::connect(&broker_endpoint, self.use_local_authority, limiter).await, + Connection, + "failed to connect to broker" + ); + + // Authenticate the connection to the broker + bail!( + UserAuth::::authenticate_with_broker( + &connection, + permit, + self.subscribed_topics.read().await.clone() + ) + .await, + Authentication, + "failed to authenticate to broker" + ); + + info!(id = broker_endpoint, "connected to broker"); + + Ok(connection) + } +} + +/// The configuration needed to construct a `Client` connection. +pub struct Config { + /// This is the remote endpoint of the marshal that we authenticate with. + pub endpoint: String, -/// `Client` is a light wrapper around a `Retry` connection that provides functions -/// for common operations to and from a server. Mostly just used to make the API -/// more ergonomic. Also keeps track of subscriptions. -#[derive(Clone)] -pub struct Client(Retry); + /// Whether or not to use the trust the local, pinned CA. It is insecure to use this in + /// a production environment. + pub use_local_authority: bool, -pub type Config = retry::Config; + /// The underlying (public) verification key, used to authenticate with the server. Checked + /// against the stake table. + pub keypair: KeyPair>, + + /// The topics we're currently subscribed to. We need this here so we can send our subscriptions + /// when we connect to a new server. + pub subscribed_topics: Vec, +} + +// Disconnects the current connection if an error was passed in and we're +// not already reconnecting. +macro_rules! disconnect_on_error { + ($self:expr, $res: expr) => { + match $res { + Ok(t) => Ok(t), + Err(e) => { + // If we are not currently reconnecting, take the current connection + if $self.connecting_guard.available_permits() > 0 { + // If we ran into an error, take the current connection. + // This will only start reconnecting if we try to receive or send another message. + $self.connection.write().await.take(); + } + + Err(e) + } + } + }; +} impl Client { - /// Creates a new `Retry` from a configuration. + /// Creates a new `Client` connection from a `Config` + /// Attempts to make an initial connection. + /// This allows us to create elastic clients that always try to maintain a connection. + /// + /// # Errors + /// - If we are unable to either parse or bind an endpoint to the local endpoint. + /// - If we are unable to make the initial connection pub fn new(config: Config) -> Self { - Self(Retry::from_config(config)) + // Extrapolate values from the underlying client configuration + let Config { + endpoint, + use_local_authority, + keypair, + subscribed_topics, + } = config; + + // Wrap subscribed topics so we can use it now and later + let subscribed_topics = RwLock::new(HashSet::from_iter(subscribed_topics)); + + // Return the slightly transformed connection. + Self(Arc::from(ClientRef { + endpoint, + use_local_authority, + connection: Arc::default(), + connecting_guard: Arc::from(Semaphore::const_new(1)), + reconnection_task: Arc::default(), + keypair, + subscribed_topics, + })) } - /// Returns only once the connection is fully initialized - pub async fn ensure_initialized(&self) { - self.0.ensure_initialized().await; + /// Get the underlying connection if it exists, otherwise try to reconnect. + /// + /// # Errors + /// - If we are in the middle of reconnecting + /// - If the connection has been manually closed + fn reconnect_if_needed(&self, possible_connection: Option) -> Result { + let Some(connection) = possible_connection else { + // If the connection is not initialized for one reason or another, try to reconnect + // Acquire the semaphore to ensure only one reconnection is happening at a time + match Arc::clone(&self.connecting_guard).try_acquire_owned() { + Ok(permit) => { + // We were the first to try reconnecting, spawn a reconnection task + let self_clone = self.0.clone(); + let reconnection_task = AbortOnDropHandle(spawn(async move { + let mut connection = self_clone.connection.write().await; + + // Forever, + loop { + // Try to reconnect + match self_clone.connect().await { + Ok(new_connection) => { + // We successfully reconnected + *connection = Some(new_connection); + break; + } + Err(err) => { + // We failed to reconnect + // Sleep for 2 seconds and then try again + error!("failed to connect: {err}"); + sleep(Duration::from_secs(2)).await; + } + } + } + drop(permit); + })); + + // Update the reconnection task + *self.reconnection_task.lock() = Some(reconnection_task); + } + Err(TryAcquireError::Closed) => { + // The client has been manually closed + return Err(Error::Connection( + "client has been manually closed".to_string(), + )); + } + Err(TryAcquireError::NoPermits) => {} + } + + // The reconnection task has either been started or there were no permits + // available, so we're already reconnecting + return Err(Error::Connection("connection in progress".to_string())); + }; + + Ok(connection) } - /// Receives the next message from the downstream server. - /// If it fails, we return an error but try to initiate a new - /// connection in the background. + /// Get the connection if it exists, wait for a potential reconnection if it does not. /// /// # Errors - /// If the connection or deserialization has failed + /// - If somebody else is already reconnecting + /// - If the client has been manually closed + async fn get_connection(&self) -> Result { + let possible_connection = self.connection.read().await; + + // TODO: figure out a potential way to remove this clone + self.reconnect_if_needed(possible_connection.clone()) + } + + /// Try to get the connection if it exists, otherwise try to reconnect. Does not block. + /// + /// # Errors + /// - If we are in the middle of reconnecting + /// - If the client has been manually closed + fn try_get_connection(&self) -> Result { + let Ok(possible_connection) = self.connection.try_read() else { + // Someone else is already reconnecting + return Err(Error::Connection( + "connection in progress or manually closed".to_string(), + )); + }; + + self.reconnect_if_needed(possible_connection.clone()) + } + + /// Sends a message to the underlying connection. Reconnection is handled under + /// the hood. Messages will fail if the connection is currently closed or reconnecting. + /// + /// # Errors + /// If the message sending fails. For example: + /// - If we are reconnecting + /// - If we are disconnected + pub async fn send_message(&self, message: Message) -> Result<()> { + // Try to get the underlying connection + let connection = self.try_get_connection()?; + + // Soft close the connection + disconnect_on_error!(self, connection.send_message(message).await) + } + + /// Receives a message from the underlying fallible connection. Reconnection logic is here, + /// but Client logic needs to be handled by the caller (e.g. re-receive messages) + /// + /// # Errors + /// - If we are in the middle of reconnecting + /// - If the message receiving failed pub async fn receive_message(&self) -> Result { - // TODO: conditionally match error on whether deserialization OR the connection failed - // this way we don't reconnect if somebody sends us a bad message - self.0.receive_message().await + // Try to synchronously get the underlying connection + let connection = self.get_connection().await?; + + // Receive a message + disconnect_on_error!(self, connection.recv_message().await) + } + + /// Returns only when the connection is fully initialized + /// + /// # Errors + /// - If the connection has been closed + pub async fn ensure_initialized(&self) -> Result<()> { + // Return early if the connecting guard is closed + if self.connecting_guard.is_closed() { + return Err(Error::Connection( + "client has been manually closed".to_string(), + )); + } + + // If we are already connected, return + if self.try_get_connection().is_ok() { + return Ok(()); + } + + // Otherwise, wait to acquire the connecting guard + let _ = self.connecting_guard.acquire().await; + + Ok(()) } /// Sends a pre-serialized message to the server, denoting recipients in the form @@ -97,7 +377,7 @@ impl Client { /// If the connection or serialization has failed pub async fn subscribe(&self, topics: Vec) -> Result<()> { // Lock subscriptions here so we maintain parity during a reconnection - let mut subscribed_guard = self.0.inner.subscribed_topics.write().await; + let mut subscribed_guard = self.subscribed_topics.write().await; // Calculate the real topics to send based on whatever's already in the set let topics_to_send: Vec = topics @@ -131,7 +411,7 @@ impl Client { /// If the connection or serialization has failed pub async fn unsubscribe(&self, topics: Vec) -> Result<()> { // Lock subscriptions here so we maintain parity during a reconnection - let mut subscribed_guard = self.0.inner.subscribed_topics.write().await; + let mut subscribed_guard = self.subscribed_topics.write().await; // Calculate the real topics to send based on whatever's already in the set let topics_to_send: Vec = topics @@ -158,22 +438,40 @@ impl Client { Ok(()) } - /// Sends a message over the wire. Various functions make use - /// of this one upstream. + /// Soft close the connection, ensuring that all messages are sent. /// /// # Errors - /// - if the downstream message sending fails. - pub async fn send_message(&self, message: Message) -> Result<()> { - self.0.send_message(message).await + /// - If we are in the middle of reconnecting + /// - If the connection is closed + pub async fn soft_close(&self) -> Result<()> { + // Try to get the underlying connection + let connection = self.try_get_connection()?; + + // Soft close the connection + disconnect_on_error!(self, connection.soft_close().await) } - /// Soft close the connection, ensuring that all messages are sent. - /// This is useful for ensuring that messages are sent before a - /// connection is closed. + /// Shut down the client, closing the connection and aborting all tasks. + /// Assures that any future calls will fail and that reconnection does not + /// take place. Does not make any guarantees about pending messages. /// - /// # Errors - /// - if the connection is already closed - pub async fn soft_close(&self) -> Result<()> { - self.0.soft_close().await + /// Will block a maximum of 2 seconds for the connection to close. + pub async fn close(&self) { + // First close the connecting guard so no more reconnection tasks can be spawned + // and the next call to `connect()` will fail + self.connecting_guard.close(); + + // Get the current reconnection task and abort it + if let Some(reconnection_task) = self.reconnection_task.lock().take() { + reconnection_task.abort(); + } + + // Take the connection + self.connection.write().await.take(); + } + + /// Returns whether or not the client has been manually closed + pub fn is_closed(&self) -> bool { + self.connecting_guard.is_closed() } } diff --git a/cdn-client/src/retry.rs b/cdn-client/src/retry.rs deleted file mode 100644 index 5f1779e..0000000 --- a/cdn-client/src/retry.rs +++ /dev/null @@ -1,318 +0,0 @@ -// Copyright (c) 2024 Espresso Systems (espressosys.com) -// This file is part of the Push-CDN repository. - -// You should have received a copy of the MIT License -// along with the Push-CDN repository. If not, see . - -//! This file provides a `Retry` connection, which allows for reconnections -//! on top of a normal connection. -//! -//! TODO FOR ALL CRATES: figure out where we need to bail, -//! and where we can just return. Most of the errors already have -//! enough context from previous bails. - -use std::{collections::HashSet, sync::Arc, time::Duration}; - -use cdn_proto::{ - connection::{ - auth::user::UserAuth, - limiter::Limiter, - protocols::{Connection, Protocol as _}, - }, - crypto::signature::KeyPair, - def::{ConnectionDef, Protocol, Scheme}, - error::{Error, Result}, - message::{Message, Topic}, - util::AbortOnDropHandle, -}; -use parking_lot::Mutex; -use tokio::{ - spawn, - sync::{RwLock, Semaphore}, - time::sleep, -}; -use tracing::{error, info}; - -use crate::bail; - -/// `Retry` is a wrapper around a fallible connection. -/// -/// It employs synchronization as well as retry logic. -/// Can be cloned to provide a handle to the same underlying elastic connection. -#[derive(Clone)] -pub struct Retry { - pub inner: Arc>, -} - -/// `Inner` is held exclusively by `Retry`, wherein an `Arc` is used -/// to facilitate interior mutability. -pub struct Inner { - /// This is the remote endpoint of the marshal that we authenticate with. - endpoint: String, - - /// Whether or not to use the trust the local, pinned CA. It is insecure to use this in - /// a production environment. - use_local_authority: bool, - - /// The underlying connection - connection: Arc>>, - - /// The semaphore to ensure only one reconnection is happening at a time - connecting_guard: Arc, - - /// The (optional) task that is responsible for reconnecting - reconnection_task: Arc>>>, - - /// The keypair to use when authenticating - pub keypair: KeyPair>, - - /// The topics we're currently subscribed to. We need this so we can send our subscriptions - /// when we connect to a new server. - pub subscribed_topics: RwLock>, -} - -impl Inner { - /// Attempt a reconnection to the remote marshal endpoint. - /// Returns the connection verbatim without updating any internal - /// structs. - /// - /// # Errors - /// - If the connection failed - /// - If authentication failed - async fn connect(self: &Arc) -> Result { - // Create the limiter we will use for all connections - let limiter = Limiter::new(None, Some(1)); - - // Make the connection to the marshal - let connection = bail!( - Protocol::::connect(&self.endpoint, self.use_local_authority, limiter.clone()).await, - Connection, - "failed to connect to endpoint" - ); - - // Authenticate the connection to the marshal (if not provided) - let (broker_endpoint, permit) = bail!( - UserAuth::::authenticate_with_marshal(&connection, &self.keypair).await, - Authentication, - "failed to authenticate to marshal" - ); - - // Make the connection to the broker - let connection = bail!( - Protocol::::connect(&broker_endpoint, self.use_local_authority, limiter).await, - Connection, - "failed to connect to broker" - ); - - // Authenticate the connection to the broker - bail!( - UserAuth::::authenticate_with_broker( - &connection, - permit, - self.subscribed_topics.read().await.clone() - ) - .await, - Authentication, - "failed to authenticate to broker" - ); - - info!(id = broker_endpoint, "connected to broker"); - - Ok(connection) - } -} - -/// The configuration needed to construct a `Retry` connection. -pub struct Config { - /// This is the remote endpoint of the marshal that we authenticate with. - pub endpoint: String, - - /// Whether or not to use the trust the local, pinned CA. It is insecure to use this in - /// a production environment. - pub use_local_authority: bool, - - /// The underlying (public) verification key, used to authenticate with the server. Checked - /// against the stake table. - pub keypair: KeyPair>, - - /// The topics we're currently subscribed to. We need this here so we can send our subscriptions - /// when we connect to a new server. - pub subscribed_topics: Vec, -} - -// Disconnects the current connection if an error was passed in and we're -// not already reconnecting. -macro_rules! disconnect_on_error { - ($self:expr, $res: expr) => { - match $res { - Ok(t) => Ok(t), - Err(e) => { - // If we are not currently reconnecting, take the current connection - if $self.inner.connecting_guard.available_permits() > 0 { - // If we ran into an error, take the current connection. - // This will only start reconnecting if we try to receive or send another message. - $self.inner.connection.write().await.take(); - } - - Err(e) - } - } - }; -} - -impl Retry { - /// Get the underlying connection if it exists, otherwise try to reconnect. - /// - /// # Errors - /// - If we are in the middle of reconnecting - fn reconnect_if_needed(&self, possible_connection: Option) -> Result { - let Some(connection) = possible_connection else { - // If the connection is not initialized for one reason or another, try to reconnect - // Acquire the semaphore to ensure only one reconnection is happening at a time - if let Ok(permit) = Arc::clone(&self.inner.connecting_guard).try_acquire_owned() { - // We were the first to try reconnecting, spawn a reconnection task - let inner = self.inner.clone(); - let reconnection_task = AbortOnDropHandle(spawn(async move { - let mut connection = inner.connection.write().await; - - // Forever, - loop { - // Try to reconnect - match inner.connect().await { - Ok(new_connection) => { - // We successfully reconnected - *connection = Some(new_connection); - break; - } - Err(err) => { - // We failed to reconnect - // Sleep for 2 seconds and then try again - error!("failed to connect: {err}"); - sleep(Duration::from_secs(2)).await; - } - } - } - drop(permit); - })); - - // Update the reconnection task - *self.inner.reconnection_task.lock() = Some(reconnection_task); - } - - // If we are in the middle of reconnecting, return an error - return Err(Error::Connection("connection in progress".to_string())); - }; - - Ok(connection) - } - - /// Get the connection if it exists, wait for a potential reconnection if it does not. - /// - /// # Errors - /// - If somebody else is already reconnecting - async fn get_connection(&self) -> Result { - let possible_connection = self.inner.connection.read().await; - - // TODO: figure out a potential way to remove this clone - self.reconnect_if_needed(possible_connection.clone()) - } - - /// Try to get the connection if it exists, otherwise try to reconnect. Does not block. - /// - /// # Errors - /// - If we are in the middle of reconnecting - fn try_get_connection(&self) -> Result { - let Ok(possible_connection) = self.inner.connection.try_read() else { - // Someone else is already reconnecting - return Err(Error::Connection("connection in progress".to_string())); - }; - - self.reconnect_if_needed(possible_connection.clone()) - } - - /// Creates a new `Retry` connection from a `Config` - /// Attempts to make an initial connection. - /// This allows us to create elastic clients that always try to maintain a connection. - /// - /// # Errors - /// - If we are unable to either parse or bind an endpoint to the local endpoint. - /// - If we are unable to make the initial connection - pub fn from_config(config: Config) -> Self { - // Extrapolate values from the underlying client configuration - let Config { - endpoint, - use_local_authority, - keypair, - subscribed_topics, - } = config; - - // Wrap subscribed topics so we can use it now and later - let subscribed_topics = RwLock::new(HashSet::from_iter(subscribed_topics)); - - // Return the slightly transformed connection. - Self { - inner: Arc::from(Inner { - endpoint, - use_local_authority, - connection: Arc::default(), - connecting_guard: Arc::from(Semaphore::const_new(1)), - reconnection_task: Arc::default(), - keypair, - subscribed_topics, - }), - } - } - - /// Returns only when the connection is fully initialized - pub async fn ensure_initialized(&self) { - // If we are already connected, return - if self.try_get_connection().is_ok() { - return; - } - - // Otherwise, wait to acquire the connecting guard - let _ = self.inner.connecting_guard.acquire().await; - } - - /// Sends a message to the underlying connection. Reconnection is handled under - /// the hood. Messages will fail if the connection is currently closed or reconnecting. - /// - /// # Errors - /// If the message sending fails. For example: - /// - If we are reconnecting - /// - If we are disconnected - pub async fn send_message(&self, message: Message) -> Result<()> { - // Try to get the underlying connection - let connection = self.try_get_connection()?; - - // Soft close the connection - disconnect_on_error!(self, connection.send_message(message).await) - } - - /// Receives a message from the underlying fallible connection. Reconnection logic is here, - /// but retry logic needs to be handled by the caller (e.g. re-receive messages) - /// - /// # Errors - /// - If we are in the middle of reconnecting - /// - If the message receiving failed - pub async fn receive_message(&self) -> Result { - // Try to synchronously get the underlying connection - let connection = self.get_connection().await?; - - // Receive a message - disconnect_on_error!(self, connection.recv_message().await) - } - - /// Soft close the connection, ensuring that all messages are sent. - /// - /// # Errors - /// - If we are in the middle of reconnecting - /// - If the connection is closed - pub async fn soft_close(&self) -> Result<()> { - // Try to get the underlying connection - let connection = self.try_get_connection()?; - - // Soft close the connection - disconnect_on_error!(self, connection.soft_close().await) - } -} diff --git a/cdn-proto/Cargo.toml b/cdn-proto/Cargo.toml index d010c3d..9314cf0 100644 --- a/cdn-proto/Cargo.toml +++ b/cdn-proto/Cargo.toml @@ -67,4 +67,5 @@ rkyv.workspace = true mnemonic = "1" rcgen.workspace = true derivative.workspace = true +derive_more.workspace = true num_enum = "0.7" diff --git a/cdn-proto/src/discovery/redis.rs b/cdn-proto/src/discovery/redis.rs index c5e7344..a644905 100644 --- a/cdn-proto/src/discovery/redis.rs +++ b/cdn-proto/src/discovery/redis.rs @@ -142,7 +142,7 @@ impl DiscoveryClient for Redis { // Get the number of connections the broker has let num_connections: u64 = bail!( redis::cmd("GET") - .arg(&format!("{broker}/num_connections")) + .arg(format!("{broker}/num_connections")) .query_async(&mut self.underlying_connection) .await, Connection, @@ -152,7 +152,7 @@ impl DiscoveryClient for Redis { // Get the number of permits the broker has let num_permits: u64 = bail!( redis::cmd("SCARD") - .arg(&format!("{broker}/permits")) + .arg(format!("{broker}/permits")) .query_async(&mut self.underlying_connection) .await, Connection, diff --git a/cdn-proto/src/util.rs b/cdn-proto/src/util.rs index 81aef83..7183134 100644 --- a/cdn-proto/src/util.rs +++ b/cdn-proto/src/util.rs @@ -5,6 +5,7 @@ use std::{ task::{Context, Poll}, }; +use derive_more::derive::Deref; use tokio::task::{JoinError, JoinHandle}; /// A function for generating a cute little user mnemonic from a hash @@ -15,7 +16,8 @@ pub fn mnemonic(bytes: H) -> String { mnemonic::to_string(state.finish().to_le_bytes()) } -/// A wrapper for a `JoinHandle` that will abort the task if dropped +/// A wrapper for a `JoinHandle` that will abort the task if +#[derive(Deref)] pub struct AbortOnDropHandle(pub JoinHandle); impl Drop for AbortOnDropHandle {