From 3aa2b310b830b455a7aed52bb4c6c18d9d401fb5 Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Thu, 20 Jun 2024 14:13:16 +0200 Subject: [PATCH 01/11] Fix: adds new filters functions --- async-nats/src/jetstream/kv/mod.rs | 34 +++++++++++++++++++++++++++++- async-nats/tests/kv_tests.rs | 23 ++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index ff99e364a..ccef6bd8c 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -1026,8 +1026,14 @@ impl Store { /// # } /// ``` pub async fn keys(&self) -> Result { - let subject = format!("{}>", self.prefix.as_str()); + + self.keys_with_filter(">").await + } + + pub async fn keys_with_filter(&self, filter: &str) -> Result { + let subject: String = format!("{}{}", self.prefix.as_str(), filter); + println!("Filtering by {}", filter); let consumer = self .stream .create_consumer(super::consumer::push::OrderedConfig { @@ -1049,6 +1055,32 @@ impl Store { bucket: self.name.clone(), }; + Ok(Keys { inner: entries }) + } + pub async fn keys_with_filters(&self, filters: Vec<&str>) -> Result { + let subjects = filters.iter().map(|filter| format!("{}{}", self.prefix.as_str(), filter)).collect::>(); + + let consumer = self + .stream + .create_consumer(super::consumer::push::OrderedConfig { + deliver_subject: self.stream.context.client.new_inbox(), + description: Some("kv history consumer".to_string()), + filter_subjects: subjects, + headers_only: true, + replay_policy: super::consumer::ReplayPolicy::Instant, + // We only need to know the latest state for each key, not the whole history + deliver_policy: DeliverPolicy::LastPerSubject, + ..Default::default() + }) + .await?; + + let entries = History { + done: consumer.info.num_pending == 0, + subscription: consumer.messages().await?, + prefix: self.prefix.clone(), + bucket: self.name.clone(), + }; + Ok(Keys { inner: entries }) } } diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 670c19127..5807415e2 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -758,6 +758,7 @@ mod kv { #[tokio::test] async fn keys() { let server = nats_server::run_server("tests/configs/jetstream.conf"); + println!("Server created"); let client = ConnectOptions::new() .event_callback(|event| async move { println!("event: {event:?}") }) .connect(server.client_url()) @@ -795,6 +796,28 @@ mod kv { keys.sort(); assert_eq!(vec!["bar", "foo"], keys); + + let mut keys_with_filter = kv + .keys_with_filter("bar") + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filter.sort(); + assert_eq!(vec!["bar"], keys_with_filter); + + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo", "bar"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); +keys_with_filters.sort(); +assert_eq!(vec!["bar", "foo"], keys_with_filters); + + // Delete a key and make sure it doesn't show up in the keys list kv.delete("bar").await.unwrap(); let keys = kv From 8ea7e772e0cfb8f7401a823f1ac838d7612fd9bd Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Thu, 20 Jun 2024 18:10:08 +0200 Subject: [PATCH 02/11] Fix: change implementation as for the new ADR --- async-nats/src/jetstream/kv/mod.rs | 40 ++++++++---------------------- async-nats/tests/kv_tests.rs | 2 +- 2 files changed, 12 insertions(+), 30 deletions(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index ccef6bd8c..b77411503 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -1027,50 +1027,32 @@ impl Store { /// ``` pub async fn keys(&self) -> Result { - self.keys_with_filter(">").await + self.keys_with_filters(vec![]).await } - pub async fn keys_with_filter(&self, filter: &str) -> Result { - let subject: String = format!("{}{}", self.prefix.as_str(), filter); - - println!("Filtering by {}", filter); - let consumer = self - .stream - .create_consumer(super::consumer::push::OrderedConfig { - deliver_subject: self.stream.context.client.new_inbox(), - description: Some("kv history consumer".to_string()), - filter_subject: subject, - headers_only: true, - replay_policy: super::consumer::ReplayPolicy::Instant, - // We only need to know the latest state for each key, not the whole history - deliver_policy: DeliverPolicy::LastPerSubject, - ..Default::default() - }) - .await?; + pub async fn keys_with_filters(&self, filters: Vec<&str>) -> Result { + - let entries = History { - done: consumer.info.num_pending == 0, - subscription: consumer.messages().await?, - prefix: self.prefix.clone(), - bucket: self.name.clone(), + let mut filters_config:super::consumer::push::OrderedConfig = Default::default(); + + match filters.len() { + 0 => filters_config.filter_subject = format!("{}{}", self.prefix.as_str(), ">"), + 1 => filters_config.filter_subject = format!("{}{}", self.prefix.as_str(), filters[0]), + _ => filters_config.filter_subjects = filters.iter().map(|filter| format!("{}{}", self.prefix.as_str(), filter)).collect::>() }; - Ok(Keys { inner: entries }) - } - pub async fn keys_with_filters(&self, filters: Vec<&str>) -> Result { - let subjects = filters.iter().map(|filter| format!("{}{}", self.prefix.as_str(), filter)).collect::>(); + let consumer = self .stream .create_consumer(super::consumer::push::OrderedConfig { deliver_subject: self.stream.context.client.new_inbox(), description: Some("kv history consumer".to_string()), - filter_subjects: subjects, headers_only: true, replay_policy: super::consumer::ReplayPolicy::Instant, // We only need to know the latest state for each key, not the whole history deliver_policy: DeliverPolicy::LastPerSubject, - ..Default::default() + ..filters_config }) .await?; diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 5807415e2..3f85266f3 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -798,7 +798,7 @@ mod kv { let mut keys_with_filter = kv - .keys_with_filter("bar") + .keys_with_filters(vec!["bar"]) .await .unwrap() .try_collect::>() From 12ec78c9de245732b03c71fc67acbcbb6b063bc2 Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Thu, 20 Jun 2024 18:15:17 +0200 Subject: [PATCH 03/11] Fix: skip the 0 case, for better performance --- async-nats/src/jetstream/kv/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index b77411503..90a0ebeba 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -1027,7 +1027,7 @@ impl Store { /// ``` pub async fn keys(&self) -> Result { - self.keys_with_filters(vec![]).await + self.keys_with_filters(vec![">"]).await } pub async fn keys_with_filters(&self, filters: Vec<&str>) -> Result { @@ -1036,7 +1036,7 @@ impl Store { let mut filters_config:super::consumer::push::OrderedConfig = Default::default(); match filters.len() { - 0 => filters_config.filter_subject = format!("{}{}", self.prefix.as_str(), ">"), + 0 => (), 1 => filters_config.filter_subject = format!("{}{}", self.prefix.as_str(), filters[0]), _ => filters_config.filter_subjects = filters.iter().map(|filter| format!("{}{}", self.prefix.as_str(), filter)).collect::>() }; From 5b833eaf2bf427387f505af7d2705ff2544f9aa1 Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Thu, 20 Jun 2024 18:24:17 +0200 Subject: [PATCH 04/11] Fix: default is not just implicit now --- async-nats/src/jetstream/kv/mod.rs | 37 ++++++++++++++---------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 90a0ebeba..00f59ce0e 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -1026,35 +1026,32 @@ impl Store { /// # } /// ``` pub async fn keys(&self) -> Result { - self.keys_with_filters(vec![">"]).await } pub async fn keys_with_filters(&self, filters: Vec<&str>) -> Result { - + let mut config: super::consumer::push::OrderedConfig = super::consumer::push::OrderedConfig { + deliver_subject: self.stream.context.client.new_inbox(), + description: Some("kv history consumer".to_string()), + headers_only: true, + replay_policy: super::consumer::ReplayPolicy::Instant, + // We only need to know the latest state for each key, not the whole history + deliver_policy: DeliverPolicy::LastPerSubject, + ..Default::default() + }; - let mut filters_config:super::consumer::push::OrderedConfig = Default::default(); - match filters.len() { 0 => (), - 1 => filters_config.filter_subject = format!("{}{}", self.prefix.as_str(), filters[0]), - _ => filters_config.filter_subjects = filters.iter().map(|filter| format!("{}{}", self.prefix.as_str(), filter)).collect::>() + 1 => config.filter_subject = format!("{}{}", self.prefix.as_str(), filters[0]), + _ => { + config.filter_subjects = filters + .iter() + .map(|filter| format!("{}{}", self.prefix.as_str(), filter)) + .collect::>() + } }; - - - let consumer = self - .stream - .create_consumer(super::consumer::push::OrderedConfig { - deliver_subject: self.stream.context.client.new_inbox(), - description: Some("kv history consumer".to_string()), - headers_only: true, - replay_policy: super::consumer::ReplayPolicy::Instant, - // We only need to know the latest state for each key, not the whole history - deliver_policy: DeliverPolicy::LastPerSubject, - ..filters_config - }) - .await?; + let consumer = self.stream.create_consumer(config).await?; let entries = History { done: consumer.info.num_pending == 0, From 0439e1355c15d1661a0a2c0e6117088cfb46afd0 Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Thu, 27 Jun 2024 05:16:53 +0200 Subject: [PATCH 05/11] Fix: remove println --- async-nats/tests/kv_tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 3f85266f3..d82d53193 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -758,7 +758,6 @@ mod kv { #[tokio::test] async fn keys() { let server = nats_server::run_server("tests/configs/jetstream.conf"); - println!("Server created"); let client = ConnectOptions::new() .event_callback(|event| async move { println!("event: {event:?}") }) .connect(server.client_url()) From 5817c57510911179022206f6e7a2b033d4c8c589 Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Thu, 27 Jun 2024 06:00:43 +0200 Subject: [PATCH 06/11] Fix: refactor using IntoIterator instead of vec --- async-nats/src/jetstream/kv/mod.rs | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 00f59ce0e..f31d2bab0 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -1029,7 +1029,7 @@ impl Store { self.keys_with_filters(vec![">"]).await } - pub async fn keys_with_filters(&self, filters: Vec<&str>) -> Result { + pub async fn keys_with_filters(&self, filters: impl IntoIterator) -> Result { let mut config: super::consumer::push::OrderedConfig = super::consumer::push::OrderedConfig { deliver_subject: self.stream.context.client.new_inbox(), description: Some("kv history consumer".to_string()), @@ -1040,16 +1040,25 @@ impl Store { ..Default::default() }; - match filters.len() { - 0 => (), - 1 => config.filter_subject = format!("{}{}", self.prefix.as_str(), filters[0]), - _ => { - config.filter_subjects = filters - .iter() - .map(|filter| format!("{}{}", self.prefix.as_str(), filter)) - .collect::>() + let mut iter = filters.into_iter().enumerate(); + + // iterate over filters and set them in the config + while let Some(filter) = iter.next() { + match filter.0 { + 0 => config.filter_subject = format!("{}{}", self.prefix.as_str(), filter.1), + 1 => { + // we ""move"" the filter_subject to filter_subjects, since there are > 1 filters + config.filter_subjects.push(config.filter_subject ); + config.filter_subject = Default::default(); + // obv we also push the new filter + config.filter_subjects.push(format!("{}{}", self.prefix.as_str(), filter.1)); + } + _ => { + // size() is > 2 here + config.filter_subjects.push(format!("{}{}", self.prefix.as_str(), filter.1)); + } } - }; + } let consumer = self.stream.create_consumer(config).await?; From e0e191946f2136159830e141b8634305d0f42be1 Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Thu, 27 Jun 2024 06:16:00 +0200 Subject: [PATCH 07/11] adds more tests --- async-nats/tests/kv_tests.rs | 51 +++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index d82d53193..4600709a7 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -806,15 +806,60 @@ mod kv { keys_with_filter.sort(); assert_eq!(vec!["bar"], keys_with_filter); + + kv.put("foo1.bar", 37.to_string().into()).await.unwrap(); + kv.put("foo1.baz.boo", 73.to_string().into()).await.unwrap(); + kv.put("foo1.baz.baz", 89.to_string().into()).await.unwrap(); + + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo", "bar"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!(vec!["bar", "foo"], keys_with_filters); + + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo1.*.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); + + + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo1.*.*", "foo1.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!(vec!["foo1.bar", "foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); + let mut keys_with_filters = kv - .keys_with_filters(vec!["foo", "bar"]) + .keys_with_filters(vec!["*.baz.*",]) .await .unwrap() .try_collect::>() .await .unwrap(); -keys_with_filters.sort(); -assert_eq!(vec!["bar", "foo"], keys_with_filters); + + keys_with_filters.sort(); + assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); + + + // cleanup the keys + kv.delete("foo1.bar").await.unwrap(); + kv.delete("foo1.baz.boo").await.unwrap(); + kv.delete("foo1.baz.baz").await.unwrap(); + + // filters like "foo.b*" should not return anything because it's not a valid filter // Delete a key and make sure it doesn't show up in the keys list From a54ec80199036366c049efaaec233ae6facda22a Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Tue, 30 Jul 2024 07:04:25 +0200 Subject: [PATCH 08/11] fix(1280): apply suggestions from reviewer --- async-nats/src/jetstream/kv/mod.rs | 51 ++++++++++++++---------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index f31d2bab0..64464ba0e 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -1029,35 +1029,32 @@ impl Store { self.keys_with_filters(vec![">"]).await } - pub async fn keys_with_filters(&self, filters: impl IntoIterator) -> Result { - let mut config: super::consumer::push::OrderedConfig = super::consumer::push::OrderedConfig { - deliver_subject: self.stream.context.client.new_inbox(), - description: Some("kv history consumer".to_string()), - headers_only: true, - replay_policy: super::consumer::ReplayPolicy::Instant, - // We only need to know the latest state for each key, not the whole history - deliver_policy: DeliverPolicy::LastPerSubject, - ..Default::default() - }; + pub async fn keys_with_filters( + &self, + filters: impl IntoIterator, + ) -> Result { + let mut config: super::consumer::push::OrderedConfig = + super::consumer::push::OrderedConfig { + deliver_subject: self.stream.context.client.new_inbox(), + description: Some("kv history consumer".to_string()), + headers_only: true, + replay_policy: super::consumer::ReplayPolicy::Instant, + // We only need to know the latest state for each key, not the whole history + deliver_policy: DeliverPolicy::LastPerSubject, + ..Default::default() + }; - let mut iter = filters.into_iter().enumerate(); - - // iterate over filters and set them in the config - while let Some(filter) = iter.next() { - match filter.0 { - 0 => config.filter_subject = format!("{}{}", self.prefix.as_str(), filter.1), - 1 => { - // we ""move"" the filter_subject to filter_subjects, since there are > 1 filters - config.filter_subjects.push(config.filter_subject ); - config.filter_subject = Default::default(); - // obv we also push the new filter - config.filter_subjects.push(format!("{}{}", self.prefix.as_str(), filter.1)); - } - _ => { - // size() is > 2 here - config.filter_subjects.push(format!("{}{}", self.prefix.as_str(), filter.1)); - } + let mut filters = filters.into_iter().map(|f| format!("{}{}", self.prefix, f)); + + match (filters.next(), filters.next()) { + (Some(first), None) => { + config.filter_subject = first; + } + (Some(first), Some(second)) => { + config.filter_subjects = vec![first, second]; + config.filter_subjects.extend(filters); } + _ => {} } let consumer = self.stream.create_consumer(config).await?; From 24e985a93c1d6f2782d5192cdd21454a2d76b85b Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Tue, 30 Jul 2024 07:12:44 +0200 Subject: [PATCH 09/11] fix(1261): format code for linter --- async-nats/tests/kv_tests.rs | 112 +++++++++++++++++------------------ 1 file changed, 55 insertions(+), 57 deletions(-) diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 4600709a7..1b088b1d7 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -795,72 +795,70 @@ mod kv { keys.sort(); assert_eq!(vec!["bar", "foo"], keys); - let mut keys_with_filter = kv - .keys_with_filters(vec!["bar"]) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); - keys_with_filter.sort(); - assert_eq!(vec!["bar"], keys_with_filter); - - - kv.put("foo1.bar", 37.to_string().into()).await.unwrap(); - kv.put("foo1.baz.boo", 73.to_string().into()).await.unwrap(); - kv.put("foo1.baz.baz", 89.to_string().into()).await.unwrap(); - - let mut keys_with_filters = kv - .keys_with_filters(vec!["foo", "bar"]) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); - keys_with_filters.sort(); - assert_eq!(vec!["bar", "foo"], keys_with_filters); - - let mut keys_with_filters = kv - .keys_with_filters(vec!["foo1.*.*"]) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); - keys_with_filters.sort(); - assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); + .keys_with_filters(vec!["bar"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filter.sort(); + assert_eq!(vec!["bar"], keys_with_filter); + kv.put("foo1.bar", 37.to_string().into()).await.unwrap(); + kv.put("foo1.baz.boo", 73.to_string().into()).await.unwrap(); + kv.put("foo1.baz.baz", 89.to_string().into()).await.unwrap(); - let mut keys_with_filters = kv - .keys_with_filters(vec!["foo1.*.*", "foo1.*"]) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); - keys_with_filters.sort(); - assert_eq!(vec!["foo1.bar", "foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo", "bar"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!(vec!["bar", "foo"], keys_with_filters); - let mut keys_with_filters = kv - .keys_with_filters(vec!["*.baz.*",]) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo1.*.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); - keys_with_filters.sort(); - assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo1.*.*", "foo1.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!( + vec!["foo1.bar", "foo1.baz.baz", "foo1.baz.boo"], + keys_with_filters + ); + let mut keys_with_filters = kv + .keys_with_filters(vec!["*.baz.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); - // cleanup the keys - kv.delete("foo1.bar").await.unwrap(); - kv.delete("foo1.baz.boo").await.unwrap(); - kv.delete("foo1.baz.baz").await.unwrap(); + keys_with_filters.sort(); + assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); - // filters like "foo.b*" should not return anything because it's not a valid filter + // cleanup the keys + kv.delete("foo1.bar").await.unwrap(); + kv.delete("foo1.baz.boo").await.unwrap(); + kv.delete("foo1.baz.baz").await.unwrap(); + // filters like "foo.b*" should not return anything because it's not a valid filter // Delete a key and make sure it doesn't show up in the keys list kv.delete("bar").await.unwrap(); From d432bf44b507923a8f8f1625f66c5cc636ce1a7d Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Tue, 30 Jul 2024 08:36:04 +0200 Subject: [PATCH 10/11] fix(1281): adds a config feature check --- async-nats/src/jetstream/kv/mod.rs | 12 ++- async-nats/tests/kv_tests.rs | 126 +++++++++++++++-------------- 2 files changed, 74 insertions(+), 64 deletions(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 64464ba0e..284671518 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -1051,8 +1051,16 @@ impl Store { config.filter_subject = first; } (Some(first), Some(second)) => { - config.filter_subjects = vec![first, second]; - config.filter_subjects.extend(filters); + #[cfg(feature = "server_2_10")] + { + config.filter_subjects = vec![first, second]; + config.filter_subjects.extend(filters); + } + #[cfg(not(feature = "server_2_10"))] + { + config.filter_subject = first; + // maybe a warning + } } _ => {} } diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 1b088b1d7..917466679 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -795,69 +795,71 @@ mod kv { keys.sort(); assert_eq!(vec!["bar", "foo"], keys); - let mut keys_with_filter = kv - .keys_with_filters(vec!["bar"]) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); - keys_with_filter.sort(); - assert_eq!(vec!["bar"], keys_with_filter); - - kv.put("foo1.bar", 37.to_string().into()).await.unwrap(); - kv.put("foo1.baz.boo", 73.to_string().into()).await.unwrap(); - kv.put("foo1.baz.baz", 89.to_string().into()).await.unwrap(); - - let mut keys_with_filters = kv - .keys_with_filters(vec!["foo", "bar"]) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); - keys_with_filters.sort(); - assert_eq!(vec!["bar", "foo"], keys_with_filters); - - let mut keys_with_filters = kv - .keys_with_filters(vec!["foo1.*.*"]) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); - keys_with_filters.sort(); - assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); - - let mut keys_with_filters = kv - .keys_with_filters(vec!["foo1.*.*", "foo1.*"]) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); - keys_with_filters.sort(); - assert_eq!( - vec!["foo1.bar", "foo1.baz.baz", "foo1.baz.boo"], - keys_with_filters - ); - - let mut keys_with_filters = kv - .keys_with_filters(vec!["*.baz.*"]) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); - - keys_with_filters.sort(); - assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); - - // cleanup the keys - kv.delete("foo1.bar").await.unwrap(); - kv.delete("foo1.baz.boo").await.unwrap(); - kv.delete("foo1.baz.baz").await.unwrap(); + #[cfg(feature = "server_2_10")] + { + let mut keys_with_filter = kv + .keys_with_filters(vec!["bar"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filter.sort(); + assert_eq!(vec!["bar"], keys_with_filter); + + kv.put("foo1.bar", 37.to_string().into()).await.unwrap(); + kv.put("foo1.baz.boo", 73.to_string().into()).await.unwrap(); + kv.put("foo1.baz.baz", 89.to_string().into()).await.unwrap(); + + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo", "bar"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!(vec!["bar", "foo"], keys_with_filters); + + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo1.*.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); + + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo1.*.*", "foo1.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!( + vec!["foo1.bar", "foo1.baz.baz", "foo1.baz.boo"], + keys_with_filters + ); + let mut keys_with_filters = kv + .keys_with_filters(vec!["*.baz.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + + keys_with_filters.sort(); + assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); + + // cleanup the keys + kv.delete("foo1.bar").await.unwrap(); + kv.delete("foo1.baz.boo").await.unwrap(); + kv.delete("foo1.baz.baz").await.unwrap(); + } // filters like "foo.b*" should not return anything because it's not a valid filter // Delete a key and make sure it doesn't show up in the keys list From dd84dfc8d930bb9caab0dac7e395dfdbdb8e9f1e Mon Sep 17 00:00:00 2001 From: Luca Simonetti Date: Tue, 30 Jul 2024 13:30:30 +0200 Subject: [PATCH 11/11] fix(1281): no unused variable fix --- async-nats/src/jetstream/kv/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 284671518..69fae490c 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -1050,10 +1050,10 @@ impl Store { (Some(first), None) => { config.filter_subject = first; } - (Some(first), Some(second)) => { + (Some(first), Some(_second)) => { #[cfg(feature = "server_2_10")] { - config.filter_subjects = vec![first, second]; + config.filter_subjects = vec![first, _second]; config.filter_subjects.extend(filters); } #[cfg(not(feature = "server_2_10"))]