diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bf1557..4b9cf76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Increase default http timeout to 120s - Add `--resume-on-error` flag when creating annotations - Remove `--use-moon-forms` flag +- Add `--resume-on-error` flag when creating comments / emails # v0.28.0 - Add general fields to `create datasets` diff --git a/api/src/lib.rs b/api/src/lib.rs index f10590a..d17d740 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -36,7 +36,13 @@ use resources::{ }; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::{cell::Cell, fmt::Display, io::Read, path::Path, time::Duration}; +use std::{ + cell::Cell, + fmt::{Debug, Display}, + io::Read, + path::Path, + time::Duration, +}; use url::Url; use crate::resources::{ @@ -137,6 +143,38 @@ pub use crate::{ #[derive(Clone, Debug, PartialEq, Eq)] pub struct Token(pub String); +pub trait SplittableRequest { + fn split(self) -> impl Iterator + where + Self: Sized; + + fn count(&self) -> usize; +} + +pub struct SplitableRequestResponse +where + for<'de> ResponseT: Deserialize<'de> + ReducibleResponse, +{ + pub response: ResponseT, + pub num_failed: usize, +} + +pub trait ReducibleResponse { + fn merge(self, _b: Self) -> Self + where + Self: std::default::Default, + { + Default::default() + } + + fn empty() -> Self + where + Self: std::default::Default, + { + Default::default() + } +} + pub struct Config { pub endpoint: Url, pub token: Token, @@ -434,13 +472,13 @@ impl Client { integration: &NewIntegration, ) -> Result { self.request( - Method::POST, - self.endpoints.integration(name)?, - Some(PostIntegrationRequest { + &Method::POST, + &self.endpoints.integration(name)?, + &Some(PostIntegrationRequest { integration: integration.clone(), }), - None::<()>, - Retry::No, + &None::<()>, + &Retry::No, ) } @@ -450,31 +488,46 @@ impl Client { integration: &NewIntegration, ) -> Result { self.request( - Method::PUT, - self.endpoints.integration(name)?, - 
Some(PutIntegrationRequest { + &Method::PUT, + &self.endpoints.integration(name)?, + &Some(PutIntegrationRequest { integration: integration.clone(), }), - None::<()>, - Retry::No, + &None::<()>, + &Retry::No, ) } - pub fn put_comments( + pub fn put_comments_split_on_failure( &self, source_name: &SourceFullName, - comments: &[NewComment], + comments: Vec, no_charge: bool, - ) -> Result { - self.request( + ) -> Result> { + self.splitable_request( Method::PUT, self.endpoints.put_comments(source_name)?, - Some(PutCommentsRequest { comments }), + PutCommentsRequest { comments }, Some(NoChargeQuery { no_charge }), Retry::No, ) } + pub fn put_comments( + &self, + source_name: &SourceFullName, + comments: Vec, + no_charge: bool, + ) -> Result { + self.request( + &Method::PUT, + &self.endpoints.put_comments(source_name)?, + &Some(PutCommentsRequest { comments }), + &Some(NoChargeQuery { no_charge }), + &Retry::No, + ) + } + pub fn put_stream( &self, dataset_name: &DatasetFullName, @@ -536,13 +589,28 @@ impl Client { pub fn sync_comments( &self, source_name: &SourceFullName, - comments: &[NewComment], + comments: Vec, no_charge: bool, ) -> Result { self.request( + &Method::POST, + &self.endpoints.sync_comments(source_name)?, + &Some(SyncCommentsRequest { comments }), + &Some(NoChargeQuery { no_charge }), + &Retry::Yes, + ) + } + + pub fn sync_comments_split_on_failure( + &self, + source_name: &SourceFullName, + comments: Vec, + no_charge: bool, + ) -> Result> { + self.splitable_request( Method::POST, self.endpoints.sync_comments(source_name)?, - Some(SyncCommentsRequest { comments }), + SyncCommentsRequest { comments }, Some(NoChargeQuery { no_charge }), Retry::Yes, ) @@ -557,13 +625,28 @@ impl Client { no_charge: bool, ) -> Result { self.request( - Method::POST, - self.endpoints.sync_comments_raw_emails(source_name)?, - Some(SyncRawEmailsRequest { + &Method::POST, + &self.endpoints.sync_comments_raw_emails(source_name)?, + &Some(SyncRawEmailsRequest { documents, 
transform_tag, include_comments, }), + &Some(NoChargeQuery { no_charge }), + &Retry::Yes, + ) + } + + pub fn put_emails_split_on_failure( + &self, + bucket_name: &BucketFullName, + emails: Vec, + no_charge: bool, + ) -> Result> { + self.splitable_request( + Method::PUT, + self.endpoints.put_emails(bucket_name)?, + PutEmailsRequest { emails }, Some(NoChargeQuery { no_charge }), Retry::Yes, ) @@ -572,15 +655,15 @@ impl Client { pub fn put_emails( &self, bucket_name: &BucketFullName, - emails: &[NewEmail], + emails: Vec, no_charge: bool, ) -> Result { self.request( - Method::PUT, - self.endpoints.put_emails(bucket_name)?, - Some(PutEmailsRequest { emails }), - Some(NoChargeQuery { no_charge }), - Retry::Yes, + &Method::PUT, + &self.endpoints.put_emails(bucket_name)?, + &Some(PutEmailsRequest { emails }), + &Some(NoChargeQuery { no_charge }), + &Retry::Yes, ) } @@ -1079,7 +1162,7 @@ impl Client { LocationT: IntoUrl + Display + Clone, for<'de> SuccessT: Deserialize<'de>, { - self.request(Method::GET, url, None::<()>, None::<()>, Retry::Yes) + self.request(&Method::GET, &url, &None::<()>, &None::<()>, &Retry::Yes) } fn get_query( @@ -1092,7 +1175,7 @@ impl Client { QueryT: Serialize, for<'de> SuccessT: Deserialize<'de>, { - self.request(Method::GET, url, None::<()>, Some(query), Retry::Yes) + self.request(&Method::GET, &url, &None::<()>, &Some(query), &Retry::Yes) } fn delete(&self, url: LocationT) -> Result<()> @@ -1157,7 +1240,7 @@ impl Client { RequestT: Serialize, for<'de> SuccessT: Deserialize<'de>, { - self.request(Method::POST, url, Some(request), None::<()>, retry) + self.request(&Method::POST, &url, &Some(request), &None::<()>, &retry) } fn put( @@ -1170,7 +1253,7 @@ impl Client { RequestT: Serialize, for<'de> SuccessT: Deserialize<'de>, { - self.request(Method::PUT, url, Some(request), None::<()>, Retry::Yes) + self.request(&Method::PUT, &url, &Some(request), &None::<()>, &Retry::Yes) } fn raw_request( @@ -1214,22 +1297,80 @@ impl Client { Ok(http_response) } - 
fn request( + fn splitable_request( &self, method: Method, url: LocationT, - body: Option, + body: RequestT, query: Option, retry: Retry, + ) -> Result> + where + LocationT: IntoUrl + Display + Clone, + RequestT: Serialize + SplittableRequest + Clone, + QueryT: Serialize + Clone, + for<'de> SuccessT: Deserialize<'de> + ReducibleResponse + Clone + Default, + { + debug!("Attempting {method} `{url}`"); + let result: Result = + self.request(&method, &url, &Some(body.clone()), &query, &retry); + + fn should_split(error: &Error) -> bool { + if let Error::Api { status_code, .. } = error { + *status_code == reqwest::StatusCode::UNPROCESSABLE_ENTITY + || *status_code == reqwest::StatusCode::BAD_REQUEST + } else { + false + } + } + + match result { + Ok(response) => Ok(SplitableRequestResponse { + response, + num_failed: 0, + }), + Err(error) if should_split(&error) => { + let mut num_failed = 0; + let response = body + .split() + .filter_map(|request| { + match self.request(&method, &url, &Some(request), &query, &retry) { + Ok(response) => Some(response), + Err(_) => { + num_failed += 1; + None + } + } + }) + .fold(SuccessT::empty(), |merged, next: SuccessT| { + merged.merge(next) + }); + + Ok(SplitableRequestResponse { + num_failed, + response, + }) + } + Err(error) => Err(error), + } + } + + fn request( + &self, + method: &Method, + url: &LocationT, + body: &Option, + query: &Option, + retry: &Retry, ) -> Result where LocationT: IntoUrl + Display + Clone, RequestT: Serialize, - QueryT: Serialize, + QueryT: Serialize + Clone, for<'de> SuccessT: Deserialize<'de>, { debug!("Attempting {} `{}`", method, url); - let http_response = self.raw_request(&method, &url, &body, &query, &retry)?; + let http_response = self.raw_request(method, url, body, query, retry)?; let status = http_response.status(); @@ -1250,6 +1391,7 @@ impl Client { } } +#[derive(Copy, Clone)] enum Retry { Yes, No, @@ -1473,7 +1615,7 @@ struct Endpoints { projects: Url, } -#[derive(Debug, Serialize)] 
+#[derive(Debug, Serialize, Clone, Copy)] struct NoChargeQuery { no_charge: bool, } diff --git a/api/src/resources/comment.rs b/api/src/resources/comment.rs index 4714f95..647c73a 100644 --- a/api/src/resources/comment.rs +++ b/api/src/resources/comment.rs @@ -4,7 +4,7 @@ use crate::{ resources::label_def::Name as LabelName, resources::label_group::Name as LabelGroupName, resources::label_group::DEFAULT_LABEL_GROUP_NAME, - SourceId, + ReducibleResponse, SourceId, SplittableRequest, }; use chrono::{DateTime, Utc}; use ordered_float::NotNan; @@ -170,25 +170,71 @@ pub struct CommentsIterPage { } #[derive(Debug, Clone, Serialize)] -pub(crate) struct PutCommentsRequest<'request> { - pub comments: &'request [NewComment], +pub(crate) struct PutCommentsRequest { + pub comments: Vec, } -#[derive(Debug, Clone, Deserialize)] +impl SplittableRequest for PutCommentsRequest { + fn split(self) -> impl Iterator { + self.comments.into_iter().map(|comment| Self { + comments: vec![comment], + }) + } + + fn count(&self) -> usize { + self.comments.len() + } +} + +#[derive(Default, Debug, Copy, Clone, Deserialize)] pub struct PutCommentsResponse; +impl ReducibleResponse for PutCommentsResponse {} + #[derive(Debug, Clone, Serialize)] -pub(crate) struct SyncCommentsRequest<'request> { - pub comments: &'request [NewComment], +pub(crate) struct SyncCommentsRequest { + pub comments: Vec, } -#[derive(Debug, Clone, Deserialize)] +impl SplittableRequest for SyncCommentsRequest { + fn split(self) -> impl Iterator + where + Self: Sized, + { + self.comments.into_iter().map(|comment| Self { + comments: vec![comment], + }) + } + + fn count(&self) -> usize { + self.comments.len() + } +} + +#[derive(Debug, Clone, Deserialize, Default)] pub struct SyncCommentsResponse { pub new: usize, pub updated: usize, pub unchanged: usize, } +impl ReducibleResponse for SyncCommentsResponse { + fn empty() -> Self { + Self { + new: 0, + updated: 0, + unchanged: 0, + } + } + fn merge(self, b: Self) -> Self { + Self { 
+ new: self.new + b.new, + updated: self.updated + b.updated, + unchanged: self.unchanged + b.unchanged, + } + } +} + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct GetCommentResponse { pub comment: Comment, diff --git a/api/src/resources/email.rs b/api/src/resources/email.rs index c764fcc..be41918 100644 --- a/api/src/resources/email.rs +++ b/api/src/resources/email.rs @@ -1,3 +1,4 @@ +use crate::{ReducibleResponse, SplittableRequest}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -50,13 +51,30 @@ pub struct NewEmail { } #[derive(Debug, Clone, Serialize, PartialEq, Eq)] -pub(crate) struct PutEmailsRequest<'request> { - pub emails: &'request [NewEmail], +pub(crate) struct PutEmailsRequest { + pub emails: Vec, } -#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +impl SplittableRequest for PutEmailsRequest { + fn split(self) -> impl Iterator + where + Self: Sized, + { + self.emails.into_iter().map(|email| Self { + emails: vec![email], + }) + } + + fn count(&self) -> usize { + self.emails.len() + } +} + +#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Eq)] pub struct PutEmailsResponse {} +impl ReducibleResponse for PutEmailsResponse {} + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)] pub struct Continuation(pub String); diff --git a/cli/src/commands/create/comments.rs b/cli/src/commands/create/comments.rs index 2a09c96..bd43b12 100644 --- a/cli/src/commands/create/comments.rs +++ b/cli/src/commands/create/comments.rs @@ -187,22 +187,24 @@ pub fn create(client: &Client, args: &CreateCommentsArgs, pool: &mut Pool) -> Re if args.overwrite { info!( concat!( - "Successfully uploaded {} comments [{} new | {} updated | {} unchanged] ", + "Successfully uploaded {} comments [{} new | {} updated | {} unchanged | {} skipped] ", "of which {} are annotated." 
), statistics.num_uploaded(), statistics.num_new(), statistics.num_updated(), statistics.num_unchanged(), + statistics.num_failed_comments(), statistics.num_annotations(), ); } else { // PUT comments endpoint doesn't return statistics, so can't give the breakdown when not // forcing everything the sync endpoint (via --overwrite) info!( - "Successfully uploaded {} comments (of which {} are annotated).", + "Successfully uploaded {} comments (of which {} are annotated). {} skipped", statistics.num_uploaded(), statistics.num_annotations(), + statistics.num_failed_comments() ); } @@ -264,27 +266,51 @@ fn upload_batch_of_comments( comments_to_sync: &mut Vec, audio_paths: &mut Vec<(CommentId, PathBuf)>, no_charge: bool, + resume_on_error: bool, ) -> Result<()> { let mut uploaded = 0; let mut new = 0; let mut updated = 0; let mut unchanged = 0; + let mut failed = 0; // Upload comments if !comments_to_put.is_empty() { - client - .put_comments(&source.full_name(), comments_to_put, no_charge) - .context("Could not put batch of comments")?; - - uploaded += comments_to_put.len(); + if resume_on_error { + let result = client + .put_comments_split_on_failure( + &source.full_name(), + comments_to_put.to_vec(), + no_charge, + ) + .context("Could not put batch of comments")?; + failed += result.num_failed; + } else { + client + .put_comments(&source.full_name(), comments_to_put.to_vec(), no_charge) + .context("Could not put batch of comments")?; + } + uploaded += comments_to_put.len() - failed; } if !comments_to_sync.is_empty() { - let result = client - .sync_comments(&source.full_name(), comments_to_sync, no_charge) - .context("Could not sync batch of comments")?; + let result = if resume_on_error { + let result = client + .sync_comments_split_on_failure( + &source.full_name(), + comments_to_sync.to_vec(), + no_charge, + ) + .context("Could not sync batch of comments")?; + failed += result.num_failed; + result.response + } else { + client + .sync_comments(&source.full_name(), 
comments_to_sync.to_vec(), no_charge) + .context("Could not sync batch of comments")? + }; - uploaded += comments_to_sync.len(); + uploaded = comments_to_put.len() + comments_to_sync.len() - failed; new += result.new; updated += result.updated; unchanged += result.unchanged; @@ -295,6 +321,7 @@ fn upload_batch_of_comments( new, updated, unchanged, + failed, }); // Upload audio @@ -380,6 +407,7 @@ fn upload_comments_from_reader( &mut comments_to_sync, &mut audio_paths, no_charge, + resume_on_error, )?; } @@ -393,6 +421,7 @@ fn upload_comments_from_reader( &mut comments_to_sync, &mut audio_paths, no_charge, + resume_on_error, )?; upload_batch_of_annotations( @@ -417,6 +446,7 @@ fn upload_comments_from_reader( &mut comments_to_sync, &mut audio_paths, no_charge, + resume_on_error, )?; } @@ -442,6 +472,7 @@ pub struct StatisticsUpdate { new: usize, updated: usize, unchanged: usize, + failed: usize, } #[derive(Debug)] @@ -453,6 +484,7 @@ pub struct Statistics { unchanged: AtomicUsize, annotations: AtomicUsize, failed_annotations: AtomicUsize, + failed_comments: AtomicUsize, } impl AnnotationStatistic for Statistics { @@ -474,6 +506,7 @@ impl Statistics { unchanged: AtomicUsize::new(0), annotations: AtomicUsize::new(0), failed_annotations: AtomicUsize::new(0), + failed_comments: AtomicUsize::new(0), } } @@ -488,6 +521,8 @@ impl Statistics { self.new.fetch_add(update.new, Ordering::SeqCst); self.updated.fetch_add(update.updated, Ordering::SeqCst); self.unchanged.fetch_add(update.unchanged, Ordering::SeqCst); + self.failed_comments + .fetch_add(update.failed, Ordering::SeqCst); } #[inline] @@ -524,6 +559,11 @@ impl Statistics { fn num_failed_annotations(&self) -> usize { self.failed_annotations.load(Ordering::SeqCst) } + + #[inline] + fn num_failed_comments(&self) -> usize { + self.failed_comments.load(Ordering::SeqCst) + } } /// Detailed statistics - only make sense if using --overwrite (i.e. 
exclusively sync endpoint) @@ -536,16 +576,24 @@ fn detailed_statistics(statistics: &Statistics) -> (u64, String) { let num_unchanged = statistics.num_unchanged(); let num_annotations = statistics.num_annotations(); let num_failed_annotations = statistics.num_failed_annotations(); + let num_failed_comments = statistics.num_failed_comments(); + let failed_annotations_string = if num_failed_annotations > 0 { format!(" {num_failed_annotations} {}", "skipped".dimmed()) } else { String::new() }; + let failed_comments_string = if num_failed_comments > 0 { + format!(" {num_failed_comments} {}", "skipped".dimmed()) + } else { + String::new() + }; + ( bytes_read as u64, format!( - "{} {}: {} {} {} {} {} {} [{} {}{}]", + "{} {}: {} {} {} {} {} {}{} [{} {}{}]", num_uploaded.to_string().bold(), "comments".dimmed(), num_new, @@ -554,6 +602,7 @@ fn detailed_statistics(statistics: &Statistics) -> (u64, String) { "upd".dimmed(), num_unchanged, "nop".dimmed(), + failed_comments_string, num_annotations, "annotations".dimmed(), failed_annotations_string @@ -567,18 +616,26 @@ fn basic_statistics(statistics: &Statistics) -> (u64, String) { let num_uploaded = statistics.num_uploaded(); let num_annotations = statistics.num_annotations(); let num_failed_annotations = statistics.num_failed_annotations(); + let num_failed_comments = statistics.num_failed_comments(); + let failed_annotations_string = if num_failed_annotations > 0 { format!(" {num_failed_annotations} {}", "skipped".dimmed()) } else { String::new() }; + let failed_comments_string = if num_failed_comments > 0 { + format!(" {num_failed_comments} {}", "skipped".dimmed()) + } else { + String::new() + }; ( bytes_read as u64, format!( - "{} {} [{} {}{}]", + "{} {}{} [{} {}{}]", num_uploaded.to_string().bold(), "comments".dimmed(), + failed_comments_string, num_annotations, "annotations".dimmed(), failed_annotations_string diff --git a/cli/src/commands/create/emails.rs b/cli/src/commands/create/emails.rs index c2657d7..8ccfaf0 100644 
--- a/cli/src/commands/create/emails.rs +++ b/cli/src/commands/create/emails.rs @@ -43,6 +43,10 @@ pub struct CreateEmailsArgs { #[structopt(short = "y", long = "yes")] /// Consent to ai unit charge. Suppresses confirmation prompt. yes: bool, + + #[structopt(long = "resume-on-error")] + /// Whether to attempt to resume processing on error + resume_on_error: bool, } pub fn create(client: &Client, args: &CreateEmailsArgs) -> Result<()> { @@ -85,6 +89,7 @@ pub fn create(client: &Client, args: &CreateEmailsArgs) -> Result<()> { args.batch_size, &statistics, args.no_charge, + args.resume_on_error, )?; if let Some(mut progress) = progress { progress.done(); @@ -105,6 +110,7 @@ pub fn create(client: &Client, args: &CreateEmailsArgs) -> Result<()> { args.batch_size, &statistics, args.no_charge, + args.resume_on_error, )?; statistics } @@ -125,6 +131,7 @@ fn upload_emails_from_reader( batch_size: usize, statistics: &Statistics, no_charge: bool, + resume_on_error: bool, ) -> Result<()> { assert!(batch_size > 0); let mut line_number = 1; @@ -150,13 +157,26 @@ fn upload_emails_from_reader( if batch.len() == batch_size || (!batch.is_empty() && eof) { // Upload emails - client - .put_emails(&bucket.full_name(), &batch, no_charge) - .context("Could not upload batch of emails")?; - statistics.add_emails(StatisticsUpdate { - uploaded: batch.len(), - }); - batch.clear(); + + if resume_on_error { + let result = client + .put_emails_split_on_failure(&bucket.full_name(), batch.to_vec(), no_charge) + .context("Could not upload batch of emails")?; + statistics.add_emails(StatisticsUpdate { + uploaded: batch.len() - result.num_failed, + failed: result.num_failed, + }); + batch.clear(); + } else { + client + .put_emails(&bucket.full_name(), batch.to_vec(), no_charge) + .context("Could not upload batch of emails")?; + statistics.add_emails(StatisticsUpdate { + uploaded: batch.len(), + failed: 0, + }); + batch.clear(); + } } line_number += 1; @@ -168,12 +188,14 @@ fn 
upload_emails_from_reader( #[derive(Debug)] pub struct StatisticsUpdate { uploaded: usize, + failed: usize, } #[derive(Debug)] pub struct Statistics { bytes_read: AtomicUsize, uploaded: AtomicUsize, + failed: AtomicUsize, } impl Statistics { @@ -181,6 +203,7 @@ impl Statistics { Self { bytes_read: AtomicUsize::new(0), uploaded: AtomicUsize::new(0), + failed: AtomicUsize::new(0), } } @@ -192,6 +215,7 @@ impl Statistics { #[inline] fn add_emails(&self, update: StatisticsUpdate) { self.uploaded.fetch_add(update.uploaded, Ordering::SeqCst); + self.failed.fetch_add(update.failed, Ordering::SeqCst); } #[inline] @@ -203,6 +227,11 @@ impl Statistics { fn num_uploaded(&self) -> usize { self.uploaded.load(Ordering::SeqCst) } + + #[inline] + fn num_failed(&self) -> usize { + self.failed.load(Ordering::SeqCst) + } } fn progress_bar(total_bytes: u64, statistics: &Arc) -> Progress { @@ -210,9 +239,22 @@ fn progress_bar(total_bytes: u64, statistics: &Arc) -> Progress { move |statistics| { let bytes_read = statistics.bytes_read(); let num_uploaded = statistics.num_uploaded(); + let num_failed = statistics.num_failed(); + + let failed_emails_string = if num_failed > 0 { + format!(" {num_failed} {}", "skipped".dimmed()) + } else { + String::new() + }; + ( bytes_read as u64, - format!("{} {}", num_uploaded.to_string().bold(), "emails".dimmed()), + format!( + "{} {}{}", + num_uploaded.to_string().bold(), + "emails".dimmed(), + failed_emails_string + ), ) }, statistics, diff --git a/cli/src/commands/parse/mod.rs b/cli/src/commands/parse/mod.rs index f3b073d..5f0fe8a 100644 --- a/cli/src/commands/parse/mod.rs +++ b/cli/src/commands/parse/mod.rs @@ -114,7 +114,7 @@ fn upload_batch_of_new_emails( no_charge: bool, statistics: &Arc, ) -> Result<()> { - client.put_emails(bucket, emails, no_charge)?; + client.put_emails(bucket, emails.to_vec(), no_charge)?; statistics.add_uploaded(emails.len()); Ok(()) }