fix(hydroflow_plus)!: rename union to chain and restrict LHS to be bounded (#1565)

Returning a `Stream` from `union` on unbounded streams was unsound,
since the order of outputs is not deterministic.
shadaj authored Nov 15, 2024
1 parent 9f3c8c4 commit eb1ad3a
Showing 14 changed files with 275 additions and 158 deletions.
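The migration pattern used throughout this commit for call sites that previously `union`ed unbounded streams is: batch each side into the same tick, `chain` them, and flatten back out with `all_ticks`. A minimal sketch, lifted from the `p_max_ballot` change below (identifiers are from that function; this is not a standalone program):

```rust
// Sketch of the union -> chain migration (mirrors p_max_ballot in paxos.rs below).
let ballot_batcher = proposers.tick();
let max_ballot = p_received_p1b_ballots
    .tick_batch(&ballot_batcher)
    .chain(p_received_p2b_ballots.tick_batch(&ballot_batcher))
    .chain(p_to_proposers_i_am_leader.tick_batch(&ballot_batcher))
    .all_ticks()
    .max(); // yields an Optional; the real code follows with .unwrap_or(...)
```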
42 changes: 42 additions & 0 deletions hydroflow_lang/src/graph/ops/chain.rs
@@ -0,0 +1,42 @@
use crate::graph::PortIndexValue;

use super::{
    DelayType, OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1,
};

/// > 2 input streams of the same type, 1 output stream of the same type
///
/// Chains together a pair of streams, with all the elements of the first emitted before the second.
///
/// Since `chain` has multiple input streams, it needs to be assigned to
/// a variable to reference its multiple input ports across statements.
///
/// ```hydroflow
/// source_iter(vec!["hello", "world"]) -> [0]my_chain;
/// source_iter(vec!["stay", "gold"]) -> [1]my_chain;
/// my_chain = chain()
/// -> map(|x| x.to_uppercase())
/// -> assert_eq(["HELLO", "WORLD", "STAY", "GOLD"]);
/// ```
pub const CHAIN: OperatorConstraints = OperatorConstraints {
    name: "chain",
    categories: &[OperatorCategory::MultiIn],
    persistence_args: RANGE_0,
    type_args: RANGE_0,
    hard_range_inn: &(2..=2),
    soft_range_inn: &(2..=2),
    hard_range_out: RANGE_1,
    soft_range_out: RANGE_1,
    num_args: 0,
    is_external_input: false,
    has_singleton_output: false,
    ports_inn: None,
    ports_out: None,
    input_delaytype_fn: |idx| match idx {
        PortIndexValue::Int(idx) if idx.value == 0 => {
            Some(DelayType::Stratum)
        }
        _else => None,
    },
    write_fn: super::union::UNION.write_fn,
};
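As a rough analogy only (not the operator's implementation), the ordering guarantee matches `Iterator::chain` from the Rust standard library: every element of the first input is emitted before any element of the second.

```rust
// Plain-Rust analogy for the ordering guarantee of `chain`.
fn main() {
    let first = vec!["hello", "world"];
    let second = vec!["stay", "gold"];
    let out: Vec<&str> = first.into_iter().chain(second).collect();
    assert_eq!(out, ["hello", "world", "stay", "gold"]);
}
```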
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/mod.rs
@@ -244,6 +244,7 @@ declare_ops![
anti_join_multiset::ANTI_JOIN_MULTISET,
assert::ASSERT,
assert_eq::ASSERT_EQ,
chain::CHAIN,
cross_join::CROSS_JOIN,
cross_join_multiset::CROSS_JOIN_MULTISET,
cross_singleton::CROSS_SINGLETON,
18 changes: 9 additions & 9 deletions hydroflow_plus/src/ir.rs
@@ -286,7 +286,7 @@ pub enum HfPlusNode {
Unpersist(Box<HfPlusNode>),
Delta(Box<HfPlusNode>),

Union(Box<HfPlusNode>, Box<HfPlusNode>),
Chain(Box<HfPlusNode>, Box<HfPlusNode>),
CrossProduct(Box<HfPlusNode>, Box<HfPlusNode>),
CrossSingleton(Box<HfPlusNode>, Box<HfPlusNode>),
Join(Box<HfPlusNode>, Box<HfPlusNode>),
@@ -457,7 +457,7 @@ impl<'a> HfPlusNode {
HfPlusNode::Unpersist(inner) => transform(inner.as_mut(), seen_tees),
HfPlusNode::Delta(inner) => transform(inner.as_mut(), seen_tees),

HfPlusNode::Union(left, right) => {
HfPlusNode::Chain(left, right) => {
transform(left.as_mut(), seen_tees);
transform(right.as_mut(), seen_tees);
}
@@ -678,37 +678,37 @@ impl<'a> HfPlusNode {
}
}

HfPlusNode::Union(left, right) => {
HfPlusNode::Chain(left, right) => {
let (left_ident, left_location_id) =
left.emit(graph_builders, built_tees, next_stmt_id);
let (right_ident, right_location_id) =
right.emit(graph_builders, built_tees, next_stmt_id);

assert_eq!(
left_location_id, right_location_id,
"union inputs must be in the same location"
"chain inputs must be in the same location"
);

let union_id = *next_stmt_id;
*next_stmt_id += 1;

let union_ident =
let chain_ident =
syn::Ident::new(&format!("stream_{}", union_id), Span::call_site());

let builder = graph_builders.entry(left_location_id).or_default();
builder.add_statement(parse_quote! {
#union_ident = union();
#chain_ident = chain();
});

builder.add_statement(parse_quote! {
#left_ident -> [0]#union_ident;
#left_ident -> [0]#chain_ident;
});

builder.add_statement(parse_quote! {
#right_ident -> [1]#union_ident;
#right_ident -> [1]#chain_ident;
});

(union_ident, left_location_id)
(chain_ident, left_location_id)
}

HfPlusNode::CrossSingleton(left, right) => {
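For reference, the statements emitted by this arm wire the two inputs into the indexed ports of a `chain` operator; a sketch of the generated Hydroflow (the `stream_N` identifiers are illustrative, derived from `next_stmt_id`):

```hydroflow
stream_2 = chain();
stream_0 -> [0]stream_2;
stream_1 -> [1]stream_2;
```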
8 changes: 4 additions & 4 deletions hydroflow_plus/src/optional.rs
@@ -240,15 +240,15 @@ impl<'a, T, L: Location<'a>, B> Optional<T, L, B> {
if L::is_top_level() {
Optional::new(
self.location,
HfPlusNode::Persist(Box::new(HfPlusNode::Union(
HfPlusNode::Persist(Box::new(HfPlusNode::Chain(
Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
Box::new(HfPlusNode::Unpersist(Box::new(other.ir_node.into_inner()))),
))),
)
} else {
Optional::new(
self.location,
HfPlusNode::Union(
HfPlusNode::Chain(
Box::new(self.ir_node.into_inner()),
Box::new(other.ir_node.into_inner()),
),
@@ -288,15 +288,15 @@ impl<'a, T, L: Location<'a>, B> Optional<T, L, B> {
if L::is_top_level() {
Singleton::new(
self.location,
HfPlusNode::Persist(Box::new(HfPlusNode::Union(
HfPlusNode::Persist(Box::new(HfPlusNode::Chain(
Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
Box::new(HfPlusNode::Unpersist(Box::new(other.ir_node.into_inner()))),
))),
)
} else {
Singleton::new(
self.location,
HfPlusNode::Union(
HfPlusNode::Chain(
Box::new(self.ir_node.into_inner()),
Box::new(other.ir_node.into_inner()),
),
4 changes: 2 additions & 2 deletions hydroflow_plus/src/rewrites/persist_pullup.rs
@@ -88,8 +88,8 @@ fn persist_pullup_node(
input: behind_persist,
})),

HfPlusNode::Union(box HfPlusNode::Persist(left), box HfPlusNode::Persist(right)) => {
HfPlusNode::Persist(Box::new(HfPlusNode::Union(left, right)))
HfPlusNode::Chain(box HfPlusNode::Persist(left), box HfPlusNode::Persist(right)) => {
HfPlusNode::Persist(Box::new(HfPlusNode::Chain(left, right)))
}

HfPlusNode::CrossProduct(box HfPlusNode::Persist(left), box HfPlusNode::Persist(right)) => {
2 changes: 1 addition & 1 deletion hydroflow_plus/src/singleton.rs
@@ -57,7 +57,7 @@ impl<'a, T, L: Location<'a>> CycleCollectionWithInitial<'a, TickCycleMarker>
let location_id = location.id();
Singleton::new(
location,
HfPlusNode::Union(
HfPlusNode::Chain(
Box::new(HfPlusNode::CycleSource {
ident,
location_kind: location_id,
24 changes: 12 additions & 12 deletions hydroflow_plus/src/stream.rs
@@ -260,18 +260,6 @@ impl<'a, T, L: Location<'a>, B> Stream<T, L, B> {
)
}

pub fn union(self, other: Stream<T, L, B>) -> Stream<T, L, B> {
check_matching_location(&self.location, &other.location);

Stream::new(
self.location,
HfPlusNode::Union(
Box::new(self.ir_node.into_inner()),
Box::new(other.ir_node.into_inner()),
),
)
}

pub fn enumerate(self) -> Stream<(usize, T), L, B> {
Stream::new(
self.location,
@@ -402,6 +390,18 @@ impl<'a, T, L: Location<'a>> Stream<T, L, Bounded> {
HfPlusNode::Sort(Box::new(self.ir_node.into_inner())),
)
}

pub fn chain<B2>(self, other: Stream<T, L, B2>) -> Stream<T, L, B2> {
check_matching_location(&self.location, &other.location);

Stream::new(
self.location,
HfPlusNode::Chain(
Box::new(self.ir_node.into_inner()),
Box::new(other.ir_node.into_inner()),
),
)
}
}

impl<'a, K, V1, L: Location<'a>, B> Stream<(K, V1), L, B> {
15 changes: 9 additions & 6 deletions hydroflow_plus_test/src/cluster/paxos.rs
@@ -115,7 +115,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
recommit_after_leader_election(proposers, p_relevant_p1bs, p_ballot_num.clone(), f);

let p_log_to_recommit = p_log_to_try_commit
.union(p_log_holes)
.chain(p_log_holes)
.continue_if(just_became_leader); // Only resend p1b stuff once the moment we become leader.

let (p_to_replicas, a_log, a_to_proposers_p2b) = sequence_payload(
@@ -226,9 +226,12 @@ fn p_max_ballot<'a>(
p_received_p2b_ballots: Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
p_to_proposers_i_am_leader: Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
) -> Singleton<Ballot, Cluster<'a, Proposer>, Unbounded> {
let ballot_batcher = proposers.tick();
p_received_p1b_ballots
.union(p_received_p2b_ballots)
.union(p_to_proposers_i_am_leader)
.tick_batch(&ballot_batcher)
.chain(p_received_p2b_ballots.tick_batch(&ballot_batcher))
.chain(p_to_proposers_i_am_leader.tick_batch(&ballot_batcher))
.all_ticks()
.max()
.unwrap_or(proposers.singleton(q!(Ballot {
num: 0,
@@ -624,7 +627,7 @@ fn p_p2a<'a, P: PaxosPayload>(
}));
// .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a)));
let p_to_acceptors_p2a = p_log_to_recommit
.union(p_indexed_payloads.clone())
.chain(p_indexed_payloads.clone())
.continue_if(p_is_leader.clone())
.all_ticks()
.broadcast_bincode_interleaved(acceptors);
@@ -694,7 +697,7 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
}
));
let a_log = a_p2as_to_place_in_log
.union(a_new_checkpoint.into_stream())
.chain(a_new_checkpoint.into_stream())
.persist()
.fold(
q!(|| (None, HashMap::new())),
@@ -753,7 +756,7 @@ fn p_p2b<'a, P: PaxosPayload>(
let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposer_tick.cycle();
let p_p2b = a_to_proposers_p2b
.tick_batch(proposer_tick)
.union(p_persisted_p2bs);
.chain(p_persisted_p2bs);
let p_count_matching_p2bs = p_p2b
.clone()
.filter_map(q!(|p2b| if p2b.ballot == p2b.max_ballot {
10 changes: 5 additions & 5 deletions hydroflow_plus_test/src/cluster/paxos_bench.rs
@@ -123,7 +123,7 @@ fn bench_client<'a>(
replica_payload.key,
sender
)))
.union(c_pending_quorum_payloads);
.chain(c_pending_quorum_payloads);
let c_received_quorum_payloads = c_received_payloads
.clone()
.fold_keyed(
@@ -152,7 +152,7 @@ fn bench_client<'a>(
)));
c_to_proposers_complete_cycle.complete(
c_new_payloads_when_leader_elected
.union(c_new_payloads_when_committed)
.chain(c_new_payloads_when_committed)
.all_ticks(),
);
@@ -169,8 +169,8 @@ fn bench_client<'a>(
.map(q!(|key| (key as usize, SystemTime::now())));
let c_new_timers = c_timers
.clone() // Update c_timers in tick+1 so we can record differences during this tick (to track latency)
.union(c_new_timers_when_leader_elected)
.union(c_updated_timers.clone())
.chain(c_new_timers_when_leader_elected)
.chain(c_updated_timers.clone())
.reduce_keyed(q!(|curr_time, new_time| {
if new_time > *curr_time {
*curr_time = new_time;
.map(q!(|(_virtual_id, (prev_time, curr_time))| Some(
curr_time.duration_since(prev_time).unwrap().as_micros()
)))
.union(c_latency_reset.into_stream())
.chain(c_latency_reset.into_stream())
.all_ticks()
.flatten()
.fold(
2 changes: 1 addition & 1 deletion hydroflow_plus_test/src/cluster/paxos_kv.rs
@@ -103,7 +103,7 @@ pub fn replica<'a, K: KvKey, V: KvValue>(
// p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload)));
let r_sorted_payloads = p_to_replicas
.tick_batch(&replica_tick)
.union(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet
.chain(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet
.sort();
// Create a cycle since we'll use this seq before we define it
let (r_highest_seq_complete_cycle, r_highest_seq) =