Skip to content

Commit

Permalink
rebase and clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
jbr committed Aug 2, 2023
1 parent 9ddd226 commit afeb980
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 162 deletions.
147 changes: 6 additions & 141 deletions src/clients/aggregator_client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::{
collections::VecDeque,
fmt::{self, Formatter},
pin::Pin,
task::{ready, Context, Poll},
};
mod task_id_stream;
mod task_stream;

use task_id_stream::TaskIdStream;
use task_stream::TaskStream;

use crate::{
clients::{ClientConnExt, ClientError},
entity::{task::ProvisionableTask, Aggregator},
handler::Error,
};
use futures_lite::{stream::Stream, Future, StreamExt};
use futures_lite::StreamExt;
use serde::{de::DeserializeOwned, Serialize};
use trillium::{HeaderValue, KnownHeaderName, Method};
use trillium_client::{Client, Conn};
Expand Down Expand Up @@ -148,137 +147,3 @@ impl AggregatorClient {
TaskStream::new(self)
}
}

#[derive(Clone, Debug)]
struct Page {
task_ids: VecDeque<String>,
pagination_token: Option<String>,
}

impl From<TaskIds> for Page {
fn from(
TaskIds {
task_ids,
pagination_token,
}: TaskIds,
) -> Self {
Page {
task_ids: task_ids.into_iter().map(|t| t.to_string()).collect(),
pagination_token,
}
}
}

pub struct TaskIdStream<'a> {
client: &'a AggregatorClient,
page: Option<Page>,
future: Option<Pin<Box<dyn Future<Output = Result<TaskIds, ClientError>> + Send + 'a>>>,
}

impl<'a> TaskIdStream<'a> {
fn new(client: &'a AggregatorClient) -> Self {
Self {
client,
page: None,
future: None,
}
}
}

impl<'a> fmt::Debug for TaskIdStream<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("TaskIdStream")
.field("client", &self.client)
.field("current_page", &self.page)
.field("current_future", &"..")
.finish()
}
}

impl Stream for TaskIdStream<'_> {
type Item = Result<String, ClientError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
client,
ref mut page,
ref mut future,
} = *self;

loop {
if let Some(page) = page {
if let Some(task_id) = page.task_ids.pop_front() {
return Poll::Ready(Some(Ok(task_id)));
}

if page.pagination_token.is_none() {
return Poll::Ready(None);
}
}

if let Some(fut) = future {
*page = Some(ready!(Pin::new(&mut *fut).poll(cx))?.into());
*future = None;
} else {
let pagination_token = page.as_ref().and_then(|page| page.pagination_token.clone());

*future = Some(Box::pin(async move {
client.get_task_id_page(pagination_token.as_deref()).await
}));
};
}
}
}

pub struct TaskStream<'a> {
client: &'a AggregatorClient,
task_id_stream: TaskIdStream<'a>,
task_future: Option<
Pin<Box<dyn Future<Output = Option<Result<TaskResponse, ClientError>>> + Send + 'a>>,
>,
}

impl<'a> fmt::Debug for TaskStream<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("TaskStream").field("future", &"..").finish()
}
}

impl<'a> TaskStream<'a> {
fn new(client: &'a AggregatorClient) -> Self {
Self {
task_id_stream: client.task_id_stream(),
client,
task_future: None,
}
}
}

impl Stream for TaskStream<'_> {
type Item = Result<TaskResponse, ClientError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
client,
ref mut task_id_stream,
ref mut task_future,
} = *self;

loop {
if let Some(future) = task_future {
let res = ready!(Pin::new(&mut *future).poll(cx));
*task_future = None;
return Poll::Ready(res);
}

*task_future = match ready!(Pin::new(&mut *task_id_stream).poll_next(cx)) {
Some(Ok(task_id)) => Some(Box::pin(async move {
let task_id = task_id;
Some(client.get_task(&task_id).await)
})),
None => return Poll::Ready(None),
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
};
}
}
}
93 changes: 93 additions & 0 deletions src/clients/aggregator_client/task_id_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::{
collections::VecDeque,
fmt::{self, Debug, Formatter},
pin::Pin,
task::{ready, Context, Poll},
};

use super::{AggregatorClient, TaskIds};
use crate::clients::ClientError;
use futures_lite::{stream::Stream, Future};

#[derive(Clone, Debug)]
struct Page {
task_ids: VecDeque<String>,
pagination_token: Option<String>,
}

impl From<TaskIds> for Page {
fn from(
TaskIds {
task_ids,
pagination_token,
}: TaskIds,
) -> Self {
Page {
task_ids: task_ids.into_iter().map(|t| t.to_string()).collect(),
pagination_token,
}
}
}

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

pub struct TaskIdStream<'a> {
client: &'a AggregatorClient,
page: Option<Page>,
future: Option<BoxFuture<'a, Result<TaskIds, ClientError>>>,
}

impl<'a> TaskIdStream<'a> {
pub(super) fn new(client: &'a AggregatorClient) -> Self {
Self {
client,
page: None,
future: None,
}
}
}

impl<'a> Debug for TaskIdStream<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("TaskIdStream")
.field("client", &self.client)
.field("current_page", &self.page)
.field("current_future", &"..")
.finish()
}
}

impl Stream for TaskIdStream<'_> {
type Item = Result<String, ClientError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
client,
ref mut page,
ref mut future,
} = *self;

loop {
if let Some(page) = page {
if let Some(task_id) = page.task_ids.pop_front() {
return Poll::Ready(Some(Ok(task_id)));
}

if page.pagination_token.is_none() {
return Poll::Ready(None);
}
}

if let Some(fut) = future {
*page = Some(ready!(Pin::new(&mut *fut).poll(cx))?.into());
*future = None;
} else {
let pagination_token = page.as_ref().and_then(|page| page.pagination_token.clone());

*future = Some(Box::pin(async move {
client.get_task_id_page(pagination_token.as_deref()).await
}));
};
}
}
}
62 changes: 62 additions & 0 deletions src/clients/aggregator_client/task_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::{
fmt::{self, Debug, Formatter},
pin::Pin,
task::{ready, Context, Poll},
};

use super::{task_id_stream::TaskIdStream, AggregatorClient, TaskResponse};
use crate::clients::ClientError;
use futures_lite::{stream::Stream, Future};

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

pub struct TaskStream<'a> {
client: &'a AggregatorClient,
task_id_stream: TaskIdStream<'a>,
task_future: Option<BoxFuture<'a, Option<Result<TaskResponse, ClientError>>>>,
}

impl<'a> Debug for TaskStream<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("TaskStream").field("future", &"..").finish()
}
}

impl<'a> TaskStream<'a> {
pub(super) fn new(client: &'a AggregatorClient) -> Self {
Self {
task_id_stream: client.task_id_stream(),
client,
task_future: None,
}
}
}

impl Stream for TaskStream<'_> {
type Item = Result<TaskResponse, ClientError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
client,
ref mut task_id_stream,
ref mut task_future,
} = *self;

loop {
if let Some(future) = task_future {
let res = ready!(Pin::new(&mut *future).poll(cx));
*task_future = None;
return Poll::Ready(res);
}

*task_future = match ready!(Pin::new(&mut *task_id_stream).poll_next(cx)) {
Some(Ok(task_id)) => Some(Box::pin(async move {
let task_id = task_id;
Some(client.get_task(&task_id).await)
})),
None => return Poll::Ready(None),
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
};
}
}
}
2 changes: 2 additions & 0 deletions src/queue/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,15 @@ pub struct SharedJobState {
pub auth0_client: Auth0Client,
pub postmark_client: PostmarkClient,
pub http_client: Client,
pub crypter: crate::Crypter,
}
impl From<&Config> for SharedJobState {
fn from(config: &Config) -> Self {
Self {
auth0_client: Auth0Client::new(config),
postmark_client: PostmarkClient::new(config),
http_client: config.client.clone(),
crypter: config.crypter.clone(),
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/queue/job/v1/task_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
queue::job::{EnqueueJob, Job, JobError, SharedJobState, V1},
};
use futures_lite::StreamExt;
use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, ModelTrait, PaginatorTrait, QueryFilter};
use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter};
use serde::{Deserialize, Serialize};
use time::Duration;

Expand All @@ -24,7 +24,9 @@ impl TaskSync {
.await?;

for aggregator in aggregators {
let client = aggregator.client(job_state.http_client.clone());
let client = aggregator
.client(job_state.http_client.clone(), &job_state.crypter)
.map_err(|e| JobError::ClientOther(e.to_string()))?;
while let Some(task_from_aggregator) = client.task_stream().next().await.transpose()? {
let task_id = task_from_aggregator.task_id.to_string();
if let Some(_task_from_db) = Tasks::find_by_id(&task_id).one(db).await? {
Expand Down
Loading

0 comments on commit afeb980

Please sign in to comment.