Skip to content

Commit

Permalink
PDF refactor (#41)
Browse files Browse the repository at this point in the history
* Add read mode for jobs

* Fmt

* Log before upload
  • Loading branch information
augustuswm authored Nov 2, 2023
1 parent f55c083 commit a5a8033
Show file tree
Hide file tree
Showing 26 changed files with 438 additions and 341 deletions.
37 changes: 11 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dirs = "5.0.1"
dropshot = { git = "https://github.com/oxidecomputer/dropshot" }
dropshot-verified-body = { git = "https://github.com/oxidecomputer/dropshot-verified-body", branch = "dropshot-dev" }
google-cloudkms1 = "5.0.3"
google-drive = "0.7.0"
google-drive3 = "5.0.3"
google-storage1 = "5.0.3"
hex = "0.4.3"
hmac = "0.12.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Removes the external_id and rfd_id columns from the rfd_pdf table.
-- Any values stored in these columns are discarded when the columns are dropped.
ALTER TABLE rfd_pdf DROP COLUMN external_id;
ALTER TABLE rfd_pdf DROP COLUMN rfd_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Adds two required columns to rfd_pdf:
--   rfd_id      - foreign key to rfd (id)
--   external_id - free-form identifier stored as VARCHAR
-- NOTE(review): both columns are NOT NULL with no DEFAULT. PostgreSQL rejects such an
-- ADD COLUMN when the table already contains rows -- confirm rfd_pdf is empty at
-- migration time, or add the columns as nullable, backfill, then SET NOT NULL.
ALTER TABLE rfd_pdf ADD COLUMN rfd_id UUID REFERENCES rfd (id) NOT NULL;
ALTER TABLE rfd_pdf ADD COLUMN external_id VARCHAR NOT NULL;
2 changes: 2 additions & 0 deletions rfd-model/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct RfdPdfModel {
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub deleted_at: Option<DateTime<Utc>>,
pub rfd_id: Uuid,
pub external_id: String,
}

#[derive(Debug, Deserialize, Serialize, Queryable, Insertable)]
Expand Down
4 changes: 4 additions & 0 deletions rfd-model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ pub struct RfdPdf {
pub updated_at: DateTime<Utc>,
#[partial(NewRfdPdf(skip))]
pub deleted_at: Option<DateTime<Utc>>,
pub rfd_id: Uuid,
pub external_id: String,
}

impl From<RfdPdfModel> for RfdPdf {
Expand All @@ -121,6 +123,8 @@ impl From<RfdPdfModel> for RfdPdf {
created_at: value.created_at,
updated_at: value.updated_at,
deleted_at: value.deleted_at,
rfd_id: value.rfd_id,
external_id: value.external_id,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions rfd-model/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ diesel::table! {
created_at -> Timestamptz,
updated_at -> Timestamptz,
deleted_at -> Nullable<Timestamptz>,
rfd_id -> Uuid,
external_id -> Varchar,
}
}

Expand Down Expand Up @@ -227,6 +229,7 @@ diesel::joinable!(api_user_access_token -> api_user (api_user_id));
diesel::joinable!(api_user_provider -> api_user (api_user_id));
diesel::joinable!(oauth_client_redirect_uri -> oauth_client (oauth_client_id));
diesel::joinable!(oauth_client_secret -> oauth_client (oauth_client_id));
diesel::joinable!(rfd_pdf -> rfd (rfd_id));
diesel::joinable!(rfd_pdf -> rfd_revision (rfd_revision_id));
diesel::joinable!(rfd_revision -> rfd (rfd_id));

Expand Down
15 changes: 15 additions & 0 deletions rfd-model/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ pub trait RfdStore {
async fn delete(&self, id: &Uuid) -> Result<Option<Rfd>, StoreError>;
}

// TODO: Make the revision store generic over a revision type. We want a metadata-only version of
// the revision model so that content does not always need to be loaded from the db.

#[derive(Debug, Default)]
pub struct RfdRevisionFilter {
pub id: Option<Vec<Uuid>>,
Expand Down Expand Up @@ -164,6 +167,8 @@ pub struct RfdPdfFilter {
pub rfd_revision: Option<Vec<Uuid>>,
pub source: Option<Vec<PdfSource>>,
pub deleted: bool,
pub rfd: Option<Vec<Uuid>>,
pub external_id: Option<Vec<String>>,
}

impl RfdPdfFilter {
Expand All @@ -186,6 +191,16 @@ impl RfdPdfFilter {
self.deleted = deleted;
self
}

pub fn rfd(mut self, rfd: Option<Vec<Uuid>>) -> Self {
self.rfd = rfd;
self
}

pub fn external_id(mut self, external_id: Option<Vec<String>>) -> Self {
self.external_id = external_id;
self
}
}

#[cfg_attr(feature = "mock", automock)]
Expand Down
10 changes: 10 additions & 0 deletions rfd-model/src/storage/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ impl RfdPdfStore for PostgresStore {
rfd_revision,
source,
deleted,
rfd,
external_id,
} = filter;

if let Some(id) = id {
Expand All @@ -357,6 +359,14 @@ impl RfdPdfStore for PostgresStore {
query = query.filter(rfd_pdf::source.eq_any(source));
}

if let Some(rfd) = rfd {
query = query.filter(rfd_pdf::rfd_id.eq_any(rfd));
}

if let Some(external_id) = external_id {
query = query.filter(rfd_pdf::external_id.eq_any(external_id));
}

if !deleted {
query = query.filter(rfd_pdf::deleted_at.is_null());
}
Expand Down
2 changes: 1 addition & 1 deletion rfd-processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async-trait = { workspace = true }
base64 = { workspace = true }
chrono = { workspace = true }
config = { workspace = true }
google-drive = { workspace = true }
google-drive3 = { workspace = true }
google-storage1 = { workspace = true }
hex = { workspace = true }
hmac = { workspace = true }
Expand Down
134 changes: 63 additions & 71 deletions rfd-processor/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::time::Duration;
use std::{io::Cursor, time::Duration};

use async_trait::async_trait;
use google_drive::{traits::FileOps, Client as GDriveClient};
use google_drive3::{api::File, DriveHub};
// use google_drive::{traits::FileOps, Client as GDriveClient};
use google_storage1::{
hyper, hyper::client::HttpConnector, hyper_rustls, hyper_rustls::HttpsConnector, Storage,
};
Expand All @@ -11,19 +12,19 @@ use octorust::{
Client as GitHubClient, ClientError,
};
use reqwest::Error as ReqwestError;
use rfd_model::storage::postgres::PostgresStore;
use rfd_model::{schema_ext::PdfSource, storage::postgres::PostgresStore};
use rsa::{
pkcs1::{DecodeRsaPrivateKey, EncodeRsaPrivateKey},
RsaPrivateKey,
};
use thiserror::Error;

use crate::{
features::Feature,
github::{GitHubError, GitHubRfdRepo},
pdf::{PdfFileLocation, PdfStorage, RfdPdf, RfdPdfError},
search::RfdSearchIndex,
updater::{BoxedAction, RfdUpdaterError},
updater::{BoxedAction, RfdUpdateMode, RfdUpdaterError},
util::{gdrive_client, GDriveError},
AppConfig, GitHubAuthConfig, PdfStorageConfig, SearchConfig, StaticStorageConfig,
};

Expand Down Expand Up @@ -53,6 +54,8 @@ pub enum ContextError {
#[error("Failed to find GCP credentials {0}")]
FailedToFindGcpCredentials(std::io::Error),
#[error(transparent)]
GDrive(#[from] GDriveError),
#[error(transparent)]
GitHub(#[from] GitHubError),
#[error(transparent)]
InvalidAction(#[from] RfdUpdaterError),
Expand Down Expand Up @@ -128,6 +131,7 @@ impl Context {
processor: ProcessorCtx {
batch_size: config.processor_batch_size,
interval: Duration::from_secs(config.processor_interval),
update_mode: config.processor_update_mode,
},
scanner: ScannerCtx {
interval: Duration::from_secs(config.scanner_interval),
Expand All @@ -143,7 +147,7 @@ impl Context {
.map(|action| action.as_str().try_into())
.collect::<Result<Vec<_>, RfdUpdaterError>>()?,
assets: StaticAssetStorageCtx::new(&config.static_storage).await?,
pdf: PdfStorageCtx::new(&config.pdf_storage),
pdf: PdfStorageCtx::new(&config.pdf_storage).await?,
search: SearchCtx::new(&config.search_storage),
})
}
Expand All @@ -152,6 +156,7 @@ impl Context {
/// Processor-related settings held by the application `Context`.
///
/// `Context::new` fills every field from the matching `processor_*` entry of `AppConfig`.
pub struct ProcessorCtx {
/// Taken from `config.processor_batch_size`.
/// NOTE(review): presumably the number of jobs handled per processing pass -- confirm.
pub batch_size: i64,
/// Built with `Duration::from_secs(config.processor_interval)`, i.e. the configured
/// value is interpreted as whole seconds.
pub interval: Duration,
/// Taken from `config.processor_update_mode`.
pub update_mode: RfdUpdateMode,
}

pub struct ScannerCtx {
Expand Down Expand Up @@ -228,93 +233,80 @@ pub struct StaticAssetLocation {
pub bucket: String,
}

/// Alias for the `google_drive3` hub type, parameterised with the hyper HTTPS connector.
/// `PdfStorageCtx` stores a value of this type as its Drive client.
pub type GDriveClient = DriveHub<HttpsConnector<HttpConnector>>;

pub struct PdfStorageCtx {
client: Option<GDriveClient>,
client: GDriveClient,
locations: Vec<PdfStorageLocation>,
}

impl PdfStorageCtx {
pub fn new(entries: &[PdfStorageConfig]) -> Self {
Self {
pub async fn new(config: &Option<PdfStorageConfig>) -> Result<Self, GDriveError> {
Ok(Self {
// A client is only needed if files are going to be written
client: Feature::WritePdfToDrive
.enabled()
.then(|| GDriveClient::new("", "", "", "", "")),
locations: entries
.iter()
.map(|e| PdfStorageLocation {
drive_id: e.drive.to_string(),
folder_id: e.folder.to_string(),
client: gdrive_client().await?,
locations: config
.as_ref()
.map(|config| {
vec![PdfStorageLocation {
drive_id: config.drive.clone(),
folder_id: config.folder.clone(),
}]
})
.collect(),
}
.unwrap_or_default(),
})
}
}

#[async_trait]
impl PdfStorage for PdfStorageCtx {
async fn store_rfd_pdf(
&self,
external_id: Option<&str>,
filename: &str,
pdf: &RfdPdf,
) -> Vec<Result<PdfFileLocation, RfdPdfError>> {
let mut results = vec![];
if let Some(location) = self.locations.get(0) {
let req = File {
copy_requires_writer_permission: Some(true),
drive_id: Some(location.drive_id.to_string()),
parents: Some(vec![location.folder_id.to_string()]),
name: Some(filename.to_string()),
mime_type: Some("application/pdf".to_string()),
..Default::default()
};

for location in &self.locations {
// Write the pdf to storage if it is enabled
if let Some(client) = &self.client {
results.push(
client
.files()
.create_or_update(
&location.drive_id,
&location.folder_id,
&filename,
"application/pdf",
&pdf.contents,
)
.await
.map(|file| PdfFileLocation {
url: Some(format!("https://drive.google.com/open?id={}", file.body.id)),
})
.map_err(|err| RfdPdfError::from(err)),
);
} else {
// Otherwise just mark the write as a success. The id argument reported will not be
// a real id
results.push(Ok(PdfFileLocation {
url: Some(format!("https://drive.google.com/open?id={}", filename)),
}));
}
}
let stream = Cursor::new(pdf.contents.clone());

results
}

async fn remove_rfd_pdf(&self, filename: &str) -> Vec<RfdPdfError> {
let mut results = vec![];

for location in &self.locations {
if let Some(client) = &self.client {
// Delete the old filename from drive. It is expected that the target drive and
// folder already exist
if let Err(err) = client
let response = match external_id {
Some(file_id) => self
.client
.files()
.delete_by_name(&location.drive_id, &location.folder_id, &filename)
.update(req, file_id)
.upload_resumable(stream, "application/pdf".parse().unwrap())
.await
{
tracing::warn!(
?err,
?location,
?filename,
"Faileid to remove PDF from drive"
);
results.push(err.into());
}
}
}
.map_err(RfdPdfError::Remote),
None => self
.client
.files()
.create(req)
.upload_resumable(stream, "application/pdf".parse().unwrap())
.await
.map_err(RfdPdfError::Remote),
};

results
vec![response.and_then(|(_, file)| {
file.id
.ok_or_else(|| RfdPdfError::FileIdMissing(filename.to_string()))
.map(|id| PdfFileLocation {
source: PdfSource::Google,
url: format!("https://drive.google.com/open?id={}", id),
external_id: id,
})
})]
} else {
vec![]
}
}
}

Expand Down
Loading

0 comments on commit a5a8033

Please sign in to comment.