diff --git a/tremor-interceptor/Cargo.toml b/tremor-interceptor/Cargo.toml index fadb3bee32..81068bd947 100644 --- a/tremor-interceptor/Cargo.toml +++ b/tremor-interceptor/Cargo.toml @@ -25,8 +25,7 @@ anyhow = { version = "1.0", default-features = true } thiserror = { version = "1.0", default-features = false } #gelf -rand = { version = "0.8", optional = true, default-features = false, features = ["std_rng"] } -sha2 = { version="0.10.8", optional = true, default-feature = false, features = ["std"]} +rand = { version = "0.8", optional = true, default-features = false, features = [] } # Compression brotli = { version = "6", optional = true, default-features = false, features = [ @@ -62,7 +61,7 @@ proptest = "1.5" [features] default = ["base64", "compression", "gelf", "length-prefix"] length-prefix = ["dep:bytes"] -gelf = ["dep:rand", "dep:sha2"] +gelf = ["dep:rand"] base64 = [] compression = [ "dep:brotli", diff --git a/tremor-interceptor/src/postprocessor/gelf_chunking.rs b/tremor-interceptor/src/postprocessor/gelf_chunking.rs index bdb2140e9a..129bf1ab88 100644 --- a/tremor-interceptor/src/postprocessor/gelf_chunking.rs +++ b/tremor-interceptor/src/postprocessor/gelf_chunking.rs @@ -16,18 +16,19 @@ use super::Postprocessor; use std::time::{SystemTime, UNIX_EPOCH}; -use sha2::{Sha256, Digest}; use std::thread; use std::hash::{Hash, Hasher, DefaultHasher}; #[derive(Clone)] pub struct Gelf { + auto_increment_id: u64, chunk_size: usize, } impl Default for Gelf { fn default() -> Self { Self { + auto_increment_id: 0, chunk_size: 8192, } } @@ -42,7 +43,7 @@ enum Error { ChunkCount(usize), } -fn generate_message_id(epoch_timestamp: u64, data: &[u8]) -> u64{ +fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64{ /* * REFERENCE(/Explaination) TAKEN FROM: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61 @@ -73,13 +74,8 @@ fn generate_message_id(epoch_timestamp: u64, data: &[u8]) -> u64{ * Then we can spend the rest on a random number. */ - const BITS_13: u64 = 0b1_1111_1111_1111; - - let mut sha_hasher = Sha256::new(); - sha_hasher.update(data); - let data_sha_hash = sha_hasher.finalize(); - let data_sha_hash_u64 = u64::from_be_bytes(data_sha_hash[0..8].try_into().expect("slice with incorrect length")); + const BITS_13: u64 = 0b1_1111_1111_1111; let current_thread = thread::current(); let thread_id = current_thread.id(); @@ -88,7 +84,7 @@ fn generate_message_id(epoch_timestamp: u64, data: &[u8]) -> u64{ thread_id.hash(&mut hasher); let thread_id_u64 = hasher.finish(); - return (epoch_timestamp & BITS_13) | (data_sha_hash_u64 & !BITS_13) | (thread_id_u64 & BITS_13) + (epoch_timestamp & BITS_13) | (auto_increment_id & !BITS_13) | (thread_id_u64 & BITS_13) } impl Gelf { @@ -103,7 +99,9 @@ impl Gelf { return Err(Error::ChunkCount(n)); }; - let id = generate_message_id(epoch_timestamp,data); + let gelf_message_id = generate_message_id(epoch_timestamp,self.auto_increment_id); + + self.auto_increment_id+=1; Ok(chunks .enumerate() @@ -114,7 +112,7 @@ impl Gelf { // magic number buf.append(&mut vec![0x1e, 0x0f]); // gelf package id - buf.append(&mut id.to_be_bytes().to_vec()); + buf.append(&mut gelf_message_id.to_be_bytes().to_vec()); // sequence number buf.push(i as u8); // sequence count @@ -182,6 +180,7 @@ mod test { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, ]; let mut encoder = super::Gelf { + auto_increment_id: 0, chunk_size: 20, }; @@ -208,11 +207,12 @@ mod test { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, ]; let mut encoder = super::Gelf { + auto_increment_id: 0, chunk_size: 20 }; let encoded_gelf = encoder.encode_gelf(&input_data, 0)?; - let expected_message_id = generate_message_id(0, &input_data); + let expected_message_id = generate_message_id(0, 0); assert_eq!(u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id); // print!("\nLength of encoding message: {}",encoded_gelf[1].len());