From a0f57f9f84cf6475c1ce681a9ce5e37599aa4c7b Mon Sep 17 00:00:00 2001 From: Bharat Jain Date: Mon, 21 Oct 2024 11:21:03 +0530 Subject: [PATCH] fixed: code-formatting Signed-off-by: Bharat Jain --- .../src/postprocessor/gelf_chunking.rs | 73 +++++++++++-------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/tremor-interceptor/src/postprocessor/gelf_chunking.rs b/tremor-interceptor/src/postprocessor/gelf_chunking.rs index 7000b69863..e8b50f66e2 100644 --- a/tremor-interceptor/src/postprocessor/gelf_chunking.rs +++ b/tremor-interceptor/src/postprocessor/gelf_chunking.rs @@ -15,29 +15,20 @@ //! Splits the data using [GELF chunking protocol](https://docs.graylog.org/en/3.0/pages/gelf.html#chunking). //! //! ## What's the logic for creating messgae id? -//! //! TL;DR: We are using `ingest_ns + increment_id + thread_id` combination as the message id. -//! //! *Long explaination:* -//! //! The GELF documentation suggests to "Generate from millisecond timestamp + hostname, for example.": //! [GELF via UDP](https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP) -//! //! However, relying on current time in milliseconds on the same system will result in a high collision //! probability if lots of messages are generated quickly. Things will be even worse if multiple servers send //! to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it //! is even unlikely to be unique at all. -//! //! The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating //! point, a hyphen, and an eight byte random number: [logstash-output-gelf]( -//! //! It probably doesn't have to be that clever: -//! //! Using the timestamp plus a random number will mean we only have to worry about collision of random numbers, -//! we can make it more deterministic by using the `ingest_ns` + an incremental id as a message ID. -//! +//! we can make it more deterministic by using the `ingest_ns` + an incremental id as a message ID. //! To keep it simple we're using this logic: `(epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16 )` -//! //! [Reference conversation] //! @@ -67,14 +58,13 @@ enum Error { ChunkCount(usize), } -fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64{ - +fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64 { /* * OLD/OTHER CONTEXT: * * REFERENCE(/Explaination) TAKEN FROM: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61 - * - * + * + * * Idea is borrowed from logstash-gelf by Mark Paluch, MIT licensed * GelfMessage.java * @@ -100,9 +90,8 @@ fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64{ * Then we can spend the rest on a random number. */ + /* - /* - Approach taken here is similar to others mentioned above but with a slight modification. To make this: @@ -110,7 +99,7 @@ fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64{ fast deterministic non-coliding - + Using the ingest_ns + an incremental id as a message ID. This is: 1. fast since we just have to look at two integers, no RNG, no hashing @@ -118,14 +107,13 @@ fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64{ 3. it avoids duplicates by not creating duplicates on the same system due to incremtal id's and makes them extremely unlikely on multiple systems due to nano second timestamps involved. - + */ - (epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16 ) + (epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16) } impl Gelf { - // We cut i and n to u8 but check that n <= 128 before so it is safe. #[allow(clippy::cast_possible_truncation)] fn encode_gelf(&mut self, data: &[u8], epoch_timestamp: u64) -> Result>, Error> { @@ -136,15 +124,14 @@ impl Gelf { return Err(Error::ChunkCount(n)); }; - let gelf_message_id = generate_message_id(epoch_timestamp,self.auto_increment_id); + let gelf_message_id = generate_message_id(epoch_timestamp, self.auto_increment_id); if self.auto_increment_id == u64::MAX { self.auto_increment_id = 0; + } else { + self.auto_increment_id += 1; } - else{ - self.auto_increment_id+=1; - } - + Ok(chunks .enumerate() .map(|(i, chunk)| { @@ -182,13 +169,14 @@ impl Postprocessor for Gelf { _egress_ns: u64, data: &[u8], ) -> anyhow::Result>> { - Ok(self.encode_gelf(data,ingest_ns)?) + Ok(self.encode_gelf(data, ingest_ns)?) } #[allow(clippy::cast_possible_truncation)] fn finish(&mut self, data: Option<&[u8]>) -> anyhow::Result>> { if let Some(data) = data { - let current_epoch_timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64; + let current_epoch_timestamp = + SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64; Ok(self.encode_gelf(data, current_epoch_timestamp)?) } else { Ok(vec![]) @@ -251,12 +239,19 @@ mod test { ]; let mut encoder = super::Gelf { auto_increment_id: 2, - chunk_size: 20 + chunk_size: 20, }; let encoded_gelf = encoder.encode_gelf(&input_data, 0)?; let expected_message_id = generate_message_id(0, 2); - assert_eq!(u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id); + assert_eq!( + u64::from_be_bytes( + encoded_gelf[1][2..10] + .try_into() + .expect("slice with incorrect length") + ), + expected_message_id + ); // print!("\nLength of encoding message: {}",encoded_gelf[1].len()); // print!("\nBytes output: {}", encoded_gelf[1][2..10].iter().map(|x| format!("{:02x} ", x)).collect::()); @@ -272,15 +267,29 @@ mod test { ]; let mut encoder = super::Gelf { auto_increment_id: u64::MAX, - chunk_size: 20 + chunk_size: 20, }; let encoded_gelf = encoder.encode_gelf(&input_data, 0)?; let expected_message_id = generate_message_id(0, u64::MAX); let encoded_gelf_2 = encoder.encode_gelf(&input_data, 0)?; let expected_message_id_2 = generate_message_id(0, 0); - assert_eq!(u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id); - assert_eq!(u64::from_be_bytes(encoded_gelf_2[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id_2); + assert_eq!( + u64::from_be_bytes( + encoded_gelf[1][2..10] + .try_into() + .expect("slice with incorrect length") + ), + expected_message_id + ); + assert_eq!( + u64::from_be_bytes( + encoded_gelf_2[1][2..10] + .try_into() + .expect("slice with incorrect length") + ), + expected_message_id_2 + ); Ok(()) }