Skip to content

Commit

Permalink
Use preloaded header serde dict across (de)compress calls
Browse files Browse the repository at this point in the history
Signed-off-by: NxPKG <[email protected]>
  • Loading branch information
NxPKG authored Mar 6, 2025
1 parent 15b7761 commit 6658165
Showing 1 changed file with 44 additions and 13 deletions.
57 changes: 44 additions & 13 deletions bongonet-header-serde/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
pub mod dict;
mod thread_zstd;

use bongonet_error::{Error, ErrorType, Result};
use bongonet_http::ResponseHeader;
use bytes::BufMut;
use http::Version;
use bongonet_error::{Error, ErrorType, Result};
use bongonet_http::ResponseHeader;
use std::cell::RefCell;
use std::ops::DerefMut;
use thread_local::ThreadLocal;
Expand All @@ -37,8 +37,7 @@ use thread_local::ThreadLocal;
/// This struct provides the APIs to convert HTTP response header into compressed wired format for
/// storage.
pub struct HeaderSerde {
compression: thread_zstd::Compression,
level: i32,
compression: ZstdCompression,
// internal buffer for uncompressed data to be compressed and vice versa
buf: ThreadLocal<RefCell<Vec<u8>>>,
}
Expand All @@ -54,14 +53,18 @@ impl HeaderSerde {
pub fn new(dict: Option<Vec<u8>>) -> Self {
if let Some(dict) = dict {
HeaderSerde {
compression: thread_zstd::Compression::with_dict(dict),
level: COMPRESS_LEVEL,
compression: ZstdCompression::WithDict(thread_zstd::CompressionWithDict::new(
&dict,
COMPRESS_LEVEL,
)),
buf: ThreadLocal::new(),
}
} else {
HeaderSerde {
compression: thread_zstd::Compression::new(),
level: COMPRESS_LEVEL,
compression: ZstdCompression::Default(
thread_zstd::Compression::new(),
COMPRESS_LEVEL,
),
buf: ThreadLocal::new(),
}
}
Expand All @@ -77,9 +80,7 @@ impl HeaderSerde {
.borrow_mut();
buf.clear(); // reset the buf
resp_header_to_buf(header, &mut buf);
self.compression
.compress(&buf, self.level)
.map_err(|e| into_error(e, "compress header"))
self.compression.compress(&buf)
}

/// Deserialize the given response header
Expand All @@ -90,17 +91,47 @@ impl HeaderSerde {
.borrow_mut();
buf.clear(); // reset the buf
self.compression
.decompress_to_buffer(data, buf.deref_mut())
.map_err(|e| into_error(e, "decompress header"))?;
.decompress_to_buffer(data, buf.deref_mut())?;
buf_to_http_header(&buf)
}
}

// Wrapper type to unify compressing with and withuot a dictionary,
// since the two structs have different inputs for their APIs.
enum ZstdCompression {
Default(thread_zstd::Compression, i32),
WithDict(thread_zstd::CompressionWithDict),
}

#[inline]
fn into_error(e: &'static str, context: &'static str) -> Box<Error> {
Error::because(ErrorType::InternalError, context, e)
}

impl ZstdCompression {
fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
match &self {
ZstdCompression::Default(c, level) => c
.compress(data, *level)
.map_err(|e| into_error(e, "decompress header")),
ZstdCompression::WithDict(c) => c
.compress(data)
.map_err(|e| into_error(e, "decompress header")),
}
}

fn decompress_to_buffer(&self, source: &[u8], destination: &mut Vec<u8>) -> Result<usize> {
match &self {
ZstdCompression::Default(c, _) => c
.decompress_to_buffer(source, destination)
.map_err(|e| into_error(e, "decompress header")),
ZstdCompression::WithDict(c) => c
.decompress_to_buffer(source, destination)
.map_err(|e| into_error(e, "decompress header")),
}
}
}

const CRLF: &[u8; 2] = b"\r\n";

// Borrowed from bongonet http1
Expand Down

0 comments on commit 6658165

Please sign in to comment.