diff --git a/CHANGELOG.md b/CHANGELOG.md index 21339d0..1e0a066 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ - Add `config parse-from-url` command for parsing configuration from a URL - Add ability to download attachments for comments - Increase default http timeout to 120s +- Add `--resume-on-error` flag to `create annotations` and `create comments` (annotations that fail to upload are skipped) # v0.28.0 - Add general fields to `create datasets` diff --git a/cli/src/commands/create/annotations.rs b/cli/src/commands/create/annotations.rs index 7672230..285dbb2 100644 --- a/cli/src/commands/create/annotations.rs +++ b/cli/src/commands/create/annotations.rs @@ -47,6 +47,10 @@ pub struct CreateAnnotationsArgs { #[structopt(long = "batch-size", default_value = "128")] /// Number of comments to batch in a single request. batch_size: usize, + + #[structopt(long = "resume-on-error")] + /// Whether to attempt to resume processing on error + resume_on_error: bool, } pub fn create(client: &Client, args: &CreateAnnotationsArgs, pool: &mut Pool) -> Result<()> { @@ -95,6 +99,7 @@ pub fn create(client: &Client, args: &CreateAnnotationsArgs, pool: &mut Pool) -> args.use_moon_forms, args.batch_size, pool, + args.resume_on_error, )?; if let Some(mut progress) = progress { progress.done(); @@ -117,6 +122,7 @@ pub fn create(client: &Client, args: &CreateAnnotationsArgs, pool: &mut Pool) -> args.use_moon_forms, args.batch_size, pool, + args.resume_on_error, )?; statistics } @@ -131,8 +137,10 @@ pub fn create(client: &Client, args: &CreateAnnotationsArgs, pool: &mut Pool) -> pub trait AnnotationStatistic { fn add_annotation(&self); + fn add_failed_annotation(&self); } +#[allow(clippy::too_many_arguments)] pub fn upload_batch_of_annotations( annotations_to_upload: &mut Vec, client: &Client, @@ -141,6 +149,7 @@ pub fn upload_batch_of_annotations( dataset_name: &DatasetFullName, use_moon_forms: bool, pool: &mut Pool, + resume_on_error: bool, ) -> Result<()> { let (error_sender, error_receiver) = channel(); @@ -182,15 +191,21 @@ pub fn 
upload_batch_of_annotations( if let Err(error) = result { error_sender.send(error).expect("Could not send error"); + statistics.add_failed_annotation(); + } else { + statistics.add_annotation(); } - - statistics.add_annotation(); }); }) }); if let Ok(error) = error_receiver.try_recv() { - Err(error) + if resume_on_error { + annotations_to_upload.clear(); + Ok(()) + } else { + Err(error) + } } else { annotations_to_upload.clear(); Ok(()) @@ -207,6 +222,7 @@ fn upload_annotations_from_reader( use_moon_forms: bool, batch_size: usize, pool: &mut Pool, + resume_on_error: bool, ) -> Result<()> { let mut annotations_to_upload = Vec::new(); @@ -224,6 +240,7 @@ fn upload_annotations_from_reader( dataset_name, use_moon_forms, pool, + resume_on_error, )?; } } @@ -238,6 +255,7 @@ fn upload_annotations_from_reader( dataset_name, use_moon_forms, pool, + resume_on_error, )?; } @@ -307,12 +325,16 @@ fn read_annotations_iter<'a>( pub struct Statistics { bytes_read: AtomicUsize, annotations: AtomicUsize, + failed_annotations: AtomicUsize, } impl AnnotationStatistic for Statistics { fn add_annotation(&self) { self.annotations.fetch_add(1, Ordering::SeqCst); } + fn add_failed_annotation(&self) { + self.failed_annotations.fetch_add(1, Ordering::SeqCst); + } } impl Statistics { @@ -320,6 +342,7 @@ impl Statistics { Self { bytes_read: AtomicUsize::new(0), annotations: AtomicUsize::new(0), + failed_annotations: AtomicUsize::new(0), } } @@ -337,14 +360,32 @@ impl Statistics { pub fn num_annotations(&self) -> usize { self.annotations.load(Ordering::SeqCst) } + + #[inline] + pub fn num_failed_annotations(&self) -> usize { + self.failed_annotations.load(Ordering::SeqCst) + } } fn basic_statistics(statistics: &Statistics) -> (u64, String) { let bytes_read = statistics.bytes_read(); let num_annotations = statistics.num_annotations(); + let num_failed_annotations = statistics.num_failed_annotations(); + + let failed_annotations_string = if num_failed_annotations > 0 { + format!(" 
{num_failed_annotations} {}", "skipped".dimmed()) + } else { + String::new() + }; + ( bytes_read as u64, - format!("{} {}", num_annotations, "annotations".dimmed(),), + format!( + "{} {}{}", + num_annotations, + "annotations".dimmed(), + failed_annotations_string + ), ) } diff --git a/cli/src/commands/create/comments.rs b/cli/src/commands/create/comments.rs index e28624f..e18a7c3 100644 --- a/cli/src/commands/create/comments.rs +++ b/cli/src/commands/create/comments.rs @@ -74,6 +74,10 @@ pub struct CreateCommentsArgs { #[structopt(short = "y", long = "yes")] /// Consent to ai unit charge. Suppresses confirmation prompt. yes: bool, + + #[structopt(long = "resume-on-error")] + /// Whether to attempt to resume processing on error + resume_on_error: bool, } pub fn create(client: &Client, args: &CreateCommentsArgs, pool: &mut Pool) -> Result<()> { @@ -152,6 +156,7 @@ pub fn create(client: &Client, args: &CreateCommentsArgs, pool: &mut Pool) -> Re args.use_moon_forms, args.no_charge, pool, + args.resume_on_error, )?; if let Some(mut progress) = progress { progress.done(); @@ -180,6 +185,7 @@ pub fn create(client: &Client, args: &CreateCommentsArgs, pool: &mut Pool) -> Re args.use_moon_forms, args.no_charge, pool, + args.resume_on_error, )?; statistics } @@ -330,6 +336,7 @@ fn upload_comments_from_reader( use_moon_forms: bool, no_charge: bool, pool: &mut Pool, + resume_on_error: bool, ) -> Result<()> { assert!(batch_size > 0); @@ -415,6 +422,7 @@ fn upload_comments_from_reader( dataset_name, use_moon_forms, pool, + resume_on_error, )?; } } @@ -442,6 +450,7 @@ fn upload_comments_from_reader( dataset_name, use_moon_forms, pool, + resume_on_error, )?; } } @@ -464,12 +473,16 @@ pub struct Statistics { updated: AtomicUsize, unchanged: AtomicUsize, annotations: AtomicUsize, + failed_annotations: AtomicUsize, } impl AnnotationStatistic for Statistics { fn add_annotation(&self) { self.annotations.fetch_add(1, Ordering::SeqCst); } + fn add_failed_annotation(&self) { + 
self.failed_annotations.fetch_add(1, Ordering::SeqCst); + } } impl Statistics { @@ -481,6 +494,7 @@ impl Statistics { updated: AtomicUsize::new(0), unchanged: AtomicUsize::new(0), annotations: AtomicUsize::new(0), + failed_annotations: AtomicUsize::new(0), } } @@ -526,6 +540,11 @@ impl Statistics { fn num_annotations(&self) -> usize { self.annotations.load(Ordering::SeqCst) } + + #[inline] + fn num_failed_annotations(&self) -> usize { + self.failed_annotations.load(Ordering::SeqCst) + } } /// Detailed statistics - only make sense if using --overwrite (i.e. exclusively sync endpoint) @@ -537,10 +556,17 @@ fn detailed_statistics(statistics: &Statistics) -> (u64, String) { let num_updated = statistics.num_updated(); let num_unchanged = statistics.num_unchanged(); let num_annotations = statistics.num_annotations(); + let num_failed_annotations = statistics.num_failed_annotations(); + let failed_annotations_string = if num_failed_annotations > 0 { + format!(" {num_failed_annotations} {}", "skipped".dimmed()) + } else { + String::new() + }; + ( bytes_read as u64, format!( - "{} {}: {} {} {} {} {} {} [{} {}]", + "{} {}: {} {} {} {} {} {} [{} {}{}]", num_uploaded.to_string().bold(), "comments".dimmed(), num_new, @@ -551,6 +577,7 @@ fn detailed_statistics(statistics: &Statistics) -> (u64, String) { "nop".dimmed(), num_annotations, "annotations".dimmed(), + failed_annotations_string ), ) } @@ -560,14 +587,22 @@ fn basic_statistics(statistics: &Statistics) -> (u64, String) { let bytes_read = statistics.bytes_read(); let num_uploaded = statistics.num_uploaded(); let num_annotations = statistics.num_annotations(); + let num_failed_annotations = statistics.num_failed_annotations(); + let failed_annotations_string = if num_failed_annotations > 0 { + format!(" {num_failed_annotations} {}", "skipped".dimmed()) + } else { + String::new() + }; + ( bytes_read as u64, format!( - "{} {} [{} {}]", + "{} {} [{} {}{}]", num_uploaded.to_string().bold(), "comments".dimmed(), num_annotations, 
"annotations".dimmed(), + failed_annotations_string ), ) }