Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: message-id in postprocessor/gelf-chunking #2662

Merged
merged 8 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
### Fixes
* gbq connector naming is now `gbq_writer` as it is documented
* fix gbq connector url missing `/`

* fix GELF message-id from auto-increment to random-id in postprocessor/gelf_chunking.rs

## [0.13.0-rc.29]

### New features
Expand Down
3 changes: 1 addition & 2 deletions tremor-interceptor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ anyhow = { version = "1.0", default-features = true }
thiserror = { version = "1.0", default-features = false }

#gelf
rand = { version = "0.8", optional = true, default-features = false, features = [
] }
rand = { version = "0.8", optional = true, default-features = false, features = [] }

# Compression
brotli = { version = "6", optional = true, default-features = false, features = [
Expand Down
170 changes: 159 additions & 11 deletions tremor-interceptor/src/postprocessor/gelf_chunking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,37 @@
// limitations under the License.

//! Splits the data using [GELF chunking protocol](https://docs.graylog.org/en/3.0/pages/gelf.html#chunking).
//!
//! ## What's the logic for creating messgae id?
//! TL;DR: We are using `ingest_ns + increment_id + thread_id` combination as the message id.
//! *Long explaination:*
//! The GELF documentation suggests to "Generate from millisecond timestamp + hostname, for example.":
//! [GELF via UDP](https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP)
//! However, relying on current time in milliseconds on the same system will result in a high collision
//! probability if lots of messages are generated quickly. Things will be even worse if multiple servers send
//! to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it
//! is even unlikely to be unique at all.
//! The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating
//! point, a hyphen, and an eight byte random number: [logstash-output-gelf](
//! It probably doesn't have to be that clever:
//! Using the timestamp plus a random number will mean we only have to worry about collision of random numbers,
//! we can make it more deterministic by using the `ingest_ns` + an incremental id as a message ID.
//! To keep it simple we're using this logic: `(epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16 )`
//! [Reference conversation]<https://github.com/tremor-rs/tremor-runtime/pull/2662>
//!

use super::Postprocessor;

use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Clone)]
pub struct Gelf {
id: u64,
auto_increment_id: u64,
chunk_size: usize,
}

impl Default for Gelf {
fn default() -> Self {
Self {
id: 0,
auto_increment_id: 0,
chunk_size: 8192,
}
}
Expand All @@ -40,17 +58,80 @@
ChunkCount(usize),
}

fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64 {
/*
* OLD/OTHER CONTEXT:
*
* REFERENCE(/Explaination) TAKEN FROM: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61
*
*
* Idea is borrowed from logstash-gelf by <a href="https://github.com/mp911de">Mark Paluch</a>, MIT licensed
* <a href="https://github.com/mp911de/logstash-gelf/blob/a938063de1f822c8d26c8d51ed3871db24355017/src/main/java/biz/paluch/logging/gelf/intern/GelfMessage.java">GelfMessage.java</a>
*
* Considerations about generating the message ID: The GELF documentation suggests to
* "Generate from millisecond timestamp + hostname, for example.":
* https://go2docs.graylog.org/5-1/getting_in_log_data/gelf.html#GELFviaUDP
*
* However, relying on current time in milliseconds on the same system will result in a high collision
* probability if lots of messages are generated quickly. Things will be even worse if multiple servers send
* to the same log server. Adding the hostname is not guaranteed to help, and if the hostname is the FQDN it
* is even unlikely to be unique at all.
*
* The GELF module used by Logstash uses the first eight bytes of an MD5 hash of the current time as floating
* point, a hyphen, and an eight byte random number: https://github.com/logstash-plugins/logstash-output-gelf
* https://github.com/graylog-labs/gelf-rb/blob/master/lib/gelf/notifier.rb#L239 It probably doesn't have to
* be that clever:
*
* Using the timestamp plus a random number will mean we only have to worry about collision of random numbers
* within the same milliseconds. How short can the timestamp be before it will collide with old timestamps?
* Every second Graylog will evict expired messaged (5 seconds old) from the pool:
* https://github.com/Graylog2/graylog2-server/blob/master/graylog2-server/src/main/java/org/graylog2/inputs/codecs/
* GelfChunkAggregator.java Thus, we just need six seconds which will require 13 bits.
* Then we can spend the rest on a random number.
*/

/*

Approach taken here is similar to others mentioned above but with a slight modification.

To make this:

fast
deterministic
non-coliding

Using the ingest_ns + an incremental id as a message ID. This is:

1. fast since we just have to look at two integers, no RNG, no hashing
2. deterministic since the ingest_ns is settable, and the incremental counter is well, incremental so determinstic as well
3. it avoids duplicates by not creating duplicates on the same system due to incremtal id's and makes them extremely unlikely on multiple systems due to nano second timestamps involved.



*/

(epoch_timestamp & 0xFF_FF) | (auto_increment_id << 16)
}

impl Gelf {
// We cut i and n to u8 but check that n <= 128 before so it is safe.
#[allow(clippy::cast_possible_truncation)]
fn encode_gelf(&mut self, data: &[u8]) -> Result<Vec<Vec<u8>>, Error> {
fn encode_gelf(&mut self, data: &[u8], epoch_timestamp: u64) -> Result<Vec<Vec<u8>>, Error> {
let chunks = data.chunks(self.chunk_size - 12);
let n = chunks.len();
let id = self.id;
// let id = self.id;
if n > 128 {
return Err(Error::ChunkCount(n));
};
self.id += 1;

let gelf_message_id = generate_message_id(epoch_timestamp, self.auto_increment_id);

if self.auto_increment_id == u64::MAX {
self.auto_increment_id = 0;
} else {
self.auto_increment_id += 1;
}

Ok(chunks
.enumerate()
.map(|(i, chunk)| {
Expand All @@ -60,7 +141,7 @@
// magic number
buf.append(&mut vec![0x1e, 0x0f]);
// gelf package id
buf.append(&mut id.to_be_bytes().to_vec());
buf.append(&mut gelf_message_id.to_be_bytes().to_vec());
// sequence number
buf.push(i as u8);
// sequence count
Expand All @@ -84,16 +165,19 @@

fn process(
&mut self,
_ingest_ns: u64,
ingest_ns: u64,
_egress_ns: u64,
data: &[u8],
) -> anyhow::Result<Vec<Vec<u8>>> {
Ok(self.encode_gelf(data)?)
Ok(self.encode_gelf(data, ingest_ns)?)
}

#[allow(clippy::cast_possible_truncation)]
fn finish(&mut self, data: Option<&[u8]>) -> anyhow::Result<Vec<Vec<u8>>> {
if let Some(data) = data {
Ok(self.encode_gelf(data)?)
let current_epoch_timestamp =
SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64;
Ok(self.encode_gelf(data, current_epoch_timestamp)?)

Check warning on line 180 in tremor-interceptor/src/postprocessor/gelf_chunking.rs

View check run for this annotation

Codecov / codecov/patch

tremor-interceptor/src/postprocessor/gelf_chunking.rs#L178-L180

Added lines #L178 - L180 were not covered by tests
} else {
Ok(vec![])
}
Expand All @@ -105,6 +189,8 @@
use crate::postprocessor::Postprocessor;
use crate::preprocessor::{self as pre, prelude::*};

use super::generate_message_id;

#[test]
fn is_streaming() {
let gelf = super::Gelf::default();
Expand All @@ -125,7 +211,7 @@
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let mut encoder = super::Gelf {
id: 0,
auto_increment_id: 0,
chunk_size: 20,
};

Expand All @@ -145,4 +231,66 @@
assert_eq!(r[0].0, input_data);
Ok(())
}

#[test]
fn gelf_message_id_validation() -> anyhow::Result<()> {
let input_data = vec![
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let mut encoder = super::Gelf {
auto_increment_id: 2,
chunk_size: 20,
};

let encoded_gelf = encoder.encode_gelf(&input_data, 0)?;
let expected_message_id = generate_message_id(0, 2);
assert_eq!(
u64::from_be_bytes(
encoded_gelf[1][2..10]
.try_into()
.expect("slice with incorrect length")
),
expected_message_id
);

// print!("\nLength of encoding message: {}",encoded_gelf[1].len());
// print!("\nBytes output: {}", encoded_gelf[1][2..10].iter().map(|x| format!("{:02x} ", x)).collect::<String>());
// print!("\nExpected Message ID: {}", u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")));

Ok(())
}

#[test]
fn gelf_message_id_validation_with_max_autoincrement_id() -> anyhow::Result<()> {
let input_data = vec![
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let mut encoder = super::Gelf {
auto_increment_id: u64::MAX,
chunk_size: 20,
};

let encoded_gelf = encoder.encode_gelf(&input_data, 0)?;
let expected_message_id = generate_message_id(0, u64::MAX);
let encoded_gelf_2 = encoder.encode_gelf(&input_data, 0)?;
let expected_message_id_2 = generate_message_id(0, 0);
assert_eq!(
u64::from_be_bytes(
encoded_gelf[1][2..10]
.try_into()
.expect("slice with incorrect length")
),
expected_message_id
);
assert_eq!(
u64::from_be_bytes(
encoded_gelf_2[1][2..10]
.try_into()
.expect("slice with incorrect length")
),
expected_message_id_2
);

Ok(())
}
}
Loading