Skip to content

Commit

Permalink
feat(hydroflow_plus): add utility to dedup tees when debugging IR (#1491
Browse files Browse the repository at this point in the history
)
  • Loading branch information
shadaj authored Oct 8, 2024
1 parent edd8649 commit 98a21e3
Show file tree
Hide file tree
Showing 8 changed files with 1,102 additions and 7,723 deletions.
78 changes: 67 additions & 11 deletions hydroflow_plus/src/ir.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::panic;
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::ops::Deref;
use std::rc::Rc;

Expand Down Expand Up @@ -36,7 +37,7 @@ impl ToTokens for DebugExpr {
}
}

impl std::fmt::Debug for DebugExpr {
impl Debug for DebugExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_token_stream())
}
Expand All @@ -47,7 +48,7 @@ pub enum DebugInstantiate {
Finalized(syn::Expr, syn::Expr, Option<Box<dyn FnOnce()>>),
}

impl std::fmt::Debug for DebugInstantiate {
impl Debug for DebugInstantiate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "<network instantiate>")
}
Expand All @@ -56,7 +57,7 @@ impl std::fmt::Debug for DebugInstantiate {
#[derive(Clone)]
pub struct DebugPipelineFn(pub Rc<dyn Fn() -> Pipeline + 'static>);

impl std::fmt::Debug for DebugPipelineFn {
impl Debug for DebugPipelineFn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "<function>")
}
Expand Down Expand Up @@ -208,6 +209,59 @@ impl HfPlusLeaf {
}
}

type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HfPlusNode>, usize>)>>;
thread_local! {
static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}

pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
PRINTED_TEES.with(|printed_tees| {
let mut printed_tees_mut = printed_tees.borrow_mut();
*printed_tees_mut = Some((0, HashMap::new()));
drop(printed_tees_mut);

let ret = f();

let mut printed_tees_mut = printed_tees.borrow_mut();
*printed_tees_mut = None;

ret
})
}

pub struct TeeNode(pub Rc<RefCell<HfPlusNode>>);

impl Debug for TeeNode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
PRINTED_TEES.with(|printed_tees| {
let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
let printed_tees_mut = printed_tees_mut_borrow.as_mut();

if let Some(printed_tees_mut) = printed_tees_mut {
if let Some(existing) = printed_tees_mut
.1
.get(&(self.0.as_ref() as *const RefCell<HfPlusNode>))
{
write!(f, "<tee {}>", existing)
} else {
let next_id = printed_tees_mut.0;
printed_tees_mut.0 += 1;
printed_tees_mut
.1
.insert(self.0.as_ref() as *const RefCell<HfPlusNode>, next_id);
drop(printed_tees_mut_borrow);
write!(f, "<tee {}>: ", next_id)?;
Debug::fmt(&self.0.borrow(), f)
}
} else {
drop(printed_tees_mut_borrow);
write!(f, "<tee>: ")?;
Debug::fmt(&self.0.borrow(), f)
}
})
}
}

/// An intermediate node in a Hydroflow+ graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
#[derive(Debug)]
Expand All @@ -225,7 +279,7 @@ pub enum HfPlusNode {
},

Tee {
inner: Rc<RefCell<HfPlusNode>>,
inner: TeeNode,
},

Persist(Box<HfPlusNode>),
Expand Down Expand Up @@ -383,19 +437,19 @@ impl<'a> HfPlusNode {

HfPlusNode::Tee { inner } => {
if let Some(transformed) =
seen_tees.get(&(inner.as_ref() as *const RefCell<HfPlusNode>))
seen_tees.get(&(inner.0.as_ref() as *const RefCell<HfPlusNode>))
{
*inner = transformed.clone();
*inner = TeeNode(transformed.clone());
} else {
let transformed_cell = Rc::new(RefCell::new(HfPlusNode::Placeholder));
seen_tees.insert(
inner.as_ref() as *const RefCell<HfPlusNode>,
inner.0.as_ref() as *const RefCell<HfPlusNode>,
transformed_cell.clone(),
);
let mut orig = inner.replace(HfPlusNode::Placeholder);
let mut orig = inner.0.replace(HfPlusNode::Placeholder);
transform(&mut orig, seen_tees);
*transformed_cell.borrow_mut() = orig;
*inner = transformed_cell;
*inner = TeeNode(transformed_cell);
}
}

Expand Down Expand Up @@ -598,11 +652,13 @@ impl<'a> HfPlusNode {
}

HfPlusNode::Tee { inner } => {
if let Some(ret) = built_tees.get(&(inner.as_ref() as *const RefCell<HfPlusNode>)) {
if let Some(ret) = built_tees.get(&(inner.0.as_ref() as *const RefCell<HfPlusNode>))
{
ret.clone()
} else {
let (inner_ident, inner_location_id) =
inner
.0
.borrow()
.emit(graph_builders, built_tees, next_stmt_id);

Expand All @@ -618,7 +674,7 @@ impl<'a> HfPlusNode {
});

built_tees.insert(
inner.as_ref() as *const RefCell<HfPlusNode>,
inner.0.as_ref() as *const RefCell<HfPlusNode>,
(tee_ident.clone(), inner_location_id),
);

Expand Down
14 changes: 7 additions & 7 deletions hydroflow_plus/src/persist_pullup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ fn persist_pullup_node(
HfPlusNode::Delta(box HfPlusNode::Persist(box behind_persist)) => behind_persist,

HfPlusNode::Tee { inner } => {
if persist_pulled_tees.contains(&(inner.as_ref() as *const RefCell<HfPlusNode>)) {
if persist_pulled_tees.contains(&(inner.0.as_ref() as *const RefCell<HfPlusNode>)) {
HfPlusNode::Persist(Box::new(HfPlusNode::Tee {
inner: inner.clone(),
inner: TeeNode(inner.0.clone()),
}))
} else if matches!(*inner.borrow(), HfPlusNode::Persist(_)) {
persist_pulled_tees.insert(inner.as_ref() as *const RefCell<HfPlusNode>);
} else if matches!(*inner.0.borrow(), HfPlusNode::Persist(_)) {
persist_pulled_tees.insert(inner.0.as_ref() as *const RefCell<HfPlusNode>);
if let HfPlusNode::Persist(box behind_persist) =
inner.replace(HfPlusNode::Placeholder)
inner.0.replace(HfPlusNode::Placeholder)
{
*inner.borrow_mut() = behind_persist;
*inner.0.borrow_mut() = behind_persist;
} else {
unreachable!()
}

HfPlusNode::Persist(Box::new(HfPlusNode::Tee {
inner: inner.clone(),
inner: TeeNode(inner.0.clone()),
}))
} else {
HfPlusNode::Tee { inner }
Expand Down
10 changes: 5 additions & 5 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use stageleft::{q, IntoQuotedMut, Quoted};

use crate::builder::FlowState;
use crate::cycle::{CycleCollection, CycleCollectionWithInitial, CycleComplete};
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode};
use crate::location::{Location, LocationId};
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
use crate::Stream;
Expand Down Expand Up @@ -168,7 +168,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Singleton<T, W, C, N> {
if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) {
let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder);
*self.ir_node.borrow_mut() = HfPlusNode::Tee {
inner: Rc::new(RefCell::new(orig_ir_node)),
inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
};
}

Expand All @@ -177,7 +177,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Singleton<T, W, C, N> {
location_kind: self.location_kind,
flow_state: self.flow_state.clone(),
ir_node: HfPlusNode::Tee {
inner: inner.clone(),
inner: TeeNode(inner.0.clone()),
}
.into(),
_phantom: PhantomData,
Expand Down Expand Up @@ -465,7 +465,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Optional<T, W, C, N> {
if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) {
let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder);
*self.ir_node.borrow_mut() = HfPlusNode::Tee {
inner: Rc::new(RefCell::new(orig_ir_node)),
inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
};
}

Expand All @@ -474,7 +474,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Optional<T, W, C, N> {
location_kind: self.location_kind,
flow_state: self.flow_state.clone(),
ir_node: HfPlusNode::Tee {
inner: inner.clone(),
inner: TeeNode(inner.0.clone()),
}
.into(),
_phantom: PhantomData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ expression: optimized.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , i32 > ({ use crate :: __staged :: persist_pullup :: tests :: * ; | v | v + 1 }),
input: Tee {
inner: RefCell {
value: Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
inner: <tee>: Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
},
},
Expand All @@ -29,15 +27,13 @@ expression: optimized.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , i32 > ({ use crate :: __staged :: persist_pullup :: tests :: * ; | v | v + 1 }),
input: Tee {
inner: RefCell {
value: Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
inner: <tee>: Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,20 @@ expression: built.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , i32 > ({ use crate :: __staged :: persist_pullup :: tests :: * ; | v | v + 1 }),
input: Tee {
inner: RefCell {
value: Persist(
Unpersist(
Persist(
Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
),
inner: <tee>: Persist(
Unpersist(
Persist(
Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
),
),
},
),
},
},
),
Expand All @@ -38,22 +36,20 @@ expression: built.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , i32 > ({ use crate :: __staged :: persist_pullup :: tests :: * ; | v | v + 1 }),
input: Tee {
inner: RefCell {
value: Persist(
Unpersist(
Persist(
Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
),
inner: <tee>: Persist(
Unpersist(
Persist(
Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
),
),
},
),
},
},
),
Expand Down
6 changes: 3 additions & 3 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use syn::parse_quote;
use super::staging_util::get_this_crate;
use crate::builder::{self, FlowState};
use crate::cycle::{CycleCollection, CycleComplete};
use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode};
use crate::location::{
CanSend, ExternalBincodeStream, ExternalBytesPort, ExternalProcess, Location, LocationId,
};
Expand Down Expand Up @@ -125,7 +125,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Stream<T, W, C, N> {
if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) {
let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder);
*self.ir_node.borrow_mut() = HfPlusNode::Tee {
inner: Rc::new(RefCell::new(orig_ir_node)),
inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
};
}

Expand All @@ -134,7 +134,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Stream<T, W, C, N> {
location_kind: self.location_kind,
flow_state: self.flow_state.clone(),
ir_node: HfPlusNode::Tee {
inner: inner.clone(),
inner: TeeNode(inner.0.clone()),
}
.into(),
_phantom: PhantomData,
Expand Down
4 changes: 3 additions & 1 deletion hydroflow_plus_test/src/cluster/paxos_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,9 @@ mod tests {
let _ = super::paxos_bench(&builder, 1, 1, 1, 1, 1, 1, 1);
let built = builder.with_default_optimize();

insta::assert_debug_snapshot!(built.ir());
hydroflow_plus::ir::dbg_dedup_tee(|| {
insta::assert_debug_snapshot!(built.ir());
});

let _ = built.compile::<DeployRuntime>(&RuntimeData::new("FAKE"));
}
Expand Down
Loading

0 comments on commit 98a21e3

Please sign in to comment.