Skip to content

Commit

Permalink
feat: support repeated with start datetime
Browse files Browse the repository at this point in the history
  • Loading branch information
zyyang90 committed Nov 26, 2024
1 parent 6c56854 commit 5ad95ff
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 40 deletions.
53 changes: 53 additions & 0 deletions examples/repeated_with_start_at.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use chrono::{DateTime, Local, Utc};
use std::time::Duration;
use tokio_cron_scheduler::{JobBuilder, JobScheduler};

#[tokio::main]
async fn main() {
println!("start at: {}", Local::now().to_rfc3339().to_string());

// 创建 scheduler
let mut sched = JobScheduler::new().await.unwrap();

let datetime = DateTime::parse_from_rfc3339("2024-11-24T00:00:00+08:00")
.unwrap()
.with_timezone(&Utc);
const INTERVAL: u64 = 5;

// 创建 job
let job = JobBuilder::new()
// 设置 job 的时区为 Utc
.with_timezone(Utc)
// 设置 job 的任务类型为 repeated
.with_repeated_job_type()
// 设置 job 的类型为 repeated
.every_seconds(INTERVAL)
// 设置 job 的开始时间
.start_at(datetime)
// TODO:设置 job 的任务重叠策略
// 设置 job 的异步执行函数
.with_run_async(Box::new(|uuid, mut l| {
Box::pin(async move {
println!(">>> {}", Local::now().to_rfc3339().to_string());
tokio::time::sleep(Duration::from_secs(INTERVAL + 2)).await;
println!("<<< {}", Local::now().to_rfc3339().to_string());
})
}))
.build()
.unwrap();

// 将 job 添加到 scheduler
sched.add(job).await.unwrap();

// 启动 scheduler
if let Err(err) = sched.start().await {
eprintln!("failed to start scheduler, cause: {:?}", err);
}

// 等待
tokio::time::sleep(std::time::Duration::from_secs(20)).await;

// 停止 scheduler
sched.shutdown().await.unwrap();
println!("stop at: {}", Local::now().to_rfc3339().to_string());
}
1 change: 1 addition & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub enum JobSchedulerError {
NatsCouldNotConnect(String),
#[cfg(feature = "nats_storage")]
NatsCouldNotCreateKvStore(String),
DurationNotSet,
}

impl Display for JobSchedulerError {
Expand Down
95 changes: 83 additions & 12 deletions src/job/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@ pub use crate::job::job_data::{JobStoredData, JobType, Uuid};
use crate::job::job_data_prost;
#[cfg(feature = "has_bytes")]
pub use crate::job::job_data_prost::{JobStoredData, JobType, Uuid};
use crate::job::non_cron_job::NonCronJob;
use crate::job::{nop, nop_async, JobLocked};
use crate::{JobSchedulerError, JobToRun, JobToRunAsync};
use chrono::{Offset, TimeZone, Utc};
use chrono::{DateTime, Offset, TimeZone, Utc};
use core::time::Duration;
use croner::Cron;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use uuid::Uuid as UuidUuid;

pub struct JobBuilder<T> {
pub struct JobBuilder<T>
where
T: TimeZone,
{
pub job_id: Option<Uuid>,
pub timezone: Option<T>,
pub job_type: Option<JobType>,
Expand All @@ -27,6 +30,7 @@ pub struct JobBuilder<T> {
pub duration: Option<Duration>,
pub repeating: Option<bool>,
pub instant: Option<Instant>,
pub start_at: Option<DateTime<T>>,
}

impl JobBuilder<Utc> {
Expand All @@ -41,6 +45,7 @@ impl JobBuilder<Utc> {
duration: None,
repeating: None,
instant: None,
start_at: None,
}
}
}
Expand All @@ -57,6 +62,14 @@ impl<T: TimeZone> JobBuilder<T> {
duration: self.duration,
repeating: self.repeating,
instant: self.instant,
start_at: None,
}
}

pub fn start_at(self, dt: DateTime<T>) -> Self {
Self {
start_at: Some(dt),
..self
}
}

Expand Down Expand Up @@ -159,21 +172,21 @@ impl<T: TimeZone> JobBuilder<T> {
}
let async_job = run_async.is_some();

let time_offset_seconds = if let Some(tz) = self.timezone.as_ref() {
tz.offset_from_utc_datetime(&Utc::now().naive_local())
.fix()
.local_minus_utc()
} else {
0
};

match job_type {
JobType::Cron => {
if self.schedule.is_none() {
return Err(JobSchedulerError::ScheduleNotSet);
}
let schedule = self.schedule.unwrap();

let time_offset_seconds = if let Some(tz) = self.timezone.as_ref() {
tz.offset_from_utc_datetime(&Utc::now().naive_local())
.fix()
.local_minus_utc()
} else {
0
};

Ok(JobLocked(Arc::new(RwLock::new(Box::new(CronJob {
data: JobStoredData {
id: self.job_id.or(Some(UuidUuid::new_v4().into())),
Expand Down Expand Up @@ -213,8 +226,66 @@ impl<T: TimeZone> JobBuilder<T> {
async_job,
})))))
}
JobType::Repeated => Err(JobSchedulerError::NoNextTick),
JobType::Repeated => {
if self.duration.is_none() {
return Err(JobSchedulerError::DurationNotSet);
}
let duration = self.duration.unwrap();

let next_tick = match self.start_at {
Some(start_at) => {
// 如果当前时间已经超过了 start_at 时间,则直接执行
if Utc::now() > start_at {
Utc::now().timestamp() as u64
} else {
start_at.timestamp() as u64
}
}
None => now_plus_duration(&duration),
};

let job = NonCronJob {
data: JobStoredData {
id: self.job_id.or(Some(UuidUuid::new_v4().into())),
last_updated: None,
last_tick: None,
next_tick,
job_type: JobType::Repeated.into(),
count: 0,
extra: vec![],
ran: false,
stopped: false,
#[cfg(feature = "has_bytes")]
job: Some(job_data_prost::job_stored_data::Job::NonCronJob(
job_data_prost::NonCronJob {
repeating: true,
repeated_every: duration.as_secs(),
},
)),
#[cfg(not(feature = "has_bytes"))]
job: Some(job_data::job_stored_data::Job::NonCronJob(
job_data::NonCronJob {
repeating: true,
repeated_every: duration.as_secs(),
},
)),
time_offset_seconds: 0,
},
run: run.unwrap_or(Box::new(nop)),
run_async: run_async.unwrap_or(Box::new(nop_async)),
async_job,
};

Ok(JobLocked(Arc::new(RwLock::new(Box::new(job)))))
}
JobType::OneShot => Err(JobSchedulerError::NoNextTick),
}
}
}

fn now_plus_duration(duration: &Duration) -> u64 {
Utc::now()
.checked_add_signed(chrono::Duration::seconds(duration.as_secs() as i64))
.map(|t| t.timestamp() as u64)
.unwrap_or(0)
}
20 changes: 20 additions & 0 deletions src/job/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,35 @@ impl JobRunner {
}
});
}

match job_scheduler.is_running(uuid).await {
Ok(is_running) => {
if is_running {
continue;
}
}
Err(err) => {
error!("Error checking if job is running {:?}", err);
continue;
}
}

let mut w = job_code.write().await;
let code = w.get(uuid).await;
match code {
Ok(Some(job)) => {
let mut job = job.write().await;
let v = (job)(uuid, job_scheduler.clone());
let tx = tx_notify.clone();
let job_cloned = job_scheduler.clone();
tokio::spawn(async move {
if let Err(e) = job_cloned.set_ran(uuid, true).await {
error!("Error setting job ran {:?}", e);
}
v.await;
if let Err(e) = job_cloned.set_ran(uuid, false).await {
error!("Error setting job not ran {:?}", e);
}
if let Err(e) = tx.send((uuid, JobState::Done)) {
error!("Error sending spawned task {:?}", e);
}
Expand Down
71 changes: 56 additions & 15 deletions src/job_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use tokio::sync::RwLock;
use tracing::{error, info};
use uuid::Uuid;

pub type ShutdownNotification =
dyn FnMut() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync;
pub type ShutdownNotification = Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;

/// The JobScheduler contains and executes the scheduled jobs.
pub struct JobsSchedulerLocked {
Expand All @@ -33,7 +32,7 @@ pub struct JobsSchedulerLocked {
pub notification_deleter: Arc<RwLock<NotificationDeleter>>,
pub notification_runner: Arc<RwLock<NotificationRunner>>,
pub scheduler: Arc<RwLock<Scheduler>>,
pub shutdown_notifier: Option<Arc<RwLock<Box<ShutdownNotification>>>>,
pub shutdown_notifier: Arc<RwLock<Option<ShutdownNotification>>>,
}

impl Clone for JobsSchedulerLocked {
Expand Down Expand Up @@ -209,7 +208,7 @@ impl JobsSchedulerLocked {
notification_deleter: Arc::new(Default::default()),
notification_runner: Arc::new(Default::default()),
scheduler: Arc::new(Default::default()),
shutdown_notifier: None,
shutdown_notifier: Default::default(),
};

Ok(val)
Expand Down Expand Up @@ -249,7 +248,7 @@ impl JobsSchedulerLocked {
notification_deleter: Arc::new(Default::default()),
notification_runner: Arc::new(Default::default()),
scheduler: Arc::new(Default::default()),
shutdown_notifier: None,
shutdown_notifier: Default::default(),
};

Ok(val)
Expand Down Expand Up @@ -302,6 +301,49 @@ impl JobsSchedulerLocked {
JobDeleter::remove(&context, to_be_removed).await
}

pub async fn is_running(&self, job_id: Uuid) -> Result<bool, JobSchedulerError> {
if !self.inited().await {
let mut s = self.clone();
s.init().await?;
}

let mut metadata = self.context.metadata_storage.write().await;
let jm = metadata.get(job_id).await?;
match jm {
Some(job_metadata) => {
if job_metadata.stopped {
return Ok(false);
}
if job_metadata.ran {
return Ok(true);
}
Ok(false)
}
_ => Ok(true),
}
}

pub(crate) async fn set_ran(&self, job_id: Uuid, ran: bool) -> Result<(), JobSchedulerError> {
if !self.inited().await {
let mut s = self.clone();
s.init().await?;
}

let mut metadata = self.context.metadata_storage.write().await;
let jm = metadata.get(job_id).await?;
match jm {
Some(mut job_metadata) => {
job_metadata
.last_updated
.replace(Utc::now().timestamp() as u64);
job_metadata.ran = ran;
metadata.add_or_update(job_metadata).await?;
}
_ => {}
}
Ok(())
}

/// The `start` spawns a Tokio task where it loops. Every 500ms it
/// runs the tick method to increment any pending jobs.
///
Expand Down Expand Up @@ -381,15 +423,11 @@ impl JobsSchedulerLocked {
///
/// Shut the scheduler down
pub async fn shutdown(&mut self) -> Result<(), JobSchedulerError> {
let mut notify = None;
std::mem::swap(&mut self.shutdown_notifier, &mut notify);

let mut scheduler = self.scheduler.write().await;
scheduler.shutdown().await;

let notify = { self.shutdown_notifier.write().await.take() };
if let Some(notify) = notify {
let mut notify = notify.write().await;
notify().await;
notify.await;
}
Ok(())
}
Expand Down Expand Up @@ -428,14 +466,17 @@ impl JobsSchedulerLocked {

///
/// Code that is run after the shutdown was run
pub fn set_shutdown_handler(&mut self, job: Box<ShutdownNotification>) {
self.shutdown_notifier = Some(Arc::new(RwLock::new(job)));
pub async fn set_shutdown_handler(
&mut self,
job: impl Future<Output = ()> + Send + Sync + 'static,
) {
self.shutdown_notifier.write().await.replace(Box::pin(job));
}

///
/// Remove the shutdown handler
pub fn remove_shutdown_handler(&mut self) {
self.shutdown_notifier = None;
pub async fn remove_shutdown_handler(&mut self) {
self.shutdown_notifier.write().await.take();
}

///
Expand Down
Loading

0 comments on commit 5ad95ff

Please sign in to comment.