diff --git a/clickhouse/migrations/03_create_active_matches_table.sql b/clickhouse/migrations/03_create_active_matches_table.sql
new file mode 100644
index 0000000..8b455f6
--- /dev/null
+++ b/clickhouse/migrations/03_create_active_matches_table.sql
@@ -0,0 +1,46 @@
+CREATE TABLE IF NOT EXISTS active_matches
+(
+    match_id UInt32,
+    scraped_at DateTime64,
+    start_time DATETIME,
+    winning_team UInt8,
+    players Nested (
+        account_id UInt64,
+        team Enum8 (
+            'Team0' = 0,
+            'Team1' = 1,
+            'Spectator' = 16
+        ),
+        abandoned bool,
+        hero_id UInt32
+    ),
+    lobby_id UInt64,
+    net_worth_team_0 UInt32,
+    net_worth_team_1 UInt32,
+    duration_s UInt32,
+    spectators UInt32,
+    open_spectator_slots UInt32,
+    objectives_mask_team0 UInt16,
+    objectives_mask_team1 UInt16,
+    match_mode Enum8 (
+        'Invalid' = 0,
+        'Unranked' = 1,
+        'PrivateLobby' = 2,
+        'CoopBot' = 3,
+        'Ranked' = 4,
+        'ServerTest' = 5,
+        'Tutorial' = 6
+    ),
+    game_mode Enum8 (
+        'Invalid' = 0, 'Normal' = 1, 'OneVsOneTest' = 2, 'Sandbox' = 3
+    ),
+    match_score UInt32,
+    region_mode Enum8 (
+        'Row' = 0,
+        'Europe' = 1,
+        'SEAsia' = 2,
+        'SAmerica' = 3,
+        'Russia' = 4,
+        'Oceania' = 5
+    )
+) ENGINE = ReplacingMergeTree() ORDER BY (match_id, scraped_at);
diff --git a/db-ingest/Cargo.lock b/db-ingest/Cargo.lock
index b22dfe4..c543931 100644
--- a/db-ingest/Cargo.lock
+++ b/db-ingest/Cargo.lock
@@ -703,6 +703,7 @@ dependencies = [
  "prost",
  "rust-s3",
  "serde",
+ "serde_json",
  "serde_repr",
  "tokio",
  "valveprotos",
diff --git a/db-ingest/Cargo.toml b/db-ingest/Cargo.toml
index b450edd..9b7a2e0 100644
--- a/db-ingest/Cargo.toml
+++ b/db-ingest/Cargo.toml
@@ -17,3 +17,4 @@ prost = "0.13.3"
 serde_repr = "0.1.19"
 serde = { version = "1.0.210", features = ["derive"] }
 clickhouse = { version = "0.13.0", features = ["time"] }
+serde_json = "1.0.128"
diff --git a/db-ingest/src/ingestors/clickhouse_ingestor.rs b/db-ingest/src/ingestors/clickhouse_ingestor.rs
index 1438d58..8a0905e 100644
--- a/db-ingest/src/ingestors/clickhouse_ingestor.rs
+++ b/db-ingest/src/ingestors/clickhouse_ingestor.rs
@@ -1,7 +1,10 @@
 use crate::ingestors::ingestor::Ingestor;
+use crate::models::active_match::ActiveMatch;
+use crate::models::clickhouse_active_match::ClickHouseActiveMatch;
 use crate::models::clickhouse_match_metadata::{ClickhouseMatchInfo, ClickhouseMatchPlayer};
 use crate::models::error::ParseError;
 use clickhouse::{Client, Compression};
+use log::debug;
 use std::sync::LazyLock;
 use valveprotos::deadlock::c_msg_match_meta_data_contents::MatchInfo;
 
@@ -68,3 +71,29 @@ impl Ingestor for ClickhouseIngestor {
         Ok(())
     }
 }
+
+impl Ingestor<Vec<ActiveMatch>> for ClickhouseIngestor {
+    async fn ingest(&self, active_matches: &Vec<ActiveMatch>) -> Result<(), ParseError> {
+        debug!("Ingesting {} active matches", active_matches.len());
+        let mut active_match_insert = self
+            .client
+            .insert("active_matches")
+            .map_err(ParseError::ClickhouseError)?;
+        let ch_active_matches: Vec<ClickHouseActiveMatch> = active_matches
+            .iter()
+            .cloned()
+            .map(ClickHouseActiveMatch::from)
+            .collect();
+        for active_match in ch_active_matches {
+            active_match_insert
+                .write(&active_match)
+                .await
+                .map_err(ParseError::ClickhouseError)?;
+        }
+        active_match_insert
+            .end()
+            .await
+            .map_err(ParseError::ClickhouseError)?;
+        Ok(())
+    }
+}
diff --git a/db-ingest/src/main.rs b/db-ingest/src/main.rs
index dda4ec6..b19bef5 100644
--- a/db-ingest/src/main.rs
+++ b/db-ingest/src/main.rs
@@ -4,6 +4,10 @@ use crate::models::compression::Compression;
 use crate::models::error::ParseError;
 use crate::models::file_data::FileData;
 use crate::models::file_type::FileType;
+use crate::parsers::active_matches_json_lines_parser::ActiveMatchesJsonLinesParser;
+use crate::parsers::metadata_content_parser::MetaDataContentParser;
+use crate::parsers::metadata_parser::MetaDataParser;
+use crate::parsers::parser::Parser;
 use ::s3::request::ResponseData;
 use futures_lite::StreamExt;
 use lapin::message::Delivery;
@@ -98,33 +102,23 @@ async fn try_process_message(message: &Delivery) -> Result<(), ParseError> {
 }
 
 async fn process_file(file_data: &FileData, file_content: &ResponseData) -> Result<(), ParseError> {
-    let file_content = file_data
+    let decompressed = file_data
         .compression
         .decompress(&file_content.bytes().to_vec())
         .await?;
-    let parser = file_data.file_type.get_parser();
-    let result = parser.parse(file_data, &file_content)?;
-    let clickhouse_ingestor = ClickhouseIngestor::new();
-    let compressed = if file_data.compression == result.compression && file_content == result.data {
-        debug!("No changes detected, moving file to parsed");
-        &file_content
-    } else {
-        &result.compression.compress(&result.data).await?
-    };
-
-    match result.file_type {
-        FileType::Metadata => clickhouse_ingestor.ingest(&result.parsed_data).await?,
-        FileType::MetadataContent => clickhouse_ingestor.ingest(&result.parsed_data).await?,
+    match file_data.file_type {
+        FileType::Metadata => {
+            process_metadata(file_data, decompressed.as_slice(), file_content.as_slice()).await
+        }
+        FileType::MetadataContent => {
+            process_metadata_content(file_data, decompressed.as_slice(), file_content.as_slice())
+                .await
+        }
+        FileType::ActiveMatchesJsonLines => {
+            process_active_matches(file_data, decompressed.as_slice(), file_content.as_slice())
+                .await
+        }
     }
-
-    let parsed_path = get_parsed_path(&file_data.file_name, result.file_type, result.compression);
-    s3::upload_to_s3(compressed, &parsed_path).await?;
-    let s3_path = file_data
-        .file_path
-        .to_str()
-        .ok_or(ParseError::FilenameParse)?;
-    s3::delete_from_s3(s3_path).await?;
-    Ok(())
 }
 
 fn get_parsed_path(file_name: &str, file_type: FileType, compression: Compression) -> String {
@@ -132,7 +126,10 @@ fn get_parsed_path(file_name: &str, file_type: FileType, compressio
         Compression::Uncompressed => format!("/parsed/{}/{}.{}", file_type, file_name, file_type),
         _ => format!(
             "/parsed/{}/{}.{}.{}",
-            file_type, file_name, file_type, compression
+            file_type,
+            file_name,
+            file_type.extension(),
+            compression
         ),
     }
 }
@@ -146,3 +143,89 @@ fn get_failed_path(file_name: &str, file_type: FileType, compressio
         ),
     }
 }
+
+async fn process_metadata(
+    file_data: &FileData,
+    decompressed: &[u8],
+    file_content: &[u8],
+) -> Result<(), ParseError> {
+    let result = MetaDataParser::default().parse(file_data, decompressed)?;
+    let compressed = if file_data.compression == result.compression && file_content == result.data {
+        debug!("No changes detected, moving file to parsed");
+        file_content
+    } else {
+        &result.compression.compress(&result.data).await?
+    };
+    ClickhouseIngestor::new()
+        .ingest(&result.parsed_data)
+        .await?;
+
+    s3::upload_to_s3(
+        compressed,
+        &get_parsed_path(&file_data.file_name, result.file_type, result.compression),
+    )
+    .await?;
+    let s3_path = file_data
+        .file_path
+        .to_str()
+        .ok_or(ParseError::FilenameParse)?;
+    s3::delete_from_s3(s3_path).await?;
+    Ok(())
+}
+
+async fn process_metadata_content(
+    file_data: &FileData,
+    decompressed: &[u8],
+    file_content: &[u8],
+) -> Result<(), ParseError> {
+    let result = MetaDataContentParser::default().parse(file_data, decompressed)?;
+    let compressed = if file_data.compression == result.compression && file_content == result.data {
+        debug!("No changes detected, moving file to parsed");
+        file_content
+    } else {
+        &result.compression.compress(&result.data).await?
+    };
+    ClickhouseIngestor::new()
+        .ingest(&result.parsed_data)
+        .await?;
+
+    s3::upload_to_s3(
+        compressed,
+        &get_parsed_path(&file_data.file_name, result.file_type, result.compression),
+    )
+    .await?;
+    let s3_path = file_data
+        .file_path
+        .to_str()
+        .ok_or(ParseError::FilenameParse)?;
+    s3::delete_from_s3(s3_path).await?;
+    Ok(())
+}
+
+async fn process_active_matches(
+    file_data: &FileData,
+    decompressed: &[u8],
+    file_content: &[u8],
+) -> Result<(), ParseError> {
+    let result = ActiveMatchesJsonLinesParser::default().parse(file_data, decompressed)?;
+    let compressed = if file_data.compression == result.compression && file_content == result.data {
+        debug!("No changes detected, moving file to parsed");
+        file_content
+    } else {
+        &result.compression.compress(&result.data).await?
+    };
+    let active_matches = result.parsed_data.into_iter().flatten().collect::<Vec<_>>();
+    ClickhouseIngestor::new().ingest(&active_matches).await?;
+
+    s3::upload_to_s3(
+        compressed,
+        &get_parsed_path(&file_data.file_name, result.file_type, result.compression),
+    )
+    .await?;
+    let s3_path = file_data
+        .file_path
+        .to_str()
+        .ok_or(ParseError::FilenameParse)?;
+    s3::delete_from_s3(s3_path).await?;
+    Ok(())
+}
diff --git a/db-ingest/src/models/active_match.rs b/db-ingest/src/models/active_match.rs
new file mode 100644
index 0000000..d6142d6
--- /dev/null
+++ b/db-ingest/src/models/active_match.rs
@@ -0,0 +1,30 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct ActiveMatch {
+    pub match_id: u32,
+    pub scraped_at: i64,
+    pub winning_team: u8,
+    pub start_time: u32,
+    pub players: Vec<ActiveMatchPlayer>,
+    pub lobby_id: u64,
+    pub duration_s: u32,
+    pub spectators: u32,
+    pub open_spectator_slots: u32,
+    pub objectives_mask_team0: u16,
+    pub objectives_mask_team1: u16,
+    pub net_worth_team_0: u32,
+    pub net_worth_team_1: u32,
+    pub match_mode: u8,
+    pub game_mode: u8,
+    pub match_score: u32,
+    pub region_mode: u8,
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct ActiveMatchPlayer {
+    pub account_id: u64,
+    pub team: u8,
+    pub abandoned: bool,
+    pub hero_id: u32,
+}
diff --git a/db-ingest/src/models/clickhouse_active_match.rs b/db-ingest/src/models/clickhouse_active_match.rs
new file mode 100644
index 0000000..09e0e6f
--- /dev/null
+++ b/db-ingest/src/models/clickhouse_active_match.rs
@@ -0,0 +1,59 @@
+use crate::models::active_match::ActiveMatch;
+use crate::models::enums::{GameMode, MatchMode, RegionMode, Team};
+use clickhouse::Row;
+use serde::Serialize;
+
+#[derive(Row, Serialize, Debug)]
+pub struct ClickHouseActiveMatch {
+    pub match_id: u32,
+    pub scraped_at: i64,
+    pub start_time: u32,
+    pub winning_team: u8,
+    #[serde(rename = "players.account_id")]
+    pub players_account_id: Vec<u64>,
+    #[serde(rename = "players.team")]
+    pub players_team: Vec<Team>,
+    #[serde(rename = "players.abandoned")]
+    pub players_abandoned: Vec<bool>,
+    #[serde(rename = "players.hero_id")]
+    pub players_hero_id: Vec<u32>,
+    pub lobby_id: u64,
+    pub net_worth_team_0: u32,
+    pub net_worth_team_1: u32,
+    pub duration_s: u32,
+    pub spectators: u32,
+    pub open_spectator_slots: u32,
+    pub objectives_mask_team0: u16,
+    pub objectives_mask_team1: u16,
+    pub match_mode: MatchMode,
+    pub game_mode: GameMode,
+    pub match_score: u32,
+    pub region_mode: RegionMode,
+}
+
+impl From<ActiveMatch> for ClickHouseActiveMatch {
+    fn from(am: ActiveMatch) -> Self {
+        Self {
+            start_time: am.start_time,
+            winning_team: am.winning_team,
+            match_id: am.match_id,
+            scraped_at: am.scraped_at,
+            players_account_id: am.players.iter().map(|p| p.account_id).collect(),
+            players_team: am.players.iter().map(|p| p.team).map(Team::from).collect(),
+            players_abandoned: am.players.iter().map(|p| p.abandoned).collect(),
+            players_hero_id: am.players.iter().map(|p| p.hero_id).collect(),
+            lobby_id: am.lobby_id,
+            net_worth_team_0: am.net_worth_team_0,
+            net_worth_team_1: am.net_worth_team_1,
+            duration_s: am.duration_s,
+            spectators: am.spectators,
+            open_spectator_slots: am.open_spectator_slots,
+            objectives_mask_team0: am.objectives_mask_team0,
+            objectives_mask_team1: am.objectives_mask_team1,
+            match_mode: MatchMode::from(am.match_mode),
+            game_mode: GameMode::from(am.game_mode),
+            match_score: am.match_score,
+            region_mode: RegionMode::from(am.region_mode),
+        }
+    }
+}
diff --git a/db-ingest/src/models/enums.rs b/db-ingest/src/models/enums.rs
index 8d41c2b..5ff75e8 100644
--- a/db-ingest/src/models/enums.rs
+++ b/db-ingest/src/models/enums.rs
@@ -24,6 +24,18 @@ impl From for GameMode {
     }
 }
 
+impl From<u8> for GameMode {
+    fn from(value: u8) -> Self {
+        match value {
+            0 => GameMode::Invalid,
+            1 => GameMode::Normal,
+            2 => GameMode::OnevOneTest,
+            3 => GameMode::Sandbox,
+            _ => GameMode::Invalid,
+        }
+    }
+}
+
 #[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug, Clone)]
 #[repr(u8)]
 pub enum MatchMode {
@@ -50,6 +62,21 @@ impl From for MatchMode {
     }
 }
 
+impl From<u8> for MatchMode {
+    fn from(value: u8) -> Self {
+        match value {
+            0 => MatchMode::Invalid,
+            1 => MatchMode::Unranked,
+            2 => MatchMode::PrivateLobby,
+            3 => MatchMode::CoopBot,
+            4 => MatchMode::Ranked,
+            5 => MatchMode::ServerTest,
+            6 => MatchMode::Tutorial,
+            _ => MatchMode::Invalid,
+        }
+    }
+}
+
 #[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug, Clone)]
 #[repr(u8)]
 pub enum MatchOutcome {
@@ -84,6 +111,17 @@ impl From for Team {
     }
 }
 
+impl From<u8> for Team {
+    fn from(value: u8) -> Self {
+        match value {
+            0 => Team::Team0,
+            1 => Team::Team1,
+            16 => Team::Spectator,
+            _ => Team::Spectator,
+        }
+    }
+}
+
 #[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug, Clone)]
 #[repr(u8)]
 pub enum Objective {
diff --git a/db-ingest/src/models/file_type.rs b/db-ingest/src/models/file_type.rs
index f40470d..1f9b7ca 100644
--- a/db-ingest/src/models/file_type.rs
+++ b/db-ingest/src/models/file_type.rs
@@ -1,25 +1,13 @@
 use crate::models::error::ParseError;
-use crate::parsers::metadata_content_parser::MetaDataContentParser;
-use crate::parsers::metadata_parser::MetaDataParser;
-use crate::parsers::parser::Parser;
 use std::fmt::Display;
 use std::str::FromStr;
 use std::write;
-use valveprotos::deadlock::c_msg_match_meta_data_contents::MatchInfo;
 
 #[derive(Debug, Clone, Copy)]
 pub enum FileType {
     Metadata,
     MetadataContent,
-}
-
-impl FileType {
-    pub fn get_parser(&self) -> Box<dyn Parser<MatchInfo>> {
-        match self {
-            FileType::Metadata => Box::new(MetaDataParser::default()),
-            FileType::MetadataContent => Box::new(MetaDataContentParser::default()),
-        }
-    }
+    ActiveMatchesJsonLines,
 }
 
 impl FromStr for FileType {
@@ -29,6 +17,7 @@ match s {
         "meta" => Ok(Self::Metadata),
         "metac" => Ok(Self::MetadataContent),
+        "amjsonl" => Ok(Self::ActiveMatchesJsonLines),
         _ => Err(ParseError::UnknownVariant),
         }
     }
 }
@@ -39,6 +28,17 @@
         match self {
             Self::Metadata => write!(f, "meta"),
             Self::MetadataContent => write!(f, "metac"),
+            Self::ActiveMatchesJsonLines => write!(f, "active-matches"),
+        }
+    }
+}
+
+impl FileType {
+    pub fn extension(&self) -> &'static str {
+        match self {
+            Self::Metadata => "meta",
+            Self::MetadataContent => "metac",
+            Self::ActiveMatchesJsonLines => "amjsonl",
         }
     }
 }
diff --git a/db-ingest/src/models/mod.rs b/db-ingest/src/models/mod.rs
index 5d168b2..cb88023 100644
--- a/db-ingest/src/models/mod.rs
+++ b/db-ingest/src/models/mod.rs
@@ -1,3 +1,5 @@
+pub mod active_match;
+pub mod clickhouse_active_match;
 pub mod clickhouse_match_metadata;
 pub mod compression;
 pub mod enums;
diff --git a/db-ingest/src/parsers/active_matches_json_lines_parser.rs b/db-ingest/src/parsers/active_matches_json_lines_parser.rs
new file mode 100644
index 0000000..1424ada
--- /dev/null
+++ b/db-ingest/src/parsers/active_matches_json_lines_parser.rs
@@ -0,0 +1,41 @@
+use crate::models::active_match::ActiveMatch;
+use crate::models::compression::Compression;
+use crate::models::error::ParseError;
+use crate::models::file_data::FileData;
+use crate::models::file_type::FileType;
+use crate::models::parse_result::ParseResult;
+use crate::parsers::parser::Parser;
+use log::{debug, warn};
+
+#[derive(Default, Debug)]
+pub struct ActiveMatchesJsonLinesParser;
+
+impl Parser<Vec<Vec<ActiveMatch>>> for ActiveMatchesJsonLinesParser {
+    fn parse(
+        &self,
+        _: &FileData,
+        data: &[u8],
+    ) -> Result<ParseResult<Vec<Vec<ActiveMatch>>>, ParseError> {
+        let data_str = String::from_utf8_lossy(data);
+        debug!("Data: {}", data_str);
+        // Each line holds one JSON array of ActiveMatch records.
+        // Decode every line exactly once; malformed lines are logged
+        // at warn level and skipped rather than failing the whole file.
+        let parsed_data = data_str
+            .lines()
+            .filter_map(|line| match serde_json::from_str::<Vec<ActiveMatch>>(line) {
+                Ok(matches) => Some(matches),
+                Err(e) => {
+                    warn!("Error parsing line: {}", e);
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+        Ok(ParseResult {
+            file_type: FileType::ActiveMatchesJsonLines,
+            compression: Compression::Zstd,
+            data: data.to_vec(),
+            parsed_data,
+        })
+    }
+}
diff --git a/db-ingest/src/parsers/metadata_content_parser.rs b/db-ingest/src/parsers/metadata_content_parser.rs
index 5488fc8..b687592 100644
--- a/db-ingest/src/parsers/metadata_content_parser.rs
+++ b/db-ingest/src/parsers/metadata_content_parser.rs
@@ -9,14 +9,10 @@ use valveprotos::deadlock::c_msg_match_meta_data_contents::MatchInfo;
 use valveprotos::deadlock::CMsgMatchMetaDataContents;
 
 #[derive(Default, Debug)]
-pub struct MetaDataContentParser {}
+pub struct MetaDataContentParser;
 
 impl Parser<MatchInfo> for MetaDataContentParser {
-    fn parse(
-        &self,
-        file_data: &FileData,
-        data: &[u8],
-    ) -> Result<ParseResult<MatchInfo>, ParseError> {
+    fn parse(&self, _: &FileData, data: &[u8]) -> Result<ParseResult<MatchInfo>, ParseError> {
         let parsed_data = CMsgMatchMetaDataContents::decode(data)
             .map_err(ParseError::ProtobufDecode)?
             .match_info
diff --git a/db-ingest/src/parsers/metadata_parser.rs b/db-ingest/src/parsers/metadata_parser.rs
index 5d6c203..70af040 100644
--- a/db-ingest/src/parsers/metadata_parser.rs
+++ b/db-ingest/src/parsers/metadata_parser.rs
@@ -10,14 +10,10 @@ use valveprotos::deadlock::c_msg_match_meta_data_contents::MatchInfo;
 use valveprotos::deadlock::{CMsgMatchMetaData, CMsgMatchMetaDataContents};
 
 #[derive(Default, Debug)]
-pub struct MetaDataParser {}
+pub struct MetaDataParser;
 
 impl Parser<MatchInfo> for MetaDataParser {
-    fn parse(
-        &self,
-        file_data: &FileData,
-        data: &[u8],
-    ) -> Result<ParseResult<MatchInfo>, ParseError> {
+    fn parse(&self, _: &FileData, data: &[u8]) -> Result<ParseResult<MatchInfo>, ParseError> {
         // Check if match metadata is parseable
         let match_metadata = CMsgMatchMetaData::decode(data).map_err(ParseError::ProtobufDecode)?;
         let match_details = match_metadata.match_details();
diff --git a/db-ingest/src/parsers/mod.rs b/db-ingest/src/parsers/mod.rs
index a169ccd..b191a31 100644
--- a/db-ingest/src/parsers/mod.rs
+++ b/db-ingest/src/parsers/mod.rs
@@ -1,3 +1,4 @@
+pub(crate) mod active_matches_json_lines_parser;
 pub(crate) mod metadata_content_parser;
 pub(crate) mod metadata_parser;
 pub(crate) mod parser;