Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Porting examples (2PC, Paxos) into Hydroflow #764

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
23 changes: 23 additions & 0 deletions hydroflow/examples/paxos/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
## Paoxs
This is a Paxos implementation with stable leaders.

### To Run the code:
Look in the file `members.json` to find the addresses of the proposers and acceptors.
For each proposer, launch a process on the node with a matching IP address as follows.
Here we assume the proposer's IP address is `localhost` and ports `12300`, `12301`, and `12302` are free:
```
cargo run --example paxos -- --path hydroflow/examples/paxos/members.json --role proposer --addr localhost:12300
cargo run --example paxos -- --path hydroflow/examples/paxos/members.json --role proposer --addr localhost:12310
```

Now for each acceptor, launch a process on the node with the matching IP address as follows.
Here we assume the acceptor's IP address is `127.0.0.1` and ports `12321` and `12322` are free:
```
cargo run --example paxos -- --path hydroflow/examples/paxos/members.json --role acceptor --addr localhost:12320
cargo run --example paxos -- --path hydroflow/examples/paxos/members.json --role acceptor --addr localhost:12330
cargo run --example paxos -- --path hydroflow/examples/paxos/members.json --role acceptor --addr localhost:12340
```

Now, in the proposer process you can type a string payload at `stdin`.

Adding the `--graph <graph_type>` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `<graph_type>` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html).
115 changes: 115 additions & 0 deletions hydroflow/examples/paxos/acceptor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::collections::HashSet;
use std::net::SocketAddr;

use futures::join;
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::bind_udp_bytes;

use crate::helpers::{get_phase_1_addr, get_phase_2_addr};
use crate::protocol::*;
use crate::GraphType;

pub(crate) async fn run_acceptor(addr: SocketAddr, graph: Option<GraphType>) {
let phase_1_addr = get_phase_1_addr(addr);
let phase_2_addr = get_phase_2_addr(addr);

let p1a_future = bind_udp_bytes(phase_1_addr);
let p2a_future = bind_udp_bytes(phase_2_addr);
let ((p1b_sink, p1a_src, _), (p2b_sink, p2a_src, _)) = join!(p1a_future, p2a_future);

let mut df: Hydroflow = hydroflow_syntax! {
// define inputs/outputs
p1a = source_stream_serde(p1a_src)
-> map(Result::unwrap)
-> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a))
-> tee();
p1b = cross_join::<'tick>()
-> map(|(((p1a, proposer), log), max_ballot): (((P1a, SocketAddr), HashSet<Entry>), Ballot)|
(P1b {
ballot: p1a.ballot,
max_ballot,
log,
}, proposer))
-> inspect(|(m, a)| println!("Sending {:?} to {:?}", m, a))
-> dest_sink_serde(p1b_sink);
p2a = source_stream_serde(p2a_src)
-> map(Result::unwrap)
-> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a))
-> tee();
p2b = cross_join::<'tick>()
-> map(|((p2a, proposer), max_ballot):((P2a, SocketAddr), Ballot)|
(P2b {
entry: p2a.entry,
max_ballot,
}, proposer))
-> inspect(|(m, a)| println!("Sending {:?} to {:?}", m, a))
-> dest_sink_serde(p2b_sink);

// find max_ballot
ballots = persist();
max_ballot = ballots -> reduce(|mut curr_max:Ballot, ballot:Ballot| {
if ballot > curr_max {
curr_max = ballot;
}
curr_max
}) -> tee();

p1a[0] -> map(|(m, _): (P1a, SocketAddr)| m.ballot) -> ballots;

// find max entry for each slot in the log
log = persist()
-> map(|e: Entry| (e.slot, e))
-> reduce_keyed::<'tick>(|curr_max:&mut Entry, e:Entry| {
if e.ballot > curr_max.ballot {
*curr_max = e;
}
})
-> fold::<'tick>(HashSet::new(), |mut log:HashSet<Entry>, (_slot, e): (u16, Entry)| {
log.insert(e);
log
});

// reply with p1b
p1a_and_log = cross_join::<'tick>() -> [0]p1b;
p1a[1] -> [0]p1a_and_log;
log -> [1]p1a_and_log;
max_ballot[0] -> [1]p1b;

// write p2a messages to log
p2a_and_max_ballot = cross_join::<'tick>();
p2a[0] -> [0]p2a_and_max_ballot;
max_ballot[1] -> [1]p2a_and_max_ballot;
p2a_and_max_ballot -> filter_map(|((m, _proposer), max_ballot)| {
if m.entry.ballot >= max_ballot {
Some(m.entry)
} else {
None
}
}) -> log;

// reply with p2b
p2a[1] -> [0]p2b;
max_ballot[2] -> [1]p2b;
};

if let Some(graph) = graph {
let serde_graph = df
.meta_graph()
.expect("No graph found, maybe failed to parse.");
match graph {
GraphType::Mermaid => {
println!("{}", serde_graph.to_mermaid());
}
GraphType::Dot => {
println!("{}", serde_graph.to_dot())
}
GraphType::Json => {
unimplemented!();
// println!("{}", serde_graph.to_json())
}
}
}

df.run_async().await.unwrap();
}
29 changes: 29 additions & 0 deletions hydroflow/examples/paxos/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::fs::File;
use std::io::BufReader;
use std::net::SocketAddr;
use std::path::Path;

use tokio_stream::wrappers::IntervalStream;

use crate::Config;

pub fn get_phase_1_addr(addr: SocketAddr) -> SocketAddr {
SocketAddr::new(addr.ip(), addr.port() + 1)
}
pub fn get_phase_2_addr(addr: SocketAddr) -> SocketAddr {
SocketAddr::new(addr.ip(), addr.port() + 2)
}

pub fn periodic(timeout: u16) -> IntervalStream {
let start = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout.into());
IntervalStream::new(tokio::time::interval_at(
start,
tokio::time::Duration::from_millis(timeout.into()),
))
}

pub fn get_config(path: impl AsRef<Path>) -> Config {
let file = File::open(path).unwrap();
let reader = BufReader::new(file);
serde_json::from_reader(reader).unwrap()
}
65 changes: 65 additions & 0 deletions hydroflow/examples/paxos/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::net::SocketAddr;
use std::path::Path;

use acceptor::run_acceptor;
use clap::{Parser, ValueEnum};
use hydroflow::tokio;
use hydroflow::util::ipv4_resolve;
use proposer::run_proposer;
use serde::Deserialize;

mod acceptor;
mod helpers;
mod proposer;
mod protocol;

#[derive(Clone, ValueEnum, Debug)]
enum Role {
Proposer,
Acceptor,
}

#[derive(Clone, ValueEnum, Debug)]
enum GraphType {
Mermaid,
Dot,
Json,
}

#[derive(Parser, Debug)]
struct Opts {
#[clap(long)]
path: String,
#[clap(value_enum, long)]
role: Role,
#[clap(long, value_parser = ipv4_resolve)]
addr: SocketAddr,
#[clap(value_enum, long)]
graph: Option<GraphType>,
}

#[derive(Deserialize, Debug)]
pub struct Config {
proposers: Vec<String>,
acceptors: Vec<String>,
f: u16,
i_am_leader_resend_timeout: u16,
i_am_leader_check_timeout_node_0: u16,
i_am_leader_check_timeout_other_nodes: u16,
}

#[hydroflow::main]
async fn main() {
let opts = Opts::parse();
let path = Path::new(&opts.path);
let addr = opts.addr;

match opts.role {
Role::Proposer => {
run_proposer(addr, path, opts.graph.clone()).await;
}
Role::Acceptor => {
run_acceptor(addr, opts.graph.clone()).await;
}
}
}
15 changes: 15 additions & 0 deletions hydroflow/examples/paxos/members.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"proposers": [
"127.0.0.1:12300",
"127.0.0.1:12310"
],
"acceptors": [
"127.0.0.1:12320",
"127.0.0.1:12330",
"127.0.0.1:12340"
],
"f": 1,
"i_am_leader_resend_timeout": 2000,
"i_am_leader_check_timeout_node_0": 5000,
"i_am_leader_check_timeout_other_nodes": 60000
}
Loading