From f13105d18aa8c6fc17ff3514dbb255c40bec69cd Mon Sep 17 00:00:00 2001
From: Louis-Vincent
Date: Fri, 22 Nov 2024 09:01:54 -0500
Subject: [PATCH] Added more docs

---
 src/lease.rs       |  22 ++++++
 src/lib.rs         |  15 ++++
 src/lock.rs        | 188 +++++++++++++++++++++++++++++++--------------
 src/retry.rs       |  18 +++++
 src/watcher.rs     |  17 ++++
 tests/test_lock.rs |   7 +-
 6 files changed, 205 insertions(+), 62 deletions(-)

diff --git a/src/lease.rs b/src/lease.rs
index 8c7654a..a9f8a89 100644
--- a/src/lease.rs
+++ b/src/lease.rs
@@ -13,6 +13,11 @@ use {
 // Jiffy is interval between system timer interrupts, typically 10ms for linux systems.
 const AT_LEAST_10_JIFFIES: Duration = Duration::from_millis(100);
+///
+/// Managed lease instance that keeps the underlying etcd lease alive until it is dropped.
+///
+/// See [`ManagedLeaseFactory::new_lease`] for more information.
+///
 pub struct ManagedLease {
     pub lease_id: i64,
     // Let this field dead, because when drop it will trigger a task to wake up and gracefully revoke lease.
@@ -20,6 +25,9 @@ pub struct ManagedLease {
     _tx_terminate: oneshot::Sender<()>,
 }
+///
+/// Factory that creates managed leases and keeps each one alive until it is dropped.
+///
 #[derive(Clone)]
 pub struct ManagedLeaseFactory {
     etcd: etcd_client::Client,
@@ -34,11 +42,25 @@ impl ManagedLeaseFactory {
         }
     }
+    ///
+    /// Shut down the lease factory and revoke all leases.
+    ///
+    /// Be careful calling this method: it waits for every lease to be revoked.
+    ///
     pub async fn shutdown(self, timeout: Duration) {
         let mut lock = self.js.lock().await;
         let _ = tokio::time::timeout(timeout, lock.shutdown()).await;
     }
+    ///
+    /// Create a new managed lease with the given time-to-live (TTL) and keepalive interval.
+    ///
+    /// Managed leases have an automatic keep-alive mechanism that keeps the lease alive until the handle is dropped.
+    ///
+    /// The TTL must be at least two (2) seconds.
+    ///
+    /// The keepalive interval is optional; if not provided, it defaults to half of the TTL.
+    ///
     pub async fn new_lease(
         &self,
         ttl: Duration,
diff --git a/src/lib.rs b/src/lib.rs
index 8ec685d..bbd7572 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,21 @@
+///
+/// Provides an API for "managed" leases.
+///
 pub mod lease;
+
+///
+/// Provides an API for "managed" locks.
+///
 pub mod lock;
+
+///
+/// Utility functions to handle various transient errors.
 pub mod retry;
+
+///
+/// Robust API for watching etcd changes.
 pub mod watcher;

+///
+/// Alias for an etcd revision.
 pub type Revision = i64;
diff --git a/src/lock.rs b/src/lock.rs
index 6587fc3..2241e53 100644
--- a/src/lock.rs
+++ b/src/lock.rs
@@ -1,3 +1,37 @@
+///
+/// This module provides a lock manager to create "managed" locks.
+///
+/// A managed lock's lifecycle is handled by the lock manager background thread, which includes:
+/// - Automatic lease refresh
+/// - Lock revocation when the lock is dropped
+///
+/// You can clone [`LockManager`] to share it across threads; cloning is cheap.
+///
+/// See [`spawn_lock_manager`] to create a new lock manager.
+///
+/// # Examples
+///
+/// ```no_run
+/// # async fn example() {
+/// use std::time::Duration;
+/// use etcd_client::Client;
+/// use rust_etcd_utils::{lease::ManagedLeaseFactory, lock::{spawn_lock_manager, ManagedLock}};
+///
+/// let etcd = Client::connect(["http://localhost:2379"], None).await.expect("failed to connect to etcd");
+///
+/// let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone());
+///
+/// let (lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory.clone());
+///
+/// // Do something with the lock manager
+///
+/// let managed_lock: ManagedLock = lock_man.try_lock("test", Duration::from_secs(10)).await.expect("failed to lock");
+///
+/// drop(lock_man);
+///
+/// // Wait for the lock manager background thread to stop
+/// lock_man_handle.await.expect("failed to release lock manager handle");
+/// # }
+/// ```
+///
 use {
     super::{
         lease::{ManagedLease, ManagedLeaseFactory},
@@ -31,6 +65,17 @@ enum DeleteQueueCommand {
     Delete(Vec),
 }
+///
+/// A lock manager to create "managed" locks.
+///
+/// A managed lock's lifecycle is handled by the lock manager background thread, which includes:
+/// - Automatic lease refresh
+/// - Lock revocation when the lock is dropped
+///
+/// You can clone [`LockManager`] to share it across threads; cloning is cheap.
+///
+/// See [`spawn_lock_manager`] to create a new lock manager.
+///
 #[derive(Clone)]
 pub struct LockManager {
     etcd: etcd_client::Client,
@@ -42,6 +87,10 @@ pub struct LockManager {
     lock_manager_handle_entangled_tx: mpsc::UnboundedSender<()>,
 }
+///
+/// Handle to the lock manager background thread.
+///
+/// See [`spawn_lock_manager`] to create a new lock manager.
 pub struct LockManagerHandle {
     inner: JoinHandle<()>,
 }
@@ -52,6 +101,45 @@ impl Future for LockManagerHandle {
         self.inner.poll_unpin(cx)
     }
 }
+
+///
+/// Used to get notified when a lock is revoked.
+///
+/// # Examples
+///
+/// ```no_run
+/// # async fn example() {
+/// use std::time::Duration;
+/// use etcd_client::Client;
+/// use rust_etcd_utils::{lease::ManagedLeaseFactory, lock::{spawn_lock_manager, ManagedLock}};
+///
+/// let mut etcd = Client::connect(["http://localhost:2379"], None).await.expect("failed to connect to etcd");
+///
+/// let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone());
+///
+/// let (lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory.clone());
+///
+/// // Do something with the lock manager
+///
+/// let managed_lock: ManagedLock = lock_man.try_lock("test", Duration::from_secs(10)).await.expect("failed to lock");
+///
+/// let revoke_notify = managed_lock.get_revoke_notify();
+///
+/// // Can be cloned...
+/// let revoke_notify2 = revoke_notify.clone();
+///
+/// // ...and multiple instances can be created from the lock.
+/// let revoke_notify3 = managed_lock.get_revoke_notify();
+///
+/// etcd.delete(managed_lock.get_key(), None).await.expect("failed to delete");
+///
+/// revoke_notify.wait_for_revoke().await;
+/// revoke_notify2.wait_for_revoke().await;
+/// revoke_notify3.wait_for_revoke().await;
+///
+/// println!("All revoke notifications received");
+/// # }
+/// ```
+///
 pub struct ManagedLockRevokeNotify {
     watch_lock_delete: broadcast::Receiver,
 }
@@ -64,20 +152,10 @@ impl Clone for ManagedLockRevokeNotify {
     }
 }
-#[derive(Debug, thiserror::Error)]
-pub enum LockDeleteCallbackError {
-    NotifierDropped(String),
-}
-
-impl fmt::Display for LockDeleteCallbackError {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match self {
-            LockDeleteCallbackError::NotifierDropped(e) => write!(f, "notifier dropped: {e}"),
-        }
-    }
-}
-
 impl ManagedLockRevokeNotify {
+    ///
+    /// Wait for the lock to be revoked.
+    ///
     pub async fn wait_for_revoke(mut self) {
         let _ = self.watch_lock_delete.recv().await;
     }
@@ -223,40 +301,26 @@ pub fn spawn_lock_manager(
     )
 }
+///
+/// Error that can occur when trying to lock a key.
+///
 #[derive(Debug, thiserror::Error)]
 pub enum LockingError {
+    #[error("Invalid lock name: too many overlapping prefixes")]
     InvalidLockName,
 }
-impl fmt::Display for LockingError {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match self {
-            LockingError::InvalidLockName => {
-                f.write_str("invalid lock name -- too many prefix overlapping")
-            }
-        }
-    }
-}
-
 impl LockManager {
+    ///
+    /// Tries to lock a key, with automatic lease refresh and lock revocation when the returned lock is dropped.
+    ///
+    /// If the lock is already held by someone else, an error is returned immediately.
+    ///
     pub async fn try_lock(
         &self,
         name: S,
         lease_duration: Duration,
     ) -> Result<ManagedLock, TryLockError>
-    where
-        S: AsRef<str>,
-    {
-        self.try_lock_return_revoke_notify(name, lease_duration)
-            .await
-            .map(|(lock, _)| lock)
-    }
-
-    pub async fn try_lock_return_revoke_notify(
-        &self,
-        name: S,
-        lease_duration: Duration,
-    ) -> Result<(ManagedLock, ManagedLockRevokeNotify), TryLockError>
     where
         S: AsRef<str>,
     {
@@ -308,19 +372,14 @@ impl LockManager {
             .etcd
             .watch_client()
             .watch_key_delete(lock_key.clone(), revision);
-        Ok((
-            ManagedLock {
-                lock_key,
-                managed_lease,
-                etcd: self.etcd.clone(),
-                created_at_revision: revision,
-                delete_signal_tx: self.delete_queue_tx.clone(),
-                revoke_callback_rx: watch_lock_delete.subscribe(),
-            },
-            ManagedLockRevokeNotify {
-                watch_lock_delete: watch_lock_delete.subscribe(),
-            },
-        ))
+        Ok(ManagedLock {
+            lock_key,
+            managed_lease,
+            etcd: self.etcd.clone(),
+            created_at_revision: revision,
+            delete_signal_tx: self.delete_queue_tx.clone(),
+            revoke_callback_rx: watch_lock_delete.subscribe(),
+        })
     }

     ///
@@ -426,6 +485,9 @@ impl Drop for ManagedLock {
     }
 }
+///
+/// Error that can occur when using a managed lock.
+///
 #[derive(Debug)]
 pub enum LockError {
     LockRevoked,
@@ -444,6 +506,9 @@ impl ManagedLock {
         self.managed_lease.lease_id
     }
+    ///
+    /// Execute an etcd transaction if and only if the lock is still alive.
+    ///
     pub async fn txn(&self, operations: impl Into<Vec<TxnOp>>) -> TxnResponse {
         let txn = Txn::new()
             .when(vec![Compare::version(
@@ -458,12 +523,18 @@ impl ManagedLock {
             .expect("failed txn")
     }
+    ///
+    /// Get a revoke-notify handle to be notified when the lock is revoked.
+    ///
     pub fn get_revoke_notify(&self) -> ManagedLockRevokeNotify {
         ManagedLockRevokeNotify {
             watch_lock_delete: self.revoke_callback_rx.resubscribe(),
         }
     }
+    ///
+    /// Check if the lock is still alive.
+    ///
     pub async fn is_alive(&self) -> bool {
         let get_response = self
             .etcd
@@ -474,6 +545,9 @@ impl ManagedLock {
         get_response.count() == 1
     }
+    ///
+    /// Get the underlying unique lock key.
+    ///
     pub fn get_key(&self) -> Vec<u8> {
         self.lock_key.clone()
     }
@@ -552,19 +626,15 @@ impl ManagedLock {
     }
 }
+///
+/// Error that can occur when trying to lock a key.
+///
 #[derive(Debug, PartialEq, Eq, Error)]
 pub enum TryLockError {
+    #[error("Invalid lock name: too many overlapping prefixes")]
     InvalidLockName,
+    #[error("Already taken")]
     AlreadyTaken,
+    #[error("Locking deadline exceeded")]
     LockingDeadlineExceeded,
 }
-
-impl fmt::Display for TryLockError {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match self {
-            Self::InvalidLockName => f.write_str("InvalidLockName"),
-            Self::AlreadyTaken => f.write_str("AlreadyTaken"),
-            Self::LockingDeadlineExceeded => f.write_str("LockingDeadlineExceeded"),
-        }
-    }
-}
diff --git a/src/retry.rs b/src/retry.rs
index e948c7c..f1784ff 100644
--- a/src/retry.rs
+++ b/src/retry.rs
@@ -4,6 +4,15 @@ use {
     tracing::{error, warn},
 };
+///
+/// Check if an etcd error is transient.
+///
+/// Transient errors are caused by "outside" forces that cannot be prevented, such as a network partition.
+///
+/// If the error is, for example, a gRPC "Not Found" status, the function returns false.
+///
+/// Transient errors are worth retrying -- see [`retry_etcd`] for an example.
+///
 pub fn is_transient(err: &etcd_client::Error) -> bool {
     match err {
         etcd_client::Error::GRpcStatus(status) => match status.code() {
@@ -29,6 +38,9 @@ pub fn is_transient(err: &etcd_client::Error) -> bool {
     }
 }
+///
+/// Retry an etcd transaction as long as the error is transient.
+///
 pub async fn retry_etcd_txn(
     etcd: etcd_client::Client,
     txn: etcd_client::Txn,
@@ -39,6 +51,9 @@ pub fn is_transient(err: &etcd_client::Error) -> bool {
     .await
 }
+///
+/// Retry an etcd get as long as the error is transient.
+///
 pub async fn retry_etcd_get(
     etcd: etcd_client::Client,
     key: String,
@@ -95,6 +110,9 @@ where
     retry_etcd_with_strategy(etcd, reusable_args, retry_strategy, f).await
 }
+///
+/// Similar to [`retry_etcd`] but with a custom retry strategy.
+///
 pub async fn retry_etcd_with_strategy(
     etcd: etcd_client::Client,
     reusable_args: A,
diff --git a/src/watcher.rs b/src/watcher.rs
index 7c180fd..5810e74 100644
--- a/src/watcher.rs
+++ b/src/watcher.rs
@@ -9,6 +9,11 @@ use {
     tracing::{error, warn},
 };
+///
+/// Custom type for watch events.
+///
+/// Unwraps the raw etcd watch event into a more user-friendly form.
+///
 pub enum WatchEvent {
     Put {
         key: Vec<u8>,
@@ -22,6 +27,15 @@ pub enum WatchEvent {
     },
 }
+///
+/// Extension trait for [`WatchClient`].
+///
+/// This trait provides utility methods for working with [`WatchClient`].
+///
+/// It exposes watch streams as Rust channels and adds reliability in case of transient errors.
+///
+/// On transient errors, the watch is retried and resumes where it left off.
+///
 #[async_trait::async_trait]
 pub trait WatchClientExt {
     fn get_watch_client(&self) -> WatchClient;
@@ -186,6 +200,9 @@ pub trait WatchClientExt {
         rx
     }
+    ///
+    /// Creates a broadcast channel that notifies when a key is deleted.
+    ///
     fn watch_key_delete(
         &self,
         key: impl Into<Vec<u8>>,
diff --git a/tests/test_lock.rs b/tests/test_lock.rs
index 199230d..7596393 100644
--- a/tests/test_lock.rs
+++ b/tests/test_lock.rs
@@ -47,13 +47,14 @@ async fn dropping_managed_lock_should_revoke_etcd_lock() {
     let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory);

     let lock_name = random_str(10);
-    let (managed_lock1, delete_cb) = lock_man
-        .try_lock_return_revoke_notify(lock_name.as_str(), Duration::from_secs(10))
+    let managed_lock1 = lock_man
+        .try_lock(lock_name.as_str(), Duration::from_secs(10))
         .await
         .expect("failed to lock");
+    let revoke_notify = managed_lock1.get_revoke_notify();

     drop(managed_lock1);

-    let _ = delete_cb.wait_for_revoke().await;
+    let _ = revoke_notify.wait_for_revoke().await;

     let _managed_lock2 = lock_man
         .try_lock(lock_name, Duration::from_secs(10))
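Usage sketch (not part of the patch): the following illustrates the `try_lock` / `get_revoke_notify` flow that this patch documents and that replaces `try_lock_return_revoke_notify`. It is a minimal example assembled from the APIs shown in the diff; it assumes a `tokio` runtime, an etcd endpoint reachable at `http://localhost:2379`, and an illustrative lock name ("example-lock") and lease duration.

```rust
use std::time::Duration;

use etcd_client::Client;
use rust_etcd_utils::{
    lease::ManagedLeaseFactory,
    lock::{spawn_lock_manager, ManagedLock},
};

#[tokio::main]
async fn main() {
    let etcd = Client::connect(["http://localhost:2379"], None)
        .await
        .expect("failed to connect to etcd");

    // The factory keeps every lease it hands out alive until that lease is dropped.
    let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone());
    let (lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory);

    // Acquire the lock; the lock manager refreshes its lease in the background.
    let managed_lock: ManagedLock = lock_man
        .try_lock("example-lock", Duration::from_secs(10))
        .await
        .expect("failed to lock");

    // Grab a revoke-notify handle before giving up the lock.
    let revoke_notify = managed_lock.get_revoke_notify();

    // Dropping the lock asks the lock manager to revoke it in etcd...
    drop(managed_lock);
    // ...and the notify handle resolves once the lock key is gone.
    revoke_notify.wait_for_revoke().await;

    // Dropping the manager lets the background task shut down; await its handle to join it.
    drop(lock_man);
    lock_man_handle.await.expect("lock manager task failed");
}
```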