Skip to content

Commit

Permalink
nats: De/Ser using sample_freq instead of sample_frequency
Browse files Browse the repository at this point in the history
Also adds private module from handling schema differences
  • Loading branch information
Benjamin Sparks committed Aug 21, 2024
1 parent f044e06 commit 41dc557
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
27 changes: 26 additions & 1 deletion nats/src/jetstream/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ pub struct ConsumerConfig {
#[serde(default, skip_serializing_if = "is_default")]
pub rate_limit: u64,
/// What percentage of acknowledgments should be samples for observability, 0-100
#[serde(default, skip_serializing_if = "is_default")]
#[serde(
rename = "sample_freq",
with = "from_str",
default,
skip_serializing_if = "is_default"
)]
pub sample_frequency: u8,
/// The maximum number of waiting consumers.
#[serde(default, skip_serializing_if = "is_default")]
Expand Down Expand Up @@ -254,6 +259,26 @@ pub struct ConsumerConfig {
pub inactive_threshold: Duration,
}

mod from_str {
pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: std::str::FromStr,
T::Err: std::fmt::Display,
D: serde::Deserializer<'de>,
{
let s = <String as serde::Deserialize>::deserialize(deserializer)?;
T::from_str(&s).map_err(serde::de::Error::custom)
}

pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
T: std::fmt::Display,
S: serde::Serializer,
{
serializer.serialize_str(&value.to_string())
}
}

pub(crate) enum ConsumerKind {
Pull,
}
Expand Down
26 changes: 26 additions & 0 deletions nats/tests/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,32 @@ fn jetstream_pull_subscribe_bad_stream() {
.expect_err("expected not found stream for a given subject");
}

#[test]
fn jetstream_consumer_configs_sample_frequency() {
let s = nats_server::run_server("tests/configs/jetstream.conf");
let nc = nats::Options::new()
.error_callback(|err| println!("error!: {err}"))
.connect(s.client_url())
.unwrap();
let js = nats::jetstream::new(nc);

let sconfig = StreamConfig {
name: "SampledStream".into(),
..Default::default()
};
js.add_stream(sconfig).unwrap();

let cconfig = ConsumerConfig {
durable_name: Some("SampledConsumer".into()),
filter_subject: "SampledSubject".into(),
sample_frequency: 80,
..Default::default()
};
let consumer = js.add_consumer("SampledStream", cconfig).unwrap();

assert_eq!(80, consumer.config.sample_frequency);
}

// Helper function to return server and client.
pub fn run_basic_jetstream() -> (nats_server::Server, Connection, JetStream) {
let s = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit 41dc557

Please sign in to comment.