From 9c5e632bd011c9ca4af1d1a05748aa5741a60f8b Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Tue, 3 Dec 2024 14:02:52 -0800 Subject: [PATCH] feat(hydroflow_plus_std): extract initial Hydroflow+ utilities into a standard library --- .github/workflows/release.yml | 2 +- Cargo.lock | 14 ++++++++ Cargo.toml | 1 + hydroflow_plus_std/CHANGELOG.md | 0 hydroflow_plus_std/Cargo.toml | 28 +++++++++++++++ hydroflow_plus_std/build.rs | 3 ++ hydroflow_plus_std/src/lib.rs | 13 +++++++ .../src}/quorum.rs | 4 +-- .../src}/request_response.rs | 0 hydroflow_plus_test/Cargo.toml | 1 + hydroflow_plus_test/src/cluster/mod.rs | 2 -- hydroflow_plus_test/src/cluster/paxos.rs | 5 ++- .../src/cluster/paxos_bench.rs | 2 +- ...cluster__paxos_bench__tests__paxos_ir.snap | 34 +++++++++---------- hydroflow_plus_test/src/cluster/two_pc.rs | 3 +- stageleft/src/lib.rs | 1 + template/hydroflow_plus/Cargo.toml | 1 + 17 files changed, 86 insertions(+), 28 deletions(-) create mode 100644 hydroflow_plus_std/CHANGELOG.md create mode 100644 hydroflow_plus_std/Cargo.toml create mode 100644 hydroflow_plus_std/build.rs create mode 100644 hydroflow_plus_std/src/lib.rs rename {hydroflow_plus_test/src/cluster => hydroflow_plus_std/src}/quorum.rs (97%) rename {hydroflow_plus_test/src/cluster => hydroflow_plus_std/src}/request_response.rs (100%) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ff73a39ef0a0..9538a1ec3cb8 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -87,7 +87,7 @@ jobs: --bump ${{ inputs.bump }} --bump-dependencies auto ${{ inputs.execute && '--execute' || '--no-publish' }} hydroflow hydroflow_lang hydroflow_macro hydroflow_plus - hydroflow_datalog hydroflow_datalog_core + hydroflow_plus_std hydroflow_datalog hydroflow_datalog_core hydro_deploy hydro_cli hydroflow_deploy_integration stageleft stageleft_macro stageleft_tool multiplatform_test diff --git a/Cargo.lock b/Cargo.lock index 396a800a5c20..d26a114e4e86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1607,6 +1607,19 @@ dependencies = [ "trybuild-internals-api", ] +[[package]] +name = "hydroflow_plus_std" +version = "0.10.0" +dependencies = [ + "async-ssh2-lite", + "ctor", + "hydro_deploy", + "hydroflow_plus", + "insta", + "stageleft", + "stageleft_tool", +] + [[package]] name = "hydroflow_plus_test" version = "0.0.0" @@ -1615,6 +1628,7 @@ dependencies = [ "futures", "hydro_deploy", "hydroflow_plus", + "hydroflow_plus_std", "insta", "rand", "serde", diff --git a/Cargo.toml b/Cargo.toml index 224bad277975..23491ccfe4b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "hydroflow_lang", "hydroflow_macro", "hydroflow_plus", + "hydroflow_plus_std", "hydroflow_plus_test", "hydroflow_plus_test_local", "hydroflow_plus_test_local_macro", diff --git a/hydroflow_plus_std/CHANGELOG.md b/hydroflow_plus_std/CHANGELOG.md new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/hydroflow_plus_std/Cargo.toml b/hydroflow_plus_std/Cargo.toml new file mode 100644 index 000000000000..03992c88f27f --- /dev/null +++ b/hydroflow_plus_std/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "hydroflow_plus_std" +publish = true +version = "0.10.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/hydroflow_plus_std/" +description = "Standard library of distributed systems building blocks for Hydroflow+" + +[lints] +workspace = true + +[lib] +path = "src/lib.rs" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.10.0", default-features = false } +stageleft = { path = "../stageleft", version = "^0.5.0" } + +[build-dependencies] +stageleft_tool = { path = "../stageleft_tool", version = "^0.4.0" } + +[dev-dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.10.0" } +insta = "1.39" +hydro_deploy = { path = "../hydro_deploy/core", version = "^0.10.0" } +async-ssh2-lite = { version = "0.5.0", features = ["vendored-openssl"] } +ctor = "0.2.8" diff --git a/hydroflow_plus_std/build.rs b/hydroflow_plus_std/build.rs new file mode 100644 index 000000000000..99775c3c7daa --- /dev/null +++ b/hydroflow_plus_std/build.rs @@ -0,0 +1,3 @@ +fn main() { + stageleft_tool::gen_final!(); +} diff --git a/hydroflow_plus_std/src/lib.rs b/hydroflow_plus_std/src/lib.rs new file mode 100644 index 000000000000..fc5a2ab71fb1 --- /dev/null +++ b/hydroflow_plus_std/src/lib.rs @@ -0,0 +1,13 @@ +stageleft::stageleft_no_entry_crate!(); + +pub mod quorum; +pub mod request_response; + +#[stageleft::runtime] +#[cfg(test)] +mod tests { + #[ctor::ctor] + fn init() { + hydroflow_plus::deploy::init_test(); + } +} diff --git a/hydroflow_plus_test/src/cluster/quorum.rs b/hydroflow_plus_std/src/quorum.rs similarity index 97% rename from hydroflow_plus_test/src/cluster/quorum.rs rename to hydroflow_plus_std/src/quorum.rs index 7a6547e17018..6788123bdda1 100644 --- a/hydroflow_plus_test/src/cluster/quorum.rs +++ b/hydroflow_plus_std/src/quorum.rs @@ -3,7 +3,7 @@ use std::hash::Hash; use hydroflow_plus::*; use location::NoTick; -#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] +#[expect(clippy::type_complexity, reason = "stream types with ordering")] pub fn collect_quorum_with_response< 'a, L: Location<'a> + NoTick, @@ -99,7 +99,7 @@ pub fn collect_quorum_with_response< ) } -#[expect(clippy::type_complexity, reason = "quorum types are complex")] +#[expect(clippy::type_complexity, reason = "stream types with ordering")] pub fn collect_quorum<'a, L: Location<'a> + NoTick, Order, K: Clone + Eq + Hash, E: Clone>( responses: Stream<(K, Result<(), E>), Timestamped, Unbounded, Order>, min: usize, diff --git a/hydroflow_plus_test/src/cluster/request_response.rs b/hydroflow_plus_std/src/request_response.rs similarity index 100% rename from hydroflow_plus_test/src/cluster/request_response.rs rename to hydroflow_plus_std/src/request_response.rs diff --git a/hydroflow_plus_test/Cargo.toml b/hydroflow_plus_test/Cargo.toml index 6d57e17ebbe1..aabdaf528a2e 100644 --- a/hydroflow_plus_test/Cargo.toml +++ b/hydroflow_plus_test/Cargo.toml @@ -13,6 +13,7 @@ stageleft_devel = [] [dependencies] hydroflow_plus = { path = "../hydroflow_plus", version = "^0.10.0" } +hydroflow_plus_std = { path = "../hydroflow_plus_std", version = "^0.10.0" } tokio = { version = "1.29.0", features = [ "full" ] } stageleft = { path = "../stageleft", version = "^0.5.0" } rand = "0.8.0" diff --git a/hydroflow_plus_test/src/cluster/mod.rs b/hydroflow_plus_test/src/cluster/mod.rs index 88bcbe2d9497..b4e5be9bb006 100644 --- a/hydroflow_plus_test/src/cluster/mod.rs +++ b/hydroflow_plus_test/src/cluster/mod.rs @@ -4,7 +4,5 @@ pub mod map_reduce; pub mod paxos; pub mod paxos_bench; pub mod paxos_kv; -pub mod quorum; -pub mod request_response; pub mod simple_cluster; pub mod two_pc; diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index af3a3ca04f3a..66b28264abac 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -4,12 +4,11 @@ use std::hash::Hash; use std::time::Duration; use hydroflow_plus::*; +use hydroflow_plus_std::quorum::{collect_quorum, collect_quorum_with_response}; +use hydroflow_plus_std::request_response::join_responses; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use super::quorum::{collect_quorum, collect_quorum_with_response}; -use super::request_response::join_responses; - pub struct Proposer {} pub struct Acceptor {} diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index 4d87409fbde1..7517bd2cc943 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -3,11 +3,11 @@ use std::rc::Rc; use std::time::Duration; use hydroflow_plus::*; +use hydroflow_plus_std::quorum::collect_quorum; use tokio::time::Instant; use super::paxos::{Acceptor, Ballot, Proposer}; use super::paxos_kv::{paxos_kv, KvPayload, Replica}; -use super::quorum::collect_quorum; pub struct Client {} diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 30433787404a..c2c3c9e71196 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -220,11 +220,11 @@ expression: built.ir() input: DeferTick( Difference( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), input: Tee { inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | | (0 , 0) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { inner: : Chain( CycleSource { @@ -426,7 +426,7 @@ expression: built.ir() }, Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use crate :: __staged :: cluster :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), input: Tee { inner: , }, @@ -483,14 +483,14 @@ expression: built.ir() acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | logs , log | { logs . push (log) ; } }), input: Persist( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), input: AntiJoin( AntiJoin( Tee { inner: , }, FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success < min__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success < min__free { Some (key) } else { None } }), input: Tee { inner: , }, @@ -552,7 +552,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , ballot) | ballot }), input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), input: Tee { inner: , }, @@ -745,11 +745,11 @@ expression: built.ir() Difference( Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), input: Tee { inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | | (0 , 0) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { inner: : Chain( CycleSource { @@ -921,7 +921,7 @@ expression: built.ir() }, Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), input: Tee { inner: , }, @@ -982,7 +982,7 @@ expression: built.ir() ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) , (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > ({ use crate :: __staged :: cluster :: request_response :: * ; | (key , _) | key }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) , (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > ({ use hydroflow_plus_std :: __staged :: request_response :: * ; | (key , _) | key }), input: Tee { inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) > ({ use crate :: __staged :: cluster :: paxos :: * ; | k | (k , ()) }), @@ -1129,7 +1129,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , ballot) | ballot }), input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), input: Tee { inner: , }, @@ -1194,7 +1194,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((slot , _ballot) , (value , _)) | (slot , value) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) > ({ use crate :: __staged :: cluster :: request_response :: * ; | (key , (meta , resp)) | (key , (meta , resp)) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) > ({ use hydroflow_plus_std :: __staged :: request_response :: * ; | (key , (meta , resp)) | (key , (meta , resp)) }), input: Join( Tee { inner: , @@ -1526,11 +1526,11 @@ expression: built.ir() }, Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), input: Tee { inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , () > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | | (0 , 0) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , () > , () > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { inner: , }, diff --git a/hydroflow_plus_test/src/cluster/two_pc.rs b/hydroflow_plus_test/src/cluster/two_pc.rs index 4a5f8220cb60..429de9ab6337 100644 --- a/hydroflow_plus_test/src/cluster/two_pc.rs +++ b/hydroflow_plus_test/src/cluster/two_pc.rs @@ -1,6 +1,5 @@ use hydroflow_plus::*; - -use super::quorum::collect_quorum; +use hydroflow_plus_std::quorum::collect_quorum; // if the variable start with p, that means current work is at the participant side. if start with c, at coordinator side. // diff --git a/stageleft/src/lib.rs b/stageleft/src/lib.rs index ee1cdabaad1a..8ee222bce757 100644 --- a/stageleft/src/lib.rs +++ b/stageleft/src/lib.rs @@ -82,6 +82,7 @@ macro_rules! stageleft_no_entry_crate { unused, ambiguous_glob_reexports, clippy::suspicious_else_formatting, + clippy::type_complexity, reason = "generated code" )] pub mod __staged { diff --git a/template/hydroflow_plus/Cargo.toml b/template/hydroflow_plus/Cargo.toml index 1544bce2112e..202a7a2663d5 100644 --- a/template/hydroflow_plus/Cargo.toml +++ b/template/hydroflow_plus/Cargo.toml @@ -10,6 +10,7 @@ stageleft_devel = [] [dependencies] hydroflow_plus = { git = "{{ hydroflow_git | default: 'https://github.com/hydro-project/hydroflow.git' }}", branch = "{{ hydroflow_branch | default: 'main' }}" } +hydroflow_plus_std = { git = "{{ hydroflow_git | default: 'https://github.com/hydro-project/hydroflow.git' }}", branch = "{{ hydroflow_branch | default: 'main' }}" } stageleft = { git = "{{ hydroflow_git | default: 'https://github.com/hydro-project/hydroflow.git' }}", branch = "{{ hydroflow_branch | default: 'main' }}" } tokio = { version = "1.29.0", features = [ "full" ] }