db-ingest: Add Active Matches Ingest
raimannma committed Oct 10, 2024
1 parent c87fd6a commit 612a63d
Showing 14 changed files with 372 additions and 49 deletions.
46 changes: 46 additions & 0 deletions clickhouse/migrations/03_create_active_matches_table.sql
@@ -0,0 +1,46 @@
CREATE TABLE IF NOT EXISTS active_matches
(
    match_id UInt32,
    scraped_at DateTime64,
    start_time DateTime,
    winning_team UInt8,
    players Nested (
        account_id UInt64,
        team Enum8 (
            'Team0' = 0,
            'Team1' = 1,
            'Spectator' = 16
        ),
        abandoned Bool,
        hero_id UInt32
    ),
    lobby_id UInt64,
    net_worth_team_0 UInt32,
    net_worth_team_1 UInt32,
    duration_s UInt32,
    spectators UInt32,
    open_spectator_slots UInt32,
    objectives_mask_team0 UInt16,
    objectives_mask_team1 UInt16,
    match_mode Enum8 (
        'Invalid' = 0,
        'Unranked' = 1,
        'PrivateLobby' = 2,
        'CoopBot' = 3,
        'Ranked' = 4,
        'ServerTest' = 5,
        'Tutorial' = 6
    ),
    game_mode Enum8 (
        'Invalid' = 0, 'Normal' = 1, 'OneVsOneTest' = 2, 'Sandbox' = 3
    ),
    match_score UInt32,
    region_mode Enum8 (
        'Row' = 0,
        'Europe' = 1,
        'SEAsia' = 2,
        'SAmerica' = 3,
        'Russia' = 4,
        'Oceania' = 5
    )
) ENGINE = ReplacingMergeTree() ORDER BY (match_id, scraped_at);
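
A usage note, not part of this commit: the Nested `players` column is stored as parallel arrays, and ReplacingMergeTree deduplicates rows that share the `(match_id, scraped_at)` key at merge time. Below is a minimal read-side sketch with the `clickhouse` crate; the row struct and query are illustrative assumptions, not committed code.

use clickhouse::Client;
use serde::Deserialize;

// Illustrative row type for the flattened query below (not from this commit).
#[derive(clickhouse::Row, Deserialize, Debug)]
struct ActivePlayerRow {
    match_id: u32,
    account_id: u64,
    hero_id: u32,
}

// Fetch one match's players: ARRAY JOIN flattens the parallel `players.*`
// arrays into one row per player; FINAL collapses duplicate
// (match_id, scraped_at) rows left behind by ReplacingMergeTree.
async fn players_of_match(
    client: &Client,
    match_id: u32,
) -> clickhouse::error::Result<Vec<ActivePlayerRow>> {
    client
        .query(
            "SELECT match_id, players.account_id AS account_id, \
                    players.hero_id AS hero_id \
             FROM active_matches FINAL \
             ARRAY JOIN players \
             WHERE match_id = ?",
        )
        .bind(match_id)
        .fetch_all::<ActivePlayerRow>()
        .await
}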
1 change: 1 addition & 0 deletions db-ingest/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions db-ingest/Cargo.toml
@@ -17,3 +17,4 @@ prost = "0.13.3"
serde_repr = "0.1.19"
serde = { version = "1.0.210", features = ["derive"] }
clickhouse = { version = "0.13.0", features = ["time"] }
serde_json = "1.0.128"
29 changes: 29 additions & 0 deletions db-ingest/src/ingestors/clickhouse_ingestor.rs
@@ -1,7 +1,10 @@
use crate::ingestors::ingestor::Ingestor;
use crate::models::active_match::ActiveMatch;
use crate::models::clickhouse_active_match::ClickHouseActiveMatch;
use crate::models::clickhouse_match_metadata::{ClickhouseMatchInfo, ClickhouseMatchPlayer};
use crate::models::error::ParseError;
use clickhouse::{Client, Compression};
use log::debug;
use std::sync::LazyLock;
use valveprotos::deadlock::c_msg_match_meta_data_contents::MatchInfo;

@@ -68,3 +71,29 @@ impl Ingestor<MatchInfo> for ClickhouseIngestor {
        Ok(())
    }
}

impl Ingestor<Vec<ActiveMatch>> for ClickhouseIngestor {
    async fn ingest(&self, active_matches: &Vec<ActiveMatch>) -> Result<(), ParseError> {
        debug!("Ingesting {} active matches", active_matches.len());
        let mut active_match_insert = self
            .client
            .insert("active_matches")
            .map_err(ParseError::ClickhouseError)?;
        let ch_active_matches: Vec<ClickHouseActiveMatch> = active_matches
            .iter()
            .cloned()
            .map(ClickHouseActiveMatch::from)
            .collect();
        for active_match in ch_active_matches {
            active_match_insert
                .write(&active_match)
                .await
                .map_err(ParseError::ClickhouseError)?;
        }
        active_match_insert
            .end()
            .await
            .map_err(ParseError::ClickhouseError)?;
        Ok(())
    }
}
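
The `Ingestor` trait itself lives in `ingestors/ingestor.rs` and is not rendered in this diff. Judging from the two impls above, a plausible shape would be the following (an assumption, not the committed definition):

use crate::models::error::ParseError;

// Assumed trait shape, inferred from the impl blocks above.
pub trait Ingestor<T> {
    async fn ingest(&self, data: &T) -> Result<(), ParseError>;
}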
131 changes: 107 additions & 24 deletions db-ingest/src/main.rs
@@ -4,6 +4,10 @@ use crate::models::compression::Compression;
use crate::models::error::ParseError;
use crate::models::file_data::FileData;
use crate::models::file_type::FileType;
use crate::parsers::active_matches_json_lines_parser::ActiveMatchesJsonLinesParser;
use crate::parsers::metadata_content_parser::MetaDataContentParser;
use crate::parsers::metadata_parser::MetaDataParser;
use crate::parsers::parser::Parser;
use ::s3::request::ResponseData;
use futures_lite::StreamExt;
use lapin::message::Delivery;
@@ -98,41 +102,34 @@ async fn try_process_message(message: &Delivery) -> Result<(), ParseError> {
 }
 
 async fn process_file(file_data: &FileData, file_content: &ResponseData) -> Result<(), ParseError> {
-    let file_content = file_data
+    let decompressed = file_data
         .compression
         .decompress(&file_content.bytes().to_vec())
         .await?;
-    let parser = file_data.file_type.get_parser();
-    let result = parser.parse(file_data, &file_content)?;
-    let clickhouse_ingestor = ClickhouseIngestor::new();
-    let compressed = if file_data.compression == result.compression && file_content == result.data {
-        debug!("No changes detected, moving file to parsed");
-        &file_content
-    } else {
-        &result.compression.compress(&result.data).await?
-    };
 
-    match result.file_type {
-        FileType::Metadata => clickhouse_ingestor.ingest(&result.parsed_data).await?,
-        FileType::MetadataContent => clickhouse_ingestor.ingest(&result.parsed_data).await?,
+    match file_data.file_type {
+        FileType::Metadata => {
+            process_metadata(file_data, decompressed.as_slice(), file_content.as_slice()).await
+        }
+        FileType::MetadataContent => {
+            process_metadata_content(file_data, decompressed.as_slice(), file_content.as_slice())
+                .await
+        }
+        FileType::ActiveMatchesJsonLines => {
+            process_active_matches(file_data, decompressed.as_slice(), file_content.as_slice())
+                .await
+        }
     }
-
-    let parsed_path = get_parsed_path(&file_data.file_name, result.file_type, result.compression);
-    s3::upload_to_s3(compressed, &parsed_path).await?;
-    let s3_path = file_data
-        .file_path
-        .to_str()
-        .ok_or(ParseError::FilenameParse)?;
-    s3::delete_from_s3(s3_path).await?;
-    Ok(())
 }
 
 fn get_parsed_path(file_name: &str, file_type: FileType, compression: Compression) -> String {
     match compression {
         Compression::Uncompressed => format!("/parsed/{}/{}.{}", file_type, file_name, file_type),
         _ => format!(
             "/parsed/{}/{}.{}.{}",
-            file_type, file_name, file_type, compression
+            file_type,
+            file_name,
+            file_type.extension(),
+            compression
         ),
     }
 }
@@ -146,3 +143,89 @@ fn get_failed_path(file_name: &str, file_type: FileType, compression: Compression
         ),
     }
 }
+
+async fn process_metadata(
+    file_data: &FileData,
+    decompressed: &[u8],
+    file_content: &[u8],
+) -> Result<(), ParseError> {
+    let result = MetaDataParser::default().parse(file_data, decompressed)?;
+    let compressed = if file_data.compression == result.compression && file_content == result.data {
+        debug!("No changes detected, moving file to parsed");
+        file_content
+    } else {
+        &result.compression.compress(&result.data).await?
+    };
+    ClickhouseIngestor::new()
+        .ingest(&result.parsed_data)
+        .await?;
+
+    s3::upload_to_s3(
+        compressed,
+        &get_parsed_path(&file_data.file_name, result.file_type, result.compression),
+    )
+    .await?;
+    let s3_path = file_data
+        .file_path
+        .to_str()
+        .ok_or(ParseError::FilenameParse)?;
+    s3::delete_from_s3(s3_path).await?;
+    Ok(())
+}
+
+async fn process_metadata_content(
+    file_data: &FileData,
+    decompressed: &[u8],
+    file_content: &[u8],
+) -> Result<(), ParseError> {
+    let result = MetaDataContentParser::default().parse(file_data, decompressed)?;
+    let compressed = if file_data.compression == result.compression && file_content == result.data {
+        debug!("No changes detected, moving file to parsed");
+        file_content
+    } else {
+        &result.compression.compress(&result.data).await?
+    };
+    ClickhouseIngestor::new()
+        .ingest(&result.parsed_data)
+        .await?;
+
+    s3::upload_to_s3(
+        compressed,
+        &get_parsed_path(&file_data.file_name, result.file_type, result.compression),
+    )
+    .await?;
+    let s3_path = file_data
+        .file_path
+        .to_str()
+        .ok_or(ParseError::FilenameParse)?;
+    s3::delete_from_s3(s3_path).await?;
+    Ok(())
+}
+
+async fn process_active_matches(
+    file_data: &FileData,
+    decompressed: &[u8],
+    file_content: &[u8],
+) -> Result<(), ParseError> {
+    let result = ActiveMatchesJsonLinesParser::default().parse(file_data, decompressed)?;
+    let compressed = if file_data.compression == result.compression && file_content == result.data {
+        debug!("No changes detected, moving file to parsed");
+        file_content
+    } else {
+        &result.compression.compress(&result.data).await?
+    };
+    let active_matches = result.parsed_data.into_iter().flatten().collect::<Vec<_>>();
+    ClickhouseIngestor::new().ingest(&active_matches).await?;
+
+    s3::upload_to_s3(
+        compressed,
+        &get_parsed_path(&file_data.file_name, result.file_type, result.compression),
+    )
+    .await?;
+    let s3_path = file_data
+        .file_path
+        .to_str()
+        .ok_or(ParseError::FilenameParse)?;
+    s3::delete_from_s3(s3_path).await?;
+    Ok(())
+}
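
`file_type.extension()` is introduced in `get_parsed_path` above, but `FileType` is not rendered in this diff. A hypothetical sketch follows; the variant-to-extension mapping is purely an assumption:

// Hypothetical sketch; the real mapping lives in models/file_type.rs.
impl FileType {
    pub fn extension(&self) -> &'static str {
        match self {
            FileType::Metadata => "meta",                // assumed
            FileType::MetadataContent => "meta",         // assumed
            FileType::ActiveMatchesJsonLines => "jsonl", // assumed
        }
    }
}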
30 changes: 30 additions & 0 deletions db-ingest/src/models/active_match.rs
@@ -0,0 +1,30 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ActiveMatch {
    pub match_id: u32,
    pub scraped_at: i64,
    pub winning_team: u8,
    pub start_time: u32,
    pub players: Vec<ActiveMatchPlayer>,
    pub lobby_id: u64,
    pub duration_s: u32,
    pub spectators: u32,
    pub open_spectator_slots: u32,
    pub objectives_mask_team0: u16,
    pub objectives_mask_team1: u16,
    pub net_worth_team_0: u32,
    pub net_worth_team_1: u32,
    pub match_mode: u8,
    pub game_mode: u8,
    pub match_score: u32,
    pub region_mode: u8,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ActiveMatchPlayer {
    pub account_id: u64,
    pub team: u8,
    pub abandoned: bool,
    pub hero_id: u32,
}
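
As a quick sanity check (not part of the commit), a single JSON line should deserialize into `ActiveMatch` via `serde_json`, the dependency this commit adds to Cargo.toml. All field values below are made up:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_active_match_line() {
        // Made-up sample line in the expected shape.
        let line = r#"{"match_id":1,"scraped_at":1728518400,"winning_team":0,
            "start_time":1728514800,"players":[{"account_id":42,"team":0,
            "abandoned":false,"hero_id":7}],"lobby_id":9,"duration_s":600,
            "spectators":0,"open_spectator_slots":4,"objectives_mask_team0":0,
            "objectives_mask_team1":0,"net_worth_team_0":10000,
            "net_worth_team_1":9500,"match_mode":4,"game_mode":1,
            "match_score":1000,"region_mode":1}"#;
        let m: ActiveMatch = serde_json::from_str(line).expect("valid line");
        assert_eq!(m.players.len(), 1);
        assert_eq!(m.match_id, 1);
    }
}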
59 changes: 59 additions & 0 deletions db-ingest/src/models/clickhouse_active_match.rs
@@ -0,0 +1,59 @@
use crate::models::active_match::ActiveMatch;
use crate::models::enums::{GameMode, MatchMode, RegionMode, Team};
use clickhouse::Row;
use serde::Serialize;

#[derive(Row, Serialize, Debug)]
pub struct ClickHouseActiveMatch {
    pub match_id: u32,
    pub scraped_at: i64,
    pub start_time: u32,
    pub winning_team: u8,
    #[serde(rename = "players.account_id")]
    pub players_account_id: Vec<u64>,
    #[serde(rename = "players.team")]
    pub players_team: Vec<Team>,
    #[serde(rename = "players.abandoned")]
    pub players_abandoned: Vec<bool>,
    #[serde(rename = "players.hero_id")]
    pub players_hero_id: Vec<u32>,
    pub lobby_id: u64,
    pub net_worth_team_0: u32,
    pub net_worth_team_1: u32,
    pub duration_s: u32,
    pub spectators: u32,
    pub open_spectator_slots: u32,
    pub objectives_mask_team0: u16,
    pub objectives_mask_team1: u16,
    pub match_mode: MatchMode,
    pub game_mode: GameMode,
    pub match_score: u32,
    pub region_mode: RegionMode,
}

impl From<ActiveMatch> for ClickHouseActiveMatch {
    fn from(am: ActiveMatch) -> Self {
        Self {
            start_time: am.start_time,
            winning_team: am.winning_team,
            match_id: am.match_id,
            scraped_at: am.scraped_at,
            players_account_id: am.players.iter().map(|p| p.account_id).collect(),
            players_team: am.players.iter().map(|p| p.team).map(Team::from).collect(),
            players_abandoned: am.players.iter().map(|p| p.abandoned).collect(),
            players_hero_id: am.players.iter().map(|p| p.hero_id).collect(),
            lobby_id: am.lobby_id,
            net_worth_team_0: am.net_worth_team_0,
            net_worth_team_1: am.net_worth_team_1,
            duration_s: am.duration_s,
            spectators: am.spectators,
            open_spectator_slots: am.open_spectator_slots,
            objectives_mask_team0: am.objectives_mask_team0,
            objectives_mask_team1: am.objectives_mask_team1,
            match_mode: MatchMode::from(am.match_mode),
            game_mode: GameMode::from(am.game_mode),
            match_score: am.match_score,
            region_mode: RegionMode::from(am.region_mode),
        }
    }
}
(Diff truncated; the remaining changed files are not rendered.)
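
Among the unrendered files is the enums module referenced above (`Team`, `MatchMode`, `GameMode`, `RegionMode`). A sketch consistent with the migration's Enum8 values and the `serde_repr` dependency, as an assumption rather than the committed code:

use serde_repr::Serialize_repr;

// Assumed shape for one of the enums; values mirror the migration's Enum8.
#[derive(Serialize_repr, Debug, Clone, Copy)]
#[repr(u8)]
pub enum Team {
    Team0 = 0,
    Team1 = 1,
    Spectator = 16,
}

impl From<u8> for Team {
    fn from(v: u8) -> Self {
        match v {
            1 => Team::Team1,
            16 => Team::Spectator,
            _ => Team::Team0, // assumption: unknown values fall back to Team0
        }
    }
}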
