From 5b1a221dde7c754cd0155634d3988e1322b9c64d Mon Sep 17 00:00:00 2001 From: Jarrod Overson Date: Tue, 12 Sep 2023 12:29:54 -0400 Subject: [PATCH] refactor: removed mutexes in PacketStream, made Invocation state error-proof --- .../wick-http-client/src/component.rs | 11 +- crates/components/wick-sql/src/component.rs | 37 +-- .../flow-graph-interpreter/src/interpreter.rs | 28 +- .../src/interpreter/channel.rs | 14 +- .../src/interpreter/components/component.rs | 8 +- .../src/interpreter/components/core.rs | 8 +- .../interpreter/components/core/collect.rs | 5 +- .../src/interpreter/components/core/merge.rs | 3 +- .../src/interpreter/components/core/pluck.rs | 44 ++-- .../src/interpreter/components/core/switch.rs | 11 +- .../src/interpreter/components/internal.rs | 6 +- .../src/interpreter/components/null.rs | 4 +- .../interpreter/components/self_component.rs | 4 +- .../src/interpreter/event_loop.rs | 7 +- .../src/interpreter/event_loop/state.rs | 8 +- .../src/interpreter/executor.rs | 13 +- .../src/interpreter/executor/context.rs | 78 +++--- .../interpreter/executor/context/operation.rs | 35 ++- .../tests/test/observer.rs | 2 +- .../tests/test/test_component.rs | 8 +- .../wick/wick-component-wasm/src/component.rs | 2 +- .../wick/wick-component-wasm/src/wasm_host.rs | 13 +- crates/wick/wick-host/src/component_host.rs | 4 +- .../src/invocation_server.rs | 6 +- crates/wick/wick-packet/src/invocation.rs | 246 +++++++++++++----- crates/wick/wick-packet/src/lib.rs | 2 +- crates/wick/wick-packet/src/packet.rs | 10 +- crates/wick/wick-packet/src/packet_stream.rs | 51 ++-- crates/wick/wick-rpc/src/client.rs | 22 +- crates/wick/wick-rpc/src/lib.rs | 2 +- crates/wick/wick-rpc/src/types/conversions.rs | 27 +- .../src/components/component_service.rs | 35 +-- .../src/components/scope_component.rs | 9 +- crates/wick/wick-runtime/src/runtime.rs | 2 +- crates/wick/wick-runtime/src/runtime/scope.rs | 2 +- crates/wick/wick-stdlib/src/lib.rs | 2 +- crates/wick/wick-stdlib/src/macros.rs | 5 +- crates/wick/wick-test/tests/done_tolerance.rs | 4 +- integration-tests/integration/src/test.rs | 10 +- 39 files changed, 436 insertions(+), 352 deletions(-) diff --git a/crates/components/wick-http-client/src/component.rs b/crates/components/wick-http-client/src/component.rs index b4465929..1db1b100 100644 --- a/crates/components/wick-http-client/src/component.rs +++ b/crates/components/wick-http-client/src/component.rs @@ -91,7 +91,7 @@ impl Component for HttpClientComponent { let config = self.config.clone(); let baseurl = self.base.clone(); let codec = config.codec().copied(); - let opdef = get_op_by_name(&config, invocation.target.operation_id()); + let opdef = get_op_by_name(&config, invocation.target().operation_id()); let path_template = opdef .as_ref() .and_then(|op| self.path_templates.get(op.name()).cloned()); @@ -99,7 +99,7 @@ impl Component for HttpClientComponent { Box::pin(async move { let (tx, rx) = invocation.make_response(); - let span = invocation.span.clone(); + let span = invocation.span().clone(); let fut = handle( opdef, tx.clone(), @@ -134,7 +134,7 @@ fn get_op_by_name(config: &HttpClientComponentConfig, name: &str) -> Option, tx: FluxChannel, - mut invocation: Invocation, + invocation: Invocation, root_config: Option, op_config: Option, codec: Option, @@ -146,14 +146,15 @@ async fn handle( return Err(Error::InvalidBaseUrl(baseurl).into()); } let Some(opdef) = opdef else { - return Err(Error::OpNotFound(invocation.target.operation_id().to_owned()).into()); + return Err(Error::OpNotFound(invocation.target().operation_id().to_owned()).into()); }; // Defer to operation codec, then to client codec, then to default. let codec = opdef.codec().copied().unwrap_or(codec.unwrap_or_default()); let template = path_template.unwrap(); let input_list: Vec<_> = opdef.inputs().iter().map(|i| i.name.clone()).collect(); - let mut inputs = wick_packet::StreamMap::from_stream(invocation.eject_stream(), input_list); + let (invocation, stream) = invocation.split(); + let mut inputs = wick_packet::StreamMap::from_stream(stream, input_list); let mut handles = Vec::new(); 'outer: loop { diff --git a/crates/components/wick-sql/src/component.rs b/crates/components/wick-sql/src/component.rs index bb54e048..eb5238a2 100644 --- a/crates/components/wick-sql/src/component.rs +++ b/crates/components/wick-sql/src/component.rs @@ -74,34 +74,6 @@ impl DatabaseProvider for Client { self.inner().get_connection().await } } -// pub(crate) struct Transaction<'a>(Arc); - -// impl<'a> Transaction<'a> { -// fn new(conn: Arc) -> Self { -// Self(conn) -// } -// fn end_transaction(&self) -> BoxFuture> { -// Box::pin(async move { Ok(()) }) -// } -// } - -// #[async_trait::async_trait] -// impl<'a> ClientConnection for Transaction<'a> { -// async fn query(&mut self, stmt: &str, bound_args: Vec) -> Result>, Error> { -// self.0.query(stmt, bound_args).await -// } -// async fn exec(&mut self, stmt: &str, bound_args: Vec) -> Result<(), Error> { -// self.0.exec(stmt, bound_args).await -// } - -// async fn handle_error(&mut self, e: Error, behavior: ErrorBehavior) -> Result<(), Error> { -// self.0.handle_error(e, behavior).await -// } - -// async fn finish(&mut self) -> Result<(), Error> { -// todo!() -// } -// } /// The Azure SQL Wick component. #[derive(Clone)] @@ -159,15 +131,15 @@ impl SqlComponent { impl Component for SqlComponent { fn handle( &self, - mut invocation: Invocation, + invocation: Invocation, _data: Option, // TODO: this needs to be used _callback: Arc, ) -> BoxFuture> { let client = self.provider.clone(); let opdef = self .config - .get_operation(invocation.target.operation_id()) - .ok_or_else(|| Error::MissingOperation(invocation.target.operation_id().to_owned())) + .get_operation(invocation.target().operation_id()) + .ok_or_else(|| Error::MissingOperation(invocation.target().operation_id().to_owned())) .cloned(); Box::pin(async move { @@ -175,7 +147,8 @@ impl Component for SqlComponent { let stmt = client.get_statement(opdef.name()).unwrap().to_owned(); let input_names: Vec<_> = opdef.inputs().iter().map(|i| i.name.clone()).collect(); - let input_streams = wick_packet::split_stream(invocation.eject_stream(), input_names); + let (invocation, stream) = invocation.split(); + let input_streams = wick_packet::split_stream(stream, input_names); let (tx, rx) = invocation.make_response(); tokio::spawn(async move { let start = SystemTime::now(); diff --git a/crates/wick/flow-graph-interpreter/src/interpreter.rs b/crates/wick/flow-graph-interpreter/src/interpreter.rs index 27fecae5..aebdeb26 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter.rs @@ -239,38 +239,42 @@ impl Component for Interpreter { } hosted }; - let span = invocation.span.clone(); + let span = invocation.span().clone(); - span.in_scope(|| trace!(target=%invocation.target_url(),tx_id=%invocation.tx_id,id=%invocation.id, "invoking")); - let from_exposed = self.exposed_ops.get(invocation.target.operation_id()); + span + .in_scope(|| trace!(target=%invocation.target().url(),tx_id=%invocation.tx_id(),id=%invocation.id(), "invoking")); + let from_exposed = self.exposed_ops.get(invocation.target().operation_id()); Box::pin(async move { - let stream = match &invocation.target { + let stream = match invocation.target() { Entity::Operation(ns, _) => { if ns == SelfComponent::ID || ns == Entity::LOCAL || Some(ns) == self.namespace.as_ref() { if let Some(handler) = from_exposed { - let new_target = Entity::operation(handler.namespace(), invocation.target.operation_id()); - span.in_scope(|| trace!(origin=%invocation.origin,original_target=%invocation.target, %new_target, "invoke::exposed::operation")); - invocation.target = new_target; + let new_target = Entity::operation(handler.namespace(), invocation.target().operation_id()); + span.in_scope(|| trace!(origin=%invocation.origin(),original_target=%invocation.target(), %new_target, "invoke::exposed::operation")); + invocation = invocation.redirect(new_target); return handler.component.handle(invocation, config, cb).await; } - span - .in_scope(|| trace!(origin=%invocation.origin,target=%invocation.target, "invoke::composite::operation")); + span.in_scope( + || trace!(origin=%invocation.origin(),target=%invocation.target(), "invoke::composite::operation"), + ); self .self_component .handle(invocation, config, self.get_callback()) .await? } else if let Some(handler) = self.components.get(ns) { - span.in_scope(|| trace!(origin=%invocation.origin,target=%invocation.target, "invoke::handler::operation")); + span.in_scope( + || trace!(origin=%invocation.origin(),target=%invocation.target(), "invoke::handler::operation"), + ); handler.component.handle(invocation, config, cb).await? } else { return Err(ComponentError::new(Error::TargetNotFound( - invocation.target.clone(), + invocation.target().clone(), known_targets(), ))); } } - _ => return Err(ComponentError::new(Error::InvalidEntity(invocation.target))), + _ => return Err(ComponentError::new(Error::InvalidEntity(invocation.target().clone()))), }; Ok::<_, ComponentError>(stream) diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/channel.rs b/crates/wick/flow-graph-interpreter/src/interpreter/channel.rs index 38c14d4e..bb7677d5 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/channel.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/channel.rs @@ -1,7 +1,7 @@ use flow_graph::{NodeIndex, PortReference}; use tracing::Span; use uuid::Uuid; -use wick_packet::{Invocation, PacketPayload}; +use wick_packet::{Invocation, PacketPayload, PacketStream}; pub(crate) use self::error::Error; use super::executor::error::ExecutionError; @@ -45,7 +45,7 @@ impl Event { #[allow(clippy::exhaustive_enums)] pub enum EventKind { Ping(usize), - ExecutionStart(Box), + ExecutionStart(Box, PacketStream), ExecutionDone, PortData(PortReference), Invocation(NodeIndex, Box), @@ -57,7 +57,7 @@ impl EventKind { pub(crate) const fn name(&self) -> &str { match self { EventKind::Ping(_) => "ping", - EventKind::ExecutionStart(_) => "exec_start", + EventKind::ExecutionStart(_, _) => "exec_start", EventKind::ExecutionDone => "exec_done", EventKind::PortData(_) => "port_data", EventKind::Invocation(_, _) => "invocation", @@ -166,8 +166,12 @@ impl InterpreterDispatchChannel { self.dispatch(Event::new(CHANNEL_UUID, EventKind::Close(error), self.span.clone())); } - pub(crate) fn dispatch_start(&self, ctx: Box) { - self.dispatch(Event::new(ctx.id(), EventKind::ExecutionStart(ctx), self.span.clone())); + pub(crate) fn dispatch_start(&self, ctx: Box, stream: PacketStream) { + self.dispatch(Event::new( + ctx.id(), + EventKind::ExecutionStart(ctx, stream), + self.span.clone(), + )); } pub(crate) fn dispatch_call_complete(&self, ctx_id: Uuid, op_index: usize) { diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/component.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/component.rs index 9c2ce3e7..c8ab1521 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/component.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/component.rs @@ -41,12 +41,12 @@ impl Component for ComponentComponent { _config: Option, _callback: std::sync::Arc, ) -> BoxFuture> { - invocation.trace(|| debug!(target = %invocation.target, namespace = Self::ID)); + invocation.trace(|| debug!(target = %invocation.target(), namespace = Self::ID)); // This handler handles the components:: namespace and outputs the entity // to link to. - let target_name = invocation.target.operation_id().to_owned(); - let entity = Entity::component(invocation.target.operation_id()); + let target_name = invocation.target().operation_id().to_owned(); + let entity = Entity::component(invocation.target().operation_id()); let contains_components = self.signature.operations.iter().any(|op| op.name == target_name); let all_components: Vec<_> = self.signature.operations.iter().map(|op| op.name.clone()).collect(); @@ -63,7 +63,7 @@ impl Component for ComponentComponent { tx.send(Packet::encode( port_name, - ComponentReference::new(invocation.origin.clone(), Entity::component(entity.component_id())), + ComponentReference::new(invocation.origin().clone(), Entity::component(entity.component_id())), )) .map_err(ComponentError::new)?; diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/core.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/core.rs index b7d5f4d9..d2f840af 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/core.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/core.rs @@ -153,7 +153,7 @@ impl CoreComponent { macro_rules! core_op { ($type:ty, $inv:expr, $name:expr, $callback:expr, $data:ident) => {{ let config = <$type>::decode_config($data)?; - let ctx = Context::new(config, &$inv.inherent, $callback); + let ctx = Context::new(config, $inv.inherent(), $callback); $name.handle($inv, ctx).await }}; } @@ -165,17 +165,17 @@ impl Component for CoreComponent { data: Option, callback: std::sync::Arc, ) -> BoxFuture> { - invocation.trace(|| debug!(target = %invocation.target, namespace = Self::ID)); + invocation.trace(|| debug!(target = %invocation.target(), namespace = Self::ID)); let task = async move { - match invocation.target.operation_id() { + match invocation.target().operation_id() { sender::Op::ID => core_op! {sender::Op, invocation, self.sender, callback, data}, pluck::Op::ID => core_op! {pluck::Op, invocation, self.pluck, callback, data}, merge::Op::ID => core_op! {merge::Op, invocation, self.merge, callback, data}, switch::Op::ID => core_op! {switch::Op, invocation, self.switch, callback, data}, collect::Op::ID => core_op! {collect::Op, invocation, self.collect, callback, data}, _ => { - panic!("Core operation {} not handled.", invocation.target.operation_id()); + panic!("Core operation {} not handled.", invocation.target().operation_id()); } } }; diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/collect.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/collect.rs index 8b13d29d..2c6b2d92 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/collect.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/collect.rs @@ -64,7 +64,7 @@ impl Operation for Op { type Config = Config; fn handle( &self, - mut invocation: Invocation, + invocation: Invocation, context: Context, ) -> BoxFuture> { let (tx, rx) = invocation.make_response(); @@ -72,8 +72,9 @@ impl Operation for Op { tokio::spawn(async move { let mut ports: HashMap> = context.config.inputs.iter().map(|n| (n.clone(), vec![])).collect(); let mut array_levels: HashMap = HashMap::new(); + let mut stream = invocation.into_stream(); - while let Some(next) = invocation.packets.next().await { + while let Some(next) = stream.next().await { if let Err(e) = next { ports .entry(Packet::FATAL_ERROR.to_owned()) diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/merge.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/merge.rs index a38a209d..a2304ab8 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/merge.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/merge.rs @@ -72,7 +72,8 @@ impl Operation for Op { context: Context, ) -> BoxFuture> { let (tx, rx) = invocation.make_response(); - let mut map = StreamMap::from_stream(invocation.packets, self.input_names(&context.config)); + let stream = invocation.into_stream(); + let mut map = StreamMap::from_stream(stream, self.input_names(&context.config)); tokio::spawn(async move { while let Ok(next) = map.next_set().await { if next.is_none() { diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/pluck.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/pluck.rs index 510410b3..b9cbbf74 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/pluck.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/pluck.rs @@ -3,7 +3,7 @@ use flow_component::{ComponentError, Context, Operation, RenderConfiguration}; use futures::{FutureExt, StreamExt}; use serde_json::Value; use wick_interface_types::{operation, OperationSignature}; -use wick_packet::{Invocation, Packet, PacketStream, RuntimeConfig, StreamMap}; +use wick_packet::{Invocation, Packet, PacketStream, RuntimeConfig}; use crate::BoxFuture; pub(crate) struct Op { @@ -77,15 +77,19 @@ impl Operation for Op { invocation: Invocation, context: Context, ) -> BoxFuture> { - let mut map = StreamMap::from_stream(invocation.packets, self.input_names(&context.config)); - let mapped = map.take("input").map_err(ComponentError::new).map(|input| { - input - .map(move |next| { - let field = context.config.field.clone(); - next.and_then(move |packet| { - if packet.has_data() { - let obj = packet.decode_value()?; - let value = pluck(&obj, &field).map_or_else( + // let mut map = StreamMap::from_stream(invocation.packets, self.input_names(&context.config)); + let field = context.config.field.clone(); + let stream = invocation.into_stream(); + let mapped = stream.filter_map(move |next| { + let a = next.map_or_else( + |e| Some(Err(e)), + |packet| { + if packet.port() != "input" { + return None; + } + if packet.has_data() { + Some(packet.decode_value().map(|obj| { + pluck(&obj, &field).map_or_else( || { Packet::err( "output", @@ -93,18 +97,16 @@ impl Operation for Op { ) }, |value| Packet::encode("output", value), - ); - - Ok(value) - } else { - Ok(packet.set_port("output")) - } - }) - }) - .boxed() - .into() + ) + })) + } else { + Some(Ok(packet.set_port("output"))) + } + }, + ); + futures::future::ready(a) }); - async move { mapped }.boxed() + async move { Ok(PacketStream::new(mapped)) }.boxed() } fn get_signature(&self, _config: Option<&Self::Config>) -> &OperationSignature { diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/switch.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/switch.rs index 06b2b363..f90bb619 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/switch.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/switch.rs @@ -18,6 +18,7 @@ use wick_packet::{ Entity, InherentData, Invocation, + InvocationData, Packet, PacketSender, PacketStream, @@ -456,7 +457,7 @@ impl Operation for Op { #[allow(clippy::too_many_lines)] fn handle( &self, - mut invocation: Invocation, + invocation: Invocation, context: Context, ) -> BoxFuture> { let (root_tx, root_rx) = invocation.make_response(); @@ -467,8 +468,8 @@ impl Operation for Op { tokio::spawn(async move { // the substream level the condition was found at. let mut condition_level = 0; - let mut router = SwitchRouter::new(invocation.span.clone()); - let mut root_stream = invocation.eject_stream(); + let mut router = SwitchRouter::new(invocation.span().clone()); + let (invocation, mut root_stream) = invocation.split(); let input_streams: HashMap = context .config @@ -674,7 +675,7 @@ impl Operation for Op { fn new_route_handler( target: Entity, - invocation: &Invocation, + invocation: &InvocationData, inherent: InherentData, callback: Arc, op_config: Option, @@ -684,7 +685,7 @@ fn new_route_handler( let (outer_tx, outer_rx) = invocation.make_response(); let (inner_tx, inner_rx) = invocation.make_response(); - let compref = ComponentReference::new(invocation.target.clone(), target); + let compref = ComponentReference::new(invocation.target().clone(), target); span.in_scope(|| trace!(%compref,"switch:case: route handler created")); let handle = tokio::spawn(async move { diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/internal.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/internal.rs index 5300cb41..f05518a4 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/internal.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/internal.rs @@ -31,15 +31,15 @@ impl Component for InternalComponent { _config: Option, _callback: std::sync::Arc, ) -> BoxFuture> { - invocation.trace(|| debug!(target = %invocation.target, id=%invocation.id,namespace = Self::ID)); - let op = invocation.target.operation_id().to_owned(); + invocation.trace(|| debug!(target = %invocation.target(), id=%invocation.id(),namespace = Self::ID)); + let op = invocation.target().operation_id().to_owned(); let is_oneshot = op == SCHEMATIC_INPUT; let task = async move { if op == SCHEMATIC_OUTPUT { panic!("Output component should not be executed"); } else if is_oneshot { - Ok(invocation.packets) + Ok(invocation.into_stream()) } else { panic!("Internal component {} not handled.", op); } diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/null.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/null.rs index c0de29d7..1e934dcc 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/null.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/null.rs @@ -39,12 +39,12 @@ impl NullComponent { impl Component for NullComponent { fn handle( &self, - mut invocation: Invocation, + invocation: Invocation, _data: Option, _callback: std::sync::Arc, ) -> BoxFuture> { spawn(async move { - let mut stream = invocation.eject_stream(); + let (invocation, mut stream) = invocation.split(); while let Some(p) = stream.next().await { match p { Err(e) => invocation.trace(|| error!("received error on dropped stream: {}", e)), diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/self_component.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/self_component.rs index 3aed0a25..b0ab2cea 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/self_component.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/self_component.rs @@ -74,9 +74,9 @@ impl Component for SelfComponent { config: Option, callback: Arc, ) -> BoxFuture> { - invocation.trace(|| debug!(target = %invocation.target, namespace = Self::ID)); + invocation.trace(|| debug!(target = %invocation.target(), namespace = Self::ID)); - let operation = invocation.target.operation_id().to_owned(); + let operation = invocation.target().operation_id().to_owned(); let fut = self .inner .schematics diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/event_loop.rs b/crates/wick/flow-graph-interpreter/src/interpreter/event_loop.rs index 6043b0c4..d7243027 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/event_loop.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/event_loop.rs @@ -133,7 +133,12 @@ async fn event_loop( EventKind::CallComplete(data) => state.handle_call_complete(ctx_id, data).instrument(tx_span).await, EventKind::PortData(data) => state.handle_port_data(ctx_id, data, &tx_span).await, EventKind::ExecutionDone => state.handle_exec_done(ctx_id).instrument(tx_span).await, - EventKind::ExecutionStart(context) => state.handle_exec_start(*context, &options).instrument(tx_span).await, + EventKind::ExecutionStart(context, stream) => { + state + .handle_exec_start(*context, stream, &options) + .instrument(tx_span) + .await + } EventKind::Ping(ping) => { trace!(ping); Ok(()) diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/event_loop/state.rs b/crates/wick/flow-graph-interpreter/src/interpreter/event_loop/state.rs index c0c0a0fd..e9f7eedd 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/event_loop/state.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/event_loop/state.rs @@ -4,7 +4,7 @@ use std::sync::atomic::AtomicBool; use flow_graph::{PortDirection, PortReference}; use tracing::Span; use uuid::Uuid; -use wick_packet::PacketPayload; +use wick_packet::{PacketPayload, PacketStream}; use super::EventLoop; use crate::interpreter::channel::{CallComplete, InterpreterDispatchChannel}; @@ -89,9 +89,10 @@ impl State { pub(super) async fn handle_exec_start( &mut self, mut ctx: ExecutionContext, + stream: PacketStream, options: &InterpreterOptions, ) -> Result<(), ExecutionError> { - match ctx.start(options).await { + match ctx.start(options, stream).await { Ok(_) => { self.context_map.init_tx(ctx.id(), ctx); Ok(()) @@ -169,7 +170,8 @@ impl State { ); } }); - ctx.push_packets(port.node_index(), vec![packet]).await?; + let fut = ctx.push_packets(port.node_index(), vec![packet]); + fut.await?; } Ok(()) } diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/executor.rs b/crates/wick/flow-graph-interpreter/src/interpreter/executor.rs index 642adc31..a78d675e 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/executor.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/executor.rs @@ -52,13 +52,15 @@ impl SchematicExecutor { callback: Arc, ) -> Result { invocation - .trace(|| debug!(operation = self.name(), origin=%invocation.origin,target=%invocation.target,"invoking")); + .trace(|| debug!(operation = self.name(), origin=%invocation.origin(),target=%invocation.target(),"invoking")); + + let (invocation, stream) = invocation.split(); let seed = Seed::unsafe_new(invocation.seed()); - let mut ctx = ExecutionContext::new( + let (ctx, output_stream) = ExecutionContext::new( self.schematic.clone(), - invocation, + &invocation, self.channel.clone(), &components, &self_component, @@ -67,8 +69,7 @@ impl SchematicExecutor { config, seed, ); - let stream = ctx.take_stream().unwrap(); - ExecutionContext::run(ctx); - Ok(stream) + ExecutionContext::run(ctx, stream); + Ok(output_stream) } } diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/executor/context.rs b/crates/wick/flow-graph-interpreter/src/interpreter/executor/context.rs index b5a26d45..3e74a303 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/executor/context.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/executor/context.rs @@ -9,7 +9,7 @@ use parking_lot::Mutex; use seeded_random::{Random, Seed}; use uuid::Uuid; use wasmrs_rx::Observer; -use wick_packet::{Entity, Invocation, Packet, PacketError, PacketSender, PacketStream, RuntimeConfig}; +use wick_packet::{Entity, InvocationData, Packet, PacketError, PacketSender, PacketStream, RuntimeConfig}; use self::operation::{FutureInvocation, InstanceHandler}; use super::error::ExecutionError; @@ -38,9 +38,8 @@ pub(crate) enum TxState { #[must_use] pub struct ExecutionContext { schematic: Arc, - output: (Option, Option), + output: Option, channel: InterpreterDispatchChannel, - invocation: Invocation, instances: Vec>, id: Uuid, start_time: Instant, @@ -64,7 +63,7 @@ impl std::fmt::Debug for ExecutionContext { impl ExecutionContext { pub(crate) fn new( schematic: Arc, - invocation: Invocation, + invocation: &InvocationData, channel: InterpreterDispatchChannel, components: &Arc, self_component: &SelfComponent, @@ -72,7 +71,7 @@ impl ExecutionContext { root_config: Option, op_config: Option, seed: Seed, - ) -> Self { + ) -> (Self, PacketStream) { let rng = Random::from_seed(seed); let id = invocation.id; @@ -83,7 +82,7 @@ impl ExecutionContext { Arc::new(InstanceHandler::new( schematic.clone(), FutureInvocation::next( - &invocation, + invocation, Entity::operation(op_node.cref().component_id(), op_node.cref().name()), rng.gen(), ), @@ -101,14 +100,13 @@ impl ExecutionContext { let (tx, rx) = invocation.make_response(); - Self { + let this = Self { channel, options: None, - invocation, schematic, root_config, op_config, - output: (Some(tx), Some(rx)), + output: Some(tx), instances, start_time: Instant::now(), stats, @@ -117,12 +115,14 @@ impl ExecutionContext { span, finished: AtomicBool::new(false), callback, - } + }; + + (this, rx) } - pub fn run(self) { + pub fn run(self, stream: PacketStream) { let channel = self.channel.clone(); - channel.dispatch_start(Box::new(self)); + channel.dispatch_start(Box::new(self), stream); } pub fn in_scope T, T>(&self, f: F) -> T { @@ -163,7 +163,7 @@ impl ExecutionContext { outputs_done } - pub(crate) async fn start(&mut self, options: &InterpreterOptions) -> Result<()> { + pub(crate) async fn start(&mut self, options: &InterpreterOptions, incoming: PacketStream) -> Result<()> { self.stats.mark("start"); self.stats.start("execution"); self.span.in_scope(|| trace!("starting execution")); @@ -192,29 +192,12 @@ impl ExecutionContext { } } - let incoming = self.invocation.eject_stream(); - self.prime_input_ports(self.schematic.input().index(), incoming)?; self.stats.mark("start_done"); Ok(()) } - pub(crate) async fn start_instance(&self, instance: Arc) -> Result<()> { - instance - .start( - self.id(), - self.channel.clone(), - self.options.as_ref().unwrap(), - self.callback.clone(), - self.root_config.clone(), - self.op_config.clone(), - ) - .await?; - - Ok(()) - } - fn prime_input_ports(&self, index: NodeIndex, mut payloads: PacketStream) -> Result<()> { let input = self.instance(index).clone(); let channel = self.channel.clone(); @@ -250,7 +233,7 @@ impl ExecutionContext { self.span.in_scope(|| trace!("finishing execution output")); // drop our output sender; - drop(self.output.0.take()); + drop(self.output.take()); // mark our end of execution self.stats.end("execution"); @@ -265,7 +248,7 @@ impl ExecutionContext { } pub(crate) fn emit_output_message(&self, packets: Vec) -> Result<()> { - if let Some(ref output) = self.output.0 { + if let Some(ref output) = self.output { for packet in packets { output.send(packet).map_err(|_e| ExecutionError::ChannelSend)?; } @@ -290,10 +273,6 @@ impl ExecutionContext { Ok(()) } - pub(crate) fn take_stream(&mut self) -> Option { - self.output.1.take() - } - pub(crate) fn take_tx_output(&self) -> Result> { let output = self.output_handler(); output @@ -329,7 +308,16 @@ impl ExecutionContext { pub(crate) async fn push_packets(&self, index: NodeIndex, packets: Vec) -> Result<()> { let instance = self.instance(index).clone(); if !instance.has_started() { - self.start_instance(instance.clone()).await?; + InstanceHandler::start( + instance.clone(), + self.id(), + self.channel.clone(), + self.options.as_ref().unwrap(), + self.callback.clone(), + self.root_config.clone(), + self.op_config.clone(), + ) + .await?; } let _ = instance.accept_packets(packets); @@ -396,3 +384,19 @@ pub(crate) fn accept_output( instance.buffer_out(&port, payload); channel.dispatch_data(ctx_id, port); } + +#[cfg(test)] +mod test { + use anyhow::Result; + + use super::*; + + #[test] + const fn test_send_sync() -> Result<()> { + const fn assert_send() {} + const fn assert_sync() {} + assert_send::(); + assert_sync::(); + Ok(()) + } +} diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/executor/context/operation.rs b/crates/wick/flow-graph-interpreter/src/interpreter/executor/context/operation.rs index 0b8144cd..87762199 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/executor/context/operation.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/executor/context/operation.rs @@ -15,7 +15,7 @@ use wasmrs_rx::{FluxChannel, Observer}; use wick_packet::{ Entity, InherentData, - Invocation, + InvocationData, Packet, PacketError, PacketPayload, @@ -56,7 +56,7 @@ impl FutureInvocation { } } - pub(crate) fn next(value: &Invocation, target: Entity, seed: u64) -> Self { + pub(crate) fn next(value: &InvocationData, target: Entity, seed: u64) -> Self { let inherent = InherentData::new( seed, std::time::SystemTime::now() @@ -69,16 +69,9 @@ impl FutureInvocation { } } -impl From for Invocation { +impl From for InvocationData { fn from(value: FutureInvocation) -> Self { - Self::new_with_id( - value.tx_id, - value.origin, - value.target, - PacketStream::empty(), - value.inherent, - &value.span, - ) + Self::new_with_id(value.tx_id, value.origin, value.target, value.inherent, &value.span) } } @@ -279,7 +272,7 @@ impl InstanceHandler { let Some(invocation) = self.invocation.take() else { return Err(StateError::InvocationMissing(identifier).into()); }; - let mut invocation: Invocation = invocation.into(); + let invocation: InvocationData = invocation.into(); let span = info_span!(parent:&invocation.span,"interpreter:op:instance", otel.name=format!("starting:{}",invocation.target)); @@ -314,7 +307,7 @@ impl InstanceHandler { } else { PacketStream::new(Box::new(self.sender.take_rx().unwrap())) }; - invocation.attach_stream(stream); + let invocation = invocation.with_stream(stream); let cb = callback.clone(); let fut = if namespace == SelfComponent::ID { @@ -528,3 +521,19 @@ pub(crate) enum CompletionStatus { Timeout, Error, } + +#[cfg(test)] +mod test { + use anyhow::Result; + + use super::*; + + #[test] + const fn test_send_sync() -> Result<()> { + const fn assert_send() {} + const fn assert_sync() {} + assert_send::(); + assert_sync::(); + Ok(()) + } +} diff --git a/crates/wick/flow-graph-interpreter/tests/test/observer.rs b/crates/wick/flow-graph-interpreter/tests/test/observer.rs index f1811d5d..412c3fce 100644 --- a/crates/wick/flow-graph-interpreter/tests/test/observer.rs +++ b/crates/wick/flow-graph-interpreter/tests/test/observer.rs @@ -12,7 +12,7 @@ impl Observer for JsonWriter { let ctx_id = event.ctx_id(); let entry = match event.kind() { EventKind::Ping(_) => serde_json::Value::Null, - EventKind::ExecutionStart(tx) => { + EventKind::ExecutionStart(tx, _) => { serde_json::json!({ "type":event.name(), "index": index, diff --git a/crates/wick/flow-graph-interpreter/tests/test/test_component.rs b/crates/wick/flow-graph-interpreter/tests/test/test_component.rs index e5124b19..86d0770a 100644 --- a/crates/wick/flow-graph-interpreter/tests/test/test_component.rs +++ b/crates/wick/flow-graph-interpreter/tests/test/test_component.rs @@ -169,7 +169,7 @@ fn stream(_seed: u64) -> (Sender, PacketStream) { (sender, stream) } -fn defer(futs: Vec> + Sync + Send + 'static>) { +fn defer(futs: Vec> + Send + 'static>) { tokio::spawn(async move { let rng = Random::from_seed(Seed::unsafe_new(1)); let millis = rng.range(10, 100); @@ -188,9 +188,9 @@ impl Component for TestComponent { _config: Option, callback: Arc, ) -> BoxFuture> { - let operation = invocation.target.operation_id(); + let operation = invocation.target().operation_id(); println!("got op {} in test collection", operation); - Box::pin(async move { Ok(handler(invocation, callback)?) }) + Box::pin(async move { handler(invocation, callback) }) } fn signature(&self) -> &ComponentSignature { @@ -199,7 +199,7 @@ impl Component for TestComponent { } fn handler(invocation: Invocation, callback: Arc) -> anyhow::Result { - let mut payload_stream = invocation.packets; + let (invocation, mut payload_stream) = invocation.split(); let operation = invocation.target.operation_id().to_owned(); println!("handling {}", operation); let (mut send, stream) = stream(1); diff --git a/crates/wick/wick-component-wasm/src/component.rs b/crates/wick/wick-component-wasm/src/component.rs index 07d47eef..6634407a 100644 --- a/crates/wick/wick-component-wasm/src/component.rs +++ b/crates/wick/wick-component-wasm/src/component.rs @@ -123,7 +123,7 @@ impl Component for WasmComponent { data: Option, _callback: Arc, ) -> BoxFuture> { - invocation.trace(|| trace!(target = %invocation.target, config=?data, "wasm invoke")); + invocation.trace(|| trace!(target = %invocation.target(), config=?data, "wasm invoke")); let outputs = self.host.call(invocation, data); diff --git a/crates/wick/wick-component-wasm/src/wasm_host.rs b/crates/wick/wick-component-wasm/src/wasm_host.rs index a5998e1a..3c4d4ea5 100644 --- a/crates/wick/wick-component-wasm/src/wasm_host.rs +++ b/crates/wick/wick-component-wasm/src/wasm_host.rs @@ -23,6 +23,7 @@ use wick_packet::{ ComponentReference, ContextTransport, Entity, + InherentData, Invocation, PacketStream, RuntimeConfig, @@ -205,20 +206,20 @@ impl WasmHost { } #[allow(clippy::needless_pass_by_value)] - pub fn call(&self, mut invocation: Invocation, config: Option) -> Result { + pub fn call(&self, invocation: Invocation, config: Option) -> Result { let _span = self.span.enter(); + let (invocation, mut stream) = invocation.split(); let component_name = invocation.target.operation_id(); - let inherent = invocation.inherent; let now = Instant::now(); let ctx = self.ctx.clone(); let index = ctx .get_export("wick", component_name) .map_err(|_| crate::Error::OperationNotFound(component_name.to_owned(), ctx.get_exports()))?; + let inherent = InherentData::new(invocation.inherent.seed, invocation.inherent.timestamp); + stream.set_context(config.unwrap_or_default(), inherent); - invocation.packets.set_context(config.unwrap_or_default(), inherent); - - let s = packetstream_to_wasmrs(index, invocation.packets); - let out = ctx.request_channel(Box::pin(s)); + let wasmrs_stream = packetstream_to_wasmrs(index, stream); + let out = ctx.request_channel(Box::pin(wasmrs_stream)); trace!( component = component_name, duration_μs = ?now.elapsed().as_micros(), diff --git a/crates/wick/wick-host/src/component_host.rs b/crates/wick/wick-host/src/component_host.rs index 80ee814a..a77629ff 100644 --- a/crates/wick/wick-host/src/component_host.rs +++ b/crates/wick/wick-host/src/component_host.rs @@ -225,7 +225,7 @@ mod test { use wick_config::config::HttpConfigBuilder; use wick_config::WickConfiguration; use wick_invocation_server::connect_rpc_client; - use wick_packet::{packet_stream, packets, Entity, InherentData, Packet}; + use wick_packet::{packet_stream, packets, Entity, InherentData, InvocationData, Packet}; use super::*; use crate::{ComponentHostBuilder, Host}; @@ -309,7 +309,7 @@ mod test { println!("connected to server"); let passed_data = "logging output"; let packets = packets![("input", passed_data)]; - let invocation: wick_rpc::rpc::Invocation = Invocation::test("test", Entity::local("logger"), Vec::new(), None)? + let invocation: wick_rpc::rpc::Invocation = InvocationData::test("test", Entity::local("logger"), None)? .try_into() .unwrap(); diff --git a/crates/wick/wick-invocation-server/src/invocation_server.rs b/crates/wick/wick-invocation-server/src/invocation_server.rs index 6dc154a6..23bc6700 100644 --- a/crates/wick/wick-invocation-server/src/invocation_server.rs +++ b/crates/wick/wick-invocation-server/src/invocation_server.rs @@ -106,7 +106,7 @@ impl InvocationService for InvocationServer { let (tx, rx) = mpsc::channel(4); let mut stream = request.into_inner(); let first = stream.next().await; - let mut invocation: wick_packet::Invocation = if let Some(Ok(inv)) = first { + let invocation: wick_packet::InvocationData = if let Some(Ok(inv)) = first { if let Some(rpc::invocation_request::Data::Invocation(inv)) = inv.data { inv .try_into() @@ -119,9 +119,9 @@ impl InvocationService for InvocationServer { }; let stream = convert_invocation_stream(stream); let packet_stream = PacketStream::new(Box::new(stream)); - invocation.attach_stream(packet_stream); + let invocation = invocation.with_stream(packet_stream); - let op_id = invocation.target.operation_id().to_owned(); + let op_id = invocation.target().operation_id().to_owned(); let result = self .collection diff --git a/crates/wick/wick-packet/src/invocation.rs b/crates/wick/wick-packet/src/invocation.rs index f62f2383..6729059d 100644 --- a/crates/wick/wick-packet/src/invocation.rs +++ b/crates/wick/wick-packet/src/invocation.rs @@ -1,5 +1,4 @@ -use std::time::SystemTime; - +use parking_lot::Mutex; use tracing::{info_span, Span}; use uuid::Uuid; @@ -7,9 +6,9 @@ use crate::{Entity, InherentData, PacketSender, PacketStream}; /// A complete invocation request. #[derive(Debug)] -#[allow(clippy::exhaustive_structs)] #[must_use] -pub struct Invocation { +#[allow(clippy::exhaustive_structs)] +pub struct InvocationData { /// The entity that initiated the request. pub origin: Entity, /// The target of the invocation. @@ -22,8 +21,142 @@ pub struct Invocation { pub inherent: InherentData, /// The trace span associated with the invocation. pub span: Span, +} + +impl From for InvocationData { + fn from(value: Invocation) -> Self { + value.data + } +} + +impl InvocationData { + /// Creates an invocation with existing data. + #[doc(hidden)] + pub fn new_raw(origin: Entity, target: Entity, id: Uuid, tx_id: Uuid, inherent: InherentData, span: Span) -> Self { + Self { + origin, + target, + id, + tx_id, + inherent, + span, + } + } + + /// Creates an invocation with the passed transaction id. + pub fn new_with_id( + tx_id: Uuid, + origin: impl Into, + target: impl Into, + inherent: InherentData, + parent: &Span, + ) -> InvocationData { + let invocation_id = get_uuid(); + let target = target.into(); + let span = + info_span!(parent:parent,"invocation",otel.name=format!("invocation:{}", target),id=%invocation_id,tx_id=%tx_id); + + Self { + origin: origin.into(), + target, + id: invocation_id, + tx_id, + inherent, + span, + } + } + + pub fn with_stream(self, packets: impl Into) -> Invocation { + Invocation { + data: self, + packets: Mutex::new(packets.into()), + } + } + + /// Make response channels associated with this the invocation. + pub fn make_response(&self) -> (PacketSender, PacketStream) { + let (tx, mut rx) = PacketStream::new_channels(); + let span = info_span!(parent:&self.span,"invocation:response", otel.name=format!("invocation:response:{}", self.target), id=%self.id, target=%self.target); + + rx.set_span(span); + (tx, rx) + } + + /// Do work within this invocation's trace span. + pub fn trace T, T>(&self, f: F) -> T { + self.span.in_scope(f) + } + + /// Get the origin [Entity]. + pub const fn origin(&self) -> &Entity { + &self.origin + } + + /// Get the target [Entity]. + pub const fn target(&self) -> &Entity { + &self.target + } + + /// Returns the seed for the invocation. + #[must_use] + pub const fn seed(&self) -> u64 { + self.inherent.seed + } + + /// Returns the timestamp for the invocation. + #[must_use] + pub const fn timestamp(&self) -> u64 { + self.inherent.timestamp + } + + pub const fn inherent(&self) -> &InherentData { + &self.inherent + } + + /// Return the span associated with the [Invocation]. + #[must_use] + pub const fn span(&self) -> &Span { + &self.span + } + + /// Creates an invocation with a new transaction id. + #[cfg(feature = "test")] + pub fn test(name: &str, target: T, inherent: Option) -> Result + where + T: TryInto, + TE: std::error::Error + Send + Sync + 'static, + { + let inherent = inherent.unwrap_or_else(InherentData::unsafe_default); + let tx_id = get_uuid(); + let id = get_uuid(); + + Ok(Self { + origin: Entity::test(name), + target: target + .try_into() + .map_err(|e| crate::ParseError::Conversion(Box::new(e)))?, + inherent, + span: Span::current(), + id, + tx_id, + }) + } +} + +/// A complete invocation request. +#[derive(Debug)] +#[must_use] +pub struct Invocation { + /// Invocation metadata + data: InvocationData, /// The stream of incoming [crate::Packet]s associated with the invocation. - pub packets: PacketStream, + packets: Mutex, +} + +impl AsRef for Invocation { + fn as_ref(&self) -> &InvocationData { + &self.data + } } impl Invocation { @@ -64,13 +197,15 @@ impl Invocation { packets.set_span(span.clone()); Invocation { - origin: origin.into(), - target, - id: invocation_id, - tx_id, - inherent, - span, - packets, + data: InvocationData { + origin: origin.into(), + target, + id: invocation_id, + tx_id, + inherent, + span, + }, + packets: Mutex::new(packets), } } @@ -99,77 +234,70 @@ impl Invocation { )) } - /// Creates an invocation with a specific transaction id, to correlate a chain of - /// invocations. - pub fn next_tx(&self, origin: Entity, target: Entity) -> Invocation { - let invocation_id = get_uuid(); - let span = info_span!(parent:&self.span,"invocation",otel.name=format!("invocation:{}", target),id=%invocation_id); + #[allow(clippy::missing_const_for_fn)] + /// Redirect an invocation by changing the target. + pub fn redirect(self, target: Entity) -> Self { + Self { + data: InvocationData { target, ..self.data }, + packets: self.packets, + } + } - let mut packets: PacketStream = PacketStream::empty(); - packets.set_span(span.clone()); + #[allow(clippy::missing_const_for_fn)] + pub fn split(self) -> (InvocationData, PacketStream) { + (self.data, self.packets.into_inner()) + } - Invocation { - origin, - target, - id: invocation_id, - tx_id: self.tx_id, - inherent: InherentData { - seed: seeded_random::Seed::unsafe_new(self.inherent.seed).rng().gen(), - timestamp: SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64, - }, - packets: PacketStream::empty(), - span, - } + #[allow(clippy::missing_const_for_fn)] + pub fn into_stream(self) -> PacketStream { + self.packets.into_inner() } - pub fn eject_stream(&mut self) -> PacketStream { - std::mem::replace(&mut self.packets, PacketStream::empty()) + pub const fn inherent(&self) -> &InherentData { + &self.data.inherent } - pub fn attach_stream(&mut self, packets: impl Into) { - let mut stream: PacketStream = packets.into(); - stream.set_span(self.span.clone()); - let _ = std::mem::replace(&mut self.packets, stream); + /// Get the origin [Entity]. + pub const fn origin(&self) -> &Entity { + &self.data.origin } - /// Get the seed associated with an invocation if it exists. - #[must_use] - pub const fn seed(&self) -> u64 { - self.inherent.seed + /// Get the target [Entity]. + pub const fn target(&self) -> &Entity { + &self.data.target } - /// Get the timestamp associated with an invocation if it exists. + /// Get the transaction id. #[must_use] - pub const fn timestamp(&self) -> u64 { - self.inherent.timestamp + pub const fn tx_id(&self) -> Uuid { + self.data.tx_id } - /// Utility function to get the target [Entity] URL. + /// Get the invocation id. #[must_use] - pub fn target_url(&self) -> String { - self.target.url() + pub const fn id(&self) -> Uuid { + self.data.id } - /// Utility function to get the origin [Entity] URL. + /// Get the [Span] associated with the invocation. #[must_use] - pub fn origin_url(&self) -> String { - self.origin.url() + pub const fn span(&self) -> &Span { + &self.data.span } /// Do work within this invocation's trace span. pub fn trace T, T>(&self, f: F) -> T { - self.span.in_scope(f) + self.data.trace(f) } - pub fn make_response(&self) -> (PacketSender, PacketStream) { - let (tx, mut rx) = PacketStream::new_channels(); - let span = info_span!(parent:&self.span,"invocation:response", otel.name=format!("invocation:response:{}", self.target), id=%self.id, target=%self.target); + #[doc(hidden)] + pub fn set_stream_context(&mut self, context: crate::RuntimeConfig, inherent: InherentData) { + let mut lock = self.packets.lock(); + lock.set_context(context, inherent); + } - rx.set_span(span); - (tx, rx) + pub fn make_response(&self) -> (PacketSender, PacketStream) { + self.data.make_response() } } diff --git a/crates/wick/wick-packet/src/lib.rs b/crates/wick/wick-packet/src/lib.rs index cec39367..ceac0eff 100644 --- a/crates/wick/wick-packet/src/lib.rs +++ b/crates/wick/wick-packet/src/lib.rs @@ -143,7 +143,7 @@ pub use entity::Entity; pub use error::{Error, ParseError}; pub use inherent::InherentData; #[cfg(feature = "invocation")] -pub use invocation::Invocation; +pub use invocation::{Invocation, InvocationData}; pub use metadata::{Flags, WickMetadata, CLOSE_BRACKET, DONE_FLAG, OPEN_BRACKET}; pub use output::{OutgoingPort, OutputIterator, Port, ValuePort}; pub use packet::{from_raw_wasmrs, from_wasmrs, packetstream_to_wasmrs, Packet, PacketError, PacketPayload}; diff --git a/crates/wick/wick-packet/src/packet.rs b/crates/wick/wick-packet/src/packet.rs index 1aadda78..74b25083 100644 --- a/crates/wick/wick-packet/src/packet.rs +++ b/crates/wick/wick-packet/src/packet.rs @@ -1,6 +1,8 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use tokio_stream::Stream; use wasmrs::{BoxFlux, Metadata, Payload, PayloadError, RawPayload}; +use wasmrs_runtime::ConditionallySend; use wick_interface_types::Type; use crate::metadata::DONE_FLAG; @@ -426,12 +428,16 @@ pub fn packetstream_to_wasmrs(index: u32, stream: PacketStream) -> BoxFlux) -> PacketStream { +pub fn from_raw_wasmrs> + ConditionallySend + Unpin + 'static>( + stream: T, +) -> PacketStream { let s = tokio_stream::StreamExt::map(stream, move |p| Ok(p.into())); PacketStream::new(Box::new(s)) } -pub fn from_wasmrs(stream: BoxFlux) -> PacketStream { +pub fn from_wasmrs> + ConditionallySend + Unpin + 'static>( + stream: T, +) -> PacketStream { let s = tokio_stream::StreamExt::map(stream, move |p| Ok(p.into())); PacketStream::new(Box::new(s)) } diff --git a/crates/wick/wick-packet/src/packet_stream.rs b/crates/wick/wick-packet/src/packet_stream.rs index edc631b0..dca3434a 100644 --- a/crates/wick/wick-packet/src/packet_stream.rs +++ b/crates/wick/wick-packet/src/packet_stream.rs @@ -12,7 +12,7 @@ pub type PacketSender = FluxChannel; type ContextConfig = (RuntimeConfig, InherentData); -pub(crate) type BoxStream<'a, T> = Pin + Send + 'a>>; +pub(crate) type BoxStream<'a, T> = Pin + Send + Sync + 'a>>; #[cfg(target_family = "wasm")] pin_project! { @@ -20,8 +20,8 @@ pin_project! { #[must_use] pub struct PacketStream { #[pin] - inner: std::sync::Arc> + Unpin>>, - config: std::sync::Arc>>, + inner: Box> + Unpin>, + config: Option, span: Span } } @@ -32,12 +32,18 @@ pin_project! { #[must_use] pub struct PacketStream { #[pin] - inner: std::sync::Arc> + Send + Unpin>>, + inner: Box> + Send + Unpin>, config: Option, span: Span } } +impl Default for PacketStream { + fn default() -> Self { + PacketStream::empty() + } +} + impl From>> for PacketStream { fn from(stream: BoxStream<'static, Result>) -> Self { Self::new(stream) @@ -54,18 +60,18 @@ impl PacketStream { #[cfg(target_family = "wasm")] pub fn new(rx: impl Stream> + Unpin + 'static) -> Self { Self { - inner: std::sync::Arc::new(parking_lot::Mutex::new(tokio_stream::StreamExt::fuse(rx))), + inner: Box::new(tokio_stream::StreamExt::fuse(rx)), config: Default::default(), span: Span::current(), } } #[cfg(not(target_family = "wasm"))] - pub fn new(rx: impl Stream> + Unpin + Send + 'static) -> Self { + pub fn new> + Unpin + Send + 'static>(rx: T) -> Self { use tokio_stream::StreamExt; Self { - inner: std::sync::Arc::new(parking_lot::Mutex::new(rx.fuse())), + inner: Box::new(rx.fuse()), config: Default::default(), span: Span::current(), } @@ -104,14 +110,14 @@ impl Stream for PacketStream { fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { let mut this = self; - #[allow(unsafe_code)] // this is the implementation of futures::pin_mut!() - let mut this = unsafe { Pin::new_unchecked(&mut this) }; + let mut this = Pin::new(&mut this); let config = this.config.take(); - let poll = { - let mut stream = this.inner.lock(); - Pin::new(&mut *stream).poll_next(cx) - }; + let poll = { Pin::new(&mut *this.inner).poll_next(cx) }; + // Backwards compatibility note: + // This is a hack added when context & operation configuration was introduced. + // Rather than send it as a beginning packet, it's added as a sidecar to an existing packet and new + // components expect it to exist in the first packet they receive. if let Some(config) = config { match poll { Poll::Ready(Some(Ok(mut packet))) => { @@ -170,22 +176,3 @@ pub fn into_packet, T: serde::Serialize>( let name = name.into(); Box::new(move |x| Ok(x.map_or_else(|e| Packet::err(&name, e.to_string()), |x| Packet::encode(&name, &x)))) } - -#[cfg(test)] -mod test { - use anyhow::Result; - - use super::*; - - const fn is_sync_send() - where - T: Send + Sync, - { - } - - #[test] - const fn test_sync_send() -> Result<()> { - is_sync_send::(); - Ok(()) - } -} diff --git a/crates/wick/wick-rpc/src/client.rs b/crates/wick/wick-rpc/src/client.rs index f6e10c0e..e8af5d21 100644 --- a/crates/wick/wick-rpc/src/client.rs +++ b/crates/wick/wick-rpc/src/client.rs @@ -142,9 +142,9 @@ impl RpcClient { } /// Send an invoke RPC command with an [Invocation] object. - pub async fn invoke(&mut self, mut invocation: Invocation) -> Result { + pub async fn invoke(&mut self, invocation: Invocation) -> Result { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let mut stream = invocation.eject_stream(); + let (invocation, mut stream) = invocation.split(); tx.send(InvocationRequest { data: Some(generated::wick::invocation_request::Data::Invocation(invocation.into())), }) @@ -164,22 +164,4 @@ impl RpcClient { .invoke_raw(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)) .await } - - // /// Make an invocation with data passed as a JSON string. - // pub async fn invoke_from_json( - // &mut self, - // origin: Entity, - // component: Entity, - // data: &str, - // transpose: bool, - // inherent_data: Option, - // ) -> Result { - // let mut payload = TransportMap::from_json_output(data).map_err(|e| RpcClientError::Sdk(e.into()))?; - // if transpose { - // payload.transpose_output_name(); - // } - // let invocation = Invocation::new(origin, component, payload, inherent_data); - - // self.invoke(invocation).await - // } } diff --git a/crates/wick/wick-rpc/src/lib.rs b/crates/wick/wick-rpc/src/lib.rs index f55e9f1e..e695e6c1 100644 --- a/crates/wick/wick-rpc/src/lib.rs +++ b/crates/wick/wick-rpc/src/lib.rs @@ -154,7 +154,7 @@ pub fn convert_tonic_streaming(mut streaming: tonic::Streaming) -> macro_rules! dispatch { ($inv:expr, {$($name:expr => $handler:path),*,}) => { { - match $inv.target.operation_id() { + match $inv.target().operation_id() { $($name => $handler($inv).await?,)* _ => { unreachable!() diff --git a/crates/wick/wick-rpc/src/types/conversions.rs b/crates/wick/wick-rpc/src/types/conversions.rs index 8e3d7c95..257c7f6c 100644 --- a/crates/wick/wick-rpc/src/types/conversions.rs +++ b/crates/wick/wick-rpc/src/types/conversions.rs @@ -3,7 +3,7 @@ use std::str::FromStr; use std::time::Duration; use wick_interface_types as wick; -use wick_packet::{Entity, InherentData, Metadata, Packet, PacketStream, WickMetadata}; +use wick_packet::{Entity, InherentData, Metadata, Packet, WickMetadata}; use crate::error::RpcError; use crate::{rpc, DurationStatistics}; @@ -156,8 +156,8 @@ impl From for Metadata { } } -impl From for rpc::Invocation { - fn from(inv: wick_packet::Invocation) -> Self { +impl From for rpc::Invocation { + fn from(inv: wick_packet::InvocationData) -> Self { Self { origin: inv.origin.url(), target: inv.target.url(), @@ -171,20 +171,19 @@ impl From for rpc::Invocation { } } -impl TryFrom for wick_packet::Invocation { +impl TryFrom for wick_packet::InvocationData { type Error = RpcError; fn try_from(inv: rpc::Invocation) -> Result { let inherent = inv.inherent.ok_or(RpcError::NoInherentData)?; - Ok(Self { - origin: Entity::from_str(&inv.origin).map_err(|_e| RpcError::TypeConversion)?, - target: Entity::from_str(&inv.target).map_err(|_e| RpcError::TypeConversion)?, - - id: uuid::Uuid::from_str(&inv.id).map_err(|e| RpcError::UuidParseError(inv.id, e))?, - tx_id: uuid::Uuid::from_str(&inv.tx_id).map_err(|e| RpcError::UuidParseError(inv.tx_id, e))?, - inherent: InherentData::new(inherent.seed, inherent.timestamp), - span: tracing::Span::current(), - packets: PacketStream::empty(), - }) + + Ok(Self::new_raw( + Entity::from_str(&inv.origin).map_err(|_e| RpcError::TypeConversion)?, + Entity::from_str(&inv.target).map_err(|_e| RpcError::TypeConversion)?, + uuid::Uuid::from_str(&inv.id).map_err(|e| RpcError::UuidParseError(inv.id, e))?, + uuid::Uuid::from_str(&inv.tx_id).map_err(|e| RpcError::UuidParseError(inv.tx_id, e))?, + InherentData::new(inherent.seed, inherent.timestamp), + tracing::Span::current(), + )) } } diff --git a/crates/wick/wick-runtime/src/components/component_service.rs b/crates/wick/wick-runtime/src/components/component_service.rs index 057c9097..522f7bcb 100644 --- a/crates/wick/wick-runtime/src/components/component_service.rs +++ b/crates/wick/wick-runtime/src/components/component_service.rs @@ -46,9 +46,9 @@ impl InvocationHandler for NativeComponentService { invocation: Invocation, config: Option, ) -> Result>> { - let tx_id = invocation.tx_id; + let tx_id = invocation.tx_id(); - let span = info_span!(parent:&invocation.span,"runtime:handle"); + let span = info_span!(parent:invocation.span(),"runtime:handle"); let fut = self.handle(invocation, config, panic_callback()); let task = async move { @@ -60,34 +60,3 @@ impl InvocationHandler for NativeComponentService { Ok(Box::pin(task)) } } - -#[cfg(test)] -mod test { - - // use std::sync::Arc; - - // use anyhow::Result; - // use seeded_random::Seed; - - // use super::*; - // use crate::test::prelude::assert_eq; - - // #[test_logger::test(tokio::test)] - // async fn test_collection_component() -> Result<()> { - // let seed: u64 = 100000; - // let collection = NativeCollectionService::new(Arc::new(wick_stdlib::Collection::new(Seed::unsafe_new(seed)))); - - // let user_data = "This is my payload"; - - // let payload = vec![("input", user_data)].into(); - // let invocation = Invocation::new(Entity::test("test"), Entity::local("core::log"), payload, None); - // let response = collection.invoke(invocation)?.await?; - - // let mut rx = response.ok()?; - // let packets: Vec<_> = rx.collect().await; - // let p = packets.pop().unwrap().unwrap(); - // assert_eq!(p, Packet::encode("output", user_data)); - - // Ok(()) - // } -} diff --git a/crates/wick/wick-runtime/src/components/scope_component.rs b/crates/wick/wick-runtime/src/components/scope_component.rs index 24401cf1..ef1bd7e2 100644 --- a/crates/wick/wick-runtime/src/components/scope_component.rs +++ b/crates/wick/wick-runtime/src/components/scope_component.rs @@ -30,12 +30,12 @@ impl Component for ScopeComponent { config: Option, _callback: Arc, ) -> flow_component::BoxFuture> { - let target_url = invocation.target_url(); + let target_url = invocation.target().url(); invocation.trace(|| { debug!( scope_id = %self.scope_id, - target = %invocation.target, + target = %invocation.target(), "scope:invoke", ); }); @@ -44,14 +44,15 @@ impl Component for ScopeComponent { let scope = Scope::for_id(&self.scope_id) .ok_or_else(|| flow_component::ComponentError::msg(format!("scope '{}' not found", target_url)))?; - let target_component = invocation.target.component_id().to_owned(); + let target_component = invocation.target().component_id().to_owned(); if target_component != scope.namespace() { debug!( orig_target = target_component, runtime = scope.namespace(), "translating invocation target to scope namespace" ); - invocation.target = Entity::operation(scope.namespace(), invocation.target.operation_id()); + let new_target = Entity::operation(scope.namespace(), invocation.target().operation_id()); + invocation = invocation.redirect(new_target); } invocation.trace(|| trace!(target = %target_url, "invoking")); diff --git a/crates/wick/wick-runtime/src/runtime.rs b/crates/wick/wick-runtime/src/runtime.rs index 659782f6..5b7bf0a9 100644 --- a/crates/wick/wick-runtime/src/runtime.rs +++ b/crates/wick/wick-runtime/src/runtime.rs @@ -114,7 +114,7 @@ impl Runtime { } else { Err(RuntimeError::ScopeNotFound( path.map(|p| p.iter().copied().map(Into::into).collect()), - Some(invocation.target), + Some(invocation.target().clone()), )) } } diff --git a/crates/wick/wick-runtime/src/runtime/scope.rs b/crates/wick/wick-runtime/src/runtime/scope.rs index cdc191c7..e0880327 100644 --- a/crates/wick/wick-runtime/src/runtime/scope.rs +++ b/crates/wick/wick-runtime/src/runtime/scope.rs @@ -121,7 +121,7 @@ impl InvocationHandler for Scope { invocation: Invocation, config: Option, ) -> std::result::Result>, ComponentError> { - let tx_id = invocation.tx_id; + let tx_id = invocation.tx_id(); let fut = self.inner.interpreter.invoke(invocation, config); let task = async move { diff --git a/crates/wick/wick-stdlib/src/lib.rs b/crates/wick/wick-stdlib/src/lib.rs index d5a313a5..9903d663 100644 --- a/crates/wick/wick-stdlib/src/lib.rs +++ b/crates/wick/wick-stdlib/src/lib.rs @@ -200,7 +200,7 @@ impl Component for Collection { _data: Option, _callback: std::sync::Arc, ) -> flow_component::BoxFuture> { - let target = invocation.target_url(); + let target = invocation.target().url(); trace!("stdlib invoke: {}", target); Box::pin(async move { diff --git a/crates/wick/wick-stdlib/src/macros.rs b/crates/wick/wick-stdlib/src/macros.rs index f15d3346..72332a38 100644 --- a/crates/wick/wick-stdlib/src/macros.rs +++ b/crates/wick/wick-stdlib/src/macros.rs @@ -4,9 +4,10 @@ macro_rules! request_response { inputs: {$($ikey:ident => $ity:ty),* $(,)?}, output: $okey:expr, }) => { - pub(crate) async fn $name(mut invocation: wick_packet::Invocation) -> Result { + pub(crate) async fn $name( invocation: wick_packet::Invocation) -> Result { + let (_inv, mut stream) = invocation.split(); #[allow(unused_parens)] - let ($(mut $ikey),*) = fan_out!(invocation.packets, $(stringify!($ikey)),*); + let ($(mut $ikey),*) = fan_out!(stream, $(stringify!($ikey)),*); let (tx, rx) = PacketStream::new_channels(); tokio::spawn(async move { let error = loop { diff --git a/crates/wick/wick-test/tests/done_tolerance.rs b/crates/wick/wick-test/tests/done_tolerance.rs index 4882b4aa..7ca89801 100644 --- a/crates/wick/wick-test/tests/done_tolerance.rs +++ b/crates/wick/wick-test/tests/done_tolerance.rs @@ -39,12 +39,12 @@ impl TestComponent { impl Component for TestComponent { fn handle( &self, - mut invocation: Invocation, + invocation: Invocation, _data: Option, _callback: Arc, ) -> flow_component::BoxFuture> { Box::pin(async move { - let stream = invocation.eject_stream(); + let stream = invocation.into_stream(); let packets = stream.collect::>().await; let mut packets = packets .into_iter() diff --git a/integration-tests/integration/src/test.rs b/integration-tests/integration/src/test.rs index 775c1535..2972abe6 100644 --- a/integration-tests/integration/src/test.rs +++ b/integration-tests/integration/src/test.rs @@ -46,7 +46,7 @@ impl Component for NativeComponent { _data: Option, _callback: Arc, ) -> BoxFuture> { - let target = invocation.target_url(); + let target = invocation.target().url(); trace!("test collection invoke: {}", target); Box::pin(async move { let stream = dispatch!(invocation, { @@ -64,13 +64,14 @@ impl Component for NativeComponent { } async fn error(_input: Invocation) -> Result { - Err(anyhow::anyhow!("Always errors").into()) + Err(anyhow::anyhow!("Always errors")) } -async fn test_component(mut input: Invocation) -> Result { +async fn test_component(input: Invocation) -> Result { let (tx, stream) = input.make_response(); tokio::spawn(async move { - let mut input = fan_out!(input.packets, "input"); + let mut stream = input.into_stream(); + let mut input = fan_out!(stream, "input"); while let Some(Ok(input)) = input.next().await { if input.is_done() { break; @@ -84,6 +85,7 @@ async fn test_component(mut input: Invocation) -> Result