From 921c799c0914943f5e2281477fb4912118049814 Mon Sep 17 00:00:00 2001 From: Jacob Rothstein Date: Wed, 2 Aug 2023 14:28:36 -0700 Subject: [PATCH] use streaming in task sync --- src/queue/job/v1/task_sync.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/queue/job/v1/task_sync.rs b/src/queue/job/v1/task_sync.rs index 9786351e..2449789a 100644 --- a/src/queue/job/v1/task_sync.rs +++ b/src/queue/job/v1/task_sync.rs @@ -2,7 +2,8 @@ use crate::{ entity::*, queue::job::{EnqueueJob, Job, JobError, SharedJobState, V1}, }; -use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter}; +use futures_lite::StreamExt; +use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, ModelTrait, PaginatorTrait, QueryFilter}; use serde::{Deserialize, Serialize}; use time::Duration; @@ -24,8 +25,11 @@ impl TaskSync { for aggregator in aggregators { let client = aggregator.client(job_state.http_client.clone()); - for task_id in client.get_task_ids().await? { - if 0 == Tasks::find_by_id(&task_id).count(db).await? { + while let Some(task_from_aggregator) = client.task_stream().next().await.transpose()? { + let task_id = task_from_aggregator.task_id.to_string(); + if let Some(_task_from_db) = Tasks::find_by_id(&task_id).one(db).await? { + // TODO: confirm that the task matches + } else { client.delete_task(&task_id).await?; } }