Skip to content

Commit

Permalink
refactor(paxos): complete split into leader election and sequencing p…
Browse files Browse the repository at this point in the history
…hases (#1486)
  • Loading branch information
shadaj authored Oct 30, 2024
1 parent 32e2970 commit 8b7b1c6
Show file tree
Hide file tree
Showing 10 changed files with 930 additions and 722 deletions.
21 changes: 15 additions & 6 deletions hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ use std::marker::PhantomData;

use crate::builder::FlowState;
use crate::location::{Location, LocationId};
use crate::{NoTick, Tick};
use crate::Tick;

pub trait DeferTick {
fn defer_tick(self) -> Self;
}

pub trait CycleComplete<'a, T> {
fn complete(self, ident: syn::Ident);
Expand All @@ -25,25 +29,30 @@ pub trait CycleCollectionWithInitial<'a, T>: CycleComplete<'a, T> {
) -> Self;
}

/// Represents a fixpoint cycle in the graph that will be fulfilled
/// Represents a forward reference in the graph that will be fulfilled
/// by a stream that is not yet known.
///
/// See [`crate::FlowBuilder`] for an explainer on the type parameters.
pub struct HfCycle<'a, T, S: CycleComplete<'a, T>> {
pub struct HfForwardRef<'a, T, S: CycleComplete<'a, T>> {
pub(crate) ident: syn::Ident,
pub(crate) _phantom: PhantomData<(&'a mut &'a (), T, S)>,
}

impl<'a, S: CycleComplete<'a, NoTick>> HfCycle<'a, NoTick, S> {
impl<'a, T, S: CycleComplete<'a, T>> HfForwardRef<'a, T, S> {
pub fn complete(self, stream: S) {
let ident = self.ident;
S::complete(stream, ident)
}
}

impl<'a, S: CycleComplete<'a, Tick>> HfCycle<'a, Tick, S> {
pub struct HfCycle<'a, S: CycleComplete<'a, Tick> + DeferTick> {
pub(crate) ident: syn::Ident,
pub(crate) _phantom: PhantomData<(&'a mut &'a (), S)>,
}

impl<'a, S: CycleComplete<'a, Tick> + DeferTick> HfCycle<'a, S> {
pub fn complete_next_tick(self, stream: S) {
let ident = self.ident;
S::complete(stream, ident)
S::complete(stream.defer_tick(), ident)
}
}
2 changes: 1 addition & 1 deletion hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub mod deploy;
pub use deploy::{ClusterSpec, Deploy, ProcessSpec};

pub mod cycle;
pub use cycle::HfCycle;
pub use cycle::HfForwardRef;

pub mod builder;
pub use builder::FlowBuilder;
Expand Down
49 changes: 40 additions & 9 deletions hydroflow_plus/src/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use serde::{Deserialize, Serialize};
use stageleft::{q, quote_type, Quoted};

use super::builder::{ClusterIds, ClusterSelfId, FlowState};
use crate::cycle::{CycleCollection, CycleCollectionWithInitial};
use crate::cycle::{CycleCollection, CycleCollectionWithInitial, DeferTick, HfCycle};
use crate::ir::{HfPlusNode, HfPlusSource};
use crate::{Bounded, HfCycle, NoTick, Optional, Singleton, Stream, Tick, Unbounded};
use crate::{Bounded, HfForwardRef, NoTick, Optional, Singleton, Stream, Tick, Unbounded};

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum LocationId {
Expand Down Expand Up @@ -170,9 +170,9 @@ pub trait Location<'a> {
.latest()
}

fn tick_cycle<S: CycleCollection<'a, Tick, Location = Self>>(
fn forward_ref<S: CycleCollection<'a, NoTick, Location = Self>>(
&self,
) -> (HfCycle<'a, Tick, S>, S) {
) -> (HfForwardRef<'a, NoTick, S>, S) {
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
Expand All @@ -191,17 +191,46 @@ pub trait Location<'a> {
let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

(
HfCycle {
HfForwardRef {
ident: ident.clone(),
_phantom: PhantomData,
},
S::create_source(ident, self.flow_state().clone(), self.id()),
)
}

fn tick_forward_ref<S: CycleCollection<'a, Tick, Location = Self>>(
&self,
) -> (HfForwardRef<'a, Tick, S>, S) {
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
LocationId::ExternalProcess(_) => panic!(),
};

let mut flow_state = self.flow_state().borrow_mut();
let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();

let id = *next_id_entry;
*next_id_entry += 1;
id
};

let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

(
HfForwardRef {
ident: ident.clone(),
_phantom: PhantomData,
},
S::create_source(ident, self.flow_state().clone(), self.id()),
)
}

fn cycle<S: CycleCollection<'a, NoTick, Location = Self>>(
fn tick_cycle<S: CycleCollection<'a, Tick, Location = Self> + DeferTick>(
&self,
) -> (HfCycle<'a, NoTick, S>, S) {
) -> (HfCycle<'a, S>, S) {
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
Expand All @@ -228,10 +257,12 @@ pub trait Location<'a> {
)
}

fn tick_cycle_with_initial<S: CycleCollectionWithInitial<'a, Tick, Location = Self>>(
fn tick_cycle_with_initial<
S: CycleCollectionWithInitial<'a, Tick, Location = Self> + DeferTick,
>(
&self,
initial: S,
) -> (HfCycle<'a, Tick, S>, S) {
) -> (HfCycle<'a, S>, S) {
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
Expand Down
36 changes: 31 additions & 5 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::rc::Rc;
use stageleft::{q, IntoQuotedMut, Quoted};

use crate::builder::FlowState;
use crate::cycle::{CycleCollection, CycleCollectionWithInitial, CycleComplete};
use crate::cycle::{CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick};
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode};
use crate::location::{Location, LocationId};
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
Expand Down Expand Up @@ -128,6 +128,12 @@ impl<'a, T, C, N: Location<'a>> From<Singleton<T, Bounded, C, N>>
}
}

impl<'a, T, N: Location<'a>> DeferTick for Singleton<T, Bounded, Tick, N> {
fn defer_tick(self) -> Self {
Singleton::defer_tick(self)
}
}

impl<'a, T, N: Location<'a>> CycleComplete<'a, Tick> for Singleton<T, Bounded, Tick, N> {
fn complete(self, ident: syn::Ident) {
self.flow_state.borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
Expand All @@ -138,6 +144,21 @@ impl<'a, T, N: Location<'a>> CycleComplete<'a, Tick> for Singleton<T, Bounded, T
}
}

impl<'a, T, N: Location<'a>> CycleCollection<'a, Tick> for Singleton<T, Bounded, Tick, N> {
type Location = N;

fn create_source(ident: syn::Ident, flow_state: FlowState, l: LocationId) -> Self {
Singleton::new(
l,
flow_state,
HfPlusNode::CycleSource {
ident,
location_kind: l,
},
)
}
}

impl<'a, T, N: Location<'a>> CycleCollectionWithInitial<'a, Tick>
for Singleton<T, Bounded, Tick, N>
{
Expand Down Expand Up @@ -403,13 +424,18 @@ impl<'a, T, W, C, N: Location<'a>> Optional<T, W, C, N> {
}
}

impl<'a, T, N: Location<'a>> DeferTick for Optional<T, Bounded, Tick, N> {
fn defer_tick(self) -> Self {
Optional::defer_tick(self)
}
}

impl<'a, T, N: Location<'a>> CycleComplete<'a, Tick> for Optional<T, Bounded, Tick, N> {
fn complete(self, ident: syn::Ident) {
let me = self.defer_tick();
me.flow_state.borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
self.flow_state.borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: me.location_kind,
input: Box::new(me.ir_node.into_inner()),
location_kind: self.location_kind,
input: Box::new(self.ir_node.into_inner()),
});
}
}
Expand Down
15 changes: 10 additions & 5 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use syn::parse_quote;

use super::staging_util::get_this_crate;
use crate::builder::{self, FlowState};
use crate::cycle::{CycleCollection, CycleComplete};
use crate::cycle::{CycleCollection, CycleComplete, DeferTick};
use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode};
use crate::location::{
CanSend, ExternalBincodeStream, ExternalBytesPort, ExternalProcess, Location, LocationId,
Expand Down Expand Up @@ -54,13 +54,18 @@ pub struct Stream<T, W, C, N> {
_phantom: PhantomData<(T, N, W, C)>,
}

impl<'a, T, N: Location<'a>> DeferTick for Stream<T, Bounded, Tick, N> {
fn defer_tick(self) -> Self {
Stream::defer_tick(self)
}
}

impl<'a, T, N: Location<'a>> CycleComplete<'a, Tick> for Stream<T, Bounded, Tick, N> {
fn complete(self, ident: syn::Ident) {
let me = self.defer_tick();
me.flow_state.borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
self.flow_state.borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: me.location_kind,
input: Box::new(me.ir_node.into_inner()),
location_kind: self.location_kind,
input: Box::new(self.ir_node.into_inner()),
});
}
}
Expand Down
Loading

0 comments on commit 8b7b1c6

Please sign in to comment.