Skip to content

Commit

Permalink
refactor(paxos): generalize quorum logic (#1583)
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Nov 22, 2024
1 parent f96676d commit ec55910
Show file tree
Hide file tree
Showing 11 changed files with 1,379 additions and 1,070 deletions.
7 changes: 7 additions & 0 deletions hydroflow_plus/src/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ impl<'a, T, L: Location<'a>, B> Optional<T, L, B> {
)
}

pub fn flatten<U>(self) -> Stream<U, L, B>
where
T: IntoIterator<Item = U>,
{
self.flat_map(q!(|v| v))
}

pub fn filter<F: Fn(&T) -> bool + 'a>(
self,
f: impl IntoQuotedMut<'a, F, L>,
Expand Down
78 changes: 73 additions & 5 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ pub struct Stream<T, L, B, Order = TotalOrder> {
_phantom: PhantomData<(T, L, B, Order)>,
}

impl<'a, T, L: Location<'a>, B> From<Stream<T, L, B, TotalOrder>> for Stream<T, L, B, NoOrder> {
fn from(stream: Stream<T, L, B, TotalOrder>) -> Stream<T, L, B, NoOrder> {
Stream {
location: stream.location,
ir_node: stream.ir_node,
_phantom: PhantomData,
}
}
}

impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
fn location_kind(&self) -> LocationId {
self.location.id()
Expand Down Expand Up @@ -417,6 +427,33 @@ where
}))
}

pub fn max_by_key<K: Ord, F: Fn(&T) -> K + 'a>(
self,
key: impl IntoQuotedMut<'a, F, L> + Copy,
) -> Optional<T, L, B> {
let f = key.splice_fn1_borrow_ctx(&self.location);

let wrapped: syn::Expr = parse_quote!({
let key_fn = #f;
move |curr, new| {
if key_fn(&new) > key_fn(&*curr) {
*curr = new;
}
}
});

let mut core = HfPlusNode::Reduce {
f: wrapped.into(),
input: Box::new(self.ir_node.into_inner()),
};

if L::is_top_level() {
core = HfPlusNode::Persist(Box::new(core));
}

Optional::new(self.location, core)
}

pub fn min(self) -> Optional<T, L, B>
where
T: Ord,
Expand Down Expand Up @@ -504,6 +541,35 @@ impl<'a, T, L: Location<'a>, B> Stream<T, L, B, TotalOrder> {
}
}

impl<'a, T, L: Location<'a>> Stream<T, L, Bounded, TotalOrder> {
pub fn chain(
self,
other: Stream<T, L, Bounded, TotalOrder>,
) -> Stream<T, L, Bounded, TotalOrder> {
check_matching_location(&self.location, &other.location);

Stream::new(
self.location,
HfPlusNode::Chain(
Box::new(self.ir_node.into_inner()),
Box::new(other.ir_node.into_inner()),
),
)
}
}

impl<'a, T, L: Location<'a> + NoTick> Stream<T, L, Unbounded, NoOrder> {
pub fn union(
self,
other: Stream<T, L, Unbounded, NoOrder>,
) -> Stream<T, L, Unbounded, NoOrder> {
let tick = self.location.tick();
self.tick_batch(&tick)
.union(other.tick_batch(&tick))
.all_ticks()
}
}

impl<'a, T, L: Location<'a>, Order> Stream<T, L, Bounded, Order> {
pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder>
where
Expand All @@ -515,7 +581,7 @@ impl<'a, T, L: Location<'a>, Order> Stream<T, L, Bounded, Order> {
)
}

pub fn chain<B2, O2>(self, other: Stream<T, L, B2, O2>) -> Stream<T, L, B2, Order::Min>
pub fn union<B2, O2>(self, other: Stream<T, L, B2, O2>) -> Stream<T, L, B2, Order::Min>
where
Order: MinOrder<O2>,
{
Expand Down Expand Up @@ -598,10 +664,7 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick<L>, Bounded> {
}
}

impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick<L>, Bounded, Order>
where
Order: MinOrder<NoOrder, Min = NoOrder>,
{
impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick<L>, Bounded, Order> {
pub fn fold_keyed_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>(
self,
init: impl IntoQuotedMut<'a, I, Tick<L>>,
Expand All @@ -620,6 +683,11 @@ where
)
}

pub fn keys(self) -> Stream<K, Tick<L>, Bounded, Order> {
self.fold_keyed_commutative(q!(|| ()), q!(|_, _| {}))
.map(q!(|(k, _)| k))
}

pub fn reduce_keyed_commutative<F: Fn(&mut V, V) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F, Tick<L>>,
Expand Down
2 changes: 2 additions & 0 deletions hydroflow_plus_test/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ pub mod map_reduce;
pub mod paxos;
pub mod paxos_bench;
pub mod paxos_kv;
pub mod quorum;
pub mod request_response;
pub mod simple_cluster;
pub mod two_pc;
Loading

0 comments on commit ec55910

Please sign in to comment.