Skip to content

Commit

Permalink
fixed: code-formatting
Browse files Browse the repository at this point in the history
Signed-off-by: Bharat Jain <[email protected]>
  • Loading branch information
BharatKJain committed Oct 21, 2024
1 parent fec5885 commit 116a48f
Showing 1 changed file with 41 additions and 32 deletions.
73 changes: 41 additions & 32 deletions tremor-interceptor/src/postprocessor/gelf_chunking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,20 @@
//! Splits the data using [GELF chunking protocol](https://docs.graylog.org/en/3.0/pages/gelf.html#chunking).
//!
//! ## What's the logic for creating the message id?
//!
//! TL;DR: We are using `ingest_ns + increment_id + thread_id` combination as the message id.
//!
//! *Long explanation:*
//!
//! The GELF documentation suggests to "Generate from millisecond timestamp + hostname, for example.":
//! [GELF via UDP](https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP)
//!
//! However, relying on current time in milliseconds on the same system will result in a high collision
//! probability if lots of messages are generated quickly. Things will be even worse if multiple servers send
//! to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it
//! is even unlikely to be unique at all.
//!
//! The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating
//! point, a hyphen, and an eight byte random number: [logstash-output-gelf](https://github.com/logstash-plugins/logstash-output-gelf)
//!
//! It probably doesn't have to be that clever:
//!
//! Using the timestamp plus a random number will mean we only have to worry about collision of random numbers,
//! we can make it more deterministic by using the `ingest_ns` + an incremental id as a message ID.
//!
//! we can make it more deterministic by using the `ingest_ns` + an incremental id as a message ID.
//! To keep it simple we're using this logic: `(epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16 )`
//!
//! [Reference conversation](https://github.com/tremor-rs/tremor-runtime/pull/2662)
//!

Expand Down Expand Up @@ -67,14 +58,13 @@ enum Error {
ChunkCount(usize),
}

fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64{

fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64 {
/*
* OLD/OTHER CONTEXT:
*
* REFERENCE(/Explanation) TAKEN FROM: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61
*
*
*
*
* Idea is borrowed from logstash-gelf by <a href="https://github.com/mp911de">Mark Paluch</a>, MIT licensed
* <a href="https://github.com/mp911de/logstash-gelf/blob/a938063de1f822c8d26c8d51ed3871db24355017/src/main/java/biz/paluch/logging/gelf/intern/GelfMessage.java">GelfMessage.java</a>
*
Expand All @@ -100,32 +90,30 @@ fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64{
* Then we can spend the rest on a random number.
*/

/*
/*
Approach taken here is similar to others mentioned above but with a slight modification.
To make this:
fast
deterministic
non-colliding
Using the ingest_ns + an incremental id as a message ID. This is:
1. fast since we just have to look at two integers, no RNG, no hashing
2. deterministic since the ingest_ns is settable, and the incremental counter is, well, incremental, so deterministic as well
3. it avoids duplicates by not creating duplicates on the same system due to incremental ids, and makes them extremely unlikely on multiple systems due to the nanosecond timestamps involved.
*/

(epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16 )
(epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16)
}

impl Gelf {

// We cut i and n to u8 but check that n <= 128 before so it is safe.
#[allow(clippy::cast_possible_truncation)]
fn encode_gelf(&mut self, data: &[u8], epoch_timestamp: u64) -> Result<Vec<Vec<u8>>, Error> {
Expand All @@ -136,15 +124,14 @@ impl Gelf {
return Err(Error::ChunkCount(n));
};

let gelf_message_id = generate_message_id(epoch_timestamp,self.auto_increment_id);
let gelf_message_id = generate_message_id(epoch_timestamp, self.auto_increment_id);

if self.auto_increment_id == u64::MAX {
self.auto_increment_id = 0;
} else {
self.auto_increment_id += 1;
}
else{
self.auto_increment_id+=1;
}


Ok(chunks
.enumerate()
.map(|(i, chunk)| {
Expand Down Expand Up @@ -182,13 +169,14 @@ impl Postprocessor for Gelf {
_egress_ns: u64,
data: &[u8],
) -> anyhow::Result<Vec<Vec<u8>>> {
Ok(self.encode_gelf(data,ingest_ns)?)
Ok(self.encode_gelf(data, ingest_ns)?)
}

#[allow(clippy::cast_possible_truncation)]
fn finish(&mut self, data: Option<&[u8]>) -> anyhow::Result<Vec<Vec<u8>>> {
if let Some(data) = data {
let current_epoch_timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64;
let current_epoch_timestamp =
SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64;
Ok(self.encode_gelf(data, current_epoch_timestamp)?)

Check warning on line 180 in tremor-interceptor/src/postprocessor/gelf_chunking.rs

View check run for this annotation

Codecov / codecov/patch

tremor-interceptor/src/postprocessor/gelf_chunking.rs#L178-L180

Added lines #L178 - L180 were not covered by tests
} else {
Ok(vec![])
Expand Down Expand Up @@ -251,12 +239,19 @@ mod test {
];
let mut encoder = super::Gelf {
auto_increment_id: 2,
chunk_size: 20
chunk_size: 20,
};

let encoded_gelf = encoder.encode_gelf(&input_data, 0)?;
let expected_message_id = generate_message_id(0, 2);
assert_eq!(u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id);
assert_eq!(
u64::from_be_bytes(
encoded_gelf[1][2..10]
.try_into()
.expect("slice with incorrect length")
),
expected_message_id
);

// print!("\nLength of encoding message: {}",encoded_gelf[1].len());
// print!("\nBytes output: {}", encoded_gelf[1][2..10].iter().map(|x| format!("{:02x} ", x)).collect::<String>());
Expand All @@ -272,15 +267,29 @@ mod test {
];
let mut encoder = super::Gelf {
auto_increment_id: u64::MAX,
chunk_size: 20
chunk_size: 20,
};

let encoded_gelf = encoder.encode_gelf(&input_data, 0)?;
let expected_message_id = generate_message_id(0, u64::MAX);
let encoded_gelf_2 = encoder.encode_gelf(&input_data, 0)?;
let expected_message_id_2 = generate_message_id(0, 0);
assert_eq!(u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id);
assert_eq!(u64::from_be_bytes(encoded_gelf_2[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id_2);
assert_eq!(
u64::from_be_bytes(
encoded_gelf[1][2..10]
.try_into()
.expect("slice with incorrect length")
),
expected_message_id
);
assert_eq!(
u64::from_be_bytes(
encoded_gelf_2[1][2..10]
.try_into()
.expect("slice with incorrect length")
),
expected_message_id_2
);

Ok(())
}
Expand Down

0 comments on commit 116a48f

Please sign in to comment.