Skip to content

Commit

Permalink
feat(commands): add lossy flag when creating annotations (#289)
Browse files Browse the repository at this point in the history
* feat(commands): add resume on error flag when creating annotations
  • Loading branch information
joe-prosser authored Jul 24, 2024
1 parent 1758c3d commit b90a6b6
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
- Add `config parse-from-url` command for parsing configuration from a URL
- Add ability to download attachments for comments
- Increase default http timeout to 120s
- Add `--resume-on-error` flag when creating annotations

# v0.28.0
- Add general fields to `create datasets`
Expand Down
49 changes: 45 additions & 4 deletions cli/src/commands/create/annotations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ pub struct CreateAnnotationsArgs {
#[structopt(long = "batch-size", default_value = "128")]
/// Number of comments to batch in a single request.
batch_size: usize,

#[structopt(long = "resume-on-error")]
/// Whether to attempt to resume processing on error
resume_on_error: bool,
}

pub fn create(client: &Client, args: &CreateAnnotationsArgs, pool: &mut Pool) -> Result<()> {
Expand Down Expand Up @@ -95,6 +99,7 @@ pub fn create(client: &Client, args: &CreateAnnotationsArgs, pool: &mut Pool) ->
args.use_moon_forms,
args.batch_size,
pool,
args.resume_on_error,
)?;
if let Some(mut progress) = progress {
progress.done();
Expand All @@ -117,6 +122,7 @@ pub fn create(client: &Client, args: &CreateAnnotationsArgs, pool: &mut Pool) ->
args.use_moon_forms,
args.batch_size,
pool,
args.resume_on_error,
)?;
statistics
}
Expand All @@ -131,8 +137,10 @@ pub fn create(client: &Client, args: &CreateAnnotationsArgs, pool: &mut Pool) ->

pub trait AnnotationStatistic {
fn add_annotation(&self);
fn add_failed_annotation(&self);
}

#[allow(clippy::too_many_arguments)]
pub fn upload_batch_of_annotations(
annotations_to_upload: &mut Vec<NewAnnotation>,
client: &Client,
Expand All @@ -141,6 +149,7 @@ pub fn upload_batch_of_annotations(
dataset_name: &DatasetFullName,
use_moon_forms: bool,
pool: &mut Pool,
resume_on_error: bool,
) -> Result<()> {
let (error_sender, error_receiver) = channel();

Expand Down Expand Up @@ -182,15 +191,21 @@ pub fn upload_batch_of_annotations(

if let Err(error) = result {
error_sender.send(error).expect("Could not send error");
statistics.add_failed_annotation();
} else {
statistics.add_annotation();
}

statistics.add_annotation();
});
})
});

if let Ok(error) = error_receiver.try_recv() {
Err(error)
if resume_on_error {
annotations_to_upload.clear();
Ok(())
} else {
Err(error)
}
} else {
annotations_to_upload.clear();
Ok(())
Expand All @@ -207,6 +222,7 @@ fn upload_annotations_from_reader(
use_moon_forms: bool,
batch_size: usize,
pool: &mut Pool,
resume_on_error: bool,
) -> Result<()> {
let mut annotations_to_upload = Vec::new();

Expand All @@ -224,6 +240,7 @@ fn upload_annotations_from_reader(
dataset_name,
use_moon_forms,
pool,
resume_on_error,
)?;
}
}
Expand All @@ -238,6 +255,7 @@ fn upload_annotations_from_reader(
dataset_name,
use_moon_forms,
pool,
resume_on_error,
)?;
}

Expand Down Expand Up @@ -307,19 +325,24 @@ fn read_annotations_iter<'a>(
pub struct Statistics {
bytes_read: AtomicUsize,
annotations: AtomicUsize,
failed_annotations: AtomicUsize,
}

impl AnnotationStatistic for Statistics {
fn add_annotation(&self) {
self.annotations.fetch_add(1, Ordering::SeqCst);
}
fn add_failed_annotation(&self) {
self.failed_annotations.fetch_add(1, Ordering::SeqCst);
}
}

impl Statistics {
fn new() -> Self {
Self {
bytes_read: AtomicUsize::new(0),
annotations: AtomicUsize::new(0),
failed_annotations: AtomicUsize::new(0),
}
}

Expand All @@ -337,14 +360,32 @@ impl Statistics {
pub fn num_annotations(&self) -> usize {
self.annotations.load(Ordering::SeqCst)
}

#[inline]
pub fn num_failed_annotations(&self) -> usize {
self.failed_annotations.load(Ordering::SeqCst)
}
}

fn basic_statistics(statistics: &Statistics) -> (u64, String) {
let bytes_read = statistics.bytes_read();
let num_annotations = statistics.num_annotations();
let num_failed_annotations = statistics.num_failed_annotations();

let failed_annotations_string = if num_failed_annotations > 0 {
format!(" {num_failed_annotations} {}", "skipped".dimmed())
} else {
String::new()
};

(
bytes_read as u64,
format!("{} {}", num_annotations, "annotations".dimmed(),),
format!(
"{} {}{}",
num_annotations,
"annotations".dimmed(),
failed_annotations_string
),
)
}

Expand Down
39 changes: 37 additions & 2 deletions cli/src/commands/create/comments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ pub struct CreateCommentsArgs {
#[structopt(short = "y", long = "yes")]
/// Consent to ai unit charge. Suppresses confirmation prompt.
yes: bool,

#[structopt(long = "resume-on-error")]
/// Whether to attempt to resume processing on error
resume_on_error: bool,
}

pub fn create(client: &Client, args: &CreateCommentsArgs, pool: &mut Pool) -> Result<()> {
Expand Down Expand Up @@ -152,6 +156,7 @@ pub fn create(client: &Client, args: &CreateCommentsArgs, pool: &mut Pool) -> Re
args.use_moon_forms,
args.no_charge,
pool,
args.resume_on_error,
)?;
if let Some(mut progress) = progress {
progress.done();
Expand Down Expand Up @@ -180,6 +185,7 @@ pub fn create(client: &Client, args: &CreateCommentsArgs, pool: &mut Pool) -> Re
args.use_moon_forms,
args.no_charge,
pool,
args.resume_on_error,
)?;
statistics
}
Expand Down Expand Up @@ -330,6 +336,7 @@ fn upload_comments_from_reader(
use_moon_forms: bool,
no_charge: bool,
pool: &mut Pool,
resume_on_error: bool,
) -> Result<()> {
assert!(batch_size > 0);

Expand Down Expand Up @@ -415,6 +422,7 @@ fn upload_comments_from_reader(
dataset_name,
use_moon_forms,
pool,
resume_on_error,
)?;
}
}
Expand Down Expand Up @@ -442,6 +450,7 @@ fn upload_comments_from_reader(
dataset_name,
use_moon_forms,
pool,
resume_on_error,
)?;
}
}
Expand All @@ -464,12 +473,16 @@ pub struct Statistics {
updated: AtomicUsize,
unchanged: AtomicUsize,
annotations: AtomicUsize,
failed_annotations: AtomicUsize,
}

impl AnnotationStatistic for Statistics {
fn add_annotation(&self) {
self.annotations.fetch_add(1, Ordering::SeqCst);
}
fn add_failed_annotation(&self) {
self.failed_annotations.fetch_add(1, Ordering::SeqCst);
}
}

impl Statistics {
Expand All @@ -481,6 +494,7 @@ impl Statistics {
updated: AtomicUsize::new(0),
unchanged: AtomicUsize::new(0),
annotations: AtomicUsize::new(0),
failed_annotations: AtomicUsize::new(0),
}
}

Expand Down Expand Up @@ -526,6 +540,11 @@ impl Statistics {
fn num_annotations(&self) -> usize {
self.annotations.load(Ordering::SeqCst)
}

#[inline]
fn num_failed_annotations(&self) -> usize {
self.failed_annotations.load(Ordering::SeqCst)
}
}

/// Detailed statistics - only make sense if using --overwrite (i.e. exclusively sync endpoint)
Expand All @@ -537,10 +556,17 @@ fn detailed_statistics(statistics: &Statistics) -> (u64, String) {
let num_updated = statistics.num_updated();
let num_unchanged = statistics.num_unchanged();
let num_annotations = statistics.num_annotations();
let num_failed_annotations = statistics.num_failed_annotations();
let failed_annotations_string = if num_failed_annotations > 0 {
format!(" {num_failed_annotations} {}", "skipped".dimmed())
} else {
String::new()
};

(
bytes_read as u64,
format!(
"{} {}: {} {} {} {} {} {} [{} {}]",
"{} {}: {} {} {} {} {} {} [{} {}{}]",
num_uploaded.to_string().bold(),
"comments".dimmed(),
num_new,
Expand All @@ -551,6 +577,7 @@ fn detailed_statistics(statistics: &Statistics) -> (u64, String) {
"nop".dimmed(),
num_annotations,
"annotations".dimmed(),
failed_annotations_string
),
)
}
Expand All @@ -560,14 +587,22 @@ fn basic_statistics(statistics: &Statistics) -> (u64, String) {
let bytes_read = statistics.bytes_read();
let num_uploaded = statistics.num_uploaded();
let num_annotations = statistics.num_annotations();
let num_failed_annotations = statistics.num_failed_annotations();
let failed_annotations_string = if num_failed_annotations > 0 {
format!(" {num_failed_annotations} {}", "skipped".dimmed())
} else {
String::new()
};

(
bytes_read as u64,
format!(
"{} {} [{} {}]",
"{} {} [{} {}{}]",
num_uploaded.to_string().bold(),
"comments".dimmed(),
num_annotations,
"annotations".dimmed(),
failed_annotations_string
),
)
}
Expand Down

0 comments on commit b90a6b6

Please sign in to comment.