Skip to content

Commit

Permalink
no longer use ActiveValue::Set directly (#242)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbr authored Jun 27, 2023
1 parent b367a1f commit 4e1537b
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 58 deletions.
16 changes: 8 additions & 8 deletions src/entity/account.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use sea_orm::{entity::prelude::*, ActiveValue::Set, IntoActiveModel};
use sea_orm::{entity::prelude::*, ActiveValue, IntoActiveModel};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use validator::{Validate, ValidationErrors};
Expand Down Expand Up @@ -66,11 +66,11 @@ impl NewAccount {
pub fn build(self) -> Result<ActiveModel, ValidationErrors> {
self.validate()?;
Ok(ActiveModel {
id: Set(Uuid::new_v4()),
name: Set(self.name.unwrap()),
created_at: Set(TimeDateTimeWithTimeZone::now_utc()),
updated_at: Set(TimeDateTimeWithTimeZone::now_utc()),
admin: Set(false),
id: ActiveValue::Set(Uuid::new_v4()),
name: ActiveValue::Set(self.name.unwrap()),
created_at: ActiveValue::Set(TimeDateTimeWithTimeZone::now_utc()),
updated_at: ActiveValue::Set(TimeDateTimeWithTimeZone::now_utc()),
admin: ActiveValue::Set(false),
})
}
}
Expand All @@ -85,8 +85,8 @@ impl UpdateAccount {
pub fn build(self, account: Model) -> Result<ActiveModel, ValidationErrors> {
self.validate()?;
let mut am = account.into_active_model();
am.name = Set(self.name.unwrap());
am.updated_at = Set(TimeDateTimeWithTimeZone::now_utc());
am.name = ActiveValue::Set(self.name.unwrap());
am.updated_at = ActiveValue::Set(TimeDateTimeWithTimeZone::now_utc());
Ok(am)
}
}
6 changes: 3 additions & 3 deletions src/entity/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::clients::AggregatorClient;

use super::{account, membership, url::Url};
use sea_orm::{entity::prelude::*, IntoActiveModel, Set};
use sea_orm::{entity::prelude::*, ActiveValue, IntoActiveModel};
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::fmt::{self, Display, Formatter};
Expand Down Expand Up @@ -40,8 +40,8 @@ pub struct Model {
impl Model {
pub fn tombstone(self) -> ActiveModel {
let mut aggregator = self.into_active_model();
aggregator.updated_at = Set(OffsetDateTime::now_utc());
aggregator.deleted_at = Set(Some(OffsetDateTime::now_utc()));
aggregator.updated_at = ActiveValue::Set(OffsetDateTime::now_utc());
aggregator.deleted_at = ActiveValue::Set(Some(OffsetDateTime::now_utc()));
aggregator
}

Expand Down
6 changes: 3 additions & 3 deletions src/entity/aggregator/update_aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use sea_orm::{ActiveValue::Set, IntoActiveModel};
use sea_orm::{ActiveValue, IntoActiveModel};
use serde::Deserialize;
use time::OffsetDateTime;
use validator::Validate;
Expand All @@ -13,8 +13,8 @@ impl UpdateAggregator {
pub fn build(self, model: super::Model) -> Result<super::ActiveModel, crate::handler::Error> {
self.validate()?;
let mut am = model.into_active_model();
am.name = Set(self.name.unwrap());
am.updated_at = Set(OffsetDateTime::now_utc());
am.name = ActiveValue::Set(self.name.unwrap());
am.updated_at = ActiveValue::Set(OffsetDateTime::now_utc());
Ok(am)
}
}
10 changes: 5 additions & 5 deletions src/entity/membership.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::entity::{account, task};
use sea_orm::{entity::prelude::*, ActiveValue::Set};
use sea_orm::{entity::prelude::*, ActiveValue};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use validator::{Validate, ValidationErrors};
Expand Down Expand Up @@ -62,10 +62,10 @@ impl CreateMembership {
pub fn build(self, account: &account::Model) -> Result<ActiveModel, ValidationErrors> {
self.validate()?;
Ok(ActiveModel {
id: Set(Uuid::new_v4()),
account_id: Set(account.id),
user_email: Set(self.user_email.unwrap()),
created_at: Set(TimeDateTimeWithTimeZone::now_utc()),
id: ActiveValue::Set(Uuid::new_v4()),
account_id: ActiveValue::Set(account.id),
user_email: ActiveValue::Set(self.user_email.unwrap()),
created_at: ActiveValue::Set(TimeDateTimeWithTimeZone::now_utc()),
})
}
}
Expand Down
24 changes: 12 additions & 12 deletions src/entity/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
use sea_orm::{
entity::prelude::*,
sea_query::{self, all, any, LockBehavior, LockType},
DatabaseTransaction, QueryOrder, QuerySelect, Set,
ActiveValue, DatabaseTransaction, QueryOrder, QuerySelect,
};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
Expand Down Expand Up @@ -52,24 +52,24 @@ impl ActiveModelBehavior for ActiveModel {}
impl From<Job> for ActiveModel {
fn from(job: Job) -> Self {
Self {
id: Set(Uuid::new_v4()),
created_at: Set(OffsetDateTime::now_utc()),
updated_at: Set(OffsetDateTime::now_utc()),
scheduled_at: Set(None),
failure_count: Set(0),
status: Set(JobStatus::Pending),
job: Set(job),
error_message: Set(None),
parent_id: Set(None),
child_id: Set(None),
id: ActiveValue::Set(Uuid::new_v4()),
created_at: ActiveValue::Set(OffsetDateTime::now_utc()),
updated_at: ActiveValue::Set(OffsetDateTime::now_utc()),
scheduled_at: ActiveValue::Set(None),
failure_count: ActiveValue::Set(0),
status: ActiveValue::Set(JobStatus::Pending),
job: ActiveValue::Set(job),
error_message: ActiveValue::Set(None),
parent_id: ActiveValue::Set(None),
child_id: ActiveValue::Set(None),
}
}
}

impl From<EnqueueJob> for ActiveModel {
fn from(EnqueueJob { job, scheduled }: EnqueueJob) -> Self {
let mut am = Self::from(job);
am.scheduled_at = Set(scheduled);
am.scheduled_at = ActiveValue::Set(scheduled);
am
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/entity/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
account, membership, AccountColumn, Accounts, Aggregator, AggregatorColumn, Aggregators,
},
};
use sea_orm::{entity::prelude::*, ActiveValue::Set, IntoActiveModel};
use sea_orm::{entity::prelude::*, ActiveValue, IntoActiveModel};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use validator::{Validate, ValidationError};
Expand Down Expand Up @@ -48,10 +48,10 @@ impl Model {
db: impl ConnectionTrait,
) -> Result<Self, DbErr> {
let mut task = self.into_active_model();
task.report_count = Set(metrics.reports.try_into().unwrap_or(i32::MAX));
task.report_count = ActiveValue::Set(metrics.reports.try_into().unwrap_or(i32::MAX));
task.aggregate_collection_count =
Set(metrics.report_aggregations.try_into().unwrap_or(i32::MAX));
task.updated_at = Set(OffsetDateTime::now_utc());
ActiveValue::Set(metrics.report_aggregations.try_into().unwrap_or(i32::MAX));
task.updated_at = ActiveValue::Set(OffsetDateTime::now_utc());
task.update(&db).await
}

Expand Down
6 changes: 3 additions & 3 deletions src/entity/task/update_task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use sea_orm::{entity::prelude::*, ActiveValue::Set, IntoActiveModel};
use sea_orm::{entity::prelude::*, ActiveValue, IntoActiveModel};
use serde::Deserialize;
use validator::Validate;

Expand All @@ -12,8 +12,8 @@ impl UpdateTask {
pub fn build(self, model: super::Model) -> Result<super::ActiveModel, crate::handler::Error> {
self.validate()?;
let mut am = model.into_active_model();
am.name = Set(self.name.unwrap());
am.updated_at = Set(TimeDateTimeWithTimeZone::now_utc());
am.name = ActiveValue::Set(self.name.unwrap());
am.updated_at = ActiveValue::Set(TimeDateTimeWithTimeZone::now_utc());
Ok(am)
}
}
41 changes: 22 additions & 19 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::{
};
use sea_orm::{
sea_query::{self, all, Expr},
ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, IntoActiveModel, PaginatorTrait,
QueryFilter, Set, TransactionTrait,
ActiveModelTrait, ActiveValue, ColumnTrait, DbErr, EntityTrait, IntoActiveModel,
PaginatorTrait, QueryFilter, TransactionTrait,
};
use std::{ops::Range, sync::Arc, time::Duration};
use time::OffsetDateTime;
Expand Down Expand Up @@ -119,43 +119,46 @@ impl Queue {
})?;

let result = job.perform(&self.job_state, &tx).await;
queue_item.job = Set(job);
queue_item.job = ActiveValue::Set(job);

match result {
Ok(Some(next_job)) => {
queue_item.status = Set(JobStatus::Success);
queue_item.scheduled_at = Set(None);
queue_item.status = ActiveValue::Set(JobStatus::Success);
queue_item.scheduled_at = ActiveValue::Set(None);

let mut next_job = ActiveModel::from(next_job);
next_job.parent_id = Set(Some(*queue_item.id.as_ref()));
next_job.parent_id = ActiveValue::Set(Some(*queue_item.id.as_ref()));
let next_job = next_job.insert(&tx).await?;
queue_item.child_id = Set(Some(next_job.id));
queue_item.child_id = ActiveValue::Set(Some(next_job.id));
}

Ok(None) => {
queue_item.scheduled_at = Set(None);
queue_item.status = Set(JobStatus::Success);
queue_item.scheduled_at = ActiveValue::Set(None);
queue_item.status = ActiveValue::Set(JobStatus::Success);
}

Err(e) if e.is_retryable() => {
queue_item.failure_count = Set(queue_item.failure_count.as_ref() + 1);
queue_item.failure_count =
ActiveValue::Set(queue_item.failure_count.as_ref() + 1);
let reschedule =
reschedule_based_on_failure_count(*queue_item.failure_count.as_ref());
queue_item.status =
Set(reschedule.map_or(JobStatus::Failed, |_| JobStatus::Pending));
queue_item.scheduled_at = Set(reschedule);
queue_item.error_message = Set(Some(e));
queue_item.status = ActiveValue::Set(
reschedule.map_or(JobStatus::Failed, |_| JobStatus::Pending),
);
queue_item.scheduled_at = ActiveValue::Set(reschedule);
queue_item.error_message = ActiveValue::Set(Some(e));
}

Err(e) => {
queue_item.failure_count = Set(queue_item.failure_count.as_ref() + 1);
queue_item.scheduled_at = Set(None);
queue_item.status = Set(JobStatus::Failed);
queue_item.error_message = Set(Some(e));
queue_item.failure_count =
ActiveValue::Set(queue_item.failure_count.as_ref() + 1);
queue_item.scheduled_at = ActiveValue::Set(None);
queue_item.status = ActiveValue::Set(JobStatus::Failed);
queue_item.error_message = ActiveValue::Set(Some(e));
}
}

queue_item.updated_at = Set(OffsetDateTime::now_utc());
queue_item.updated_at = ActiveValue::Set(OffsetDateTime::now_utc());
Some(queue_item.update(&tx).await?)
} else {
None
Expand Down
2 changes: 1 addition & 1 deletion tests/harness/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use pretty_assertions::{assert_eq, assert_ne, assert_str_eq};
pub use querystrong::QueryStrong;
pub use sea_orm::{
ActiveModelTrait, ActiveValue, ColumnTrait, ConnectionTrait, DbBackend, DbErr, EntityTrait,
IntoActiveModel, PaginatorTrait, QueryFilter, Schema, Set,
IntoActiveModel, PaginatorTrait, QueryFilter, Schema,
};
pub use serde_json::{json, Value};
pub use test_harness::test;
Expand Down
1 change: 1 addition & 0 deletions tests/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ mod show {
let task = fixtures::task(&app, &account).await;
let mut task = task.into_active_model();
task.updated_at = ActiveValue::Set(OffsetDateTime::now_utc() - Duration::minutes(10));

let task = task.update(app.db()).await?;

let first_party_aggregator = task.first_party_aggregator(app.db()).await?.unwrap();
Expand Down

0 comments on commit 4e1537b

Please sign in to comment.