Skip to content

Commit

Permalink
feat(commands): add get emails (#262)
Browse files Browse the repository at this point in the history
* feat(commands): add get emails
---------

Co-authored-by: Irina Gossmann <[email protected]>
  • Loading branch information
joe-prosser and margold authored Mar 20, 2024
1 parent 54136d5 commit 0c22d50
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 7 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Unreleased

- Add `get emails`
- Added support for `--auto-increase-up-to` when creating quotas.

# v0.22.2
Expand Down Expand Up @@ -60,7 +61,7 @@

## v0.18.2

-Add ability to filter on user properties when getting comments
- Add ability to filter on user properties when getting comments

## v0.18.1

Expand Down
111 changes: 107 additions & 4 deletions api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use reqwest::{
IntoUrl, Proxy, Result as ReqwestResult,
};
use resources::{
bucket_statistics::GetBucketStatisticsResponse,
comment::CommentTimestampFilter,
dataset::{
QueryRequestParams, QueryResponse,
Expand Down Expand Up @@ -44,6 +45,7 @@ use crate::resources::{
CreateRequest as CreateBucketRequest, CreateResponse as CreateBucketResponse,
GetAvailableResponse as GetAvailableBucketsResponse, GetResponse as GetBucketResponse,
},
bucket_statistics::Statistics as BucketStatistics,
comment::{
GetAnnotationsResponse, GetCommentResponse, GetLabellingsAfter, GetPredictionsResponse,
GetRecentRequest, PutCommentsRequest, PutCommentsResponse, RecentCommentsPage,
Expand Down Expand Up @@ -102,7 +104,10 @@ pub use crate::{
Dataset, FullName as DatasetFullName, Id as DatasetId, Identifier as DatasetIdentifier,
ModelVersion, Name as DatasetName, NewDataset, UpdateDataset,
},
email::{Id as EmailId, Mailbox, MimeContent, NewEmail},
email::{
Continuation as EmailContinuation, EmailsIterPage, Id as EmailId, Mailbox, MimeContent,
NewEmail,
},
entity_def::{EntityDef, Id as EntityDefId, Name as EntityName, NewEntityDef},
integration::FullName as IntegrationFullName,
label_def::{
Expand All @@ -117,7 +122,7 @@ pub use crate::{
FullName as SourceFullName, Id as SourceId, Identifier as SourceIdentifier,
Name as SourceName, NewSource, Source, SourceKind, UpdateSource,
},
statistics::Statistics,
statistics::Statistics as CommentStatistics,
stream::{
Batch as StreamBatch, FullName as StreamFullName, SequenceId as StreamSequenceId,
Stream, StreamException, StreamExceptionMetadata,
Expand Down Expand Up @@ -186,6 +191,13 @@ pub struct GetCommentsIterPageQuery<'a> {
pub include_markup: bool,
}

/// Parameters serialized and sent along with a request for one page of emails.
#[derive(Serialize)]
pub struct GetEmailsIterPageQuery<'a> {
/// Continuation token to resume from; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub continuation: Option<&'a EmailContinuation>,
/// Value sent as the `limit` field of the request.
pub limit: usize,
}

#[derive(Serialize)]
pub struct GetCommentQuery {
pub include_markup: bool,
Expand Down Expand Up @@ -373,6 +385,33 @@ impl Client {
CommentsIter::new(self, source_name, page_size, timerange)
}

/// Get a single page of emails from a bucket.
///
/// `continuation` is the token that came back with a previous page; `EmailsIter` passes
/// `None` for its first request. `limit` is forwarded unchanged in the request
/// parameters. The request is issued with `Retry::Yes`.
pub fn get_emails_iter_page(
    &self,
    bucket_name: &BucketFullName,
    continuation: Option<&EmailContinuation>,
    limit: usize,
) -> Result<EmailsIterPage> {
    let url = self.endpoints.get_emails(bucket_name)?;
    let query = GetEmailsIterPageQuery {
        continuation,
        limit,
    };
    self.post(url, Some(&query), Retry::Yes)
}

/// Iterate through all emails in a bucket, one page of emails per iteration step.
///
/// `page_size` is handed to `EmailsIter::new`, which falls back to
/// `EmailsIter::DEFAULT_PAGE_SIZE` when it is `None`.
pub fn get_emails_iter<'a>(
&'a self,
bucket_name: &'a BucketFullName,
page_size: Option<usize>,
) -> EmailsIter<'a> {
EmailsIter::new(self, bucket_name, page_size)
}

/// Get a single comment by id.
pub fn get_comment<'a>(
&'a self,
Expand Down Expand Up @@ -812,11 +851,17 @@ impl Client {
Ok(())
}

/// Fetch the statistics of a bucket.
///
/// Requests the bucket statistics endpoint through `self.get` and returns the
/// `statistics` field of the decoded `GetBucketStatisticsResponse`.
pub fn get_bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<BucketStatistics> {
    let url = self.endpoints.bucket_statistics(bucket_name)?;
    let response = self.get::<_, GetBucketStatisticsResponse>(url)?;
    Ok(response.statistics)
}

pub fn get_dataset_statistics(
&self,
dataset_name: &DatasetFullName,
params: &DatasetStatisticsRequestParams,
) -> Result<Statistics> {
) -> Result<CommentStatistics> {
Ok(self
.post::<_, _, GetStatisticsResponse>(
self.endpoints.dataset_statistics(dataset_name)?,
Expand All @@ -831,7 +876,7 @@ impl Client {
&self,
source_name: &SourceFullName,
params: &SourceStatisticsRequestParams,
) -> Result<Statistics> {
) -> Result<CommentStatistics> {
Ok(self
.post::<_, _, GetStatisticsResponse>(
self.endpoints.source_statistics(source_name)?,
Expand Down Expand Up @@ -1215,6 +1260,51 @@ pub enum ContinuationKind {
Continuation(Continuation),
}

/// Iterator over the emails of a bucket which yields one page of emails per step.
pub struct EmailsIter<'a> {
/// Client used to request each page.
client: &'a Client,
/// Bucket whose emails are requested.
bucket_name: &'a BucketFullName,
/// Continuation returned with the most recent page; `None` before the first request.
continuation: Option<EmailContinuation>,
/// Set once a page arrives without a continuation; `next` returns `None` afterwards.
done: bool,
/// Limit sent with every page request.
page_size: usize,
}

impl<'a> EmailsIter<'a> {
    /// Number of emails requested per page when the caller does not pick a page size.
    pub const DEFAULT_PAGE_SIZE: usize = 64;
    /// Maximum number of emails per page which can be requested from the API.
    ///
    /// NOTE: `new` does not clamp the requested page size to this value.
    pub const MAX_PAGE_SIZE: usize = 256;

    /// Create an iterator that has not yet requested any page of `bucket_name`.
    fn new(client: &'a Client, bucket_name: &'a BucketFullName, page_size: Option<usize>) -> Self {
        let page_size = page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE);
        Self {
            client,
            bucket_name,
            page_size,
            continuation: None,
            done: false,
        }
    }
}

impl<'a> Iterator for EmailsIter<'a> {
    type Item = Result<Vec<NewEmail>>;

    /// Request the next page of emails.
    ///
    /// Returns `None` once a previous page came back without a continuation. A failed
    /// request is yielded as `Some(Err(..))` and leaves the iterator state untouched, so
    /// a subsequent call requests the same page again.
    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }

        let page_result = self.client.get_emails_iter_page(
            self.bucket_name,
            self.continuation.as_ref(),
            self.page_size,
        );

        match page_result {
            Ok(page) => {
                // Remember where to resume; a missing continuation marks the last page.
                self.continuation = page.continuation;
                self.done = self.continuation.is_none();
                Some(Ok(page.emails))
            }
            Err(error) => Some(Err(error)),
        }
    }
}

pub struct CommentsIter<'a> {
client: &'a Client,
source_name: &'a SourceFullName,
Expand Down Expand Up @@ -1436,6 +1526,12 @@ impl Endpoints {
],
)
}
/// Endpoint for the statistics of a bucket.
///
/// Hands the base URL and the path segments
/// `api`, `_private`, `buckets`, `<bucket_name.0>`, `statistics` to `construct_endpoint`.
fn bucket_statistics(&self, bucket_name: &BucketFullName) -> Result<Url> {
construct_endpoint(
&self.base,
&["api", "_private", "buckets", &bucket_name.0, "statistics"],
)
}

fn dataset_summary(&self, dataset_name: &DatasetFullName) -> Result<Url> {
construct_endpoint(
Expand Down Expand Up @@ -1649,6 +1745,13 @@ impl Endpoints {
)
}

/// Endpoint used to fetch the emails of a bucket.
///
/// Hands the base URL and the path segments
/// `api`, `_private`, `buckets`, `<bucket_name.0>`, `emails` to `construct_endpoint`.
fn get_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
construct_endpoint(
&self.base,
&["api", "_private", "buckets", &bucket_name.0, "emails"],
)
}

fn put_emails(&self, bucket_name: &BucketFullName) -> Result<Url> {
construct_endpoint(
&self.base,
Expand Down
11 changes: 11 additions & 0 deletions api/src/resources/bucket_statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use serde::{Deserialize, Serialize};

/// Response envelope of the bucket statistics endpoint.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct GetBucketStatisticsResponse {
/// The statistics carried by the response.
pub statistics: Statistics,
}

/// Statistics reported for a bucket.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Statistics {
/// Count reported for the bucket. The CLI `get emails` command uses it as the
/// progress bar total, so presumably the number of emails in the bucket (TODO confirm).
pub count: usize,
}
9 changes: 9 additions & 0 deletions api/src/resources/email.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,12 @@ pub(crate) struct PutEmailsRequest<'request> {

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct PutEmailsResponse {}

/// Continuation token returned with a page of emails and sent back to request the
/// following page.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)]
pub struct Continuation(pub String);

/// One page of emails as returned when listing the emails of a bucket.
#[derive(Debug, Clone, Deserialize)]
pub struct EmailsIterPage {
/// Emails contained in this page.
pub emails: Vec<NewEmail>,
/// Token for requesting the next page; `EmailsIter` stops once this is `None`.
pub continuation: Option<Continuation>,
}
1 change: 1 addition & 0 deletions api/src/resources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod audit;
pub mod bucket;
pub mod bucket_statistics;
pub mod comment;
pub mod dataset;
pub mod documents;
Expand Down
122 changes: 122 additions & 0 deletions cli/src/commands/get/emails.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use anyhow::{Context, Result};

use colored::Colorize;
use reinfer_client::{BucketIdentifier, Client};
use std::{
fs::File,
io::{self, BufWriter, Write},
path::PathBuf,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use structopt::StructOpt;

use crate::{
printer::print_resources_as_json,
progress::{Options as ProgressOptions, Progress},
};

// Arguments of the `get emails` subcommand.
//
// NOTE: the `///` comments on the fields below are turned into the command line help
// text by structopt, so they are user-facing strings. A plain `//` comment is used here
// on purpose so that the help text of the subcommand itself is not affected.
#[derive(Debug, StructOpt)]
pub struct GetManyEmailsArgs {
    #[structopt(name = "bucket")]
    /// Bucket name or id
    bucket: BucketIdentifier,

    #[structopt(short = "f", long = "file", parse(from_os_str))]
    /// Path where to write emails as JSON. If not specified, stdout will be used.
    path: Option<PathBuf>,
}

/// Download all emails of a bucket as JSON.
///
/// When a path was given, the file is created up front (failing with the path in the
/// error context) and written through a `BufWriter`; otherwise the emails go to the
/// locked stdout.
pub fn get_many(client: &Client, args: &GetManyEmailsArgs) -> Result<()> {
    let GetManyEmailsArgs { bucket, path } = args;

    match path {
        Some(path) => {
            let file = File::create(path)
                .with_context(|| format!("Could not open file for writing `{}`", path.display()))?;
            download_emails(client, bucket.clone(), BufWriter::new(file))
        }
        None => download_emails(client, bucket.clone(), io::stdout().lock()),
    }
}

fn download_emails(
client: &Client,
bucket_identifier: BucketIdentifier,
mut writer: impl Write,
) -> Result<()> {
let bucket = client
.get_bucket(bucket_identifier)
.context("Operation to get bucket has failed.")?;

let bucket_statistics = client
.get_bucket_statistics(&bucket.full_name())
.context("Could not get bucket statistics")?;

let statistics = Arc::new(Statistics::new());

let _progress = get_emails_progress_bar(bucket_statistics.count as u64, &statistics);

client
.get_emails_iter(&bucket.full_name(), None)
.try_for_each(|page| {
let page = page.context("Operation to get emails has failed.")?;
statistics.add_emails(page.len());
print_resources_as_json(page.into_iter(), &mut writer)
})?;
log::info!(
"Successfully downloaded {} emails.",
statistics.num_downloaded(),
);
Ok(())
}

/// Running tally of the emails downloaded so far.
///
/// The counter is atomic so that it can be updated through a shared reference.
#[derive(Debug)]
pub struct Statistics {
    downloaded: AtomicUsize,
}

impl Statistics {
    /// Create a tally that starts at zero.
    fn new() -> Self {
        let downloaded = AtomicUsize::new(0);
        Self { downloaded }
    }

    /// Add `num_downloaded` emails to the tally.
    #[inline]
    fn add_emails(&self, num_downloaded: usize) {
        let _previous = self.downloaded.fetch_add(num_downloaded, Ordering::SeqCst);
    }

    /// Number of emails recorded so far.
    #[inline]
    fn num_downloaded(&self) -> usize {
        self.downloaded.load(Ordering::SeqCst)
    }
}

/// Build the progress bar shown while emails are being downloaded.
///
/// `total_emails` is passed to `Progress::new` as the total. The reporting closure reads
/// the current tally from `statistics` and renders it as "<count> emails".
fn get_emails_progress_bar(total_emails: u64, statistics: &Arc<Statistics>) -> Progress {
    let options = ProgressOptions { bytes_units: false };
    Progress::new(
        move |stats| {
            let downloaded = stats.num_downloaded();
            let label = format!(
                "{} {}",
                downloaded.to_string().bold(),
                "emails".dimmed(),
            );
            (downloaded as u64, label)
        },
        statistics,
        Some(total_emails),
        options,
    )
}
7 changes: 7 additions & 0 deletions cli/src/commands/get/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod audit_events;
mod buckets;
mod comments;
mod datasets;
mod emails;
mod integrations;
mod projects;
mod quota;
Expand All @@ -19,6 +20,7 @@ use self::{
buckets::GetBucketsArgs,
comments::{GetManyCommentsArgs, GetSingleCommentArgs},
datasets::GetDatasetsArgs,
emails::GetManyEmailsArgs,
integrations::GetIntegrationsArgs,
projects::GetProjectsArgs,
sources::GetSourcesArgs,
Expand All @@ -33,6 +35,10 @@ pub enum GetArgs {
/// List the available buckets
Buckets(GetBucketsArgs),

#[structopt(name = "emails")]
/// Download all emails from a source
Emails(GetManyEmailsArgs),

#[structopt(name = "comment")]
/// Get a single comment from a source
Comment(GetSingleCommentArgs),
Expand Down Expand Up @@ -89,6 +95,7 @@ pub enum GetArgs {
pub fn run(args: &GetArgs, client: Client, printer: &Printer, pool: &mut Pool) -> Result<()> {
match args {
GetArgs::Buckets(args) => buckets::get(&client, args, printer),
GetArgs::Emails(args) => emails::get_many(&client, args),
GetArgs::Comment(args) => comments::get_single(&client, args),
GetArgs::Comments(args) => comments::get_many(&client, args),
GetArgs::Datasets(args) => datasets::get(&client, args, printer),
Expand Down
Loading

0 comments on commit 0c22d50

Please sign in to comment.