Skip to content

Commit

Permalink
Further simplify protocol by unifying gossip and local updates
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Sep 17, 2023
1 parent 810356e commit aef0255
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 48 deletions.
42 changes: 26 additions & 16 deletions topolotree/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ fn run_topolotree(
input_recv: impl Stream<Item = Result<(u32, BytesMut), io::Error>> + Unpin + 'static,
increment_requests: impl Stream<Item = Result<BytesMut, io::Error>> + Unpin + 'static,
output_send: tokio::sync::mpsc::UnboundedSender<(u32, Bytes)>,
query_send: tokio::sync::mpsc::UnboundedSender<Bytes>,
) -> Hydroflow {
fn merge(x: &mut i64, y: i64) {
*x += y;
Expand Down Expand Up @@ -57,7 +58,8 @@ fn run_topolotree(
}
})
-> inspect(|(src, data)| println!("data from stream: {src}: data: {data:?}"))
-> [0]all_neighbor_data;
-> map(|(src, payload)| (Some(src), payload.data))
-> from_neighbors_or_local;

local_value = source_stream(increment_requests)
-> map(Result::unwrap)
Expand All @@ -66,7 +68,13 @@ fn run_topolotree(
-> inspect(|_| {
*self_timestamp2.borrow_mut() += 1;
})
-> fold::<'static>(0, |agg: &mut i64, op: OperationPayload| *agg += op.change);
-> map(|change_payload: OperationPayload| change_payload.change)
-> reduce::<'static>(|agg: &mut i64, change: i64| *agg += change);

local_value -> map(|data| (None, data)) -> from_neighbors_or_local;

from_neighbors_or_local = union();
from_neighbors_or_local -> [0]all_neighbor_data;

neighbors = source_iter(neighbors)
-> map(NodeID)
Expand All @@ -76,25 +84,19 @@ fn run_topolotree(

all_neighbor_data = cross_join_multiset()
-> filter(|((aggregate_from_this_guy, _), target_neighbor)| {
aggregate_from_this_guy != target_neighbor
aggregate_from_this_guy.iter().all(|source| source != target_neighbor)
})
-> map(|((_, payload), target_neighbor)| {
(target_neighbor, payload)
})
-> fold_keyed(|| 0, |acc: &mut i64, payload: Payload<i64>| {
merge(acc, payload.data);
-> fold_keyed(|| 0, |acc: &mut i64, data: i64| {
merge(acc, data);
})
-> inspect(|(target_neighbor, data)| println!("data from neighbors: {target_neighbor:?}: data: {data:?}"))
-> [0]add_local_value;

local_value
-> [1]add_local_value;

add_local_value = cross_join_multiset()
-> map(|((target_neighbor, data), local_value)| {
-> map(|(target_neighbor, data)| {
(target_neighbor, Payload {
timestamp: *self_timestamp3.borrow(),
data: data + local_value
data
})
})
-> for_each(|(target_neighbor, output): (NodeID, Payload<i64>)| {
Expand All @@ -113,14 +115,14 @@ async fn main() {
let mut ports = hydroflow::util::cli::init().await;

let input_recv = ports
.port("input")
.port("from_peer")
// connect to the port with a single recipient
.connect::<ConnectedTagged<ConnectedDirect>>()
.await
.into_source();

let mut output_send = ports
.port("output")
.port("to_peer")
.connect::<ConnectedDemux<ConnectedDirect>>()
.await
.into_sink();
Expand All @@ -132,7 +134,7 @@ async fn main() {
.await
.into_source();

let query_responses = ports
let mut query_responses = ports
.port("query_responses")
.connect::<ConnectedDirect>()
.await
Expand All @@ -146,11 +148,19 @@ async fn main() {
}
});

let (query_tx, mut query_rx) = tokio::sync::mpsc::unbounded_channel();
tokio::task::spawn_local(async move {
while let Some(msg) = query_rx.recv().await {
query_responses.send(msg).await.unwrap();
}
});

hydroflow::util::cli::launch_flow(run_topolotree(
neighbors,
input_recv,
operations_send,
chan_tx,
query_tx
))
.await;
}
44 changes: 12 additions & 32 deletions topolotree/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ async fn simple_payload_test() {
let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
let (query_send, mut query_recv) = unbounded_channel::<Bytes>();

#[rustfmt::skip]
simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send, query_send);

flow.run_tick();

Expand All @@ -74,14 +75,15 @@ async fn idempotence_test() {

let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
let (query_send, mut query_recv) = unbounded_channel::<Bytes>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send, query_send);

flow.run_tick();

Expand All @@ -99,14 +101,15 @@ async fn backwards_in_time_test() {
let (_operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
let (query_send, mut query_recv) = unbounded_channel::<Bytes>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap();
simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send, query_send);

flow.run_tick();

Expand All @@ -124,14 +127,15 @@ async fn multiple_input_sources_test() {

let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
let (query_send, mut query_recv) = unbounded_channel::<Bytes>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap();
simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap();
};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send, query_send);

flow.run_tick();

Expand All @@ -143,42 +147,16 @@ async fn multiple_input_sources_test() {
]));
}

#[hydroflow::test]
async fn simple_operation_test() {
let neighbors: Vec<u32> = vec![1, 2, 3];

let (mut operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();

#[rustfmt::skip]
{
simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap();
simulate_operation(&mut operations_tx, OperationPayload { change: 5 }).unwrap();
simulate_operation(&mut operations_tx, OperationPayload { change: 7 }).unwrap();

};

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);

flow.run_tick();

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(2, Payload { timestamp: 3, data: 14 }),
(3, Payload { timestamp: 3, data: 14 }),
]));
}

#[hydroflow::test]
async fn operations_across_ticks() {
let neighbors: Vec<u32> = vec![1, 2, 3];

let (mut operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
let (query_send, mut query_recv) = unbounded_channel::<Bytes>();

let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send);
let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send, query_send);

#[rustfmt::skip]
{
Expand All @@ -191,6 +169,7 @@ async fn operations_across_ticks() {

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(1, Payload { timestamp: 3, data: 12 }),
(2, Payload { timestamp: 3, data: 14 }),
(3, Payload { timestamp: 3, data: 14 }),
]));
Expand All @@ -204,6 +183,7 @@ async fn operations_across_ticks() {

#[rustfmt::skip]
assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([
(1, Payload { timestamp: 4, data: 13 }),
(2, Payload { timestamp: 4, data: 15 }),
(3, Payload { timestamp: 4, data: 15 }),
]));
Expand Down

0 comments on commit aef0255

Please sign in to comment.