From 0c22d508e81f8e1f01be78bd33cb7b0a475e1599 Mon Sep 17 00:00:00 2001 From: joe-prosser Date: Wed, 20 Mar 2024 11:45:32 +0000 Subject: [PATCH] feat(commands): add get emails (#262) * feat(commands): add get emails --------- Co-authored-by: Irina Gossmann --- CHANGELOG.md | 3 +- api/src/lib.rs | 111 +++++++++++++++++++++- api/src/resources/bucket_statistics.rs | 11 +++ api/src/resources/email.rs | 9 ++ api/src/resources/mod.rs | 1 + cli/src/commands/get/emails.rs | 122 +++++++++++++++++++++++++ cli/src/commands/get/mod.rs | 7 ++ cli/src/printer.rs | 4 +- 8 files changed, 261 insertions(+), 7 deletions(-) create mode 100644 api/src/resources/bucket_statistics.rs create mode 100644 cli/src/commands/get/emails.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index eb41d585..8be9efe2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # Unreleased +- Add `get emails` - Added support for `--auto-increase-up-to` when creating quotas. # v0.22.2 @@ -60,7 +61,7 @@ ## v0.18.2 --Add ability to filter on user properties when getting comments +- Add ability to filter on user properties when getting comments ## v0.18.1 diff --git a/api/src/lib.rs b/api/src/lib.rs index dff93aad..efac26d2 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -13,6 +13,7 @@ use reqwest::{ IntoUrl, Proxy, Result as ReqwestResult, }; use resources::{ + bucket_statistics::GetBucketStatisticsResponse, comment::CommentTimestampFilter, dataset::{ QueryRequestParams, QueryResponse, @@ -44,6 +45,7 @@ use crate::resources::{ CreateRequest as CreateBucketRequest, CreateResponse as CreateBucketResponse, GetAvailableResponse as GetAvailableBucketsResponse, GetResponse as GetBucketResponse, }, + bucket_statistics::Statistics as BucketStatistics, comment::{ GetAnnotationsResponse, GetCommentResponse, GetLabellingsAfter, GetPredictionsResponse, GetRecentRequest, PutCommentsRequest, PutCommentsResponse, RecentCommentsPage, @@ -102,7 +104,10 @@ pub use crate::{ Dataset, FullName as DatasetFullName, Id as 
DatasetId, Identifier as DatasetIdentifier, ModelVersion, Name as DatasetName, NewDataset, UpdateDataset, }, - email::{Id as EmailId, Mailbox, MimeContent, NewEmail}, + email::{ + Continuation as EmailContinuation, EmailsIterPage, Id as EmailId, Mailbox, MimeContent, + NewEmail, + }, entity_def::{EntityDef, Id as EntityDefId, Name as EntityName, NewEntityDef}, integration::FullName as IntegrationFullName, label_def::{ @@ -117,7 +122,7 @@ pub use crate::{ FullName as SourceFullName, Id as SourceId, Identifier as SourceIdentifier, Name as SourceName, NewSource, Source, SourceKind, UpdateSource, }, - statistics::Statistics, + statistics::Statistics as CommentStatistics, stream::{ Batch as StreamBatch, FullName as StreamFullName, SequenceId as StreamSequenceId, Stream, StreamException, StreamExceptionMetadata, @@ -186,6 +191,13 @@ pub struct GetCommentsIterPageQuery<'a> { pub include_markup: bool, } +#[derive(Serialize)] +pub struct GetEmailsIterPageQuery<'a> { + #[serde(skip_serializing_if = "Option::is_none")] + pub continuation: Option<&'a EmailContinuation>, + pub limit: usize, +} + #[derive(Serialize)] pub struct GetCommentQuery { pub include_markup: bool, @@ -373,6 +385,33 @@ impl Client { CommentsIter::new(self, source_name, page_size, timerange) } + /// Get a page of emails from a bucket. + pub fn get_emails_iter_page( + &self, + bucket_name: &BucketFullName, + continuation: Option<&EmailContinuation>, + limit: usize, + ) -> Result { + let query_params = GetEmailsIterPageQuery { + continuation, + limit, + }; + self.post( + self.endpoints.get_emails(bucket_name)?, + Some(&query_params), + Retry::Yes, + ) + } + + /// Iterate through all emails in a bucket. + pub fn get_emails_iter<'a>( + &'a self, + bucket_name: &'a BucketFullName, + page_size: Option, + ) -> EmailsIter<'a> { + EmailsIter::new(self, bucket_name, page_size) + } + /// Get a single comment by id.
pub fn get_comment<'a>( &'a self, @@ -812,11 +851,17 @@ impl Client { Ok(()) } + pub fn get_bucket_statistics(&self, bucket_name: &BucketFullName) -> Result { + Ok(self + .get::<_, GetBucketStatisticsResponse>(self.endpoints.bucket_statistics(bucket_name)?)? + .statistics) + } + pub fn get_dataset_statistics( &self, dataset_name: &DatasetFullName, params: &DatasetStatisticsRequestParams, - ) -> Result { + ) -> Result { Ok(self .post::<_, _, GetStatisticsResponse>( self.endpoints.dataset_statistics(dataset_name)?, @@ -831,7 +876,7 @@ impl Client { &self, source_name: &SourceFullName, params: &SourceStatisticsRequestParams, - ) -> Result { + ) -> Result { Ok(self .post::<_, _, GetStatisticsResponse>( self.endpoints.source_statistics(source_name)?, @@ -1215,6 +1260,51 @@ pub enum ContinuationKind { Continuation(Continuation), } +pub struct EmailsIter<'a> { + client: &'a Client, + bucket_name: &'a BucketFullName, + continuation: Option, + done: bool, + page_size: usize, +} + +impl<'a> EmailsIter<'a> { + // Default number of emails per page to request from API. + pub const DEFAULT_PAGE_SIZE: usize = 64; + // Maximum number of emails per page which can be requested from the API. 
+ pub const MAX_PAGE_SIZE: usize = 256; + + fn new(client: &'a Client, bucket_name: &'a BucketFullName, page_size: Option) -> Self { + Self { + client, + bucket_name, + continuation: None, + done: false, + page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE), + } + } +} + +impl<'a> Iterator for EmailsIter<'a> { + type Item = Result>; + + fn next(&mut self) -> Option { + if self.done { + return None; + } + let response = self.client.get_emails_iter_page( + self.bucket_name, + self.continuation.as_ref(), + self.page_size, + ); + Some(response.map(|page| { + self.continuation = page.continuation; + self.done = self.continuation.is_none(); + page.emails + })) + } +} + pub struct CommentsIter<'a> { client: &'a Client, source_name: &'a SourceFullName, @@ -1436,6 +1526,12 @@ impl Endpoints { ], ) } + fn bucket_statistics(&self, bucket_name: &BucketFullName) -> Result { + construct_endpoint( + &self.base, + &["api", "_private", "buckets", &bucket_name.0, "statistics"], + ) + } fn dataset_summary(&self, dataset_name: &DatasetFullName) -> Result { construct_endpoint( @@ -1649,6 +1745,13 @@ impl Endpoints { ) } + fn get_emails(&self, bucket_name: &BucketFullName) -> Result { + construct_endpoint( + &self.base, + &["api", "_private", "buckets", &bucket_name.0, "emails"], + ) + } + fn put_emails(&self, bucket_name: &BucketFullName) -> Result { construct_endpoint( &self.base, diff --git a/api/src/resources/bucket_statistics.rs b/api/src/resources/bucket_statistics.rs new file mode 100644 index 00000000..acf411e2 --- /dev/null +++ b/api/src/resources/bucket_statistics.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct GetBucketStatisticsResponse { + pub statistics: Statistics, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct Statistics { + pub count: usize, +} diff --git a/api/src/resources/email.rs b/api/src/resources/email.rs index 6ab17d64..c764fcc0 100644 --- 
a/api/src/resources/email.rs +++ b/api/src/resources/email.rs @@ -56,3 +56,12 @@ pub(crate) struct PutEmailsRequest<'request> { #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct PutEmailsResponse {} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)] +pub struct Continuation(pub String); + +#[derive(Debug, Clone, Deserialize)] +pub struct EmailsIterPage { + pub emails: Vec, + pub continuation: Option, +} diff --git a/api/src/resources/mod.rs b/api/src/resources/mod.rs index 1ac80f55..061f5a43 100644 --- a/api/src/resources/mod.rs +++ b/api/src/resources/mod.rs @@ -1,5 +1,6 @@ pub mod audit; pub mod bucket; +pub mod bucket_statistics; pub mod comment; pub mod dataset; pub mod documents; diff --git a/cli/src/commands/get/emails.rs b/cli/src/commands/get/emails.rs new file mode 100644 index 00000000..ac2692ce --- /dev/null +++ b/cli/src/commands/get/emails.rs @@ -0,0 +1,122 @@ +use anyhow::{Context, Result}; + +use colored::Colorize; +use reinfer_client::{BucketIdentifier, Client}; +use std::{ + fs::File, + io::{self, BufWriter, Write}, + path::PathBuf, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; +use structopt::StructOpt; + +use crate::{ + printer::print_resources_as_json, + progress::{Options as ProgressOptions, Progress}, +}; + +#[derive(Debug, StructOpt)] +pub struct GetManyEmailsArgs { + #[structopt(name = "bucket")] + /// Bucket name or id + bucket: BucketIdentifier, + + #[structopt(short = "f", long = "file", parse(from_os_str))] + /// Path where to write emails as JSON. If not specified, stdout will be used.
+ path: Option, +} + +pub fn get_many(client: &Client, args: &GetManyEmailsArgs) -> Result<()> { + let GetManyEmailsArgs { bucket, path } = args; + + let file = match path { + Some(path) => Some( + File::create(path) + .with_context(|| format!("Could not open file for writing `{}`", path.display())) + .map(BufWriter::new)?, + ), + None => None, + }; + + if let Some(file) = file { + download_emails(client, bucket.clone(), file) + } else { + download_emails(client, bucket.clone(), io::stdout().lock()) + } +} + +fn download_emails( + client: &Client, + bucket_identifier: BucketIdentifier, + mut writer: impl Write, +) -> Result<()> { + let bucket = client + .get_bucket(bucket_identifier) + .context("Operation to get bucket has failed.")?; + + let bucket_statistics = client + .get_bucket_statistics(&bucket.full_name()) + .context("Could not get bucket statistics")?; + + let statistics = Arc::new(Statistics::new()); + + let _progress = get_emails_progress_bar(bucket_statistics.count as u64, &statistics); + + client + .get_emails_iter(&bucket.full_name(), None) + .try_for_each(|page| { + let page = page.context("Operation to get emails has failed.")?; + statistics.add_emails(page.len()); + print_resources_as_json(page.into_iter(), &mut writer) + })?; + log::info!( + "Successfully downloaded {} emails.", + statistics.num_downloaded(), + ); + Ok(()) +} + +#[derive(Debug)] +pub struct Statistics { + downloaded: AtomicUsize, +} + +impl Statistics { + fn new() -> Self { + Self { + downloaded: AtomicUsize::new(0), + } + } + + #[inline] + fn add_emails(&self, num_downloaded: usize) { + self.downloaded.fetch_add(num_downloaded, Ordering::SeqCst); + } + + #[inline] + fn num_downloaded(&self) -> usize { + self.downloaded.load(Ordering::SeqCst) + } +} + +fn get_emails_progress_bar(total_bytes: u64, statistics: &Arc) -> Progress { + Progress::new( + move |statistics| { + let num_downloaded = statistics.num_downloaded(); + ( + num_downloaded as u64, + format!( + "{} {}", + 
num_downloaded.to_string().bold(), + "emails".dimmed(), + ), + ) + }, + statistics, + Some(total_bytes), + ProgressOptions { bytes_units: false }, + ) +} diff --git a/cli/src/commands/get/mod.rs b/cli/src/commands/get/mod.rs index 6d2e4eb1..7bc9a62a 100644 --- a/cli/src/commands/get/mod.rs +++ b/cli/src/commands/get/mod.rs @@ -2,6 +2,7 @@ mod audit_events; mod buckets; mod comments; mod datasets; +mod emails; mod integrations; mod projects; mod quota; @@ -19,6 +20,7 @@ use self::{ buckets::GetBucketsArgs, comments::{GetManyCommentsArgs, GetSingleCommentArgs}, datasets::GetDatasetsArgs, + emails::GetManyEmailsArgs, integrations::GetIntegrationsArgs, projects::GetProjectsArgs, sources::GetSourcesArgs, @@ -33,6 +35,10 @@ pub enum GetArgs { /// List the available buckets Buckets(GetBucketsArgs), + #[structopt(name = "emails")] + /// Download all emails from a bucket + Emails(GetManyEmailsArgs), + #[structopt(name = "comment")] /// Get a single comment from a source Comment(GetSingleCommentArgs), @@ -89,6 +95,7 @@ pub enum GetArgs { pub fn run(args: &GetArgs, client: Client, printer: &Printer, pool: &mut Pool) -> Result<()> { match args { GetArgs::Buckets(args) => buckets::get(&client, args, printer), + GetArgs::Emails(args) => emails::get_many(&client, args), GetArgs::Comment(args) => comments::get_single(&client, args), GetArgs::Comments(args) => comments::get_many(&client, args), GetArgs::Datasets(args) => datasets::get(&client, args, printer), diff --git a/cli/src/printer.rs b/cli/src/printer.rs index 6a2b9df6..d8cbc121 100644 --- a/cli/src/printer.rs +++ b/cli/src/printer.rs @@ -6,7 +6,7 @@ use reinfer_client::{ audit::PrintableAuditEvent, dataset::DatasetAndStats, integration::Integration, quota::Quota, }, - Bucket, Dataset, Project, Source, Statistics, Stream, User, + Bucket, CommentStatistics, Dataset, Project, Source, Stream, User, }; use serde::{Serialize, Serializer}; @@ -188,7 +188,7 @@ impl DisplayTable for Source { pub struct PrintableSource { pub source:
Source, pub bucket: Option, - pub stats: Option, + pub stats: Option, } impl Serialize for PrintableSource {