Skip to content

Commit

Permalink
user-ingest: Make salts optional
Browse files Browse the repository at this point in the history
  • Loading branch information
raimannma committed Oct 10, 2024
1 parent bd8fd72 commit 8356acc
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 36 deletions.
98 changes: 67 additions & 31 deletions user-ingest/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ pub async fn process_salts(salts: Salts) {
pub async fn process_data(salts: &Salts, data_type: DataType) -> Result<(), ProcessError> {
let local_file = NamedTempFile::new().map_err(ProcessError::Io)?;
let local_path = local_file.path().to_path_buf();
let s3_path = format!(
"/ingest/user-ingest/{}/{}",
data_type,
get_file_name(&salts, data_type)
);

let file_name = get_file_name(&salts, data_type);
if file_name.is_none() {
info!("No salt provided for {:?}", data_type);
return Ok(());
}
let file_name = file_name.unwrap();
let s3_path = format!("/ingest/user-ingest/{}/{}", data_type, file_name);
if s3::has_file(&s3_path).await.is_ok_and(|m| m) {
info!("File already exists: {}", s3_path);
return Ok(());
Expand All @@ -54,7 +55,15 @@ pub async fn process_data(salts: &Salts, data_type: DataType) -> Result<(), Proc
local_file.close().map_err(ProcessError::Io)
}

fn get_file_name(salts: &&Salts, data_type: DataType) -> String {
fn get_file_name(salts: &&Salts, data_type: DataType) -> Option<String> {
let salt = match data_type {
DataType::Meta => &salts.metadata_salt,
DataType::Demo => &salts.replay_salt,
};
if salt.is_none() {
return None;
}
let salt = salt.unwrap();
format!(
"T{data_type_id}_M{match_id}_C{cluster_id}_S{salt}.{data_type}.bz2",
data_type_id = match data_type {
Expand All @@ -63,29 +72,31 @@ fn get_file_name(salts: &&Salts, data_type: DataType) -> String {
},
match_id = salts.match_id,
cluster_id = salts.cluster_id,
salt = match data_type {
DataType::Meta => &salts.metadata_salt,
DataType::Demo => &salts.replay_salt,
},
salt = salt,
data_type = data_type
)
.into()
}

async fn download_to_file(
salts: &Salts,
data_type: DataType,
local_path: &PathBuf,
) -> Result<(), ProcessError> {
let salt = match data_type {
DataType::Meta => &salts.metadata_salt,
DataType::Demo => &salts.replay_salt,
};
if salt.is_none() {
return Err(ProcessError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"No salt provided",
)));
}
let salt = salt.unwrap();
let url = format!(
"https://replay{}.valve.net/{}/{}_{}.{}.bz2",
salts.cluster_id,
APP_ID,
salts.match_id,
match data_type {
DataType::Meta => &salts.metadata_salt,
DataType::Demo => &salts.replay_salt,
},
data_type
salts.cluster_id, APP_ID, salts.match_id, salt, data_type
);
info!("Downloading {} to {:?}", url, local_path);

Expand All @@ -110,23 +121,48 @@ async fn download_to_file(
.map_err(ProcessError::Io)
}

pub(crate) async fn check_salts(salts: Salts) -> reqwest::Result<()> {
pub(crate) async fn check_salts(salts: Salts) -> reqwest::Result<Salts> {
let client = ClientBuilder::new()
.danger_accept_invalid_hostnames(true)
.danger_accept_invalid_certs(true)
.build()?;

let demo_url = format!(
"https://replay{}.valve.net/{}/{}_{}.dem.bz2",
salts.cluster_id, APP_ID, salts.match_id, salts.replay_salt
);
client.head(&demo_url).send().await?.error_for_status()?;
let d_valid = match salts.replay_salt {
None => false,
Some(replay_salt) => {
let demo_url = format!(
"https://replay{}.valve.net/{}/{}_{}.dem.bz2",
salts.cluster_id, APP_ID, salts.match_id, replay_salt
);
client
.head(&demo_url)
.send()
.await
.and_then(|r| r.error_for_status())
.is_ok()
}
};

let meta_url = format!(
"https://replay{}.valve.net/{}/{}_{}.meta.bz2",
salts.cluster_id, APP_ID, salts.match_id, salts.metadata_salt
);
client.head(&meta_url).send().await?.error_for_status()?;
let m_valid = match salts.metadata_salt {
None => false,
Some(metadata_salt) => {
let meta_url = format!(
"https://replay{}.valve.net/{}/{}_{}.meta.bz2",
salts.cluster_id, APP_ID, salts.match_id, metadata_salt
);
client
.head(&meta_url)
.send()
.await
.and_then(|r| r.error_for_status())
.is_ok()
}
};

Ok(())
Ok(Salts {
cluster_id: salts.cluster_id,
match_id: salts.match_id,
metadata_salt: if m_valid { salts.metadata_salt } else { None },
replay_salt: if d_valid { salts.replay_salt } else { None },
})
}
10 changes: 7 additions & 3 deletions user-ingest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,13 @@ pub async fn post_salts(
Json(salts): Json<Salts>,
) -> (StatusCode, &'static str) {
debug!("Received Salts: {:?}", salts);
if download::check_salts(salts.clone()).await.is_err() {
return (StatusCode::INTERNAL_SERVER_ERROR, "Checking salts failed");
}
let salts = match download::check_salts(salts.clone()).await {
Ok(salts) => salts,
Err(e) => {
error!("Failed to validate salts: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Checking salts failed");
}
};
if state.salts_channel.send(salts).await.is_err() {
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to send salts");
}
Expand Down
4 changes: 2 additions & 2 deletions user-ingest/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub enum ProcessError {
pub struct Salts {
pub cluster_id: u32,
pub match_id: u64,
pub metadata_salt: u32,
pub replay_salt: u32,
pub metadata_salt: Option<u32>,
pub replay_salt: Option<u32>,
}

#[derive(Debug, Copy, Clone)]
Expand Down

0 comments on commit 8356acc

Please sign in to comment.