From 8356acc45a1b59922a10fed9d0116d97582bbe08 Mon Sep 17 00:00:00 2001 From: Manuel Raimann Date: Fri, 11 Oct 2024 01:14:13 +0200 Subject: [PATCH] user-ingest: Make salts optional --- user-ingest/src/download.rs | 98 +++++++++++++++++++++++++------------ user-ingest/src/main.rs | 10 ++-- user-ingest/src/models.rs | 4 +- 3 files changed, 76 insertions(+), 36 deletions(-) diff --git a/user-ingest/src/download.rs b/user-ingest/src/download.rs index 3c30b3b..7b52bf8 100644 --- a/user-ingest/src/download.rs +++ b/user-ingest/src/download.rs @@ -35,12 +35,13 @@ pub async fn process_salts(salts: Salts) { pub async fn process_data(salts: &Salts, data_type: DataType) -> Result<(), ProcessError> { let local_file = NamedTempFile::new().map_err(ProcessError::Io)?; let local_path = local_file.path().to_path_buf(); - let s3_path = format!( - "/ingest/user-ingest/{}/{}", - data_type, - get_file_name(&salts, data_type) - ); - + let file_name = get_file_name(&salts, data_type); + if file_name.is_none() { + info!("No salt provided for {:?}", data_type); + return Ok(()); + } + let file_name = file_name.unwrap(); + let s3_path = format!("/ingest/user-ingest/{}/{}", data_type, file_name); if s3::has_file(&s3_path).await.is_ok_and(|m| m) { info!("File already exists: {}", s3_path); return Ok(()); @@ -54,7 +55,15 @@ pub async fn process_data(salts: &Salts, data_type: DataType) -> Result<(), Proc local_file.close().map_err(ProcessError::Io) } -fn get_file_name(salts: &&Salts, data_type: DataType) -> String { +fn get_file_name(salts: &&Salts, data_type: DataType) -> Option<String> { + let salt = match data_type { + DataType::Meta => &salts.metadata_salt, + DataType::Demo => &salts.replay_salt, + }; + if salt.is_none() { + return None; + } + let salt = salt.unwrap(); format!( "T{data_type_id}_M{match_id}_C{cluster_id}_S{salt}.{data_type}.bz2", data_type_id = match data_type { @@ -63,12 +72,10 @@ fn get_file_name(salts: &&Salts, data_type: DataType) -> String { }, match_id = salts.match_id, 
cluster_id = salts.cluster_id, - salt = match data_type { - DataType::Meta => &salts.metadata_salt, - DataType::Demo => &salts.replay_salt, - }, + salt = salt, data_type = data_type ) + .into() } async fn download_to_file( @@ -76,16 +83,20 @@ async fn download_to_file( data_type: DataType, local_path: &PathBuf, ) -> Result<(), ProcessError> { + let salt = match data_type { + DataType::Meta => &salts.metadata_salt, + DataType::Demo => &salts.replay_salt, + }; + if salt.is_none() { + return Err(ProcessError::Io(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "No salt provided", + ))); + } + let salt = salt.unwrap(); let url = format!( "https://replay{}.valve.net/{}/{}_{}.{}.bz2", - salts.cluster_id, - APP_ID, - salts.match_id, - match data_type { - DataType::Meta => &salts.metadata_salt, - DataType::Demo => &salts.replay_salt, - }, - data_type + salts.cluster_id, APP_ID, salts.match_id, salt, data_type ); info!("Downloading {} to {:?}", url, local_path); @@ -110,23 +121,48 @@ async fn download_to_file( .map_err(ProcessError::Io) } -pub(crate) async fn check_salts(salts: Salts) -> reqwest::Result<()> { +pub(crate) async fn check_salts(salts: Salts) -> reqwest::Result<Salts> { let client = ClientBuilder::new() .danger_accept_invalid_hostnames(true) .danger_accept_invalid_certs(true) .build()?; - let demo_url = format!( - "https://replay{}.valve.net/{}/{}_{}.dem.bz2", - salts.cluster_id, APP_ID, salts.match_id, salts.replay_salt - ); - client.head(&demo_url).send().await?.error_for_status()?; + let d_valid = match salts.replay_salt { + None => false, + Some(replay_salt) => { + let demo_url = format!( + "https://replay{}.valve.net/{}/{}_{}.dem.bz2", + salts.cluster_id, APP_ID, salts.match_id, replay_salt + ); + client + .head(&demo_url) + .send() + .await + .and_then(|r| r.error_for_status()) + .is_ok() + } + }; - let meta_url = format!( - "https://replay{}.valve.net/{}/{}_{}.meta.bz2", - salts.cluster_id, APP_ID, salts.match_id, salts.metadata_salt - ); - 
client.head(&meta_url).send().await?.error_for_status()?; + let m_valid = match salts.metadata_salt { + None => false, + Some(metadata_salt) => { + let meta_url = format!( + "https://replay{}.valve.net/{}/{}_{}.meta.bz2", + salts.cluster_id, APP_ID, salts.match_id, metadata_salt + ); + client + .head(&meta_url) + .send() + .await + .and_then(|r| r.error_for_status()) + .is_ok() + } + }; - Ok(()) + Ok(Salts { + cluster_id: salts.cluster_id, + match_id: salts.match_id, + metadata_salt: if m_valid { salts.metadata_salt } else { None }, + replay_salt: if d_valid { salts.replay_salt } else { None }, + }) } diff --git a/user-ingest/src/main.rs b/user-ingest/src/main.rs index bc9bcda..df6227f 100644 --- a/user-ingest/src/main.rs +++ b/user-ingest/src/main.rs @@ -93,9 +93,13 @@ pub async fn post_salts( Json(salts): Json<Salts>, ) -> (StatusCode, &'static str) { debug!("Received Salts: {:?}", salts); - if download::check_salts(salts.clone()).await.is_err() { - return (StatusCode::INTERNAL_SERVER_ERROR, "Checking salts failed"); - } + let salts = match download::check_salts(salts.clone()).await { + Ok(salts) => salts, + Err(e) => { + error!("Failed to validate salts: {:?}", e); + return (StatusCode::INTERNAL_SERVER_ERROR, "Checking salts failed"); + } + }; if state.salts_channel.send(salts).await.is_err() { return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to send salts"); } diff --git a/user-ingest/src/models.rs b/user-ingest/src/models.rs index b3def58..718c5be 100644 --- a/user-ingest/src/models.rs +++ b/user-ingest/src/models.rs @@ -14,8 +14,8 @@ pub enum ProcessError { pub struct Salts { pub cluster_id: u32, pub match_id: u64, - pub metadata_salt: u32, - pub replay_salt: u32, + pub metadata_salt: Option<u32>, + pub replay_salt: Option<u32>, } #[derive(Debug, Copy, Clone)]