Skip to content

Commit

Permalink
Fix CAS for pause and resume
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Sep 18, 2023
1 parent 9eb11f5 commit 3d0e5c8
Showing 1 changed file with 29 additions and 19 deletions.
48 changes: 29 additions & 19 deletions src/connectors/utils/quiescence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::errors::Result;
use crate::errors::{Error, Result};
use event_listener::Event;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -90,31 +90,41 @@ impl QuiescenceBeacon {

/// pause both reading and writing
pub fn pause(&mut self) -> Result<()> {
self.0
.state
.compare_exchange(
Inner::RUNNING,
Inner::PAUSED,
Ordering::AcqRel,
Ordering::Relaxed,
)
.map_err(|_| "failed to pause")?;
match self.0.state.compare_exchange(
Inner::RUNNING,
Inner::PAUSED,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) | Err(Inner::PAUSED | Inner::RUNNING | Inner::STOP_ALL | Inner::STOP_READING) => {
Ok(())
}
// TODO: should we error on those?
// Err(Inner::STOP_ALL) => Err(Error::from("failed to pause from state STOP_ALL")),
// Err(Inner::STOP_READING) => Err(Error::from("failed to pause from state STOP_READING")),
Err(e) => Err(Error::from(format!("Invalid state {e}"))),
}?;

Check warning on line 106 in src/connectors/utils/quiescence.rs

View check run for this annotation

Codecov / codecov/patch

src/connectors/utils/quiescence.rs#L105-L106

Added lines #L105 - L106 were not covered by tests
Ok(())
}

/// Resume both reading and writing.
///
/// Has no effect if not currently paused.
pub fn resume(&mut self) -> Result<()> {
self.0
.state
.compare_exchange(
Inner::PAUSED,
Inner::RUNNING,
Ordering::AcqRel,
Ordering::Relaxed,
)
.map_err(|_| "failed to resume")?;
match self.0.state.compare_exchange(
Inner::PAUSED,
Inner::RUNNING,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) | Err(Inner::PAUSED | Inner::RUNNING | Inner::STOP_ALL | Inner::STOP_READING) => {
Ok(())
}
// TODO: should we error on those?
// Err(Inner::STOP_READING) => Err(Error::from("Can't resume from STOP_READING")),
// Err(Inner::STOP_ALL) => Err(Error::from("Can't resume from STOP_ALL")),
Err(e) => Err(Error::from(format!("Invalid state {e}"))),
}?;

Check warning on line 127 in src/connectors/utils/quiescence.rs

View check run for this annotation

Codecov / codecov/patch

src/connectors/utils/quiescence.rs#L126-L127

Added lines #L126 - L127 were not covered by tests

self.0.resume_event.notify(Self::MAX_LISTENERS); // we might have been paused, so notify here
Ok(())
Expand Down

0 comments on commit 3d0e5c8

Please sign in to comment.