From f1ffc5a661fd3dedbf43110c1f03fd46289398b6 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 30 Apr 2024 19:27:10 +0900 Subject: [PATCH] yet another --- chitchat/src/lib.rs | 94 ++++++++------- chitchat/src/listener.rs | 15 ++- chitchat/src/server.rs | 10 +- chitchat/src/state.rs | 252 ++++++++++++++++++++++----------------- 4 files changed, 207 insertions(+), 164 deletions(-) diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index ecfdc6b..a2bfd89 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -450,7 +450,6 @@ mod tests { use tokio_stream::StreamExt; use super::*; - use crate::server::{spawn_chitchat, ChitchatHandle}; use crate::transport::{ChannelTransport, Transport}; const DEAD_NODE_GRACE_PERIOD: Duration = Duration::from_secs(20); @@ -573,17 +572,11 @@ mod tests { fn test_chitchat_handshake() { let node_config1 = ChitchatConfig::for_test(10_001); let empty_seeds = watch::channel(Default::default()).1; - let mut node1 = Chitchat::with_chitchat_id_and_seeds( - node_config1, - empty_seeds.clone(), - ); + let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); node1.self_node_state().set("key1a", "1"); node1.self_node_state().set("key2a", "2"); let node_config2 = ChitchatConfig::for_test(10_002); - let mut node2 = Chitchat::with_chitchat_id_and_seeds( - node_config2, - empty_seeds, - ); + let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds); node2.self_node_state().set("key1b", "1"); node2.self_node_state().set("key2b", "2"); run_chitchat_handshake(&mut node1, &mut node2); @@ -604,8 +597,7 @@ mod tests { fn test_chitchat_dead_node_liveness() { let node_config1 = ChitchatConfig::for_test(10_001); let empty_seeds = watch::channel(Default::default()).1; - let mut node1 = - Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); + let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); let chitchat_id = ChitchatId::for_local_test(10u16); node1.reset_node_state(&chitchat_id, std::iter::empty(), 10_000, 10u64); node1.report_heartbeat(&chitchat_id, Heartbeat(10_000u64)); @@ -625,8 +617,7 @@ mod tests { tokio::time::pause(); let node_config1 = ChitchatConfig::for_test(10_001); let empty_seeds = watch::channel(Default::default()).1; - let mut node1 = - Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); + let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); let node_config2 = ChitchatConfig::for_test(10_002); let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds); // Because of compression, we need a lot of keys to reach the MTU. @@ -1049,10 +1040,7 @@ mod tests { fn test_chitchat_listener() { let node_config1 = ChitchatConfig::for_test(10_001); let empty_seeds = watch::channel(Default::default()).1; - let mut node1 = Chitchat::with_chitchat_id_and_seeds( - node_config1, - empty_seeds.clone(), - ); + let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); node1.self_node_state().set("self1:suffix1", "hello1"); let counter_self_key: Arc = Default::default(); let counter_other_key: Arc = Default::default(); @@ -1084,10 +1072,7 @@ mod tests { .forever(); let node_config2 = ChitchatConfig::for_test(10_002); - let mut node2 = Chitchat::with_chitchat_id_and_seeds( - node_config2, - empty_seeds, - ); + let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds); node2.self_node_state().set("other:suffix", "hello"); assert_eq!(counter_self_key.load(Ordering::SeqCst), 0); @@ -1176,9 +1161,30 @@ mod tests { let chitchat_id = ChitchatId::for_local_test(10_003); let mut node_state = node.cluster_state.node_state_mut(&chitchat_id); - node_state.set_versioned_value("foo".to_string(), VersionedValue { value: "bar".to_string(), version: 1, status: DeletionStatus::Set }); - node_state.set_versioned_value("qux".to_string(), VersionedValue { value: "baz".to_string(), version: 2, status: DeletionStatus::Set }); - node_state.set_versioned_value("toto".to_string(), VersionedValue { value: "titi".to_string(), version: 3, status: DeletionStatus::Set }); + assert!(node_state.set_versioned_value( + "foo".to_string(), + VersionedValue { + value: "bar".to_string(), + version: 1, + status: DeletionStatus::Set + } + )); + assert!(node_state.set_versioned_value( + "qux".to_string(), + VersionedValue { + value: "baz".to_string(), + version: 2, + status: DeletionStatus::Set + } + )); + assert!(node_state.set_versioned_value( + "toto".to_string(), + VersionedValue { + value: "titi".to_string(), + version: 3, + status: DeletionStatus::Set + } + )); node.reset_node_state( &chitchat_id, @@ -1205,22 +1211,30 @@ mod tests { let chitchat_id = ChitchatId::for_local_test(10_004); let mut node_state = node.cluster_state.node_state_mut(&chitchat_id); - node_state.set_versioned_value("foo".to_string(), VersionedValue { - value: "bar".to_string(), - version: 1, - status: DeletionStatus::Set, - }); - node_state.set_versioned_value("qux".to_string(), VersionedValue { - value: "baz".to_string(), - version: 2, - status: DeletionStatus::Set, - }); - node_state.set_versioned_value("toto".to_string(), VersionedValue { - value: "titi".to_string(), - version: 3, - status: DeletionStatus::Set, - }); - + assert!(node_state.set_versioned_value( + "foo".to_string(), + VersionedValue { + value: "bar".to_string(), + version: 1, + status: DeletionStatus::Set, + } + )); + assert!(node_state.set_versioned_value( + "qux".to_string(), + VersionedValue { + value: "baz".to_string(), + version: 2, + status: DeletionStatus::Set, + } + )); + assert!(node_state.set_versioned_value( + "toto".to_string(), + VersionedValue { + value: "titi".to_string(), + version: 3, + status: DeletionStatus::Set, + } + )); node.reset_node_state( &chitchat_id, [ diff --git a/chitchat/src/listener.rs b/chitchat/src/listener.rs index 849ee7d..9479945 100644 --- a/chitchat/src/listener.rs +++ b/chitchat/src/listener.rs @@ -33,7 +33,7 @@ impl Drop for ListenerHandle { type BoxedListener = Box; -#[derive(Default, Clone)] +#[derive(Default)] pub(crate) struct Listeners { inner: Arc>, } @@ -76,7 +76,10 @@ impl Listeners { #[cfg(test)] pub(crate) fn trigger_event(&self, key_change_event: KeyChangeEvent) { - self.inner.read().unwrap().trigger_events(&[key_change_event]); + self.inner + .read() + .unwrap() + .trigger_events(&[key_change_event]); } pub(crate) fn trigger_events(&self, key_change_events: &[KeyChangeEvent]) { @@ -233,7 +236,7 @@ mod tests { #[test] fn test_listeners_simple() { - let mut listeners = Listeners::default(); + let listeners = Listeners::default(); let counter: Arc = Default::default(); let counter_clone = counter.clone(); let handle = listeners.subscribe_events("prefix:", move |events| { @@ -245,7 +248,7 @@ mod tests { }); let node_id = chitchat_id(7280u16); assert_eq!(counter.load(Ordering::Relaxed), 0); - listeners.trigger_event(KeyChangeEvent{ + listeners.trigger_event(KeyChangeEvent { key: "prefix:strippedprefix".to_string(), value: "value".to_string(), node: node_id.clone(), @@ -253,7 +256,7 @@ mod tests { assert_eq!(counter.load(Ordering::Relaxed), 1); std::mem::drop(handle); let node_id = chitchat_id(7280u16); - listeners.trigger_event(KeyChangeEvent{ + listeners.trigger_event(KeyChangeEvent { key: "prefix:strippedprefix".to_string(), value: "value".to_string(), node: node_id.clone(), @@ -317,7 +320,7 @@ mod tests { #[test] fn test_listeners_prefixes() { - let mut listeners = Listeners::default(); + let listeners = Listeners::default(); let subscribe_event = |prefix: &str| { let counter: Arc = Default::default(); diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index f72e83b..be6fceb 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -417,13 +417,11 @@ where mod tests { use std::collections::BTreeMap; use std::future::Future; - use std::time::Duration; use tokio_stream::{Stream, StreamExt}; use super::*; - use crate::message::ChitchatMessage; - use crate::transport::{ChannelTransport, Transport}; + use crate::transport::ChannelTransport; use crate::{Heartbeat, NodeState, MAX_UDP_DATAGRAM_PAYLOAD_SIZE}; #[derive(Debug, Default)] @@ -526,8 +524,7 @@ mod tests { .open(outsider_config.chitchat_id.gossip_advertise_addr) .await .unwrap(); - let outsider = - Chitchat::with_chitchat_id_and_seeds(outsider_config, empty_seeds()); + let outsider = Chitchat::with_chitchat_id_and_seeds(outsider_config, empty_seeds()); let server_config = ChitchatConfig::for_test(2223); let server_addr = server_config.chitchat_id.gossip_advertise_addr; @@ -573,8 +570,7 @@ mod tests { let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE); let test_config = ChitchatConfig::for_test(1); let test_addr = test_config.chitchat_id.gossip_advertise_addr; - let mut test_chitchat = - Chitchat::with_chitchat_id_and_seeds(test_config, empty_seeds()); + let mut test_chitchat = Chitchat::with_chitchat_id_and_seeds(test_config, empty_seeds()); let mut test_transport = transport.open(test_addr).await.unwrap(); let server_config = ChitchatConfig::for_test(2); diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 12c36b1..8855ce4 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -2,7 +2,7 @@ use std::cmp::Ordering; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, HashSet}; use std::fmt::{Debug, Formatter}; -use std::net::{Ipv4Addr, SocketAddr}; +use std::net::SocketAddr; use std::ops::{Bound, Deref, DerefMut}; use std::time::Duration; @@ -25,9 +25,6 @@ pub struct NodeState { chitchat_id: ChitchatId, heartbeat: Heartbeat, key_values: BTreeMap, - // TODO remove me - #[serde(skip)] - listeners: Listeners, pub(crate) max_version: Version, // This is the maximum version of the last tombstone GC. // @@ -67,13 +64,13 @@ impl Debug for NodeState { } impl NodeState { - fn new(chitchat_id: ChitchatId, listeners: Listeners) -> NodeState { + fn new(chitchat_id: ChitchatId) -> NodeState { NodeState { chitchat_id, heartbeat: Heartbeat(0), key_values: Default::default(), max_version: 0u64, - listeners, + // listeners, last_gc_version: 0u64, } } @@ -90,21 +87,6 @@ impl NodeState { self.last_gc_version = last_gc_version; } - pub fn for_test() -> NodeState { - NodeState { - chitchat_id: ChitchatId { - node_id: "test-node".to_string(), - generation_id: 0, - gossip_advertise_addr: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 7280), - }, - heartbeat: Heartbeat(0), - key_values: Default::default(), - max_version: Default::default(), - listeners: Listeners::default(), - last_gc_version: 0u64, - } - } - /// Returns the node's last heartbeat value. pub fn heartbeat(&self) -> Heartbeat { self.heartbeat @@ -207,7 +189,7 @@ impl NodeState { last_gc_version=node_delta.last_gc_version, current_last_gc_version=self.last_gc_version, "resetting node"); - *self = NodeState::new(node_delta.chitchat_id.clone(), self.listeners.clone()); + *self = NodeState::new(node_delta.chitchat_id.clone()); // It is possible for the node delta to not contain any KVs. // (for instance they all have been GCed.) // @@ -228,8 +210,8 @@ impl NodeState { true } - fn apply_delta<'a>( - &'a mut self, + fn apply_delta( + &mut self, node_delta: NodeDelta, now: Instant, key_change_events: &mut Vec, @@ -320,8 +302,6 @@ impl NodeState { versioned_value.status = DeletionStatusMutation::Delete.into_status(Instant::now()); } - - fn digest(&self) -> NodeDigest { NodeDigest { heartbeat: self.heartbeat, @@ -409,7 +389,6 @@ impl NodeState { pub(crate) struct ClusterState { pub(crate) node_states: BTreeMap, seed_addrs: watch::Receiver>, - // TODO move this one level up. in the chitchat object. pub(crate) listeners: Listeners, } @@ -443,26 +422,20 @@ impl ClusterState { } } - // pub(crate) fn node_state_mut(&mut self, chitchat_id: &ChitchatId) -> &mut NodeState { - // self.node_states - // .entry(chitchat_id.clone()) - // .or_insert_with(|| NodeState::new(chitchat_id.clone(), self.listeners.clone())) - // } - - - pub(crate) fn node_state_mut(&mut self, chitchat_id: &ChitchatId) -> NodeStateMut { - // TODO use the `hash_raw_entry` feature once it gets stabilized. - // Most of the time the entry is already present. We avoid cloning chitchat_id with - // this if statement. - let listeners = self.listeners.clone(); - let self_node_state_mut= self.node_states - .entry(chitchat_id.clone()) - .or_insert_with(|| NodeState::new(chitchat_id.clone(), self.listeners.clone())); - NodeStateMut { - node_state_mut: self_node_state_mut, - listeners, - } - } + pub(crate) fn node_state_mut(&mut self, chitchat_id: &ChitchatId) -> NodeStateMut { + // TODO use the `hash_raw_entry` feature once it gets stabilized. + // Most of the time the entry is already present. We avoid cloning chitchat_id with + // this if statement. + let listeners = &self.listeners; + let self_node_state_mut = self + .node_states + .entry(chitchat_id.clone()) + .or_insert_with(|| NodeState::new(chitchat_id.clone())); + NodeStateMut { + node_state_mut: self_node_state_mut, + listeners, + } + } pub fn node_state(&self, chitchat_id: &ChitchatId) -> Option<&NodeState> { self.node_states.get(chitchat_id) @@ -746,7 +719,7 @@ impl From<&ClusterState> for ClusterStateSnapshot { } pub struct NodeStateMut<'a> { - pub(crate) listeners: Listeners, + pub(crate) listeners: &'a Listeners, pub(crate) node_state_mut: &'a mut NodeState, } @@ -758,7 +731,6 @@ impl<'a> Deref for NodeStateMut<'a> { } } - impl<'a> DerefMut for NodeStateMut<'a> { fn deref_mut(&mut self) -> &mut NodeState { self.node_state_mut @@ -770,7 +742,8 @@ impl<'a> NodeStateMut<'a> { self.node_state_mut.heartbeat.inc(); } - /// Attempts to set the heartbeat of another node. + /// Attempts to set the heartbeat of a node different from self. + /// (`self` should update its own heartbeat using `inc_heartbeat`.) /// If the value is actually not an update, just ignore the data and return false. /// As a corner case, the first value is not considered an update. /// @@ -790,28 +763,6 @@ impl<'a> NodeStateMut<'a> { } } - /// Contrary to `delete`, this does not delete an entry right away, - /// but rather schedules its deletion for after the grace period. - /// - /// At the grace period, the entry will be really deleted just like a regular - /// tombstoned entry. - /// - /// Implementation wise, the only difference with `delete` is that it is - /// treated as if it was present during the grace period.`` - pub fn delete_after_ttl(&mut self, key: &str) { - let delete_version = self.node_state_mut.get_new_version(); - let Some(versioned_value) = self.node_state_mut.key_values.get_mut(key) else { - warn!( - "Key `{key}` does not exist in the node's state and could not scheduled for an \ - eventual deletion.", - ); - return; - }; - self.node_state_mut.max_version = delete_version; - versioned_value.version = delete_version; - versioned_value.status = DeletionStatusMutation::DeleteAfterTtl.into_status(Instant::now()); - } - /// Sets a new value for a given key. /// /// Setting a new value automatically increments the @@ -832,7 +783,9 @@ impl<'a> NodeStateMut<'a> { self.set_with_version(key, value, new_version); } - /// Sets a new value with a TTL. + /// Set a key value with a specific version. + /// + /// If the value is changed, all matching event listener will be trigger. pub fn set_with_ttl(&mut self, key: impl ToString, value: impl ToString) { let key = key.to_string(); let value = value.to_string(); @@ -858,20 +811,10 @@ impl<'a> NodeStateMut<'a> { ); } - /// Deletes the entry associated to the given key. + /// Set a key value with a specific version. /// - /// From the reader's perspective, the entry is deleted right away. - /// - /// In reality, the entry is not removed from memory right away, but rather - /// marked with a tombstone. - /// That tombstone is annotated with the time of removal, so that after a configurable - /// grace period, it will be remove by the garbage collection. - /// - /// Delete do not trigger listeners. - pub fn delete(&mut self, key: &str) { - self.node_state_mut.delete(key); - } - + /// If the version is modified (= version is than the current version, and the value is + /// different than the existing value), then all matching event listener will be trigger. pub fn set_with_version(&mut self, key: impl ToString, value: impl ToString, version: u64) { let key = key.to_string(); let value = value.to_string(); @@ -885,6 +828,10 @@ impl<'a> NodeStateMut<'a> { ); } + /// Inner helper function. Sets the given key with the given versioned value. + /// + /// If the versioned value is not a delete, has indeed a version higher than the current + /// version, all matching event listener will be trigger. fn set_versioned_value(&mut self, key: String, versioned_value: VersionedValue) { let key_change_evt = KeyChangeEvent { key: key.clone(), @@ -898,6 +845,42 @@ impl<'a> NodeStateMut<'a> { self.listeners.trigger_events(&[key_change_evt]); } } + + /// Deletes the entry associated to the given key. + /// + /// From the reader's perspective, the entry is deleted right away. + /// + /// In reality, the entry is not removed from memory right away, but rather + /// marked with a tombstone. + /// That tombstone is annotated with the time of removal, so that after a configurable + /// grace period, it will be remove by the garbage collection. + /// + /// Delete do not trigger listeners. + pub fn delete(&mut self, key: &str) { + self.node_state_mut.delete(key); + } + + /// Contrary to `delete`, this does not delete an entry right away, + /// but rather schedules its deletion for after the grace period. + /// + /// At the grace period, the entry will be really deleted just like a regular + /// tombstoned entry. + /// + /// Implementation wise, the only difference with `delete` is that it is + /// treated as if it was present during the grace period.`` + pub fn delete_after_ttl(&mut self, key: &str) { + let delete_version = self.node_state_mut.get_new_version(); + let Some(versioned_value) = self.node_state_mut.key_values.get_mut(key) else { + warn!( + "Key `{key}` does not exist in the node's state and could not scheduled for an \ + eventual deletion.", + ); + return; + }; + self.node_state_mut.max_version = delete_version; + versioned_value.version = delete_version; + versioned_value.status = DeletionStatusMutation::DeleteAfterTtl.into_status(Instant::now()); + } } #[cfg(not(test))] @@ -918,7 +901,7 @@ mod tests { use super::*; use crate::serialize::Serializable; use crate::types::KeyValueMutation; - use crate::MAX_UDP_DATAGRAM_PAYLOAD_SIZE; + use crate::{KeyChangeEventRef, MAX_UDP_DATAGRAM_PAYLOAD_SIZE}; #[test] fn test_stale_node_iter_stale_key_values() { @@ -954,7 +937,6 @@ mod tests { } } - #[test] fn test_sorted_stale_nodes_empty() { let stale_nodes = SortedStaleNodes::default(); @@ -977,7 +959,6 @@ mod tests { node_state1.max_version = 2; } - { let mut node_state2 = cluster_state.node_state_mut(&node2); node_state2.set_with_version("key_a", "value_a", 1); @@ -993,10 +974,10 @@ mod tests { let node_state2 = cluster_state.node_state(&node2).unwrap(); let node_state3 = cluster_state.node_state(&node3).unwrap(); - stale_nodes.offer(&node1, &node_state1, 0u64); + stale_nodes.offer(&node1, node_state1, 0u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); - stale_nodes.offer(&node2, &node_state2, 0u64); + stale_nodes.offer(&node2, node_state2, 0u64); let expected_staleness = Staleness { is_unknown: true, @@ -1005,7 +986,7 @@ mod tests { }; assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1); - stale_nodes.offer(&node3, &node_state3, 0u64); + stale_nodes.offer(&node3, node_state3, 0u64); let expected_staleness = Staleness { is_unknown: true, max_version: 3, @@ -1028,8 +1009,8 @@ mod tests { cluster_state.node_state_mut(&node1); } { - let mut node2_state = cluster_state.node_state_mut(&node2); - node2_state.set_with_version("key_a", "value_a", 1); + let mut node2_state = cluster_state.node_state_mut(&node2); + node2_state.set_with_version("key_a", "value_a", 1); } { @@ -1043,15 +1024,15 @@ mod tests { let node2_state = cluster_state.node_state(&node2).unwrap(); let node3_state = cluster_state.node_state(&node3).unwrap(); - stale_nodes.offer(&node1, &node1_state, 1u64); + stale_nodes.offer(&node1, node1_state, 1u64); // No stale records. This is not a candidate for gossip. assert!(stale_nodes.stale_nodes.is_empty()); - stale_nodes.offer(&node2, &node2_state, 1u64); + stale_nodes.offer(&node2, node2_state, 1u64); // No stale records (due to the floor version). This is not a candidate for gossip. assert!(stale_nodes.stale_nodes.is_empty()); - stale_nodes.offer(&node3, &node3_state, 1u64); + stale_nodes.offer(&node3, node3_state, 1u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); let expected_staleness = Staleness { is_unknown: false, @@ -1097,7 +1078,6 @@ mod tests { // 0 stale values. { - let mut node_state4 = cluster_state.node_state_mut(&node4); node_state4.set_with_version("key_a", "value_a", 1); node_state4.set_with_version("key_b", "value_b", 2); @@ -1110,14 +1090,12 @@ mod tests { cluster_state.node_state_mut(&node5); } - // 3 stale values { let mut node_state6 = cluster_state.node_state_mut(&node6); - node_state6.set_with_version("key_a", "value_a", 1); + node_state6.set_with_version("key_a", "value_a", 1); } - let node_state1 = cluster_state.node_state(&node1).unwrap(); let node_state2 = cluster_state.node_state(&node2).unwrap(); let node_state3 = cluster_state.node_state(&node3).unwrap(); @@ -1126,12 +1104,12 @@ mod tests { let node_state6 = cluster_state.node_state(&node6).unwrap(); let mut stale_nodes = SortedStaleNodes::default(); - stale_nodes.offer(&node1, &node_state1, 1u64); - stale_nodes.offer(&node2, &node_state2, 2u64); - stale_nodes.offer(&node3, &node_state3, 7u64); - stale_nodes.offer(&node4, &node_state4, 1); - stale_nodes.offer(&node5, &node_state5, 0); - stale_nodes.offer(&node6, &node_state6, 0u64); + stale_nodes.offer(&node1, node_state1, 1u64); + stale_nodes.offer(&node2, node_state2, 2u64); + stale_nodes.offer(&node3, node_state3, 7u64); + stale_nodes.offer(&node4, node_state4, 1); + stale_nodes.offer(&node5, node_state5, 0); + stale_nodes.offer(&node6, node_state6, 0u64); // 1 stale values assert_eq!( @@ -1206,7 +1184,6 @@ mod tests { ); } - #[test] fn test_cluster_state_set_with_same_value_updates_version() { let mut cluster_state = ClusterState::default(); @@ -1362,12 +1339,11 @@ mod tests { // GC if tombstone (=100) + grace_period > heartbeat (=110). tokio::time::advance(Duration::from_secs(5)).await; cluster_state.gc_keys_marked_for_deletion(Duration::from_secs(10)); - assert!(cluster_state + assert!(!cluster_state .node_state(&node1) .unwrap() .key_values - .get("key_a") - .is_none()); + .contains_key("key_a")); cluster_state .node_state(&node1) .unwrap() @@ -1913,4 +1889,58 @@ mod tests { )); assert_eq!(versioned_value.value, "val_b"); } + + #[tokio::test] + async fn test_listener_batch() { + let mut cluster_state = ClusterState::default(); + let (key_change_tx, mut key_change_rx) = tokio::sync::mpsc::unbounded_channel(); + let listen_handle = cluster_state.listeners.subscribe_events( + "prefix", + move |key_changes: &[KeyChangeEventRef]| { + for key_change in key_changes { + key_change_tx + .send(format!( + "{}={}:{:?}", + &key_change.key, &key_change.value, &key_change.node + )) + .unwrap(); + } + }, + ); + let node1 = ChitchatId::for_local_test(10_001); + let node2 = ChitchatId::for_local_test(10_002); + { + let mut node_state1 = cluster_state.node_state_mut(&node1); + node_state1.set("prefi", "val"); + node_state1.set("prefix", "val"); + node_state1.set("prefix_a", "val"); + } + { + let mut node_state2 = cluster_state.node_state_mut(&node2); + node_state2.set("prefix_b", "val"); + node_state2.set("nonprefix", "val"); + node_state2.set("prefix_a", "val2"); + } + { + drop(listen_handle); + let mut node_state1 = cluster_state.node_state_mut(&node1); + node_state1.set("prefix_ignored", "val"); + } + + let mut key_change_events = Vec::new(); + for _ in 0..4 { + let key_change_event = key_change_rx.recv().await.unwrap(); + key_change_events.push(key_change_event); + } + assert!(key_change_rx.try_recv().is_err()); + assert_eq!( + &key_change_events[..], + &[ + "=val:node-10001:0:127.0.0.1:10001", + "_a=val:node-10001:0:127.0.0.1:10001", + "_b=val:node-10002:0:127.0.0.1:10002", + "_a=val2:node-10002:0:127.0.0.1:10002", + ][..] + ); + } }