Fix: message-id in postprocessor/gelf-chunking
BharatKJain committed Sep 29, 2024
1 parent ca0cbc5 commit ba74fab
Showing 3 changed files with 44 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
### Fixes
* gbq connector naming is now `gbq_writer` as it is documented
* fix gbq connector url missing `/`
+* fix GELF message-id from auto-increment to random-id in postprocessor/gelf_chunking.rs

## [0.13.0-rc.29]

3 changes: 1 addition & 2 deletions tremor-interceptor/Cargo.toml
@@ -25,8 +25,7 @@ anyhow = { version = "1.0", default-features = true }
thiserror = { version = "1.0", default-features = false }

#gelf
-rand = { version = "0.8", optional = true, default-features = false, features = [
-] }
+rand = { version = "0.8", optional = true, default-features = false, features = ["std_rng"] }

# Compression
brotli = { version = "6", optional = true, default-features = false, features = [
43 changes: 42 additions & 1 deletion tremor-interceptor/src/postprocessor/gelf_chunking.rs
@@ -15,6 +15,8 @@
//! Splits the data using [GELF chunking protocol](https://docs.graylog.org/en/3.0/pages/gelf.html#chunking).

use super::Postprocessor;
+use std::time::{SystemTime, UNIX_EPOCH};
+use rand::{thread_rng, Rng};

#[derive(Clone)]
pub struct Gelf {
@@ -50,7 +52,46 @@ impl Gelf {
if n > 128 {
return Err(Error::ChunkCount(n));
};
-self.id += 1;
+
+
+/*
+ * Reference (and explanation) taken from: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61
+ *
+ *
+ * Idea is borrowed from logstash-gelf by <a href="https://github.com/mp911de">Mark Paluch</a>, MIT licensed
+ * <a href="https://github.com/mp911de/logstash-gelf/blob/a938063de1f822c8d26c8d51ed3871db24355017/src/main/java/biz/paluch/logging/gelf/intern/GelfMessage.java">GelfMessage.java</a>
+ *
+ * Considerations about generating the message ID: The GELF documentation suggests to
+ * "Generate from millisecond timestamp + hostname, for example.":
+ * https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP
+ *
+ * However, relying on current time in milliseconds on the same system will result in a high collision
+ * probability if lots of messages are generated quickly. Things will be even worse if multiple servers send
+ * to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it
+ * is even unlikely to be unique at all.
+ *
+ * The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating
+ * point, a hyphen, and an eight byte random number: https://github.com/logstash-plugins/logstash-output-gelf
+ * https://github.com/graylog-labs/gelf-rb/blob/master/lib/gelf/notifier.rb#L239 It probably doesn't have to
+ * be that clever:
+ *
+ * Using the timestamp plus a random number will mean we only have to worry about collision of random numbers
+ * within the same millisecond. How short can the timestamp be before it will collide with old timestamps?
+ * Every second Graylog will evict expired messages (5 seconds old) from the pool:
+ * https://github.com/Graylog2/graylog2-server/blob/master/graylog2-server/src/main/java/org/graylog2/inputs/codecs/GelfChunkAggregator.java
+ * Thus, we just need six seconds, which requires 13 bits of a millisecond timestamp (2^13 ms = 8192 ms).
+ * Then we can spend the rest on a random number.
+ */
+
+
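+const BITS_13: u64 = 0b1_1111_1111_1111;
+
+// Millisecond timestamp, as in the reference above: its low 13 bits wrap every 8192 ms.
+// Falls back to 0 if the system clock is set before the Unix epoch.
+let message_id_current_epoch_time = SystemTime::now()
+    .duration_since(UNIX_EPOCH)
+    .map_or(0, |elapsed| elapsed.as_millis() as u64);
+
+let message_id_random_number = thread_rng().gen::<u64>();
+
+// Low 13 bits come from the timestamp, the remaining 51 bits from the random number.
+self.id = (message_id_current_epoch_time & BITS_13) | (message_id_random_number & !BITS_13);
+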
Codecov / codecov/patch warning on tremor-interceptor/src/postprocessor/gelf_chunking.rs#L55-L94: added lines #L55 - L94 were not covered by tests.
Ok(chunks
.enumerate()
.map(|(i, chunk)| {
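The bit layout described in the comment block can be checked in isolation. The following is a minimal sketch, not code from this commit: `compose_message_id` is a hypothetical helper, and it takes the millisecond timestamp and the random value as plain parameters (instead of calling `SystemTime` and `rand`) so the masking is deterministic.

```rust
/// Mask for the low 13 bits: 2^13 = 8192 distinct values,
/// i.e. a bit over 8 seconds of millisecond timestamps.
const BITS_13: u64 = 0b1_1111_1111_1111;

/// Hypothetical helper (an assumption for illustration, not part of the commit):
/// keeps the low 13 bits of the timestamp and the high 51 bits of the random value.
fn compose_message_id(epoch_millis: u64, random: u64) -> u64 {
    (epoch_millis & BITS_13) | (random & !BITS_13)
}

fn main() {
    let epoch_millis: u64 = 1_727_600_000_123; // arbitrary example timestamp
    let random: u64 = 0xDEAD_BEEF_CAFE_F00D; // stand-in for an RNG output

    let id = compose_message_id(epoch_millis, random);

    // The low 13 bits are taken from the timestamp ...
    assert_eq!(id & BITS_13, epoch_millis & BITS_13);
    // ... and everything above them from the random value.
    assert_eq!(id & !BITS_13, random & !BITS_13);

    // Timestamps exactly 8192 ms apart produce the same time bits, so only the
    // random part distinguishes IDs once the 13-bit window has wrapped.
    assert_eq!(
        compose_message_id(epoch_millis, random),
        compose_message_id(epoch_millis + 8192, random)
    );

    println!("message id: {id:#018x}");
}
```

Keeping the mask in one constant means the time window and the random width always add up to 64 bits; widening the window is a one-line change.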
