diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index f31d2bab0..64464ba0e 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -1029,35 +1029,32 @@ impl Store { self.keys_with_filters(vec![">"]).await } - pub async fn keys_with_filters(&self, filters: impl IntoIterator) -> Result { - let mut config: super::consumer::push::OrderedConfig = super::consumer::push::OrderedConfig { - deliver_subject: self.stream.context.client.new_inbox(), - description: Some("kv history consumer".to_string()), - headers_only: true, - replay_policy: super::consumer::ReplayPolicy::Instant, - // We only need to know the latest state for each key, not the whole history - deliver_policy: DeliverPolicy::LastPerSubject, - ..Default::default() - }; + pub async fn keys_with_filters( + &self, + filters: impl IntoIterator, + ) -> Result { + let mut config: super::consumer::push::OrderedConfig = + super::consumer::push::OrderedConfig { + deliver_subject: self.stream.context.client.new_inbox(), + description: Some("kv history consumer".to_string()), + headers_only: true, + replay_policy: super::consumer::ReplayPolicy::Instant, + // We only need to know the latest state for each key, not the whole history + deliver_policy: DeliverPolicy::LastPerSubject, + ..Default::default() + }; - let mut iter = filters.into_iter().enumerate(); - - // iterate over filters and set them in the config - while let Some(filter) = iter.next() { - match filter.0 { - 0 => config.filter_subject = format!("{}{}", self.prefix.as_str(), filter.1), - 1 => { - // we ""move"" the filter_subject to filter_subjects, since there are > 1 filters - config.filter_subjects.push(config.filter_subject ); - config.filter_subject = Default::default(); - // obv we also push the new filter - config.filter_subjects.push(format!("{}{}", self.prefix.as_str(), filter.1)); - } - _ => { - // size() is > 2 here - config.filter_subjects.push(format!("{}{}", self.prefix.as_str(), filter.1)); - } + let mut filters = filters.into_iter().map(|f| format!("{}{}", self.prefix, f)); + + match (filters.next(), filters.next()) { + (Some(first), None) => { + config.filter_subject = first; + } + (Some(first), Some(second)) => { + config.filter_subjects = vec![first, second]; + config.filter_subjects.extend(filters); } + _ => {} } let consumer = self.stream.create_consumer(config).await?;