Skip to content

Commit

Permalink
Fix interceptor finalization
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Apr 24, 2024
1 parent 7bf03e8 commit b86d20a
Show file tree
Hide file tree
Showing 15 changed files with 698 additions and 578 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

### Fixes
* fix reconnect issue in HTTP client
* fix issue where post and preprocessors that hold state might not finalize correctly

## [0.13.0-rc.17]

Expand Down
46 changes: 38 additions & 8 deletions tremor-interceptor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,49 @@ log = "0.4"
serde = { version = "1", features = ["derive"] }
byteorder = "1"
value-trait = "0.8"
rand = "0.8"
bytes = "1.5.0"
memchr = "2.6"
anyhow = { version = "1.0", default-features = true }
thiserror = { version = "1.0", default-features = false }

#gelf
rand = { version = "0.8", optional = true, default-features = false, features = [
] }

# Compression
brotli = { version = "5", default-features = false, features = ["std"] }
xz2 = "0.1"
lz4 = "1"
snap = "1"
zstd = "0.13"
libflate = "2"
brotli = { version = "5", optional = true, default-features = false, features = [
"std",
] }
xz2 = { version = "0.1", optional = true, default-features = false, features = [
] }
lz4 = { version = "1", optional = true, default-features = false, features = [
] }
snap = { version = "1", optional = true, default-features = false, features = [
] }
zstd = { version = "0.13", optional = true, default-features = false, features = [
] }
libflate = { version = "2", optional = true, default-features = false, features = [
"std",
] }

# Length Prefix
bytes = { version = "1.5.0", optional = true, default-features = false, features = [
] }


[dev-dependencies]
proptest = "1.4"


[features]
default = ["base64", "compression", "gelf", "length-prefix"]
length-prefix = ["dep:bytes"]
gelf = ["dep:rand"]
base64 = []
compression = [
"dep:brotli",
"dep:xz2",
"dep:lz4",
"dep:snap",
"dep:zstd",
"dep:libflate",
]
212 changes: 123 additions & 89 deletions tremor-interceptor/src/postprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(feature = "base64")]
pub(crate) mod base64;
mod chunk;
pub(crate) mod collect;
#[cfg(feature = "compression")]
pub(crate) mod compress;
#[cfg(feature = "gelf")]
pub(crate) mod gelf_chunking;
pub(crate) mod ingest_ns;
#[cfg(feature = "length-prefix")]
pub(crate) mod length_prefixed;
pub(crate) mod separate;

#[cfg(feature = "length-prefix")]
pub(crate) mod textual_length_prefixed;

mod chunk;
pub(crate) mod collect;
pub(crate) mod ingest_ns;
pub(crate) mod separate;

use log::error;
use tremor_common::time::nanotime;
/// Set of Postprocessors
Expand Down Expand Up @@ -68,11 +75,47 @@ pub trait Postprocessor: Send + Sync {
///
/// # Errors
/// * if the postprocessor could not be finished correctly
fn finish(&mut self, _data: Option<&[u8]>) -> anyhow::Result<Vec<Vec<u8>>> {
Ok(vec![])
}
fn finish(&mut self, _data: Option<&[u8]>) -> anyhow::Result<Vec<Vec<u8>>>;
}

/// A simplified version of `Postprocessor` for postprocessors that do not require any state.
///
/// Implementors only provide `name` and a `process` that takes `&self` and the raw data.
/// Through the blanket `impl<T: StatelessPostprocessor> Postprocessor for T`, the
/// `ingres_ns` and `egress_ns` parameters are ignored, and `finish` is implemented as a
/// passthrough to `process` (yielding no output when `finish` is called without data).
pub trait StatelessPostprocessor: Send + Sync {
/// Canonical name of the postprocessor
fn name(&self) -> &str;
/// Process a single chunk of data, returning zero or more output chunks.
///
/// # Errors
///
/// * Errors if the data could not be processed
fn process(&self, data: &[u8]) -> anyhow::Result<Vec<Vec<u8>>>;
}

/// Adapter that lets every `StatelessPostprocessor` be used wherever a
/// `Postprocessor` is expected.
///
/// All calls are spelled out with the fully-qualified trait path so it is
/// obvious that they dispatch to the stateless implementation and never
/// recurse into this blanket impl.
impl<T> Postprocessor for T
where
    T: StatelessPostprocessor,
{
    /// Reports the name of the wrapped stateless postprocessor.
    fn name(&self) -> &str {
        <T as StatelessPostprocessor>::name(self)
    }

    /// Processes `data` with the stateless implementation; both timestamps
    /// are ignored.
    fn process(
        &mut self,
        _ingres_ns: u64,
        _egress_ns: u64,
        data: &[u8],
    ) -> anyhow::Result<Vec<Vec<u8>>> {
        <T as StatelessPostprocessor>::process(self, data)
    }

    /// Finishing a stateless postprocessor is a passthrough: pending data
    /// handed in by an upstream stage is processed like any other chunk,
    /// and without data there is nothing to emit.
    fn finish(&mut self, data: Option<&[u8]>) -> anyhow::Result<Vec<Vec<u8>>> {
        match data {
            Some(pending) => <T as StatelessPostprocessor>::process(self, pending),
            None => Ok(vec![]),
        }
    }
}
/// Lookup a postprocessor via its config
///
/// # Errors
Expand All @@ -85,14 +128,19 @@ pub fn lookup_with_config(config: &Config) -> anyhow::Result<Box<dyn Postprocess
"collect" => Ok(Box::new(collect::Postprocessor::from_config(
&config.config,
)?)),
"separate" => Ok(Box::new(separate::Separate::from_config(&config.config)?)),
"ingest-ns" => Ok(Box::<ingest_ns::IngestNs>::default()),
#[cfg(feature = "compression")]
"compress" => Ok(Box::new(compress::Compress::from_config(
config.config.as_ref(),
)?)),
"separate" => Ok(Box::new(separate::Separate::from_config(&config.config)?)),
#[cfg(feature = "base64")]
"base64" => Ok(Box::<base64::Base64>::default()),
"ingest-ns" => Ok(Box::<ingest_ns::IngestNs>::default()),
#[cfg(feature = "length-prefix")]
"length-prefixed" => Ok(Box::<length_prefixed::LengthPrefixed>::default()),
#[cfg(feature = "gelf")]
"gelf-chunking" => Ok(Box::<gelf_chunking::Gelf>::default()),
#[cfg(feature = "length-prefix")]
"textual-length-prefixed" => {
Ok(Box::<textual_length_prefixed::TextualLengthPrefixed>::default())
}
Expand Down Expand Up @@ -145,6 +193,20 @@ pub fn postprocess(
Ok(data)
}

/// Calls `finish` on a single postprocessor and logs any failure.
///
/// `alias` is only used to tag the log line, and `data` is forwarded to
/// `finish` unchanged. The result of `finish` is returned as-is; on error
/// the failure is logged together with the postprocessor's name before the
/// same error is handed back to the caller.
fn finish_and_error(
    alias: &str,
    pp: &mut dyn Postprocessor,
    data: Option<&[u8]>,
) -> anyhow::Result<Vec<Vec<u8>>> {
    pp.finish(data).map_err(|e| {
        error!(
            "[Connector::{alias}] Postprocessor '{}' finish error: {e}",
            pp.name()
        );
        e
    })
}
/// Canonical way to finish postprocessors up
///
/// # Errors
Expand All @@ -155,30 +217,17 @@ pub fn finish(
alias: &str,
) -> anyhow::Result<Vec<Vec<u8>>> {
if let Some((head, tail)) = postprocessors.split_first_mut() {
let mut data = match head.finish(None) {
Ok(d) => d,
Err(e) => {
error!(
"[Connector::{alias}] Postprocessor '{}' finish error: {e}",
head.name()
);
return Err(e);
}
};
let mut data = finish_and_error(alias, head.as_mut(), None)?;
let mut data1 = Vec::new();
for pp in tail {
data1.clear();
if data.is_empty() {
let mut r = finish_and_error(alias, pp.as_mut(), None)?;
data1.append(&mut r);
}
for d in &data {
match pp.finish(Some(d)) {
Ok(mut r) => data1.append(&mut r),
Err(e) => {
error!(
"[Connector::{alias}] Postprocessor '{}' finish error: {e}",
pp.name()
);
return Err(e);
}
}
let mut r = finish_and_error(alias, pp.as_mut(), Some(d))?;
data1.append(&mut r);
}
std::mem::swap(&mut data, &mut data1);
}
Expand All @@ -194,29 +243,57 @@ mod test {
use tremor_config::NameWithConfig;
use tremor_value::literal;

const LOOKUP_TABLE: [&str; 6] = [
"separate",
"base64",
"gelf-chunking",
"ingest-ns",
"length-prefixed",
"textual-length-prefixed",
];
const COMPRESSION: [&str; 6] = ["gzip", "zlib", "xz2", "snappy", "lz4", "zstd"];
#[test]
fn test_lookup_separate() {
assert!(lookup("separate").is_ok());
}
#[test]
fn test_lookup_ingest_ns() {
assert!(lookup("ingest-ns").is_ok());
}
#[test]
fn test_lookup_chunk() -> anyhow::Result<()> {
let config = literal!({"name": "chunk", "config":{"max_bytes": 42}});
let config: NameWithConfig = NameWithConfig::try_from(&config)?;
assert!(lookup_with_config(&config).is_ok());
Ok(())
}
#[test]
fn test_lookup_collect() {
assert!(lookup("collect").is_ok());
}

#[test]
fn test_lookup() {
for t in &LOOKUP_TABLE {
assert!(lookup(t).is_ok());
}
fn test_lookup_errors() {
let t = "snot";
assert!(lookup(t).is_err());

assert!(lookup("bad_lookup").is_err());
}

#[cfg(feature = "gelf")]
#[test]
fn test_lookup_gelf() {
assert!(lookup("gelf-chunking").is_ok());
}

#[cfg(feature = "base64")]
#[test]
fn test_lookup_base64() {
assert!(lookup("base64").is_ok());
}

#[cfg(feature = "length-prefix")]
#[test]
fn test_lookup_length_prefix() {
assert!(lookup("length-prefixed").is_ok());
assert!(lookup("textual-length-prefixed").is_ok());
}

#[cfg(feature = "compression")]
#[test]
fn test_lookup_compression() -> anyhow::Result<()> {
const COMPRESSION: [&str; 7] = ["gzip", "zlib", "xz2", "snappy", "lz4", "zstd", "br"];
for c in COMPRESSION {
let config = literal!({"name": "compress", "config":{"algorithm": c}});
let config = NameWithConfig::try_from(&config)?;
Expand All @@ -228,62 +305,19 @@ mod test {
Ok(())
}

#[test]
fn base64() -> anyhow::Result<()> {
let mut post = base64::Base64 {};
let data: [u8; 0] = [];

assert_eq!(post.process(0, 0, &data).ok(), Some(vec![vec![]]));

assert_eq!(post.process(0, 0, b"\n").ok(), Some(vec![b"Cg==".to_vec()]));

assert_eq!(
post.process(0, 0, b"snot").ok(),
Some(vec![b"c25vdA==".to_vec()])
);

assert!(post.finish(None)?.is_empty());
Ok(())
}

#[test]
fn textual_length_prefix_postp() -> anyhow::Result<()> {
let mut post = textual_length_prefixed::TextualLengthPrefixed {};
let data = vec![1_u8, 2, 3];
let encoded = post.process(42, 23, &data)?.pop().unwrap_or_default();
assert_eq!("3 \u{1}\u{2}\u{3}", str::from_utf8(&encoded)?);
assert!(post.finish(None)?.is_empty());
Ok(())
}

#[derive(Default)]
struct Reverse {}

impl Postprocessor for Reverse {
impl StatelessPostprocessor for Reverse {
fn name(&self) -> &str {
"reverse"
}

fn process(
&mut self,
_ingres_ns: u64,
_egress_ns: u64,
data: &[u8],
) -> anyhow::Result<Vec<Vec<u8>>> {
fn process(&self, data: &[u8]) -> anyhow::Result<Vec<Vec<u8>>> {
let mut data = data.to_vec();
data.reverse();
Ok(vec![data])
}

fn finish(&mut self, data: Option<&[u8]>) -> anyhow::Result<Vec<Vec<u8>>> {
if let Some(data) = data {
let mut data = data.to_vec();
data.reverse();
Ok(vec![data])
} else {
Ok(vec![])
}
}
}

#[derive(Default)]
Expand Down Expand Up @@ -369,11 +403,11 @@ mod test {

a.name();
b.name();
c.name();
Postprocessor::name(c.as_ref());

assert!(a.process(0, 0, &[]).is_err());
assert!(b.process(0, 0, &[]).is_ok());
assert!(c.process(0, 0, &[]).is_ok());
assert!(Postprocessor::process(c.as_mut(), 0, 0, &[]).is_ok());

let data: Vec<u8> = b"donotcare".to_vec();
assert!(a.finish(Some(&data)).is_ok());
Expand Down
Loading

0 comments on commit b86d20a

Please sign in to comment.