Skip to content

Commit

Permalink
feat(commands): add resume on error for comments and emails (#291)
Browse files Browse the repository at this point in the history
  • Loading branch information
joe-prosser authored Jul 26, 2024
1 parent 4c6d593 commit fba30bf
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- Increase default http timeout to 120s
- Add `--resume-on-error` flag when creating annotations
- Remove `--use-moon-forms` flag
- Add `--resume-on-error` flag when creating comments / emails

# v0.28.0
- Add general fields to `create datasets`
Expand Down
214 changes: 178 additions & 36 deletions api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ use resources::{
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{cell::Cell, fmt::Display, io::Read, path::Path, time::Duration};
use std::{
cell::Cell,
fmt::{Debug, Display},
io::Read,
path::Path,
time::Duration,
};
use url::Url;

use crate::resources::{
Expand Down Expand Up @@ -137,6 +143,38 @@ pub use crate::{
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Token(pub String);

/// A request that can be broken up into several requests of the same type.
///
/// Within this file, `Client::splitable_request` calls [`split`](Self::split)
/// on the request body after the combined request is rejected, and then
/// sends every yielded request on its own.
pub trait SplittableRequest {
    /// Returns a size for this request.
    ///
    /// NOTE(review): presumably the number of items (comments / emails)
    /// carried by the request — confirm against the implementors.
    fn count(&self) -> usize;

    /// Consumes the request and yields the requests it is split into.
    ///
    /// NOTE(review): the granularity of the split (e.g. one item per yielded
    /// request) is decided by each implementor and is not fixed here.
    fn split(self) -> impl Iterator<Item = Self>
    where
        Self: Sized;
}

/// Outcome of a request that may have been retried in smaller pieces.
///
/// The `Deserialize + ReducibleResponse` requirements are intentionally not
/// repeated on the struct itself: they are enforced by the functions that
/// build this value, so code that merely stores or passes the struct around
/// does not have to restate them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SplitableRequestResponse<ResponseT> {
    /// The response of the combined request, or — when the request had to be
    /// split — the merged responses of the split requests that succeeded.
    pub response: ResponseT,
    /// Number of split requests that failed; `0` when the combined request
    /// succeeded outright.
    pub num_failed: usize,
}

/// A response type whose values can be folded into a single value.
///
/// Both methods ship with default bodies that are only available when the
/// implementing type is also `Default`.
pub trait ReducibleResponse {
    /// Returns the starting value for a fold over responses.
    ///
    /// The default body returns `Self::default()`.
    fn empty() -> Self
    where
        Self: Default,
    {
        Self::default()
    }

    /// Combines `self` with `_b` into one response.
    ///
    /// The default body ignores *both* operands and returns
    /// `Self::default()`, so any type that carries data worth keeping must
    /// override this method.
    fn merge(self, _b: Self) -> Self
    where
        Self: Default,
    {
        Self::default()
    }
}

pub struct Config {
pub endpoint: Url,
pub token: Token,
Expand Down Expand Up @@ -434,13 +472,13 @@ impl Client {
integration: &NewIntegration,
) -> Result<PostIntegrationResponse> {
self.request(
Method::POST,
self.endpoints.integration(name)?,
Some(PostIntegrationRequest {
&Method::POST,
&self.endpoints.integration(name)?,
&Some(PostIntegrationRequest {
integration: integration.clone(),
}),
None::<()>,
Retry::No,
&None::<()>,
&Retry::No,
)
}

Expand All @@ -450,31 +488,46 @@ impl Client {
integration: &NewIntegration,
) -> Result<PutIntegrationResponse> {
self.request(
Method::PUT,
self.endpoints.integration(name)?,
Some(PutIntegrationRequest {
&Method::PUT,
&self.endpoints.integration(name)?,
&Some(PutIntegrationRequest {
integration: integration.clone(),
}),
None::<()>,
Retry::No,
&None::<()>,
&Retry::No,
)
}

pub fn put_comments(
pub fn put_comments_split_on_failure(
&self,
source_name: &SourceFullName,
comments: &[NewComment],
comments: Vec<NewComment>,
no_charge: bool,
) -> Result<PutCommentsResponse> {
self.request(
) -> Result<SplitableRequestResponse<PutCommentsResponse>> {
self.splitable_request(
Method::PUT,
self.endpoints.put_comments(source_name)?,
Some(PutCommentsRequest { comments }),
PutCommentsRequest { comments },
Some(NoChargeQuery { no_charge }),
Retry::No,
)
}

/// Issues a single `PUT` to the comments endpoint of `source_name`.
///
/// `comments` becomes the request body (wrapped in a `PutCommentsRequest`)
/// and `no_charge` is sent as a query parameter. The request is made with
/// `Retry::No`. An error from building the endpoint URL or from the request
/// itself is returned to the caller.
pub fn put_comments(
    &self,
    source_name: &SourceFullName,
    comments: Vec<NewComment>,
    no_charge: bool,
) -> Result<PutCommentsResponse> {
    let url = self.endpoints.put_comments(source_name)?;
    let body = Some(PutCommentsRequest { comments });
    let query = Some(NoChargeQuery { no_charge });

    self.request(&Method::PUT, &url, &body, &query, &Retry::No)
}

pub fn put_stream(
&self,
dataset_name: &DatasetFullName,
Expand Down Expand Up @@ -536,13 +589,28 @@ impl Client {
pub fn sync_comments(
&self,
source_name: &SourceFullName,
comments: &[NewComment],
comments: Vec<NewComment>,
no_charge: bool,
) -> Result<SyncCommentsResponse> {
self.request(
&Method::POST,
&self.endpoints.sync_comments(source_name)?,
&Some(SyncCommentsRequest { comments }),
&Some(NoChargeQuery { no_charge }),
&Retry::Yes,
)
}

pub fn sync_comments_split_on_failure(
&self,
source_name: &SourceFullName,
comments: Vec<NewComment>,
no_charge: bool,
) -> Result<SplitableRequestResponse<SyncCommentsResponse>> {
self.splitable_request(
Method::POST,
self.endpoints.sync_comments(source_name)?,
Some(SyncCommentsRequest { comments }),
SyncCommentsRequest { comments },
Some(NoChargeQuery { no_charge }),
Retry::Yes,
)
Expand All @@ -557,13 +625,28 @@ impl Client {
no_charge: bool,
) -> Result<SyncRawEmailsResponse> {
self.request(
Method::POST,
self.endpoints.sync_comments_raw_emails(source_name)?,
Some(SyncRawEmailsRequest {
&Method::POST,
&self.endpoints.sync_comments_raw_emails(source_name)?,
&Some(SyncRawEmailsRequest {
documents,
transform_tag,
include_comments,
}),
&Some(NoChargeQuery { no_charge }),
&Retry::Yes,
)
}

pub fn put_emails_split_on_failure(
&self,
bucket_name: &BucketFullName,
emails: Vec<NewEmail>,
no_charge: bool,
) -> Result<SplitableRequestResponse<PutEmailsResponse>> {
self.splitable_request(
Method::PUT,
self.endpoints.put_emails(bucket_name)?,
PutEmailsRequest { emails },
Some(NoChargeQuery { no_charge }),
Retry::Yes,
)
Expand All @@ -572,15 +655,15 @@ impl Client {
pub fn put_emails(
&self,
bucket_name: &BucketFullName,
emails: &[NewEmail],
emails: Vec<NewEmail>,
no_charge: bool,
) -> Result<PutEmailsResponse> {
self.request(
Method::PUT,
self.endpoints.put_emails(bucket_name)?,
Some(PutEmailsRequest { emails }),
Some(NoChargeQuery { no_charge }),
Retry::Yes,
&Method::PUT,
&self.endpoints.put_emails(bucket_name)?,
&Some(PutEmailsRequest { emails }),
&Some(NoChargeQuery { no_charge }),
&Retry::Yes,
)
}

Expand Down Expand Up @@ -1079,7 +1162,7 @@ impl Client {
LocationT: IntoUrl + Display + Clone,
for<'de> SuccessT: Deserialize<'de>,
{
self.request(Method::GET, url, None::<()>, None::<()>, Retry::Yes)
self.request(&Method::GET, &url, &None::<()>, &None::<()>, &Retry::Yes)
}

fn get_query<LocationT, QueryT, SuccessT>(
Expand All @@ -1092,7 +1175,7 @@ impl Client {
QueryT: Serialize,
for<'de> SuccessT: Deserialize<'de>,
{
self.request(Method::GET, url, None::<()>, Some(query), Retry::Yes)
self.request(&Method::GET, &url, &None::<()>, &Some(query), &Retry::Yes)
}

fn delete<LocationT>(&self, url: LocationT) -> Result<()>
Expand Down Expand Up @@ -1157,7 +1240,7 @@ impl Client {
RequestT: Serialize,
for<'de> SuccessT: Deserialize<'de>,
{
self.request(Method::POST, url, Some(request), None::<()>, retry)
self.request(&Method::POST, &url, &Some(request), &None::<()>, &retry)
}

fn put<LocationT, RequestT, SuccessT>(
Expand All @@ -1170,7 +1253,7 @@ impl Client {
RequestT: Serialize,
for<'de> SuccessT: Deserialize<'de>,
{
self.request(Method::PUT, url, Some(request), None::<()>, Retry::Yes)
self.request(&Method::PUT, &url, &Some(request), &None::<()>, &Retry::Yes)
}

fn raw_request<LocationT, RequestT, QueryT>(
Expand Down Expand Up @@ -1214,22 +1297,80 @@ impl Client {
Ok(http_response)
}

fn request<LocationT, RequestT, SuccessT, QueryT>(
fn splitable_request<LocationT, RequestT, SuccessT, QueryT>(
&self,
method: Method,
url: LocationT,
body: Option<RequestT>,
body: RequestT,
query: Option<QueryT>,
retry: Retry,
) -> Result<SplitableRequestResponse<SuccessT>>
where
LocationT: IntoUrl + Display + Clone,
RequestT: Serialize + SplittableRequest + Clone,
QueryT: Serialize + Clone,
for<'de> SuccessT: Deserialize<'de> + ReducibleResponse + Clone + Default,
{
debug!("Attempting {method} `{url}`");
let result: Result<SuccessT> =
self.request(&method, &url, &Some(body.clone()), &query, &retry);

fn should_split(error: &Error) -> bool {
if let Error::Api { status_code, .. } = error {
*status_code == reqwest::StatusCode::UNPROCESSABLE_ENTITY
|| *status_code == reqwest::StatusCode::BAD_REQUEST
} else {
false
}
}

match result {
Ok(response) => Ok(SplitableRequestResponse {
response,
num_failed: 0,
}),
Err(error) if should_split(&error) => {
let mut num_failed = 0;
let response = body
.split()
.filter_map(|request| {
match self.request(&method, &url, &Some(request), &query, &retry) {
Ok(response) => Some(response),
Err(_) => {
num_failed += 1;
None
}
}
})
.fold(SuccessT::empty(), |merged, next: SuccessT| {
merged.merge(next)
});

Ok(SplitableRequestResponse {
num_failed,
response,
})
}
Err(error) => Err(error),
}
}

fn request<LocationT, RequestT, SuccessT, QueryT>(
&self,
method: &Method,
url: &LocationT,
body: &Option<RequestT>,
query: &Option<QueryT>,
retry: &Retry,
) -> Result<SuccessT>
where
LocationT: IntoUrl + Display + Clone,
RequestT: Serialize,
QueryT: Serialize,
QueryT: Serialize + Clone,
for<'de> SuccessT: Deserialize<'de>,
{
debug!("Attempting {} `{}`", method, url);
let http_response = self.raw_request(&method, &url, &body, &query, &retry)?;
let http_response = self.raw_request(method, url, body, query, retry)?;

let status = http_response.status();

Expand All @@ -1250,6 +1391,7 @@ impl Client {
}
}

#[derive(Copy, Clone)]
enum Retry {
Yes,
No,
Expand Down Expand Up @@ -1473,7 +1615,7 @@ struct Endpoints {
projects: Url,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone, Copy)]
struct NoChargeQuery {
no_charge: bool,
}
Expand Down
Loading

0 comments on commit fba30bf

Please sign in to comment.