Skip to content

Commit

Permalink
added lock method in LockManager
Browse files Browse the repository at this point in the history
  • Loading branch information
lvboudre committed Nov 21, 2024
1 parent e36b8a1 commit 74c63c6
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 1 deletion.
103 changes: 103 additions & 0 deletions src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,21 @@ pub fn spawn_lock_manager(
)
}

#[derive(Debug, thiserror::Error)]
pub enum LockingError {
InvalidLockName,
}

impl fmt::Display for LockingError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LockingError::InvalidLockName => {
f.write_str("invalid lock name -- too many prefix overlapping")
}
}
}
}

impl LockManager {
pub async fn try_lock<S>(
&self,
Expand Down Expand Up @@ -303,6 +318,88 @@ impl LockManager {
ManagedLockRevokeNotify { watch_lock_delete },
))
}

///
/// Locks a key with automatic lease refresh and lock revocation when dropped.
/// If the key is already held by another lock, it will wait until the lock is released.
///
/// Be aware this method can await indefinitely if the key is never released.
pub async fn lock<S>(
&self,
name: S,
lease_duration: Duration,
) -> Result<Option<ManagedLock>, LockingError>
where
S: AsRef<str>,
{
let name = name.as_ref();
if self.delete_queue_tx.is_closed() {
panic!("LockManager lifecycle thread is stopped.");
}
let gopts = GetOptions::new().with_prefix();
trace!("Trying to lock {name}...");
let result = retry_etcd(
self.etcd.clone(),
(name.to_string(), gopts),
move |etcd, (name, gopts)| async move { etcd.kv_client().get(name, Some(gopts)).await },
)
.await;

let get_response = match result {
Ok(get_response) => get_response,
Err(e) => {
tracing::error!("failed to communicate with etcd: {e}");
return Ok(None);
}
};

if get_response.count() > 1 {
return Err(LockingError::InvalidLockName);
}

let managed_lease = self
.manager_lease_factory
.new_lease(lease_duration, None)
.await;
let lease_id = managed_lease.lease_id;

let result = retry_etcd(
self.etcd.clone(),
(name.to_string(), LockOptions::new().with_lease(lease_id)),
|mut etcd, (name, opts)| async move { etcd.lock(name, Some(opts)).await },
)
.await;

let lock_response = if let Ok(lock_response) = result {
lock_response
} else {
return Ok(None);
};

let (revision, lock_key) = (
lock_response
.header()
.expect("empty header for etcd lock")
.revision(),
lock_response.key().to_vec(),
);

let watch_lock_delete = self
.etcd
.watch_client()
.watch_key_delete(lock_key.clone(), revision);

let managed_lock = ManagedLock {
lock_key,
managed_lease,
etcd: self.etcd.clone(),
created_at_revision: revision,
delete_signal_tx: self.delete_queue_tx.clone(),
revoke_callback_tx: watch_lock_delete.clone(),
};

Ok(Some(managed_lock))
}
}

///
Expand Down Expand Up @@ -357,6 +454,12 @@ impl ManagedLock {
.expect("failed txn")
}

pub fn get_revoke_notify(&self) -> ManagedLockRevokeNotify {
ManagedLockRevokeNotify {
watch_lock_delete: self.revoke_callback_tx.clone(),
}
}

pub async fn is_alive(&self) -> bool {
let get_response = self
.etcd
Expand Down
31 changes: 30 additions & 1 deletion tests/test_lock.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::time::Duration;

use common::random_str;
use rust_etcd_utils::{lease::ManagedLeaseFactory, lock::spawn_lock_manager, lock::TryLockError};
use rust_etcd_utils::{
lease::ManagedLeaseFactory,
lock::{spawn_lock_manager, TryLockError},
};
mod common;

#[tokio::test]
Expand Down Expand Up @@ -115,3 +118,29 @@ async fn test_managed_lock_scope() {
// If the callback rx received a msg it means the scope didn't cancel the future as it should.
assert!(result.is_err());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_lock() {
let etcd = common::get_etcd_client().await;
let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone());
let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory);
let lock_name = random_str(10);

let lock = lock_man
.lock(&lock_name, Duration::from_secs(2))
.await
.expect("failed to lock 1")
.expect("etcd conn failed");

let _ = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(6)).await;
println!("will revoke lock");
drop(lock)
});

lock_man
.lock(lock_name, Duration::from_secs(2))
.await
.expect("failed to lock 2")
.expect("etcd conn failed");
}

0 comments on commit 74c63c6

Please sign in to comment.