diff --git a/hydroflow_plus/src/builder/compiled.rs b/hydroflow_plus/src/builder/compiled.rs index 59698115f911..1db322926722 100644 --- a/hydroflow_plus/src/builder/compiled.rs +++ b/hydroflow_plus/src/builder/compiled.rs @@ -31,10 +31,10 @@ impl<'a> CompiledFlow<'a, usize> { let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") .expect("hydroflow_plus should be present in `Cargo.toml`"); let root = match hydroflow_crate { - proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus::hydroflow }, proc_macro_crate::FoundCrate::Name(name) => { let ident = syn::Ident::new(&name, proc_macro2::Span::call_site()); - quote! { #ident } + quote! { #ident::hydroflow } } }; @@ -89,10 +89,10 @@ impl<'a> FreeVariable> for CompiledFlow<'a, ()> { let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") .expect("hydroflow_plus should be present in `Cargo.toml`"); let root = match hydroflow_crate { - proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus::hydroflow }, proc_macro_crate::FoundCrate::Name(name) => { let ident = syn::Ident::new(&name, proc_macro2::Span::call_site()); - quote! { #ident } + quote! { #ident::hydroflow } } }; diff --git a/hydroflow_plus/src/builder/deploy.rs b/hydroflow_plus/src/builder/deploy.rs index bb6690575305..b44936ac2a6a 100644 --- a/hydroflow_plus/src/builder/deploy.rs +++ b/hydroflow_plus/src/builder/deploy.rs @@ -12,14 +12,16 @@ use stageleft::Quoted; use super::built::build_inner; use super::compiled::CompiledFlow; -use crate::deploy::{ExternalSpec, IntoProcessSpec, LocalDeploy, Node, RegisterPort}; +use crate::deploy::{ + ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy, Node, ProcessSpec, + RegisterPort, +}; use crate::ir::HfPlusLeaf; use crate::location::external_process::{ ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort, }; -use crate::location::{ExternalProcess, Location, LocationId}; +use crate::location::{Cluster, ExternalProcess, Location, LocationId, Process}; use crate::staging_util::Invariant; -use crate::{Cluster, ClusterSpec, Deploy, Process, ProcessSpec}; pub struct DeployFlow<'a, D: LocalDeploy<'a>> { pub(super) ir: Vec, diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index dd2330fed21d..a28357e1ac22 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -7,11 +7,11 @@ use compiled::CompiledFlow; use deploy::{DeployFlow, DeployResult}; use stageleft::*; -use crate::deploy::{ExternalSpec, IntoProcessSpec, LocalDeploy}; +use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy}; use crate::ir::HfPlusLeaf; use crate::location::{Cluster, ExternalProcess, Process}; use crate::staging_util::Invariant; -use crate::{ClusterSpec, Deploy, RuntimeContext}; +use crate::RuntimeContext; pub mod built; pub mod compiled; diff --git a/hydroflow_plus/src/deploy/deploy_graph.rs b/hydroflow_plus/src/deploy/deploy_graph.rs index ba5a91ee4a76..44be80c17f58 100644 --- a/hydroflow_plus/src/deploy/deploy_graph.rs +++ b/hydroflow_plus/src/deploy/deploy_graph.rs @@ -13,7 +13,9 @@ use hydro_deploy::hydroflow_crate::ports::{ use hydro_deploy::hydroflow_crate::tracing_options::TracingOptions; use hydro_deploy::hydroflow_crate::HydroflowCrateService; use hydro_deploy::{CustomService, Deployment, Host, HydroflowCrate}; -use hydroflow::futures::StreamExt; +use hydroflow::bytes::Bytes; +use hydroflow::futures::{Sink, SinkExt, Stream, StreamExt}; +use hydroflow::lang::graph::HydroflowGraph; use hydroflow::util::deploy::{ConnectedSink, ConnectedSource}; use nameof::name_of; use serde::de::DeserializeOwned; @@ -27,8 +29,6 @@ use trybuild_internals_api::path; use super::deploy_runtime::*; use super::trybuild::{compile_graph_trybuild, create_trybuild}; use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort}; -use crate::futures::SinkExt; -use crate::lang::graph::HydroflowGraph; pub struct HydroDeploy {} @@ -542,49 +542,46 @@ impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal { fn as_bytes_sink( &self, key: usize, - ) -> impl Future>>> + 'a - { + ) -> impl Future>>> + 'a { let port = self.raw_port(key); async move { let sink = port.connect().await.into_sink(); - sink as Pin>> + sink as Pin>> } } fn as_bincode_sink( &self, key: usize, - ) -> impl Future>>> + 'a { + ) -> impl Future>>> + 'a { let port = self.raw_port(key); async move { let sink = port.connect().await.into_sink(); Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) })) - as Pin>> + as Pin>> } } fn as_bytes_source( &self, key: usize, - ) -> impl Future>>> + 'a - { + ) -> impl Future>>> + 'a { let port = self.raw_port(key); async move { let source = port.connect().await.into_source(); - Box::pin(source.map(|r| r.unwrap().freeze())) - as Pin>> + Box::pin(source.map(|r| r.unwrap().freeze())) as Pin>> } } fn as_bincode_source( &self, key: usize, - ) -> impl Future>>> + 'a { + ) -> impl Future>>> + 'a { let port = self.raw_port(key); async move { let source = port.connect().await.into_source(); Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap())) - as Pin>> + as Pin>> } } } diff --git a/hydroflow_plus/src/deploy/macro_runtime.rs b/hydroflow_plus/src/deploy/macro_runtime.rs index a565b20fb561..5ff3c2533177 100644 --- a/hydroflow_plus/src/deploy/macro_runtime.rs +++ b/hydroflow_plus/src/deploy/macro_runtime.rs @@ -1,13 +1,16 @@ use std::cell::RefCell; +use std::future::Future; use std::pin::Pin; use std::rc::Rc; +use hydroflow::bytes::Bytes; +use hydroflow::futures::{Sink, Stream}; use hydroflow::util::deploy::DeployPorts; +use hydroflow_lang::graph::HydroflowGraph; use stageleft::{Quoted, RuntimeData}; use super::HydroflowPlusMeta; use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort}; -use crate::lang::graph::HydroflowGraph; pub struct DeployRuntime {} @@ -197,9 +200,7 @@ impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode { fn as_bytes_sink( &self, _key: usize, - ) -> impl std::future::Future< - Output = Pin>>, - > + 'a { + ) -> impl Future>>> + 'a { async { panic!() } } @@ -210,9 +211,7 @@ impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode { fn as_bincode_sink( &self, _key: usize, - ) -> impl std::future::Future< - Output = Pin>>, - > + 'a { + ) -> impl Future>>> + 'a { async { panic!() } } @@ -223,9 +222,7 @@ impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode { fn as_bytes_source( &self, _key: usize, - ) -> impl std::future::Future< - Output = Pin>>, - > + 'a { + ) -> impl Future>>> + 'a { async { panic!() } } @@ -236,8 +233,7 @@ impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode { fn as_bincode_source( &self, _key: usize, - ) -> impl std::future::Future>>> + 'a - { + ) -> impl Future>>> + 'a { async { panic!() } } } diff --git a/hydroflow_plus/src/deploy/trybuild.rs b/hydroflow_plus/src/deploy/trybuild.rs index c9ed72535ff8..a517659579e3 100644 --- a/hydroflow_plus/src/deploy/trybuild.rs +++ b/hydroflow_plus/src/deploy/trybuild.rs @@ -1,14 +1,13 @@ use std::fs; use std::path::PathBuf; +use hydroflow_lang::graph::{partition_graph, HydroflowGraph}; use stageleft::internal::quote; use trybuild_internals_api::cargo::{self, Metadata}; use trybuild_internals_api::env::Update; use trybuild_internals_api::run::{PathDependency, Project}; use trybuild_internals_api::{dependencies, features, path, Runner}; -use crate::lang::graph::{partition_graph, HydroflowGraph}; - pub static IS_TEST: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); pub fn init_test() { @@ -19,26 +18,30 @@ pub fn compile_graph_trybuild(graph: HydroflowGraph, extra_stmts: Vec let partitioned_graph = partition_graph(graph).expect("Failed to partition (cycle detected)."); let mut diagnostics = Vec::new(); - let tokens = - partitioned_graph.as_code("e! { hydroflow_plus }, true, quote!(), &mut diagnostics); + let tokens = partitioned_graph.as_code( + "e! { hydroflow_plus::hydroflow }, + true, + quote!(), + &mut diagnostics, + ); let source_ast: syn::File = syn::parse_quote! { #![feature(box_patterns)] - #![allow(unused_crate_dependencies, missing_docs)] + #![allow(unused_imports, unused_crate_dependencies, missing_docs)] use hydroflow_plus::*; #[allow(unused)] - fn __hfplus_runtime<'a>(__hydroflow_plus_trybuild_cli: &'a hydroflow_plus::util::deploy::DeployPorts) -> hydroflow_plus::Hydroflow<'a> { + fn __hfplus_runtime<'a>(__hydroflow_plus_trybuild_cli: &'a hydroflow_plus::hydroflow::util::deploy::DeployPorts) -> hydroflow_plus::Hydroflow<'a> { #(#extra_stmts)* #tokens } #[tokio::main] async fn main() { - let ports = hydroflow_plus::util::deploy::init_no_ack_start().await; + let ports = hydroflow_plus::hydroflow::util::deploy::init_no_ack_start().await; let flow = __hfplus_runtime(&ports); println!("ack start"); - hydroflow_plus::util::deploy::launch_flow(flow).await; + hydroflow_plus::hydroflow::util::deploy::launch_flow(flow).await; } }; source_ast diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index b71cc879b8e1..c4aaeb977f5e 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -2,8 +2,8 @@ stageleft::stageleft_no_entry_crate!(); +pub use hydroflow; pub use hydroflow::scheduled::graph::Hydroflow; -pub use hydroflow::*; pub use stageleft::*; pub mod runtime_support { @@ -26,7 +26,6 @@ pub mod location; pub use location::{Cluster, ClusterId, Location, Process, Tick}; pub mod deploy; -pub use deploy::{ClusterSpec, Deploy, ProcessSpec}; pub mod cycle; diff --git a/hydroflow_plus_test_local/src/local/chat_app.rs b/hydroflow_plus_test_local/src/local/chat_app.rs index 2c1d82f9ce3f..566b5effbe03 100644 --- a/hydroflow_plus_test_local/src/local/chat_app.rs +++ b/hydroflow_plus_test_local/src/local/chat_app.rs @@ -1,8 +1,7 @@ +use hydroflow::tokio::sync::mpsc::UnboundedSender; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::deploy::SingleProcessGraph; -use hydroflow_plus::tokio::sync::mpsc::UnboundedSender; -use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::*; -use stageleft::{q, Quoted, RuntimeData}; #[stageleft::entry] pub fn chat_app<'a>( @@ -44,14 +43,14 @@ pub fn chat_app<'a>( #[stageleft::runtime] #[cfg(test)] mod tests { - use hydroflow_plus::assert_graphvis_snapshots; - use hydroflow_plus::util::collect_ready; + use hydroflow::assert_graphvis_snapshots; + use hydroflow::util::collect_ready; #[test] fn test_chat_app_no_replay() { - let (users_send, users) = hydroflow_plus::util::unbounded_channel(); - let (messages_send, messages) = hydroflow_plus::util::unbounded_channel(); - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (users_send, users) = hydroflow::util::unbounded_channel(); + let (messages_send, messages) = hydroflow::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut chat_server = super::chat_app!(users, messages, &out, false); assert_graphvis_snapshots!(chat_server); @@ -92,9 +91,9 @@ mod tests { #[test] fn test_chat_app_replay() { - let (users_send, users) = hydroflow_plus::util::unbounded_channel(); - let (messages_send, messages) = hydroflow_plus::util::unbounded_channel(); - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (users_send, users) = hydroflow::util::unbounded_channel(); + let (messages_send, messages) = hydroflow::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut chat_server = super::chat_app!(users, messages, &out, true); assert_graphvis_snapshots!(chat_server); diff --git a/hydroflow_plus_test_local/src/local/count_elems.rs b/hydroflow_plus_test_local/src/local/count_elems.rs index 275c956d7e52..8e25dedf0db3 100644 --- a/hydroflow_plus_test_local/src/local/count_elems.rs +++ b/hydroflow_plus_test_local/src/local/count_elems.rs @@ -1,8 +1,7 @@ +use hydroflow::tokio::sync::mpsc::UnboundedSender; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::deploy::SingleProcessGraph; -use hydroflow_plus::tokio::sync::mpsc::UnboundedSender; -use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::*; -use stageleft::{q, Quoted, RuntimeData}; pub fn count_elems_generic<'a, T: 'a>( flow: FlowBuilder<'a>, @@ -38,13 +37,13 @@ pub fn count_elems<'a>( #[stageleft::runtime] #[cfg(test)] mod tests { - use hydroflow_plus::assert_graphvis_snapshots; - use hydroflow_plus::util::collect_ready; + use hydroflow::assert_graphvis_snapshots; + use hydroflow::util::collect_ready; #[test] pub fn test_count() { - let (in_send, input) = hydroflow_plus::util::unbounded_channel(); - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (in_send, input) = hydroflow::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut count = super::count_elems!(input, &out); assert_graphvis_snapshots!(count); diff --git a/hydroflow_plus_test_local/src/local/graph_reachability.rs b/hydroflow_plus_test_local/src/local/graph_reachability.rs index b03aefb230e2..d88a42fa3e62 100644 --- a/hydroflow_plus_test_local/src/local/graph_reachability.rs +++ b/hydroflow_plus_test_local/src/local/graph_reachability.rs @@ -1,6 +1,6 @@ +use hydroflow::tokio::sync::mpsc::UnboundedSender; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::deploy::SingleProcessGraph; -use hydroflow_plus::tokio::sync::mpsc::UnboundedSender; -use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::*; #[stageleft::entry] @@ -35,14 +35,14 @@ pub fn graph_reachability<'a>( #[stageleft::runtime] #[cfg(test)] mod tests { - use hydroflow_plus::assert_graphvis_snapshots; - use hydroflow_plus::util::collect_ready; + use hydroflow::assert_graphvis_snapshots; + use hydroflow::util::collect_ready; #[test] pub fn test_reachability() { - let (roots_send, roots) = hydroflow_plus::util::unbounded_channel(); - let (edges_send, edges) = hydroflow_plus::util::unbounded_channel(); - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (roots_send, roots) = hydroflow::util::unbounded_channel(); + let (edges_send, edges) = hydroflow::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut reachability = super::graph_reachability!(roots, edges, &out); assert_graphvis_snapshots!(reachability); diff --git a/hydroflow_plus_test_local/src/local/negation.rs b/hydroflow_plus_test_local/src/local/negation.rs index b771dd760353..a0924f70a93b 100644 --- a/hydroflow_plus_test_local/src/local/negation.rs +++ b/hydroflow_plus_test_local/src/local/negation.rs @@ -1,7 +1,6 @@ +use hydroflow::tokio::sync::mpsc::UnboundedSender; use hydroflow_plus::deploy::SingleProcessGraph; -use hydroflow_plus::tokio::sync::mpsc::UnboundedSender; use hydroflow_plus::*; -use stageleft::{q, Quoted, RuntimeData}; #[stageleft::entry] pub fn test_difference<'a>( @@ -63,12 +62,12 @@ pub fn test_anti_join<'a>( #[stageleft::runtime] #[cfg(test)] mod tests { - use hydroflow_plus::assert_graphvis_snapshots; - use hydroflow_plus::util::collect_ready; + use hydroflow::assert_graphvis_snapshots; + use hydroflow::util::collect_ready; #[test] fn test_difference_tick_tick() { - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut flow = super::test_difference!(&out, false, false); assert_graphvis_snapshots!(flow); @@ -84,7 +83,7 @@ mod tests { #[test] fn test_difference_tick_static() { - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut flow = super::test_difference!(&out, false, true); assert_graphvis_snapshots!(flow); @@ -100,7 +99,7 @@ mod tests { #[test] fn test_difference_static_tick() { - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut flow = super::test_difference!(&out, true, false); assert_graphvis_snapshots!(flow); @@ -119,7 +118,7 @@ mod tests { #[test] fn test_difference_static_static() { - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut flow = super::test_difference!(&out, true, true); assert_graphvis_snapshots!(flow); @@ -135,7 +134,7 @@ mod tests { #[test] fn test_anti_join_tick_tick() { - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut flow = super::test_anti_join!(&out, false, false); assert_graphvis_snapshots!(flow); @@ -151,7 +150,7 @@ mod tests { #[test] fn test_anti_join_tick_static() { - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut flow = super::test_anti_join!(&out, false, true); assert_graphvis_snapshots!(flow); @@ -167,7 +166,7 @@ mod tests { #[test] fn test_anti_join_static_tick() { - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut flow = super::test_anti_join!(&out, true, false); assert_graphvis_snapshots!(flow); @@ -186,7 +185,7 @@ mod tests { #[test] fn test_anti_join_static_static() { - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut flow = super::test_anti_join!(&out, true, true); assert_graphvis_snapshots!(flow); diff --git a/hydroflow_plus_test_local/src/local/teed_join.rs b/hydroflow_plus_test_local/src/local/teed_join.rs index 3771e481a7e7..36e575f4109e 100644 --- a/hydroflow_plus_test_local/src/local/teed_join.rs +++ b/hydroflow_plus_test_local/src/local/teed_join.rs @@ -1,9 +1,8 @@ +use hydroflow::futures::stream::Stream; +use hydroflow::tokio::sync::mpsc::UnboundedSender; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::deploy::MultiGraph; -use hydroflow_plus::futures::stream::Stream; -use hydroflow_plus::tokio::sync::mpsc::UnboundedSender; -use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::*; -use stageleft::{q, Quoted, RuntimeData}; struct N0 {} struct N1 {} @@ -48,13 +47,13 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( #[stageleft::runtime] #[cfg(test)] mod tests { - use hydroflow_plus::assert_graphvis_snapshots; - use hydroflow_plus::util::collect_ready; + use hydroflow::assert_graphvis_snapshots; + use hydroflow::util::collect_ready; #[test] fn test_teed_join() { - let (in_send, input) = hydroflow_plus::util::unbounded_channel(); - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (in_send, input) = hydroflow::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut joined = super::teed_join!(input, &out, false, 0); assert_graphvis_snapshots!(joined); @@ -71,8 +70,8 @@ mod tests { #[test] fn test_teed_join_twice() { - let (in_send, input) = hydroflow_plus::util::unbounded_channel(); - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (in_send, input) = hydroflow::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut joined = super::teed_join!(input, &out, true, 0); assert_graphvis_snapshots!(joined); @@ -89,8 +88,8 @@ mod tests { #[test] fn test_teed_join_multi_node() { - let (_, input) = hydroflow_plus::util::unbounded_channel(); - let (out, mut out_recv) = hydroflow_plus::util::unbounded_channel(); + let (_, input) = hydroflow::util::unbounded_channel(); + let (out, mut out_recv) = hydroflow::util::unbounded_channel(); let mut joined = super::teed_join!(input, &out, true, 1); assert_graphvis_snapshots!(joined); diff --git a/template/hydroflow_plus/src/first_ten_distributed.rs b/template/hydroflow_plus/src/first_ten_distributed.rs index 960526dc6a43..c325b6973191 100644 --- a/template/hydroflow_plus/src/first_ten_distributed.rs +++ b/template/hydroflow_plus/src/first_ten_distributed.rs @@ -12,7 +12,7 @@ pub fn first_ten_distributed<'a>(p1: &Process<'a, P1>, p2: &Process<'a, P2>) { mod tests { use hydro_deploy::Deployment; use hydroflow_plus::deploy::DeployCrateWrapper; - use hydroflow_plus::futures::StreamExt; + use hydroflow_plus::hydroflow::futures::StreamExt; use tokio_stream::wrappers::UnboundedReceiverStream; #[tokio::test]