From ed345d477fe2d9e1a0fecf6728f5bf9f1406653a Mon Sep 17 00:00:00 2001 From: Joe Prosser Date: Mon, 22 Jul 2024 20:53:56 +0100 Subject: [PATCH 1/2] feat(commands): add ability to download attachment content for comments --- CHANGELOG.md | 1 + api/src/lib.rs | 65 ++++++++++-- api/src/resources/comment.rs | 10 ++ cli/src/commands/get/comments.rs | 172 ++++++++++++++++++++++++++++--- cli/src/commands/mod.rs | 43 ++++++++ 5 files changed, 263 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e3f1b8..b6a1abf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # Unreleased - Add `config parse-from-url` command for parsing configuration from a URL +- Add ability to download attachments for comments # v0.28.0 - Add general fields to `create datasets` diff --git a/api/src/lib.rs b/api/src/lib.rs index ec4be82..2959a14 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -14,7 +14,7 @@ use reqwest::{ }; use resources::{ bucket_statistics::GetBucketStatisticsResponse, - comment::CommentTimestampFilter, + comment::{AttachmentReference, CommentTimestampFilter}, dataset::{ QueryRequestParams, QueryResponse, StatisticsRequestParams as DatasetStatisticsRequestParams, SummaryRequestParams, @@ -36,7 +36,7 @@ use resources::{ }; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::{cell::Cell, fmt::Display, path::Path}; +use std::{cell::Cell, fmt::Display, io::Read, path::Path}; use url::Url; use crate::resources::{ @@ -622,6 +622,27 @@ impl Client { Ok(()) } + pub fn get_attachment(&self, reference: &AttachmentReference) -> Result> { + let mut response = self.raw_request( + &Method::GET, + &self.endpoints.attachment(reference)?, + &None::<()>, + &None::<()>, + &Retry::Yes, + )?; + + let mut buffer = Vec::new(); + + response + .read_to_end(&mut buffer) + .map_err(|source| Error::Unknown { + message: "Failed to read buffer".to_string(), + source: Box::new(source), + })?; + + Ok(buffer) + } + pub fn get_integrations(&self) -> Result> { Ok(self .get::<_, GetIntegrationsResponse>(self.endpoints.integrations()?)? @@ -1152,21 +1173,19 @@ impl Client { self.request(Method::PUT, url, Some(request), None::<()>, Retry::Yes) } - fn request( + fn raw_request( &self, - method: Method, - url: LocationT, - body: Option, - query: Option, - retry: Retry, - ) -> Result + method: &Method, + url: &LocationT, + body: &Option, + query: &Option, + retry: &Retry, + ) -> Result where LocationT: IntoUrl + Display + Clone, RequestT: Serialize, QueryT: Serialize, - for<'de> SuccessT: Deserialize<'de>, { - debug!("Attempting {} `{}`", method, url); let do_request = || { let request = self .http_client @@ -1192,6 +1211,26 @@ impl Client { message: format!("{method} operation failed."), })?; + Ok(http_response) + } + + fn request( + &self, + method: Method, + url: LocationT, + body: Option, + query: Option, + retry: Retry, + ) -> Result + where + LocationT: IntoUrl + Display + Clone, + RequestT: Serialize, + QueryT: Serialize, + for<'de> SuccessT: Deserialize<'de>, + { + debug!("Attempting {} `{}`", method, url); + let http_response = self.raw_request(&method, &url, &body, &query, &retry)?; + let status = http_response.status(); http_response @@ -1489,6 +1528,10 @@ impl Endpoints { construct_endpoint(&self.base, &["api", "_private", "integrations", &name.0]) } + fn attachment(&self, reference: &AttachmentReference) -> Result { + construct_endpoint(&self.base, &["api", "v1", "attachments", &reference.0]) + } + fn validation( &self, dataset_name: &DatasetFullName, diff --git a/api/src/resources/comment.rs b/api/src/resources/comment.rs index f1c127f..4714f95 100644 --- a/api/src/resources/comment.rs +++ b/api/src/resources/comment.rs @@ -292,11 +292,21 @@ pub enum Sentiment { Negative, } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub struct AttachmentReference(pub String); + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub struct ContentHash(pub String); + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct AttachmentMetadata { pub name: String, pub size: u64, pub content_type: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub attachment_reference: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub content_hash: Option, } #[derive(Debug, Clone, PartialEq, Default, Eq)] diff --git a/cli/src/commands/get/comments.rs b/cli/src/commands/get/comments.rs index fd5849c..bea097e 100644 --- a/cli/src/commands/get/comments.rs +++ b/cli/src/commands/get/comments.rs @@ -17,16 +17,17 @@ use reinfer_client::{ }, source::StatisticsRequestParams as SourceStatisticsRequestParams, }, - AnnotatedComment, Client, CommentFilter, CommentId, CommentsIterTimerange, DatasetFullName, - DatasetIdentifier, Entities, HasAnnotations, LabelName, Labelling, ModelVersion, - PredictedLabel, PropertyValue, Source, SourceIdentifier, DEFAULT_LABEL_GROUP_NAME, + AnnotatedComment, Client, Comment, CommentFilter, CommentId, CommentsIterTimerange, + DatasetFullName, DatasetIdentifier, Entities, HasAnnotations, LabelName, Labelling, + ModelVersion, PredictedLabel, PropertyValue, Source, SourceIdentifier, + DEFAULT_LABEL_GROUP_NAME, }; use serde::Deserialize; use std::{ collections::HashMap, - fs::File, + fs::{create_dir, File}, io::{self, BufWriter, Write}, - path::PathBuf, + path::{Path, PathBuf}, str::FromStr, sync::{ atomic::{AtomicUsize, Ordering}, @@ -36,6 +37,7 @@ use std::{ use structopt::StructOpt; use crate::{ + commands::LocalAttachmentPath, printer::print_resources_as_json, progress::{Options as ProgressOptions, Progress}, }; @@ -116,6 +118,10 @@ pub struct GetManyCommentsArgs { #[structopt(long = "attachment-types")] /// The list of attachment types to filter to attachment_type_filters: Vec, + + #[structopt(long = "attachments")] + /// Save attachment content for each comment + include_attachment_content: Option, } #[derive(Debug, Deserialize)] @@ -238,6 +244,43 @@ fn get_user_properties_filter_interactively( Ok(UserPropertiesFilter(filters)) } +#[derive(Default)] +struct OutputLocations { + jsonl_file: Option>, + attachments_dir: Option, +} + +fn get_output_locations(path: &Option) -> Result { + if let Some(path) = path { + let jsonl_file = Some( + File::create(path) + .with_context(|| format!("Could not open file for writing `{}`", path.display())) + .map(BufWriter::new)?, + ); + + let attachments_dir = path + .parent() + .context("Could not get attachments directory")? + .join(format!( + "{0}.attachments", + path.file_name() + .context("Could not get output file name")? + .to_string_lossy() + )); + + if !attachments_dir.exists() { + create_dir(&attachments_dir)?; + } + + Ok(OutputLocations { + jsonl_file, + attachments_dir: Some(attachments_dir), + }) + } else { + Ok(OutputLocations::default()) + } +} + pub fn get_many(client: &Client, args: &GetManyCommentsArgs) -> Result<()> { let GetManyCommentsArgs { source, @@ -255,6 +298,7 @@ pub fn get_many(client: &Client, args: &GetManyCommentsArgs) -> Result<()> { interactive_property_filter: interative_property_filter, recipients, senders, + include_attachment_content, } = args; let by_timerange = from_timestamp.is_some() || to_timestamp.is_some(); @@ -309,14 +353,14 @@ pub fn get_many(client: &Client, args: &GetManyCommentsArgs) -> Result<()> { bail!("Cannot filter on `senders` or `recipients` when `dataset` is not provided") } - let file = match path { - Some(path) => Some( - File::create(path) - .with_context(|| format!("Could not open file for writing `{}`", path.display())) - .map(BufWriter::new)?, - ), - None => None, - }; + if path.is_none() && include_attachment_content.is_some() { + bail!("Cannot include attachment content when no file is provided") + } + + let OutputLocations { + jsonl_file, + attachments_dir, + } = get_output_locations(path)?; let mut label_attribute_filter: Option = None; if let (Some(dataset_id), Some(filter)) = (dataset, label_filter) { @@ -386,9 +430,10 @@ pub fn get_many(client: &Client, args: &GetManyCommentsArgs) -> Result<()> { user_properties_filter, attachment_property_types_filter, messages_filter: Some(messages_filter), + attachments_dir, }; - if let Some(file) = file { + if let Some(file) = jsonl_file { download_comments(client, source.clone(), file, download_options) } else { download_comments( @@ -439,6 +484,7 @@ struct CommentDownloadOptions { attachment_property_types_filter: Option, user_properties_filter: Option, messages_filter: Option, + attachments_dir: Option, } impl CommentDownloadOptions { @@ -508,6 +554,7 @@ fn download_comments( }, &statistics, dataset_name.is_some(), + options.attachments_dir.is_some(), )) }; @@ -529,6 +576,7 @@ fn download_comments( source, &statistics, options.include_predictions, + options.attachments_dir, writer, )?; } else { @@ -554,6 +602,13 @@ fn download_comments( .try_for_each(|page| { let page = page.context("Operation to get comments has failed.")?; statistics.add_comments(page.len()); + + if let Some(attachments_dir) = &options.attachments_dir { + page.iter().try_for_each(|comment| -> Result<()> { + download_comment_attachments(client, attachments_dir, comment, &statistics) + })?; + } + print_resources_as_json( page.into_iter().map(|comment| AnnotatedComment { comment, @@ -625,7 +680,7 @@ fn get_comments_from_uids( .context("Operation to get predictions has failed.")?; // since predict-comments endpoint doesn't return some fields, // they are set to None or [] here - let comments = + let mut comments = page.into_iter() .zip(predictions.into_iter()) .map(|(comment, prediction)| AnnotatedComment { @@ -659,9 +714,20 @@ fn get_comments_from_uids( moon_forms: None, label_properties: None, }); + + if let Some(attachments_dir) = &options.attachments_dir { + comments.try_for_each(|comment| -> Result<()> { + download_comment_attachments( + client, + attachments_dir, + &comment.comment, + statistics, + ) + })?; + } print_resources_as_json(comments, &mut writer) } else { - let comments = page.into_iter().map(|mut annotated_comment| { + let mut comments = page.into_iter().map(|mut annotated_comment| { if !include_predictions { annotated_comment = annotated_comment.without_predictions(); } @@ -670,18 +736,58 @@ fn get_comments_from_uids( } annotated_comment }); + if let Some(attachments_dir) = &options.attachments_dir { + comments.try_for_each(|comment| -> Result<()> { + download_comment_attachments( + client, + attachments_dir, + &comment.comment, + statistics, + ) + })?; + } print_resources_as_json(comments, &mut writer) } })?; Ok(()) } +fn download_comment_attachments( + client: &Client, + attachments_dir: &Path, + comment: &Comment, + statistics: &Arc, +) -> Result<()> { + comment + .attachments + .iter() + .enumerate() + .try_for_each(|(idx, attachment)| -> Result<()> { + if let Some(attachment_reference) = &attachment.attachment_reference { + let local_attachment = LocalAttachmentPath { + index: idx, + name: attachment.name.clone(), + parent_dir: attachments_dir.join(&comment.id.0), + }; + + let attachment_buf = client.get_attachment(attachment_reference)?; + + if local_attachment.write(attachment_buf)? { + statistics.add_attachments(1); + }; + } + Ok(()) + })?; + Ok(()) +} + fn get_reviewed_comments_in_bulk( client: &Client, dataset_name: DatasetFullName, source: Source, statistics: &Arc, include_predictions: bool, + attachments_dir: Option, mut writer: impl Write, ) -> Result<()> { client @@ -690,6 +796,18 @@ fn get_reviewed_comments_in_bulk( let page = page.context("Operation to get labellings has failed.")?; statistics.add_comments(page.len()); statistics.add_annotated(page.len()); + + if let Some(attachments_dir) = &attachments_dir { + page.iter().try_for_each(|comment| -> Result<()> { + download_comment_attachments( + client, + attachments_dir, + &comment.comment, + statistics, + ) + })?; + } + let comments = page.into_iter().map(|comment| { if !include_predictions { comment.without_predictions() @@ -697,6 +815,7 @@ fn get_reviewed_comments_in_bulk( comment } }); + print_resources_as_json(comments, &mut writer) })?; Ok(()) @@ -706,6 +825,7 @@ fn get_reviewed_comments_in_bulk( pub struct Statistics { downloaded: AtomicUsize, annotated: AtomicUsize, + attachments: AtomicUsize, } impl Statistics { @@ -713,6 +833,7 @@ impl Statistics { Self { downloaded: AtomicUsize::new(0), annotated: AtomicUsize::new(0), + attachments: AtomicUsize::new(0), } } @@ -721,6 +842,11 @@ impl Statistics { self.downloaded.fetch_add(num_downloaded, Ordering::SeqCst); } + #[inline] + fn add_attachments(&self, num_downloaded: usize) { + self.attachments.fetch_add(num_downloaded, Ordering::SeqCst); + } + #[inline] fn add_annotated(&self, num_downloaded: usize) { self.annotated.fetch_add(num_downloaded, Ordering::SeqCst); @@ -731,6 +857,11 @@ impl Statistics { self.downloaded.load(Ordering::SeqCst) } + #[inline] + fn num_attachments(&self) -> usize { + self.attachments.load(Ordering::SeqCst) + } + #[inline] fn num_annotated(&self) -> usize { self.annotated.load(Ordering::SeqCst) @@ -741,21 +872,28 @@ fn get_comments_progress_bar( total_bytes: u64, statistics: &Arc, show_annotated: bool, + show_attachments: bool, ) -> Progress { Progress::new( move |statistics| { let num_downloaded = statistics.num_downloaded(); let num_annotated = statistics.num_annotated(); + let num_attachments = statistics.num_attachments(); ( num_downloaded as u64, format!( - "{} {}{}", + "{} {}{}{}", num_downloaded.to_string().bold(), "comments".dimmed(), if show_annotated { format!(" [{} {}]", num_annotated, "annotated".dimmed()) } else { "".into() + }, + if show_attachments { + format!(" [{} {}]", num_attachments, "attachments".dimmed()) + } else { + "".into() } ), ) diff --git a/cli/src/commands/mod.rs b/cli/src/commands/mod.rs index 8657ee9..51187fd 100644 --- a/cli/src/commands/mod.rs +++ b/cli/src/commands/mod.rs @@ -1,3 +1,9 @@ +use std::{ + fs::{create_dir, File}, + io::{BufWriter, Write}, + path::PathBuf, +}; + use anyhow::{anyhow, Result}; use dialoguer::Confirm; use once_cell::sync::Lazy; @@ -39,3 +45,40 @@ Do you want to continue?"#, static DEFAULT_TRANSFORM_TAG: Lazy = Lazy::new(|| TransformTag("generic.0.CONVKER5".to_string())); + +pub struct LocalAttachmentPath { + index: usize, + name: String, + parent_dir: PathBuf, +} + +impl LocalAttachmentPath { + fn ensure_parent_dir_exists(&self) -> Result<()> { + if !self.parent_dir.exists() { + create_dir(&self.parent_dir)?; + } + Ok(()) + } + + fn name(&self) -> String { + format!("{0}.{1}", self.index, self.name) + } + + fn path(&self) -> PathBuf { + self.parent_dir.join(self.name()) + } + + pub fn write(&self, buf_to_write: Vec) -> Result { + self.ensure_parent_dir_exists()?; + + if !self.path().is_file() { + let f = File::create(&self.path()).expect("Could not create attachment output file"); + + let mut buf_writer = BufWriter::new(f); + buf_writer.write_all(&buf_to_write)?; + Ok(true) + } else { + Ok(false) + } + } +} From f6be1ea097696205985be77d64fa992a2dd5c24a Mon Sep 17 00:00:00 2001 From: Joe Prosser Date: Tue, 23 Jul 2024 11:22:35 +0100 Subject: [PATCH 2/2] fix clippy --- cli/src/commands/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/src/commands/mod.rs b/cli/src/commands/mod.rs index 51187fd..1e5cacc 100644 --- a/cli/src/commands/mod.rs +++ b/cli/src/commands/mod.rs @@ -72,7 +72,7 @@ impl LocalAttachmentPath { self.ensure_parent_dir_exists()?; if !self.path().is_file() { - let f = File::create(&self.path()).expect("Could not create attachment output file"); + let f = File::create(self.path()).expect("Could not create attachment output file"); let mut buf_writer = BufWriter::new(f); buf_writer.write_all(&buf_to_write)?;