diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1e0a0669..f51d8c0a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 - Add ability to download attachments for comments
 - Increase default http timeout to 120s
 - Add `--resume-on-error` flag when creating annotations
+- Add ability to upload attachment content for comments
 
 # v0.28.0
 - Add general fields to `create datasets`
diff --git a/api/src/error.rs b/api/src/error.rs
index 1c051372..75543e69 100644
--- a/api/src/error.rs
+++ b/api/src/error.rs
@@ -1,3 +1,5 @@
+use std::path::PathBuf;
+
 use reqwest::StatusCode;
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -16,6 +18,9 @@ pub enum Error {
     #[error("Bad token: {}", token)]
     BadToken { token: String },
 
+    #[error("File does not exist: {}", path.to_string_lossy())]
+    FileDoesNotExist { path: PathBuf },
+
     #[error("Expected / or a source id, got: {}", identifier)]
     BadSourceIdentifier { identifier: String },
 
diff --git a/api/src/lib.rs b/api/src/lib.rs
index f10590a9..034eb7c9 100644
--- a/api/src/lib.rs
+++ b/api/src/lib.rs
@@ -13,6 +13,7 @@ use reqwest::{
     IntoUrl, Proxy, Result as ReqwestResult,
 };
 use resources::{
+    attachments::UploadAttachmentResponse,
     bucket_statistics::GetBucketStatisticsResponse,
     comment::{AttachmentReference, CommentTimestampFilter},
     dataset::{
@@ -36,7 +37,13 @@ use resources::{
 };
 use serde::{Deserialize, Serialize};
 use serde_json::json;
-use std::{cell::Cell, fmt::Display, io::Read, path::Path, time::Duration};
+use std::{
+    cell::Cell,
+    fmt::Display,
+    io::Read,
+    path::{Path, PathBuf},
+    time::Duration,
+};
 use url::Url;
 
 use crate::resources::{
@@ -622,10 +629,59 @@ impl Client {
         Ok(())
     }
 
+    pub fn upload_comment_attachment(
+        &self,
+        source_id: &SourceId,
+        comment_id: &CommentId,
+        attachment_index: usize,
+        attachment: &PathBuf,
+    ) -> Result<UploadAttachmentResponse> {
+        let url = self
+            .endpoints
+            .attachment_upload(source_id, comment_id, attachment_index)?;
+
+        if !attachment.is_file() || !attachment.exists() {
+            return Err(Error::FileDoesNotExist {
+                path: attachment.clone(),
+            });
+        }
+
+        let do_request = || {
+            let form = Form::new()
+                .file("file", attachment)
+                .map_err(|source| Error::Unknown {
+                    message: "PUT comment attachment operation failed".to_owned(),
+                    source: source.into(),
+                })
+                .unwrap();
+            let request = self
+                .http_client
+                .request(Method::PUT, url.clone())
+                .multipart(form)
+                .headers(self.headers.clone());
+
+            request.send()
+        };
+
+        let result = self.with_retries(do_request);
+
+        let http_response = result.map_err(|source| Error::ReqwestError {
+            source,
+            message: "Operation failed.".to_string(),
+        })?;
+
+        let status = http_response.status();
+
+        http_response
+            .json::<Response<UploadAttachmentResponse>>()
+            .map_err(Error::BadJsonResponse)?
+            .into_result(status)
+    }
+
     pub fn get_attachment(&self, reference: &AttachmentReference) -> Result<Vec<u8>> {
         let mut response = self.raw_request(
             &Method::GET,
-            &self.endpoints.attachment(reference)?,
+            &self.endpoints.attachment_reference(reference)?,
             &None::<()>,
             &None::<()>,
             &Retry::Yes,
@@ -1528,10 +1584,31 @@ impl Endpoints {
         construct_endpoint(&self.base, &["api", "_private", "integrations", &name.0])
     }
 
-    fn attachment(&self, reference: &AttachmentReference) -> Result<Url> {
+    fn attachment_reference(&self, reference: &AttachmentReference) -> Result<Url> {
         construct_endpoint(&self.base, &["api", "v1", "attachments", &reference.0])
     }
 
+    fn attachment_upload(
+        &self,
+        source_id: &SourceId,
+        comment_id: &CommentId,
+        attachment_index: usize,
+    ) -> Result<Url> {
+        construct_endpoint(
+            &self.base,
+            &[
+                "api",
+                "_private",
+                "sources",
+                &format!("id:{}", source_id.0),
+                "comments",
+                &comment_id.0,
+                "attachments",
+                &attachment_index.to_string(),
+            ],
+        )
+    }
+
     fn validation(
         &self,
         dataset_name: &DatasetFullName,
diff --git a/api/src/resources/attachments.rs b/api/src/resources/attachments.rs
new file mode 100644
index 00000000..86c3b162
--- /dev/null
+++ b/api/src/resources/attachments.rs
@@ -0,0 +1,9 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
+pub struct ContentHash(pub String);
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct UploadAttachmentResponse {
+    pub content_hash: ContentHash,
+}
diff --git a/api/src/resources/comment.rs b/api/src/resources/comment.rs
index 4714f95f..1f4e0fef 100644
--- a/api/src/resources/comment.rs
+++ b/api/src/resources/comment.rs
@@ -23,6 +23,8 @@ use std::{
     str::FromStr,
 };
 
+use super::attachments::ContentHash;
+
 #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash, PartialOrd, Ord)]
 pub struct Id(pub String);
 
@@ -295,9 +297,6 @@ pub enum Sentiment {
 #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
 pub struct AttachmentReference(pub String);
 
-#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
-pub struct ContentHash(pub String);
-
 #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
 pub struct AttachmentMetadata {
     pub name: String,
diff --git a/api/src/resources/mod.rs b/api/src/resources/mod.rs
index 061f5a43..738f8b60 100644
--- a/api/src/resources/mod.rs
+++ b/api/src/resources/mod.rs
@@ -1,3 +1,4 @@
+pub mod attachments;
 pub mod audit;
 pub mod bucket;
 pub mod bucket_statistics;
diff --git a/cli/src/commands/create/annotations.rs b/cli/src/commands/create/annotations.rs
index 285dbb27..129ceb8d 100644
--- a/cli/src/commands/create/annotations.rs
+++ b/cli/src/commands/create/annotations.rs
@@ -140,6 +140,11 @@ pub trait AnnotationStatistic {
     fn add_failed_annotation(&self);
 }
 
+pub trait AttachmentStatistic {
+    fn add_attachment(&self);
+    fn add_failed_attachment(&self);
+}
+
 #[allow(clippy::too_many_arguments)]
 pub fn upload_batch_of_annotations(
     annotations_to_upload: &mut Vec<NewAnnotation>,
diff --git a/cli/src/commands/create/comments.rs b/cli/src/commands/create/comments.rs
index e18a7c38..e7742656 100644
--- a/cli/src/commands/create/comments.rs
+++ b/cli/src/commands/create/comments.rs
@@ -3,7 +3,7 @@ use crate::{
         create::annotations::{
             upload_batch_of_annotations, AnnotationStatistic, CommentIdComment, NewAnnotation,
         },
-        ensure_uip_user_consents_to_ai_unit_charge,
+        ensure_uip_user_consents_to_ai_unit_charge, LocalAttachmentPath,
     },
     progress::{Options as ProgressOptions, Progress},
 };
@@ -11,15 +11,15 @@ use anyhow::{anyhow, ensure, Context, Result};
 use colored::Colorize;
 use log::{debug, info};
 use reinfer_client::{
-    Client, CommentId, DatasetFullName, DatasetIdentifier, NewAnnotatedComment, NewComment, Source,
-    SourceIdentifier,
+    resources::comment::AttachmentMetadata, Client, CommentId, DatasetFullName, DatasetIdentifier,
+    NewAnnotatedComment, NewComment, Source, SourceId, SourceIdentifier,
 };
 use scoped_threadpool::Pool;
 use std::{
     collections::HashSet,
     fs::File,
     io::{self, BufRead, BufReader, Seek},
-    path::PathBuf,
+    path::{Path, PathBuf},
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -28,6 +28,8 @@
 
 use structopt::StructOpt;
 
+use super::annotations::AttachmentStatistic;
+
 #[derive(Debug, StructOpt)]
 pub struct CreateCommentsArgs {
     #[structopt(short = "f", long = "file", parse(from_os_str))]
@@ -78,6 +80,10 @@ pub struct CreateCommentsArgs {
     #[structopt(long = "resume-on-error")]
     /// Whether to attempt to resume processing on error
     resume_on_error: bool,
+
+    #[structopt(short = "a", long = "attachments", parse(from_os_str))]
+    /// Path to folder containing the attachments to upload
+    attachments_dir: Option<PathBuf>,
 }
 
 pub fn create(client: &Client, args: &CreateCommentsArgs, pool: &mut Pool) -> Result<()> {
@@ -85,6 +91,19 @@ pub fn create(client: &Client, args: &CreateCommentsArgs, pool: &mut Pool) -> Re
         ensure_uip_user_consents_to_ai_unit_charge(client.base_url())?;
     }
 
+    ensure!(args.batch_size > 0, "--batch-size must be greater than 0");
+
+    if let Some(attachments_dir) = &args.attachments_dir {
+        ensure!(
+            attachments_dir.is_dir(),
+            "--attachments must be a directory"
+        );
+        ensure!(
+            attachments_dir.exists(),
+            "--attachments directory must exist"
+        )
+    }
+
     let source = client
         .get_source(args.source.clone())
         .with_context(|| format!("Unable to get source {}", args.source))?;
@@ -101,8 +120,6 @@ pub fn create(client: &Client, args: &CreateCommentsArgs, pool: &mut Pool) -> Re
         None => None,
     };
 
-    ensure!(args.batch_size > 0, "--batch-size must be greater than 0");
-
     let statistics = match &args.comments_path {
         Some(comments_path) => {
             info!(
@@ -157,6 +174,7 @@
                 args.no_charge,
                 pool,
                 args.resume_on_error,
+                &args.attachments_dir,
             )?;
             if let Some(mut progress) = progress {
                 progress.done();
             }
@@ -186,6 +204,7 @@
                 args.no_charge,
                 pool,
                 args.resume_on_error,
+                &args.attachments_dir,
             )?;
             statistics
         }
@@ -262,6 +281,67 @@ fn check_no_duplicate_ids(comments: impl BufRead) -> Result<()> {
     Ok(())
 }
 
+fn upload_local_attachment(
+    comment_id: &CommentId,
+    attachment: &mut AttachmentMetadata,
+    index: usize,
+    client: &Client,
+    attachments_dir: &Path,
+    source_id: &SourceId,
+) -> Result<()> {
+    let local_attachment = LocalAttachmentPath {
+        index,
+        name: attachment.name.clone(),
+        parent_dir: attachments_dir.join(&comment_id.0),
+    };
+
+    match client.upload_comment_attachment(source_id, comment_id, index, &local_attachment.path()) {
+        Ok(response) => {
+            attachment.attachment_reference = None;
+            attachment.content_hash = Some(response.content_hash);
+            Ok(())
+        }
+        Err(err) => {
+            attachment.attachment_reference = None;
+            Err(anyhow::Error::msg(err))
+        }
+    }
+}
+
+fn upload_attachments_for_comments(
+    client: &Client,
+    comments: &mut [NewComment],
+    attachments_dir: &Path,
+    statistics: &Statistics,
+    source_id: &SourceId,
+    resume_on_error: bool,
+) -> Result<()> {
+    for comment in comments.iter_mut() {
+        for (index, attachment) in comment.attachments.iter_mut().enumerate() {
+            match upload_local_attachment(
+                &comment.id,
+                attachment,
+                index,
+                client,
+                attachments_dir,
+                source_id,
+            ) {
+                Ok(_) => {
+                    statistics.add_attachment();
+                }
+                Err(err) => {
+                    if resume_on_error {
+                        statistics.add_failed_attachment();
+                    } else {
+                        return Err(err);
+                    }
+                }
+            }
+        }
+    }
+    Ok(())
+}
+
 #[allow(clippy::too_many_arguments)]
 fn upload_batch_of_comments(
     client: &Client,
@@ -271,6 +351,8 @@ fn upload_batch_of_comments(
     comments_to_sync: &mut Vec<NewComment>,
     audio_paths: &mut Vec<(CommentId, PathBuf)>,
     no_charge: bool,
+    attachments_dir: &Option<PathBuf>,
+    resume_on_error: bool,
 ) -> Result<()> {
     let mut uploaded = 0;
     let mut new = 0;
@@ -279,6 +361,17 @@
 
     // Upload comments
     if !comments_to_put.is_empty() {
+        if let Some(attachments_dir) = attachments_dir {
+            upload_attachments_for_comments(
+                client,
+                comments_to_put,
+                attachments_dir,
+                statistics,
+                &source.id,
+                resume_on_error,
+            )?;
+        }
+
         client
             .put_comments(&source.full_name(), comments_to_put, no_charge)
             .context("Could not put batch of comments")?;
@@ -287,6 +380,17 @@
     }
 
     if !comments_to_sync.is_empty() {
+        if let Some(attachments_dir) = attachments_dir {
+            upload_attachments_for_comments(
+                client,
+                comments_to_sync,
+                attachments_dir,
+                statistics,
+                &source.id,
+                resume_on_error,
+            )?;
+        }
+
         let result = client
             .sync_comments(&source.full_name(), comments_to_sync, no_charge)
             .context("Could not sync batch of comments")?;
@@ -337,6 +441,7 @@ fn upload_comments_from_reader(
     no_charge: bool,
     pool: &mut Pool,
     resume_on_error: bool,
+    attachments_dir: &Option<PathBuf>,
 ) -> Result<()> {
     assert!(batch_size > 0);
 
@@ -399,6 +504,8 @@ fn upload_comments_from_reader(
                 &mut comments_to_sync,
                 &mut audio_paths,
                 no_charge,
+                attachments_dir,
+                resume_on_error,
             )?;
         }
 
@@ -412,6 +519,8 @@ fn upload_comments_from_reader(
                 &mut comments_to_sync,
                 &mut audio_paths,
                 no_charge,
+                attachments_dir,
+                resume_on_error,
             )?;
 
             upload_batch_of_annotations(
@@ -437,6 +546,8 @@ fn upload_comments_from_reader(
             &mut comments_to_sync,
             &mut audio_paths,
             no_charge,
+            attachments_dir,
+            resume_on_error,
         )?;
     }
 
@@ -474,6 +585,8 @@ pub struct Statistics {
     unchanged: AtomicUsize,
     annotations: AtomicUsize,
     failed_annotations: AtomicUsize,
+    attachments: AtomicUsize,
+    failed_attachments: AtomicUsize,
 }
 
 impl AnnotationStatistic for Statistics {
@@ -484,6 +597,14 @@
         self.failed_annotations.fetch_add(1, Ordering::SeqCst);
     }
 }
+impl AttachmentStatistic for Statistics {
+    fn add_attachment(&self) {
+        self.attachments.fetch_add(1, Ordering::SeqCst);
+    }
+    fn add_failed_attachment(&self) {
+        self.failed_attachments.fetch_add(1, Ordering::SeqCst);
+    }
+}
 
 impl Statistics {
     fn new() -> Self {
@@ -495,6 +616,8 @@
             unchanged: AtomicUsize::new(0),
             annotations: AtomicUsize::new(0),
             failed_annotations: AtomicUsize::new(0),
+            attachments: AtomicUsize::new(0),
+            failed_attachments: AtomicUsize::new(0),
         }
     }
 
@@ -545,6 +668,16 @@
     fn num_failed_annotations(&self) -> usize {
         self.failed_annotations.load(Ordering::SeqCst)
     }
+
+    #[inline]
+    fn num_attachments(&self) -> usize {
+        self.attachments.load(Ordering::SeqCst)
+    }
+
+    #[inline]
+    fn num_failed_attachments(&self) -> usize {
+        self.failed_attachments.load(Ordering::SeqCst)
+    }
 }
 
 /// Detailed statistics - only make sense if using --overwrite (i.e. exclusively sync endpoint)
@@ -557,16 +690,33 @@ fn detailed_statistics(statistics: &Statistics) -> (u64, String) {
     let num_unchanged = statistics.num_unchanged();
     let num_annotations = statistics.num_annotations();
     let num_failed_annotations = statistics.num_failed_annotations();
+    let num_attachments = statistics.num_attachments();
+    let num_failed_attachments = statistics.num_failed_attachments();
+
     let failed_annotations_string = if num_failed_annotations > 0 {
         format!(" {num_failed_annotations} {}", "skipped".dimmed())
     } else {
         String::new()
     };
 
+    let failed_attachments_string = if num_failed_attachments > 0 {
+        format!(" {num_failed_attachments} {}", "skipped".dimmed())
+    } else {
+        String::new()
+    };
+
+    let attachments_string = if (num_attachments + num_failed_attachments) > 0 {
+        format!(
+            " [{num_attachments} {}{failed_attachments_string}]",
+            "attachments".dimmed()
+        )
+    } else {
+        String::new()
+    };
+
     (
         bytes_read as u64,
         format!(
-            "{} {}: {} {} {} {} {} {} [{} {}{}]",
+            "{} {}: {} {} {} {} {} {} [{} {}{}]{}",
             num_uploaded.to_string().bold(),
             "comments".dimmed(),
             num_new,
@@ -577,7 +727,8 @@ fn detailed_statistics(statistics: &Statistics) -> (u64, String) {
             "nop".dimmed(),
             num_annotations,
             "annotations".dimmed(),
-            failed_annotations_string
+            failed_annotations_string,
+            attachments_string
         ),
     )
 }
@@ -588,21 +739,40 @@ fn basic_statistics(statistics: &Statistics) -> (u64, String) {
     let num_uploaded = statistics.num_uploaded();
     let num_annotations = statistics.num_annotations();
     let num_failed_annotations = statistics.num_failed_annotations();
+    let num_attachments = statistics.num_attachments();
+    let num_failed_attachments = statistics.num_failed_attachments();
+
     let failed_annotations_string = if num_failed_annotations > 0 {
         format!(" {num_failed_annotations} {}", "skipped".dimmed())
     } else {
         String::new()
     };
 
+    let failed_attachments_string = if num_failed_attachments > 0 {
+        format!(" {num_failed_attachments} {}", "skipped".dimmed())
+    } else {
+        String::new()
+    };
+
+    let attachments_string = if (num_attachments + num_failed_attachments) > 0 {
+        format!(
+            " [{num_attachments} {}{failed_attachments_string}]",
+            "attachments".dimmed()
+        )
+    } else {
+        String::new()
+    };
+
     (
         bytes_read as u64,
         format!(
-            "{} {} [{} {}{}]",
+            "{} {} [{} {}{}]{}",
             num_uploaded.to_string().bold(),
             "comments".dimmed(),
             num_annotations,
             "annotations".dimmed(),
-            failed_annotations_string
+            failed_annotations_string,
+            attachments_string
         ),
     )
 }
diff --git a/cli/src/commands/mod.rs b/cli/src/commands/mod.rs
index 1e5cacc4..9915ca96 100644
--- a/cli/src/commands/mod.rs
+++ b/cli/src/commands/mod.rs
@@ -4,7 +4,7 @@ use std::{
     path::PathBuf,
 };
 
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
 use dialoguer::Confirm;
 use once_cell::sync::Lazy;
 use reinfer_client::TransformTag;
@@ -72,7 +72,7 @@ impl LocalAttachmentPath {
         self.ensure_parent_dir_exists()?;
 
         if !self.path().is_file() {
-            let f = File::create(self.path()).expect("Could not create attachment output file");
+            let f = File::create(self.path()).context("Could not create attachment output file")?;
             let mut buf_writer = BufWriter::new(f);
             buf_writer.write_all(&buf_to_write)?;
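
For reference, a minimal usage sketch of the new `Client::upload_comment_attachment` method and `UploadAttachmentResponse` type added in this patch, assuming the crate's existing re-exports of `Client`, `SourceId` and `CommentId`. The ids, the attachment path (the on-disk layout actually comes from `LocalAttachmentPath::path()`, which is not shown here), and the `anyhow` error handling are illustrative assumptions rather than part of the change:

use std::path::PathBuf;

use anyhow::Result;
use reinfer_client::{Client, CommentId, SourceId};

// Hedged sketch: upload the file stored locally for attachment index 0 of a
// comment, then keep the content hash returned by the server. The ids and the
// file name below are placeholders constructed directly for illustration.
fn upload_first_attachment(client: &Client) -> Result<()> {
    let source_id = SourceId("abcdef0123456789".to_owned());
    let comment_id = CommentId("comment-1".to_owned());
    let attachment = PathBuf::from("attachments/comment-1/0-invoice.pdf");

    let response = client.upload_comment_attachment(&source_id, &comment_id, 0, &attachment)?;
    println!("uploaded attachment, content hash: {}", response.content_hash.0);
    Ok(())
}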