From 5ad95ffa3d00ad42e6cff7a5014a827ab68d9ed5 Mon Sep 17 00:00:00 2001 From: zyyang Date: Tue, 26 Nov 2024 11:19:53 +0800 Subject: [PATCH] feat: support repeated with start datetime --- examples/repeated_with_start_at.rs | 53 +++++++++++++++++ src/error.rs | 1 + src/job/builder.rs | 95 ++++++++++++++++++++++++++---- src/job/runner.rs | 20 +++++++ src/job_scheduler.rs | 71 +++++++++++++++++----- src/scheduler.rs | 32 ++++++---- 6 files changed, 232 insertions(+), 40 deletions(-) create mode 100644 examples/repeated_with_start_at.rs diff --git a/examples/repeated_with_start_at.rs b/examples/repeated_with_start_at.rs new file mode 100644 index 0000000..8e64fe1 --- /dev/null +++ b/examples/repeated_with_start_at.rs @@ -0,0 +1,53 @@ +use chrono::{DateTime, Local, Utc}; +use std::time::Duration; +use tokio_cron_scheduler::{JobBuilder, JobScheduler}; + +#[tokio::main] +async fn main() { + println!("start at: {}", Local::now().to_rfc3339().to_string()); + + // 创建 scheduler + let mut sched = JobScheduler::new().await.unwrap(); + + let datetime = DateTime::parse_from_rfc3339("2024-11-24T00:00:00+08:00") + .unwrap() + .with_timezone(&Utc); + const INTERVAL: u64 = 5; + + // 创建 job + let job = JobBuilder::new() + // 设置 job 的时区为 Utc + .with_timezone(Utc) + // 设置 job 的任务类型为 repeated + .with_repeated_job_type() + // 设置 job 的类型为 repeated + .every_seconds(INTERVAL) + // 设置 job 的开始时间 + .start_at(datetime) + // TODO:设置 job 的任务重叠策略 + // 设置 job 的异步执行函数 + .with_run_async(Box::new(|uuid, mut l| { + Box::pin(async move { + println!(">>> {}", Local::now().to_rfc3339().to_string()); + tokio::time::sleep(Duration::from_secs(INTERVAL + 2)).await; + println!("<<< {}", Local::now().to_rfc3339().to_string()); + }) + })) + .build() + .unwrap(); + + // 将 job 添加到 scheduler + sched.add(job).await.unwrap(); + + // 启动 scheduler + if let Err(err) = sched.start().await { + eprintln!("failed to start scheduler, cause: {:?}", err); + } + + // 等待 + tokio::time::sleep(std::time::Duration::from_secs(20)).await; + + // 停止 scheduler + sched.shutdown().await.unwrap(); + println!("stop at: {}", Local::now().to_rfc3339().to_string()); +} diff --git a/src/error.rs b/src/error.rs index 7bbf7a1..fe5f971 100644 --- a/src/error.rs +++ b/src/error.rs @@ -36,6 +36,7 @@ pub enum JobSchedulerError { NatsCouldNotConnect(String), #[cfg(feature = "nats_storage")] NatsCouldNotCreateKvStore(String), + DurationNotSet, } impl Display for JobSchedulerError { diff --git a/src/job/builder.rs b/src/job/builder.rs index b64d952..15e667e 100644 --- a/src/job/builder.rs +++ b/src/job/builder.rs @@ -7,17 +7,20 @@ pub use crate::job::job_data::{JobStoredData, JobType, Uuid}; use crate::job::job_data_prost; #[cfg(feature = "has_bytes")] pub use crate::job::job_data_prost::{JobStoredData, JobType, Uuid}; +use crate::job::non_cron_job::NonCronJob; use crate::job::{nop, nop_async, JobLocked}; use crate::{JobSchedulerError, JobToRun, JobToRunAsync}; -use chrono::{Offset, TimeZone, Utc}; +use chrono::{DateTime, Offset, TimeZone, Utc}; use core::time::Duration; use croner::Cron; use std::sync::{Arc, RwLock}; use std::time::Instant; - use uuid::Uuid as UuidUuid; -pub struct JobBuilder { +pub struct JobBuilder +where + T: TimeZone, +{ pub job_id: Option, pub timezone: Option, pub job_type: Option, @@ -27,6 +30,7 @@ pub struct JobBuilder { pub duration: Option, pub repeating: Option, pub instant: Option, + pub start_at: Option>, } impl JobBuilder { @@ -41,6 +45,7 @@ impl JobBuilder { duration: None, repeating: None, instant: None, + start_at: None, } } } @@ -57,6 +62,14 @@ impl JobBuilder { duration: self.duration, repeating: self.repeating, instant: self.instant, + start_at: None, + } + } + + pub fn start_at(self, dt: DateTime) -> Self { + Self { + start_at: Some(dt), + ..self } } @@ -159,6 +172,14 @@ impl JobBuilder { } let async_job = run_async.is_some(); + let time_offset_seconds = if let Some(tz) = self.timezone.as_ref() { + tz.offset_from_utc_datetime(&Utc::now().naive_local()) + .fix() + .local_minus_utc() + } else { + 0 + }; + match job_type { JobType::Cron => { if self.schedule.is_none() { @@ -166,14 +187,6 @@ impl JobBuilder { } let schedule = self.schedule.unwrap(); - let time_offset_seconds = if let Some(tz) = self.timezone.as_ref() { - tz.offset_from_utc_datetime(&Utc::now().naive_local()) - .fix() - .local_minus_utc() - } else { - 0 - }; - Ok(JobLocked(Arc::new(RwLock::new(Box::new(CronJob { data: JobStoredData { id: self.job_id.or(Some(UuidUuid::new_v4().into())), @@ -213,8 +226,66 @@ impl JobBuilder { async_job, }))))) } - JobType::Repeated => Err(JobSchedulerError::NoNextTick), + JobType::Repeated => { + if self.duration.is_none() { + return Err(JobSchedulerError::DurationNotSet); + } + let duration = self.duration.unwrap(); + + let next_tick = match self.start_at { + Some(start_at) => { + // 如果当前时间已经超过了 start_at 时间,则直接执行 + if Utc::now() > start_at { + Utc::now().timestamp() as u64 + } else { + start_at.timestamp() as u64 + } + } + None => now_plus_duration(&duration), + }; + + let job = NonCronJob { + data: JobStoredData { + id: self.job_id.or(Some(UuidUuid::new_v4().into())), + last_updated: None, + last_tick: None, + next_tick, + job_type: JobType::Repeated.into(), + count: 0, + extra: vec![], + ran: false, + stopped: false, + #[cfg(feature = "has_bytes")] + job: Some(job_data_prost::job_stored_data::Job::NonCronJob( + job_data_prost::NonCronJob { + repeating: true, + repeated_every: duration.as_secs(), + }, + )), + #[cfg(not(feature = "has_bytes"))] + job: Some(job_data::job_stored_data::Job::NonCronJob( + job_data::NonCronJob { + repeating: true, + repeated_every: duration.as_secs(), + }, + )), + time_offset_seconds: 0, + }, + run: run.unwrap_or(Box::new(nop)), + run_async: run_async.unwrap_or(Box::new(nop_async)), + async_job, + }; + + Ok(JobLocked(Arc::new(RwLock::new(Box::new(job))))) + } JobType::OneShot => Err(JobSchedulerError::NoNextTick), } } } + +fn now_plus_duration(duration: &Duration) -> u64 { + Utc::now() + .checked_add_signed(chrono::Duration::seconds(duration.as_secs() as i64)) + .map(|t| t.timestamp() as u64) + .unwrap_or(0) +} diff --git a/src/job/runner.rs b/src/job/runner.rs index 2bef567..f60f9ca 100644 --- a/src/job/runner.rs +++ b/src/job/runner.rs @@ -43,6 +43,19 @@ impl JobRunner { } }); } + + match job_scheduler.is_running(uuid).await { + Ok(is_running) => { + if is_running { + continue; + } + } + Err(err) => { + error!("Error checking if job is running {:?}", err); + continue; + } + } + let mut w = job_code.write().await; let code = w.get(uuid).await; match code { @@ -50,8 +63,15 @@ impl JobRunner { let mut job = job.write().await; let v = (job)(uuid, job_scheduler.clone()); let tx = tx_notify.clone(); + let job_cloned = job_scheduler.clone(); tokio::spawn(async move { + if let Err(e) = job_cloned.set_ran(uuid, true).await { + error!("Error setting job ran {:?}", e); + } v.await; + if let Err(e) = job_cloned.set_ran(uuid, false).await { + error!("Error setting job not ran {:?}", e); + } if let Err(e) = tx.send((uuid, JobState::Done)) { error!("Error sending spawned task {:?}", e); } diff --git a/src/job_scheduler.rs b/src/job_scheduler.rs index c252872..cfd7e31 100644 --- a/src/job_scheduler.rs +++ b/src/job_scheduler.rs @@ -19,8 +19,7 @@ use tokio::sync::RwLock; use tracing::{error, info}; use uuid::Uuid; -pub type ShutdownNotification = - dyn FnMut() -> Pin + Send>> + Send + Sync; +pub type ShutdownNotification = Pin + Send + Sync + 'static>>; /// The JobScheduler contains and executes the scheduled jobs. pub struct JobsSchedulerLocked { @@ -33,7 +32,7 @@ pub struct JobsSchedulerLocked { pub notification_deleter: Arc>, pub notification_runner: Arc>, pub scheduler: Arc>, - pub shutdown_notifier: Option>>>, + pub shutdown_notifier: Arc>>, } impl Clone for JobsSchedulerLocked { @@ -209,7 +208,7 @@ impl JobsSchedulerLocked { notification_deleter: Arc::new(Default::default()), notification_runner: Arc::new(Default::default()), scheduler: Arc::new(Default::default()), - shutdown_notifier: None, + shutdown_notifier: Default::default(), }; Ok(val) @@ -249,7 +248,7 @@ impl JobsSchedulerLocked { notification_deleter: Arc::new(Default::default()), notification_runner: Arc::new(Default::default()), scheduler: Arc::new(Default::default()), - shutdown_notifier: None, + shutdown_notifier: Default::default(), }; Ok(val) @@ -302,6 +301,49 @@ impl JobsSchedulerLocked { JobDeleter::remove(&context, to_be_removed).await } + pub async fn is_running(&self, job_id: Uuid) -> Result { + if !self.inited().await { + let mut s = self.clone(); + s.init().await?; + } + + let mut metadata = self.context.metadata_storage.write().await; + let jm = metadata.get(job_id).await?; + match jm { + Some(job_metadata) => { + if job_metadata.stopped { + return Ok(false); + } + if job_metadata.ran { + return Ok(true); + } + Ok(false) + } + _ => Ok(true), + } + } + + pub(crate) async fn set_ran(&self, job_id: Uuid, ran: bool) -> Result<(), JobSchedulerError> { + if !self.inited().await { + let mut s = self.clone(); + s.init().await?; + } + + let mut metadata = self.context.metadata_storage.write().await; + let jm = metadata.get(job_id).await?; + match jm { + Some(mut job_metadata) => { + job_metadata + .last_updated + .replace(Utc::now().timestamp() as u64); + job_metadata.ran = ran; + metadata.add_or_update(job_metadata).await?; + } + _ => {} + } + Ok(()) + } + /// The `start` spawns a Tokio task where it loops. Every 500ms it /// runs the tick method to increment any pending jobs. /// @@ -381,15 +423,11 @@ impl JobsSchedulerLocked { /// /// Shut the scheduler down pub async fn shutdown(&mut self) -> Result<(), JobSchedulerError> { - let mut notify = None; - std::mem::swap(&mut self.shutdown_notifier, &mut notify); - let mut scheduler = self.scheduler.write().await; scheduler.shutdown().await; - + let notify = { self.shutdown_notifier.write().await.take() }; if let Some(notify) = notify { - let mut notify = notify.write().await; - notify().await; + notify.await; } Ok(()) } @@ -428,14 +466,17 @@ impl JobsSchedulerLocked { /// /// Code that is run after the shutdown was run - pub fn set_shutdown_handler(&mut self, job: Box) { - self.shutdown_notifier = Some(Arc::new(RwLock::new(job))); + pub async fn set_shutdown_handler( + &mut self, + job: impl Future + Send + Sync + 'static, + ) { + self.shutdown_notifier.write().await.replace(Box::pin(job)); } /// /// Remove the shutdown handler - pub fn remove_shutdown_handler(&mut self) { - self.shutdown_notifier = None; + pub async fn remove_shutdown_handler(&mut self) { + self.shutdown_notifier.write().await.take(); } /// diff --git a/src/scheduler.rs b/src/scheduler.rs index 4d2bc6c..f1664ae 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -155,20 +155,26 @@ impl Scheduler { for uuid in must_runs { { - let tx = notify_tx.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send((uuid, JobState::Scheduled)) { - error!("Error sending notification activation {:?}", e); + if let Ok(Some(data)) = metadata_storage.write().await.get(uuid).await { + if !data.ran { + { + let tx = notify_tx.clone(); + tokio::spawn(async move { + if let Err(e) = tx.send((uuid, JobState::Scheduled)) { + error!("Error sending notification activation {:?}", e); + } + }); + } + { + let tx = job_activation_tx.clone(); + tokio::spawn(async move { + if let Err(e) = tx.send(uuid) { + error!("Error sending job activation tx {:?}", e); + } + }); + } } - }); - } - { - let tx = job_activation_tx.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send(uuid) { - error!("Error sending job activation tx {:?}", e); - } - }); + } } let storage = metadata_storage.clone();