diff --git a/db-ingest/Cargo.lock b/db-ingest/Cargo.lock index 88ca3cd..b22dfe4 100644 --- a/db-ingest/Cargo.lock +++ b/db-ingest/Cargo.lock @@ -202,6 +202,8 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", + "zstd", + "zstd-safe", ] [[package]] @@ -510,6 +512,8 @@ version = "1.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1" dependencies = [ + "jobserver", + "libc", "shlex", ] @@ -1370,6 +1374,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "lapin" version = "2.5.0" @@ -2978,3 +2991,31 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + +[[package]] +name = "zstd" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.13+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/db-ingest/Cargo.toml b/db-ingest/Cargo.toml index 6d0e5f3..b450edd 100644 --- a/db-ingest/Cargo.toml +++ b/db-ingest/Cargo.toml @@ -10,7 +10,7 @@ rust-s3 = "0.35.1" tokio = { version = "1.40.0", features = ["rt-multi-thread", "macros"] } log = "0.4.22" futures-lite = "2.3.0" -async-compression = { version = "0.4.13", features = ["bzip2", "tokio"] } +async-compression = { version = "0.4.13", features = ["bzip2", "tokio", "zstd"] } bytes = "1.7.2" valveprotos = { git = "https://github.com/OpenSource-Deadlock-Tools/valveprotos-rs", rev = "71ab6d7de2cd43f567397f65821efd80f5aa0b71", features = ["deadlock"] } prost = "0.13.3" diff --git a/db-ingest/src/models/compression.rs b/db-ingest/src/models/compression.rs index 4f7ed6c..57e5eef 100644 --- a/db-ingest/src/models/compression.rs +++ b/db-ingest/src/models/compression.rs @@ -1,6 +1,6 @@ use crate::models::error::ParseError; -use async_compression::tokio::bufread::BzDecoder; -use async_compression::tokio::write::BzEncoder; +use async_compression::tokio::bufread::{BzDecoder, ZstdDecoder}; +use async_compression::tokio::write::{BzEncoder, ZstdEncoder}; use std::fmt::Display; use std::str::FromStr; use std::{vec, write}; @@ -11,6 +11,7 @@ pub enum Compression { #[default] Uncompressed, Bzip2, + Zstd, } impl Compression { @@ -25,6 +26,14 @@ impl Compression { .map_err(ParseError::Decompress)?; Ok(decompressed) } + Self::Zstd => { + let mut decompressed = vec![]; + ZstdDecoder::new(data.as_ref()) + .read_to_end(&mut decompressed) + .await + .map_err(ParseError::Decompress)?; + Ok(decompressed) + } } } pub async fn compress(&self, data: &[u8]) -> Result, ParseError> { @@ -38,6 +47,14 @@ impl Compression { .map_err(ParseError::Decompress)?; Ok(encoder.into_inner()) } + Self::Zstd => { + let mut encoder = ZstdEncoder::new(Vec::new()); + encoder + .write_all(data) + .await + .map_err(ParseError::Decompress)?; + Ok(encoder.into_inner()) + } } } } @@ -47,6 +64,7 @@ impl Display for Compression { match self { Self::Uncompressed => write!(f, ""), Self::Bzip2 => write!(f, "bz2"), + Self::Zstd => write!(f, "zstd"), } } } diff --git a/db-ingest/src/parsers/metadata_content_parser.rs b/db-ingest/src/parsers/metadata_content_parser.rs index 89511b6..5488fc8 100644 --- a/db-ingest/src/parsers/metadata_content_parser.rs +++ b/db-ingest/src/parsers/metadata_content_parser.rs @@ -1,3 +1,4 @@ +use crate::models::compression::Compression; use crate::models::error::ParseError; use crate::models::file_data::FileData; use crate::models::file_type::FileType; @@ -23,7 +24,7 @@ impl Parser for MetaDataContentParser { Ok(ParseResult { file_type: FileType::MetadataContent, data: data.to_vec(), - compression: file_data.compression, + compression: Compression::Zstd, parsed_data, }) } diff --git a/db-ingest/src/parsers/metadata_parser.rs b/db-ingest/src/parsers/metadata_parser.rs index 114cf94..5d6c203 100644 --- a/db-ingest/src/parsers/metadata_parser.rs +++ b/db-ingest/src/parsers/metadata_parser.rs @@ -1,6 +1,7 @@ use crate::parsers::parser::Parser; use prost::Message; +use crate::models::compression::Compression; use crate::models::error::ParseError; use crate::models::file_data::FileData; use crate::models::file_type::FileType; @@ -28,7 +29,7 @@ impl Parser for MetaDataParser { Ok(ParseResult { file_type: FileType::MetadataContent, data: match_details.to_vec(), - compression: file_data.compression, + compression: Compression::Zstd, parsed_data, }) }