Skip to content

Commit

Permalink
fix(1280): apply suggestions from reviewer
Browse files Browse the repository at this point in the history
  • Loading branch information
Luca Simonetti authored and Jarema committed Jul 30, 2024
1 parent e0e1919 commit a54ec80
Showing 1 changed file with 24 additions and 27 deletions.
51 changes: 24 additions & 27 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1029,35 +1029,32 @@ impl Store {
self.keys_with_filters(vec![">"]).await
}

pub async fn keys_with_filters(&self, filters: impl IntoIterator<Item = &str>) -> Result<Keys, HistoryError> {
let mut config: super::consumer::push::OrderedConfig = super::consumer::push::OrderedConfig {
deliver_subject: self.stream.context.client.new_inbox(),
description: Some("kv history consumer".to_string()),
headers_only: true,
replay_policy: super::consumer::ReplayPolicy::Instant,
// We only need to know the latest state for each key, not the whole history
deliver_policy: DeliverPolicy::LastPerSubject,
..Default::default()
};
pub async fn keys_with_filters(
&self,
filters: impl IntoIterator<Item = &str>,
) -> Result<Keys, HistoryError> {
let mut config: super::consumer::push::OrderedConfig =
super::consumer::push::OrderedConfig {
deliver_subject: self.stream.context.client.new_inbox(),
description: Some("kv history consumer".to_string()),
headers_only: true,
replay_policy: super::consumer::ReplayPolicy::Instant,
// We only need to know the latest state for each key, not the whole history
deliver_policy: DeliverPolicy::LastPerSubject,
..Default::default()
};

let mut iter = filters.into_iter().enumerate();

// iterate over filters and set them in the config
while let Some(filter) = iter.next() {
match filter.0 {
0 => config.filter_subject = format!("{}{}", self.prefix.as_str(), filter.1),
1 => {
// we ""move"" the filter_subject to filter_subjects, since there are > 1 filters
config.filter_subjects.push(config.filter_subject );
config.filter_subject = Default::default();
// obv we also push the new filter
config.filter_subjects.push(format!("{}{}", self.prefix.as_str(), filter.1));
}
_ => {
// size() is > 2 here
config.filter_subjects.push(format!("{}{}", self.prefix.as_str(), filter.1));
}
let mut filters = filters.into_iter().map(|f| format!("{}{}", self.prefix, f));

match (filters.next(), filters.next()) {
(Some(first), None) => {
config.filter_subject = first;
}
(Some(first), Some(second)) => {
config.filter_subjects = vec![first, second];
config.filter_subjects.extend(filters);
}
_ => {}
}

let consumer = self.stream.create_consumer(config).await?;
Expand Down

0 comments on commit a54ec80

Please sign in to comment.