feat(commands): add resume on error for comments and emails
joe-prosser committed Jul 25, 2024
1 parent b90a6b6 commit b5b4540
Showing 6 changed files with 385 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
- Add ability to download attachments for comments
- Increase default http timeout to 120s
- Add `--resume-on-error` flag when creating annotations
- Add `--resume-on-error` flag when creating comments / emails

# v0.28.0
- Add general fields to `create datasets`
164 changes: 158 additions & 6 deletions api/src/lib.rs
@@ -36,7 +36,13 @@ use resources::{
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{cell::Cell, fmt::Display, io::Read, path::Path, time::Duration};
use std::{
cell::Cell,
fmt::{Debug, Display},
io::Read,
path::Path,
time::Duration,
};
use url::Url;

use crate::resources::{
@@ -137,6 +143,27 @@ pub use crate::{
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Token(pub String);

pub trait SplittableRequest {
fn split(&self) -> Vec<Self>
where
Self: Sized;

fn count(&self) -> usize;
}

pub struct SplitableRequestResponse<ResponseT>
where
for<'de> ResponseT: Deserialize<'de> + ReducableResponse,
{
pub response: ResponseT,
pub num_failed: usize,
}

pub trait ReducableResponse {
fn merge(a: Self, b: Self) -> Self;
fn empty() -> Self;
}

pub struct Config {
pub endpoint: Url,
pub token: Token,
@@ -460,6 +487,23 @@ impl Client {
)
}

pub fn put_comments_split_on_failure(
&self,
source_name: &SourceFullName,
comments: &[NewComment],
no_charge: bool,
) -> Result<SplitableRequestResponse<PutCommentsResponse>> {
self.splitable_request(
Method::PUT,
self.endpoints.put_comments(source_name)?,
PutCommentsRequest {
comments: comments.to_vec(),
},
Some(NoChargeQuery { no_charge }),
Retry::No,
)
}

pub fn put_comments(
&self,
source_name: &SourceFullName,
@@ -469,7 +513,9 @@
self.request(
Method::PUT,
self.endpoints.put_comments(source_name)?,
Some(PutCommentsRequest { comments }),
Some(PutCommentsRequest {
comments: comments.to_vec(),
}),
Some(NoChargeQuery { no_charge }),
Retry::No,
)
@@ -542,7 +588,26 @@ impl Client {
self.request(
Method::POST,
self.endpoints.sync_comments(source_name)?,
Some(SyncCommentsRequest { comments }),
Some(SyncCommentsRequest {
comments: comments.to_vec(),
}),
Some(NoChargeQuery { no_charge }),
Retry::Yes,
)
}

pub fn sync_comments_split_on_failure(
&self,
source_name: &SourceFullName,
comments: &[NewComment],
no_charge: bool,
) -> Result<SplitableRequestResponse<SyncCommentsResponse>> {
self.splitable_request(
Method::POST,
self.endpoints.sync_comments(source_name)?,
SyncCommentsRequest {
comments: comments.to_vec(),
},
Some(NoChargeQuery { no_charge }),
Retry::Yes,
)
@@ -569,6 +634,23 @@
)
}

pub fn put_emails_split_on_failure(
&self,
bucket_name: &BucketFullName,
emails: &[NewEmail],
no_charge: bool,
) -> Result<SplitableRequestResponse<PutEmailsResponse>> {
self.splitable_request(
Method::PUT,
self.endpoints.put_emails(bucket_name)?,
PutEmailsRequest {
emails: emails.to_vec(),
},
Some(NoChargeQuery { no_charge }),
Retry::Yes,
)
}

pub fn put_emails(
&self,
bucket_name: &BucketFullName,
@@ -578,7 +660,9 @@
self.request(
Method::PUT,
self.endpoints.put_emails(bucket_name)?,
Some(PutEmailsRequest { emails }),
Some(PutEmailsRequest {
emails: emails.to_vec(),
}),
Some(NoChargeQuery { no_charge }),
Retry::Yes,
)
@@ -1214,6 +1298,73 @@ impl Client {
Ok(http_response)
}

fn splitable_request<LocationT, RequestT, SuccessT, QueryT>(
&self,
method: Method,
url: LocationT,
body: RequestT,
query: Option<QueryT>,
retry: Retry,
) -> Result<SplitableRequestResponse<SuccessT>>
where
LocationT: IntoUrl + Display + Clone,
RequestT: Serialize + SplittableRequest + Clone + Debug,
QueryT: Serialize + Clone,
for<'de> SuccessT: Deserialize<'de> + ReducableResponse + Clone + Debug,
{
debug!("Attempting {} `{}`", method, url);
let result: Result<SuccessT> = self.request(
method.clone(),
url.clone(),
Some(body.clone()),
query.clone(),
retry.clone(),
);

match result {
Ok(response) => Ok(SplitableRequestResponse {
response,
num_failed: 0,
}),
Err(e) => {
if Self::should_split(&e) {
let mut num_failed = 0;
let mut response = SuccessT::empty();

body.split().iter().for_each(|request| {
match self.request(
method.clone(),
url.clone(),
Some(request),
query.clone(),
retry.clone(),
) {
Ok(r) => response = SuccessT::merge(response.clone(), r),
Err(_) => num_failed += 1,
}
});

Ok(SplitableRequestResponse {
num_failed,
response,
})
} else {
// Don't resume for fatal errors
Err(e)
}
}
}
}

fn should_split(error: &Error) -> bool {
if let Error::Api { status_code, .. } = error {
*status_code == reqwest::StatusCode::UNPROCESSABLE_ENTITY
|| *status_code == reqwest::StatusCode::BAD_REQUEST
} else {
false
}
}

fn request<LocationT, RequestT, SuccessT, QueryT>(
&self,
method: Method,
Expand All @@ -1225,7 +1376,7 @@ impl Client {
where
LocationT: IntoUrl + Display + Clone,
RequestT: Serialize,
QueryT: Serialize,
QueryT: Serialize + Clone,
for<'de> SuccessT: Deserialize<'de>,
{
debug!("Attempting {} `{}`", method, url);
@@ -1250,6 +1401,7 @@
}
}

#[derive(Clone)]
enum Retry {
Yes,
No,
@@ -1473,7 +1625,7 @@ struct Endpoints {
projects: Url,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone)]
struct NoChargeQuery {
no_charge: bool,
}
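Usage sketch (illustrative, not part of this commit): a caller of the new split-on-failure API might report partial failures roughly as follows. The surrounding function, its name, and the choice of `no_charge: false` are assumptions, not the CLI's actual code; it assumes the api crate's Client, SourceFullName, NewComment, Result and SplitableRequestResponse are in scope.

// Usage sketch only — not part of this commit.
fn upload_comments_resuming_on_error(
    client: &Client,
    source_name: &SourceFullName,
    comments: &[NewComment],
) -> Result<()> {
    let SplitableRequestResponse { num_failed, .. } =
        client.put_comments_split_on_failure(source_name, comments, false)?;
    if num_failed > 0 {
        // On a 400/422 the batch is split and re-sent one comment at a time;
        // `num_failed` counts the individual comments that were still rejected.
        eprintln!("{num_failed} comment(s) failed to upload");
    }
    Ok(())
}

The effect of the change is that a single malformed comment no longer aborts the whole batch: on a 400 or 422 the request is retried item by item, only the bad items are dropped, and the failure count is surfaced to the caller.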
89 changes: 83 additions & 6 deletions api/src/resources/comment.rs
@@ -4,7 +4,7 @@ use crate::{
resources::label_def::Name as LabelName,
resources::label_group::Name as LabelGroupName,
resources::label_group::DEFAULT_LABEL_GROUP_NAME,
SourceId,
ReducableResponse, SourceId, SplittableRequest,
};
use chrono::{DateTime, Utc};
use ordered_float::NotNan;
@@ -170,16 +170,63 @@ pub struct CommentsIterPage {
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct PutCommentsRequest<'request> {
pub comments: &'request [NewComment],
pub(crate) struct PutCommentsRequest {
pub comments: Vec<NewComment>,
}

#[derive(Debug, Clone, Deserialize)]
impl SplittableRequest for PutCommentsRequest {
fn split(&self) -> Vec<Self>
where
Self: Sized,
{
self.comments
.clone()
.into_iter()
.map(|comment| Self {
comments: [comment].to_vec(),
})
.collect()
}

fn count(&self) -> usize {
self.comments.len()
}
}

#[derive(Default, Debug, Clone, Deserialize)]
pub struct PutCommentsResponse;

impl ReducableResponse for PutCommentsResponse {
fn empty() -> Self {
Self {}
}
fn merge(_a: Self, _b: Self) -> Self {
Self {}
}
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct SyncCommentsRequest<'request> {
pub comments: &'request [NewComment],
pub(crate) struct SyncCommentsRequest {
pub comments: Vec<NewComment>,
}

impl SplittableRequest for SyncCommentsRequest {
fn split(&self) -> Vec<Self>
where
Self: Sized,
{
self.comments
.clone()
.into_iter()
.map(|comment| Self {
comments: [comment].to_vec(),
})
.collect()
}

fn count(&self) -> usize {
self.comments.len()
}
}

#[derive(Debug, Clone, Deserialize)]
@@ -189,6 +236,36 @@ pub struct SyncCommentsResponse {
pub unchanged: usize,
}

impl ReducableResponse for SyncCommentsResponse {
fn empty() -> Self {
Self {
new: 0,
updated: 0,
unchanged: 0,
}
}

fn merge(a: Self, b: Self) -> Self {
let Self {
new: a_new,
updated: a_updated,
unchanged: a_unchanged,
} = a;

let Self {
new: b_new,
updated: b_updated,
unchanged: b_unchanged,
} = b;

Self {
new: a_new + b_new,
updated: a_updated + b_updated,
unchanged: a_unchanged + b_unchanged,
}
}
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct GetCommentResponse {
pub comment: Comment,
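Illustrative note (not part of this commit): `ReducableResponse` lets the per-item responses produced by the split retries be folded back into one aggregate response. For `SyncCommentsResponse`, `merge` simply sums the counters, so, assuming `ReducableResponse` is in scope and the counter fields are publicly constructible:

// Illustrative only: merging two single-comment sync responses sums their counts.
let a = SyncCommentsResponse { new: 1, updated: 0, unchanged: 0 };
let b = SyncCommentsResponse { new: 0, updated: 1, unchanged: 0 };
let merged = SyncCommentsResponse::merge(a, b);
assert_eq!((merged.new, merged.updated, merged.unchanged), (1, 1, 0));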