From 3d0e5c892a63e2aa3698fd0323ba10a6388bdaf7 Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Fri, 15 Sep 2023 11:40:49 +0200 Subject: [PATCH] Fix CAS for pause and resume Signed-off-by: Heinz N. Gies --- src/connectors/utils/quiescence.rs | 48 ++++++++++++++++++------------ 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/src/connectors/utils/quiescence.rs b/src/connectors/utils/quiescence.rs index 88d1159ee0..648c43a5d7 100644 --- a/src/connectors/utils/quiescence.rs +++ b/src/connectors/utils/quiescence.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::errors::Result; +use crate::errors::{Error, Result}; use event_listener::Event; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; @@ -90,15 +90,20 @@ impl QuiescenceBeacon { /// pause both reading and writing pub fn pause(&mut self) -> Result<()> { - self.0 - .state - .compare_exchange( - Inner::RUNNING, - Inner::PAUSED, - Ordering::AcqRel, - Ordering::Relaxed, - ) - .map_err(|_| "failed to pause")?; + match self.0.state.compare_exchange( + Inner::RUNNING, + Inner::PAUSED, + Ordering::AcqRel, + Ordering::Relaxed, + ) { + Ok(_) | Err(Inner::PAUSED | Inner::RUNNING | Inner::STOP_ALL | Inner::STOP_READING) => { + Ok(()) + } + // TODO: should we error on those? + // Err(Inner::STOP_ALL) => Err(Error::from("failed to pause from state STOP_ALL")), + // Err(Inner::STOP_READING) => Err(Error::from("failed to pause from state STOP_READING")), + Err(e) => Err(Error::from(format!("Invalid state {e}"))), + }?; Ok(()) } @@ -106,15 +111,20 @@ impl QuiescenceBeacon { /// /// Has no effect if not currently paused. pub fn resume(&mut self) -> Result<()> { - self.0 - .state - .compare_exchange( - Inner::PAUSED, - Inner::RUNNING, - Ordering::AcqRel, - Ordering::Relaxed, - ) - .map_err(|_| "failed to resume")?; + match self.0.state.compare_exchange( + Inner::PAUSED, + Inner::RUNNING, + Ordering::AcqRel, + Ordering::Relaxed, + ) { + Ok(_) | Err(Inner::PAUSED | Inner::RUNNING | Inner::STOP_ALL | Inner::STOP_READING) => { + Ok(()) + } + // TODO: should we error on those? + // Err(Inner::STOP_READING) => Err(Error::from("Can't resume from STOP_READING")), + // Err(Inner::STOP_ALL) => Err(Error::from("Can't resume from STOP_ALL")), + Err(e) => Err(Error::from(format!("Invalid state {e}"))), + }?; self.0.resume_event.notify(Self::MAX_LISTENERS); // we might have been paused, so notify here Ok(())