Skip to content

Commit

Permalink
fixed notify revoke
Browse files Browse the repository at this point in the history
  • Loading branch information
lvboudre committed Nov 21, 2024
1 parent 74c63c6 commit 26462df
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rust-etcd-utils"
version = "0.2.0"
version = "0.3.0"
edition = "2021"

[dependencies]
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,10 @@ Utility library for common ETCD management in Rust, it covers:

## How to test

Uses `compose.yaml` to launch and instance of `etcd` with port-fowarding over the port 2379, so `localhost:2379` redirects to `etcd` instance inside docker.
Uses `compose.yaml` to launch and instance of `etcd` with port-fowarding over the port 2379, so `localhost:2379` redirects to `etcd` instance inside docker.

Then run the following command

```sh
$ cargo test --test -- --nocapture
```
34 changes: 19 additions & 15 deletions src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,16 @@ impl Future for LockManagerHandle {
self.inner.poll_unpin(cx)
}
}

#[derive(Clone)]
pub struct ManagedLockRevokeNotify {
watch_lock_delete: broadcast::Sender<Revision>,
watch_lock_delete: broadcast::Receiver<Revision>,
}

impl Clone for ManagedLockRevokeNotify {
fn clone(&self) -> Self {
Self {
watch_lock_delete: self.watch_lock_delete.resubscribe(),
}
}
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -72,12 +78,8 @@ impl fmt::Display for LockDeleteCallbackError {
}

impl ManagedLockRevokeNotify {
pub async fn wait_for_revoke(self) -> Result<Revision, LockDeleteCallbackError> {
self.watch_lock_delete
.subscribe()
.recv()
.await
.map_err(|e| LockDeleteCallbackError::NotifierDropped(e.to_string()))
pub async fn wait_for_revoke(mut self) {
let _ = self.watch_lock_delete.recv().await;
}
}

Expand Down Expand Up @@ -313,9 +315,11 @@ impl LockManager {
etcd: self.etcd.clone(),
created_at_revision: revision,
delete_signal_tx: self.delete_queue_tx.clone(),
revoke_callback_tx: watch_lock_delete.clone(),
revoke_callback_rx: watch_lock_delete.subscribe(),
},
ManagedLockRevokeNotify {
watch_lock_delete: watch_lock_delete.subscribe(),
},
ManagedLockRevokeNotify { watch_lock_delete },
))
}

Expand Down Expand Up @@ -395,7 +399,7 @@ impl LockManager {
etcd: self.etcd.clone(),
created_at_revision: revision,
delete_signal_tx: self.delete_queue_tx.clone(),
revoke_callback_tx: watch_lock_delete.clone(),
revoke_callback_rx: watch_lock_delete.subscribe(),
};

Ok(Some(managed_lock))
Expand All @@ -411,7 +415,7 @@ pub struct ManagedLock {
pub created_at_revision: Revision,
etcd: etcd_client::Client,
delete_signal_tx: tokio::sync::mpsc::UnboundedSender<DeleteQueueCommand>,
revoke_callback_tx: broadcast::Sender<Revision>,
revoke_callback_rx: broadcast::Receiver<Revision>,
}

impl Drop for ManagedLock {
Expand Down Expand Up @@ -456,7 +460,7 @@ impl ManagedLock {

pub fn get_revoke_notify(&self) -> ManagedLockRevokeNotify {
ManagedLockRevokeNotify {
watch_lock_delete: self.revoke_callback_tx.clone(),
watch_lock_delete: self.revoke_callback_rx.resubscribe(),
}
}

Expand Down Expand Up @@ -520,7 +524,7 @@ impl ManagedLock {
F: FnOnce() -> Fut,
Fut: Future<Output = T> + Send + 'static,
{
let mut rx = self.revoke_callback_tx.subscribe();
let mut rx = self.revoke_callback_rx.resubscribe();

match rx.try_recv() {
Ok(_) => {
Expand Down
50 changes: 45 additions & 5 deletions tests/test_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ async fn dropping_managed_lock_should_revoke_etcd_lock() {
.expect("failed to lock");

drop(managed_lock1);
let _ = delete_cb
.wait_for_revoke()
.await
.expect("failed to be notified of revoked lock");
let _ = delete_cb.wait_for_revoke().await;

let _managed_lock2 = lock_man
.try_lock(lock_name, Duration::from_secs(10))
Expand Down Expand Up @@ -132,7 +129,7 @@ async fn test_lock() {
.expect("failed to lock 1")
.expect("etcd conn failed");

let _ = tokio::spawn(async move {
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(6)).await;
println!("will revoke lock");
drop(lock)
Expand All @@ -144,3 +141,46 @@ async fn test_lock() {
.expect("failed to lock 2")
.expect("etcd conn failed");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_managed_lock_revoke_notify_clonability() {
let etcd = common::get_etcd_client().await;
let managed_lease_factory = ManagedLeaseFactory::new(etcd.clone());
let (_, lock_man) = spawn_lock_manager(etcd.clone(), managed_lease_factory);

let lock_name = random_str(10);

let managed_lock1 = lock_man
.try_lock(lock_name, Duration::from_secs(10))
.await
.expect("failed to lock");

let (tx, rx) = tokio::sync::oneshot::channel();
let lock_key = managed_lock1.get_key();
let revoke_notify1 = managed_lock1.get_revoke_notify();
let revoke_notify2 = managed_lock1.get_revoke_notify();

let h = tokio::spawn(async move {
managed_lock1
.scope(async move {
tokio::time::sleep(Duration::from_secs(20)).await;
// This should never be reached
tx.send(()).unwrap();
})
.await
});

etcd.kv_client()
.delete(lock_key, None)
.await
.expect("failed to delete key");

let _ = h.await;
let result = rx.await;
// If the callback rx received a msg it means the scope didn't cancel the future as it should.

assert!(result.is_err());

let _ = revoke_notify1.wait_for_revoke().await;
let _ = revoke_notify2.wait_for_revoke().await;
}

0 comments on commit 26462df

Please sign in to comment.