diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fa70781c8..4bb2f865de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,8 @@ ### Fixes * gbq connector naming is now `gbq_writer` as it is documented * fix gbq connector url missing `/` - +* fix GELF message-id from auto-increment to random-id in postprocessor/gelf_chunking.rs + ## [0.13.0-rc.29] ### New features diff --git a/tremor-interceptor/Cargo.toml b/tremor-interceptor/Cargo.toml index 544691d15b..9a5cfe7a10 100644 --- a/tremor-interceptor/Cargo.toml +++ b/tremor-interceptor/Cargo.toml @@ -25,8 +25,7 @@ anyhow = { version = "1.0", default-features = true } thiserror = { version = "1.0", default-features = false } #gelf -rand = { version = "0.8", optional = true, default-features = false, features = [ -] } +rand = { version = "0.8", optional = true, default-features = false, features = [] } # Compression brotli = { version = "6", optional = true, default-features = false, features = [ diff --git a/tremor-interceptor/src/postprocessor/gelf_chunking.rs b/tremor-interceptor/src/postprocessor/gelf_chunking.rs index 18c481a372..e8b50f66e2 100644 --- a/tremor-interceptor/src/postprocessor/gelf_chunking.rs +++ b/tremor-interceptor/src/postprocessor/gelf_chunking.rs @@ -13,19 +13,37 @@ // limitations under the License. //! Splits the data using [GELF chunking protocol](https://docs.graylog.org/en/3.0/pages/gelf.html#chunking). +//! +//! ## What's the logic for creating messgae id? +//! TL;DR: We are using `ingest_ns + increment_id + thread_id` combination as the message id. +//! *Long explaination:* +//! The GELF documentation suggests to "Generate from millisecond timestamp + hostname, for example.": +//! [GELF via UDP](https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP) +//! However, relying on current time in milliseconds on the same system will result in a high collision +//! probability if lots of messages are generated quickly. Things will be even worse if multiple servers send +//! to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it +//! is even unlikely to be unique at all. +//! The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating +//! point, a hyphen, and an eight byte random number: [logstash-output-gelf]( +//! It probably doesn't have to be that clever: +//! Using the timestamp plus a random number will mean we only have to worry about collision of random numbers, +//! we can make it more deterministic by using the `ingest_ns` + an incremental id as a message ID. +//! To keep it simple we're using this logic: `(epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16 )` +//! [Reference conversation] +//! use super::Postprocessor; - +use std::time::{SystemTime, UNIX_EPOCH}; #[derive(Clone)] pub struct Gelf { - id: u64, + auto_increment_id: u64, chunk_size: usize, } impl Default for Gelf { fn default() -> Self { Self { - id: 0, + auto_increment_id: 0, chunk_size: 8192, } } @@ -40,17 +58,80 @@ enum Error { ChunkCount(usize), } +fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64 { + /* + * OLD/OTHER CONTEXT: + * + * REFERENCE(/Explaination) TAKEN FROM: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61 + * + * + * Idea is borrowed from logstash-gelf by Mark Paluch, MIT licensed + * GelfMessage.java + * + * Considerations about generating the message ID: The GELF documentation suggests to + * "Generate from millisecond timestamp + hostname, for example.": + * https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP + * + * However, relying on current time in milliseconds on the same system will result in a high collision + * probability if lots of messages are generated quickly. Things will be even worse if multiple servers send + * to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it + * is even unlikely to be unique at all. + * + * The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating + * point, a hyphen, and an eight byte random number: https://github.com/logstash-plugins/logstash-output-gelf + * https://github.com/graylog-labs/gelf-rb/blob/master/lib/gelf/notifier.rb#L239 It probably doesn't have to + * be that clever: + * + * Using the timestamp plus a random number will mean we only have to worry about collision of random numbers + * within the same milliseconds. How short can the timestamp be before it will collide with old timestamps? + * Every second Graylog will evict expired messaged (5 seconds old) from the pool: + * https://github.com/Graylog2/graylog2-server/blob/master/graylog2-server/src/main/java/org/graylog2/inputs/codecs/ + * GelfChunkAggregator.java Thus, we just need six seconds which will require 13 bits. + * Then we can spend the rest on a random number. + */ + + /* + + Approach taken here is similar to others mentioned above but with a slight modification. + + To make this: + + fast + deterministic + non-coliding + + Using the ingest_ns + an incremental id as a message ID. This is: + + 1. fast since we just have to look at two integers, no RNG, no hashing + 2. deterministic since the ingest_ns is settable, and the incremental counter is well, incremental so determinstic as well + 3. it avoids duplicates by not creating duplicates on the same system due to incremtal id's and makes them extremely unlikely on multiple systems due to nano second timestamps involved. + + + + */ + + (epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16) +} + impl Gelf { // We cut i and n to u8 but check that n <= 128 before so it is safe. #[allow(clippy::cast_possible_truncation)] - fn encode_gelf(&mut self, data: &[u8]) -> Result>, Error> { + fn encode_gelf(&mut self, data: &[u8], epoch_timestamp: u64) -> Result>, Error> { let chunks = data.chunks(self.chunk_size - 12); let n = chunks.len(); - let id = self.id; + // let id = self.id; if n > 128 { return Err(Error::ChunkCount(n)); }; - self.id += 1; + + let gelf_message_id = generate_message_id(epoch_timestamp, self.auto_increment_id); + + if self.auto_increment_id == u64::MAX { + self.auto_increment_id = 0; + } else { + self.auto_increment_id += 1; + } + Ok(chunks .enumerate() .map(|(i, chunk)| { @@ -60,7 +141,7 @@ impl Gelf { // magic number buf.append(&mut vec![0x1e, 0x0f]); // gelf package id - buf.append(&mut id.to_be_bytes().to_vec()); + buf.append(&mut gelf_message_id.to_be_bytes().to_vec()); // sequence number buf.push(i as u8); // sequence count @@ -84,16 +165,19 @@ impl Postprocessor for Gelf { fn process( &mut self, - _ingest_ns: u64, + ingest_ns: u64, _egress_ns: u64, data: &[u8], ) -> anyhow::Result>> { - Ok(self.encode_gelf(data)?) + Ok(self.encode_gelf(data, ingest_ns)?) } + #[allow(clippy::cast_possible_truncation)] fn finish(&mut self, data: Option<&[u8]>) -> anyhow::Result>> { if let Some(data) = data { - Ok(self.encode_gelf(data)?) + let current_epoch_timestamp = + SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64; + Ok(self.encode_gelf(data, current_epoch_timestamp)?) } else { Ok(vec![]) } @@ -105,6 +189,8 @@ mod test { use crate::postprocessor::Postprocessor; use crate::preprocessor::{self as pre, prelude::*}; + use super::generate_message_id; + #[test] fn is_streaming() { let gelf = super::Gelf::default(); @@ -125,7 +211,7 @@ mod test { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, ]; let mut encoder = super::Gelf { - id: 0, + auto_increment_id: 0, chunk_size: 20, }; @@ -145,4 +231,66 @@ mod test { assert_eq!(r[0].0, input_data); Ok(()) } + + #[test] + fn gelf_message_id_validation() -> anyhow::Result<()> { + let input_data = vec![ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + ]; + let mut encoder = super::Gelf { + auto_increment_id: 2, + chunk_size: 20, + }; + + let encoded_gelf = encoder.encode_gelf(&input_data, 0)?; + let expected_message_id = generate_message_id(0, 2); + assert_eq!( + u64::from_be_bytes( + encoded_gelf[1][2..10] + .try_into() + .expect("slice with incorrect length") + ), + expected_message_id + ); + + // print!("\nLength of encoding message: {}",encoded_gelf[1].len()); + // print!("\nBytes output: {}", encoded_gelf[1][2..10].iter().map(|x| format!("{:02x} ", x)).collect::()); + // print!("\nExpected Message ID: {}", u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length"))); + + Ok(()) + } + + #[test] + fn gelf_message_id_validation_with_max_autoincrement_id() -> anyhow::Result<()> { + let input_data = vec![ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + ]; + let mut encoder = super::Gelf { + auto_increment_id: u64::MAX, + chunk_size: 20, + }; + + let encoded_gelf = encoder.encode_gelf(&input_data, 0)?; + let expected_message_id = generate_message_id(0, u64::MAX); + let encoded_gelf_2 = encoder.encode_gelf(&input_data, 0)?; + let expected_message_id_2 = generate_message_id(0, 0); + assert_eq!( + u64::from_be_bytes( + encoded_gelf[1][2..10] + .try_into() + .expect("slice with incorrect length") + ), + expected_message_id + ); + assert_eq!( + u64::from_be_bytes( + encoded_gelf_2[1][2..10] + .try_into() + .expect("slice with incorrect length") + ), + expected_message_id_2 + ); + + Ok(()) + } }