diff --git a/src/job_scheduler.rs b/src/job_scheduler.rs index 69b170b..cfd7e31 100644 --- a/src/job_scheduler.rs +++ b/src/job_scheduler.rs @@ -19,8 +19,7 @@ use tokio::sync::RwLock; use tracing::{error, info}; use uuid::Uuid; -pub type ShutdownNotification = - dyn FnMut() -> Pin + Send>> + Send + Sync; +pub type ShutdownNotification = Pin + Send + Sync + 'static>>; /// The JobScheduler contains and executes the scheduled jobs. pub struct JobsSchedulerLocked { @@ -33,7 +32,7 @@ pub struct JobsSchedulerLocked { pub notification_deleter: Arc>, pub notification_runner: Arc>, pub scheduler: Arc>, - pub shutdown_notifier: Option>>>, + pub shutdown_notifier: Arc>>, } impl Clone for JobsSchedulerLocked { @@ -209,7 +208,7 @@ impl JobsSchedulerLocked { notification_deleter: Arc::new(Default::default()), notification_runner: Arc::new(Default::default()), scheduler: Arc::new(Default::default()), - shutdown_notifier: None, + shutdown_notifier: Default::default(), }; Ok(val) @@ -249,7 +248,7 @@ impl JobsSchedulerLocked { notification_deleter: Arc::new(Default::default()), notification_runner: Arc::new(Default::default()), scheduler: Arc::new(Default::default()), - shutdown_notifier: None, + shutdown_notifier: Default::default(), }; Ok(val) @@ -424,15 +423,11 @@ impl JobsSchedulerLocked { /// /// Shut the scheduler down pub async fn shutdown(&mut self) -> Result<(), JobSchedulerError> { - let mut notify = None; - std::mem::swap(&mut self.shutdown_notifier, &mut notify); - let mut scheduler = self.scheduler.write().await; scheduler.shutdown().await; - + let notify = { self.shutdown_notifier.write().await.take() }; if let Some(notify) = notify { - let mut notify = notify.write().await; - notify().await; + notify.await; } Ok(()) } @@ -471,14 +466,17 @@ impl JobsSchedulerLocked { /// /// Code that is run after the shutdown was run - pub fn set_shutdown_handler(&mut self, job: Box) { - self.shutdown_notifier = Some(Arc::new(RwLock::new(job))); + pub async fn set_shutdown_handler( + &mut self, + job: impl Future + Send + Sync + 'static, + ) { + self.shutdown_notifier.write().await.replace(Box::pin(job)); } /// /// Remove the shutdown handler - pub fn remove_shutdown_handler(&mut self) { - self.shutdown_notifier = None; + pub async fn remove_shutdown_handler(&mut self) { + self.shutdown_notifier.write().await.take(); } /// diff --git a/src/scheduler.rs b/src/scheduler.rs index 4d2bc6c..f1664ae 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -155,20 +155,26 @@ impl Scheduler { for uuid in must_runs { { - let tx = notify_tx.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send((uuid, JobState::Scheduled)) { - error!("Error sending notification activation {:?}", e); + if let Ok(Some(data)) = metadata_storage.write().await.get(uuid).await { + if !data.ran { + { + let tx = notify_tx.clone(); + tokio::spawn(async move { + if let Err(e) = tx.send((uuid, JobState::Scheduled)) { + error!("Error sending notification activation {:?}", e); + } + }); + } + { + let tx = job_activation_tx.clone(); + tokio::spawn(async move { + if let Err(e) = tx.send(uuid) { + error!("Error sending job activation tx {:?}", e); + } + }); + } } - }); - } - { - let tx = job_activation_tx.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send(uuid) { - error!("Error sending job activation tx {:?}", e); - } - }); + } } let storage = metadata_storage.clone();