Skip to content

Commit

Permalink
yet another
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed May 1, 2024
1 parent a3189dd commit af3b099
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 165 deletions.
2 changes: 1 addition & 1 deletion chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ impl NodeDelta {
}

impl NodeDelta {
pub fn applicable_key_value_mutations(
pub(crate) fn applicable_key_value_mutations(
self,
max_version: u64,
last_gc_version: u64,
Expand Down
94 changes: 54 additions & 40 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,6 @@ mod tests {
use tokio_stream::StreamExt;

use super::*;
use crate::server::{spawn_chitchat, ChitchatHandle};
use crate::transport::{ChannelTransport, Transport};

const DEAD_NODE_GRACE_PERIOD: Duration = Duration::from_secs(20);
Expand Down Expand Up @@ -573,17 +572,11 @@ mod tests {
fn test_chitchat_handshake() {
let node_config1 = ChitchatConfig::for_test(10_001);
let empty_seeds = watch::channel(Default::default()).1;
let mut node1 = Chitchat::with_chitchat_id_and_seeds(
node_config1,
empty_seeds.clone(),
);
let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone());
node1.self_node_state().set("key1a", "1");
node1.self_node_state().set("key2a", "2");
let node_config2 = ChitchatConfig::for_test(10_002);
let mut node2 = Chitchat::with_chitchat_id_and_seeds(
node_config2,
empty_seeds,
);
let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds);
node2.self_node_state().set("key1b", "1");
node2.self_node_state().set("key2b", "2");
run_chitchat_handshake(&mut node1, &mut node2);
Expand All @@ -604,8 +597,7 @@ mod tests {
fn test_chitchat_dead_node_liveness() {
let node_config1 = ChitchatConfig::for_test(10_001);
let empty_seeds = watch::channel(Default::default()).1;
let mut node1 =
Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone());
let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone());
let chitchat_id = ChitchatId::for_local_test(10u16);
node1.reset_node_state(&chitchat_id, std::iter::empty(), 10_000, 10u64);
node1.report_heartbeat(&chitchat_id, Heartbeat(10_000u64));
Expand All @@ -625,8 +617,7 @@ mod tests {
tokio::time::pause();
let node_config1 = ChitchatConfig::for_test(10_001);
let empty_seeds = watch::channel(Default::default()).1;
let mut node1 =
Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone());
let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone());
let node_config2 = ChitchatConfig::for_test(10_002);
let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds);
// Because of compression, we need a lot of keys to reach the MTU.
Expand Down Expand Up @@ -1049,10 +1040,7 @@ mod tests {
fn test_chitchat_listener() {
let node_config1 = ChitchatConfig::for_test(10_001);
let empty_seeds = watch::channel(Default::default()).1;
let mut node1 = Chitchat::with_chitchat_id_and_seeds(
node_config1,
empty_seeds.clone(),
);
let mut node1 = Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone());
node1.self_node_state().set("self1:suffix1", "hello1");
let counter_self_key: Arc<AtomicUsize> = Default::default();
let counter_other_key: Arc<AtomicUsize> = Default::default();
Expand Down Expand Up @@ -1084,10 +1072,7 @@ mod tests {
.forever();

let node_config2 = ChitchatConfig::for_test(10_002);
let mut node2 = Chitchat::with_chitchat_id_and_seeds(
node_config2,
empty_seeds,
);
let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds);
node2.self_node_state().set("other:suffix", "hello");

assert_eq!(counter_self_key.load(Ordering::SeqCst), 0);
Expand Down Expand Up @@ -1176,9 +1161,30 @@ mod tests {

let chitchat_id = ChitchatId::for_local_test(10_003);
let mut node_state = node.cluster_state.node_state_mut(&chitchat_id);
node_state.set_versioned_value("foo".to_string(), VersionedValue { value: "bar".to_string(), version: 1, status: DeletionStatus::Set });
node_state.set_versioned_value("qux".to_string(), VersionedValue { value: "baz".to_string(), version: 2, status: DeletionStatus::Set });
node_state.set_versioned_value("toto".to_string(), VersionedValue { value: "titi".to_string(), version: 3, status: DeletionStatus::Set });
assert!(node_state.set_versioned_value(
"foo".to_string(),
VersionedValue {
value: "bar".to_string(),
version: 1,
status: DeletionStatus::Set
}
));
assert!(node_state.set_versioned_value(
"qux".to_string(),
VersionedValue {
value: "baz".to_string(),
version: 2,
status: DeletionStatus::Set
}
));
assert!(node_state.set_versioned_value(
"toto".to_string(),
VersionedValue {
value: "titi".to_string(),
version: 3,
status: DeletionStatus::Set
}
));

node.reset_node_state(
&chitchat_id,
Expand All @@ -1205,22 +1211,30 @@ mod tests {

let chitchat_id = ChitchatId::for_local_test(10_004);
let mut node_state = node.cluster_state.node_state_mut(&chitchat_id);
node_state.set_versioned_value("foo".to_string(), VersionedValue {
value: "bar".to_string(),
version: 1,
status: DeletionStatus::Set,
});
node_state.set_versioned_value("qux".to_string(), VersionedValue {
value: "baz".to_string(),
version: 2,
status: DeletionStatus::Set,
});
node_state.set_versioned_value("toto".to_string(), VersionedValue {
value: "titi".to_string(),
version: 3,
status: DeletionStatus::Set,
});

assert!(node_state.set_versioned_value(
"foo".to_string(),
VersionedValue {
value: "bar".to_string(),
version: 1,
status: DeletionStatus::Set,
}
));
assert!(node_state.set_versioned_value(
"qux".to_string(),
VersionedValue {
value: "baz".to_string(),
version: 2,
status: DeletionStatus::Set,
}
));
assert!(node_state.set_versioned_value(
"toto".to_string(),
VersionedValue {
value: "titi".to_string(),
version: 3,
status: DeletionStatus::Set,
}
));
node.reset_node_state(
&chitchat_id,
[
Expand Down
15 changes: 9 additions & 6 deletions chitchat/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Drop for ListenerHandle {

type BoxedListener = Box<dyn Fn(&[KeyChangeEventRef]) + 'static + Send + Sync>;

#[derive(Default, Clone)]
#[derive(Default)]
pub(crate) struct Listeners {
inner: Arc<RwLock<InnerListeners>>,
}
Expand Down Expand Up @@ -76,7 +76,10 @@ impl Listeners {

#[cfg(test)]
pub(crate) fn trigger_event(&self, key_change_event: KeyChangeEvent) {
self.inner.read().unwrap().trigger_events(&[key_change_event]);
self.inner
.read()
.unwrap()
.trigger_events(&[key_change_event]);
}

pub(crate) fn trigger_events(&self, key_change_events: &[KeyChangeEvent]) {
Expand Down Expand Up @@ -233,7 +236,7 @@ mod tests {

#[test]
fn test_listeners_simple() {
let mut listeners = Listeners::default();
let listeners = Listeners::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let handle = listeners.subscribe_events("prefix:", move |events| {
Expand All @@ -245,15 +248,15 @@ mod tests {
});
let node_id = chitchat_id(7280u16);
assert_eq!(counter.load(Ordering::Relaxed), 0);
listeners.trigger_event(KeyChangeEvent{
listeners.trigger_event(KeyChangeEvent {
key: "prefix:strippedprefix".to_string(),
value: "value".to_string(),
node: node_id.clone(),
});
assert_eq!(counter.load(Ordering::Relaxed), 1);
std::mem::drop(handle);
let node_id = chitchat_id(7280u16);
listeners.trigger_event(KeyChangeEvent{
listeners.trigger_event(KeyChangeEvent {
key: "prefix:strippedprefix".to_string(),
value: "value".to_string(),
node: node_id.clone(),
Expand Down Expand Up @@ -317,7 +320,7 @@ mod tests {

#[test]
fn test_listeners_prefixes() {
let mut listeners = Listeners::default();
let listeners = Listeners::default();

let subscribe_event = |prefix: &str| {
let counter: Arc<AtomicUsize> = Default::default();
Expand Down
10 changes: 3 additions & 7 deletions chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,11 @@ where
mod tests {
use std::collections::BTreeMap;
use std::future::Future;
use std::time::Duration;

use tokio_stream::{Stream, StreamExt};

use super::*;
use crate::message::ChitchatMessage;
use crate::transport::{ChannelTransport, Transport};
use crate::transport::ChannelTransport;
use crate::{Heartbeat, NodeState, MAX_UDP_DATAGRAM_PAYLOAD_SIZE};

#[derive(Debug, Default)]
Expand Down Expand Up @@ -526,8 +524,7 @@ mod tests {
.open(outsider_config.chitchat_id.gossip_advertise_addr)
.await
.unwrap();
let outsider =
Chitchat::with_chitchat_id_and_seeds(outsider_config, empty_seeds());
let outsider = Chitchat::with_chitchat_id_and_seeds(outsider_config, empty_seeds());

let server_config = ChitchatConfig::for_test(2223);
let server_addr = server_config.chitchat_id.gossip_advertise_addr;
Expand Down Expand Up @@ -573,8 +570,7 @@ mod tests {
let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
let test_config = ChitchatConfig::for_test(1);
let test_addr = test_config.chitchat_id.gossip_advertise_addr;
let mut test_chitchat =
Chitchat::with_chitchat_id_and_seeds(test_config, empty_seeds());
let mut test_chitchat = Chitchat::with_chitchat_id_and_seeds(test_config, empty_seeds());
let mut test_transport = transport.open(test_addr).await.unwrap();

let server_config = ChitchatConfig::for_test(2);
Expand Down
Loading

0 comments on commit af3b099

Please sign in to comment.