diff --git a/Cargo.lock b/Cargo.lock index 752f820b1c85..f0eec3176133 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3166,14 +3166,6 @@ dependencies = [ "serde_json", "tokio", "tokio-tungstenite", - "topolotree_datatypes", -] - -[[package]] -name = "topolotree_datatypes" -version = "0.0.0" -dependencies = [ - "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0bef8076d8dc..a5b428dd3281 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ members = [ "pusherator", "relalg", "topolotree", - "topolotree_datatypes", "variadics", "website_playground", ] diff --git a/topolotree/Cargo.toml b/topolotree/Cargo.toml index 2822cf951c8b..6fde3c72c24a 100644 --- a/topolotree/Cargo.toml +++ b/topolotree/Cargo.toml @@ -23,7 +23,6 @@ path = "src/latency_measure.rs" [dependencies] hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] } hydroflow_datalog = { path = "../hydroflow_datalog" } -topolotree_datatypes = { path = "../topolotree_datatypes" } tokio = { version = "1.16", features = [ "full" ] } serde = { version = "1", features = ["rc"] } diff --git a/topolotree/src/latency_measure.rs b/topolotree/src/latency_measure.rs index 91f2dab04263..deae097b5c78 100644 --- a/topolotree/src/latency_measure.rs +++ b/topolotree/src/latency_measure.rs @@ -5,16 +5,12 @@ use std::time::Instant; use futures::{SinkExt, StreamExt}; use hydroflow::bytes::Bytes; -use hydroflow::serde::{Deserialize, Serialize}; use hydroflow::tokio; use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource}; use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes}; -#[derive(Serialize, Deserialize, Clone, Debug)] -struct IncrementRequest { - tweet_id: u64, - likes: i32, -} +mod protocol; +use protocol::*; #[tokio::main] async fn main() { @@ -78,9 +74,9 @@ async fn main() { let increment = rand::random::(); let start = Instant::now(); inc_sender - .send(serialize_to_bytes(IncrementRequest { - tweet_id: id, - likes: if increment { 1 } else { -1 }, + .send(serialize_to_bytes(OperationPayload { + key: id, + change: if increment { 1 } else { -1 }, })) .unwrap(); diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 022e7ca282a5..f717614e69a4 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -2,7 +2,8 @@ mod tests; use std::cell::RefCell; -use std::fmt::Display; +use std::collections::HashMap; +use std::fmt::{Display, Debug}; use std::io; use std::rc::Rc; @@ -13,14 +14,17 @@ use hydroflow::scheduled::graph::Hydroflow; use hydroflow::util::cli::{ ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, }; -use topolotree_datatypes::{OperationPayload, Payload}; + +mod protocol; +use hydroflow::util::{serialize_to_bytes, deserialize_from_bytes}; +use protocol::*; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] struct NodeID(pub u32); impl Display for NodeID { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) + Display::fmt(&self.0, f) } } @@ -38,7 +42,7 @@ fn run_topolotree( // Timestamp stuff is a bit complicated, there is a proper data-flowy way to do it // but it would require at least one more join and one more cross join just specifically for the local timestamps // Until we need it to be proper then we can take a shortcut and use rc refcell - let self_timestamp = Rc::new(RefCell::new(0)); + let self_timestamp = Rc::new(RefCell::new(HashMap::::new())); let self_timestamp1 = Rc::clone(&self_timestamp); let self_timestamp2 = Rc::clone(&self_timestamp); @@ -47,33 +51,36 @@ fn run_topolotree( hydroflow_syntax! { from_neighbors = source_stream(input_recv) -> map(Result::unwrap) - -> map(|(src, payload)| (NodeID(src), serde_json::from_slice(&payload[..]).unwrap())) - -> inspect(|(src, payload): &(NodeID, Payload)| println!("received from: {src}: payload: {payload:?}")); + -> map(|(src, x)| (NodeID(src), deserialize_from_bytes::>(&x).unwrap())) + -> inspect(|(src, payload): &(NodeID, Payload)| eprintln!("received from: {src}: payload: {payload:?}")); from_neighbors - -> fold_keyed::<'static>(|| Payload { timestamp: -1, data: Default::default() }, |acc: &mut Payload, val: Payload| { + -> map(|(src, payload)| ((payload.key, src), (payload.key, payload.contents))) + -> fold_keyed::<'static>(|| Timestamped { timestamp: -1, data: Default::default() }, |acc: &mut Timestamped, (key, val): (u64, Timestamped)| { if val.timestamp > acc.timestamp { *acc = val; - *self_timestamp1.borrow_mut() += 1; + *self_timestamp1.borrow_mut().entry(key).or_insert(0) += 1; } }) - -> inspect(|(src, data)| println!("data from stream: {src}: data: {data:?}")) - -> map(|(src, payload)| (Some(src), payload.data)) + -> inspect(|(src, data)| eprintln!("data from stream+key: {src:?}: data: {data:?}")) + -> map(|((key, src), payload)| ((key, Some(src)), (payload.data, context.current_tick()))) -> from_neighbors_or_local; local_value = source_stream(increment_requests) - -> map(Result::unwrap) - -> map(|change_payload: BytesMut| (serde_json::from_slice(&change_payload[..]).unwrap())) - -> inspect(|change_payload: &OperationPayload| println!("change: {change_payload:?}")) - -> inspect(|_| { - *self_timestamp2.borrow_mut() += 1; + -> map(|x| deserialize_from_bytes::(&x.unwrap()).unwrap()) + -> inspect(|change_payload: &OperationPayload| eprintln!("change: {change_payload:?}")) + -> inspect(|change| { + *self_timestamp2.borrow_mut().entry(change.key).or_insert(0) += 1; }) - -> map(|change_payload: OperationPayload| change_payload.change) - -> reduce::<'static>(|agg: &mut i64, change: i64| *agg += change); + -> map(|change_payload: OperationPayload| (change_payload.key, (change_payload.change, context.current_tick()))) + -> reduce_keyed::<'static>(|agg: &mut (i64, usize), change: (i64, usize)| { + agg.0 += change.0; + agg.1 = std::cmp::max(agg.1, change.1); + }); - local_value -> map(|data| (None, data)) -> from_neighbors_or_local; + local_value -> map(|(key, data)| ((key, None), data)) -> from_neighbors_or_local; - from_neighbors_or_local = union(); + from_neighbors_or_local = union() -> tee(); from_neighbors_or_local -> [0]all_neighbor_data; neighbors = source_iter(neighbors) @@ -82,25 +89,34 @@ fn run_topolotree( neighbors -> [1]all_neighbor_data; + // query_result = from_neighbors_or_local + // -> map(|((key, _), data)| (key, data)) + // -> fold_keyed(|| 0, |acc: &mut i64, data: i64| { + // merge(acc, data); + // }); + all_neighbor_data = cross_join_multiset() - -> filter(|((aggregate_from_this_guy, _), target_neighbor)| { + -> filter(|(((_, aggregate_from_this_guy), _), target_neighbor)| { aggregate_from_this_guy.iter().all(|source| source != target_neighbor) }) - -> map(|((_, payload), target_neighbor)| { - (target_neighbor, payload) - }) - -> fold_keyed(|| 0, |acc: &mut i64, data: i64| { - merge(acc, data); + -> map(|(((key, _), payload), target_neighbor)| { + ((key, target_neighbor), payload) }) - -> inspect(|(target_neighbor, data)| println!("data from neighbors: {target_neighbor:?}: data: {data:?}")) - -> map(|(target_neighbor, data)| { - (target_neighbor, Payload { - timestamp: *self_timestamp3.borrow(), - data - }) + -> reduce_keyed(|acc: &mut (i64, usize), (data, change_tick): (i64, usize)| { + merge(&mut acc.0, data); + acc.1 = std::cmp::max(acc.1, change_tick); }) + -> filter(|(_, (_, change_tick))| *change_tick == context.current_tick()) + -> inspect(|((key, target_neighbor), data)| eprintln!("data from key: {key:?}, neighbors: {target_neighbor:?}: data: {data:?}")) + -> map(|((key, target_neighbor), (data, _))| (target_neighbor, Payload { + key, + contents: Timestamped { + timestamp: self_timestamp3.borrow().get(&key).copied().unwrap_or(0), + data, + } + })) -> for_each(|(target_neighbor, output): (NodeID, Payload)| { - let serialized = BytesMut::from(serde_json::to_string(&output).unwrap().as_str()).freeze(); + let serialized = serialize_to_bytes(output); output_send.send((target_neighbor.0, serialized)).unwrap(); }); } @@ -110,7 +126,6 @@ fn run_topolotree( async fn main() { let args: Vec = std::env::args().skip(1).collect(); let neighbors: Vec = args.into_iter().map(|x| x.parse().unwrap()).collect(); - // let current_id = neighbors[0]; let mut ports = hydroflow::util::cli::init().await; diff --git a/topolotree/src/pn.rs b/topolotree/src/pn.rs index 0920f420ba35..9d053cfca115 100644 --- a/topolotree/src/pn.rs +++ b/topolotree/src/pn.rs @@ -8,18 +8,15 @@ use hydroflow::util::cli::{ConnectedDemux, ConnectedDirect, ConnectedSink, Conne use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes}; use hydroflow::{hydroflow_syntax, tokio}; -#[derive(Serialize, Deserialize, Clone, Debug)] -struct IncrementRequest { - tweet_id: u64, - likes: i32, -} +mod protocol; +use protocol::*; -type NextStateType = (u64, Rc, Vec)>>); +type NextStateType = (u64, Rc, Vec)>>); #[derive(Serialize, Deserialize, Clone, Debug)] enum GossipOrIncrement { Gossip(Vec), - Increment(u64, i32), + Increment(u64, i64), } #[hydroflow::main] @@ -67,7 +64,7 @@ async fn main() { let df = hydroflow_syntax! { next_state = union() - -> fold::<'static>((HashMap::, Vec)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| { + -> fold::<'static>((HashMap::, Vec)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| { if context.current_tick() != *last_tick { modified_tweets.clear(); } @@ -102,9 +99,9 @@ async fn main() { let mut cur_value = cur_value.as_ref().borrow_mut(); if delta > 0 { - cur_value.0[my_id] += delta as u32; + cur_value.0[my_id] += delta as u64; } else { - cur_value.1[my_id] += (-delta) as u32; + cur_value.1[my_id] += (-delta) as u64; } modified_tweets.insert(counter_id); @@ -123,8 +120,8 @@ async fn main() { -> next_state; source_stream(increment_requests) - -> map(|x| deserialize_from_bytes::(&x.unwrap()).unwrap()) - -> map(|t| GossipOrIncrement::Increment(t.tweet_id, t.likes)) + -> map(|x| deserialize_from_bytes::(&x.unwrap()).unwrap()) + -> map(|t| GossipOrIncrement::Increment(t.key, t.change)) -> next_state; all_peers = source_iter(0..num_replicas) @@ -143,10 +140,10 @@ async fn main() { a.into_iter().map(|(k, rc_array)| { let rc_borrowed = rc_array.as_ref().borrow(); let (pos, neg) = rc_borrowed.deref(); - (k, pos.iter().sum::() as i32 - neg.iter().sum::() as i32) + (k, pos.iter().sum::() as i64 - neg.iter().sum::() as i64) }).collect::>() }) - -> map(serialize_to_bytes::<(u64, i32)>) + -> map(serialize_to_bytes::<(u64, i64)>) -> dest_sink(query_responses); }; diff --git a/topolotree/src/pn_delta.rs b/topolotree/src/pn_delta.rs index 5ee7df79b9e4..1bc30e916668 100644 --- a/topolotree/src/pn_delta.rs +++ b/topolotree/src/pn_delta.rs @@ -8,19 +8,16 @@ use hydroflow::util::cli::{ConnectedDemux, ConnectedDirect, ConnectedSink, Conne use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes}; use hydroflow::{hydroflow_syntax, tokio}; -#[derive(Serialize, Deserialize, Clone, Debug)] -struct IncrementRequest { - tweet_id: u64, - likes: i32, -} +mod protocol; +use protocol::*; #[derive(Serialize, Deserialize, Clone, Debug)] enum GossipOrIncrement { - Gossip(Vec<(u64, (usize, u32, u32))>), - Increment(u64, i32), + Gossip(Vec<(u64, (usize, u64, u64))>), + Increment(u64, i64), } -type NextStateType = (u64, bool, Rc, Vec)>>); +type NextStateType = (u64, bool, Rc, Vec)>>); #[hydroflow::main] async fn main() { @@ -67,7 +64,7 @@ async fn main() { let df = hydroflow_syntax! { next_state = union() - -> fold::<'static>((HashMap::, Vec)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| { + -> fold::<'static>((HashMap::, Vec)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| { if context.current_tick() != *last_tick { modified_tweets.clear(); } @@ -99,9 +96,9 @@ async fn main() { let mut cur_value = cur_value.as_ref().borrow_mut(); if delta > 0 { - cur_value.0[my_id] += delta as u32; + cur_value.0[my_id] += delta as u64; } else { - cur_value.1[my_id] += (-delta) as u32; + cur_value.1[my_id] += (-delta) as u64; } *modified_tweets.entry(counter_id).or_insert(false) |= true; @@ -120,8 +117,8 @@ async fn main() { -> next_state; source_stream(increment_requests) - -> map(|x| deserialize_from_bytes::(&x.unwrap()).unwrap()) - -> map(|t| GossipOrIncrement::Increment(t.tweet_id, t.likes)) + -> map(|x| deserialize_from_bytes::(&x.unwrap()).unwrap()) + -> map(|t| GossipOrIncrement::Increment(t.key, t.change)) -> next_state; all_peers = source_iter(0..num_replicas) @@ -144,10 +141,10 @@ async fn main() { a.into_iter().map(|(k, _, rc_array)| { let rc_borrowed = rc_array.as_ref().borrow(); let (pos, neg) = rc_borrowed.deref(); - (k, pos.iter().sum::() as i32 - neg.iter().sum::() as i32) + (k, pos.iter().sum::() as i64 - neg.iter().sum::() as i64) }).collect::>() }) - -> map(serialize_to_bytes::<(u64, i32)>) + -> map(serialize_to_bytes::<(u64, i64)>) -> dest_sink(query_responses); }; diff --git a/topolotree_datatypes/src/lib.rs b/topolotree/src/protocol.rs similarity index 65% rename from topolotree_datatypes/src/lib.rs rename to topolotree/src/protocol.rs index 5031ae0c62d2..3a7162660cd6 100644 --- a/topolotree_datatypes/src/lib.rs +++ b/topolotree/src/protocol.rs @@ -3,18 +3,19 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] -pub struct Payload { +pub struct Timestamped { pub timestamp: isize, pub data: T, } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] +pub struct Payload { + pub key: u64, + pub contents: Timestamped +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct OperationPayload { + pub key: u64, pub change: i64, } - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct IncrementRequest { - tweet_id: u64, - likes: i32, -} diff --git a/topolotree/src/tests.rs b/topolotree/src/tests.rs index 58804524a0ad..5b34a394bbd8 100644 --- a/topolotree/src/tests.rs +++ b/topolotree/src/tests.rs @@ -1,23 +1,22 @@ -use std::fmt::Debug; use std::io; use hydroflow::bytes::{Bytes, BytesMut}; use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow::util::multiset::HashMultiSet; -use hydroflow::util::{collect_ready_async, unbounded_channel}; -use serde::Serialize; +use hydroflow::util::{collect_ready_async, unbounded_channel, deserialize_from_bytes, serialize_to_bytes}; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::UnboundedSender; +use crate::protocol::Timestamped; use crate::{run_topolotree, OperationPayload, Payload}; -pub fn simulate_input( +pub fn simulate_input( input_send: &mut UnboundedSender>, - (id, payload): (u32, Payload), + (id, payload): (u32, Payload), ) -> Result<(), SendError>> { input_send.send(Ok(( id, - BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), + BytesMut::from(&serialize_to_bytes(&payload)[..]), ))) } @@ -25,9 +24,7 @@ pub fn simulate_operation( input_send: &mut UnboundedSender>, payload: OperationPayload, ) -> Result<(), SendError>> { - input_send.send(Ok(BytesMut::from( - serde_json::to_string(&payload).unwrap().as_str(), - ))) + input_send.send(Ok(BytesMut::from(&serialize_to_bytes(&payload)[..]))) } pub async fn read_all( @@ -39,7 +36,7 @@ pub async fn read_all( .map(|(id, bytes)| { ( *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), + deserialize_from_bytes::>(&bytes[..]).unwrap(), ) }) .collect::>() @@ -55,7 +52,7 @@ async fn simple_payload_test() { let (query_send, mut query_recv) = unbounded_channel::(); #[rustfmt::skip] - simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); + simulate_input(&mut input_send, (1, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } })).unwrap(); let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send, query_send); @@ -63,8 +60,8 @@ async fn simple_payload_test() { #[rustfmt::skip] assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (2, Payload { timestamp: 1, data: 2 }), - (3, Payload { timestamp: 1, data: 2 }), + (2, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } }), + (3, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } }), ])); } @@ -79,8 +76,8 @@ async fn idempotence_test() { #[rustfmt::skip] { - simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); - simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); + simulate_input(&mut input_send, (1, Payload { key: 123, contents: Timestamped { timestamp: 4, data: 2 } })).unwrap(); + simulate_input(&mut input_send, (1, Payload { key: 123, contents: Timestamped { timestamp: 4, data: 2 } })).unwrap(); }; let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send, query_send); @@ -89,8 +86,8 @@ async fn idempotence_test() { #[rustfmt::skip] assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (2, Payload { timestamp: 1, data: 2 }), - (3, Payload { timestamp: 1, data: 2 }), + (2, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } }), + (3, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } }), ])); } @@ -105,8 +102,8 @@ async fn backwards_in_time_test() { #[rustfmt::skip] { - simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap(); - simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); + simulate_input(&mut input_send, (1, Payload { key: 123, contents: Timestamped { timestamp: 5, data: 7 } })).unwrap(); + simulate_input(&mut input_send, (1, Payload { key: 123, contents: Timestamped { timestamp: 4, data: 2 } })).unwrap(); }; let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send, query_send); @@ -115,8 +112,8 @@ async fn backwards_in_time_test() { #[rustfmt::skip] assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (2, Payload { timestamp: 1, data: 7 }), - (3, Payload { timestamp: 1, data: 7 }), + (2, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 7 } }), + (3, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 7 } }), ])); } @@ -131,8 +128,8 @@ async fn multiple_input_sources_test() { #[rustfmt::skip] { - simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap(); - simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap(); + simulate_input(&mut input_send, (1, Payload { key: 123, contents: Timestamped { timestamp: 5, data: 7 } })).unwrap(); + simulate_input(&mut input_send, (2, Payload { key: 123, contents: Timestamped { timestamp: 4, data: 2 } })).unwrap(); }; let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send, query_send); @@ -141,9 +138,9 @@ async fn multiple_input_sources_test() { #[rustfmt::skip] assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (1, Payload { timestamp: 2, data: 2 }), - (2, Payload { timestamp: 2, data: 7 }), - (3, Payload { timestamp: 2, data: 9 }), + (1, Payload { key: 123, contents: Timestamped { timestamp: 2, data: 2 } }), + (2, Payload { key: 123, contents: Timestamped { timestamp: 2, data: 7 } }), + (3, Payload { key: 123, contents: Timestamped { timestamp: 2, data: 9 } }), ])); } @@ -160,32 +157,91 @@ async fn operations_across_ticks() { #[rustfmt::skip] { - simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); - simulate_operation(&mut operations_tx, OperationPayload { change: 5 }).unwrap(); - simulate_operation(&mut operations_tx, OperationPayload { change: 7 }).unwrap(); + simulate_input(&mut input_send, (1, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } })).unwrap(); + simulate_operation(&mut operations_tx, OperationPayload { key: 123, change: 5 }).unwrap(); + simulate_operation(&mut operations_tx, OperationPayload { key: 123, change: 7 }).unwrap(); + }; + + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (1, Payload { key: 123, contents: Timestamped { timestamp: 3, data: 12 } }), + (2, Payload { key: 123, contents: Timestamped { timestamp: 3, data: 14 } }), + (3, Payload { key: 123, contents: Timestamped { timestamp: 3, data: 14 } }), + ])); + + #[rustfmt::skip] + { + simulate_operation(&mut operations_tx, OperationPayload { key: 123, change: 1 }).unwrap(); + }; + + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (1, Payload { key: 123, contents: Timestamped { timestamp: 4, data: 13 } }), + (2, Payload { key: 123, contents: Timestamped { timestamp: 4, data: 15 } }), + (3, Payload { key: 123, contents: Timestamped { timestamp: 4, data: 15 } }), + ])); +} + +#[hydroflow::test] +async fn operations_multiple_keys() { + let neighbors: Vec = vec![1, 2, 3]; + + let (mut operations_tx, operations_rx) = unbounded_channel::>(); + let (mut input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + let (query_send, mut query_recv) = unbounded_channel::(); + + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send, query_send); + + #[rustfmt::skip] + { + simulate_operation(&mut operations_tx, OperationPayload { key: 123, change: 5 }).unwrap(); + simulate_operation(&mut operations_tx, OperationPayload { key: 456, change: 7 }).unwrap(); + }; + + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (1, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 5 } }), + (2, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 5 } }), + (3, Payload { key: 123, contents: Timestamped { timestamp: 1, data: 5 } }), + + (1, Payload { key: 456, contents: Timestamped { timestamp: 1, data: 7 } }), + (2, Payload { key: 456, contents: Timestamped { timestamp: 1, data: 7 } }), + (3, Payload { key: 456, contents: Timestamped { timestamp: 1, data: 7 } }), + ])); + + #[rustfmt::skip] + { + simulate_operation(&mut operations_tx, OperationPayload { key: 123, change: 1 }).unwrap(); }; flow.run_tick(); #[rustfmt::skip] assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (1, Payload { timestamp: 3, data: 12 }), - (2, Payload { timestamp: 3, data: 14 }), - (3, Payload { timestamp: 3, data: 14 }), + (1, Payload { key: 123, contents: Timestamped { timestamp: 2, data: 6 } }), + (2, Payload { key: 123, contents: Timestamped { timestamp: 2, data: 6 } }), + (3, Payload { key: 123, contents: Timestamped { timestamp: 2, data: 6 } }) ])); #[rustfmt::skip] { - simulate_operation(&mut operations_tx, OperationPayload { change: 1 }).unwrap(); + simulate_operation(&mut operations_tx, OperationPayload { key: 456, change: 2 }).unwrap(); }; flow.run_tick(); #[rustfmt::skip] assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (1, Payload { timestamp: 4, data: 13 }), - (2, Payload { timestamp: 4, data: 15 }), - (3, Payload { timestamp: 4, data: 15 }), + (1, Payload { key: 456, contents: Timestamped { timestamp: 2, data: 9 } }), + (2, Payload { key: 456, contents: Timestamped { timestamp: 2, data: 9 } }), + (3, Payload { key: 456, contents: Timestamped { timestamp: 2, data: 9 } }) ])); } diff --git a/topolotree/topolotree_latency.hydro.py b/topolotree/topolotree_latency.hydro.py index 9996f26754bc..5ccc0a483493 100644 --- a/topolotree/topolotree_latency.hydro.py +++ b/topolotree/topolotree_latency.hydro.py @@ -85,19 +85,27 @@ def create_machine(): neighbors = get_neighbors_in_binary_tree(list(range(num_replicas))) cluster = [ deployment.HydroflowCrate( + src=str( + Path(__file__).parent.absolute() + ), + profile=profile, + bin="topolotree", + args=[str(neighbor) for neighbor in neighbors[i]], + on=create_machine(), + ) if is_tree else deployment.HydroflowCrate( src=str( Path(__file__).parent.absolute() ), profile=profile, bin="pn" if tree_arg == "pn" else "pn_delta", - args=[json.dumps(neighbors[i])] if is_tree else [json.dumps([i]), json.dumps([num_replicas])], + args=[json.dumps([i]), json.dumps([num_replicas])], on=create_machine(), ) for i in range(num_replicas) ] for i in range(num_replicas): - cluster[i].ports.to_peer.send_to( + cluster[i].ports.to_peer.tagged(i).send_to( hydro.demux( { j: cluster[j].ports.from_peer.merge() @@ -110,8 +118,8 @@ def create_machine(): if is_tree: leaves = get_leaves_in_binary_tree(list(range(num_replicas))) - source = cluster[leaves[0]].node - dest = cluster[leaves[-1]].node + source = cluster[leaves[0]] + dest = cluster[leaves[-1]] else: source = cluster[0] dest = cluster[-1] diff --git a/topolotree_datatypes/Cargo.toml b/topolotree_datatypes/Cargo.toml deleted file mode 100644 index 9382b5b3a025..000000000000 --- a/topolotree_datatypes/Cargo.toml +++ /dev/null @@ -1,8 +0,0 @@ -[package] -name = "topolotree_datatypes" -publish = false -version = "0.0.0" -edition = "2021" - -[dependencies] -serde = { version = "1", features = ["rc"] }