Skip to content

Commit

Permalink
refactor(hydroflow_plus)!: don't re-export all of hydroflow (#1562)
Browse files Browse the repository at this point in the history
Reduces namespace pollution when wildcard-importing `hydroflow_plus`.
  • Loading branch information
shadaj authored Nov 12, 2024
1 parent 2c17d65 commit 9f3c8c4
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 95 deletions.
8 changes: 4 additions & 4 deletions hydroflow_plus/src/builder/compiled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ impl<'a> CompiledFlow<'a, usize> {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus },
proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus::hydroflow },
proc_macro_crate::FoundCrate::Name(name) => {
let ident = syn::Ident::new(&name, proc_macro2::Span::call_site());
quote! { #ident }
quote! { #ident::hydroflow }
}
};

Expand Down Expand Up @@ -89,10 +89,10 @@ impl<'a> FreeVariable<Hydroflow<'a>> for CompiledFlow<'a, ()> {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus },
proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus::hydroflow },
proc_macro_crate::FoundCrate::Name(name) => {
let ident = syn::Ident::new(&name, proc_macro2::Span::call_site());
quote! { #ident }
quote! { #ident::hydroflow }
}
};

Expand Down
8 changes: 5 additions & 3 deletions hydroflow_plus/src/builder/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ use stageleft::Quoted;

use super::built::build_inner;
use super::compiled::CompiledFlow;
use crate::deploy::{ExternalSpec, IntoProcessSpec, LocalDeploy, Node, RegisterPort};
use crate::deploy::{
ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy, Node, ProcessSpec,
RegisterPort,
};
use crate::ir::HfPlusLeaf;
use crate::location::external_process::{
ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
};
use crate::location::{ExternalProcess, Location, LocationId};
use crate::location::{Cluster, ExternalProcess, Location, LocationId, Process};
use crate::staging_util::Invariant;
use crate::{Cluster, ClusterSpec, Deploy, Process, ProcessSpec};

pub struct DeployFlow<'a, D: LocalDeploy<'a>> {
pub(super) ir: Vec<HfPlusLeaf>,
Expand Down
4 changes: 2 additions & 2 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use compiled::CompiledFlow;
use deploy::{DeployFlow, DeployResult};
use stageleft::*;

use crate::deploy::{ExternalSpec, IntoProcessSpec, LocalDeploy};
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy};
use crate::ir::HfPlusLeaf;
use crate::location::{Cluster, ExternalProcess, Process};
use crate::staging_util::Invariant;
use crate::{ClusterSpec, Deploy, RuntimeContext};
use crate::RuntimeContext;

pub mod built;
pub mod compiled;
Expand Down
25 changes: 11 additions & 14 deletions hydroflow_plus/src/deploy/deploy_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use hydro_deploy::hydroflow_crate::ports::{
use hydro_deploy::hydroflow_crate::tracing_options::TracingOptions;
use hydro_deploy::hydroflow_crate::HydroflowCrateService;
use hydro_deploy::{CustomService, Deployment, Host, HydroflowCrate};
use hydroflow::futures::StreamExt;
use hydroflow::bytes::Bytes;
use hydroflow::futures::{Sink, SinkExt, Stream, StreamExt};
use hydroflow::lang::graph::HydroflowGraph;
use hydroflow::util::deploy::{ConnectedSink, ConnectedSource};
use nameof::name_of;
use serde::de::DeserializeOwned;
Expand All @@ -27,8 +29,6 @@ use trybuild_internals_api::path;
use super::deploy_runtime::*;
use super::trybuild::{compile_graph_trybuild, create_trybuild};
use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort};
use crate::futures::SinkExt;
use crate::lang::graph::HydroflowGraph;

pub struct HydroDeploy {}

Expand Down Expand Up @@ -542,49 +542,46 @@ impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
fn as_bytes_sink(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn crate::futures::Sink<crate::bytes::Bytes, Error = Error>>>> + 'a
{
) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = Error>>>> + 'a {
let port = self.raw_port(key);
async move {
let sink = port.connect().await.into_sink();
sink as Pin<Box<dyn crate::futures::Sink<crate::bytes::Bytes, Error = Error>>>
sink as Pin<Box<dyn Sink<Bytes, Error = Error>>>
}
}

fn as_bincode_sink<T: Serialize + 'static>(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn crate::futures::Sink<T, Error = Error>>>> + 'a {
) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
let port = self.raw_port(key);
async move {
let sink = port.connect().await.into_sink();
Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
as Pin<Box<dyn crate::futures::Sink<T, Error = Error>>>
as Pin<Box<dyn Sink<T, Error = Error>>>
}
}

fn as_bytes_source(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn crate::futures::Stream<Item = crate::bytes::Bytes>>>> + 'a
{
) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a {
let port = self.raw_port(key);
async move {
let source = port.connect().await.into_source();
Box::pin(source.map(|r| r.unwrap().freeze()))
as Pin<Box<dyn crate::futures::Stream<Item = crate::bytes::Bytes>>>
Box::pin(source.map(|r| r.unwrap().freeze())) as Pin<Box<dyn Stream<Item = Bytes>>>
}
}

fn as_bincode_source<T: DeserializeOwned + 'static>(
&self,
key: usize,
) -> impl Future<Output = Pin<Box<dyn crate::futures::Stream<Item = T>>>> + 'a {
) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
let port = self.raw_port(key);
async move {
let source = port.connect().await.into_source();
Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
as Pin<Box<dyn crate::futures::Stream<Item = T>>>
as Pin<Box<dyn Stream<Item = T>>>
}
}
}
Expand Down
20 changes: 8 additions & 12 deletions hydroflow_plus/src/deploy/macro_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;

use hydroflow::bytes::Bytes;
use hydroflow::futures::{Sink, Stream};
use hydroflow::util::deploy::DeployPorts;
use hydroflow_lang::graph::HydroflowGraph;
use stageleft::{Quoted, RuntimeData};

use super::HydroflowPlusMeta;
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
use crate::lang::graph::HydroflowGraph;

pub struct DeployRuntime {}

Expand Down Expand Up @@ -197,9 +200,7 @@ impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode {
fn as_bytes_sink(
&self,
_key: usize,
) -> impl std::future::Future<
Output = Pin<Box<dyn crate::futures::Sink<crate::bytes::Bytes, Error = std::io::Error>>>,
> + 'a {
) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>> + 'a {
async { panic!() }
}

Expand All @@ -210,9 +211,7 @@ impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode {
fn as_bincode_sink<T: serde::Serialize + 'static>(
&self,
_key: usize,
) -> impl std::future::Future<
Output = Pin<Box<dyn crate::futures::Sink<T, Error = std::io::Error>>>,
> + 'a {
) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a {
async { panic!() }
}

Expand All @@ -223,9 +222,7 @@ impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode {
fn as_bytes_source(
&self,
_key: usize,
) -> impl std::future::Future<
Output = Pin<Box<dyn hydroflow::futures::Stream<Item = hydroflow::bytes::Bytes>>>,
> + 'a {
) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a {
async { panic!() }
}

Expand All @@ -236,8 +233,7 @@ impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode {
fn as_bincode_source<T: serde::de::DeserializeOwned + 'static>(
&self,
_key: usize,
) -> impl std::future::Future<Output = Pin<Box<dyn hydroflow::futures::Stream<Item = T>>>> + 'a
{
) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
async { panic!() }
}
}
Expand Down
19 changes: 11 additions & 8 deletions hydroflow_plus/src/deploy/trybuild.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::fs;
use std::path::PathBuf;

use hydroflow_lang::graph::{partition_graph, HydroflowGraph};
use stageleft::internal::quote;
use trybuild_internals_api::cargo::{self, Metadata};
use trybuild_internals_api::env::Update;
use trybuild_internals_api::run::{PathDependency, Project};
use trybuild_internals_api::{dependencies, features, path, Runner};

use crate::lang::graph::{partition_graph, HydroflowGraph};

pub static IS_TEST: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);

pub fn init_test() {
Expand All @@ -19,26 +18,30 @@ pub fn compile_graph_trybuild(graph: HydroflowGraph, extra_stmts: Vec<syn::Stmt>
let partitioned_graph = partition_graph(graph).expect("Failed to partition (cycle detected).");

let mut diagnostics = Vec::new();
let tokens =
partitioned_graph.as_code(&quote! { hydroflow_plus }, true, quote!(), &mut diagnostics);
let tokens = partitioned_graph.as_code(
&quote! { hydroflow_plus::hydroflow },
true,
quote!(),
&mut diagnostics,
);

let source_ast: syn::File = syn::parse_quote! {
#![feature(box_patterns)]
#![allow(unused_crate_dependencies, missing_docs)]
#![allow(unused_imports, unused_crate_dependencies, missing_docs)]
use hydroflow_plus::*;

#[allow(unused)]
fn __hfplus_runtime<'a>(__hydroflow_plus_trybuild_cli: &'a hydroflow_plus::util::deploy::DeployPorts<hydroflow_plus::deploy::HydroflowPlusMeta>) -> hydroflow_plus::Hydroflow<'a> {
fn __hfplus_runtime<'a>(__hydroflow_plus_trybuild_cli: &'a hydroflow_plus::hydroflow::util::deploy::DeployPorts<hydroflow_plus::deploy::HydroflowPlusMeta>) -> hydroflow_plus::Hydroflow<'a> {
#(#extra_stmts)*
#tokens
}

#[tokio::main]
async fn main() {
let ports = hydroflow_plus::util::deploy::init_no_ack_start().await;
let ports = hydroflow_plus::hydroflow::util::deploy::init_no_ack_start().await;
let flow = __hfplus_runtime(&ports);
println!("ack start");
hydroflow_plus::util::deploy::launch_flow(flow).await;
hydroflow_plus::hydroflow::util::deploy::launch_flow(flow).await;
}
};
source_ast
Expand Down
3 changes: 1 addition & 2 deletions hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

stageleft::stageleft_no_entry_crate!();

pub use hydroflow;
pub use hydroflow::scheduled::graph::Hydroflow;
pub use hydroflow::*;
pub use stageleft::*;

pub mod runtime_support {
Expand All @@ -26,7 +26,6 @@ pub mod location;
pub use location::{Cluster, ClusterId, Location, Process, Tick};

pub mod deploy;
pub use deploy::{ClusterSpec, Deploy, ProcessSpec};

pub mod cycle;

Expand Down
21 changes: 10 additions & 11 deletions hydroflow_plus_test_local/src/local/chat_app.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use hydroflow::tokio::sync::mpsc::UnboundedSender;
use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow_plus::deploy::SingleProcessGraph;
use hydroflow_plus::tokio::sync::mpsc::UnboundedSender;
use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow_plus::*;
use stageleft::{q, Quoted, RuntimeData};

#[stageleft::entry]
pub fn chat_app<'a>(
Expand Down Expand Up @@ -44,14 +43,14 @@ pub fn chat_app<'a>(
#[stageleft::runtime]
#[cfg(test)]
mod tests {
use hydroflow_plus::assert_graphvis_snapshots;
use hydroflow_plus::util::collect_ready;
use hydroflow::assert_graphvis_snapshots;
use hydroflow::util::collect_ready;

#[test]
fn test_chat_app_no_replay() {
let (users_send, users) = hydroflow_plus::util::unbounded_channel();
let (messages_send, messages) = hydroflow_plus::util::unbounded_channel();
let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel();
let (users_send, users) = hydroflow::util::unbounded_channel();
let (messages_send, messages) = hydroflow::util::unbounded_channel();
let (out, mut out_recv) = hydroflow::util::unbounded_channel();

let mut chat_server = super::chat_app!(users, messages, &out, false);
assert_graphvis_snapshots!(chat_server);
Expand Down Expand Up @@ -92,9 +91,9 @@ mod tests {

#[test]
fn test_chat_app_replay() {
let (users_send, users) = hydroflow_plus::util::unbounded_channel();
let (messages_send, messages) = hydroflow_plus::util::unbounded_channel();
let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel();
let (users_send, users) = hydroflow::util::unbounded_channel();
let (messages_send, messages) = hydroflow::util::unbounded_channel();
let (out, mut out_recv) = hydroflow::util::unbounded_channel();

let mut chat_server = super::chat_app!(users, messages, &out, true);
assert_graphvis_snapshots!(chat_server);
Expand Down
13 changes: 6 additions & 7 deletions hydroflow_plus_test_local/src/local/count_elems.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use hydroflow::tokio::sync::mpsc::UnboundedSender;
use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow_plus::deploy::SingleProcessGraph;
use hydroflow_plus::tokio::sync::mpsc::UnboundedSender;
use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow_plus::*;
use stageleft::{q, Quoted, RuntimeData};

pub fn count_elems_generic<'a, T: 'a>(
flow: FlowBuilder<'a>,
Expand Down Expand Up @@ -38,13 +37,13 @@ pub fn count_elems<'a>(
#[stageleft::runtime]
#[cfg(test)]
mod tests {
use hydroflow_plus::assert_graphvis_snapshots;
use hydroflow_plus::util::collect_ready;
use hydroflow::assert_graphvis_snapshots;
use hydroflow::util::collect_ready;

#[test]
pub fn test_count() {
let (in_send, input) = hydroflow_plus::util::unbounded_channel();
let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel();
let (in_send, input) = hydroflow::util::unbounded_channel();
let (out, mut out_recv) = hydroflow::util::unbounded_channel();

let mut count = super::count_elems!(input, &out);
assert_graphvis_snapshots!(count);
Expand Down
14 changes: 7 additions & 7 deletions hydroflow_plus_test_local/src/local/graph_reachability.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use hydroflow::tokio::sync::mpsc::UnboundedSender;
use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow_plus::deploy::SingleProcessGraph;
use hydroflow_plus::tokio::sync::mpsc::UnboundedSender;
use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow_plus::*;

#[stageleft::entry]
Expand Down Expand Up @@ -35,14 +35,14 @@ pub fn graph_reachability<'a>(
#[stageleft::runtime]
#[cfg(test)]
mod tests {
use hydroflow_plus::assert_graphvis_snapshots;
use hydroflow_plus::util::collect_ready;
use hydroflow::assert_graphvis_snapshots;
use hydroflow::util::collect_ready;

#[test]
pub fn test_reachability() {
let (roots_send, roots) = hydroflow_plus::util::unbounded_channel();
let (edges_send, edges) = hydroflow_plus::util::unbounded_channel();
let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel();
let (roots_send, roots) = hydroflow::util::unbounded_channel();
let (edges_send, edges) = hydroflow::util::unbounded_channel();
let (out, mut out_recv) = hydroflow::util::unbounded_channel();

let mut reachability = super::graph_reachability!(roots, edges, &out);
assert_graphvis_snapshots!(reachability);
Expand Down
Loading

0 comments on commit 9f3c8c4

Please sign in to comment.