Skip to content

Commit

Permalink
db-ingest: Add zstd compression
Browse files Browse the repository at this point in the history
  • Loading branch information
raimannma committed Oct 9, 2024
1 parent feb1db6 commit fecbfbd
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 5 deletions.
41 changes: 41 additions & 0 deletions db-ingest/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion db-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ rust-s3 = "0.35.1"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "macros"] }
log = "0.4.22"
futures-lite = "2.3.0"
async-compression = { version = "0.4.13", features = ["bzip2", "tokio"] }
async-compression = { version = "0.4.13", features = ["bzip2", "tokio", "zstd"] }
bytes = "1.7.2"
valveprotos = { git = "https://github.com/OpenSource-Deadlock-Tools/valveprotos-rs", rev = "71ab6d7de2cd43f567397f65821efd80f5aa0b71", features = ["deadlock"] }
prost = "0.13.3"
Expand Down
22 changes: 20 additions & 2 deletions db-ingest/src/models/compression.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::models::error::ParseError;
use async_compression::tokio::bufread::BzDecoder;
use async_compression::tokio::write::BzEncoder;
use async_compression::tokio::bufread::{BzDecoder, ZstdDecoder};
use async_compression::tokio::write::{BzEncoder, ZstdEncoder};
use std::fmt::Display;
use std::str::FromStr;
use std::{vec, write};
Expand All @@ -11,6 +11,7 @@ pub enum Compression {
#[default]
Uncompressed,
Bzip2,
Zstd,
}

impl Compression {
Expand All @@ -25,6 +26,14 @@ impl Compression {
.map_err(ParseError::Decompress)?;
Ok(decompressed)
}
Self::Zstd => {
let mut decompressed = vec![];
ZstdDecoder::new(data.as_ref())
.read_to_end(&mut decompressed)
.await
.map_err(ParseError::Decompress)?;
Ok(decompressed)
}
}
}
pub async fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ParseError> {
Expand All @@ -38,6 +47,14 @@ impl Compression {
.map_err(ParseError::Decompress)?;
Ok(encoder.into_inner())
}
Self::Zstd => {
let mut encoder = ZstdEncoder::new(Vec::new());
encoder
.write_all(data)
.await
.map_err(ParseError::Decompress)?;
Ok(encoder.into_inner())
}
}
}
}
Expand All @@ -47,6 +64,7 @@ impl Display for Compression {
match self {
Self::Uncompressed => write!(f, ""),
Self::Bzip2 => write!(f, "bz2"),
Self::Zstd => write!(f, "zstd"),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion db-ingest/src/parsers/metadata_content_parser.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::models::compression::Compression;
use crate::models::error::ParseError;
use crate::models::file_data::FileData;
use crate::models::file_type::FileType;
Expand All @@ -23,7 +24,7 @@ impl Parser<MatchInfo> for MetaDataContentParser {
Ok(ParseResult {
file_type: FileType::MetadataContent,
data: data.to_vec(),
compression: file_data.compression,
compression: Compression::Zstd,
parsed_data,
})
}
Expand Down
3 changes: 2 additions & 1 deletion db-ingest/src/parsers/metadata_parser.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::parsers::parser::Parser;
use prost::Message;

use crate::models::compression::Compression;
use crate::models::error::ParseError;
use crate::models::file_data::FileData;
use crate::models::file_type::FileType;
Expand Down Expand Up @@ -28,7 +29,7 @@ impl Parser<MatchInfo> for MetaDataParser {
Ok(ParseResult {
file_type: FileType::MetadataContent,
data: match_details.to_vec(),
compression: file_data.compression,
compression: Compression::Zstd,
parsed_data,
})
}
Expand Down

0 comments on commit fecbfbd

Please sign in to comment.