Commit: Make protocol keyed

shadaj committed Sep 17, 2023
1 parent aef0255 commit eb7c04f
Showing 11 changed files with 188 additions and 136 deletions.
8 changes: 0 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
@@ -23,7 +23,6 @@ members = [
     "pusherator",
     "relalg",
     "topolotree",
-    "topolotree_datatypes",
     "variadics",
     "website_playground",
 ]
1 change: 0 additions & 1 deletion topolotree/Cargo.toml
@@ -23,7 +23,6 @@ path = "src/latency_measure.rs"
 [dependencies]
 hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] }
 hydroflow_datalog = { path = "../hydroflow_datalog" }
-topolotree_datatypes = { path = "../topolotree_datatypes" }
 
 tokio = { version = "1.16", features = [ "full" ] }
 serde = { version = "1", features = ["rc"] }
14 changes: 5 additions & 9 deletions topolotree/src/latency_measure.rs
@@ -5,16 +5,12 @@ use std::time::Instant;
 
 use futures::{SinkExt, StreamExt};
 use hydroflow::bytes::Bytes;
-use hydroflow::serde::{Deserialize, Serialize};
 use hydroflow::tokio;
 use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource};
 use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};
 
-#[derive(Serialize, Deserialize, Clone, Debug)]
-struct IncrementRequest {
-    tweet_id: u64,
-    likes: i32,
-}
+mod protocol;
+use protocol::*;
 
 #[tokio::main]
 async fn main() {
@@ -78,9 +74,9 @@ async fn main() {
         let increment = rand::random::<bool>();
         let start = Instant::now();
         inc_sender
-            .send(serialize_to_bytes(IncrementRequest {
-                tweet_id: id,
-                likes: if increment { 1 } else { -1 },
+            .send(serialize_to_bytes(OperationPayload {
+                key: id,
+                change: if increment { 1 } else { -1 },
             }))
             .unwrap();
 
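The benchmark client now speaks the shared protocol: its private `IncrementRequest { tweet_id, likes }` is gone, replaced by `OperationPayload { key, change }` from protocol.rs. A minimal sketch of the workload shape, assuming the surrounding loop and `inc_sender` are as in the file above (the helper name `random_op` is hypothetical, not from the codebase):

use protocol::OperationPayload;

// Hypothetical helper mirroring the benchmark's coin-flip workload:
// each request targets `key` and moves its counter by +1 or -1.
fn random_op(key: u64) -> OperationPayload {
    let increment = rand::random::<bool>();
    OperationPayload {
        key,
        change: if increment { 1 } else { -1 },
    }
}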
81 changes: 48 additions & 33 deletions topolotree/src/main.rs
@@ -2,7 +2,8 @@
 mod tests;
 
 use std::cell::RefCell;
-use std::fmt::Display;
+use std::collections::HashMap;
+use std::fmt::{Display, Debug};
 use std::io;
 use std::rc::Rc;
 
@@ -13,14 +14,17 @@ use hydroflow::scheduled::graph::Hydroflow;
 use hydroflow::util::cli::{
     ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged,
 };
-use topolotree_datatypes::{OperationPayload, Payload};
 
+mod protocol;
+use hydroflow::util::{serialize_to_bytes, deserialize_from_bytes};
+use protocol::*;
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 struct NodeID(pub u32);
 
 impl Display for NodeID {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        self.0.fmt(f)
+        Display::fmt(&self.0, f)
     }
 }
 
@@ -38,7 +42,7 @@ fn run_topolotree(
     // Timestamp stuff is a bit complicated, there is a proper data-flowy way to do it
     // but it would require at least one more join and one more cross join just specifically for the local timestamps
    // Until we need it to be proper then we can take a shortcut and use rc refcell
-    let self_timestamp = Rc::new(RefCell::new(0));
+    let self_timestamp = Rc::new(RefCell::new(HashMap::<u64, isize>::new()));
 
     let self_timestamp1 = Rc::clone(&self_timestamp);
     let self_timestamp2 = Rc::clone(&self_timestamp);
@@ -47,33 +51,36 @@ fn run_topolotree(
     hydroflow_syntax! {
         from_neighbors = source_stream(input_recv)
             -> map(Result::unwrap)
-            -> map(|(src, payload)| (NodeID(src), serde_json::from_slice(&payload[..]).unwrap()))
-            -> inspect(|(src, payload): &(NodeID, Payload<i64>)| println!("received from: {src}: payload: {payload:?}"));
+            -> map(|(src, x)| (NodeID(src), deserialize_from_bytes::<Payload<i64>>(&x).unwrap()))
+            -> inspect(|(src, payload): &(NodeID, Payload<i64>)| eprintln!("received from: {src}: payload: {payload:?}"));
 
         from_neighbors
-            -> fold_keyed::<'static>(|| Payload { timestamp: -1, data: Default::default() }, |acc: &mut Payload<i64>, val: Payload<i64>| {
+            -> map(|(src, payload)| ((payload.key, src), (payload.key, payload.contents)))
+            -> fold_keyed::<'static>(|| Timestamped { timestamp: -1, data: Default::default() }, |acc: &mut Timestamped<i64>, (key, val): (u64, Timestamped<i64>)| {
                 if val.timestamp > acc.timestamp {
                     *acc = val;
-                    *self_timestamp1.borrow_mut() += 1;
+                    *self_timestamp1.borrow_mut().entry(key).or_insert(0) += 1;
                 }
             })
-            -> inspect(|(src, data)| println!("data from stream: {src}: data: {data:?}"))
-            -> map(|(src, payload)| (Some(src), payload.data))
+            -> inspect(|(src, data)| eprintln!("data from stream+key: {src:?}: data: {data:?}"))
+            -> map(|((key, src), payload)| ((key, Some(src)), (payload.data, context.current_tick())))
             -> from_neighbors_or_local;
 
         local_value = source_stream(increment_requests)
-            -> map(Result::unwrap)
-            -> map(|change_payload: BytesMut| (serde_json::from_slice(&change_payload[..]).unwrap()))
-            -> inspect(|change_payload: &OperationPayload| println!("change: {change_payload:?}"))
-            -> inspect(|_| {
-                *self_timestamp2.borrow_mut() += 1;
+            -> map(|x| deserialize_from_bytes::<OperationPayload>(&x.unwrap()).unwrap())
+            -> inspect(|change_payload: &OperationPayload| eprintln!("change: {change_payload:?}"))
+            -> inspect(|change| {
+                *self_timestamp2.borrow_mut().entry(change.key).or_insert(0) += 1;
             })
-            -> map(|change_payload: OperationPayload| change_payload.change)
-            -> reduce::<'static>(|agg: &mut i64, change: i64| *agg += change);
+            -> map(|change_payload: OperationPayload| (change_payload.key, (change_payload.change, context.current_tick())))
+            -> reduce_keyed::<'static>(|agg: &mut (i64, usize), change: (i64, usize)| {
+                agg.0 += change.0;
+                agg.1 = std::cmp::max(agg.1, change.1);
+            });
 
-        local_value -> map(|data| (None, data)) -> from_neighbors_or_local;
+        local_value -> map(|(key, data)| ((key, None), data)) -> from_neighbors_or_local;
 
-        from_neighbors_or_local = union();
+        from_neighbors_or_local = union() -> tee();
         from_neighbors_or_local -> [0]all_neighbor_data;
 
         neighbors = source_iter(neighbors)
@@ -82,25 +89,34 @@ fn run_topolotree(
 
         neighbors -> [1]all_neighbor_data;
 
+        // query_result = from_neighbors_or_local
+        //     -> map(|((key, _), data)| (key, data))
+        //     -> fold_keyed(|| 0, |acc: &mut i64, data: i64| {
+        //         merge(acc, data);
+        //     });
+
         all_neighbor_data = cross_join_multiset()
-            -> filter(|((aggregate_from_this_guy, _), target_neighbor)| {
+            -> filter(|(((_, aggregate_from_this_guy), _), target_neighbor)| {
                 aggregate_from_this_guy.iter().all(|source| source != target_neighbor)
             })
-            -> map(|((_, payload), target_neighbor)| {
-                (target_neighbor, payload)
-            })
-            -> fold_keyed(|| 0, |acc: &mut i64, data: i64| {
-                merge(acc, data);
-            })
-            -> inspect(|(target_neighbor, data)| println!("data from neighbors: {target_neighbor:?}: data: {data:?}"))
-            -> map(|(target_neighbor, data)| {
-                (target_neighbor, Payload {
-                    timestamp: *self_timestamp3.borrow(),
-                    data
-                })
-            })
+            -> map(|(((key, _), payload), target_neighbor)| {
+                ((key, target_neighbor), payload)
+            })
+            -> reduce_keyed(|acc: &mut (i64, usize), (data, change_tick): (i64, usize)| {
+                merge(&mut acc.0, data);
+                acc.1 = std::cmp::max(acc.1, change.1);
+            })
+            -> filter(|(_, (_, change_tick))| *change_tick == context.current_tick())
+            -> inspect(|((key, target_neighbor), data)| eprintln!("data from key: {key:?}, neighbors: {target_neighbor:?}: data: {data:?}"))
+            -> map(|((key, target_neighbor), (data, _))| (target_neighbor, Payload {
+                key,
+                contents: Timestamped {
+                    timestamp: self_timestamp3.borrow().get(&key).copied().unwrap_or(0),
+                    data,
+                }
+            }))
             -> for_each(|(target_neighbor, output): (NodeID, Payload<i64>)| {
-                let serialized = BytesMut::from(serde_json::to_string(&output).unwrap().as_str()).freeze();
+                let serialized = serialize_to_bytes(output);
                 output_send.send((target_neighbor.0, serialized)).unwrap();
             });
     }
@@ -110,7 +126,6 @@ fn run_topolotree(
 async fn main() {
     let args: Vec<String> = std::env::args().skip(1).collect();
     let neighbors: Vec<u32> = args.into_iter().map(|x| x.parse().unwrap()).collect();
-    // let current_id = neighbors[0];
 
     let mut ports = hydroflow::util::cli::init().await;
 
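The substance of the commit is visible in this file: the node's logical clock goes from a single `isize` to a per-key map, so an update to one key no longer advances the timestamps observed for every other key. A standalone sketch of that bookkeeping, assuming the `Rc<RefCell<HashMap<u64, isize>>>` shape introduced above (function names are illustrative, not from the codebase):

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// Bump the clock only for the key that actually changed.
fn bump_timestamp(clock: &Rc<RefCell<HashMap<u64, isize>>>, key: u64) {
    *clock.borrow_mut().entry(key).or_insert(0) += 1;
}

// Read a key's clock; untouched keys report 0, matching the
// `.get(&key).copied().unwrap_or(0)` read in the dataflow above.
fn read_timestamp(clock: &Rc<RefCell<HashMap<u64, isize>>>, key: u64) -> isize {
    clock.borrow().get(&key).copied().unwrap_or(0)
}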
25 changes: 11 additions & 14 deletions topolotree/src/pn.rs
@@ -8,18 +8,15 @@ use hydroflow::util::cli::{ConnectedDemux, ConnectedDirect, ConnectedSink, Conne
 use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};
 use hydroflow::{hydroflow_syntax, tokio};
 
-#[derive(Serialize, Deserialize, Clone, Debug)]
-struct IncrementRequest {
-    tweet_id: u64,
-    likes: i32,
-}
+mod protocol;
+use protocol::*;
 
-type NextStateType = (u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>);
+type NextStateType = (u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>);
 
 #[derive(Serialize, Deserialize, Clone, Debug)]
 enum GossipOrIncrement {
     Gossip(Vec<NextStateType>),
-    Increment(u64, i32),
+    Increment(u64, i64),
 }
 
 #[hydroflow::main]
@@ -67,7 +64,7 @@ async fn main() {
 
     let df = hydroflow_syntax! {
         next_state = union()
-            -> fold::<'static>((HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| {
+            -> fold::<'static>((HashMap::<u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| {
                 if context.current_tick() != *last_tick {
                     modified_tweets.clear();
                 }
@@ -102,9 +99,9 @@ async fn main() {
                     let mut cur_value = cur_value.as_ref().borrow_mut();
 
                     if delta > 0 {
-                        cur_value.0[my_id] += delta as u32;
+                        cur_value.0[my_id] += delta as u64;
                     } else {
-                        cur_value.1[my_id] += (-delta) as u32;
+                        cur_value.1[my_id] += (-delta) as u64;
                     }
 
                     modified_tweets.insert(counter_id);
@@ -123,8 +120,8 @@ async fn main() {
             -> next_state;
 
         source_stream(increment_requests)
-            -> map(|x| deserialize_from_bytes::<IncrementRequest>(&x.unwrap()).unwrap())
-            -> map(|t| GossipOrIncrement::Increment(t.tweet_id, t.likes))
+            -> map(|x| deserialize_from_bytes::<OperationPayload>(&x.unwrap()).unwrap())
+            -> map(|t| GossipOrIncrement::Increment(t.key, t.change))
            -> next_state;
 
         all_peers = source_iter(0..num_replicas)
@@ -143,10 +140,10 @@ async fn main() {
                 a.into_iter().map(|(k, rc_array)| {
                     let rc_borrowed = rc_array.as_ref().borrow();
                     let (pos, neg) = rc_borrowed.deref();
-                    (k, pos.iter().sum::<u32>() as i32 - neg.iter().sum::<u32>() as i32)
+                    (k, pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64)
                 }).collect::<Vec<_>>()
             })
-            -> map(serialize_to_bytes::<(u64, i32)>)
+            -> map(serialize_to_bytes::<(u64, i64)>)
             -> dest_sink(query_responses);
     };
 
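Besides adopting the shared `OperationPayload`, this baseline widens the PN-counter lanes from `u32` to `u64` so that `change: i64` deltas fit. A sketch of the read side, matching the query fold above (`pn_value` is an illustrative name):

use std::cell::RefCell;
use std::rc::Rc;

// Value of one PN-counter: total increments minus total decrements.
// The i64 casts assume the true count fits in i64, as the diff does.
fn pn_value(state: &Rc<RefCell<(Vec<u64>, Vec<u64>)>>) -> i64 {
    let borrowed = state.borrow();
    let (pos, neg) = &*borrowed;
    pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64
}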
27 changes: 12 additions & 15 deletions topolotree/src/pn_delta.rs
@@ -8,19 +8,16 @@ use hydroflow::util::cli::{ConnectedDemux, ConnectedDirect, ConnectedSink, Conne
 use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};
 use hydroflow::{hydroflow_syntax, tokio};
 
-#[derive(Serialize, Deserialize, Clone, Debug)]
-struct IncrementRequest {
-    tweet_id: u64,
-    likes: i32,
-}
+mod protocol;
+use protocol::*;
 
 #[derive(Serialize, Deserialize, Clone, Debug)]
 enum GossipOrIncrement {
-    Gossip(Vec<(u64, (usize, u32, u32))>),
-    Increment(u64, i32),
+    Gossip(Vec<(u64, (usize, u64, u64))>),
+    Increment(u64, i64),
 }
 
-type NextStateType = (u64, bool, Rc<RefCell<(Vec<u32>, Vec<u32>)>>);
+type NextStateType = (u64, bool, Rc<RefCell<(Vec<u64>, Vec<u64>)>>);
 
 #[hydroflow::main]
 async fn main() {
@@ -67,7 +64,7 @@
 
     let df = hydroflow_syntax! {
         next_state = union()
-            -> fold::<'static>((HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| {
+            -> fold::<'static>((HashMap::<u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| {
                 if context.current_tick() != *last_tick {
                     modified_tweets.clear();
                 }
@@ -99,9 +96,9 @@ async fn main() {
                     let mut cur_value = cur_value.as_ref().borrow_mut();
 
                     if delta > 0 {
-                        cur_value.0[my_id] += delta as u32;
+                        cur_value.0[my_id] += delta as u64;
                     } else {
-                        cur_value.1[my_id] += (-delta) as u32;
+                        cur_value.1[my_id] += (-delta) as u64;
                    }
 
                     *modified_tweets.entry(counter_id).or_insert(false) |= true;
@@ -120,8 +117,8 @@ async fn main() {
             -> next_state;
 
         source_stream(increment_requests)
-            -> map(|x| deserialize_from_bytes::<IncrementRequest>(&x.unwrap()).unwrap())
-            -> map(|t| GossipOrIncrement::Increment(t.tweet_id, t.likes))
+            -> map(|x| deserialize_from_bytes::<OperationPayload>(&x.unwrap()).unwrap())
+            -> map(|t| GossipOrIncrement::Increment(t.key, t.change))
             -> next_state;
 
         all_peers = source_iter(0..num_replicas)
@@ -144,10 +141,10 @@ async fn main() {
                 a.into_iter().map(|(k, _, rc_array)| {
                     let rc_borrowed = rc_array.as_ref().borrow();
                     let (pos, neg) = rc_borrowed.deref();
-                    (k, pos.iter().sum::<u32>() as i32 - neg.iter().sum::<u32>() as i32)
+                    (k, pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64)
                 }).collect::<Vec<_>>()
             })
-            -> map(serialize_to_bytes::<(u64, i32)>)
+            -> map(serialize_to_bytes::<(u64, i64)>)
             -> dest_sink(query_responses);
     };
 
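The delta variant gossips only `(replica, pos, neg)` slots for modified counters rather than whole lane vectors. A hedged sketch of merging one gossiped slot, taking element-wise maxima as state-based PN-counters require (this illustrates the technique; it is not the exact fold body above, and `apply_delta` is a hypothetical name):

// Merge one gossiped slot into local state. Each replica's lanes grow
// monotonically, so per-slot max is the correct join.
fn apply_delta(local: &mut (Vec<u64>, Vec<u64>), replica: usize, pos: u64, neg: u64) {
    local.0[replica] = local.0[replica].max(pos);
    local.1[replica] = local.1[replica].max(neg);
}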
15 changes: 8 additions & 7 deletions topolotree_datatypes/src/lib.rs → topolotree/src/protocol.rs
@@ -3,18 +3,19 @@ use std::fmt::Debug;
 use serde::{Deserialize, Serialize};
 
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
-pub struct Payload<T: Debug> {
+pub struct Timestamped<T: Debug> {
     pub timestamp: isize,
     pub data: T,
 }
 
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
+pub struct Payload<T: Debug> {
+    pub key: u64,
+    pub contents: Timestamped<T>
+}
+
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
 pub struct OperationPayload {
+    pub key: u64,
     pub change: i64,
 }
 
-#[derive(Serialize, Deserialize, Clone, Debug)]
-pub struct IncrementRequest {
-    tweet_id: u64,
-    likes: i32,
-}
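With the shared types consolidated in topolotree/src/protocol.rs, the wire format is keyed end to end: a `Payload` carries its `key` next to a `Timestamped` value. A quick round-trip sketch using the same hydroflow helpers the binaries use (the field values are arbitrary):

use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};

fn main() {
    // Build a keyed payload exactly as the protocol module defines it.
    let msg = Payload {
        key: 42,
        contents: Timestamped { timestamp: 3, data: 7i64 },
    };
    // Serialize and deserialize with the helpers used throughout the diff.
    let bytes = serialize_to_bytes(msg.clone());
    let decoded = deserialize_from_bytes::<Payload<i64>>(&bytes).unwrap();
    assert_eq!(msg, decoded);
}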