diff --git a/tremor-interceptor/src/postprocessor/gelf_chunking.rs b/tremor-interceptor/src/postprocessor/gelf_chunking.rs
index 7cf9a7849a..bdb2140e9a 100644
--- a/tremor-interceptor/src/postprocessor/gelf_chunking.rs
+++ b/tremor-interceptor/src/postprocessor/gelf_chunking.rs
@@ -16,18 +16,18 @@
use super::Postprocessor;
use std::time::{SystemTime, UNIX_EPOCH};
-use rand::{thread_rng,Rng};
+use sha2::{Sha256, Digest};
+use std::thread;
+use std::hash::{Hash, Hasher, DefaultHasher};
#[derive(Clone)]
pub struct Gelf {
- id: u64,
chunk_size: usize,
}
impl Default for Gelf {
fn default() -> Self {
Self {
- id: 0,
chunk_size: 8192,
}
}
@@ -42,55 +42,68 @@ enum Error {
ChunkCount(usize),
}
+fn generate_message_id(epoch_timestamp: u64, data: &[u8]) -> u64{
+
+ /*
+ * REFERENCE(/Explaination) TAKEN FROM: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61
+ *
+ *
+ * Idea is borrowed from logstash-gelf by Mark Paluch, MIT licensed
+ * GelfMessage.java
+ *
+ * Considerations about generating the message ID: The GELF documentation suggests to
+ * "Generate from millisecond timestamp + hostname, for example.":
+ * https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP
+ *
+ * However, relying on current time in milliseconds on the same system will result in a high collision
+ * probability if lots of messages are generated quickly. Things will be even worse if multiple servers send
+ * to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it
+ * is even unlikely to be unique at all.
+ *
+ * The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating
+ * point, a hyphen, and an eight byte random number: https://github.com/logstash-plugins/logstash-output-gelf
+ * https://github.com/graylog-labs/gelf-rb/blob/master/lib/gelf/notifier.rb#L239 It probably doesn't have to
+ * be that clever:
+ *
+ * Using the timestamp plus a random number will mean we only have to worry about collision of random numbers
+ * within the same milliseconds. How short can the timestamp be before it will collide with old timestamps?
+ * Every second Graylog will evict expired messaged (5 seconds old) from the pool:
+ * https://github.com/Graylog2/graylog2-server/blob/master/graylog2-server/src/main/java/org/graylog2/inputs/codecs/
+ * GelfChunkAggregator.java Thus, we just need six seconds which will require 13 bits.
+ * Then we can spend the rest on a random number.
+ */
+
+ const BITS_13: u64 = 0b1_1111_1111_1111;
+
+ let mut sha_hasher = Sha256::new();
+ sha_hasher.update(data);
+ let data_sha_hash = sha_hasher.finalize();
+ let data_sha_hash_u64 = u64::from_be_bytes(data_sha_hash[0..8].try_into().expect("slice with incorrect length"));
+
+
+ let current_thread = thread::current();
+ let thread_id = current_thread.id();
+
+ let mut hasher = DefaultHasher::new();
+ thread_id.hash(&mut hasher);
+ let thread_id_u64 = hasher.finish();
+
+ return (epoch_timestamp & BITS_13) | (data_sha_hash_u64 & !BITS_13) | (thread_id_u64 & BITS_13)
+}
+
impl Gelf {
+
// We cut i and n to u8 but check that n <= 128 before so it is safe.
#[allow(clippy::cast_possible_truncation)]
- fn encode_gelf(&mut self, data: &[u8]) -> Result>, Error> {
+ fn encode_gelf(&mut self, data: &[u8], epoch_timestamp: u64) -> Result>, Error> {
let chunks = data.chunks(self.chunk_size - 12);
let n = chunks.len();
- let id = self.id;
+ // let id = self.id;
if n > 128 {
return Err(Error::ChunkCount(n));
};
-
- /*
- * REFERENCE(/Explaination) TAKEN FROM: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61
- *
- *
- * Idea is borrowed from logstash-gelf by Mark Paluch, MIT licensed
- * GelfMessage.java
- *
- * Considerations about generating the message ID: The GELF documentation suggests to
- * "Generate from millisecond timestamp + hostname, for example.":
- * https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP
- *
- * However, relying on current time in milliseconds on the same system will result in a high collision
- * probability if lots of messages are generated quickly. Things will be even worse if multiple servers send
- * to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it
- * is even unlikely to be unique at all.
- *
- * The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating
- * point, a hyphen, and an eight byte random number: https://github.com/logstash-plugins/logstash-output-gelf
- * https://github.com/graylog-labs/gelf-rb/blob/master/lib/gelf/notifier.rb#L239 It probably doesn't have to
- * be that clever:
- *
- * Using the timestamp plus a random number will mean we only have to worry about collision of random numbers
- * within the same milliseconds. How short can the timestamp be before it will collide with old timestamps?
- * Every second Graylog will evict expired messaged (5 seconds old) from the pool:
- * https://github.com/Graylog2/graylog2-server/blob/master/graylog2-server/src/main/java/org/graylog2/inputs/codecs/
- * GelfChunkAggregator.java Thus, we just need six seconds which will require 13 bits.
- * Then we can spend the rest on a random number.
- */
-
-
- const BITS_13: u64 = 0b1_1111_1111_1111;
-
- let message_id_current_epoch_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64;
-
- let message_id_random_number = thread_rng().gen::();
-
- self.id = (message_id_current_epoch_time & BITS_13) | (message_id_random_number & !BITS_13);
+ let id = generate_message_id(epoch_timestamp,data);
Ok(chunks
.enumerate()
@@ -125,16 +138,17 @@ impl Postprocessor for Gelf {
fn process(
&mut self,
- _ingest_ns: u64,
+ ingest_ns: u64,
_egress_ns: u64,
data: &[u8],
) -> anyhow::Result>> {
- Ok(self.encode_gelf(data)?)
+ Ok(self.encode_gelf(data,ingest_ns)?)
}
fn finish(&mut self, data: Option<&[u8]>) -> anyhow::Result>> {
if let Some(data) = data {
- Ok(self.encode_gelf(data)?)
+ let current_epoch_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64;
+ Ok(self.encode_gelf(data, current_epoch_timestamp)?)
} else {
Ok(vec![])
}
@@ -146,6 +160,8 @@ mod test {
use crate::postprocessor::Postprocessor;
use crate::preprocessor::{self as pre, prelude::*};
+ use super::generate_message_id;
+
#[test]
fn is_streaming() {
let gelf = super::Gelf::default();
@@ -166,7 +182,6 @@ mod test {
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let mut encoder = super::Gelf {
- id: 0,
chunk_size: 20,
};
@@ -186,4 +201,24 @@ mod test {
assert_eq!(r[0].0, input_data);
Ok(())
}
+
+ #[test]
+ fn gelf_message_id_validation() -> anyhow::Result<()> {
+ let input_data = vec![
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+ ];
+ let mut encoder = super::Gelf {
+ chunk_size: 20
+ };
+
+ let encoded_gelf = encoder.encode_gelf(&input_data, 0)?;
+ let expected_message_id = generate_message_id(0, &input_data);
+ assert_eq!(u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id);
+
+ // print!("\nLength of encoding message: {}",encoded_gelf[1].len());
+ // print!("\nBytes output: {}", encoded_gelf[1][2..10].iter().map(|x| format!("{:02x} ", x)).collect::());
+ // print!("\nExpected Message ID: {}", u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")));
+
+ Ok(())
+ }
}