From 41dc557b5f13f988dffbcb2d8a2efc671b259094 Mon Sep 17 00:00:00 2001 From: Benjamin Sparks Date: Wed, 7 Aug 2024 14:44:13 +0200 Subject: [PATCH 1/2] nats: De/Ser using `sample_freq` instead of `sample_frequency` Also adds private module from handling schema differences --- nats/src/jetstream/types.rs | 27 ++++++++++++++++++++++++++- nats/tests/jetstream.rs | 26 ++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/nats/src/jetstream/types.rs b/nats/src/jetstream/types.rs index 56af145b4..368182d83 100644 --- a/nats/src/jetstream/types.rs +++ b/nats/src/jetstream/types.rs @@ -224,7 +224,12 @@ pub struct ConsumerConfig { #[serde(default, skip_serializing_if = "is_default")] pub rate_limit: u64, /// What percentage of acknowledgments should be samples for observability, 0-100 - #[serde(default, skip_serializing_if = "is_default")] + #[serde( + rename = "sample_freq", + with = "from_str", + default, + skip_serializing_if = "is_default" + )] pub sample_frequency: u8, /// The maximum number of waiting consumers. #[serde(default, skip_serializing_if = "is_default")] @@ -254,6 +259,26 @@ pub struct ConsumerConfig { pub inactive_threshold: Duration, } +mod from_str { + pub fn deserialize<'de, T, D>(deserializer: D) -> Result + where + T: std::str::FromStr, + T::Err: std::fmt::Display, + D: serde::Deserializer<'de>, + { + let s = ::deserialize(deserializer)?; + T::from_str(&s).map_err(serde::de::Error::custom) + } + + pub fn serialize(value: &T, serializer: S) -> Result + where + T: std::fmt::Display, + S: serde::Serializer, + { + serializer.serialize_str(&value.to_string()) + } +} + pub(crate) enum ConsumerKind { Pull, } diff --git a/nats/tests/jetstream.rs b/nats/tests/jetstream.rs index 89a8a8f98..76f137f70 100644 --- a/nats/tests/jetstream.rs +++ b/nats/tests/jetstream.rs @@ -783,6 +783,32 @@ fn jetstream_pull_subscribe_bad_stream() { .expect_err("expected not found stream for a given subject"); } +#[test] +fn jetstream_consumer_configs_sample_frequency() { + let s = nats_server::run_server("tests/configs/jetstream.conf"); + let nc = nats::Options::new() + .error_callback(|err| println!("error!: {err}")) + .connect(s.client_url()) + .unwrap(); + let js = nats::jetstream::new(nc); + + let sconfig = StreamConfig { + name: "SampledStream".into(), + ..Default::default() + }; + js.add_stream(sconfig).unwrap(); + + let cconfig = ConsumerConfig { + durable_name: Some("SampledConsumer".into()), + filter_subject: "SampledSubject".into(), + sample_frequency: 80, + ..Default::default() + }; + let consumer = js.add_consumer("SampledStream", cconfig).unwrap(); + + assert_eq!(80, consumer.config.sample_frequency); +} + // Helper function to return server and client. pub fn run_basic_jetstream() -> (nats_server::Server, Connection, JetStream) { let s = nats_server::run_server("tests/configs/jetstream.conf"); From 9aa9079788222cc89335238fe56f9540cb215c52 Mon Sep 17 00:00:00 2001 From: Benjamin Sparks Date: Wed, 21 Aug 2024 14:05:40 +0200 Subject: [PATCH 2/2] CI lint rule --- nats/tests/jetstream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats/tests/jetstream.rs b/nats/tests/jetstream.rs index 76f137f70..330fdc244 100644 --- a/nats/tests/jetstream.rs +++ b/nats/tests/jetstream.rs @@ -806,7 +806,7 @@ fn jetstream_consumer_configs_sample_frequency() { }; let consumer = js.add_consumer("SampledStream", cconfig).unwrap(); - assert_eq!(80, consumer.config.sample_frequency); + assert_eq!(80, consumer.config.sample_frequency); } // Helper function to return server and client.