From 74c63c624c1821ddd411d8d49bf6c081ba956696 Mon Sep 17 00:00:00 2001 From: Louis-Vincent Date: Thu, 21 Nov 2024 16:26:56 -0500 Subject: [PATCH] added lock method in LockManager --- src/lock.rs | 103 +++++++++++++++++++++++++++++++++++++++++++++ tests/test_lock.rs | 31 +++++++++++++- 2 files changed, 133 insertions(+), 1 deletion(-) diff --git a/src/lock.rs b/src/lock.rs index bb12748..2d51d19 100644 --- a/src/lock.rs +++ b/src/lock.rs @@ -221,6 +221,21 @@ pub fn spawn_lock_manager( ) } +#[derive(Debug, thiserror::Error)] +pub enum LockingError { + InvalidLockName, +} + +impl fmt::Display for LockingError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + LockingError::InvalidLockName => { + f.write_str("invalid lock name -- too many prefix overlapping") + } + } + } +} + impl LockManager { pub async fn try_lock( &self, @@ -303,6 +318,88 @@ impl LockManager { ManagedLockRevokeNotify { watch_lock_delete }, )) } + + /// + /// Locks a key with automatic lease refresh and lock revocation when dropped. + /// If the key is already held by another lock, it will wait until the lock is released. + /// + /// Be aware this method can await indefinitely if the key is never released. + pub async fn lock( + &self, + name: S, + lease_duration: Duration, + ) -> Result, LockingError> + where + S: AsRef, + { + let name = name.as_ref(); + if self.delete_queue_tx.is_closed() { + panic!("LockManager lifecycle thread is stopped."); + } + let gopts = GetOptions::new().with_prefix(); + trace!("Trying to lock {name}..."); + let result = retry_etcd( + self.etcd.clone(), + (name.to_string(), gopts), + move |etcd, (name, gopts)| async move { etcd.kv_client().get(name, Some(gopts)).await }, + ) + .await; + + let get_response = match result { + Ok(get_response) => get_response, + Err(e) => { + tracing::error!("failed to communicate with etcd: {e}"); + return Ok(None); + } + }; + + if get_response.count() > 1 { + return Err(LockingError::InvalidLockName); + } + + let managed_lease = self + .manager_lease_factory + .new_lease(lease_duration, None) + .await; + let lease_id = managed_lease.lease_id; + + let result = retry_etcd( + self.etcd.clone(), + (name.to_string(), LockOptions::new().with_lease(lease_id)), + |mut etcd, (name, opts)| async move { etcd.lock(name, Some(opts)).await }, + ) + .await; + + let lock_response = if let Ok(lock_response) = result { + lock_response + } else { + return Ok(None); + }; + + let (revision, lock_key) = ( + lock_response + .header() + .expect("empty header for etcd lock") + .revision(), + lock_response.key().to_vec(), + ); + + let watch_lock_delete = self + .etcd + .watch_client() + .watch_key_delete(lock_key.clone(), revision); + + let managed_lock = ManagedLock { + lock_key, + managed_lease, + etcd: self.etcd.clone(), + created_at_revision: revision, + delete_signal_tx: self.delete_queue_tx.clone(), + revoke_callback_tx: watch_lock_delete.clone(), + }; + + Ok(Some(managed_lock)) + } } /// @@ -357,6 +454,12 @@ impl ManagedLock { .expect("failed txn") } + pub fn get_revoke_notify(&self) -> ManagedLockRevokeNotify { + ManagedLockRevokeNotify { + watch_lock_delete: self.revoke_callback_tx.clone(), + } + } + pub async fn is_alive(&self) -> bool { let get_response = self .etcd diff --git a/tests/test_lock.rs b/tests/test_lock.rs index e080d1e..bf3cc77 100644 --- a/tests/test_lock.rs +++ b/tests/test_lock.rs @@ -1,7 +1,10 @@ use std::time::Duration; use common::random_str; -use rust_etcd_utils::{lease::ManagedLeaseFactory, lock::spawn_lock_manager, lock::TryLockError}; +use rust_etcd_utils::{ + lease::ManagedLeaseFactory, + lock::{spawn_lock_manager, TryLockError}, +}; mod common; #[tokio::test] @@ -115,3 +118,29 @@ async fn test_managed_lock_scope() { // If the callback rx received a msg it means the scope didn't cancel the future as it should. assert!(result.is_err()); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_lock() { + let etcd = common::get_etcd_client().await; + let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone()); + let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory); + let lock_name = random_str(10); + + let lock = lock_man + .lock(&lock_name, Duration::from_secs(2)) + .await + .expect("failed to lock 1") + .expect("etcd conn failed"); + + let _ = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(6)).await; + println!("will revoke lock"); + drop(lock) + }); + + lock_man + .lock(lock_name, Duration::from_secs(2)) + .await + .expect("failed to lock 2") + .expect("etcd conn failed"); +}