Skip to content

Commit

Permalink
feat(hydroflow_plus, stageleft)!: allow cluster self ID to be referen…
Browse files Browse the repository at this point in the history
…ced as a global constant

This eliminates the need to store `cluster.self_id()` in a local variable first, instead you can directly reference `CLUSTER_SELF_ID`.
  • Loading branch information
shadaj committed Nov 19, 2024
1 parent 2cfbdcd commit 478cc9e
Show file tree
Hide file tree
Showing 85 changed files with 1,038 additions and 693 deletions.
25 changes: 16 additions & 9 deletions hydroflow_plus/src/builder/compiled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use hydroflow::scheduled::graph::Hydroflow;
use hydroflow_lang::graph::{partition_graph, HydroflowGraph};
use proc_macro2::TokenStream;
use quote::quote;
use stageleft::runtime_support::FreeVariable;
use stageleft::Quoted;
use stageleft::runtime_support::FreeVariableWithContext;
use stageleft::QuotedWithContext;

use crate::staging_util::Invariant;

Expand All @@ -27,7 +27,10 @@ impl<ID> CompiledFlow<'_, ID> {
}

impl<'a> CompiledFlow<'a, usize> {
pub fn with_dynamic_id(self, id: impl Quoted<'a, usize>) -> CompiledFlowWithId<'a> {
pub fn with_dynamic_id(
self,
id: impl QuotedWithContext<'a, usize, ()>,
) -> CompiledFlowWithId<'a> {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
Expand Down Expand Up @@ -82,10 +85,12 @@ impl<'a> CompiledFlow<'a, usize> {
}
}

impl<'a> Quoted<'a, Hydroflow<'a>> for CompiledFlow<'a, ()> {}
impl<'a, Ctx> QuotedWithContext<'a, Hydroflow<'a>, Ctx> for CompiledFlow<'a, ()> {}

impl<'a> FreeVariable<Hydroflow<'a>> for CompiledFlow<'a, ()> {
fn to_tokens(mut self) -> (Option<TokenStream>, Option<TokenStream>) {
impl<'a, Ctx> FreeVariableWithContext<Ctx> for CompiledFlow<'a, ()> {
type O = Hydroflow<'a>;

fn to_tokens(mut self, _ctx: &Ctx) -> (Option<TokenStream>, Option<TokenStream>) {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
Expand Down Expand Up @@ -116,10 +121,12 @@ pub struct CompiledFlowWithId<'a> {
_phantom: Invariant<'a>,
}

impl<'a> Quoted<'a, Hydroflow<'a>> for CompiledFlowWithId<'a> {}
impl<'a, Ctx> QuotedWithContext<'a, Hydroflow<'a>, Ctx> for CompiledFlowWithId<'a> {}

impl<'a, Ctx> FreeVariableWithContext<Ctx> for CompiledFlowWithId<'a> {
type O = Hydroflow<'a>;

impl<'a> FreeVariable<Hydroflow<'a>> for CompiledFlowWithId<'a> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>) {
fn to_tokens(self, _ctx: &Ctx) -> (Option<TokenStream>, Option<TokenStream>) {
(None, Some(self.tokens))
}
}
2 changes: 1 addition & 1 deletion hydroflow_plus/src/builder/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use hydroflow::futures::{Sink, Stream};
use proc_macro2::Span;
use serde::de::DeserializeOwned;
use serde::Serialize;
use stageleft::Quoted;
use stageleft::QuotedWithContext;

use super::built::build_inner;
use super::compiled::CompiledFlow;
Expand Down
6 changes: 3 additions & 3 deletions hydroflow_plus/src/deploy/deploy_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use nameof::name_of;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sha2::{Digest, Sha256};
use stageleft::{Quoted, RuntimeData};
use stageleft::{QuotedWithContext, RuntimeData};
use syn::visit_mut::VisitMut;
use tokio::sync::RwLock;
use trybuild_internals_api::path;
Expand Down Expand Up @@ -373,14 +373,14 @@ impl<'a> Deploy<'a> for HydroDeploy {
fn cluster_ids(
_env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a {
cluster_members(
RuntimeData::new("__hydroflow_plus_trybuild_cli"),
of_cluster,
)
}

fn cluster_self_id(_env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a {
fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
cluster_self_id(RuntimeData::new("__hydroflow_plus_trybuild_cli"))
}
}
Expand Down
26 changes: 13 additions & 13 deletions hydroflow_plus/src/deploy/deploy_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow::util::deploy::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
};
use serde::{Deserialize, Serialize};
use stageleft::{q, Quoted, RuntimeData};
use stageleft::{q, QuotedWithContext, RuntimeData};

#[derive(Default, Serialize, Deserialize)]
pub struct HydroflowPlusMeta {
Expand All @@ -16,13 +16,13 @@ pub struct HydroflowPlusMeta {
pub fn cluster_members(
cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
of_cluster: usize,
) -> impl Quoted<&Vec<u32>> + Copy {
) -> impl QuotedWithContext<&Vec<u32>, ()> + Copy {
q!(cli.meta.clusters.get(&of_cluster).unwrap())
}

pub fn cluster_self_id(
cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
) -> impl Quoted<u32> + Copy {
) -> impl QuotedWithContext<u32, ()> + Copy {
q!(cli
.meta
.cluster_id
Expand All @@ -41,15 +41,15 @@ pub fn deploy_o2o(
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(p2_port)
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -66,15 +66,15 @@ pub fn deploy_o2m(
.connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(c2_port)
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -91,15 +91,15 @@ pub fn deploy_m2o(
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(p2_port)
.connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -116,15 +116,15 @@ pub fn deploy_m2m(
.connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
{
q!({
env.port(c2_port)
.connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
},
)
}
Expand All @@ -139,7 +139,7 @@ pub fn deploy_e2o(
.connect_local_blocking::<ConnectedDirect>()
.into_source()
})
.splice_untyped()
.splice_untyped_ctx(&())
}

pub fn deploy_o2e(
Expand All @@ -152,5 +152,5 @@ pub fn deploy_o2e(
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
})
.splice_untyped()
.splice_untyped_ctx(&())
}
6 changes: 3 additions & 3 deletions hydroflow_plus/src/deploy/macro_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use hydroflow::bytes::Bytes;
use hydroflow::futures::{Sink, Stream};
use hydroflow::util::deploy::DeployPorts;
use hydroflow_lang::graph::HydroflowGraph;
use stageleft::{Quoted, RuntimeData};
use stageleft::{QuotedWithContext, RuntimeData};

use super::HydroflowPlusMeta;
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
Expand Down Expand Up @@ -170,11 +170,11 @@ impl<'a> Deploy<'a> for DeployRuntime {
fn cluster_ids(
env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a {
super::deploy_runtime::cluster_members(*env, of_cluster)
}

fn cluster_self_id(env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a {
fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
super::deploy_runtime::cluster_self_id(*env)
}
}
Expand Down
6 changes: 3 additions & 3 deletions hydroflow_plus/src/deploy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use hydroflow::futures::{Sink, Stream};
use hydroflow_lang::graph::HydroflowGraph;
use serde::de::DeserializeOwned;
use serde::Serialize;
use stageleft::Quoted;
use stageleft::QuotedWithContext;

#[cfg(feature = "deploy_runtime")]
pub mod macro_runtime;
Expand Down Expand Up @@ -171,8 +171,8 @@ pub trait Deploy<'a> {
fn cluster_ids(
env: &Self::CompileEnv,
of_cluster: usize,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a;
fn cluster_self_id(env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a;
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a;
fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
}

impl<
Expand Down
5 changes: 3 additions & 2 deletions hydroflow_plus/src/deploy/trybuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use trybuild_internals_api::env::Update;
use trybuild_internals_api::run::{PathDependency, Project};
use trybuild_internals_api::{dependencies, features, path, Runner};

pub static IS_TEST: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
pub(crate) static IS_TEST: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(false);

pub fn init_test() {
IS_TEST.store(true, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -27,7 +28,7 @@ pub fn compile_graph_trybuild(graph: HydroflowGraph, extra_stmts: Vec<syn::Stmt>

let source_ast: syn::File = syn::parse_quote! {
#![feature(box_patterns)]
#![allow(unused_imports, unused_crate_dependencies, missing_docs)]
#![allow(unused_imports, unused_crate_dependencies, missing_docs, non_snake_case)]
use hydroflow_plus::*;

#[allow(unused)]
Expand Down
1 change: 1 addition & 0 deletions hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod optional;
pub use optional::Optional;

pub mod location;
pub use location::cluster::CLUSTER_SELF_ID;
pub use location::{Cluster, ClusterId, Location, Process, Tick};

pub mod deploy;
Expand Down
Loading

0 comments on commit 478cc9e

Please sign in to comment.