Skip to content

Commit

Permalink
Updated: gelf-message-id construction v4
Browse files Browse the repository at this point in the history
  • Loading branch information
BharatKJain committed Oct 14, 2024
1 parent 026f39c commit 5591928
Showing 1 changed file with 42 additions and 15 deletions.
57 changes: 42 additions & 15 deletions tremor-interceptor/src/postprocessor/gelf_chunking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,36 @@
// limitations under the License.

//! Splits the data using [GELF chunking protocol](https://docs.graylog.org/en/3.0/pages/gelf.html#chunking).
//!
//! ## What's the logic for creating messgae id?
//!
//! TL;DR: We are using ingest_ns + increment_id + thread_id combination as the message id.
//!
//! *Long explaination:*
//!
//! The GELF documentation suggests to "Generate from millisecond timestamp + hostname, for example.":
//! [GELF via UDP](https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP)
//!
//! However, relying on current time in milliseconds on the same system will result in a high collision
//! probability if lots of messages are generated quickly. Things will be even worse if multiple servers send
//! to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it
//! is even unlikely to be unique at all.
//!
//! The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating
//! point, a hyphen, and an eight byte random number: [logstash-output-gelf](
//!
//! It probably doesn't have to be that clever:
//!
//! Using the timestamp plus a random number will mean we only have to worry about collision of random numbers,
//! we can make it more deterministic by using the ingest_ns + an incremental id as a message ID.
//!
//! To keep it simple we're using this logic: (epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16 )
//!
//! [Reference conversation]{https://github.com/tremor-rs/tremor-runtime/pull/2662}
//!

use super::Postprocessor;
use std::time::{SystemTime, UNIX_EPOCH};
use std::thread;
use std::hash::{Hash, Hasher, DefaultHasher};

#[derive(Clone)]
pub struct Gelf {
auto_increment_id: u64,
Expand Down Expand Up @@ -81,20 +105,23 @@ fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64{
Approach taken here is similar to others mentioned above but with a slight modification.
We are using ingest_ns + increment_id + thread_id combination as the message id
*/
To make this:
fast
deterministic
non-coliding
Using the ingest_ns + an incremental id as a message ID. This is:
const BITS_13: u64 = 0b1_1111_1111_1111;
1. fast since we just have to look at two integers, no RNG, no hashing
2. deterministic since the ingest_ns is settable, and the incremental counter is well, incremental so determinstic as well
3. it avoids duplicates by not creating duplicates on the same system due to incremtal id's and makes them extremely unlikely on multiple systems due to nano second timestamps involved.
let current_thread = thread::current();
let thread_id = current_thread.id();
let mut hasher = DefaultHasher::new();
thread_id.hash(&mut hasher);
let thread_id_u64 = hasher.finish();
*/

(epoch_timestamp & BITS_13) | (auto_increment_id & !BITS_13) | (thread_id_u64 & BITS_13)
(epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16 )
}

impl Gelf {
Expand Down Expand Up @@ -222,12 +249,12 @@ mod test {
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let mut encoder = super::Gelf {
auto_increment_id: 0,
auto_increment_id: 2,
chunk_size: 20
};

let encoded_gelf = encoder.encode_gelf(&input_data, 0)?;
let expected_message_id = generate_message_id(0, 0);
let expected_message_id = generate_message_id(0, 2);
assert_eq!(u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id);

// print!("\nLength of encoding message: {}",encoded_gelf[1].len());
Expand Down

0 comments on commit 5591928

Please sign in to comment.