Skip to content

Commit

Permalink
Merge pull request #5 from jgardona/bugfix/block-with-zero-counter
Browse files Browse the repository at this point in the history
Fix: wg.wait() blocks indefinitely if there are no jobs #4
  • Loading branch information
jgardona authored Dec 13, 2023
2 parents 3db7040 + 3f65903 commit 8e589b1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rpools"
version = "0.3.0"
version = "0.3.1"
authors = ["jcbritobr <[email protected]>"]
edition = "2018"
repository = "https://github.com/jgardona/rpools"
Expand Down
36 changes: 23 additions & 13 deletions src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
//! ## Sync
//!
//!
//! This module has data structures used to synchronize
//! threads. WaitGroup is used to make a thread to wait
//! others.
//!
//!
//! ### Examples
//! ```
//! use rpools::pool::WorkerPool;
//! use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc, Mutex};
//! use rpools::sync::WaitGroup;
//!
//!
//! let njobs = 20;
//! let nworkers = 3;
//! let pool = WorkerPool::new(nworkers);
//! let atomic = Arc::new(AtomicUsize::new(0));
//! let wg = WaitGroup::default();
//!
//!
//! // send the jobs to the pool
//! for _ in 0..njobs {
//! let wg = wg.clone();
Expand All @@ -25,7 +25,7 @@
//! drop(wg);
//! });
//! }
//!
//!
//! // wait for the pool finnishes
//! wg.wait();
//! assert_eq!(njobs, atomic.load(Ordering::Relaxed));
Expand All @@ -36,7 +36,6 @@ use std::sync::{
Arc, Condvar, Mutex,
};


/// A data struct to store a counter, a mutex and a condvar.
/// It is responsible and serves as semaphore to synchronize threads.
#[derive(Default)]
Expand All @@ -56,16 +55,16 @@ impl WaitGroup {
/// Blocks the current thread and waits until counter becomes 0. If
/// counter is 0, start processing again.
pub fn wait(&self) {
let mut started = self.0.mu.lock().expect("Cant get the lock");
while !*started {
started = self
let mut mutex = self.0.mu.lock().expect("Cant get the lock");
loop {
if self.0.counter.load(Ordering::Relaxed) == 0 {
break;
}
mutex = self
.0
.condvar
.wait(started)
.wait(mutex)
.expect("Cant block the current thread");
if self.0.counter.load(Ordering::Relaxed) == 0 {
*started = true;
}
}
}
}
Expand All @@ -89,3 +88,14 @@ impl Drop for WaitGroup {
self.0.condvar.notify_one();
}
}

#[cfg(test)]
mod mod_wait_group_tests {
use super::WaitGroup;

#[test]
fn test_if_zero_count_must_not_block() {
let wg = WaitGroup::default();
wg.wait();
}
}

0 comments on commit 8e589b1

Please sign in to comment.