Skip to content

Commit

Permalink
[backup] Add /prepare_user_keys endpoint
Browse files Browse the repository at this point in the history
Summary:
TBD

Depends on D14275

Test Plan: TBD

Reviewers: kamil, tomek

Subscribers: ashoat

Differential Revision: https://phab.comm.dev/D14276
  • Loading branch information
barthap committed Jan 31, 2025
1 parent da6d2fb commit 6e1cee1
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 74 deletions.
196 changes: 122 additions & 74 deletions services/backup/src/http/handlers/backup.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use actix_web::{
error::ErrorBadRequest,
error::{ErrorBadRequest, ErrorForbidden},
web::{self, Bytes},
HttpResponse, Responder,
};
use comm_lib::{
auth::UserIdentity,
auth::{AuthorizationCredential, UserIdentity},
backup::LatestBackupInfoResponse,
blob::{client::BlobServiceClient, types::BlobInfo},
http::{
Expand Down Expand Up @@ -90,90 +90,25 @@ pub async fn upload_user_keys(
user: UserIdentity,
blob_client: Authenticated<BlobServiceClient>,
db_client: web::Data<DatabaseClient>,
mut multipart: actix_multipart::Multipart,
multipart: actix_multipart::Multipart,
) -> actix_web::Result<HttpResponse> {
let backup_id = get_named_text_field("backup_id", &mut multipart).await?;
let blob_client = blob_client.with_user_identity(user.clone());
tracing::Span::current().record("backup_id", &backup_id);

info!("Backup User Keys upload started");

let (user_keys_blob_info, user_keys_revoke) = forward_field_to_blob(
&mut multipart,
let (item, revokes) = upload_userkeys_and_create_backup_item(
&db_client,
&blob_client,
"user_keys_hash",
"user_keys",
multipart,
&user.user_id,
)
.await?;

let siwe_backup_msg = get_siwe_backup_msg(&mut multipart).await?;

let ordered_backup_item = db_client
.find_last_backup_item(&user.user_id)
.await
.map_err(BackupError::from)?;

let old_backup_item = match ordered_backup_item {
None => None,
Some(item) => db_client
.find_backup_item(&user.user_id, &item.backup_id)
.await
.map_err(BackupError::from)?,
};

let mut revokes = Vec::new();

let (user_data, attachments) = match old_backup_item.clone() {
None => (None, Vec::new()),
// If attachments and user_data exists, we need to create holder.
// Otherwise, cleanup can remove this data.
Some(item) => {
let attachments_hashes: Vec<String> = item
.attachments
.iter()
.map(|attachment| attachment.blob_hash.clone())
.collect();

let (attachments, attachments_revokes) =
process_blob_hashes(attachments_hashes, &blob_client).await?;

revokes.extend(attachments_revokes);

match item.user_data {
None => (None, attachments),
Some(data) => {
let (blob_infos, defers) =
process_blob_hashes(vec![data.blob_hash], &blob_client).await?;

let blob_info = blob_infos
.into_iter()
.next()
.ok_or(BackupError::BadRequest)?;
revokes.extend(defers);

(Some(blob_info), attachments)
}
}
}
};

let item = BackupItem::new(
user.user_id.clone(),
backup_id,
user_keys_blob_info,
user_data,
attachments,
siwe_backup_msg,
);

db_client
.put_backup_item(item)
.await
.map_err(BackupError::from)?;

user_keys_revoke.cancel();
for attachment_revoke in revokes {
attachment_revoke.cancel();
for revoke in revokes {
revoke.cancel();
}

db_client
Expand All @@ -184,6 +119,39 @@ pub async fn upload_user_keys(
Ok(HttpResponse::Ok().finish())
}

#[instrument(skip_all, fields(backup_id))]
pub async fn prepare_user_keys(
requesting_identity: AuthorizationCredential,
blob_client: Authenticated<BlobServiceClient>,
db_client: web::Data<DatabaseClient>,
mut multipart: actix_multipart::Multipart,
) -> actix_web::Result<HttpResponse> {
match requesting_identity {
AuthorizationCredential::ServicesToken(_) => (),
_ => {
return Err(ErrorForbidden(
"This endpoint can only be called by other services",
));
}
};

let user_id = get_named_text_field("user_id", &mut multipart).await?;

let (item, revokes) = upload_userkeys_and_create_backup_item(
&db_client,
&blob_client,
multipart,
&user_id,
)
.await?;

for revoke in revokes {
revoke.cancel();
}

Ok(HttpResponse::Ok().json(item))
}

#[instrument(skip_all, fields(backup_id))]
pub async fn upload_user_data(
user: UserIdentity,
Expand Down Expand Up @@ -510,3 +478,83 @@ pub async fn download_latest_backup_keys(
.streaming(stream),
)
}

async fn upload_userkeys_and_create_backup_item<'revoke, 'blob: 'revoke>(
db_client: &DatabaseClient,
blob_client: &'blob BlobServiceClient,
mut multipart: actix_multipart::Multipart,
user_id: &str,
) -> actix_web::Result<(BackupItem, Vec<Defer<'revoke>>)> {
let backup_id = get_named_text_field("backup_id", &mut multipart).await?;
tracing::Span::current().record("backup_id", &backup_id);
info!("Backup User Keys upload started");

let (user_keys_blob_info, user_keys_revoke) = forward_field_to_blob(
&mut multipart,
blob_client,
"user_keys_hash",
"user_keys",
)
.await?;

let siwe_backup_msg = get_siwe_backup_msg(&mut multipart).await?;
let ordered_backup_item = db_client
.find_last_backup_item(user_id)
.await
.map_err(BackupError::from)?;

let old_backup_item = match ordered_backup_item {
None => None,
Some(item) => db_client
.find_backup_item(user_id, &item.backup_id)
.await
.map_err(BackupError::from)?,
};

let mut revokes = vec![user_keys_revoke];

let (user_data, attachments) = match old_backup_item.clone() {
None => (None, Vec::new()),
// If attachments and user_data exists, we need to create holder.
// Otherwise, cleanup can remove this data.
Some(item) => {
let attachments_hashes: Vec<String> = item
.attachments
.iter()
.map(|attachment| attachment.blob_hash.clone())
.collect();

let (attachments, attachments_revokes) =
process_blob_hashes(attachments_hashes, blob_client).await?;

revokes.extend(attachments_revokes);

match item.user_data {
None => (None, attachments),
Some(data) => {
let (blob_infos, defers) =
process_blob_hashes(vec![data.blob_hash], blob_client).await?;

let blob_info = blob_infos
.into_iter()
.next()
.ok_or(BackupError::BadRequest)?;
revokes.extend(defers);

(Some(blob_info), attachments)
}
}
}
};

let item = BackupItem::new(
user_id.to_string(),
backup_id,
user_keys_blob_info,
user_data,
attachments,
siwe_backup_msg,
);

Ok((item, revokes))
}
8 changes: 8 additions & 0 deletions services/backup/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ pub async fn run_http_server(
.route(web::delete().to(handlers::user_data::delete_user_data)),
),
)
// Called by Identity Service during restore protocol to upload and store
// UserKeys, without saving them in database, in contrast
// to the `POST /backups/user_keys` endpoint.
.service(
web::resource("/utils/prepare_user_keys")
.wrap(get_comm_authentication_middleware())
.route(web::post().to(handlers::backup::prepare_user_keys)),
)
})
.bind(("0.0.0.0", CONFIG.http_port))?
.run()
Expand Down

0 comments on commit 6e1cee1

Please sign in to comment.