Skip to content

Commit

Permalink
fix(hydroflow_plus): restrict lifetime parameters to be actually inva…
Browse files Browse the repository at this point in the history
…riant (#1559)

Our lifetimes were accidentally made covariant when the lifetime `'a`
was removed from the process/cluster tag type. This fixes that typing
hole, and also loosens some restrictions on the lifetime of deploy
environments.
  • Loading branch information
shadaj authored Nov 9, 2024
1 parent ee2d4c8 commit f6989ba
Show file tree
Hide file tree
Showing 23 changed files with 127 additions and 76 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions hydroflow_plus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ insta = "1.39"
hydro_deploy = { path = "../hydro_deploy/core", version = "^0.10.0" }
async-ssh2-lite = { version = "0.5.0", features = ["vendored-openssl"] }
ctor = "0.2.8"
trybuild = "1"
9 changes: 5 additions & 4 deletions hydroflow_plus/src/builder/built.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ use super::deploy::{DeployFlow, DeployResult};
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy};
use crate::ir::HfPlusLeaf;
use crate::location::{Cluster, ExternalProcess, Process};
use crate::staging_util::Invariant;

pub struct BuiltFlow<'a> {
pub(super) ir: Vec<HfPlusLeaf>,
pub(super) processes: Vec<usize>,
pub(super) clusters: Vec<usize>,
pub(super) used: bool,

pub(super) _phantom: PhantomData<&'a mut &'a ()>,
pub(super) _phantom: Invariant<'a>,
}

impl Drop for BuiltFlow<'_> {
Expand Down Expand Up @@ -119,15 +120,15 @@ impl<'a> BuiltFlow<'a> {
self.into_deploy().with_cluster(cluster, spec)
}

pub fn compile<D: Deploy<'a> + 'a>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
self.into_deploy::<D>().compile(env)
}

pub fn compile_no_network<D: LocalDeploy<'a> + 'a>(self) -> CompiledFlow<'a, D::GraphId> {
pub fn compile_no_network<D: LocalDeploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
self.into_deploy::<D>().compile_no_network()
}

pub fn deploy<D: Deploy<'a, CompileEnv = ()> + 'a>(
pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
self,
env: &mut D::InstantiateEnv,
) -> DeployResult<'a, D> {
Expand Down
6 changes: 4 additions & 2 deletions hydroflow_plus/src/builder/compiled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use quote::quote;
use stageleft::runtime_support::FreeVariable;
use stageleft::Quoted;

use crate::staging_util::Invariant;

pub struct CompiledFlow<'a, ID> {
pub(super) hydroflow_ir: BTreeMap<usize, HydroflowGraph>,
pub(super) extra_stmts: BTreeMap<usize, Vec<syn::Stmt>>,
pub(super) _phantom: PhantomData<&'a mut &'a ID>,
pub(super) _phantom: Invariant<'a, ID>,
}

impl<ID> CompiledFlow<'_, ID> {
Expand Down Expand Up @@ -111,7 +113,7 @@ impl<'a> FreeVariable<Hydroflow<'a>> for CompiledFlow<'a, ()> {

pub struct CompiledFlowWithId<'a> {
tokens: TokenStream,
_phantom: PhantomData<&'a mut &'a ()>,
_phantom: Invariant<'a>,
}

impl<'a> Quoted<'a, Hydroflow<'a>> for CompiledFlowWithId<'a> {}
Expand Down
3 changes: 2 additions & 1 deletion hydroflow_plus/src/builder/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::location::external_process::{
ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
};
use crate::location::{ExternalProcess, Location, LocationId};
use crate::staging_util::Invariant;
use crate::{Cluster, ClusterSpec, Deploy, Process, ProcessSpec};

pub struct DeployFlow<'a, D: LocalDeploy<'a>> {
Expand All @@ -27,7 +28,7 @@ pub struct DeployFlow<'a, D: LocalDeploy<'a>> {
pub(super) clusters: HashMap<usize, D::Cluster>,
pub(super) used: bool,

pub(super) _phantom: PhantomData<&'a mut &'a D>,
pub(super) _phantom: Invariant<'a, D>,
}

impl<'a, D: LocalDeploy<'a>> Drop for DeployFlow<'a, D> {
Expand Down
9 changes: 5 additions & 4 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use stageleft::*;
use crate::deploy::{ExternalSpec, IntoProcessSpec, LocalDeploy};
use crate::ir::HfPlusLeaf;
use crate::location::{Cluster, ExternalProcess, Process};
use crate::staging_util::Invariant;
use crate::{ClusterSpec, Deploy, RuntimeContext};

pub mod built;
Expand Down Expand Up @@ -51,7 +52,7 @@ pub struct FlowBuilder<'a> {
/// capture more data that it is allowed to; 'a is generated at the
/// entrypoint of the staged code and we keep it invariant here
/// to enforce the appropriate constraints
_phantom: PhantomData<&'a mut &'a ()>,
_phantom: Invariant<'a>,
}

impl Drop for FlowBuilder<'_> {
Expand Down Expand Up @@ -186,15 +187,15 @@ impl<'a> FlowBuilder<'a> {
self.with_default_optimize().with_cluster(cluster, spec)
}

pub fn compile<D: Deploy<'a> + 'a>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
self.with_default_optimize::<D>().compile(env)
}

pub fn compile_no_network<D: LocalDeploy<'a> + 'a>(self) -> CompiledFlow<'a, D::GraphId> {
pub fn compile_no_network<D: LocalDeploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
self.with_default_optimize::<D>().compile_no_network()
}

pub fn deploy<D: Deploy<'a, CompileEnv = ()> + 'a>(
pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
self,
env: &mut D::InstantiateEnv,
) -> DeployResult<'a, D> {
Expand Down
7 changes: 3 additions & 4 deletions hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::marker::PhantomData;

use crate::location::Location;
use crate::staging_util::Invariant;

pub enum ForwardRefMarker {}
pub enum TickCycleMarker {}
Expand Down Expand Up @@ -31,7 +30,7 @@ pub trait CycleCollectionWithInitial<'a, T>: CycleComplete<'a, T> {
/// See [`crate::FlowBuilder`] for an explainer on the type parameters.
pub struct ForwardRef<'a, S: CycleComplete<'a, ForwardRefMarker>> {
pub(crate) ident: syn::Ident,
pub(crate) _phantom: PhantomData<(&'a mut &'a (), S)>,
pub(crate) _phantom: Invariant<'a, S>,
}

impl<'a, S: CycleComplete<'a, ForwardRefMarker>> ForwardRef<'a, S> {
Expand All @@ -43,7 +42,7 @@ impl<'a, S: CycleComplete<'a, ForwardRefMarker>> ForwardRef<'a, S> {

pub struct TickCycle<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> {
pub(crate) ident: syn::Ident,
pub(crate) _phantom: PhantomData<(&'a mut &'a (), S)>,
pub(crate) _phantom: Invariant<'a, S>,
}

impl<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> TickCycle<'a, S> {
Expand Down
6 changes: 3 additions & 3 deletions hydroflow_plus/src/ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub enum HfPlusLeaf {
}

impl HfPlusLeaf {
pub fn compile_network<'a, D: Deploy<'a> + 'a>(
pub fn compile_network<'a, D: Deploy<'a>>(
self,
compile_env: &D::CompileEnv,
seen_tees: &mut SeenTees,
Expand Down Expand Up @@ -355,7 +355,7 @@ pub enum HfPlusNode {
pub type SeenTees = HashMap<*const RefCell<HfPlusNode>, Rc<RefCell<HfPlusNode>>>;

impl<'a> HfPlusNode {
pub fn compile_network<D: Deploy<'a> + 'a>(
pub fn compile_network<D: Deploy<'a>>(
&mut self,
compile_env: &D::CompileEnv,
seen_tees: &mut SeenTees,
Expand Down Expand Up @@ -1192,7 +1192,7 @@ impl<'a> HfPlusNode {
}

#[expect(clippy::too_many_arguments, reason = "networking internals")]
fn instantiate_network<'a, D: Deploy<'a> + 'a>(
fn instantiate_network<'a, D: Deploy<'a>>(
from_location: &mut LocationId,
from_key: Option<usize>,
to_location: &mut LocationId,
Expand Down
12 changes: 6 additions & 6 deletions hydroflow_plus/src/location/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ use stageleft::{quote_type, Quoted};

use super::{Location, LocationId};
use crate::builder::FlowState;
use crate::staging_util::get_this_crate;
use crate::staging_util::{get_this_crate, Invariant};

pub struct Cluster<'a, C> {
pub(crate) id: usize,
pub(crate) flow_state: FlowState,
pub(crate) _phantom: PhantomData<&'a &'a mut C>,
pub(crate) _phantom: Invariant<'a, C>,
}

impl<'a, C> Cluster<'a, C> {
pub fn self_id(&self) -> impl Quoted<'a, ClusterId<C>> + Copy + 'a {
pub fn self_id(&self) -> impl Quoted<'a, ClusterId<C>> + Copy {
ClusterSelfId {
id: self.id,
_phantom: PhantomData,
}
}

pub fn members(&self) -> impl Quoted<'a, &'a Vec<ClusterId<C>>> + Copy + 'a {
pub fn members(&self) -> impl Quoted<'a, &'a Vec<ClusterId<C>>> + Copy {
ClusterIds {
id: self.id,
_phantom: PhantomData,
Expand Down Expand Up @@ -152,7 +152,7 @@ impl<C> ClusterId<C> {

pub struct ClusterIds<'a, C> {
pub(crate) id: usize,
pub(crate) _phantom: PhantomData<&'a mut &'a C>,
_phantom: Invariant<'a, C>,
}

impl<C> Clone for ClusterIds<'_, C> {
Expand Down Expand Up @@ -187,7 +187,7 @@ impl<'a, C> Quoted<'a, &'a Vec<ClusterId<C>>> for ClusterIds<'a, C> {}

pub struct ClusterSelfId<'a, C> {
pub(crate) id: usize,
pub(crate) _phantom: PhantomData<&'a mut &'a C>,
pub(crate) _phantom: Invariant<'a, C>,
}

impl<C> Clone for ClusterSelfId<'_, C> {
Expand Down
3 changes: 2 additions & 1 deletion hydroflow_plus/src/location/external_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use serde::Serialize;
use super::{Location, LocationId, NoTick};
use crate::builder::FlowState;
use crate::ir::{HfPlusNode, HfPlusSource};
use crate::staging_util::Invariant;
use crate::{Stream, Unbounded};

pub struct ExternalBytesPort {
Expand All @@ -31,7 +32,7 @@ pub struct ExternalProcess<'a, P> {

pub(crate) flow_state: FlowState,

pub(crate) _phantom: PhantomData<&'a &'a mut P>,
pub(crate) _phantom: Invariant<'a, P>,
}

impl<P> Clone for ExternalProcess<'_, P> {
Expand Down
3 changes: 2 additions & 1 deletion hydroflow_plus/src/location/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use std::marker::PhantomData;

use super::{Location, LocationId};
use crate::builder::FlowState;
use crate::staging_util::Invariant;

pub struct Process<'a, P = ()> {
pub(crate) id: usize,
pub(crate) flow_state: FlowState,
pub(crate) _phantom: PhantomData<&'a &'a mut P>,
pub(crate) _phantom: Invariant<'a, P>,
}

impl<P> Clone for Process<'_, P> {
Expand Down
4 changes: 3 additions & 1 deletion hydroflow_plus/src/runtime_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use proc_macro2::TokenStream;
use quote::quote;
use stageleft::runtime_support::FreeVariable;

use crate::staging_util::Invariant;

#[derive(Clone)]
pub struct RuntimeContext<'a> {
_phantom: PhantomData<&'a mut &'a ()>,
_phantom: Invariant<'a>,
}

impl RuntimeContext<'_> {
Expand Down
4 changes: 4 additions & 0 deletions hydroflow_plus/src/staging_util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::marker::PhantomData;

use proc_macro2::{Span, TokenStream};
use quote::quote;

pub type Invariant<'a, D = ()> = PhantomData<(fn(&'a ()) -> &'a (), D)>;

pub fn get_this_crate() -> TokenStream {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
Expand Down
10 changes: 5 additions & 5 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B> {
self.send_bincode::<Process<'a, P2>, T>(other)
}

pub fn decouple_cluster<C2, Tag>(
pub fn decouple_cluster<C2: 'a, Tag>(
self,
other: &Cluster<'a, C2>,
) -> Stream<T, Cluster<'a, C2>, Unbounded>
Expand Down Expand Up @@ -772,7 +772,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B> {
self.send_bytes::<L2>(other).map(q!(|(_, b)| b))
}

pub fn broadcast_bincode<C2>(
pub fn broadcast_bincode<C2: 'a>(
self,
other: &Cluster<'a, C2>,
) -> Stream<L::Out<T>, Cluster<'a, C2>, Unbounded>
Expand All @@ -789,7 +789,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B> {
.send_bincode(other)
}

pub fn broadcast_bincode_interleaved<C2, Tag>(
pub fn broadcast_bincode_interleaved<C2: 'a, Tag>(
self,
other: &Cluster<'a, C2>,
) -> Stream<T, Cluster<'a, C2>, Unbounded>
Expand All @@ -800,7 +800,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B> {
self.broadcast_bincode(other).map(q!(|(_, b)| b))
}

pub fn broadcast_bytes<C2>(
pub fn broadcast_bytes<C2: 'a>(
self,
other: &Cluster<'a, C2>,
) -> Stream<L::Out<Bytes>, Cluster<'a, C2>, Unbounded>
Expand All @@ -817,7 +817,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B> {
.send_bytes(other)
}

pub fn broadcast_bytes_interleaved<C2, Tag>(
pub fn broadcast_bytes_interleaved<C2: 'a, Tag>(
self,
other: &Cluster<'a, C2>,
) -> Stream<Bytes, Cluster<'a, C2>, Unbounded>
Expand Down
10 changes: 10 additions & 0 deletions hydroflow_plus/tests/compile-fail/send_bincode_lifetime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use hydroflow_plus::*;

struct P1 {}
struct P2 {}

fn test<'a, 'b>(p1: &Process<'a, P1>, p2: &Process<'b, P2>) {
p1.source_iter(q!(0..10)).send_bincode(p2).for_each(q!(|n| println!("{}", n)));
}

fn main() {}
31 changes: 31 additions & 0 deletions hydroflow_plus/tests/compile-fail/send_bincode_lifetime.stderr
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
error: lifetime may not live long enough
--> tests/compile-fail/send_bincode_lifetime.rs:7:5
|
6 | fn test<'a, 'b>(p1: &Process<'a, P1>, p2: &Process<'b, P2>) {
| -- -- lifetime `'b` defined here
| |
| lifetime `'a` defined here
7 | p1.source_iter(q!(0..10)).send_bincode(p2).for_each(q!(|n| println!("{}", n)));
| ^^^^^^^^^^^^^^^^^^^^^^^^^ argument requires that `'a` must outlive `'b`
|
= help: consider adding the following bound: `'a: 'b`
= note: requirement occurs because of the type `hydroflow_plus::Process<'_, P1>`, which makes the generic argument `'_` invariant
= note: the struct `hydroflow_plus::Process<'a, P>` is invariant over the parameter `'a`
= help: see <https://doc.rust-lang.org/nomicon/subtyping.html> for more information about variance

error: lifetime may not live long enough
--> tests/compile-fail/send_bincode_lifetime.rs:7:5
|
6 | fn test<'a, 'b>(p1: &Process<'a, P1>, p2: &Process<'b, P2>) {
| -- -- lifetime `'b` defined here
| |
| lifetime `'a` defined here
7 | p1.source_iter(q!(0..10)).send_bincode(p2).for_each(q!(|n| println!("{}", n)));
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ argument requires that `'b` must outlive `'a`
|
= help: consider adding the following bound: `'b: 'a`
= note: requirement occurs because of the type `hydroflow_plus::Process<'_, P2>`, which makes the generic argument `'_` invariant
= note: the struct `hydroflow_plus::Process<'a, P>` is invariant over the parameter `'a`
= help: see <https://doc.rust-lang.org/nomicon/subtyping.html> for more information about variance

help: `'a` and `'b` must be the same: replace one with the other
5 changes: 5 additions & 0 deletions hydroflow_plus/tests/compile_fail.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#[test]
fn test_all() {
let t = trybuild::TestCases::new();
t.compile_fail("tests/compile-fail/*.rs");
}
9 changes: 6 additions & 3 deletions hydroflow_plus_test/examples/first_ten_distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ async fn main() {
};

let builder = hydroflow_plus::FlowBuilder::new();
let (external_process, external_port, p1, p2) =
hydroflow_plus_test::distributed::first_ten::first_ten_distributed(&builder);
let external = builder.external_process();
let p1 = builder.process();
let p2 = builder.process();
let external_port =
hydroflow_plus_test::distributed::first_ten::first_ten_distributed(&external, &p1, &p2);
let nodes = builder
.with_process(
&p1,
Expand All @@ -51,7 +54,7 @@ async fn main() {
&p2,
TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags),
)
.with_external(&external_process, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);

deployment.deploy().await.unwrap();
Expand Down
Loading

0 comments on commit f6989ba

Please sign in to comment.