Skip to content

Commit

Permalink
add to auto-scheduled jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
jbr committed Aug 2, 2023
1 parent 4c372f1 commit 3875bba
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 29 deletions.
48 changes: 20 additions & 28 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,35 +73,27 @@ impl Queue {
}

pub async fn schedule_recurring_tasks_if_needed(&self) -> Result<(), DbErr> {
let tx = self.db.begin().await?;

let session_cleanup_jobs = Entity::find()
.filter(all![
Expr::cust_with_expr("job->>'type' = $1", "SessionCleanup"),
Column::ScheduledAt.gt(OffsetDateTime::now_utc()),
])
.count(&tx)
.await?;

if session_cleanup_jobs == 0 {
Job::from(SessionCleanup).insert(&tx).await?;
}
tx.commit().await?;

let tx = self.db.begin().await?;
let queue_cleanup_jobs = Entity::find()
.filter(all![
Expr::cust_with_expr("job->>'type' = $1", "QueueCleanup"),
Column::ScheduledAt.gt(OffsetDateTime::now_utc()),
])
.count(&tx)
.await?;

if queue_cleanup_jobs == 0 {
Job::from(QueueCleanup).insert(&tx).await?;
let schedulable_jobs = [
(Job::from(SessionCleanup), "SessionCleanup"),
(Job::from(QueueCleanup), "QueueCleanup"),
(Job::from(TaskSync), "TaskSync"),
];

for (job, name) in schedulable_jobs {
let tx = self.db.begin().await?;
let existing_jobs = Entity::find()
.filter(all![
Expr::cust_with_expr("job->>'type' = $1", name),
Column::ScheduledAt.gt(OffsetDateTime::now_utc()),
])
.count(&tx)
.await?;

if existing_jobs == 0 {
job.insert(&tx).await?;
tx.commit().await?;
}
}
tx.commit().await?;

Ok(())
}

Expand Down
4 changes: 3 additions & 1 deletion src/queue/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use trillium_client::Client;
use url::Url;

mod v1;
pub use v1::{CreateUser, QueueCleanup, ResetPassword, SendInvitationEmail, SessionCleanup, V1};
pub use v1::{
CreateUser, QueueCleanup, ResetPassword, SendInvitationEmail, SessionCleanup, TaskSync, V1,
};

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(tag = "version")]
Expand Down

0 comments on commit 3875bba

Please sign in to comment.