From 66581658efe7def2643ecb26d85f4d1740e51fc1 Mon Sep 17 00:00:00 2001 From: NxPKG <116948796+NxPKG@users.noreply.github.com> Date: Thu, 6 Mar 2025 07:03:59 +0600 Subject: [PATCH] Use preloaded header serde dict across (de)compress calls Signed-off-by: NxPKG <116948796+NxPKG@users.noreply.github.com> --- bongonet-header-serde/src/lib.rs | 57 ++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/bongonet-header-serde/src/lib.rs b/bongonet-header-serde/src/lib.rs index 9aac5ce..42bc900 100644 --- a/bongonet-header-serde/src/lib.rs +++ b/bongonet-header-serde/src/lib.rs @@ -24,10 +24,10 @@ pub mod dict; mod thread_zstd; -use bongonet_error::{Error, ErrorType, Result}; -use bongonet_http::ResponseHeader; use bytes::BufMut; use http::Version; +use bongonet_error::{Error, ErrorType, Result}; +use bongonet_http::ResponseHeader; use std::cell::RefCell; use std::ops::DerefMut; use thread_local::ThreadLocal; @@ -37,8 +37,7 @@ use thread_local::ThreadLocal; /// This struct provides the APIs to convert HTTP response header into compressed wired format for /// storage. pub struct HeaderSerde { - compression: thread_zstd::Compression, - level: i32, + compression: ZstdCompression, // internal buffer for uncompressed data to be compressed and vice versa buf: ThreadLocal>>, } @@ -54,14 +53,18 @@ impl HeaderSerde { pub fn new(dict: Option>) -> Self { if let Some(dict) = dict { HeaderSerde { - compression: thread_zstd::Compression::with_dict(dict), - level: COMPRESS_LEVEL, + compression: ZstdCompression::WithDict(thread_zstd::CompressionWithDict::new( + &dict, + COMPRESS_LEVEL, + )), buf: ThreadLocal::new(), } } else { HeaderSerde { - compression: thread_zstd::Compression::new(), - level: COMPRESS_LEVEL, + compression: ZstdCompression::Default( + thread_zstd::Compression::new(), + COMPRESS_LEVEL, + ), buf: ThreadLocal::new(), } } @@ -77,9 +80,7 @@ impl HeaderSerde { .borrow_mut(); buf.clear(); // reset the buf resp_header_to_buf(header, &mut buf); - self.compression - .compress(&buf, self.level) - .map_err(|e| into_error(e, "compress header")) + self.compression.compress(&buf) } /// Deserialize the given response header @@ -90,17 +91,47 @@ impl HeaderSerde { .borrow_mut(); buf.clear(); // reset the buf self.compression - .decompress_to_buffer(data, buf.deref_mut()) - .map_err(|e| into_error(e, "decompress header"))?; + .decompress_to_buffer(data, buf.deref_mut())?; buf_to_http_header(&buf) } } +// Wrapper type to unify compressing with and withuot a dictionary, +// since the two structs have different inputs for their APIs. +enum ZstdCompression { + Default(thread_zstd::Compression, i32), + WithDict(thread_zstd::CompressionWithDict), +} + #[inline] fn into_error(e: &'static str, context: &'static str) -> Box { Error::because(ErrorType::InternalError, context, e) } +impl ZstdCompression { + fn compress(&self, data: &[u8]) -> Result> { + match &self { + ZstdCompression::Default(c, level) => c + .compress(data, *level) + .map_err(|e| into_error(e, "decompress header")), + ZstdCompression::WithDict(c) => c + .compress(data) + .map_err(|e| into_error(e, "decompress header")), + } + } + + fn decompress_to_buffer(&self, source: &[u8], destination: &mut Vec) -> Result { + match &self { + ZstdCompression::Default(c, _) => c + .decompress_to_buffer(source, destination) + .map_err(|e| into_error(e, "decompress header")), + ZstdCompression::WithDict(c) => c + .decompress_to_buffer(source, destination) + .map_err(|e| into_error(e, "decompress header")), + } + } +} + const CRLF: &[u8; 2] = b"\r\n"; // Borrowed from bongonet http1