diff --git a/chitchat-test/src/main.rs b/chitchat-test/src/main.rs index 458bbbf..1503fe3 100644 --- a/chitchat-test/src/main.rs +++ b/chitchat-test/src/main.rs @@ -37,8 +37,7 @@ impl Api { #[oai(path = "/set_kv/", method = "get")] async fn set_kv(&self, key: Query, value: Query) -> Json { let mut chitchat_guard = self.chitchat.lock().await; - - let cc_state = chitchat_guard.self_node_state(); + let mut cc_state = chitchat_guard.self_node_state(); cc_state.set(key.as_str(), value.as_str()); Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap()) @@ -48,8 +47,7 @@ impl Api { #[oai(path = "/mark_for_deletion/", method = "get")] async fn mark_for_deletion(&self, key: Query) -> Json { let mut chitchat_guard = self.chitchat.lock().await; - - let cc_state = chitchat_guard.self_node_state(); + let mut cc_state = chitchat_guard.self_node_state(); cc_state.delete(key.as_str()); Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap()) } diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index fa20272..8c4ee0c 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -240,7 +240,7 @@ impl Delta { pub(crate) fn num_tuples(&self) -> usize { self.node_deltas .iter() - .map(|node_delta| node_delta.num_tuples()) + .map(|node_delta| node_delta.key_values.len()) .sum() } @@ -323,10 +323,12 @@ pub(crate) struct NodeDelta { pub max_version: Option, } -#[cfg(test)] -impl NodeDelta { - pub fn num_tuples(&self) -> usize { - self.key_values.len() +impl IntoIterator for NodeDelta { + type Item = KeyValueMutation; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.key_values.into_iter() } } diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 95dfb84..d7fdd3b 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -22,6 +22,7 @@ use failure_detector::FailureDetector; pub use failure_detector::FailureDetectorConfig; pub use listener::ListenerHandle; pub use serialize::Serializable; +pub use state::NodeStateMut; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tracing::{error, info, warn}; @@ -31,7 +32,7 @@ pub use self::state::{ClusterStateSnapshot, NodeState}; use crate::digest::Digest; pub use crate::message::ChitchatMessage; pub use crate::server::{spawn_chitchat, ChitchatHandle}; -use crate::state::ClusterState; +pub use crate::state::ClusterState; pub use crate::types::{ChitchatId, DeletionStatus, Heartbeat, Version, VersionedValue}; /// Maximum UDP datagram payload size (in bytes). @@ -59,7 +60,6 @@ impl Chitchat { pub fn with_chitchat_id_and_seeds( config: ChitchatConfig, seed_addrs: watch::Receiver>, - initial_key_values: Vec<(String, String)>, ) -> Self { let failure_detector = FailureDetector::new(config.failure_detector_config.clone()); let previous_live_nodes = HashMap::new(); @@ -73,15 +73,10 @@ impl Chitchat { live_nodes_watcher_rx, }; - let self_node_state = chitchat.self_node_state(); + let mut self_node_state = chitchat.self_node_state(); // Immediately mark the node as alive to ensure it responds to SYN messages. self_node_state.inc_heartbeat(); - - // Set initial key/value pairs. - for (key, value) in initial_key_values { - self_node_state.set(key, value); - } chitchat } @@ -182,7 +177,7 @@ impl Chitchat { if chitchat_id == self.self_chitchat_id() { return; } - let node_state = self.cluster_state.node_state_mut(chitchat_id); + let mut node_state = self.cluster_state.node_state_mut(chitchat_id); if node_state.try_set_heartbeat(heartbeat) { self.failure_detector.report_heartbeat(chitchat_id); } @@ -242,7 +237,7 @@ impl Chitchat { self.cluster_state.node_state(chitchat_id) } - pub fn self_node_state(&mut self) -> &mut NodeState { + pub fn self_node_state(&mut self) -> NodeStateMut { self.cluster_state.node_state_mut(&self.config.chitchat_id) } @@ -304,17 +299,39 @@ impl Chitchat { /// /// Existing key-values that are not present in `key_values` will be deleted /// (not marked with a tombstone). + /// + /// This method returns an error if the key values version are inconsistent with the max + /// version. pub fn reset_node_state( &mut self, chitchat_id: &ChitchatId, - key_values: impl Iterator, + key_values: Vec<(String, VersionedValue)>, max_version: Version, last_gc_version: Version, - ) { - let node_state = self.cluster_state.node_state_mut(chitchat_id); + ) -> anyhow::Result<()> { + // We validate the version is compatible with what is in key_values. + if !key_values.is_empty() { + for (key, value) in &key_values { + if value.version > max_version { + anyhow::bail!( + "resetting node with kv exceeding the declared max version \ + (max_version={max_version}, key={}, kv_version={})", + key, + value.version + ); + } + } + } + + let mut node_state: NodeStateMut = self.cluster_state.node_state_mut(chitchat_id); if node_state.max_version() >= max_version { - return; + info!( + max_version = max_version, + node_max_version = node_state.max_version(), + "skipping resetting node: the state received is obsolete" + ); + return Ok(()); } // We make sure that the node is listed in the failure detector, @@ -325,20 +342,8 @@ impl Chitchat { self.failure_detector .get_or_create_sampling_window(chitchat_id); - // We don't want to call listeners for keys that are already up to date so we must do this - // dance instead of clearing the node state and then setting the new values. - let mut previous_keys: HashSet = node_state - .key_values_including_deleted() - .map(|(key, _)| key.to_string()) - .collect(); - for (key, value) in key_values { - previous_keys.remove(&key); - node_state.set_versioned_value(key, value) - } - for key in previous_keys { - node_state.remove_key_value_internal(&key); - } - node_state.set_last_gc_version(last_gc_version); + node_state.reset_node_state(key_values, max_version, last_gc_version); + Ok(()) } pub(crate) fn update_self_heartbeat(&mut self) { @@ -373,16 +378,51 @@ impl Chitchat { pub fn subscribe_event( &self, key_prefix: impl ToString, - callback: impl Fn(KeyChangeEvent) + 'static + Send + Sync, + callback: impl Fn(KeyChangeEventRef) + 'static + Send + Sync, + ) -> ListenerHandle { + self.subscribe_event_batch(key_prefix, move |events| { + for event in events { + callback(*event) + } + }) + } + + /// Same a `subscribe_event` but receives events in batch upon + /// a `reset_node` or the reception of a chitchat update. + #[must_use] + pub fn subscribe_event_batch( + &self, + key_prefix: impl ToString, + callback: impl Fn(&[KeyChangeEventRef]) + 'static + Send + Sync, ) -> ListenerHandle { self.cluster_state() .listeners - .subscribe_event(key_prefix, callback) + .subscribe(key_prefix, callback) + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct KeyChangeEvent { + /// The matching key without the prefix used to subscribe to the event. + pub key: String, + /// The new value. + pub value: String, + /// The node for which the event was triggered. + pub node: ChitchatId, +} + +impl<'a> From<&'a KeyChangeEvent> for KeyChangeEventRef<'a> { + fn from(evt: &'a KeyChangeEvent) -> Self { + KeyChangeEventRef { + key: evt.key.as_str(), + value: evt.value.as_str(), + node: &evt.node, + } } } #[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub struct KeyChangeEvent<'a> { +pub struct KeyChangeEventRef<'a> { /// The matching key without the prefix used to subscribe to the event. pub key: &'a str, /// The new value. @@ -391,10 +431,10 @@ pub struct KeyChangeEvent<'a> { pub node: &'a ChitchatId, } -impl<'a> KeyChangeEvent<'a> { - fn strip_key_prefix(&self, prefix: &str) -> Option { +impl<'a> KeyChangeEventRef<'a> { + fn strip_key_prefix(&self, prefix: &str) -> Option> { let key_without_prefix = self.key.strip_prefix(prefix)?; - Some(KeyChangeEvent { + Some(KeyChangeEventRef { key: key_without_prefix, value: self.value, node: self.node, @@ -403,7 +443,7 @@ impl<'a> KeyChangeEvent<'a> { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use std::ops::{Add, RangeInclusive}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -414,11 +454,19 @@ mod tests { use tokio_stream::StreamExt; use super::*; - use crate::server::{spawn_chitchat, ChitchatHandle}; use crate::transport::{ChannelTransport, Transport}; const DEAD_NODE_GRACE_PERIOD: Duration = Duration::from_secs(20); + pub(crate) fn event_batch_to_str(events: &[KeyChangeEventRef]) -> String { + use std::fmt::Write; + let mut events_str: String = String::new(); + for evt in events { + write!(&mut events_str, "{}={},", evt.key, evt.value).unwrap(); + } + events_str + } + fn run_chitchat_handshake(initiating_node: &mut Chitchat, peer_node: &mut Chitchat) { let syn_message = initiating_node.create_syn_message(); let syn_ack_message = peer_node.process_message(syn_message).unwrap(); @@ -537,30 +585,20 @@ mod tests { fn test_chitchat_handshake() { let node_config1 = ChitchatConfig::for_test(10_001); let empty_seeds = watch::channel(Default::default()).1; - let mut node1 = Chitchat::with_chitchat_id_and_seeds( - node_config1, - empty_seeds.clone(), - vec![ - ("key1a".to_string(), "1".to_string()), - ("key2a".to_string(), "2".to_string()), - ], - ); + let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); + node1.self_node_state().set("key1a", "1"); + node1.self_node_state().set("key2a", "2"); let node_config2 = ChitchatConfig::for_test(10_002); - let mut node2 = Chitchat::with_chitchat_id_and_seeds( - node_config2, - empty_seeds, - vec![ - ("key1b".to_string(), "1".to_string()), - ("key2b".to_string(), "2".to_string()), - ], - ); + let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds); + node2.self_node_state().set("key1b", "1"); + node2.self_node_state().set("key2b", "2"); run_chitchat_handshake(&mut node1, &mut node2); assert_nodes_sync(&[&node1, &node2]); // useless handshake run_chitchat_handshake(&mut node1, &mut node2); assert_nodes_sync(&[&node1, &node2]); { - let state1 = node1.self_node_state(); + let mut state1 = node1.self_node_state(); state1.set("key1a", "3"); state1.set("key1c", "4"); } @@ -568,14 +606,73 @@ mod tests { assert_nodes_sync(&[&node1, &node2]); } + #[tokio::test] + async fn test_chitchat_dead_node_liveness() { + let node_config1 = ChitchatConfig::for_test(10_001); + let node_id1 = node_config1.chitchat_id.clone(); + let empty_seeds = watch::channel(Default::default()).1; + let mut chitchat = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); + let (events_tx, mut events_rx) = tokio::sync::mpsc::unbounded_channel(); + chitchat + .subscribe_event_batch("key", move |events| { + events_tx.send(event_batch_to_str(events)).unwrap(); + }) + .forever(); + chitchat + .reset_node_state( + &node_id1, + vec![ + ( + "key".to_string(), + VersionedValue { + value: "value".to_string(), + version: 1, + status: DeletionStatus::Set, + }, + ), + ( + "key2".to_string(), + VersionedValue { + value: "value".to_string(), + version: 1, + status: DeletionStatus::Set, + }, + ), + ( + "key_deleted".to_string(), + VersionedValue { + value: "value".to_string(), + version: 1, + status: DeletionStatus::Deleted(tokio::time::Instant::now()), + }, + ), + ( + "nonmatching".to_string(), + VersionedValue { + value: "value".to_string(), + version: 1, + status: DeletionStatus::Set, + }, + ), + ], + 10_000, + 10u64, + ) + .unwrap(); + let event_batch_str = events_rx.try_recv().unwrap(); + assert!(events_rx.try_recv().is_err()); + assert_eq!(&event_batch_str, "=value,2=value,"); + } + #[test] - fn test_chitchat_dead_node_liveness() { + fn test_chitchat_reset_node_triggers_batch_events() { let node_config1 = ChitchatConfig::for_test(10_001); let empty_seeds = watch::channel(Default::default()).1; - let mut node1 = - Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone(), Vec::new()); + let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); let chitchat_id = ChitchatId::for_local_test(10u16); - node1.reset_node_state(&chitchat_id, std::iter::empty(), 10_000, 10u64); + node1 + .reset_node_state(&chitchat_id, Vec::new(), 10_000, 10u64) + .unwrap(); node1.report_heartbeat(&chitchat_id, Heartbeat(10_000u64)); node1.report_heartbeat(&chitchat_id, Heartbeat(10_000u64)); node1.update_nodes_liveness(); @@ -593,10 +690,9 @@ mod tests { tokio::time::pause(); let node_config1 = ChitchatConfig::for_test(10_001); let empty_seeds = watch::channel(Default::default()).1; - let mut node1 = - Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone(), vec![]); + let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); let node_config2 = ChitchatConfig::for_test(10_002); - let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds, vec![]); + let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds); // Because of compression, we need a lot of keys to reach the MTU. for i in 0..20_000 { let key = format!("k{}", i); @@ -1017,11 +1113,8 @@ mod tests { fn test_chitchat_listener() { let node_config1 = ChitchatConfig::for_test(10_001); let empty_seeds = watch::channel(Default::default()).1; - let mut node1 = Chitchat::with_chitchat_id_and_seeds( - node_config1, - empty_seeds.clone(), - vec![("self1:suffix1".to_string(), "hello1".to_string())], - ); + let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone()); + node1.self_node_state().set("self1:suffix1", "hello1"); let counter_self_key: Arc = Default::default(); let counter_other_key: Arc = Default::default(); @@ -1052,11 +1145,8 @@ mod tests { .forever(); let node_config2 = ChitchatConfig::for_test(10_002); - let mut node2 = Chitchat::with_chitchat_id_and_seeds( - node_config2, - empty_seeds, - vec![("other:suffix".to_string(), "hello".to_string())], - ); + let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds); + node2.self_node_state().set("other:suffix", "hello"); assert_eq!(counter_self_key.load(Ordering::SeqCst), 0); assert_eq!(counter_other_key.load(Ordering::SeqCst), 0); @@ -1105,7 +1195,7 @@ mod tests { })); let (_seed_addrs_rx, seed_addrs_tx) = watch::channel(Default::default()); - let mut node = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs_tx, Vec::new()); + let mut node = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs_tx); let delta = Delta::default(); node.process_delta(delta); @@ -1121,36 +1211,35 @@ mod tests { async fn test_reset_node_state() { let config = ChitchatConfig::for_test(10_001); let (_seed_addrs_rx, seed_addrs_tx) = watch::channel(Default::default()); - let mut node = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs_tx, Vec::new()); + let mut node = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs_tx); - let chitchat_id = ChitchatId::for_local_test(10_002); + let node_id1 = ChitchatId::for_local_test(10_002); node.reset_node_state( - &chitchat_id, - [( + &node_id1, + vec![( "foo".to_string(), VersionedValue::new("bar".to_string(), 1, false), - )] - .into_iter(), + )], 1, 1337, - ); - node.failure_detector.contains_node(&chitchat_id); + ) + .unwrap(); + node.failure_detector.contains_node(&node_id1); - let node_state = node.cluster_state.node_state(&chitchat_id).unwrap(); + let node_state = node.cluster_state.node_state(&node_id1).unwrap(); assert_eq!(node_state.num_key_values(), 1); assert_eq!(node_state.get("foo"), Some("bar")); assert_eq!(node_state.max_version(), 1); assert_eq!(node_state.last_gc_version(), 1337); - let chitchat_id = ChitchatId::for_local_test(10_003); - let node_state = node.cluster_state.node_state_mut(&chitchat_id); + let node_id2 = ChitchatId::for_local_test(10_003); + let mut node_state = node.cluster_state.node_state_mut(&node_id2); node_state.set("foo", "bar"); node_state.set("qux", "baz"); node_state.set("toto", "titi"); - node.reset_node_state( - &chitchat_id, - [ + &node_id2, + vec![ ( "qux".to_string(), VersionedValue::new("baz".to_string(), 2, false), @@ -1159,27 +1248,26 @@ mod tests { "toto".to_string(), VersionedValue::new("tutu".to_string(), 4, false), ), - ] - .into_iter(), + ], 4, 1337, - ); - let node_state = node.cluster_state.node_state(&chitchat_id).unwrap(); + ) + .unwrap(); + let node_state = node.cluster_state.node_state(&node_id2).unwrap(); assert_eq!(node_state.num_key_values(), 2); assert_eq!(node_state.get("qux"), Some("baz")); assert_eq!(node_state.get("toto"), Some("tutu")); assert_eq!(node_state.max_version(), 4); assert_eq!(node_state.last_gc_version(), 1337); - let chitchat_id = ChitchatId::for_local_test(10_004); - let node_state = node.cluster_state.node_state_mut(&chitchat_id); + let node_id3 = ChitchatId::for_local_test(10_004); + let mut node_state = node.cluster_state.node_state_mut(&node_id3); node_state.set("foo", "bar"); node_state.set("qux", "baz"); node_state.set("toto", "titi"); - node.reset_node_state( - &chitchat_id, - [ + &node_id3, + vec![ ( "foo".to_string(), VersionedValue::new("bar".to_string(), 1, false), @@ -1188,12 +1276,12 @@ mod tests { "qux".to_string(), VersionedValue::new("baz".to_string(), 2, false), ), - ] - .into_iter(), + ], 2, 1337, - ); - let node_state = node.cluster_state.node_state(&chitchat_id).unwrap(); + ) + .unwrap(); + let node_state = node.cluster_state.node_state(&node_id3).unwrap(); assert_eq!(node_state.num_key_values(), 3); assert_eq!(node_state.get("foo"), Some("bar")); assert_eq!(node_state.get("qux"), Some("baz")); diff --git a/chitchat/src/listener.rs b/chitchat/src/listener.rs index 7eebaaa..1d8a740 100644 --- a/chitchat/src/listener.rs +++ b/chitchat/src/listener.rs @@ -1,12 +1,13 @@ use std::collections::{BTreeMap, HashMap}; use std::ops::Bound; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock, Weak}; -use crate::KeyChangeEvent; +use tracing::error; + +use crate::{KeyChangeEvent, KeyChangeEventRef}; pub struct ListenerHandle { - prefix: String, + // The prefix and listener_id are used for removal of the listener. listener_id: usize, listeners: Weak>, } @@ -25,31 +26,31 @@ impl Drop for ListenerHandle { fn drop(&mut self) { if let Some(listeners) = self.listeners.upgrade() { let mut listeners_guard = listeners.write().unwrap(); - listeners_guard.remove_listener(&self.prefix, self.listener_id); + listeners_guard.remove_listener(self.listener_id); } } } -type BoxedListener = Box; +type BoxedListener = Box; -#[derive(Default, Clone)] +#[derive(Default)] pub(crate) struct Listeners { inner: Arc>, } impl Listeners { #[must_use] - pub(crate) fn subscribe_event( + pub(crate) fn subscribe( &self, key_prefix: impl ToString, - callback: impl Fn(KeyChangeEvent) + 'static + Send + Sync, + callback: impl Fn(&[KeyChangeEventRef]) + 'static + Send + Sync, ) -> ListenerHandle { let key_prefix = key_prefix.to_string(); let boxed_listener = Box::new(callback); - self.subscribe_event_for_ligher_monomorphization(key_prefix, boxed_listener) + self.subscribe_events_for_ligher_monomorphization(key_prefix, boxed_listener) } - fn subscribe_event_for_ligher_monomorphization( + fn subscribe_events_for_ligher_monomorphization( &self, key_prefix: String, boxed_listener: BoxedListener, @@ -57,49 +58,86 @@ impl Listeners { let key_prefix = key_prefix.to_string(); let weak_listeners = Arc::downgrade(&self.inner); let mut inner_listener_guard = self.inner.write().unwrap(); - let new_idx = inner_listener_guard - .listener_idx - .fetch_add(1, Ordering::Relaxed); - inner_listener_guard.subscribe_event(&key_prefix, new_idx, boxed_listener); + let new_idx = inner_listener_guard.listener_idx; + inner_listener_guard.listener_idx += 1; + let callback_entry = CallbackEntry { + prefix: key_prefix.clone(), + callback: boxed_listener, + }; + inner_listener_guard + .callbacks + .insert(new_idx, callback_entry); + inner_listener_guard.subscribe_events(&key_prefix, new_idx); ListenerHandle { - prefix: key_prefix, listener_id: new_idx, listeners: weak_listeners, } } - pub(crate) fn trigger_event(&mut self, key_change_event: KeyChangeEvent) { - self.inner.read().unwrap().trigger_event(key_change_event); + #[cfg(test)] + pub(crate) fn trigger_single_event(&self, key_change_event: KeyChangeEvent) { + self.inner + .read() + .unwrap() + .trigger_events(&[key_change_event]); + } + + pub(crate) fn trigger_events(&self, key_change_events: &[KeyChangeEvent]) { + self.inner.read().unwrap().trigger_events(key_change_events); } } +struct CallbackEntry { + prefix: String, + callback: BoxedListener, +} + +type CallbackId = usize; + #[derive(Default)] struct InnerListeners { // A trie would have been more efficient, but in reality we don't have // that many listeners. - listeners: BTreeMap>, - listener_idx: AtomicUsize, + listeners: BTreeMap>, + listener_idx: usize, + // Callbacks is a hashmap because as we delete listeners, we create "holes" in the + // callback_id -> callback mapping + callbacks: HashMap, } impl InnerListeners { // We don't inline this to make sure monomorphization generates as little code as possible. - fn subscribe_event(&mut self, key_prefix: &str, idx: usize, callback: BoxedListener) { - if let Some(callbacks) = self.listeners.get_mut(key_prefix) { - callbacks.insert(idx, callback); - } else { - let mut listener_map = HashMap::new(); - listener_map.insert(idx, callback); - self.listeners.insert(key_prefix.to_string(), listener_map); - } + fn subscribe_events(&mut self, key_prefix: &str, idx: CallbackId) { + self.listeners + .entry(key_prefix.to_string()) + .or_default() + .push(idx); } - fn trigger_event(&self, key_change_event: KeyChangeEvent) { + fn call(&self, callback_id: CallbackId, key_change_event: &[KeyChangeEventRef]) { + let Some(CallbackEntry { callback, .. }) = self.callbacks.get(&callback_id) else { + error!( + "callback {callback_id} not found upon call. this should not happen, please report" + ); + return; + }; + (*callback)(key_change_event); + } + + fn collect_events_to_trigger<'a>( + &self, + key_change_event: KeyChangeEventRef<'a>, + callback_to_events: &mut HashMap>>, + ) { // We treat the empty prefix a tiny bit separately to get able to at least // use the first character as a range bound, as if we were going to the first level of // a trie. - if let Some(listeners) = self.listeners.get("") { - for listener in listeners.values() { - (*listener)(key_change_event); + if let Some(callback_ids) = self.listeners.get("") { + for &callback_id in callback_ids { + callback_to_events + .entry(callback_id) + .or_default() + .push(key_change_event); } } if key_change_event.key.is_empty() { @@ -115,22 +153,55 @@ impl InnerListeners { break; } if let Some(stripped_key_change_event) = key_change_event.strip_key_prefix(prefix_key) { - for listener in listeners.values() { - (*listener)(stripped_key_change_event); + for &callback_id in listeners { + callback_to_events + .entry(callback_id) + .or_default() + .push(stripped_key_change_event); } } } } - fn remove_listener(&mut self, key_prefix: &str, idx: usize) { - if let Some(callbacks) = self.listeners.get_mut(key_prefix) { - callbacks.remove(&idx); + fn trigger_events(&self, key_change_events: &[KeyChangeEvent]) { + let mut callback_to_events: HashMap> = + HashMap::default(); + // We aggregate events to trigger per callback, so that we can call each callback + // with a batch of events. + for key_change_event in key_change_events { + self.collect_events_to_trigger(key_change_event.into(), &mut callback_to_events); } + for (callback_id, key_change_events) in callback_to_events { + self.call(callback_id, &key_change_events[..]); + } + } + + fn remove_listener(&mut self, callback_id: CallbackId) { + let Some(CallbackEntry { prefix, .. }) = self.callbacks.remove(&callback_id) else { + error!( + "callback {callback_id} not found upon remove. this should not happen, please \ + report" + ); + return; + }; + let Some(callbacks) = self.listeners.get_mut(&prefix) else { + error!( + "callback prefix not foudn upon remove. this should never happen, please report" + ); + return; + }; + let position = callbacks + .iter() + .position(|x| *x == callback_id) + .expect("callback not found"); + callbacks.swap_remove(position); } } #[cfg(test)] mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + use super::*; use crate::ChitchatId; @@ -140,39 +211,43 @@ mod tests { #[test] fn test_listeners_simple() { - let mut listeners = Listeners::default(); + let listeners = Listeners::default(); let counter: Arc = Default::default(); let counter_clone = counter.clone(); - let handle = listeners.subscribe_event("prefix:", move |key_change_event| { + let handle = listeners.subscribe("prefix:", move |events| { + assert_eq!(events.len(), 1); + let key_change_event = events[0]; assert_eq!(key_change_event.key, "strippedprefix"); assert_eq!(key_change_event.value, "value"); counter_clone.fetch_add(1, Ordering::Relaxed); }); let node_id = chitchat_id(7280u16); assert_eq!(counter.load(Ordering::Relaxed), 0); - listeners.trigger_event(KeyChangeEvent { - key: "prefix:strippedprefix", - value: "value", - node: &node_id, + listeners.trigger_single_event(KeyChangeEvent { + key: "prefix:strippedprefix".to_string(), + value: "value".to_string(), + node: node_id.clone(), }); assert_eq!(counter.load(Ordering::Relaxed), 1); std::mem::drop(handle); let node_id = chitchat_id(7280u16); - listeners.trigger_event(KeyChangeEvent { - key: "prefix:strippedprefix", - value: "value", - node: &node_id, + listeners.trigger_single_event(KeyChangeEvent { + key: "prefix:strippedprefix".to_string(), + value: "value".to_string(), + node: node_id.clone(), }); assert_eq!(counter.load(Ordering::Relaxed), 1); } #[test] fn test_listeners_empty_prefix() { - let mut listeners = Listeners::default(); + let listeners = Listeners::default(); let counter: Arc = Default::default(); let counter_clone = counter.clone(); listeners - .subscribe_event("", move |key_change_event| { + .subscribe("", move |events| { + assert_eq!(events.len(), 1); + let key_change_event = &events[0]; assert_eq!(key_change_event.key, "prefix:strippedprefix"); assert_eq!(key_change_event.value, "value"); counter_clone.fetch_add(1, Ordering::Relaxed); @@ -181,50 +256,54 @@ mod tests { assert_eq!(counter.load(Ordering::Relaxed), 0); let node_id = chitchat_id(7280u16); let key_change_event = KeyChangeEvent { - key: "prefix:strippedprefix", - value: "value", - node: &node_id, + key: "prefix:strippedprefix".to_string(), + value: "value".to_string(), + node: node_id.clone(), }; - listeners.trigger_event(key_change_event); + listeners.trigger_single_event(key_change_event); assert_eq!(counter.load(Ordering::Relaxed), 1); } + #[test] fn test_listeners_forever() { - let mut listeners = Listeners::default(); + let listeners = Listeners::default(); let counter: Arc = Default::default(); let counter_clone = counter.clone(); - let handle = listeners.subscribe_event("prefix:", move |evt| { + let handle = listeners.subscribe("prefix:", move |evts| { + assert_eq!(evts.len(), 1); + let evt = evts[0]; assert_eq!(evt.key, "strippedprefix"); assert_eq!(evt.value, "value"); counter_clone.fetch_add(1, Ordering::Relaxed); }); assert_eq!(counter.load(Ordering::Relaxed), 0); let node_id = chitchat_id(7280u16); - listeners.trigger_event(KeyChangeEvent { - key: "prefix:strippedprefix", - value: "value", - node: &node_id, + listeners.trigger_single_event(KeyChangeEvent { + key: "prefix:strippedprefix".to_string(), + value: "value".to_string(), + node: node_id.clone(), }); assert_eq!(counter.load(Ordering::Relaxed), 1); handle.forever(); - listeners.trigger_event(KeyChangeEvent { - key: "prefix:strippedprefix", - value: "value", - node: &node_id, + listeners.trigger_single_event(KeyChangeEvent { + key: "prefix:strippedprefix".to_string(), + value: "value".to_string(), + node: node_id.clone(), }); assert_eq!(counter.load(Ordering::Relaxed), 2); } #[test] fn test_listeners_prefixes() { - let mut listeners = Listeners::default(); + let listeners = Listeners::default(); let subscribe_event = |prefix: &str| { let counter: Arc = Default::default(); let counter_clone = counter.clone(); listeners - .subscribe_event(prefix, move |_evt| { - counter_clone.fetch_add(1, Ordering::Relaxed); + .subscribe(prefix, move |events| { + assert_eq!(events.len(), 1); + counter_clone.fetch_add(events.len(), Ordering::Relaxed); }) .forever(); counter @@ -237,20 +316,20 @@ mod tests { let counter_bc = subscribe_event("bc"); let node_id = chitchat_id(7280u16); - listeners.trigger_event(KeyChangeEvent { - key: "hello", - value: "value", - node: &node_id, + listeners.trigger_single_event(KeyChangeEvent { + key: "hello".to_string(), + value: "value".to_string(), + node: node_id.clone(), }); assert_eq!(counter_empty.load(Ordering::Relaxed), 1); assert_eq!(counter_b.load(Ordering::Relaxed), 0); assert_eq!(counter_bb.load(Ordering::Relaxed), 0); assert_eq!(counter_bb2.load(Ordering::Relaxed), 0); assert_eq!(counter_bc.load(Ordering::Relaxed), 0); - listeners.trigger_event(KeyChangeEvent { - key: "", - value: "value", - node: &node_id, + listeners.trigger_single_event(KeyChangeEvent { + key: "".to_string(), + value: "value".to_string(), + node: node_id.clone(), }); assert_eq!(counter_empty.load(Ordering::Relaxed), 2); assert_eq!(counter_b.load(Ordering::Relaxed), 0); @@ -258,10 +337,10 @@ mod tests { assert_eq!(counter_bb2.load(Ordering::Relaxed), 0); assert_eq!(counter_bc.load(Ordering::Relaxed), 0); - listeners.trigger_event(KeyChangeEvent { - key: "a", - value: "value", - node: &node_id, + listeners.trigger_single_event(KeyChangeEvent { + key: "a".to_string(), + value: "value".to_string(), + node: node_id.clone(), }); assert_eq!(counter_empty.load(Ordering::Relaxed), 3); assert_eq!(counter_b.load(Ordering::Relaxed), 0); @@ -269,10 +348,10 @@ mod tests { assert_eq!(counter_bb2.load(Ordering::Relaxed), 0); assert_eq!(counter_bc.load(Ordering::Relaxed), 0); - listeners.trigger_event(KeyChangeEvent { - key: "b", - value: "value", - node: &node_id, + listeners.trigger_single_event(KeyChangeEvent { + key: "b".to_string(), + value: "value".to_string(), + node: node_id.clone(), }); assert_eq!(counter_empty.load(Ordering::Relaxed), 4); @@ -281,10 +360,10 @@ mod tests { assert_eq!(counter_bb2.load(Ordering::Relaxed), 0); assert_eq!(counter_bc.load(Ordering::Relaxed), 0); - listeners.trigger_event(KeyChangeEvent { - key: "ba", - value: "value", - node: &node_id, + listeners.trigger_single_event(KeyChangeEvent { + key: "ba".to_string(), + value: "value".to_string(), + node: node_id.clone(), }); assert_eq!(counter_empty.load(Ordering::Relaxed), 5); assert_eq!(counter_b.load(Ordering::Relaxed), 2); @@ -292,10 +371,10 @@ mod tests { assert_eq!(counter_bb2.load(Ordering::Relaxed), 0); assert_eq!(counter_bc.load(Ordering::Relaxed), 0); - listeners.trigger_event(KeyChangeEvent { - key: "bb", - value: "value", - node: &node_id, + listeners.trigger_single_event(KeyChangeEvent { + key: "bb".to_string(), + value: "value".to_string(), + node: node_id.clone(), }); assert_eq!(counter_empty.load(Ordering::Relaxed), 6); assert_eq!(counter_b.load(Ordering::Relaxed), 3); diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index 81bc446..be6fceb 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -139,7 +139,12 @@ pub async fn spawn_chitchat( let socket = transport.open(config.listen_addr).await?; let chitchat_id = config.chitchat_id.clone(); - let chitchat = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs, initial_key_values); + let mut chitchat = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs); + let mut self_node = chitchat.self_node_state(); + for (key, value) in initial_key_values { + self_node.set(key, value); + } + let chitchat_arc = Arc::new(Mutex::new(chitchat)); let chitchat_arc_clone = chitchat_arc.clone(); @@ -412,13 +417,11 @@ where mod tests { use std::collections::BTreeMap; use std::future::Future; - use std::time::Duration; use tokio_stream::{Stream, StreamExt}; use super::*; - use crate::message::ChitchatMessage; - use crate::transport::{ChannelTransport, Transport}; + use crate::transport::ChannelTransport; use crate::{Heartbeat, NodeState, MAX_UDP_DATAGRAM_PAYLOAD_SIZE}; #[derive(Debug, Default)] @@ -496,7 +499,7 @@ mod tests { let config1 = ChitchatConfig::for_test(1); let addr1 = config1.chitchat_id.gossip_advertise_addr; - let chitchat = Chitchat::with_chitchat_id_and_seeds(config2, empty_seeds(), Vec::new()); + let chitchat = Chitchat::with_chitchat_id_and_seeds(config2, empty_seeds()); let _handler = spawn_chitchat(config1, Vec::new(), &transport) .await .unwrap(); @@ -521,8 +524,7 @@ mod tests { .open(outsider_config.chitchat_id.gossip_advertise_addr) .await .unwrap(); - let outsider = - Chitchat::with_chitchat_id_and_seeds(outsider_config, empty_seeds(), Vec::new()); + let outsider = Chitchat::with_chitchat_id_and_seeds(outsider_config, empty_seeds()); let server_config = ChitchatConfig::for_test(2223); let server_addr = server_config.chitchat_id.gossip_advertise_addr; @@ -568,8 +570,7 @@ mod tests { let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE); let test_config = ChitchatConfig::for_test(1); let test_addr = test_config.chitchat_id.gossip_advertise_addr; - let mut test_chitchat = - Chitchat::with_chitchat_id_and_seeds(test_config, empty_seeds(), Vec::new()); + let mut test_chitchat = Chitchat::with_chitchat_id_and_seeds(test_config, empty_seeds()); let mut test_transport = transport.open(test_addr).await.unwrap(); let server_config = ChitchatConfig::for_test(2); diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index b2a09c2..38331db 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -2,8 +2,8 @@ use std::cmp::Ordering; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, HashSet}; use std::fmt::{Debug, Formatter}; -use std::net::{Ipv4Addr, SocketAddr}; -use std::ops::Bound; +use std::net::SocketAddr; +use std::ops::{Bound, Deref, DerefMut}; use std::time::Duration; use itertools::Itertools; @@ -17,7 +17,7 @@ use tracing::{info, warn}; use crate::delta::{Delta, DeltaSerializer, NodeDelta}; use crate::digest::{Digest, NodeDigest}; use crate::listener::Listeners; -use crate::types::{DeletionStatus, DeletionStatusMutation}; +use crate::types::{DeletionStatus, DeletionStatusMutation, KeyValueMutation}; use crate::{ChitchatId, Heartbeat, KeyChangeEvent, Version, VersionedValue}; #[derive(Clone, Serialize, Deserialize)] @@ -25,9 +25,7 @@ pub struct NodeState { chitchat_id: ChitchatId, heartbeat: Heartbeat, key_values: BTreeMap, - #[serde(skip)] - listeners: Listeners, - max_version: Version, + pub(crate) max_version: Version, // This is the maximum version of the last tombstone GC. // // Due to the garbage collection of tombstones, we cannot @@ -65,14 +63,61 @@ impl Debug for NodeState { } } +fn is_key_value_applicable( + key_value_mutation: &KeyValueMutation, + max_version: u64, + last_gc_version: u64, +) -> bool { + if key_value_mutation.version <= max_version { + // We already know about this KV. + return false; + } + if key_value_mutation.status.scheduled_for_deletion() { + // This KV has already been GCed. + if key_value_mutation.version <= last_gc_version { + return false; + } + } + true +} + +#[cfg(feature = "testsuite")] impl NodeState { - fn new(chitchat_id: ChitchatId, listeners: Listeners) -> NodeState { + pub fn for_test() -> NodeState { + use std::net::Ipv4Addr; + + NodeState { + chitchat_id: ChitchatId { + node_id: "test-node".to_string(), + generation_id: 0, + gossip_advertise_addr: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 7280), + }, + heartbeat: Heartbeat(0), + key_values: Default::default(), + max_version: Default::default(), + last_gc_version: 0u64, + } + } + + pub fn set(&mut self, key: impl ToString, value: impl ToString) { + let version = self.get_new_version(); + let versioned_value = VersionedValue { + value: value.to_string(), + version, + status: DeletionStatus::Set, + }; + let _ = self.set_versioned_value_internal(key.to_string(), versioned_value); + } +} + +impl NodeState { + fn new(chitchat_id: ChitchatId) -> NodeState { NodeState { chitchat_id, heartbeat: Heartbeat(0), key_values: Default::default(), max_version: 0u64, - listeners, + // listeners, last_gc_version: 0u64, } } @@ -89,27 +134,13 @@ impl NodeState { self.last_gc_version = last_gc_version; } - pub fn for_test() -> NodeState { - NodeState { - chitchat_id: ChitchatId { - node_id: "test-node".to_string(), - generation_id: 0, - gossip_advertise_addr: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 7280), - }, - heartbeat: Heartbeat(0), - key_values: Default::default(), - max_version: Default::default(), - listeners: Listeners::default(), - last_gc_version: 0u64, - } - } - /// Returns the node's last heartbeat value. pub fn heartbeat(&self) -> Heartbeat { self.heartbeat } /// Returns the node's max version. + #[inline] pub fn max_version(&self) -> Version { self.max_version } @@ -129,10 +160,6 @@ impl NodeState { .map(|(key, versioned_value)| (key, versioned_value.value.as_str())) } - pub fn set_max_version(&mut self, max_version: Version) { - self.max_version = max_version; - } - // Prepare the node state to receive a delta. // Returns `true` if the delta can be applied. In that case, the node state may be mutated (if a // reset is required) Returns `false` if the delta cannot be applied. In that case, the node @@ -209,8 +236,12 @@ impl NodeState { last_gc_version=node_delta.last_gc_version, current_last_gc_version=self.last_gc_version, "resetting node"); - *self = NodeState::new(node_delta.chitchat_id.clone(), self.listeners.clone()); - // The node_delta max_version whe + *self = NodeState::new(node_delta.chitchat_id.clone()); + // It is possible for the node delta to not contain any KVs. + // (for instance they all have been GCed.) + // + // In that case, no KV are here to tell us what the max version is, so the + // node_delta itself holds a max_version. if let Some(max_version) = node_delta.max_version { if node_delta.key_values.is_empty() { self.max_version = max_version; @@ -226,28 +257,38 @@ impl NodeState { true } - fn apply_delta(&mut self, node_delta: NodeDelta, now: Instant) { + fn apply_delta( + &mut self, + node_delta: NodeDelta, + now: Instant, + key_change_events: &mut Vec, + ) { if !self.prepare_apply_delta(&node_delta) { return; } - let current_max_version = self.max_version(); - for key_value_mutation in node_delta.key_values { - if key_value_mutation.version <= current_max_version { - // We already know about this KV. + let current_max_version = self.max_version; + for key_value_mutation in node_delta { + if !is_key_value_applicable( + &key_value_mutation, + current_max_version, + self.last_gc_version, + ) { continue; } - if key_value_mutation.status.scheduled_for_deletion() { - // This KV has already been GCed. - if key_value_mutation.version <= self.last_gc_version { - continue; - } - } let new_versioned_value = VersionedValue { - value: key_value_mutation.value, + value: key_value_mutation.value.clone(), version: key_value_mutation.version, status: key_value_mutation.status.into_status(now), }; - self.set_versioned_value(key_value_mutation.key, new_versioned_value); + let was_an_update = self + .set_versioned_value_internal(key_value_mutation.key.clone(), new_versioned_value); + if was_an_update { + key_change_events.push(KeyChangeEvent { + key: key_value_mutation.key, + value: key_value_mutation.value, + node: self.chitchat_id().clone(), + }); + } } } @@ -287,50 +328,11 @@ impl NodeState { self.key_values.get(key) } - /// Sets a new value for a given key. - /// - /// Setting a new value automatically increments the - /// version of the entire NodeState unless the value stays - /// the same. - pub fn set(&mut self, key: impl ToString, value: impl ToString) { - let key = key.to_string(); - let value = value.to_string(); - if let Some(previous_versioned_value) = self.get_versioned(&key) { - if previous_versioned_value.value == value - && matches!(previous_versioned_value.status, DeletionStatus::Set) - { - // No need to change anything, the value is already set! - return; - } - } - let new_version = self.max_version + 1; - self.set_with_version(key, value, new_version); - } - - /// Sets a new value with a TTL. - pub fn set_with_ttl(&mut self, key: impl ToString, value: impl ToString) { - let key = key.to_string(); - let value = value.to_string(); - if let Some(previous_versioned_value) = self.get_versioned(&key) { - if previous_versioned_value.value == value - && matches!( - previous_versioned_value.status, - DeletionStatus::DeleteAfterTtl(_) - ) - { - // No need to change anything, the value is already set! - return; - } - } - let new_version = self.max_version + 1; - self.set_versioned_value( - key.to_string(), - VersionedValue { - value: value.to_string(), - version: new_version, - status: DeletionStatus::DeleteAfterTtl(Instant::now()), - }, - ); + /// Get a brand new version. This function does NOT update + /// the max_version. The `set` operation that should do this + /// will do that. + pub(crate) fn get_new_version(&self) -> u64 { + self.max_version + 1 } /// Deletes the entry associated to the given key. @@ -352,51 +354,6 @@ impl NodeState { versioned_value.status = DeletionStatusMutation::Delete.into_status(Instant::now()); } - /// Contrary to `delete`, this does not delete an entry right away, - /// but rather schedules its deletion for after the grace period. - /// - /// At the grace period, the entry will be really deleted just like a regular - /// tombstoned entry. - /// - /// Implementation wise, the only difference with `delete` is that it is - /// treated as if it was present during the grace period.`` - pub fn delete_after_ttl(&mut self, key: &str) { - let Some(versioned_value) = self.key_values.get_mut(key) else { - warn!( - "Key `{key}` does not exist in the node's state and could not scheduled for an \ - eventual deletion.", - ); - return; - }; - self.max_version += 1; - versioned_value.version = self.max_version; - versioned_value.status = DeletionStatusMutation::DeleteAfterTtl.into_status(Instant::now()); - } - - pub(crate) fn inc_heartbeat(&mut self) { - self.heartbeat.inc(); - } - - /// Attempts to set the heartbeat of another node. - /// If the value is actually not an update, just ignore the data and return false. - /// As a corner case, the first value is not considered an update. - /// - /// Otherwise, returns true. - pub fn try_set_heartbeat(&mut self, heartbeat_new_value: Heartbeat) -> bool { - if self.heartbeat.0 == 0 { - // This is the first heartbeat. - // Let's set it, but we do not consider it as an update. - self.heartbeat = heartbeat_new_value; - return false; - } - if heartbeat_new_value > self.heartbeat { - self.heartbeat = heartbeat_new_value; - true - } else { - false - } - } - fn digest(&self) -> NodeDigest { NodeDigest { heartbeat: self.heartbeat, @@ -454,25 +411,25 @@ impl NodeState { /// This operation is ignored if the key value inserted has a version that is obsolete. /// /// This method also update the max_version if necessary. - pub(crate) fn set_versioned_value( + /// + /// Returns true iff the value was actually updated, and is associated to a value that is NOT + /// deleted. + /// + /// This method is marked as internal as no listeners will be called. + /// You should probably be mutating NodeState through the `NodeStateMut` object. + #[must_use] + fn set_versioned_value_internal( &mut self, key: String, versioned_value_update: VersionedValue, - ) { - let key_clone = key.clone(); - let key_change_event = KeyChangeEvent { - key: key_clone.as_str(), - value: &versioned_value_update.value, - node: &self.chitchat_id, - }; + ) -> bool { self.max_version = versioned_value_update.version.max(self.max_version); - - match self.key_values.entry(key) { + match self.key_values.entry(key.clone()) { Entry::Occupied(mut occupied) => { let occupied_versioned_value = occupied.get_mut(); // The current version is more recent than the newer version. if occupied_versioned_value.version >= versioned_value_update.version { - return; + return false; } *occupied_versioned_value = versioned_value_update.clone(); } @@ -480,25 +437,11 @@ impl NodeState { vacant.insert(versioned_value_update.clone()); } }; - if !versioned_value_update.is_deleted() { - self.listeners.trigger_event(key_change_event); - } - } - - fn set_with_version(&mut self, key: impl ToString, value: impl ToString, version: Version) { - assert!(version > self.max_version); - self.set_versioned_value( - key.to_string(), - VersionedValue { - value: value.to_string(), - version, - status: DeletionStatus::Set, - }, - ); + !versioned_value_update.is_deleted() } } -pub(crate) struct ClusterState { +pub struct ClusterState { pub(crate) node_states: BTreeMap, seed_addrs: watch::Receiver>, pub(crate) listeners: Listeners, @@ -513,7 +456,7 @@ impl Debug for ClusterState { } } -#[cfg(test)] +#[cfg(any(test, feature = "testsuite"))] impl Default for ClusterState { fn default() -> Self { let (_seed_addrs_tx, seed_addrs_rx) = watch::channel(Default::default()); @@ -534,13 +477,19 @@ impl ClusterState { } } - pub(crate) fn node_state_mut(&mut self, chitchat_id: &ChitchatId) -> &mut NodeState { + pub fn node_state_mut(&mut self, chitchat_id: &ChitchatId) -> NodeStateMut { // TODO use the `hash_raw_entry` feature once it gets stabilized. // Most of the time the entry is already present. We avoid cloning chitchat_id with // this if statement. - self.node_states + let listeners = &self.listeners; + let self_node_state_mut = self + .node_states .entry(chitchat_id.clone()) - .or_insert_with(|| NodeState::new(chitchat_id.clone(), self.listeners.clone())) + .or_insert_with(|| NodeState::new(chitchat_id.clone())); + NodeStateMut { + node_state_mut: self_node_state_mut, + listeners, + } } pub fn node_state(&self, chitchat_id: &ChitchatId) -> Option<&NodeState> { @@ -562,10 +511,12 @@ impl ClusterState { pub(crate) fn apply_delta(&mut self, delta: Delta) { let now = Instant::now(); // Apply delta. + let mut key_change_events: Vec = Vec::new(); for node_delta in delta.node_deltas { - let node_state = self.node_state_mut(&node_delta.chitchat_id); - node_state.apply_delta(node_delta, now); + let mut node_state = self.node_state_mut(&node_delta.chitchat_id); + node_state.apply_delta(node_delta, now, &mut key_change_events); } + self.listeners.trigger_events(&key_change_events) } pub fn compute_digest(&self, scheduled_for_deletion: &HashSet<&ChitchatId>) -> Digest { @@ -822,6 +773,213 @@ impl From<&ClusterState> for ClusterStateSnapshot { } } +/// A thin wrapper around `NodeState` that provides a mutable view of the node state +/// listens for key-value updates and triggers updates when necessary. +pub struct NodeStateMut<'a> { + pub(crate) listeners: &'a Listeners, + pub(crate) node_state_mut: &'a mut NodeState, +} + +impl<'a> Deref for NodeStateMut<'a> { + type Target = NodeState; + + fn deref(&self) -> &NodeState { + self.node_state_mut + } +} + +impl<'a> DerefMut for NodeStateMut<'a> { + fn deref_mut(&mut self) -> &mut NodeState { + self.node_state_mut + } +} + +impl<'a> NodeStateMut<'a> { + pub(crate) fn inc_heartbeat(&mut self) { + self.node_state_mut.heartbeat.inc(); + } + + /// Attempts to set the heartbeat of a node different from self. + /// (`self` should update its own heartbeat using `inc_heartbeat`.) + /// If the value is actually not an update, just ignore the data and return false. + /// As a corner case, the first value is not considered an update. + /// + /// Otherwise, returns true. + pub fn try_set_heartbeat(&mut self, heartbeat_new_value: Heartbeat) -> bool { + if self.heartbeat.0 == 0 { + // This is the first heartbeat. + // Let's set it, but we do not consider it as an update. + self.node_state_mut.heartbeat = heartbeat_new_value; + return false; + } + if heartbeat_new_value > self.heartbeat { + self.node_state_mut.heartbeat = heartbeat_new_value; + true + } else { + false + } + } + + /// Panics if the reset version is not greater than the actual current version. + pub(crate) fn reset_node_state( + &mut self, + key_values: Vec<(String, VersionedValue)>, + max_version: Version, + last_gc_version: Version, + ) { + assert!(max_version > self.max_version()); + + // We don't want to call listeners for keys that are already up to date so we must do this + // dance instead of clearing the node state and then setting the new values. + let mut previous_keys: HashSet = self + .key_values_including_deleted() + .map(|(key, _)| key.to_string()) + .collect(); + + let mut key_change_events = Vec::new(); + for (key, value) in key_values { + assert!(value.version <= max_version); + previous_keys.remove(&key); + let is_a_value_update: bool = + self.set_versioned_value_internal(key.clone(), value.clone()); + if is_a_value_update { + // We need to keep track of the key change evenets and batch their execution + key_change_events.push(KeyChangeEvent { + key: key.clone(), + value: value.value, + node: self.chitchat_id.clone(), + }); + } + } + for key in previous_keys { + self.remove_key_value_internal(&key); + } + + self.set_last_gc_version(last_gc_version); + self.max_version = self.max_version.max(max_version); + self.listeners.trigger_events(&key_change_events[..]); + } + + /// Sets a new value for a given key. + /// + /// Setting a new value automatically increments the + /// version of the entire NodeState unless the value stays + /// the same. + pub fn set(&mut self, key: impl ToString, value: impl ToString) { + let key = key.to_string(); + let value = value.to_string(); + if let Some(previous_versioned_value) = self.node_state_mut.get_versioned(&key) { + if previous_versioned_value.value == value + && matches!(previous_versioned_value.status, DeletionStatus::Set) + { + // No need to change anything, the value is already set! + return; + } + } + let new_version = self.node_state_mut.max_version + 1; + self.set_with_version(key, value, new_version); + } + + /// Set a key value with a specific version. + /// + /// If the value is changed, all matching event listener will be trigger. + pub fn set_with_ttl(&mut self, key: impl ToString, value: impl ToString) { + let key = key.to_string(); + let value = value.to_string(); + if let Some(previous_versioned_value) = self.node_state_mut.get_versioned(&key) { + if previous_versioned_value.value == value + && matches!( + previous_versioned_value.status, + DeletionStatus::DeleteAfterTtl(_) + ) + { + // No need to change anything, the value is already set! + return; + } + } + let new_version = self.node_state_mut.max_version + 1; + self.set_versioned_value( + key.to_string(), + VersionedValue { + value: value.to_string(), + version: new_version, + status: DeletionStatus::DeleteAfterTtl(Instant::now()), + }, + ); + } + + /// Set a key value with a specific version. + /// + /// If the version is modified (= version is than the current version, and the value is + /// different than the existing value), then all matching event listener will be trigger. + pub fn set_with_version(&mut self, key: impl ToString, value: impl ToString, version: u64) { + let key = key.to_string(); + let value = value.to_string(); + self.set_versioned_value( + key, + VersionedValue { + value, + version, + status: DeletionStatus::Set, + }, + ); + } + + /// Inner helper function. Sets the given key with the given versioned value. + /// + /// If the versioned value is not a delete, has indeed a version higher than the current + /// version, all matching event listener will be trigger. + fn set_versioned_value(&mut self, key: String, versioned_value: VersionedValue) { + let key_change_evt = KeyChangeEvent { + key: key.clone(), + value: versioned_value.value.clone(), + node: self.node_state_mut.chitchat_id().clone(), + }; + let was_updated = self + .node_state_mut + .set_versioned_value_internal(key, versioned_value); + if was_updated { + self.listeners.trigger_events(&[key_change_evt]); + } + } + + /// Deletes the entry associated to the given key. + /// + /// From the reader's perspective, the entry is deleted right away. + /// + /// In reality, the entry is not removed from memory right away, but rather + /// marked with a tombstone. + /// That tombstone is annotated with the time of removal, so that after a configurable + /// grace period, it will be remove by the garbage collection. + /// + /// Delete do not trigger listeners. + pub fn delete(&mut self, key: &str) { + self.node_state_mut.delete(key); + } + + /// Contrary to `delete`, this does not delete an entry right away, + /// but rather schedules its deletion for after the grace period. + /// + /// At the grace period, the entry will be really deleted just like a regular + /// tombstoned entry. + /// + /// Implementation wise, the only difference with `delete` is that it is + /// treated as if it was present during the grace period.`` + pub fn delete_after_ttl(&mut self, key: &str) { + let delete_version = self.node_state_mut.get_new_version(); + let Some(versioned_value) = self.node_state_mut.key_values.get_mut(key) else { + warn!( + "Key `{key}` does not exist in the node's state and could not scheduled for an \ + eventual deletion.", + ); + return; + }; + self.node_state_mut.max_version = delete_version; + versioned_value.version = delete_version; + versioned_value.status = DeletionStatusMutation::DeleteAfterTtl.into_status(Instant::now()); + } +} + #[cfg(not(test))] fn random_generator() -> impl Rng { rand::thread_rng() @@ -839,14 +997,14 @@ fn random_generator() -> impl Rng { mod tests { use super::*; use crate::serialize::Serializable; - use crate::types::{DeletionStatusMutation, KeyValueMutation}; - use crate::MAX_UDP_DATAGRAM_PAYLOAD_SIZE; + use crate::{KeyChangeEventRef, MAX_UDP_DATAGRAM_PAYLOAD_SIZE}; #[test] fn test_stale_node_iter_stale_key_values() { + let mut cluster_state = ClusterState::default(); { let node = ChitchatId::for_local_test(10_001); - let node_state = NodeState::for_test(); + let node_state = cluster_state.node_state_mut(&node); let stale_node = StaleNode { chitchat_id: &node, node_state: &node_state, @@ -856,17 +1014,10 @@ mod tests { } { let node = ChitchatId::for_local_test(10_001); - let mut node_state = NodeState::for_test(); - node_state - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 3)); - node_state - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node_state - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 1)); - + let mut node_state = cluster_state.node_state_mut(&node); + node_state.set_with_version("key_c", "value_c", 1); + node_state.set_with_version("key_b", "value_b", 2); + node_state.set_with_version("key_a", "value_a", 3); let stale_node = StaleNode { chitchat_id: &node, node_state: &node_state, @@ -890,6 +1041,7 @@ mod tests { #[test] fn test_sorted_stale_nodes_insert() { + let mut cluster_state = ClusterState::default(); let mut stale_nodes = SortedStaleNodes::default(); let node1 = ChitchatId::for_local_test(10_001); @@ -898,15 +1050,31 @@ mod tests { // No stale KV. We still insert the node! // That way it will get a node state, and be a candidate for gossip later. - let mut node_state1 = NodeState::for_test(); - node_state1.set_max_version(2); + { + let mut node_state1 = cluster_state.node_state_mut(&node1); + node_state1.max_version = 2; + } + + { + let mut node_state2 = cluster_state.node_state_mut(&node2); + node_state2.set_with_version("key_a", "value_a", 1); + } + + { + let mut node3_state = cluster_state.node_state_mut(&node3); + node3_state.set_with_version("key_b", "value_b", 2); + node3_state.set_with_version("key_c", "value_c", 3); + } - stale_nodes.offer(&node1, &node_state1, 0u64); + let node_state1 = cluster_state.node_state(&node1).unwrap(); + let node_state2 = cluster_state.node_state(&node2).unwrap(); + let node_state3 = cluster_state.node_state(&node3).unwrap(); + + stale_nodes.offer(&node1, node_state1, 0u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); - let mut node2_state = NodeState::for_test(); - node2_state.set_with_version("key_a", "value_a", 1); - stale_nodes.offer(&node2, &node2_state, 0u64); + stale_nodes.offer(&node2, node_state2, 0u64); + let expected_staleness = Staleness { is_unknown: true, max_version: 1, @@ -914,11 +1082,7 @@ mod tests { }; assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1); - let mut node3_state = NodeState::for_test(); - node3_state.set_with_version("key_b", "value_b", 2); - node3_state.set_with_version("key_c", "value_c", 3); - - stale_nodes.offer(&node3, &node3_state, 0u64); + stale_nodes.offer(&node3, node_state3, 0u64); let expected_staleness = Staleness { is_unknown: true, max_version: 3, @@ -929,27 +1093,42 @@ mod tests { #[test] fn test_sorted_stale_nodes_offer() { + let mut cluster_state = ClusterState::default(); + let mut stale_nodes = SortedStaleNodes::default(); let node1 = ChitchatId::for_local_test(10_001); - let node1_state = NodeState::for_test(); - stale_nodes.offer(&node1, &node1_state, 1u64); + let node2 = ChitchatId::for_local_test(10_002); + let node3 = ChitchatId::for_local_test(10_003); + + { + cluster_state.node_state_mut(&node1); + } + { + let mut node2_state = cluster_state.node_state_mut(&node2); + node2_state.set_with_version("key_a", "value_a", 1); + } + + { + let mut node3_state = cluster_state.node_state_mut(&node3); + node3_state.set_with_version("key_a", "value_a", 1); + node3_state.set_with_version("key_b", "value_b", 2); + node3_state.set_with_version("key_c", "value_c", 3); + } + + let node1_state = cluster_state.node_state(&node1).unwrap(); + let node2_state = cluster_state.node_state(&node2).unwrap(); + let node3_state = cluster_state.node_state(&node3).unwrap(); + + stale_nodes.offer(&node1, node1_state, 1u64); // No stale records. This is not a candidate for gossip. assert!(stale_nodes.stale_nodes.is_empty()); - let node2 = ChitchatId::for_local_test(10_002); - let mut node2_state = NodeState::for_test(); - node2_state.set_with_version("key_a", "value_a", 1); - stale_nodes.offer(&node2, &node2_state, 1u64); + stale_nodes.offer(&node2, node2_state, 1u64); // No stale records (due to the floor version). This is not a candidate for gossip. assert!(stale_nodes.stale_nodes.is_empty()); - let node3 = ChitchatId::for_local_test(10_002); - let mut node3_state = NodeState::for_test(); - node3_state.set_with_version("key_a", "value_a", 1); - node3_state.set_with_version("key_b", "value_b", 2); - node3_state.set_with_version("key_c", "value_c", 3); - stale_nodes.offer(&node3, &node3_state, 1u64); + stale_nodes.offer(&node3, node3_state, 1u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); let expected_staleness = Staleness { is_unknown: false, @@ -961,49 +1140,72 @@ mod tests { #[test] fn test_sorted_stale_nodes_into_iter() { - let mut stale_nodes = SortedStaleNodes::default(); + let mut cluster_state = ClusterState::default(); + let node1 = ChitchatId::for_local_test(10_001); - let mut node_state1 = NodeState::for_test(); - node_state1.set_with_version("key_a", "value_a", 1); - node_state1.set_with_version("key_b", "value_b", 2); - node_state1.set_with_version("key_c", "value_c", 3); - stale_nodes.offer(&node1, &node_state1, 1u64); + let node2 = ChitchatId::for_local_test(10_002); + let node3 = ChitchatId::for_local_test(10_003); + let node4 = ChitchatId::for_local_test(10_004); + let node5 = ChitchatId::for_local_test(10_005); + let node6 = ChitchatId::for_local_test(10_006); + + { + let mut node_state1 = cluster_state.node_state_mut(&node1); + node_state1.set_with_version("key_a", "value_a", 1); + node_state1.set_with_version("key_b", "value_b", 2); + node_state1.set_with_version("key_c", "value_c", 3); + } + // 2 stale values. + { + let mut node_state2 = cluster_state.node_state_mut(&node2); + node_state2.set_with_version("key_a", "value", 1); + node_state2.set_with_version("key_b", "value_b", 2); + node_state2.set_with_version("key_c", "value_c", 5); + } - let node2 = ChitchatId::for_local_test(10_002); - let mut node_state2 = NodeState::for_test(); - node_state2.set_with_version("key_a", "value", 1); - node_state2.set_with_version("key_b", "value_b", 2); - node_state2.set_with_version("key_c", "value_c", 5); - stale_nodes.offer(&node2, &node_state2, 2u64); // 1 stale value. + { + let mut node_state3 = cluster_state.node_state_mut(&node3); + node_state3.set_with_version("key_a", "value_a", 1); + node_state3.set_with_version("key_b", "value_b", 2); + node_state3.set_with_version("key_c", "value_c", 3); + } - let node3 = ChitchatId::for_local_test(10_003); - let mut node_state3 = NodeState::for_test(); - node_state3.set_with_version("key_a", "value_a", 1); - node_state3.set_with_version("key_b", "value_b", 2); - node_state3.set_with_version("key_c", "value_c", 3); - stale_nodes.offer(&node3, &node_state3, 7u64); // 0 stale values. + { + let mut node_state4 = cluster_state.node_state_mut(&node4); + node_state4.set_with_version("key_a", "value_a", 1); + node_state4.set_with_version("key_b", "value_b", 2); + node_state4.set_with_version("key_c", "value_c", 5); + node_state4.set_with_version("key_d", "value_d", 7); + } - let node4 = ChitchatId::for_local_test(10_004); - let mut node_state4 = NodeState::for_test(); - node_state4.set_with_version("key_a", "value_a", 1); - node_state4.set_with_version("key_b", "value_b", 2); - node_state4.set_with_version("key_c", "value_c", 5); - node_state4.set_with_version("key_d", "value_d", 7); - stale_nodes.offer(&node4, &node_state4, 1); + // 0 stale values + { + cluster_state.node_state_mut(&node5); + } // 3 stale values - let node5 = ChitchatId::for_local_test(10_005); - let node_state5 = NodeState::for_test(); - stale_nodes.offer(&node5, &node_state5, 0); + { + let mut node_state6 = cluster_state.node_state_mut(&node6); + node_state6.set_with_version("key_a", "value_a", 1); + } - // 0 stale values - let node6 = ChitchatId::for_local_test(10_006); - let mut node_state6 = NodeState::for_test(); - node_state6.set_with_version("key_a", "value_a", 1); - stale_nodes.offer(&node6, &node_state6, 0u64); + let node_state1 = cluster_state.node_state(&node1).unwrap(); + let node_state2 = cluster_state.node_state(&node2).unwrap(); + let node_state3 = cluster_state.node_state(&node3).unwrap(); + let node_state4 = cluster_state.node_state(&node4).unwrap(); + let node_state5 = cluster_state.node_state(&node5).unwrap(); + let node_state6 = cluster_state.node_state(&node6).unwrap(); + + let mut stale_nodes = SortedStaleNodes::default(); + stale_nodes.offer(&node1, node_state1, 1u64); + stale_nodes.offer(&node2, node_state2, 2u64); + stale_nodes.offer(&node3, node_state3, 7u64); + stale_nodes.offer(&node4, node_state4, 1); + stale_nodes.offer(&node5, node_state5, 0); + stale_nodes.offer(&node6, node_state6, 0u64); // 1 stale values assert_eq!( @@ -1025,8 +1227,8 @@ mod tests { #[test] fn test_cluster_state_first_version_is_one() { let mut cluster_state = ClusterState::default(); - let node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); - node_state.set("key_a", ""); + let mut node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); + node_state.set_with_version("key_a", "", 1); assert_eq!( node_state.get_versioned("key_a").unwrap(), &VersionedValue { @@ -1040,8 +1242,8 @@ mod tests { #[test] fn test_cluster_state_set() { let mut cluster_state = ClusterState::default(); - let node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); - node_state.set("key_a", "1"); + let mut node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); + node_state.set_with_version("key_a", "1", 1); assert_eq!( node_state.get_versioned("key_a").unwrap(), &VersionedValue { @@ -1050,7 +1252,7 @@ mod tests { status: DeletionStatus::Set, } ); - node_state.set("key_b", "2"); + node_state.set_with_version("key_b", "2", 2); assert_eq!( node_state.get_versioned("key_a").unwrap(), &VersionedValue { @@ -1067,7 +1269,7 @@ mod tests { status: DeletionStatus::Set, } ); - node_state.set("key_a", "3"); + node_state.set_with_version("key_a", "3", 3); assert_eq!( node_state.get_versioned("key_a").unwrap(), &VersionedValue { @@ -1081,7 +1283,7 @@ mod tests { #[test] fn test_cluster_state_set_with_same_value_updates_version() { let mut cluster_state = ClusterState::default(); - let node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); + let mut node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); node_state.set("key", "1"); assert_eq!( node_state.get_versioned("key").unwrap(), @@ -1105,16 +1307,15 @@ mod tests { #[test] fn test_cluster_state_set_and_mark_for_deletion() { let mut cluster_state = ClusterState::default(); - let node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); + let mut node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); node_state.heartbeat = Heartbeat(10); - node_state.set("key", "1"); + node_state.set_with_version("key", "1", 1); node_state.delete("key"); assert!(node_state.get("key").is_none()); { let versioned_value = node_state.get_versioned("key").unwrap(); assert_eq!(&versioned_value.value, ""); assert_eq!(versioned_value.version, 2u64); - assert!(versioned_value.is_deleted()); assert!(versioned_value .status .time_of_start_scheduled_for_deletion() @@ -1122,7 +1323,7 @@ mod tests { } // Overriding the same key - node_state.set("key", "2"); + node_state.set_with_version("key", "2", 3u64); { let versioned_value = node_state.get_versioned("key").unwrap(); assert_eq!(&versioned_value.value, "2"); @@ -1138,16 +1339,19 @@ mod tests { #[test] fn test_cluster_state_delete_after_ttl() { let mut cluster_state = ClusterState::default(); - let node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); - node_state.heartbeat = Heartbeat(10); - node_state.set("key", "1"); + let mut node_state = cluster_state.node_state_mut(&ChitchatId::for_local_test(10_001)); + node_state.inc_heartbeat(); + node_state.inc_heartbeat(); + node_state.inc_heartbeat(); + assert_eq!(node_state.heartbeat(), Heartbeat(3)); + node_state.set_with_version("key", "1", 3); node_state.delete_after_ttl("key"); { let value = node_state.get("key").unwrap(); assert_eq!(value, "1"); let versioned_value = node_state.get_versioned("key").unwrap(); assert_eq!(&versioned_value.value, "1"); - assert_eq!(versioned_value.version, 2u64); + assert_eq!(versioned_value.version, 4u64); assert!(versioned_value .status .time_of_start_scheduled_for_deletion() @@ -1160,11 +1364,11 @@ mod tests { } // Overriding the same key - node_state.set("key", "2"); + node_state.set_with_version("key", "2", 5u64); { let versioned_value = node_state.get_versioned("key").unwrap(); assert_eq!(&versioned_value.value, "2"); - assert_eq!(versioned_value.version, 3u64); + assert_eq!(versioned_value.version, 5u64); assert!(!versioned_value.is_deleted()); assert!(versioned_value .status @@ -1178,21 +1382,24 @@ mod tests { fn test_cluster_state_compute_digest() { let mut cluster_state = ClusterState::default(); let node1 = ChitchatId::for_local_test(10_001); - let node1_state = cluster_state.node_state_mut(&node1); - node1_state.set("key_a", ""); - let node2 = ChitchatId::for_local_test(10_002); - let node2_state = cluster_state.node_state_mut(&node2); - node2_state.set_last_gc_version(10u64); - node2_state.set("key_a", ""); - node2_state.set("key_b", ""); - let digest = cluster_state.compute_digest(&HashSet::new()); + { + let mut node1_state = cluster_state.node_state_mut(&node1); + node1_state.set("key_a", ""); + } + + { + let mut node2_state = cluster_state.node_state_mut(&node2); + node2_state.set_last_gc_version(10u64); + node2_state.set("key_a", ""); + node2_state.set("key_b", ""); + } + let digest = cluster_state.compute_digest(&HashSet::new()); let mut expected_node_digests = Digest::default(); expected_node_digests.add_node(node1.clone(), Heartbeat(0), 0, 1); expected_node_digests.add_node(node2.clone(), Heartbeat(0), 10u64, 2); - assert_eq!(&digest, &expected_node_digests); } @@ -1201,12 +1408,14 @@ mod tests { tokio::time::pause(); let mut cluster_state = ClusterState::default(); let node1 = ChitchatId::for_local_test(10_001); - let node1_state = cluster_state.node_state_mut(&node1); - node1_state.set("key_a", "1"); - node1_state.delete("key_a"); // Version 2. Tombstone set to heartbeat 100. - tokio::time::advance(Duration::from_secs(5)).await; - node1_state.set_with_version("key_b".to_string(), "3".to_string(), 13); // 3 - node1_state.heartbeat = Heartbeat(110); + { + let mut node1_state = cluster_state.node_state_mut(&node1); + node1_state.set("key_a", "1"); + node1_state.delete("key_a"); // Version 2. Tombstone set to heartbeat 100. + tokio::time::advance(Duration::from_secs(5)).await; + node1_state.set_with_version("key_b".to_string(), "3".to_string(), 13); // 3 + node1_state.heartbeat = Heartbeat(110); + } // No GC as tombstone is less than 10 secs old. cluster_state.gc_keys_marked_for_deletion(Duration::from_secs(10)); @@ -1226,12 +1435,11 @@ mod tests { // GC if tombstone (=100) + grace_period > heartbeat (=110). tokio::time::advance(Duration::from_secs(5)).await; cluster_state.gc_keys_marked_for_deletion(Duration::from_secs(10)); - assert!(cluster_state + assert!(!cluster_state .node_state(&node1) .unwrap() .key_values - .get("key_a") - .is_none()); + .contains_key("key_a")); cluster_state .node_state(&node1) .unwrap() @@ -1245,13 +1453,17 @@ mod tests { let mut cluster_state = ClusterState::default(); let node1 = ChitchatId::for_local_test(10_001); - let node1_state = cluster_state.node_state_mut(&node1); - node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1); // 1 - node1_state.set_with_version("key_b".to_string(), "3".to_string(), 3); // 2 + { + let mut node1_state = cluster_state.node_state_mut(&node1); + node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1); // 1 + node1_state.set_with_version("key_b".to_string(), "3".to_string(), 3); // 2 + } let node2 = ChitchatId::for_local_test(10_002); - let node2_state = cluster_state.node_state_mut(&node2); - node2_state.set_with_version("key_c".to_string(), "3".to_string(), 1); // 1 + { + let mut node2_state = cluster_state.node_state_mut(&node2); + node2_state.set_with_version("key_c".to_string(), "3".to_string(), 1); // 1 + } let mut delta = Delta::default(); delta.add_node(node1.clone(), 0u64, 0u64); @@ -1339,18 +1551,22 @@ mod tests { fn test_cluster_state() -> ClusterState { let mut cluster_state = ClusterState::default(); - let node1 = ChitchatId::for_local_test(10_001); - let node1_state = cluster_state.node_state_mut(&node1); - node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1); // 1 - node1_state.set_with_version("key_b".to_string(), "2".to_string(), 2); // 2 + { + let node1 = ChitchatId::for_local_test(10_001); + let mut node1_state = cluster_state.node_state_mut(&node1); + node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1); // 1 + node1_state.set_with_version("key_b".to_string(), "2".to_string(), 2); // 2 + } - let node2 = ChitchatId::for_local_test(10_002); - let node2_state = cluster_state.node_state_mut(&node2); - node2_state.set_with_version("key_a".to_string(), "1".to_string(), 1); // 1 - node2_state.set_with_version("key_b".to_string(), "2".to_string(), 2); // 2 - node2_state.set_with_version("key_c".to_string(), "3".to_string(), 3); // 3 - node2_state.set_with_version("key_d".to_string(), "4".to_string(), 4); // 4 - node2_state.delete("key_d"); // 5 + { + let node2 = ChitchatId::for_local_test(10_002); + let mut node2_state = cluster_state.node_state_mut(&node2); + node2_state.set_with_version("key_a".to_string(), "1".to_string(), 1); // 1 + node2_state.set_with_version("key_b".to_string(), "2".to_string(), 2); // 2 + node2_state.set_with_version("key_c".to_string(), "3".to_string(), 3); // 3 + node2_state.set_with_version("key_d".to_string(), "4".to_string(), 4); // 4 + node2_state.delete("key_d"); // 5 + } cluster_state } @@ -1449,12 +1665,13 @@ mod tests { let node1 = ChitchatId::for_local_test(10_001); let node2 = ChitchatId::for_local_test(10_002); { - let node1_state = cluster_state.node_state_mut(&node1); + let mut node1_state = cluster_state.node_state_mut(&node1); node1_state.heartbeat = Heartbeat(10000); node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1); // 1 node1_state.set_with_version("key_b".to_string(), "2".to_string(), 2); // 2 - - let node2_state = cluster_state.node_state_mut(&node2); + } + { + let mut node2_state = cluster_state.node_state_mut(&node2); node2_state.set_with_version("key_c".to_string(), "3".to_string(), 2); // 2 } @@ -1526,7 +1743,9 @@ mod tests { #[test] fn test_iter_prefix() { - let mut node_state = NodeState::for_test(); + let mut cluster_state = ClusterState::default(); + let node1 = ChitchatId::for_local_test(10_001); + let mut node_state = cluster_state.node_state_mut(&node1); node_state.set("Europe", ""); node_state.set("Europe:", ""); node_state.set("Europe:UK", ""); @@ -1544,7 +1763,9 @@ mod tests { #[test] fn test_node_apply_delta_simple() { - let mut node_state = NodeState::for_test(); + let node1 = ChitchatId::for_local_test(10_001); + let mut cluster_state = ClusterState::default(); + let mut node_state = cluster_state.node_state_mut(&node1); node_state.set_with_version("key_a", "val_a", 1); node_state.set_with_version("key_b", "val_a", 2); let node_delta = NodeDelta { @@ -1567,7 +1788,8 @@ mod tests { }, ], }; - node_state.apply_delta(node_delta, Instant::now()); + let mut key_change_events = Vec::new(); + node_state.apply_delta(node_delta, Instant::now(), &mut key_change_events); assert_eq!(node_state.num_key_values(), 3); assert_eq!(node_state.max_version(), 4); assert_eq!(node_state.last_gc_version, 0); @@ -1582,7 +1804,9 @@ mod tests { // version. #[test] fn test_node_apply_same_value_different_version() { - let mut node_state = NodeState::for_test(); + let mut cluster_state = ClusterState::default(); + let chitchat_id = ChitchatId::for_local_test(10_001); + let mut node_state = cluster_state.node_state_mut(&chitchat_id); node_state.set_with_version("key_a", "val_a", 1); let node_delta = NodeDelta { chitchat_id: node_state.chitchat_id.clone(), @@ -1596,7 +1820,8 @@ mod tests { status: DeletionStatusMutation::Set, }], }; - node_state.apply_delta(node_delta, Instant::now()); + let mut events = Vec::new(); + node_state.apply_delta(node_delta, Instant::now(), &mut events); let versioned_a = node_state.get_versioned("key_a").unwrap(); assert_eq!(versioned_a.version, 3); assert_eq!(versioned_a.status, DeletionStatus::Set); @@ -1605,7 +1830,9 @@ mod tests { #[test] fn test_node_skip_delta_from_the_future() { - let mut node_state = NodeState::for_test(); + let mut cluster_state = ClusterState::default(); + let node1 = ChitchatId::for_local_test(10_001); + let mut node_state = cluster_state.node_state_mut(&node1); node_state.set_with_version("key_a", "val_a", 5); assert_eq!(node_state.max_version(), 5); let node_delta = NodeDelta { @@ -1620,7 +1847,8 @@ mod tests { status: DeletionStatusMutation::Set, }], }; - node_state.apply_delta(node_delta, Instant::now()); + let mut events = Vec::new(); + node_state.apply_delta(node_delta, Instant::now(), &mut events); let versioned_a = node_state.get_versioned("key_a").unwrap(); assert_eq!(versioned_a.version, 5); assert_eq!(versioned_a.status, DeletionStatus::Set); @@ -1631,7 +1859,9 @@ mod tests { async fn test_node_apply_delta_different_last_gc_is_ok_if_below_max_version() { tokio::time::pause(); const GC_PERIOD: Duration = Duration::from_secs(10); - let mut node_state = NodeState::for_test(); + let mut cluster_state = ClusterState::default(); + let node1 = ChitchatId::for_local_test(10_001); + let mut node_state = cluster_state.node_state_mut(&node1); node_state.set_with_version("key_a", "val_a", 17); node_state.delete("key_a"); tokio::time::advance(GC_PERIOD).await; @@ -1651,7 +1881,8 @@ mod tests { status: DeletionStatusMutation::Set, }], }; - node_state.apply_delta(node_delta, Instant::now()); + let mut events = Vec::new(); + node_state.apply_delta(node_delta, Instant::now(), &mut events); let versioned_a = node_state.get_versioned("key_a").unwrap(); assert_eq!(versioned_a.version, 32); assert_eq!(node_state.max_version(), 32); @@ -1662,7 +1893,9 @@ mod tests { #[tokio::test] async fn test_node_apply_delta_on_reset_fresher_version() { tokio::time::pause(); - let mut node_state = NodeState::for_test(); + let mut cluster_state = ClusterState::default(); + let node1 = ChitchatId::for_local_test(10_001); + let mut node_state = cluster_state.node_state_mut(&node1); node_state.set_with_version("key_a", "val_a", 17); assert_eq!(node_state.max_version(), 17); let node_delta = NodeDelta { @@ -1677,7 +1910,8 @@ mod tests { status: DeletionStatusMutation::Set, }], }; - node_state.apply_delta(node_delta, Instant::now()); + let mut events = Vec::new(); + node_state.apply_delta(node_delta, Instant::now(), &mut events); assert!(node_state.get_versioned("key_a").is_none()); let versioned_b = node_state.get_versioned("key_b").unwrap(); assert_eq!(versioned_b.version, 32); @@ -1686,7 +1920,9 @@ mod tests { #[tokio::test] async fn test_node_apply_delta_no_reset_if_older_version() { tokio::time::pause(); - let mut node_state = NodeState::for_test(); + let mut cluster_state = ClusterState::default(); + let node1 = ChitchatId::for_local_test(10_001); + let mut node_state = cluster_state.node_state_mut(&node1); node_state.set_with_version("key_a", "val_a", 31); node_state.set_with_version("key_b", "val_b2", 32); assert_eq!(node_state.max_version(), 32); @@ -1704,16 +1940,84 @@ mod tests { status: DeletionStatusMutation::Set, }], }; - node_state.apply_delta(node_delta, Instant::now()); + let mut events = Vec::new(); + node_state.apply_delta(node_delta, Instant::now(), &mut events); assert_eq!(node_state.max_version, 32); let versioned_b = node_state.get_versioned("key_b").unwrap(); assert_eq!(versioned_b.version, 32); assert_eq!(versioned_b.value, "val_b2"); } + #[tokio::test] + async fn test_node_apply_delta_batches_events() { + let mut cluster_state = ClusterState::default(); + let (events_tx, mut events_rx) = tokio::sync::mpsc::unbounded_channel::(); + let _listener = cluster_state.listeners.subscribe("key", move |events| { + let events_str: String = crate::tests::event_batch_to_str(events); + events_tx.send(events_str).unwrap(); + }); + let node1 = ChitchatId::for_local_test(10_001); + let node2 = ChitchatId::for_local_test(10_002); + cluster_state.node_state_mut(&node1); + + let mut delta_serializer = DeltaSerializer::with_mtu(100_000); + delta_serializer.try_add_node(node1.clone(), 0, 0); + delta_serializer.try_add_kv( + "key", + VersionedValue { + value: "value".to_string(), + version: 1, + status: DeletionStatus::Set, + }, + ); + delta_serializer.try_add_kv( + "key1", + VersionedValue { + value: "value1".to_string(), + version: 2, + status: DeletionStatus::Set, + }, + ); + delta_serializer.try_add_kv( + "key2", + VersionedValue { + value: "deleted".to_string(), + version: 3, + status: DeletionStatus::Deleted(Instant::now()), + }, + ); + delta_serializer.try_add_kv( + "key3", + VersionedValue { + value: "value3".to_string(), + version: 4, + status: DeletionStatus::DeleteAfterTtl(Instant::now()), + }, + ); + // we add another node to make sure we are batching events across nodes. + delta_serializer.try_add_node(node2.clone(), 0, 0); + delta_serializer.try_add_kv( + "key3", + VersionedValue { + value: "value3".to_string(), + version: 1, + status: DeletionStatus::DeleteAfterTtl(Instant::now()), + }, + ); + let delta: Delta = delta_serializer.finish(); + cluster_state.apply_delta(delta); + + let event = events_rx.recv().await.unwrap(); + assert!(events_rx.try_recv().is_err()); + + assert_eq!(&event, "=value,1=value1,3=value3,3=value3,"); + } + #[test] fn test_node_set_delete() { - let mut node_state = NodeState::for_test(); + let mut cluster_state = ClusterState::default(); + let node1 = ChitchatId::for_local_test(10_001); + let mut node_state = cluster_state.node_state_mut(&node1); node_state.set("key_a", "val_b"); node_state.delete("key_a"); assert!(node_state.get("key_a").is_none()); @@ -1721,7 +2025,9 @@ mod tests { #[test] fn test_node_set_delete_after_ttl_set() { - let mut node_state = NodeState::for_test(); + let mut cluster_state = ClusterState::default(); + let node1 = ChitchatId::for_local_test(10_001); + let mut node_state = cluster_state.node_state_mut(&node1); node_state.set("key_a", "val_b"); node_state.delete_after_ttl("key_a"); node_state.set("key_a", "val_b2"); @@ -1733,7 +2039,9 @@ mod tests { #[test] fn test_node_set_with_ttl() { - let mut node_state = NodeState::for_test(); + let mut cluster_state = ClusterState::default(); + let node1 = ChitchatId::for_local_test(10_001); + let mut node_state = cluster_state.node_state_mut(&node1); node_state.set_with_ttl("key_a", "val_b"); let versioned_value = node_state.get_versioned("key_a").unwrap(); assert!(matches!( @@ -1742,4 +2050,58 @@ mod tests { )); assert_eq!(versioned_value.value, "val_b"); } + + #[tokio::test] + async fn test_listener_batch() { + let mut cluster_state = ClusterState::default(); + let (key_change_tx, mut key_change_rx) = tokio::sync::mpsc::unbounded_channel(); + let listen_handle = cluster_state.listeners.subscribe( + "prefix", + move |key_changes: &[KeyChangeEventRef]| { + for key_change in key_changes { + key_change_tx + .send(format!( + "{}={}:{:?}", + &key_change.key, &key_change.value, &key_change.node + )) + .unwrap(); + } + }, + ); + let node1 = ChitchatId::for_local_test(10_001); + let node2 = ChitchatId::for_local_test(10_002); + { + let mut node_state1 = cluster_state.node_state_mut(&node1); + node_state1.set("prefi", "val"); + node_state1.set("prefix", "val"); + node_state1.set("prefix_a", "val"); + } + { + let mut node_state2 = cluster_state.node_state_mut(&node2); + node_state2.set("prefix_b", "val"); + node_state2.set("nonprefix", "val"); + node_state2.set("prefix_a", "val2"); + } + { + drop(listen_handle); + let mut node_state1 = cluster_state.node_state_mut(&node1); + node_state1.set("prefix_ignored", "val"); + } + + let mut key_change_events = Vec::new(); + for _ in 0..4 { + let key_change_event = key_change_rx.recv().await.unwrap(); + key_change_events.push(key_change_event); + } + assert!(key_change_rx.try_recv().is_err()); + assert_eq!( + &key_change_events[..], + &[ + "=val:node-10001:0:127.0.0.1:10001", + "_a=val:node-10001:0:127.0.0.1:10001", + "_b=val:node-10002:0:127.0.0.1:10002", + "_a=val2:node-10002:0:127.0.0.1:10002", + ][..] + ); + } }