Skip to content

Commit

Permalink
Updated: gelf-message-id construction
Browse files Browse the repository at this point in the history
  • Loading branch information
BharatKJain committed Oct 5, 2024
1 parent 3e4f5a1 commit b45a96a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 48 deletions.
3 changes: 2 additions & 1 deletion tremor-interceptor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ thiserror = { version = "1.0", default-features = false }

#gelf
rand = { version = "0.8", optional = true, default-features = false, features = ["std_rng"] }
# SHA-256 digest used when deriving GELF message ids.
sha2 = { version = "0.10.8", optional = true, default-features = false, features = ["std"] }

# Compression
brotli = { version = "6", optional = true, default-features = false, features = [
Expand Down Expand Up @@ -61,7 +62,7 @@ proptest = "1.5"
[features]
default = ["base64", "compression", "gelf", "length-prefix"]
length-prefix = ["dep:bytes"]
gelf = ["dep:rand"]
gelf = ["dep:rand", "dep:sha2"]
base64 = []
compression = [
"dep:brotli",
Expand Down
129 changes: 82 additions & 47 deletions tremor-interceptor/src/postprocessor/gelf_chunking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@

use super::Postprocessor;
use std::time::{SystemTime, UNIX_EPOCH};
use rand::{thread_rng,Rng};
use sha2::{Sha256, Digest};
use std::thread;
use std::hash::{Hash, Hasher, DefaultHasher};

#[derive(Clone)]
pub struct Gelf {
id: u64,
chunk_size: usize,
}

impl Default for Gelf {
fn default() -> Self {
Self {
id: 0,
chunk_size: 8192,
}
}
Expand All @@ -42,55 +42,68 @@ enum Error {
ChunkCount(usize),
}

/// Builds the 64-bit GELF message id that is written into a chunk header.
///
/// Layout of the returned value:
/// * low 13 bits: `epoch_timestamp` OR-ed with a hash of the calling thread's id,
/// * high 51 bits: taken from the first eight bytes (big-endian) of the SHA-256
///   digest of `data`.
///
/// The id is therefore reproducible for the same timestamp, payload and calling
/// thread; no random number is involved.
///
/// Background: the 13-bit timestamp idea is adapted from logback-gelf's
/// `MessageIdSupplier`
/// (<https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61>),
/// which borrows it from logstash-gelf by Mark Paluch (MIT licensed). The GELF
/// documentation (<https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP>)
/// suggests "millisecond timestamp + hostname", but that collides easily when
/// many messages are produced quickly or several hosts share a log server.
/// Graylog evicts incomplete chunk sets after roughly five seconds, so a
/// millisecond timestamp only needs to span about six seconds, which fits in
/// 13 bits; the remaining bits are free for a discriminator. The reference
/// implementation fills them with a random number, this function uses a digest
/// of the payload instead.
///
/// NOTE(review): the 13-bit budget above is derived for *millisecond*
/// timestamps. The visible callers appear to pass nanosecond values, in which
/// case the low 13 bits wrap every 8192 ns rather than every ~8 s -- confirm
/// the intended unit.
///
/// NOTE(review): timestamp and thread hash are combined with `|`, which biases
/// the low bits towards 1; `^` would preserve more entropy. Left unchanged so
/// that the generated ids stay identical.
fn generate_message_id(epoch_timestamp: u64, data: &[u8]) -> u64 {
    // Mask selecting the 13 least significant bits.
    const BITS_13: u64 = 0b1_1111_1111_1111;

    // First eight bytes of SHA-256(data), read as a big-endian integer.
    // Copying into a fixed-size array avoids a fallible slice conversion.
    let digest = Sha256::digest(data);
    let mut digest_prefix = [0_u8; 8];
    digest_prefix.copy_from_slice(&digest[..8]);
    let data_hash = u64::from_be_bytes(digest_prefix);

    // `ThreadId` has no stable integer accessor, so it is hashed into a u64.
    let mut thread_hasher = DefaultHasher::new();
    thread::current().id().hash(&mut thread_hasher);
    let thread_hash = thread_hasher.finish();

    ((epoch_timestamp | thread_hash) & BITS_13) | (data_hash & !BITS_13)
}

impl Gelf {

// We cut i and n to u8 but check that n <= 128 before so it is safe.
#[allow(clippy::cast_possible_truncation)]
fn encode_gelf(&mut self, data: &[u8]) -> Result<Vec<Vec<u8>>, Error> {
fn encode_gelf(&mut self, data: &[u8], epoch_timestamp: u64) -> Result<Vec<Vec<u8>>, Error> {
let chunks = data.chunks(self.chunk_size - 12);
let n = chunks.len();
let id = self.id;
// let id = self.id;
if n > 128 {
return Err(Error::ChunkCount(n));
};


/*
* Reference (explanation) taken from: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61
*
*
* Idea is borrowed from logstash-gelf by <a href="https://github.com/mp911de">Mark Paluch</a>, MIT licensed
* <a href="https://github.com/mp911de/logstash-gelf/blob/a938063de1f822c8d26c8d51ed3871db24355017/src/main/java/biz/paluch/logging/gelf/intern/GelfMessage.java">GelfMessage.java</a>
*
* Considerations about generating the message ID: The GELF documentation suggests to
* "Generate from millisecond timestamp + hostname, for example.":
* https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP
*
* However, relying on current time in milliseconds on the same system will result in a high collision
* probability if lots of messages are generated quickly. Things will be even worse if multiple servers send
* to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it
* is even unlikely to be unique at all.
*
* The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating
* point, a hyphen, and an eight byte random number: https://github.com/logstash-plugins/logstash-output-gelf
* https://github.com/graylog-labs/gelf-rb/blob/master/lib/gelf/notifier.rb#L239 It probably doesn't have to
* be that clever:
*
* Using the timestamp plus a random number will mean we only have to worry about collision of random numbers
* within the same milliseconds. How short can the timestamp be before it will collide with old timestamps?
* Every second Graylog will evict expired messages (5 seconds old) from the pool:
* https://github.com/Graylog2/graylog2-server/blob/master/graylog2-server/src/main/java/org/graylog2/inputs/codecs/
* GelfChunkAggregator.java Thus, we just need six seconds which will require 13 bits.
* Then we can spend the rest on a random number.
*/


const BITS_13: u64 = 0b1_1111_1111_1111;

let message_id_current_epoch_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64;

let message_id_random_number = thread_rng().gen::<u64>();

self.id = (message_id_current_epoch_time & BITS_13) | (message_id_random_number & !BITS_13);
let id = generate_message_id(epoch_timestamp,data);

Ok(chunks
.enumerate()
Expand Down Expand Up @@ -125,16 +138,17 @@ impl Postprocessor for Gelf {

fn process(
&mut self,
_ingest_ns: u64,
ingest_ns: u64,
_egress_ns: u64,
data: &[u8],
) -> anyhow::Result<Vec<Vec<u8>>> {
Ok(self.encode_gelf(data)?)
Ok(self.encode_gelf(data,ingest_ns)?)
}

fn finish(&mut self, data: Option<&[u8]>) -> anyhow::Result<Vec<Vec<u8>>> {
if let Some(data) = data {
Ok(self.encode_gelf(data)?)
let current_epoch_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64;
Ok(self.encode_gelf(data, current_epoch_timestamp)?)

Check warning on line 151 in tremor-interceptor/src/postprocessor/gelf_chunking.rs

View check run for this annotation

Codecov / codecov/patch

tremor-interceptor/src/postprocessor/gelf_chunking.rs#L150-L151

Added lines #L150 - L151 were not covered by tests
} else {
Ok(vec![])
}
Expand All @@ -146,6 +160,8 @@ mod test {
use crate::postprocessor::Postprocessor;
use crate::preprocessor::{self as pre, prelude::*};

use super::generate_message_id;

#[test]
fn is_streaming() {
let gelf = super::Gelf::default();
Expand All @@ -166,7 +182,6 @@ mod test {
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let mut encoder = super::Gelf {
id: 0,
chunk_size: 20,
};

Expand All @@ -186,4 +201,24 @@ mod test {
assert_eq!(r[0].0, input_data);
Ok(())
}

/// Verifies that the message id read back from an encoded chunk equals the id
/// `generate_message_id` yields for the same timestamp and payload.
#[test]
fn gelf_message_id_validation() -> anyhow::Result<()> {
    // Twenty bytes: 0, 1, ..., 19.
    let payload: Vec<u8> = (0..20).collect();
    let mut gelf = super::Gelf { chunk_size: 20 };

    let chunks = gelf.encode_gelf(&payload, 0)?;
    let want = generate_message_id(0, &payload);

    // Bytes 2..10 of the second chunk are interpreted as the big-endian id.
    let id_bytes: [u8; 8] = chunks[1][2..10]
        .try_into()
        .expect("slice with incorrect length");
    let got = u64::from_be_bytes(id_bytes);
    assert_eq!(got, want);

    Ok(())
}
}

0 comments on commit b45a96a

Please sign in to comment.