diff --git a/hydroflow/examples/paxos/README.md b/hydroflow/examples/paxos/README.md new file mode 100644 index 000000000000..e6dce42c37e3 --- /dev/null +++ b/hydroflow/examples/paxos/README.md @@ -0,0 +1,23 @@ +## Paoxs +This is a Paxos implementation with stable leaders. + +### To Run the code: +Look in the file `members.json` to find the addresses of the proposers and acceptors. +For each proposer, launch a process on the node with a matching IP address as follows. +Here we assume the proposer's IP address is `localhost` and ports `12300`, `12301`, and `12302` are free: +``` +cargo run --example paxos -- --path hydroflow/examples/paxos/members.json --role proposer --addr localhost:12300 +cargo run --example paxos -- --path hydroflow/examples/paxos/members.json --role proposer --addr localhost:12310 +``` + +Now for each acceptor, launch a process on the node with the matching IP address as follows. +Here we assume the acceptor's IP address is `127.0.0.1` and ports `12321` and `12322` are free: +``` +cargo run --example paxos -- --path hydroflow/examples/paxos/members.json --role acceptor --addr localhost:12320 +cargo run --example paxos -- --path hydroflow/examples/paxos/members.json --role acceptor --addr localhost:12330 +cargo run --example paxos -- --path hydroflow/examples/paxos/members.json --role acceptor --addr localhost:12340 +``` + +Now, in the proposer process you can type a string payload at `stdin`. + +Adding the `--graph ` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html). \ No newline at end of file diff --git a/hydroflow/examples/paxos/acceptor.rs b/hydroflow/examples/paxos/acceptor.rs new file mode 100644 index 000000000000..2b02f1518911 --- /dev/null +++ b/hydroflow/examples/paxos/acceptor.rs @@ -0,0 +1,115 @@ +use std::collections::HashSet; +use std::net::SocketAddr; + +use futures::join; +use hydroflow::hydroflow_syntax; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::util::bind_udp_bytes; + +use crate::helpers::{get_phase_1_addr, get_phase_2_addr}; +use crate::protocol::*; +use crate::GraphType; + +pub(crate) async fn run_acceptor(addr: SocketAddr, graph: Option) { + let phase_1_addr = get_phase_1_addr(addr); + let phase_2_addr = get_phase_2_addr(addr); + + let p1a_future = bind_udp_bytes(phase_1_addr); + let p2a_future = bind_udp_bytes(phase_2_addr); + let ((p1b_sink, p1a_src, _), (p2b_sink, p2a_src, _)) = join!(p1a_future, p2a_future); + + let mut df: Hydroflow = hydroflow_syntax! { + // define inputs/outputs + p1a = source_stream_serde(p1a_src) + -> map(Result::unwrap) + -> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a)) + -> tee(); + p1b = cross_join::<'tick>() + -> map(|(((p1a, proposer), log), max_ballot): (((P1a, SocketAddr), HashSet), Ballot)| + (P1b { + ballot: p1a.ballot, + max_ballot, + log, + }, proposer)) + -> inspect(|(m, a)| println!("Sending {:?} to {:?}", m, a)) + -> dest_sink_serde(p1b_sink); + p2a = source_stream_serde(p2a_src) + -> map(Result::unwrap) + -> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a)) + -> tee(); + p2b = cross_join::<'tick>() + -> map(|((p2a, proposer), max_ballot):((P2a, SocketAddr), Ballot)| + (P2b { + entry: p2a.entry, + max_ballot, + }, proposer)) + -> inspect(|(m, a)| println!("Sending {:?} to {:?}", m, a)) + -> dest_sink_serde(p2b_sink); + + // find max_ballot + ballots = persist(); + max_ballot = ballots -> reduce(|mut curr_max:Ballot, ballot:Ballot| { + if ballot > curr_max { + curr_max = ballot; + } + curr_max + }) -> tee(); + + p1a[0] -> map(|(m, _): (P1a, SocketAddr)| m.ballot) -> ballots; + + // find max entry for each slot in the log + log = persist() + -> map(|e: Entry| (e.slot, e)) + -> reduce_keyed::<'tick>(|curr_max:&mut Entry, e:Entry| { + if e.ballot > curr_max.ballot { + *curr_max = e; + } + }) + -> fold::<'tick>(HashSet::new(), |mut log:HashSet, (_slot, e): (u16, Entry)| { + log.insert(e); + log + }); + + // reply with p1b + p1a_and_log = cross_join::<'tick>() -> [0]p1b; + p1a[1] -> [0]p1a_and_log; + log -> [1]p1a_and_log; + max_ballot[0] -> [1]p1b; + + // write p2a messages to log + p2a_and_max_ballot = cross_join::<'tick>(); + p2a[0] -> [0]p2a_and_max_ballot; + max_ballot[1] -> [1]p2a_and_max_ballot; + p2a_and_max_ballot -> filter_map(|((m, _proposer), max_ballot)| { + if m.entry.ballot >= max_ballot { + Some(m.entry) + } else { + None + } + }) -> log; + + // reply with p2b + p2a[1] -> [0]p2b; + max_ballot[2] -> [1]p2b; + }; + + if let Some(graph) = graph { + let serde_graph = df + .meta_graph() + .expect("No graph found, maybe failed to parse."); + match graph { + GraphType::Mermaid => { + println!("{}", serde_graph.to_mermaid()); + } + GraphType::Dot => { + println!("{}", serde_graph.to_dot()) + } + GraphType::Json => { + unimplemented!(); + // println!("{}", serde_graph.to_json()) + } + } + } + + df.run_async().await.unwrap(); +} diff --git a/hydroflow/examples/paxos/helpers.rs b/hydroflow/examples/paxos/helpers.rs new file mode 100644 index 000000000000..831cf3a703e2 --- /dev/null +++ b/hydroflow/examples/paxos/helpers.rs @@ -0,0 +1,29 @@ +use std::fs::File; +use std::io::BufReader; +use std::net::SocketAddr; +use std::path::Path; + +use tokio_stream::wrappers::IntervalStream; + +use crate::Config; + +pub fn get_phase_1_addr(addr: SocketAddr) -> SocketAddr { + SocketAddr::new(addr.ip(), addr.port() + 1) +} +pub fn get_phase_2_addr(addr: SocketAddr) -> SocketAddr { + SocketAddr::new(addr.ip(), addr.port() + 2) +} + +pub fn periodic(timeout: u16) -> IntervalStream { + let start = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout.into()); + IntervalStream::new(tokio::time::interval_at( + start, + tokio::time::Duration::from_millis(timeout.into()), + )) +} + +pub fn get_config(path: impl AsRef) -> Config { + let file = File::open(path).unwrap(); + let reader = BufReader::new(file); + serde_json::from_reader(reader).unwrap() +} diff --git a/hydroflow/examples/paxos/main.rs b/hydroflow/examples/paxos/main.rs new file mode 100644 index 000000000000..e2bb2599fef3 --- /dev/null +++ b/hydroflow/examples/paxos/main.rs @@ -0,0 +1,65 @@ +use std::net::SocketAddr; +use std::path::Path; + +use acceptor::run_acceptor; +use clap::{Parser, ValueEnum}; +use hydroflow::tokio; +use hydroflow::util::ipv4_resolve; +use proposer::run_proposer; +use serde::Deserialize; + +mod acceptor; +mod helpers; +mod proposer; +mod protocol; + +#[derive(Clone, ValueEnum, Debug)] +enum Role { + Proposer, + Acceptor, +} + +#[derive(Clone, ValueEnum, Debug)] +enum GraphType { + Mermaid, + Dot, + Json, +} + +#[derive(Parser, Debug)] +struct Opts { + #[clap(long)] + path: String, + #[clap(value_enum, long)] + role: Role, + #[clap(long, value_parser = ipv4_resolve)] + addr: SocketAddr, + #[clap(value_enum, long)] + graph: Option, +} + +#[derive(Deserialize, Debug)] +pub struct Config { + proposers: Vec, + acceptors: Vec, + f: u16, + i_am_leader_resend_timeout: u16, + i_am_leader_check_timeout_node_0: u16, + i_am_leader_check_timeout_other_nodes: u16, +} + +#[hydroflow::main] +async fn main() { + let opts = Opts::parse(); + let path = Path::new(&opts.path); + let addr = opts.addr; + + match opts.role { + Role::Proposer => { + run_proposer(addr, path, opts.graph.clone()).await; + } + Role::Acceptor => { + run_acceptor(addr, opts.graph.clone()).await; + } + } +} diff --git a/hydroflow/examples/paxos/members.json b/hydroflow/examples/paxos/members.json new file mode 100644 index 000000000000..4142e9ef0491 --- /dev/null +++ b/hydroflow/examples/paxos/members.json @@ -0,0 +1,15 @@ +{ + "proposers": [ + "127.0.0.1:12300", + "127.0.0.1:12310" + ], + "acceptors": [ + "127.0.0.1:12320", + "127.0.0.1:12330", + "127.0.0.1:12340" + ], + "f": 1, + "i_am_leader_resend_timeout": 2000, + "i_am_leader_check_timeout_node_0": 5000, + "i_am_leader_check_timeout_other_nodes": 60000 +} \ No newline at end of file diff --git a/hydroflow/examples/paxos/proposer.rs b/hydroflow/examples/paxos/proposer.rs new file mode 100644 index 000000000000..50969b1d9aaf --- /dev/null +++ b/hydroflow/examples/paxos/proposer.rs @@ -0,0 +1,348 @@ +use std::cmp; +use std::net::SocketAddr; +use std::path::Path; +use std::time::{SystemTime, UNIX_EPOCH}; + +use futures::join; +use hydroflow::compiled::pull::HalfMultisetJoinState; +use hydroflow::hydroflow_syntax; +use hydroflow::lattices::{DomPair, Max}; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::util::bind_udp_bytes; +use hydroflow::util::Persistence::*; + +use crate::helpers::{get_config, get_phase_1_addr, get_phase_2_addr, periodic}; +use crate::protocol::*; +use crate::GraphType; + +pub(crate) async fn run_proposer( + addr: SocketAddr, + path: impl AsRef, + graph: Option, +) { + // init default values + let start_ballot_num: u16 = 0; + let start_slot: u16 = 0; + let noop = ""; + + // create channels + let stable_leader_addr = addr; + let phase_1_addr = get_phase_1_addr(addr); + let phase_2_addr = get_phase_2_addr(addr); + let leader_future = bind_udp_bytes(stable_leader_addr); + let p1a_future = bind_udp_bytes(phase_1_addr); + let p2a_future = bind_udp_bytes(phase_2_addr); + let ( + (stable_leader_sink, stable_leader_src, _), + (p1a_sink, p1b_src, _), + (p2a_sink, p2b_src, _), + ) = join!(leader_future, p1a_future, p2a_future); + + let id = addr.port(); // TODO assumes that each proposer has a different port. True locally, but may not be true in distributed setting + let config = get_config(path); + let f = config.f; + + // create timeout triggers + let is_node_0 = addr == config.proposers[0].parse::().unwrap(); + println!("is_node_0: {:?}", is_node_0); + let i_am_leader_resend_trigger = periodic(config.i_am_leader_resend_timeout); + let i_am_leader_check_timeout = if is_node_0 { + config.i_am_leader_check_timeout_node_0 + } else { + config.i_am_leader_check_timeout_other_nodes + }; + let i_am_leader_check_trigger = periodic(i_am_leader_check_timeout); + + let mut df: Hydroflow = hydroflow_syntax! { + // define inputs/outputs + // stable leader election messages + proposers = source_iter(config.proposers) + // filter out self + -> filter_map(|s| { + let proposer = s.parse::().unwrap(); + if proposer != addr { + Some(proposer) + } else { + None + } + }) + -> inspect(|p| println!("Proposer: {:?}", p)) + -> persist(); + leader_recv = source_stream_serde(stable_leader_src) + -> map(Result::unwrap) + -> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a)) + -> map(|(m, _)| m) + -> tee(); + leader_send = cross_join::<'tick>() + -> inspect(|(m, a)| println!("Sending {:?} to {:?}", m, a)) + -> dest_sink_serde(stable_leader_sink); + proposers -> [1]leader_send; + // messages to/from acceptors + acceptors = source_iter(config.acceptors) + -> map(|s| s.parse::().unwrap()) + -> inspect(|a| println!("Acceptor: {:?}", a)) + -> persist() + -> tee(); + p1a = cross_join::<'tick>() + -> inspect(|(m, a)| println!("Sending {:?} to {:?}", m, a)) + -> dest_sink_serde(p1a_sink); + acceptors[0] -> map(get_phase_1_addr) -> [1]p1a; + p1b = source_stream_serde(p1b_src) + -> map(Result::unwrap) + -> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a)) + -> map(|(m, _)| m) + -> persist() + -> tee(); + p2a_out = cross_join::<'tick>() + -> inspect(|(m, a)| println!("Sending {:?} to {:?}", m, a)) + -> dest_sink_serde(p2a_sink); + p2a = union() -> [0]p2a_out; + acceptors[1] -> map(get_phase_2_addr) -> [1]p2a_out; + p2b_in = source_stream_serde(p2b_src) + -> map(Result::unwrap) + -> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a)) + -> map(|(m, _)| Persist(m)) + -> [0]p2b; + p2b = union() + -> persist_mut() + -> tee(); + + client_in = source_stdin() + -> filter_map(|m: Result| m.ok()) + -> inspect(|m| println!("Client input: {:?}", m)); + + // compute ballot (default ballot num = 0) + ballot_num = union() + -> map(Max::new) + -> lattice_merge::<'static, Max>() + -> map(|lattice: Max| lattice.into_reveal()); + source_iter(vec![start_ballot_num]) -> [0]ballot_num; + ballot = ballot_num -> map(|num| Ballot {id, num}) -> tee(); + + + /////////////////////////////////////////////////////////////////////// stable leader election + // check if we have the largest ballot + leader_recv[0] -> map(|m: IAmLeader| m.ballot) -> received_ballots[0]; + p1b[0] -> map(|m: P1b| m.max_ballot) -> received_ballots[1]; + p2b[0] -> map(|m: P2b| m.max_ballot) -> received_ballots[2]; + received_ballots = union() + -> map(Max::new) + -> lattice_merge::<'static, Max>() + -> map(|lattice: Max| lattice.into_reveal()) + -> tee(); + received_ballots[0] -> [0]has_largest_ballot; + ballot[0] -> [1]has_largest_ballot; + has_largest_ballot = cross_join::<'tick>() -> filter_map(|(max_received_ballot, ballot): (Ballot, Ballot)| { + if ballot >= max_received_ballot { + Some(()) + } + else { + None + } + }) -> tee(); + + // send heartbeat if we're the leader + source_stream(i_am_leader_resend_trigger) -> [0]leader_and_resend_timeout; + is_leader[0] -> [1]leader_and_resend_timeout; + leader_and_resend_timeout = cross_join::<'tick>() -> map(|_| ()); + leader_and_resend_timeout -> [0]leader_and_resend_timeout_ballot; + ballot[1] -> [1]leader_and_resend_timeout_ballot; + leader_and_resend_timeout_ballot = cross_join::<'tick>() -> map(|(_, b): ((), Ballot)| IAmLeader {ballot: b}) -> [0]leader_send; + + // track latest heartbeat + source_iter(vec![0]) -> [0]latest_heartbeat; + latest_heartbeat = union() + -> map(Max::new) + -> lattice_merge::<'static, Max>() + -> map(|lattice: Max| lattice.into_reveal()); + leader_recv[1] -> map(|_| SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()) -> [1]latest_heartbeat; + + // if there was no previous heartbeat when the i_am_leader_check triggers again, send p1a + i_am_leader_check = source_stream(i_am_leader_check_trigger) + -> map(|_| SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()) + -> inspect(|_| println!("I am leader check triggered")) + -> [0]p1a_trigger_and_no_heartbeat; + latest_heartbeat -> [1]p1a_trigger_and_no_heartbeat; + p1a_trigger_and_no_heartbeat = cross_join::<'tick>() + -> filter_map(|(check_time, heartbeat): (u64, u64)| { + if heartbeat + u64::from(i_am_leader_check_timeout) < check_time { + Some(()) + } + else { + None + } + }) + -> inspect(|_| println!("p1a trigger and no heartbeat")); + p1a_trigger_and_no_heartbeat -> [pos]leader_expired; + is_leader[1] -> [neg]leader_expired; + leader_expired = difference::<'tick, 'tick>() -> inspect(|_| println!("leader expired")); + leader_expired -> [0]leader_expired_ballot; + ballot[2] -> [1]leader_expired_ballot; + leader_expired_ballot = cross_join::<'tick>() -> inspect(|_| println!("leader expired ballot")) -> map(|(_, b):((), Ballot)| P1a {ballot: b}) -> [0]p1a; + + // increment ballot num if we don't have the largest ballot + received_ballots[1] -> map(|b: Ballot| ((), b)) -> [pos]next_ballot; + has_largest_ballot[0] -> [neg]next_ballot; + next_ballot = anti_join() -> map(|(_, b): ((), Ballot)| b.num + 1) -> next_tick() -> [1]ballot_num; + + + /////////////////////////////////////////////////////////////////////// reconcile p1b log with local log + // check if we've received a quorum of p1bs + p1b[1] -> map(|m: P1b| (m.ballot, m)) -> [0]relevant_p1bs; + ballot[3] -> map(|b: Ballot| (b, ())) -> [1]relevant_p1bs; + relevant_p1bs = join::<'tick, HalfMultisetJoinState>() -> map(|(_b, (m, _)): (Ballot, (P1b, ()))| m) -> tee(); + num_p1bs = relevant_p1bs[0] -> fold::<'tick>(0, |sum: u16, _m: P1b| sum + 1); + p1b_quorum_reached = num_p1bs -> filter_map(|num_p1bs: u16| if num_p1bs > config.f {Some(())} else {None}); + + p1b_quorum_reached -> [0]is_leader; + has_largest_ballot[1] -> [1]is_leader; + is_leader = cross_join::<'tick>() -> map(|_| ()) -> tee(); + + // re-propose what was not committed + p1b_log = relevant_p1bs[1] -> flat_map(|m: P1b| m.log) -> tee(); + p1b_uncommitted = p1b_log[0] -> map(|e: Entry| (e, 1)) + -> reduce_keyed::<'tick>(|sum: &mut u16, _| *sum += 1) + -> filter_map(|(e, num): (Entry, u16)| { + if num <= f + 1 { + Some((e.slot, e)) + } + else { + None + } + }); + p1b_largest_uncommitted = p1b_uncommitted -> reduce_keyed::<'tick>(|largest_e: &mut Entry, e: Entry| { + if e.ballot > largest_e.ballot { + *largest_e = e; + } + }); + is_leader[2] -> map(|()| true) -> [pos]leader_and_reproposing; + next_slot[0] -> fold::<'tick>(false, |_, _| true) -> [neg]leader_and_reproposing; + leader_and_reproposing = difference::<'tick, 'tick>() -> map(|_b| ()) -> tee(); // type: () + leader_and_reproposing[0] -> [0]leader_and_reproposing_and_ballot; + ballot[4] -> [1]leader_and_reproposing_and_ballot; + leader_and_reproposing_and_ballot = cross_join::<'tick>() // type: ((), ballot) + -> map(|((), b): ((), Ballot)| b) + -> tee(); + leader_and_reproposing_and_ballot[0] -> [0]leader_and_reproposing_and_ballot_and_largest_uncommitted; + p1b_largest_uncommitted -> map(|(_slot, e): (u16, Entry)| e) -> [1]leader_and_reproposing_and_ballot_and_largest_uncommitted; + leader_and_reproposing_and_ballot_and_largest_uncommitted = cross_join::<'tick>() // type: (ballot, entry) + -> map(|(b, e): (Ballot, Entry)| P2a { + entry: Entry { + payload: e.payload, + slot: e.slot, + ballot: b, + }, + }) -> [0]p2a; + + // hole-filling + p1b_log[2] -> map(|m: Entry| m.slot) -> [0]proposed_slots; + source_iter(vec![start_slot]) -> persist() -> [1]proposed_slots; + proposed_slots = union() -> tee(); + max_proposed_slot = proposed_slots[0] -> reduce::<'tick>(cmp::max) -> tee(); + prev_slots = max_proposed_slot[0] -> flat_map(|max: u16| 0..max); + prev_slots -> [pos]holes; + proposed_slots[1] -> [neg]holes; + leader_and_reproposing_and_ballot[1] -> [0]leader_and_reproposing_and_ballot_and_holes; + holes = difference() -> [1]leader_and_reproposing_and_ballot_and_holes; + leader_and_reproposing_and_ballot_and_holes = cross_join::<'tick>() + -> map(|(b, hole): (Ballot, u16)| P2a { + entry: Entry { + payload: noop.to_string(), + slot: hole, + ballot: b, + }, + }) -> [1]p2a; + + leader_and_reproposing[1] -> [0]leader_and_reproposing_and_max_slot; + max_proposed_slot[1] -> [1]leader_and_reproposing_and_max_slot; + leader_and_reproposing_and_max_slot = cross_join::<'tick>() + -> map(|((), s): ((), u16)| s + 1) + -> next_tick() + -> [0]new_slots; + ballot[5] -> [0]ballot_and_next_slot; + new_slots = union() -> [1]ballot_and_next_slot; + // store the next slot for each ballot. Can discard slots for older ballots, so use DomPair lattice + ballot_and_next_slot = cross_join::<'tick>() + -> map(|(ballot, slot): (Ballot, u16)| DomPair::new(Max::new(ballot), Max::new(slot))) + -> lattice_merge::<'static, DomPair, Max>>() + -> map(|lattice: DomPair, Max>| { + let (max_key, max_val) = lattice.into_reveal(); + (max_key.into_reveal(), max_val.into_reveal()) + }) + -> [0]next_slot; + ballot[6] -> map(|b: Ballot| (b, ())) -> [1]next_slot; + // find the next slot for the current ballot + next_slot = join::<'tick>() + -> map(|(_ballot, (slot, ())): (Ballot, (u16, ()))| slot) + -> tee(); + + + /////////////////////////////////////////////////////////////////////// send p2as + // assign a slot + indexed_payloads = client_in -> enumerate::<'tick>(); + is_leader[3] -> [0]leader_and_slot; + next_slot[1] -> [1]leader_and_slot; + leader_and_slot = cross_join::<'tick>() -> map(|((), s): ((), u16)| s); + leader_and_slot -> [0]leader_and_slot_and_ballot; + ballot[7] -> [1]leader_and_slot_and_ballot; + leader_and_slot_and_ballot = cross_join::<'tick>(); // type: (slot, ballot) + leader_and_slot_and_ballot -> [0]leader_and_slot_and_ballot_and_payload; + indexed_payloads -> [1]leader_and_slot_and_ballot_and_payload; + leader_and_slot_and_ballot_and_payload = cross_join::<'tick>() // type: ((slot, ballot), (index, payload)) + -> map(|((slot, ballot), (index, payload)): ((u16, Ballot), (u16, String))| P2a { + entry: Entry { + payload, + slot: slot + index, + ballot, + }, + }) -> tee(); + leader_and_slot_and_ballot_and_payload[0] -> [2]p2a; + + // increment the slot if a payload was chosen + num_payloads = leader_and_slot_and_ballot_and_payload[1] + -> fold::<'tick>(0, |sum: u16, _| sum + 1) + -> filter(|s: &u16| *s > 0); // avoid emitting 0, even if it's correct, since it will trigger the calculation of a new slot each tick + next_slot[2] -> [0]slot_plus_num_indexed; + num_payloads -> [1]slot_plus_num_indexed; + slot_plus_num_indexed = cross_join::<'tick>() + -> map(|(slot, num): (u16, u16)| slot + num) + -> next_tick() + -> [1]new_slots; + + /////////////////////////////////////////////////////////////////////// process p2bs + all_commit = p2b[1] -> map(|m: P2b| (m, 1)) + -> reduce_keyed::<'tick>(|sum: &mut u16, _| *sum += 1) + -> filter_map(|(m, num): (P2b, u16)| { + if num == 2*f+1 { // only count when all acceptors have accepted, so we avoid duplicate outputs (when f+1, then f+2, etc accept) + Some(m) + } + else { + None + } + }) + -> inspect(|m: &P2b| println!("Committed slot {:?}: {:?}", m.entry.slot, m.entry.payload)) + -> map(Delete) // delete p2bs for committed slots + -> next_tick() + -> [1]p2b; + }; + + if let Some(graph) = graph { + let serde_graph = df + .meta_graph() + .expect("No graph found, maybe failed to parse."); + match graph { + GraphType::Mermaid => { + println!("{}", serde_graph.to_mermaid()); + } + GraphType::Dot => { + println!("{}", serde_graph.to_dot()) + } + GraphType::Json => { + unimplemented!(); + // println!("{}", serde_graph.to_json()) + } + } + } + + df.run_async().await.unwrap(); +} diff --git a/hydroflow/examples/paxos/protocol.rs b/hydroflow/examples/paxos/protocol.rs new file mode 100644 index 000000000000..f53d1bc51ee7 --- /dev/null +++ b/hydroflow/examples/paxos/protocol.rs @@ -0,0 +1,60 @@ +use std::collections::HashSet; + +use serde::{Deserialize, Serialize}; + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash, Copy)] +pub struct Ballot { + pub id: u16, + pub num: u16, +} + +impl PartialOrd for Ballot { + fn partial_cmp(&self, other: &Self) -> Option { + if self.num == other.num { + self.id.partial_cmp(&other.id) + } else { + self.num.partial_cmp(&other.num) + } + } +} + +impl Ord for Ballot { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.partial_cmp(other).unwrap() + } +} + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash)] +pub struct Entry { + pub payload: String, + pub slot: u16, + pub ballot: Ballot, +} + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +pub struct P1a { + pub ballot: Ballot, +} + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +pub struct P1b { + pub ballot: Ballot, + pub max_ballot: Ballot, + pub log: HashSet, +} + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +pub struct P2a { + pub entry: Entry, +} + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash)] +pub struct P2b { + pub entry: Entry, + pub max_ballot: Ballot, +} + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash)] +pub struct IAmLeader { + pub ballot: Ballot, +} diff --git a/hydroflow/examples/two_pc/README.md b/hydroflow/examples/two_pc/README.md index af4b4d469b12..99cf2f8bc85a 100644 --- a/hydroflow/examples/two_pc/README.md +++ b/hydroflow/examples/two_pc/README.md @@ -2,8 +2,6 @@ This is a remedial 2PC implementation. Design limitations: -- No database logging (just log statements via println) -- No distinction between forced and non-forced logs, no presumed commit/abort optimizations - No recovery manager implementation (yet) - Subordinates make random decisions whether to commit or abort @@ -24,6 +22,6 @@ cargo run --example two_pc -- --path hydroflow/examples/two_pc/members.json --ro Now, in the coordinator process you can type an integer at `stdin`. Each integer you type is considered a transaction ID, and a two-phase commit process is run for that transaction. Votes to commit or abort are randomized. -You should see logging information on screen at both the coordinator and the subordinates. +You should see log files generated, where each log is named `{port}.txt`. Adding the `--graph ` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html). \ No newline at end of file diff --git a/hydroflow/examples/two_pc/coordinator.rs b/hydroflow/examples/two_pc/coordinator.rs index 777103074cbe..5dda0dfc8b52 100644 --- a/hydroflow/examples/two_pc/coordinator.rs +++ b/hydroflow/examples/two_pc/coordinator.rs @@ -13,6 +13,7 @@ pub(crate) async fn run_coordinator( outbound: UdpSink, inbound: UdpStream, path: impl AsRef, + log: &String, graph: Option, ) { let mut df: Hydroflow = hydroflow_syntax! { @@ -23,10 +24,10 @@ pub(crate) async fn run_coordinator( -> tee(); // set up channels - outbound_chan = tee(); - outbound_chan[0] -> dest_sink_serde(outbound); - inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap) -> map(|(m, _a)| m) -> tee(); - msgs = inbound_chan[0] -> demux(|m:SubordResponse, var_args!(commits, aborts, acks, endeds, errs)| match m.mtype { + msgs = source_stream_serde(inbound) + -> map(Result::unwrap) + -> map(|(m, _a)| m) + -> demux(|m:SubordResponse, var_args!(commits, aborts, acks, endeds, errs)| match m.mtype { MsgType::Commit => commits.give(m), MsgType::Abort => aborts.give(m), MsgType::AckP2 {..} => acks.give(m), @@ -35,13 +36,10 @@ pub(crate) async fn run_coordinator( }); msgs[errs] -> for_each(|m| println!("Received unexpected message type: {:?}", m)); msgs[endeds] -> null(); - - // we log all messages (in this prototype we just print) - inbound_chan[1] -> for_each(|m| println!("Received {:?}", m)); - outbound_chan[1] -> for_each(|(m, a)| println!("Sending {:?} to {:?}", m, a)); + log_to_disk = union() -> dest_file(log, true); // setup broadcast channel to all subords - broadcast_join = cross_join() -> outbound_chan; + broadcast_join = cross_join() -> dest_sink_serde(outbound); broadcast = union() -> [0]broadcast_join; subords[1] -> [1]broadcast_join; subords[2] -> for_each(|s| println!("Subordinate: {:?}", s)); @@ -66,17 +64,33 @@ pub(crate) async fn run_coordinator( -> fold_keyed::<'static, u16, u32>(|| 0, |acc: &mut _, val| *acc += val); // count subordinates - subord_total = subords[0] -> fold::<'tick>(0, |a,_b| a+1); // -> for_each(|n| println!("There are {} subordinates.", n)); + subord_total = subords[0] -> fold::<'tick>(0, |a,_b| a+1) -> tee(); // -> for_each(|n| println!("There are {} subordinates.", n)); - // If commit_votes for this xid is the same as all_votes, send a P2 Commit message - committed = join() -> map(|(_c, (xid, ()))| xid); + // If commit_votes for this xid is the same as subord_total, send a P2 Commit message + committed = join() -> map(|(_c, (xid, ()))| xid) -> tee(); commit_votes -> map(|(xid, c)| (c, xid)) -> [0]committed; - subord_total -> map(|c| (c, ())) -> [1]committed; - committed -> map(|xid| CoordMsg{xid, mtype: MsgType::Commit}) -> [2]broadcast; + subord_total[0] -> map(|c| (c, ())) -> [1]committed; + // Presumed abort: log commits (send commit only after flushing to disk) + committed[0] -> map(|xid| format!("Commit {:?}", xid)) + -> [0]log_to_disk; + committed[1] -> next_tick() + -> map(|xid| CoordMsg{xid, mtype: MsgType::Commit}) + -> [2]broadcast; + + // count ack votes + ack_votes = msgs[acks] + -> map(|m: SubordResponse| (m.xid, 1)) + -> fold_keyed::<'static, u16, u32>(|| 0, |acc: &mut _, val| *acc += val); - // Handle p2 acknowledgments by sending an End message - msgs[acks] -> map(|m:SubordResponse| CoordMsg{xid: m.xid, mtype: MsgType::End,}) - -> [3]broadcast; + // If ack_votes for this xid is the same as subord_total, send a End message + acked = join() -> map(|(_c, (xid, ()))| xid) -> tee(); + ack_votes -> map(|(xid, c)| (c, xid)) -> [0]acked; + subord_total[1] -> map(|c| (c, ())) -> [1]acked; + // Presumed abort: log ends (don't need to flush) + acked[0] -> map(|xid| format!("Ended {:?}", xid)) + -> [1]log_to_disk; + acked[1] -> map(|xid| CoordMsg{xid, mtype: MsgType::End}) + -> [3]broadcast; // Handler for ended acknowledgments not necessary; we just print them }; diff --git a/hydroflow/examples/two_pc/helpers.rs b/hydroflow/examples/two_pc/helpers.rs index 20e8e8d5c249..bea659972c5c 100644 --- a/hydroflow/examples/two_pc/helpers.rs +++ b/hydroflow/examples/two_pc/helpers.rs @@ -10,3 +10,8 @@ pub fn decide(odds: u8) -> bool { let mut rng = rand::thread_rng(); rng.gen_range(0..100) <= odds } + +use std::net::SocketAddr; +pub fn get_output_file(addr: SocketAddr) -> String { + format!("{:?}.txt", addr.port()) +} diff --git a/hydroflow/examples/two_pc/main.rs b/hydroflow/examples/two_pc/main.rs index 7d314a0043c7..0a1deb6037ba 100644 --- a/hydroflow/examples/two_pc/main.rs +++ b/hydroflow/examples/two_pc/main.rs @@ -3,6 +3,7 @@ use std::path::Path; use clap::{Parser, ValueEnum}; use coordinator::run_coordinator; +use helpers::get_output_file; use hydroflow::tokio; use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; use serde::Deserialize; @@ -51,15 +52,16 @@ async fn main() { let opts = Opts::parse(); let path = Path::new(&opts.path); let addr = opts.addr; + let filename = get_output_file(addr); match opts.role { Role::Coordinator => { let (outbound, inbound, _) = bind_udp_bytes(addr).await; - run_coordinator(outbound, inbound, path, opts.graph.clone()).await; + run_coordinator(outbound, inbound, path, &filename, opts.graph.clone()).await; } Role::Subordinate => { let (outbound, inbound, _) = bind_udp_bytes(addr).await; - run_subordinate(outbound, inbound, path, opts.graph.clone()).await; + run_subordinate(outbound, inbound, path, &filename, opts.graph.clone()).await; } } } diff --git a/hydroflow/examples/two_pc/protocol.rs b/hydroflow/examples/two_pc/protocol.rs index 894893a5792c..cbb398476507 100644 --- a/hydroflow/examples/two_pc/protocol.rs +++ b/hydroflow/examples/two_pc/protocol.rs @@ -1,5 +1,16 @@ use serde::{Deserialize, Serialize}; +/// Message pattern: +/// User input on stdin +/// Phase 1 +/// Coordinator sends Prepare +/// Subordinate responds with Commit or Abort +/// Coordinator collects, wait for all responses, logs Commit if all commit +/// Phase 2 +/// Coordinator sends Commit after flushing log, or just sends Abort +/// Subordinate responds with AckP2 +/// Coordinator collects, wait for all responses, sends End +/// Subordinate responds with Ended #[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash, Copy)] pub enum MsgType { Prepare, diff --git a/hydroflow/examples/two_pc/subordinate.rs b/hydroflow/examples/two_pc/subordinate.rs index ae774c115fb5..3bf8adcf0b2b 100644 --- a/hydroflow/examples/two_pc/subordinate.rs +++ b/hydroflow/examples/two_pc/subordinate.rs @@ -13,6 +13,7 @@ pub(crate) async fn run_subordinate( outbound: UdpSink, inbound: UdpStream, path: impl AsRef, + log: &String, graph: Option, ) { let mut df: Hydroflow = hydroflow_syntax! { @@ -21,12 +22,11 @@ pub(crate) async fn run_subordinate( -> map(|json: Addresses| json.coordinator) -> map(|s| s.parse::().unwrap()) -> inspect(|coordinator| println!("Coordinator: {}", coordinator)); - server_addr_join = cross_join(); + server_addr_join = cross_join() -> dest_sink_serde(outbound); server_addr -> [1]server_addr_join; // set up channels - outbound_chan = union() -> [0]server_addr_join -> tee(); - outbound_chan[0] -> dest_sink_serde(outbound); + outbound_chan = union() -> [0]server_addr_join; inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap) -> map(|(m, _a)| m) -> tee(); msgs = inbound_chan[0] -> demux(|m:CoordMsg, var_args!(prepares, p2, ends, errs)| match m.mtype { MsgType::Prepare => prepares.give(m), @@ -36,26 +36,29 @@ pub(crate) async fn run_subordinate( _ => errs.give(m), }); msgs[errs] -> for_each(|m| println!("Received unexpected message type: {:?}", m)); + log_to_disk = union() -> dest_file(log, true); // we log all messages (in this prototype we just print) inbound_chan[1] -> for_each(|m| println!("Received {:?}", m)); - outbound_chan[1] -> for_each(|m| println!("Sending {:?}", m)); - // handle p1 message: choose vote and respond // in this prototype we choose randomly whether to abort via decide() report_chan = msgs[prepares] -> map(|m: CoordMsg| SubordResponse { xid: m.xid, mtype: if decide(67) { MsgType::Commit } else { MsgType::Abort } - }); - report_chan -> [0]outbound_chan; + }) -> tee(); + // Presumed abort: log prepares/aborts (reply only after flushing to disk) + report_chan[0] -> map(|m:SubordResponse| format!("Phase 1 {:?}, {:?}", m.xid, m.mtype)) -> log_to_disk[0]; + report_chan[1] -> next_tick() -> [0]outbound_chan; // handle p2 message: acknowledge (and print) - p2_response = map(|(m, t)| SubordResponse { + ack_p2_chan = msgs[p2] -> tee(); + // Presumed abort: log commits/aborts (reply only after flushing to disk) + ack_p2_chan[0] -> map(|m:CoordMsg| format!("Phase 2 {:?}, {:?}", m.xid, m.mtype)) -> log_to_disk[1]; + ack_p2_chan[1] -> map(|m:CoordMsg| SubordResponse { xid: m.xid, - mtype: t, - }) -> [1]outbound_chan; - msgs[p2] -> map(|m:CoordMsg| (m, MsgType::AckP2)) -> p2_response; + mtype: MsgType::AckP2, + }) -> next_tick() -> [1]outbound_chan; // handle end message: acknowledge (and print) msgs[ends] -> map(|m:CoordMsg| SubordResponse { diff --git a/hydroflow/examples/voting/README.md b/hydroflow/examples/voting/README.md new file mode 100644 index 000000000000..be2fd42b3d41 --- /dev/null +++ b/hydroflow/examples/voting/README.md @@ -0,0 +1,20 @@ +## Voting +This implements a primary-replica scheme without fault tolerance. + +### To Run the code: +Look in the file `members.json` to find the addresses of the coordinator and subordinates. +For the coordinator, launch a process on the node with a matching IP address as follows. +Here we assume the coordinator's IP address is `localhost` and port `12346` is free: +``` +cargo run --example voting -- --path hydroflow/examples/voting/members.json --role coordinator --addr localhost:12346 +``` + +Now for each subordinate, launch a process on the node with the matching IP address as follows. +Here we assume the subordinate's IP address is `127.0.0.1` and port `12349` is free: +``` +cargo run --example voting -- --path hydroflow/examples/voting/members.json --role subordinate --addr localhost:12349 +``` + +Now, in the coordinator process you can type a string payload at `stdin`. + +Adding the `--graph ` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html). \ No newline at end of file diff --git a/hydroflow/examples/voting/coordinator.rs b/hydroflow/examples/voting/coordinator.rs new file mode 100644 index 000000000000..77d8d4b15388 --- /dev/null +++ b/hydroflow/examples/voting/coordinator.rs @@ -0,0 +1,74 @@ +use std::net::SocketAddr; +use std::path::Path; + +use hydroflow::hydroflow_syntax; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::util::{UdpSink, UdpStream}; + +use crate::protocol::{CoordMsg, SubordResponse}; +use crate::{Addresses, GraphType}; + +pub(crate) async fn run_coordinator( + outbound: UdpSink, + inbound: UdpStream, + path: impl AsRef, + graph: Option, +) { + let mut df: Hydroflow = hydroflow_syntax! { + // fetch subordinates from file, convert ip:port to a SocketAddr, and tee + subords = source_json(path) + -> flat_map(|json: Addresses| json.subordinates) + -> map(|s| s.parse::().unwrap()) + -> tee(); + + // set up channels + outbound_chan = dest_sink_serde(outbound); + inbound_chan = source_stream_serde(inbound) + -> map(Result::unwrap) + -> map(|(m, _a)| m); + + // setup broadcast channel to all subords + broadcast = cross_join() -> outbound_chan; + subords[1] -> [1]broadcast; + subords[2] -> for_each(|s| println!("Subordinate: {:?}", s)); + + // Phase 1 initiate: + // Given a transaction commit request from stdio, broadcast a Prepare to subordinates + source_stdin() + -> map(|m: Result| CoordMsg { payload: m.unwrap() }) + -> [0]broadcast; + + // count votes + votes = inbound_chan + -> map(|m: SubordResponse| (m.payload, 1)) + -> fold_keyed::<'static, String, u32>(|| 0, |acc: &mut _, val| *acc += val); + + // count subordinates + subord_total = subords[0] -> fold::<'tick>(0, |a,_b| a+1); // -> for_each(|n| println!("There are {} subordinates.", n)); + + // If commit_votes for this xid is the same as all_votes, output committed + committed = join() -> map(|(_c, (payload, ()))| payload) -> for_each(|payload| println!("Committed: {:?}", payload)); + votes -> map(|(payload, c)| (c, payload)) -> [0]committed; + subord_total -> map(|c| (c, ())) -> [1]committed; + }; + + if let Some(graph) = graph { + let serde_graph = df + .meta_graph() + .expect("No graph found, maybe failed to parse."); + match graph { + GraphType::Mermaid => { + println!("{}", serde_graph.to_mermaid()); + } + GraphType::Dot => { + println!("{}", serde_graph.to_dot()) + } + GraphType::Json => { + unimplemented!(); + // println!("{}", serde_graph.to_json()) + } + } + } + + df.run_async().await.unwrap(); +} diff --git a/hydroflow/examples/voting/main.rs b/hydroflow/examples/voting/main.rs new file mode 100644 index 000000000000..b5cbc2c6cfdd --- /dev/null +++ b/hydroflow/examples/voting/main.rs @@ -0,0 +1,64 @@ +use std::net::SocketAddr; +use std::path::Path; + +use clap::{Parser, ValueEnum}; +use coordinator::run_coordinator; +use hydroflow::tokio; +use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; +use serde::Deserialize; +use subordinate::run_subordinate; + +mod coordinator; +mod protocol; +mod subordinate; + +/// This is a remedial 2PC implementation. + +#[derive(Clone, ValueEnum, Debug)] +enum Role { + Coordinator, + Subordinate, +} + +#[derive(Clone, ValueEnum, Debug)] +enum GraphType { + Mermaid, + Dot, + Json, +} + +#[derive(Parser, Debug)] +struct Opts { + #[clap(long)] + path: String, + #[clap(value_enum, long)] + role: Role, + #[clap(long, value_parser = ipv4_resolve)] + addr: SocketAddr, + #[clap(value_enum, long)] + graph: Option, +} + +#[derive(Deserialize, Debug)] +struct Addresses { + coordinator: String, + subordinates: Vec, +} + +#[hydroflow::main] +async fn main() { + let opts = Opts::parse(); + let path = Path::new(&opts.path); + let addr = opts.addr; + + match opts.role { + Role::Coordinator => { + let (outbound, inbound, _) = bind_udp_bytes(addr).await; + run_coordinator(outbound, inbound, path, opts.graph.clone()).await; + } + Role::Subordinate => { + let (outbound, inbound, _) = bind_udp_bytes(addr).await; + run_subordinate(outbound, inbound, path, opts.graph.clone()).await; + } + } +} diff --git a/hydroflow/examples/voting/members.json b/hydroflow/examples/voting/members.json new file mode 100644 index 000000000000..17e7383d17b6 --- /dev/null +++ b/hydroflow/examples/voting/members.json @@ -0,0 +1,8 @@ +{ + "coordinator": "127.0.0.1:12346", + "subordinates": [ + "127.0.0.1:12347", + "127.0.0.1:12348", + "127.0.0.1:12349" + ] +} \ No newline at end of file diff --git a/hydroflow/examples/voting/protocol.rs b/hydroflow/examples/voting/protocol.rs new file mode 100644 index 000000000000..c78fc3db8853 --- /dev/null +++ b/hydroflow/examples/voting/protocol.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +pub struct CoordMsg { + pub payload: String, +} +/// Member Response +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +pub struct SubordResponse { + pub payload: String, +} diff --git a/hydroflow/examples/voting/subordinate.rs b/hydroflow/examples/voting/subordinate.rs new file mode 100644 index 000000000000..a96e1eed195e --- /dev/null +++ b/hydroflow/examples/voting/subordinate.rs @@ -0,0 +1,55 @@ +use std::net::SocketAddr; +use std::path::Path; + +use hydroflow::hydroflow_syntax; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::util::{UdpSink, UdpStream}; + +use crate::protocol::{CoordMsg, SubordResponse}; +use crate::{Addresses, GraphType}; + +pub(crate) async fn run_subordinate( + outbound: UdpSink, + inbound: UdpStream, + path: impl AsRef, + graph: Option, +) { + let mut df: Hydroflow = hydroflow_syntax! { + // Outbound address + server_addr = source_json(path) + -> map(|json: Addresses| json.coordinator) + -> map(|s| s.parse::().unwrap()) + -> inspect(|coordinator| println!("Coordinator: {}", coordinator)); + + // set up channels + outbound_chan = cross_join() -> dest_sink_serde(outbound); + server_addr -> [1]outbound_chan; + inbound_chan = source_stream_serde(inbound) + -> map(Result::unwrap) + -> map(|(m, _a)| m); + + // respond to vote + report_chan = inbound_chan -> map(|m: CoordMsg| SubordResponse { payload: m.payload}); + report_chan -> [0]outbound_chan; + }; + + if let Some(graph) = graph { + let serde_graph = df + .meta_graph() + .expect("No graph found, maybe failed to parse."); + match graph { + GraphType::Mermaid => { + println!("{}", serde_graph.to_mermaid()); + } + GraphType::Dot => { + println!("{}", serde_graph.to_dot()) + } + GraphType::Json => { + unimplemented!(); + // println!("{}", serde_graph.to_json()) + } + } + } + + df.run_async().await.unwrap(); +}