Skip to content

Commit

Permalink
Added more docs
Browse files Browse the repository at this point in the history
  • Loading branch information
lvboudre committed Nov 22, 2024
1 parent c62c84f commit f13105d
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 62 deletions.
22 changes: 22 additions & 0 deletions src/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,21 @@ use {
// Jiffy is interval between system timer interrupts, typically 10ms for linux systems.
const AT_LEAST_10_JIFFIES: Duration = Duration::from_millis(100);

///
/// Managed lease instance that will keep the lease alive until it is dropped.
///
/// See [`ManagedLeaseFactory::new_lease`] for more information.
///
pub struct ManagedLease {
pub lease_id: i64,
// Let this field dead, because when drop it will trigger a task to wake up and gracefully revoke lease.
#[allow(dead_code)]
_tx_terminate: oneshot::Sender<()>,
}

///
/// Managed lease factory that will create a new lease and keep it alive until it is dropped.
///
#[derive(Clone)]
pub struct ManagedLeaseFactory {
etcd: etcd_client::Client,
Expand All @@ -34,11 +42,25 @@ impl ManagedLeaseFactory {
}
}

///
/// Shutdown the lease factory and revoke all leases.
///
/// Becareful calling this method as it will wait for all lease to be revoked.
///
pub async fn shutdown(self, timeout: Duration) {
let mut lock = self.js.lock().await;
let _ = tokio::time::timeout(timeout, lock.shutdown()).await;
}

///
/// Create a new managed lease with the given time-to-live (TTL) and keepalive interval.
///
/// Managed lease have automatic keep alive mechanism that will keep the lease alive until it is dropped.
///
/// The ttl must be at least two (2) seconds.
///
/// Keepalive interval is optional, if not provided it will be half of the ttl.
///
pub async fn new_lease(
&self,
ttl: Duration,
Expand Down
15 changes: 15 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
///
/// Provides an API over "managed" lease
///
pub mod lease;

///
/// Provides an API over "managed" lock
///
pub mod lock;

///
/// Utility function to manage various transient errors.
pub mod retry;

///
/// Robust API for watching etcd changes
pub mod watcher;

///
/// Alias for etcd revision
pub type Revision = i64;
188 changes: 129 additions & 59 deletions src/lock.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
///
/// This module provides a lock manager to create "managed" locks.
///
/// Managed lock's lifecycle is managed by the lock manager background thread, that includes:
/// - Automatic lease refresh
/// - Lock revocation when the lock is dropped
///
/// You can clone [`LockManager`] to share it across threads, it is really cheap to do so.
///
/// See [`spawn_lock_manager`] to create a new lock manager.
///
/// # Examples
///
/// ```
/// use etcd_client::Client;
/// use rust_etcd_utils::{lease::ManagedLeaseFactory, lock::spawn_lock_manager, ManagedLock};
///
/// let etcd = Client::connect(["http://localhost:2379"], None).await.expect("failed to connect to etcd");
///
/// let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone());
///
/// let (lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory.clone());
///
/// // Do something with the lock manager
///
/// let managed_lock: ManagedLock = lock_man.try_lock("test").await.expect("failed to lock");
///
///
/// drop(lock_man);
///
/// // Wait for the lock manager background thread to stop
/// lock_man_handle.await.expect("failed to release lock manager handle");
/// ```
///
use {
super::{
lease::{ManagedLease, ManagedLeaseFactory},
Expand Down Expand Up @@ -31,6 +65,17 @@ enum DeleteQueueCommand {
Delete(Vec<u8>),
}

///
/// A lock manager to create "managed" locks.
///
/// Managed lock's lifecycle is managed by the lock manager background thread, that includes:
/// - Automatic lease refresh
/// - Lock revocation when the lock is dropped
///
/// You can clone [`LockManager`] to share it across threads, it is really cheap to do so.
///
/// See [`spawn_lock_manager`] to create a new lock manager.
///
#[derive(Clone)]
pub struct LockManager {
etcd: etcd_client::Client,
Expand All @@ -42,6 +87,10 @@ pub struct LockManager {
lock_manager_handle_entangled_tx: mpsc::UnboundedSender<()>,
}

///
/// Handle to the lock manager background thread.
///
/// See [`spawn_lock_manager`] to create a new lock manager.
pub struct LockManagerHandle {
inner: JoinHandle<()>,
}
Expand All @@ -52,6 +101,45 @@ impl Future for LockManagerHandle {
self.inner.poll_unpin(cx)
}
}

///
/// Used to notify when a lock is revoked.
///
/// Examples
///
/// ```
///
/// use etcd_client::Client;
/// use rust_etcd_utils::{lease::ManagedLeaseFactory, lock::spawn_lock_manager, ManagedLock};
///
/// let etcd = Client::connect(["http://localhost:2379"], None).await.expect("failed to connect to etcd");
///
/// let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone());
///
/// let (lock_man_handle, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory.clone());
///
/// // Do something with the lock manager
///
/// let managed_lock: ManagedLock = lock_man.try_lock("test").await.expect("failed to lock");
///
/// let revoke_notify = managed_lock.get_revoke_notify();
///
/// // Can be cloned
/// let revoke_notify2 = revoke_notify.clone();
///
/// // Can create multiple instances..
///
/// let revoke_notify3 = managed_lock.get_revoke_notify();
///
/// etcd.delete("test", None).await.expect("failed to delete");
///
/// revoke_notify.wait_for_revoke().await;
/// revoke_notify2.wait_for_revoke().await;
/// revoke_notify3.wait_for_revoke().await;
///
/// println!("All revoke notify received");
/// ```
///
pub struct ManagedLockRevokeNotify {
watch_lock_delete: broadcast::Receiver<Revision>,
}
Expand All @@ -64,20 +152,10 @@ impl Clone for ManagedLockRevokeNotify {
}
}

#[derive(Debug, thiserror::Error)]
pub enum LockDeleteCallbackError {
NotifierDropped(String),
}

impl fmt::Display for LockDeleteCallbackError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LockDeleteCallbackError::NotifierDropped(e) => write!(f, "notifier dropped: {e}"),
}
}
}

impl ManagedLockRevokeNotify {
///
/// Wait for the lock to be revoked.
///
pub async fn wait_for_revoke(mut self) {
let _ = self.watch_lock_delete.recv().await;
}
Expand Down Expand Up @@ -223,40 +301,26 @@ pub fn spawn_lock_manager(
)
}

///
/// Error that can occur when trying to lock a key.
///
#[derive(Debug, thiserror::Error)]
pub enum LockingError {
#[error("Invalid lock name, too many prefix overlapping")]
InvalidLockName,
}

impl fmt::Display for LockingError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LockingError::InvalidLockName => {
f.write_str("invalid lock name -- too many prefix overlapping")
}
}
}
}

impl LockManager {
///
/// Tries to lock a key with automatic lease refresh and lock revocation when dropped.
///
/// If the key is already held by another lock, it will return an error immediately.
///
pub async fn try_lock<S>(
&self,
name: S,
lease_duration: Duration,
) -> Result<ManagedLock, TryLockError>
where
S: AsRef<str>,
{
self.try_lock_return_revoke_notify(name, lease_duration)
.await
.map(|(lock, _)| lock)
}

pub async fn try_lock_return_revoke_notify<S>(
&self,
name: S,
lease_duration: Duration,
) -> Result<(ManagedLock, ManagedLockRevokeNotify), TryLockError>
where
S: AsRef<str>,
{
Expand Down Expand Up @@ -308,19 +372,14 @@ impl LockManager {
.etcd
.watch_client()
.watch_key_delete(lock_key.clone(), revision);
Ok((
ManagedLock {
lock_key,
managed_lease,
etcd: self.etcd.clone(),
created_at_revision: revision,
delete_signal_tx: self.delete_queue_tx.clone(),
revoke_callback_rx: watch_lock_delete.subscribe(),
},
ManagedLockRevokeNotify {
watch_lock_delete: watch_lock_delete.subscribe(),
},
))
Ok(ManagedLock {
lock_key,
managed_lease,
etcd: self.etcd.clone(),
created_at_revision: revision,
delete_signal_tx: self.delete_queue_tx.clone(),
revoke_callback_rx: watch_lock_delete.subscribe(),
})
}

///
Expand Down Expand Up @@ -426,6 +485,9 @@ impl Drop for ManagedLock {
}
}

///
/// Error that can occur when using a managed lock.
///
#[derive(Debug)]
pub enum LockError {
LockRevoked,
Expand All @@ -444,6 +506,9 @@ impl ManagedLock {
self.managed_lease.lease_id
}

///
/// Execute an etcd transaction if and only if the lock is still alive.
///
pub async fn txn(&self, operations: impl Into<Vec<TxnOp>>) -> TxnResponse {
let txn = Txn::new()
.when(vec![Compare::version(
Expand All @@ -458,12 +523,18 @@ impl ManagedLock {
.expect("failed txn")
}

///
/// Get a revoke notify handle to be notified when the lock is revoked.
///
pub fn get_revoke_notify(&self) -> ManagedLockRevokeNotify {
ManagedLockRevokeNotify {
watch_lock_delete: self.revoke_callback_rx.resubscribe(),
}
}

///
/// Check if the lock is still alive.
///
pub async fn is_alive(&self) -> bool {
let get_response = self
.etcd
Expand All @@ -474,6 +545,9 @@ impl ManagedLock {
get_response.count() == 1
}

///
/// Get the underlying unique lock key.
///
pub fn get_key(&self) -> Vec<u8> {
self.lock_key.clone()
}
Expand Down Expand Up @@ -552,19 +626,15 @@ impl ManagedLock {
}
}

///
/// Error that can occur when trying to lock a key.
///
#[derive(Debug, PartialEq, Eq, Error)]
pub enum TryLockError {
#[error("Invalid lock name, too many prefix overlapping")]
InvalidLockName,
#[error("Already taken")]
AlreadyTaken,
#[error("Locking deadline exceeded")]
LockingDeadlineExceeded,
}

impl fmt::Display for TryLockError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidLockName => f.write_str("InvalidLockName"),
Self::AlreadyTaken => f.write_str("AlreadyTaken"),
Self::LockingDeadlineExceeded => f.write_str("LockingDeadlineExceeded"),
}
}
}
Loading

0 comments on commit f13105d

Please sign in to comment.