diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 2ba66a251..2e586d1c1 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -2514,6 +2514,71 @@ mod jetstream { } } + #[tokio::test] + async fn consumer_configs_sample_frequency() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + + let client = ConnectOptions::new() + .event_callback(|err| async move { println!("error: {err:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let js = async_nats::jetstream::new(client.clone()); + + let stream = js + .create_stream(stream::Config { + name: "StreamWithSampledConsumer".into(), + ..Default::default() + }) + .await + .unwrap(); + + { + let consumer = stream + .create_consumer(consumer::pull::Config { + name: Some("SamplePulldConsumer".into()), + description: Some( + "See below to check that Ack Sampling has been set to 100%!".to_string(), + ), + sample_frequency: 100, // <--- sample all the messages + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(100, consumer.cached_info().config.sample_frequency); + js.delete_consumer_from_stream( + consumer.cached_info().config.name.as_ref().unwrap(), + &stream.cached_info().config.name, + ) + .await + .unwrap(); + } + + { + let consumer = stream + .create_consumer(consumer::pull::Config { + name: Some("SampledPushConsumer".into()), + description: Some( + "See below to check that Ack Sampling has been set to 100%!".to_string(), + ), + sample_frequency: 100, // <--- sample all the messages + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(100, consumer.cached_info().config.sample_frequency); + js.delete_consumer_from_stream( + consumer.cached_info().config.name.as_ref().unwrap(), + &stream.cached_info().config.name, + ) + .await + .unwrap(); + } + } + #[tokio::test] async fn timeout_out_request() { let server = nats_server::run_server("tests/configs/jetstream.conf");