Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink: amqp): add priority property #22243

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/add_priority_amqp_sink.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Support was added for `priority` on the `amqp` sink, to set a priority on messages sent.
The priority value can be templated to an integer between 0 and 255 (inclusive).

authors: aramperes
166 changes: 160 additions & 6 deletions src/sinks/amqp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
use crate::{amqp::AmqpConfig, sinks::prelude::*};
use lapin::{types::ShortString, BasicProperties};
use std::sync::Arc;
use vector_lib::codecs::TextSerializerConfig;
use vector_lib::{
codecs::TextSerializerConfig,
internal_event::{error_stage, error_type},
};

use super::sink::AmqpSink;

Expand All @@ -19,10 +22,13 @@ pub struct AmqpPropertiesConfig {

/// Expiration for AMQP messages (in milliseconds)
pub(crate) expiration_ms: Option<u64>,

/// Priority for AMQP messages.
pub(crate) priority: Option<Template>,
}

impl AmqpPropertiesConfig {
pub(super) fn build(&self) -> BasicProperties {
pub(super) fn build(&self, event: &Event) -> Option<BasicProperties> {
let mut prop = BasicProperties::default();
if let Some(content_type) = &self.content_type {
prop = prop.with_content_type(ShortString::from(content_type.clone()));
Expand All @@ -33,7 +39,36 @@ impl AmqpPropertiesConfig {
if let Some(expiration_ms) = &self.expiration_ms {
prop = prop.with_expiration(ShortString::from(expiration_ms.to_string()));
}
prop
if let Some(priority_template) = &self.priority {
let priority_string = priority_template
.render_string(event)
.map_err(|error| {
emit!(TemplateRenderingError {
error,
field: Some("properties.priority"),
drop_event: true,
})
})
.ok()?;

// Valid template but invalid priorty type (not numeric) does not throw an error; instead warn.
// Fall back to no priority in those cases (equivalent to 0).
match priority_string.parse::<u8>() {
Ok(priority) => {
prop = prop.with_priority(priority);
}
Err(error) => {
warn!(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I wonder if this is 'too spammy' because we can potentially have one per event.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is why internal_log_rate_limit is set to true? Unless that doesn't throttle the logs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good, yes 👍
We should probably make internal_log_rate_limit true by default.

message = "Failed to convert to numeric value for \"properties.priority\"",
error = %error,
error_type = error_type::CONVERSION_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,
);
}
}
}
Some(prop)
}
}

Expand Down Expand Up @@ -127,7 +162,126 @@ pub(super) async fn healthcheck(channel: Arc<lapin::Channel>) -> crate::Result<(
Ok(())
}

#[test]
pub fn generate_config() {
crate::test_util::test_generate_config::<AmqpSinkConfig>();
#[cfg(test)]
mod tests {
use super::*;
use crate::config::format::{deserialize, Format};

#[test]
pub fn generate_config() {
crate::test_util::test_generate_config::<AmqpSinkConfig>();
}

fn assert_config_priority_eq(config: AmqpSinkConfig, event: &LogEvent, priority: u8) {
assert_eq!(
config
.properties
.unwrap()
.priority
.unwrap()
.render_string(event)
.unwrap(),
priority.to_string()
);
}

#[test]
pub fn parse_config_priority_static() {
for (format, config) in [
(
Format::Yaml,
r#"
exchange: "test"
routing_key: "user_id"
encoding:
codec: "json"
connection_string: "amqp://user:[email protected]:5672/"
properties:
priority: 1
"#,
),
(
Format::Toml,
r#"
exchange = "test"
routing_key = "user_id"
encoding.codec = "json"
connection_string = "amqp://user:[email protected]:5672/"
properties = { priority = 1 }
"#,
),
(
Format::Json,
r#"
{
"exchange": "test",
"routing_key": "user_id",
"encoding": {
"codec": "json"
},
"connection_string": "amqp://user:[email protected]:5672/",
"properties": {
"priority": 1
}
}
"#,
),
] {
let config: AmqpSinkConfig = deserialize(&config, format).unwrap();
let event = LogEvent::from_str_legacy("message");
assert_config_priority_eq(config, &event, 1);
}
}

#[test]
pub fn parse_config_priority_templated() {
for (format, config) in [
(
Format::Yaml,
r#"
exchange: "test"
routing_key: "user_id"
encoding:
codec: "json"
connection_string: "amqp://user:[email protected]:5672/"
properties:
priority: "{{ .priority }}"
"#,
),
(
Format::Toml,
r#"
exchange = "test"
routing_key = "user_id"
encoding.codec = "json"
connection_string = "amqp://user:[email protected]:5672/"
properties = { priority = "{{ .priority }}" }
"#,
),
(
Format::Json,
r#"
{
"exchange": "test",
"routing_key": "user_id",
"encoding": {
"codec": "json"
},
"connection_string": "amqp://user:[email protected]:5672/",
"properties": {
"priority": "{{ .priority }}"
}
}
"#,
),
] {
let config: AmqpSinkConfig = deserialize(&config, format).unwrap();
let event = {
let mut event = LogEvent::from_str_legacy("message");
event.insert("priority", 2);
event
};
assert_config_priority_eq(config, &event, 2);
}
}
}
133 changes: 131 additions & 2 deletions src/sinks/amqp/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use crate::{
},
SourceSender,
};
use config::AmqpPropertiesConfig;
use futures::StreamExt;
use std::{collections::HashSet, sync::Arc, time::Duration};
use vector_lib::config::LogNamespace;
use vector_lib::{config::LogNamespace, event::LogEvent};

pub fn make_config() -> AmqpSinkConfig {
let mut config = AmqpSinkConfig {
Expand All @@ -21,9 +22,10 @@ pub fn make_config() -> AmqpSinkConfig {
};
let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
config.connection.connection_string =
format!("amqp://{}:{}@rabbitmq:5672/{}", user, pass, vhost);
format!("amqp://{}:{}@{}:5672/{}", user, pass, host, vhost);
config
}

Expand Down Expand Up @@ -124,6 +126,10 @@ async fn amqp_happy_path() {
{
let msg = try_msg.unwrap();
let s = String::from_utf8_lossy(msg.data.as_slice()).into_owned();

let msg_priority = *msg.properties.priority();
assert_eq!(msg_priority, None);

out.push(s);
} else {
failures += 1;
Expand Down Expand Up @@ -218,3 +224,126 @@ async fn amqp_round_trip() {

assert_eq!(output.len(), nb_events_published);
}

async fn amqp_priority_with_template(
template: &str,
event_field_priority: Option<u8>,
expected_priority: Option<u8>,
) {
let mut config = make_config();
let exchange = format!("test-{}-exchange", random_string(10));
config.exchange = Template::try_from(exchange.as_str()).unwrap();
config.properties = Some(AmqpPropertiesConfig {
priority: Some(Template::try_from(template).unwrap()),
..Default::default()
});

await_connection(&config.connection).await;
let (_conn, channel) = config.connection.connect().await.unwrap();
let exchange_opts = lapin::options::ExchangeDeclareOptions {
auto_delete: true,
..Default::default()
};
channel
.exchange_declare(
&exchange,
lapin::ExchangeKind::Fanout,
exchange_opts,
lapin::types::FieldTable::default(),
)
.await
.unwrap();

let cx = SinkContext::default();
let (sink, healthcheck) = config.build(cx).await.unwrap();
healthcheck.await.expect("Health check failed");

// prepare consumer
let queue = format!("test-{}-queue", random_string(10));
let queue_opts = lapin::options::QueueDeclareOptions {
auto_delete: true,
..Default::default()
};
let queue_args = {
let mut args = lapin::types::FieldTable::default();
args.insert(
lapin::types::ShortString::from("x-max-priority"),
lapin::types::AMQPValue::ShortInt(10), // Maximum priority value
);
args
};
channel
.queue_declare(&queue, queue_opts, queue_args)
.await
.unwrap();

channel
.queue_bind(
&queue,
&exchange,
"",
lapin::options::QueueBindOptions::default(),
lapin::types::FieldTable::default(),
)
.await
.unwrap();

let consumer = format!("test-{}-consumer", random_string(10));
let mut consumer = channel
.basic_consume(
&queue,
&consumer,
lapin::options::BasicConsumeOptions::default(),
lapin::types::FieldTable::default(),
)
.await
.unwrap();

// Send a single event with a priority defined in the event
let input = random_string(100);
let event = {
let mut event = LogEvent::from_str_legacy(&input);
if let Some(priority) = event_field_priority {
event.insert("priority", priority);
}
event
};

let events = futures::stream::iter(vec![event]);
run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;

if let Ok(Some(try_msg)) = tokio::time::timeout(Duration::from_secs(10), consumer.next()).await
{
let msg = try_msg.unwrap();
let msg_priority = *msg.properties.priority();
let output = String::from_utf8_lossy(msg.data.as_slice()).into_owned();

assert_eq!(msg_priority, expected_priority);
assert_eq!(output, input);
} else {
panic!("Did not consume message in time");
}
}

#[tokio::test]
async fn amqp_priority_template_variable() {
crate::test_util::trace_init();

amqp_priority_with_template("{{ priority }}", Some(5), Some(5)).await;
}

#[tokio::test]
async fn amqp_priority_template_constant() {
crate::test_util::trace_init();

amqp_priority_with_template("5", None, Some(5)).await;
}

#[tokio::test]
async fn amqp_priority_template_out_of_bounds() {
crate::test_util::trace_init();

amqp_priority_with_template("-1", None, None).await;
amqp_priority_with_template("100000", None, None).await;
amqp_priority_with_template("not a number", None, None).await;
}
2 changes: 1 addition & 1 deletion src/sinks/amqp/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl AmqpSink {

let properties = match &self.properties {
None => BasicProperties::default(),
Some(prop) => prop.build(),
Some(prop) => prop.build(&event)?,
};

Some(AmqpEvent {
Expand Down
Loading