From cb303e11e85b3940e9b076e8af164417e7421d72 Mon Sep 17 00:00:00 2001 From: Bharat Jain Date: Sat, 5 Oct 2024 23:36:00 +0530 Subject: [PATCH] Updated: gelf-message-id construction --- .../src/postprocessor/gelf_chunking.rs | 129 +++++++++++------- 1 file changed, 82 insertions(+), 47 deletions(-) diff --git a/tremor-interceptor/src/postprocessor/gelf_chunking.rs b/tremor-interceptor/src/postprocessor/gelf_chunking.rs index 7cf9a7849a..bdb2140e9a 100644 --- a/tremor-interceptor/src/postprocessor/gelf_chunking.rs +++ b/tremor-interceptor/src/postprocessor/gelf_chunking.rs @@ -16,18 +16,18 @@ use super::Postprocessor; use std::time::{SystemTime, UNIX_EPOCH}; -use rand::{thread_rng,Rng}; +use sha2::{Sha256, Digest}; +use std::thread; +use std::hash::{Hash, Hasher, DefaultHasher}; #[derive(Clone)] pub struct Gelf { - id: u64, chunk_size: usize, } impl Default for Gelf { fn default() -> Self { Self { - id: 0, chunk_size: 8192, } } @@ -42,55 +42,68 @@ enum Error { ChunkCount(usize), } +fn generate_message_id(epoch_timestamp: u64, data: &[u8]) -> u64{ + + /* + * REFERENCE(/Explanation) TAKEN FROM: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61 + * + * + * Idea is borrowed from logstash-gelf by Mark Paluch, MIT licensed + * GelfMessage.java + * + * Considerations about generating the message ID: The GELF documentation suggests to + * "Generate from millisecond timestamp + hostname, for example.": + * https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP + * + * However, relying on current time in milliseconds on the same system will result in a high collision + * probability if lots of messages are generated quickly. Things will be even worse if multiple servers send + * to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it + * is even unlikely to be unique at all. 
+ * + * The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating + * point, a hyphen, and an eight byte random number: https://github.com/logstash-plugins/logstash-output-gelf + * https://github.com/graylog-labs/gelf-rb/blob/master/lib/gelf/notifier.rb#L239 It probably doesn't have to + * be that clever: + * + * Using the timestamp plus a random number will mean we only have to worry about collision of random numbers + * within the same millisecond. How short can the timestamp be before it will collide with old timestamps? + * Every second Graylog will evict expired messages (5 seconds old) from the pool: + * https://github.com/Graylog2/graylog2-server/blob/master/graylog2-server/src/main/java/org/graylog2/inputs/codecs/ + * GelfChunkAggregator.java Thus, we just need six seconds which will require 13 bits. + * Then we can spend the rest on a random number. + */ + + const BITS_13: u64 = 0b1_1111_1111_1111; + + let mut sha_hasher = Sha256::new(); + sha_hasher.update(data); + let data_sha_hash = sha_hasher.finalize(); + let data_sha_hash_u64 = u64::from_be_bytes(data_sha_hash[0..8].try_into().expect("slice with incorrect length")); + + + let current_thread = thread::current(); + let thread_id = current_thread.id(); + + let mut hasher = DefaultHasher::new(); + thread_id.hash(&mut hasher); + let thread_id_u64 = hasher.finish(); + + return (epoch_timestamp & BITS_13) | (data_sha_hash_u64 & !BITS_13) | (thread_id_u64 & BITS_13) +} + impl Gelf { + // We cut i and n to u8 but check that n <= 128 before so it is safe. 
#[allow(clippy::cast_possible_truncation)] - fn encode_gelf(&mut self, data: &[u8]) -> Result<Vec<Vec<u8>>, Error> { + fn encode_gelf(&mut self, data: &[u8], epoch_timestamp: u64) -> Result<Vec<Vec<u8>>, Error> { let chunks = data.chunks(self.chunk_size - 12); let n = chunks.len(); - let id = self.id; + // let id = self.id; if n > 128 { return Err(Error::ChunkCount(n)); }; - - /* - * REFERENCE(/Explaination) TAKEN FROM: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61 - * - * - * Idea is borrowed from logstash-gelf by Mark Paluch, MIT licensed - * GelfMessage.java - * - * Considerations about generating the message ID: The GELF documentation suggests to - * "Generate from millisecond timestamp + hostname, for example.": - * https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP - * - * However, relying on current time in milliseconds on the same system will result in a high collision - * probability if lots of messages are generated quickly. Things will be even worse if multiple servers send - * to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it - * is even unlikely to be unique at all. - * - * The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating - * point, a hyphen, and an eight byte random number: https://github.com/logstash-plugins/logstash-output-gelf - * https://github.com/graylog-labs/gelf-rb/blob/master/lib/gelf/notifier.rb#L239 It probably doesn't have to - * be that clever: - * - * Using the timestamp plus a random number will mean we only have to worry about collision of random numbers - * within the same milliseconds. How short can the timestamp be before it will collide with old timestamps? 
- * Every second Graylog will evict expired messaged (5 seconds old) from the pool: - * https://github.com/Graylog2/graylog2-server/blob/master/graylog2-server/src/main/java/org/graylog2/inputs/codecs/ - * GelfChunkAggregator.java Thus, we just need six seconds which will require 13 bits. - * Then we can spend the rest on a random number. - */ - - - const BITS_13: u64 = 0b1_1111_1111_1111; - - let message_id_current_epoch_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64; - - let message_id_random_number = thread_rng().gen::<u64>(); - - self.id = (message_id_current_epoch_time & BITS_13) | (message_id_random_number & !BITS_13); + let id = generate_message_id(epoch_timestamp,data); Ok(chunks .enumerate() @@ -125,16 +138,17 @@ impl Postprocessor for Gelf { fn process( &mut self, - _ingest_ns: u64, + ingest_ns: u64, _egress_ns: u64, data: &[u8], ) -> anyhow::Result<Vec<Vec<u8>>> { - Ok(self.encode_gelf(data)?) + Ok(self.encode_gelf(data,ingest_ns)?) } fn finish(&mut self, data: Option<&[u8]>) -> anyhow::Result<Vec<Vec<u8>>> { if let Some(data) = data { - Ok(self.encode_gelf(data)?) + let current_epoch_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64; + Ok(self.encode_gelf(data, current_epoch_timestamp)?) 
} else { Ok(vec![]) } @@ -146,6 +160,8 @@ mod test { use crate::postprocessor::Postprocessor; use crate::preprocessor::{self as pre, prelude::*}; + use super::generate_message_id; + #[test] fn is_streaming() { let gelf = super::Gelf::default(); @@ -166,7 +182,6 @@ mod test { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, ]; let mut encoder = super::Gelf { - id: 0, chunk_size: 20, }; @@ -186,4 +201,24 @@ mod test { assert_eq!(r[0].0, input_data); Ok(()) } + + #[test] + fn gelf_message_id_validation() -> anyhow::Result<()> { + let input_data = vec![ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + ]; + let mut encoder = super::Gelf { + chunk_size: 20 + }; + + let encoded_gelf = encoder.encode_gelf(&input_data, 0)?; + let expected_message_id = generate_message_id(0, &input_data); + assert_eq!(u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id); + + // print!("\nLength of encoding message: {}",encoded_gelf[1].len()); + // print!("\nBytes output: {}", encoded_gelf[1][2..10].iter().map(|x| format!("{:02x} ", x)).collect::<String>()); + // print!("\nExpected Message ID: {}", u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length"))); + + Ok(()) + } }