diff --git a/Cargo.lock b/Cargo.lock index f608b77e7..37a448243 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6523,6 +6523,7 @@ dependencies = [ "option-utils", "parking_lot", "seeded-random", + "structured-output", "test-logger", "thiserror", "tokio", @@ -6724,6 +6725,7 @@ dependencies = [ "seeded-random", "serde", "serde_json", + "test-logger", "thiserror", "tokio", "tokio-stream", @@ -6737,6 +6739,7 @@ dependencies = [ "wasmrs-runtime", "wasmrs-rx", "wick-interface-types", + "wick-logger", ] [[package]] diff --git a/crates/components/wick-http-client/src/component.rs b/crates/components/wick-http-client/src/component.rs index b419f66ec..b4465929d 100644 --- a/crates/components/wick-http-client/src/component.rs +++ b/crates/components/wick-http-client/src/component.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; -use flow_component::{BoxFuture, Component, ComponentError, IntoComponentResult, RuntimeCallback}; +use anyhow::anyhow; +use flow_component::{BoxFuture, Component, ComponentError, RuntimeCallback}; use futures::{Stream, StreamExt, TryStreamExt}; use reqwest::header::CONTENT_TYPE; use reqwest::{ClientBuilder, Method, Request, RequestBuilder}; @@ -44,8 +45,7 @@ impl HttpClientComponent { validate(&config, resolver)?; let addr: UrlResource = resolver(config.resource()) .and_then(|r| r.try_resource()) - .and_then(|r| r.try_url()) - .into_component_error()?; + .and_then(|r| r.try_url())?; let mut sig = ComponentSignature::new_named("wick/component/http"); sig.metadata.version = metadata.map(|v| v.version().to_owned()); @@ -56,7 +56,7 @@ impl HttpClientComponent { .url() .value() .cloned() - .ok_or_else(|| ComponentError::message("Internal Error - Invalid resource"))?; + .ok_or_else(|| anyhow!("Internal Error - Invalid resource"))?; let mut path_templates = HashMap::new(); for ops in config.operations() { diff --git a/crates/components/wick-http-client/src/error.rs b/crates/components/wick-http-client/src/error.rs index a37978cef..2efe34fc9 100644 --- a/crates/components/wick-http-client/src/error.rs +++ b/crates/components/wick-http-client/src/error.rs @@ -1,4 +1,3 @@ -use flow_component::ComponentError; use url::Url; #[derive(thiserror::Error, Debug, PartialEq)] @@ -19,9 +18,3 @@ pub enum Error { #[error("Invalid baseurl: {0}")] InvalidBaseUrl(Url), } - -impl From for ComponentError { - fn from(value: Error) -> Self { - ComponentError::new(value) - } -} diff --git a/crates/components/wick-sql/src/error.rs b/crates/components/wick-sql/src/error.rs index 330c19c84..bf583b9b9 100644 --- a/crates/components/wick-sql/src/error.rs +++ b/crates/components/wick-sql/src/error.rs @@ -1,4 +1,3 @@ -use flow_component::ComponentError; use wick_config::error::ManifestError; use wick_packet::TypeWrapper; @@ -93,12 +92,6 @@ pub enum Error { NoRow, } -impl From for ComponentError { - fn from(value: Error) -> Self { - ComponentError::new(value) - } -} - #[derive(thiserror::Error, Debug, Copy, Clone)] pub enum ConversionError { #[error("i8")] diff --git a/crates/wick/flow-component/src/config.rs b/crates/wick/flow-component/src/config.rs deleted file mode 100644 index 76ce0b4d6..000000000 --- a/crates/wick/flow-component/src/config.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::collections::HashMap; - -use liquid_json::LiquidJsonValue; -use wick_packet::GenericConfig; - -/// A generic configuration that may include LiquidJson values -#[derive(Debug, Clone, PartialEq, Default)] -pub struct LiquidConfig(pub HashMap); - -impl LiquidConfig { - /// Render a [LiquidConfig] into a [GenericConfig] with the passed context. - pub fn render(&self, context: &serde_json::Value) -> Result { - let mut map = HashMap::new(); - for (k, v) in self.0.iter() { - map.insert(k.clone(), v.render(context)?); - } - Ok(GenericConfig::from(map)) - } -} - -impl From> for LiquidConfig { - fn from(value: HashMap) -> Self { - Self(value) - } -} - -impl From> for LiquidConfig { - fn from(value: HashMap) -> Self { - Self(value.into_iter().map(|(k, v)| (k, v.into())).collect()) - } -} - -impl From for HashMap { - fn from(value: LiquidConfig) -> Self { - value.0 - } -} diff --git a/crates/wick/flow-component/src/lib.rs b/crates/wick/flow-component/src/lib.rs index ad23ee21f..50b374a65 100644 --- a/crates/wick/flow-component/src/lib.rs +++ b/crates/wick/flow-component/src/lib.rs @@ -114,9 +114,6 @@ // Add exceptions here #![allow()] -// mod config; -// pub use config::*; - mod context; pub use context::*; #[cfg(feature = "invocation")] @@ -128,65 +125,9 @@ pub use traits::*; /// A boxed future that can be sent across threads. pub type BoxFuture<'a, T> = std::pin::Pin + Send + 'a>>; -pub use serde_json::Value; - -#[derive(Debug)] -#[must_use] /// A generic error type for components. -pub struct ComponentError { - source: Box, -} - -impl std::error::Error for ComponentError {} -impl std::fmt::Display for ComponentError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(self.source.to_string().as_str()) - } -} -impl ComponentError { - /// Create a new error from a boxed error. - pub fn new(source: impl std::error::Error + Send + Sync + 'static) -> Self { - Self { - source: Box::new(source), - } - } - - /// Create a new error from a string. - pub fn message(msg: &str) -> Self { - Self { - source: Box::new(GenericError(msg.to_owned())), - } - } -} -impl From> for ComponentError { - fn from(source: Box) -> Self { - Self { source } - } -} - -impl From for ComponentError { - fn from(source: anyhow::Error) -> Self { - Self::message(&source.to_string()) - } -} - -/// Trait that allows for conversion of a result into a component error. -pub trait IntoComponentResult -where - E: std::error::Error + Send + Sync + 'static, -{ - /// Convert a Result into a Result. - fn into_component_error(self) -> Result; -} - -impl IntoComponentResult for Result -where - E: std::error::Error + Send + Sync + 'static, -{ - fn into_component_error(self) -> Result { - self.map_err(ComponentError::new) - } -} +pub use anyhow::Error as ComponentError; +pub use serde_json::Value; #[derive(Debug)] struct GenericError(String); diff --git a/crates/wick/flow-component/src/traits.rs b/crates/wick/flow-component/src/traits.rs index 64dfc684a..a24717657 100644 --- a/crates/wick/flow-component/src/traits.rs +++ b/crates/wick/flow-component/src/traits.rs @@ -19,13 +19,13 @@ pub trait Component { invocation: Invocation, data: Option, callback: Arc, - ) -> BoxFuture>; + ) -> BoxFuture>; /// The `signature` method returns the [ComponentSignature] for the component. fn signature(&self) -> &ComponentSignature; /// The `shutdown` method is called when the component is shutdown. - fn shutdown(&self) -> BoxFuture> { + fn shutdown(&self) -> BoxFuture> { // Override if you need a more explicit shutdown. Box::pin(async move { Ok(()) }) } diff --git a/crates/wick/flow-expression-parser/src/error.rs b/crates/wick/flow-expression-parser/src/error.rs index 496e9f11b..d8a253d70 100644 --- a/crates/wick/flow-expression-parser/src/error.rs +++ b/crates/wick/flow-expression-parser/src/error.rs @@ -1,7 +1,5 @@ use thiserror::Error; -// type BoxedSyncSendError = Box; - /// Error type for the flow expression parser. #[derive(Error, Debug, Clone, PartialEq)] #[non_exhaustive] diff --git a/crates/wick/flow-graph-interpreter/Cargo.toml b/crates/wick/flow-graph-interpreter/Cargo.toml index ae50697da..7347c97b7 100644 --- a/crates/wick/flow-graph-interpreter/Cargo.toml +++ b/crates/wick/flow-graph-interpreter/Cargo.toml @@ -31,6 +31,7 @@ uuid = { workspace = true, features = ["v4"] } parking_lot = { workspace = true } serde_json = { workspace = true } serde = { workspace = true, features = ["derive"] } +anyhow = { version = "1.0" } [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/collect.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/collect.rs index 55926e787..8b13d29d4 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/collect.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/collect.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use anyhow::{anyhow, bail}; use flow_component::{ComponentError, Context, Operation, RenderConfiguration}; use futures::FutureExt; use serde_json::{json, Value}; @@ -150,10 +151,10 @@ fn get_inner_array(value: &mut Value, depth: i16) -> Result<&mut Value, Componen Value::Array(ref mut array) => { let inner = array .last_mut() - .ok_or_else(|| ComponentError::message("Invalid structured in bracketed streams"))?; + .ok_or_else(|| anyhow!("Invalid structure in bracketed streams"))?; get_inner_array(inner, depth - 1) } - _ => Err(ComponentError::message("Value is not an array")), + _ => bail!("Value {} is not an array", value), } } @@ -162,12 +163,11 @@ impl RenderConfiguration for Op { type ConfigSource = RuntimeConfig; fn decode_config(data: Option) -> Result { - let config = data.ok_or_else(|| { - ComponentError::message("Collect component requires configuration, please specify configuration.") - })?; + let config = + data.ok_or_else(|| anyhow!("Collect component requires configuration, please specify configuration."))?; Ok(Self::Config { - inputs: config.coerce_key("inputs").map_err(ComponentError::new)?, + inputs: config.coerce_key("inputs")?, }) } } diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/merge.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/merge.rs index 143599e1e..a38a209d8 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/merge.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/merge.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use anyhow::anyhow; use flow_component::{ComponentError, Context, Operation, RenderConfiguration}; use futures::FutureExt; use wasmrs_rx::Observer; @@ -109,12 +110,11 @@ impl RenderConfiguration for Op { type ConfigSource = RuntimeConfig; fn decode_config(data: Option) -> Result { - let config = data.ok_or_else(|| { - ComponentError::message("Merge component requires configuration, please specify configuration.") - })?; + let config = + data.ok_or_else(|| anyhow!("Merge component requires configuration, please specify configuration."))?; Ok(Self::Config { - inputs: config.coerce_key("inputs").map_err(ComponentError::new)?, + inputs: config.coerce_key("inputs")?, }) } } diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/pluck.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/pluck.rs index 2e9b08d65..510410b35 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/pluck.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/pluck.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use flow_component::{ComponentError, Context, Operation, RenderConfiguration}; use futures::{FutureExt, StreamExt}; use serde_json::Value; @@ -120,26 +121,23 @@ impl RenderConfiguration for Op { type ConfigSource = RuntimeConfig; fn decode_config(data: Option) -> Result { - let config = data.ok_or_else(|| { - ComponentError::message("Pluck component requires configuration, please specify configuration.") - })?; + let config = + data.ok_or_else(|| anyhow!("Pluck component requires configuration, please specify configuration."))?; for (k, v) in config { if k == "field" { - let field: String = serde_json::from_value(v).map_err(ComponentError::new)?; + let field: String = serde_json::from_value(v)?; warn!("pluck should be configured with 'path' as an array of strings, 'field' is deprecated and will be removed in a future release."); return Ok(Self::Config { field: field.split('.').map(|s| s.to_owned()).collect(), }); } if k == "path" { - let field: Vec = serde_json::from_value(v).map_err(ComponentError::new)?; + let field: Vec = serde_json::from_value(v)?; return Ok(Self::Config { field }); } } - Err(ComponentError::message( - "invalid configuration for pluck, 'path' field is required", - )) + Err(anyhow!("invalid configuration for pluck, 'path' field is required",)) } } diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/sender.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/sender.rs index 73cb388f5..aa4dc2bd5 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/sender.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/sender.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use flow_component::{ComponentError, Context, Operation, RenderConfiguration}; use serde_json::Value; use wick_interface_types::{operation, OperationSignature}; @@ -63,12 +64,11 @@ impl RenderConfiguration for Op { type ConfigSource = RuntimeConfig; fn decode_config(data: Option) -> Result { - let config = data.ok_or_else(|| { - ComponentError::message("Sender component requires configuration, please specify configuration.") - })?; + let config = + data.ok_or_else(|| anyhow!("Sender component requires configuration, please specify configuration."))?; Ok(Self::Config { - output: config.coerce_key("output").map_err(ComponentError::new)?, + output: config.coerce_key("output")?, }) } } diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/switch.rs b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/switch.rs index f5da3fcee..06b2b3633 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/components/core/switch.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/components/core/switch.rs @@ -3,6 +3,7 @@ use std::hash::Hash; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicUsize, Ordering}; use std::sync::Arc; +use anyhow::anyhow; use flow_component::{ComponentError, Context, Operation, RenderConfiguration, RuntimeCallback}; use futures::{FutureExt, StreamExt}; use parking_lot::Mutex; @@ -719,9 +720,8 @@ impl RenderConfiguration for Op { type ConfigSource = RuntimeConfig; fn decode_config(data: Option) -> Result { - let config = data.ok_or_else(|| { - ComponentError::message("Switch component requires configuration, please specify configuration.") - })?; + let config = + data.ok_or_else(|| anyhow!("Switch component requires configuration, please specify configuration."))?; Ok(Self::Config { inputs: if config.has("context") { config.coerce_key("context") @@ -729,20 +729,17 @@ impl RenderConfiguration for Op { config.coerce_key("inputs") } else { Ok(Vec::new()) - } - .map_err(ComponentError::new)?, + }?, outputs: if config.has("outputs") { config.coerce_key("outputs") } else { Ok(Vec::new()) - } - .map_err(ComponentError::new)?, + }?, cases: if config.has("cases") { config.coerce_key("cases") } else { Ok(Vec::new()) - } - .map_err(ComponentError::new)?, + }?, default: config.coerce_key("default").map_err(ComponentError::new)?, }) } diff --git a/crates/wick/wick-component-codegen/src/generate.rs b/crates/wick/wick-component-codegen/src/generate.rs index c18181054..bb317e8f5 100644 --- a/crates/wick/wick-component-codegen/src/generate.rs +++ b/crates/wick/wick-component-codegen/src/generate.rs @@ -198,11 +198,9 @@ mod test { * See /tests/codegen-tests/ for integration tests */ use anyhow::Result; - use wick_interface_types::Type; use super::*; use crate::generate::config::ConfigBuilder; - use crate::Config; #[tokio::test] async fn test_build() -> Result<()> { @@ -218,15 +216,4 @@ mod test { Ok(()) } - - #[test] - fn test_expand_type() -> Result<()> { - let mut config = Config::default(); - let ty = Type::Object; - let src = expand_type(&mut config, Direction::In, false, &ty); - - assert_eq!(&src.to_string(), "wick_component :: Value"); - - Ok(()) - } } diff --git a/crates/wick/wick-component-codegen/src/generate/expand_type.rs b/crates/wick/wick-component-codegen/src/generate/expand_type.rs index 827ac0117..8f9284200 100644 --- a/crates/wick/wick-component-codegen/src/generate/expand_type.rs +++ b/crates/wick/wick-component-codegen/src/generate/expand_type.rs @@ -12,9 +12,10 @@ pub(super) fn expand_type( config: &mut config::Config, dir: Direction, imported: bool, + raw: bool, ty: &wick_interface_types::Type, ) -> TokenStream { - if config.raw && dir != Direction::Out { + if raw && dir != Direction::Out { return quote! { wick_component::wick_packet::Packet }; } match ty { @@ -31,7 +32,7 @@ pub(super) fn expand_type( wick_interface_types::Type::F64 => quote! { f64 }, wick_interface_types::Type::String => quote! { String }, wick_interface_types::Type::List { ty } => { - let ty = expand_type(config, dir, imported, ty); + let ty = expand_type(config, dir, imported, raw, ty); quote! { Vec<#ty> } } wick_interface_types::Type::Bytes => { @@ -50,12 +51,12 @@ pub(super) fn expand_type( quote! {#location #(#mod_parts ::)*#ty} } wick_interface_types::Type::Optional { ty } => { - let ty = expand_type(config, dir, imported, ty); + let ty = expand_type(config, dir, imported, raw, ty); quote! { Option<#ty> } } wick_interface_types::Type::Map { key, value } => { - let key = expand_type(config, dir, imported, key); - let value = expand_type(config, dir, imported, value); + let key = expand_type(config, dir, imported, raw, key); + let value = expand_type(config, dir, imported, raw, value); quote! { std::collections::HashMap<#key,#value> } } #[allow(deprecated)] @@ -79,15 +80,15 @@ pub(crate) fn expand_input_fields( config: &mut config::Config, fields: &[Field], direction: Direction, - raw: bool, + imported: bool, ) -> Vec { fields .iter() .map(|input| { let name = id(&snake(input.name())); - let ty = expand_type(config, direction, raw, &input.ty); + let ty = expand_type(config, direction, imported, false, &input.ty); quote! { - #name: impl wick_component::Stream> + 'static + #name: impl wick_component::Stream> + 'static } }) .collect_vec() @@ -109,12 +110,12 @@ pub(crate) fn expand_field_types( config: &mut config::Config, fields: &[Field], direction: Direction, - raw: bool, + imported: bool, ) -> Vec { fields .iter() .map(|input| { - let ty = expand_type(config, direction, raw, &input.ty); + let ty = expand_type(config, direction, imported, false, &input.ty); quote! { WickStream<#ty> } @@ -132,7 +133,7 @@ pub(crate) fn fields_to_tuples( .iter() .map(|input| { let name = input.name(); - let ty = expand_type(config, direction, raw, &input.ty); + let ty = expand_type(config, direction, raw, false, &input.ty); quote! { (#name, #ty) } diff --git a/crates/wick/wick-component-codegen/src/generate/f.rs b/crates/wick/wick-component-codegen/src/generate/f.rs index d44c5eadc..75ae48171 100644 --- a/crates/wick/wick-component-codegen/src/generate/f.rs +++ b/crates/wick/wick-component-codegen/src/generate/f.rs @@ -28,7 +28,7 @@ pub(crate) fn field_pair( let name = &field.name; let id = id(&snake(name)); - let ty = expand_type(config, dir, imported, &field.ty); + let ty = expand_type(config, dir, imported, config.raw, &field.ty); let desc = field .description .as_ref() diff --git a/crates/wick/wick-component-codegen/src/generate/templates/imported_components.rs b/crates/wick/wick-component-codegen/src/generate/templates/imported_components.rs index 50d0503a2..6ff6bc251 100644 --- a/crates/wick/wick-component-codegen/src/generate/templates/imported_components.rs +++ b/crates/wick/wick-component-codegen/src/generate/templates/imported_components.rs @@ -8,7 +8,7 @@ use crate::generate::dependency::Dependency; use crate::generate::expand_type::{expand_field_types, expand_input_fields, fields_to_tuples}; use crate::generate::ids::*; use crate::generate::templates::op_config; -use crate::generate::{f, Direction}; +use crate::generate::Direction; use crate::*; struct ComponentCodegen { @@ -118,19 +118,18 @@ fn operation_impls(config: &mut Config, ops: &[OperationSignature]) -> Vec = fields_to_tuples(config, op.outputs(), dir, raw); - let types = f::maybe_parens(response_stream_types); let impls = quote! { #[allow(unused)] - pub fn #name(&self, #op_config_pair #(#inputs),*) -> std::result::Result<#types,wick_packet::Error> { + pub fn #name(&self, #op_config_pair #(#inputs),*) -> std::result::Result<(#(#types),*),wick_packet::Error> { #(#encode_inputs)* let stream = wick_component::empty(); let stream = #merge_inputs; let stream = wick_packet::PacketStream::new(Box::pin(stream)); let mut stream = self.#name_raw(#op_config_id stream)?; - Ok(wick_component::payload_fan_out!(stream, raw: false, wick_component::BoxError, [#(#fan_out),*])) + Ok(wick_component::payload_fan_out!(stream, raw: false, wick_component::AnyError, [#(#fan_out),*])) } #[allow(unused)] diff --git a/crates/wick/wick-component-codegen/src/generate/templates/op_outputs.rs b/crates/wick/wick-component-codegen/src/generate/templates/op_outputs.rs index 2d647dd21..58c1335ac 100644 --- a/crates/wick/wick-component-codegen/src/generate/templates/op_outputs.rs +++ b/crates/wick/wick-component-codegen/src/generate/templates/op_outputs.rs @@ -16,8 +16,8 @@ pub(crate) fn op_outputs(config: &mut config::Config, op: &OperationSignature) - .iter() .map(|i| { let port_field_name = id(&snake(&i.name)); - let port_type = expand_type(config, Direction::Out, false, &i.ty); - quote! {pub(crate) #port_field_name: wick_packet::Output<#port_type>} + let port_type = expand_type(config, Direction::Out, false, config.raw, &i.ty); + quote! {pub(crate) #port_field_name: wick_packet::OutgoingPort<#port_type>} }) .collect_vec(); @@ -38,9 +38,9 @@ pub(crate) fn op_outputs(config: &mut config::Config, op: &OperationSignature) - let port_name = &out.name; let port_field_name = id(&snake(&out.name)); if i < op.outputs.len() - 1 { - quote! {#port_field_name: wick_packet::Output::new(#port_name, channel.clone())} + quote! {#port_field_name: wick_packet::OutgoingPort::new(#port_name, channel.clone())} } else { - quote! {#port_field_name: wick_packet::Output::new(#port_name, channel)} + quote! {#port_field_name: wick_packet::OutgoingPort::new(#port_name, channel)} } }) .collect_vec(); @@ -67,7 +67,7 @@ pub(crate) fn op_outputs(config: &mut config::Config, op: &OperationSignature) - quote! { pub struct #outputs_name { #[allow(unused)] - #(#output_port_fields,)* + #(#output_port_fields),* } impl wick_component::Broadcast for #outputs_name { @@ -81,7 +81,7 @@ pub(crate) fn op_outputs(config: &mut config::Config, op: &OperationSignature) - impl #outputs_name { pub fn new(channel: wasmrs_rx::FluxChannel) -> Self { Self { - #(#output_ports_new,)* + #(#output_ports_new),* } } } diff --git a/crates/wick/wick-component-codegen/src/generate/templates/provided_struct.rs b/crates/wick/wick-component-codegen/src/generate/templates/provided_struct.rs index acd50150f..f8aa76d98 100644 --- a/crates/wick/wick-component-codegen/src/generate/templates/provided_struct.rs +++ b/crates/wick/wick-component-codegen/src/generate/templates/provided_struct.rs @@ -48,7 +48,7 @@ pub(crate) fn imported_component_container(name: &str, required: &[Binding let config = get_config(); let inherent = self.inherent.clone(); #struct_id { - #(#required_names,)* + #(#required_names),* } } } diff --git a/crates/wick/wick-component-codegen/src/generate/templates/trait_signature.rs b/crates/wick/wick-component-codegen/src/generate/templates/trait_signature.rs index 79bef1419..db10f71ef 100644 --- a/crates/wick/wick-component-codegen/src/generate/templates/trait_signature.rs +++ b/crates/wick/wick-component-codegen/src/generate/templates/trait_signature.rs @@ -17,7 +17,7 @@ pub(crate) fn trait_signature(config: &mut config::Config, op: &OperationSignatu .iter() .map(|i| { let port_name = id(&snake(&i.name)); - let port_type = expand_type(config, Direction::In, false, &i.ty); + let port_type = expand_type(config, Direction::In, false, config.raw, &i.ty); quote! {#port_name: WickStream<#port_type>} }) .collect_vec(); @@ -32,7 +32,7 @@ pub(crate) fn trait_signature(config: &mut config::Config, op: &OperationSignatu #[async_trait::async_trait(?Send)] #[cfg(target_family = "wasm")] pub trait #trait_name { - type Error: std::fmt::Display; + type Error; type Outputs; type Config: std::fmt::Debug; @@ -43,7 +43,7 @@ pub(crate) fn trait_signature(config: &mut config::Config, op: &OperationSignatu #[async_trait::async_trait] #[cfg(not(target_family = "wasm"))] pub trait #trait_name { - type Error: std::fmt::Display + Send ; + type Error: Send ; type Outputs: Send; type Config: std::fmt::Debug + Send ; diff --git a/crates/wick/wick-component-codegen/src/generate/templates/type_def.rs b/crates/wick/wick-component-codegen/src/generate/templates/type_def.rs index 2d1f6f3c1..86bdd77e4 100644 --- a/crates/wick/wick-component-codegen/src/generate/templates/type_def.rs +++ b/crates/wick/wick-component-codegen/src/generate/templates/type_def.rs @@ -140,7 +140,7 @@ pub(crate) fn gen_enum(ty: &EnumDefinition, _options: TypeOptions) -> (Vec<&str> #[serde(into = "String", try_from = "wick_component::serde_util::enum_repr::StringOrNum")] #[allow(clippy::exhaustive_enums)] pub enum #name { - #(#variants,)* + #(#variants),* } #try_from_strnum_impl @@ -152,7 +152,7 @@ pub(crate) fn gen_enum(ty: &EnumDefinition, _options: TypeOptions) -> (Vec<&str> pub fn value(&self) -> Option<&'static str> { #[allow(clippy::match_single_binding)] match self { - #(#value_match_arms,)* + #(#value_match_arms),* } } } @@ -184,7 +184,7 @@ pub(crate) fn gen_enum(ty: &EnumDefinition, _options: TypeOptions) -> (Vec<&str> fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { #[allow(clippy::match_single_binding)] match self { - #(#display_match_arms,)* + #(#display_match_arms),* } } } @@ -225,7 +225,7 @@ pub(crate) fn gen_struct<'a>( impl Default for #name { fn default() -> Self { Self { - #(#fields,)* + #(#fields),* } } } @@ -245,7 +245,7 @@ pub(crate) fn gen_struct<'a>( #description #[allow(clippy::exhaustive_structs)] pub struct #name { - #(#fields,)* + #(#fields),* } #default_impl }; @@ -285,7 +285,7 @@ pub(crate) fn gen_union<'a>( .map(|ty| { let name = id(&generic_type_name(ty)); let description = format!("A {} value.", ty); - let ty = expand_type(config, Direction::In, imported, ty); + let ty = expand_type(config, Direction::In, imported, config.raw, ty); quote! { #[doc = #description] #name(#ty) @@ -303,7 +303,7 @@ pub(crate) fn gen_union<'a>( #description #[serde(untagged)] pub enum #name { - #(#variants,)* + #(#variants),* } }; (module_parts, item) diff --git a/crates/wick/wick-component-codegen/src/generate/templates/wrapper_fn.rs b/crates/wick/wick-component-codegen/src/generate/templates/wrapper_fn.rs index 3bd8ca617..9752b1293 100644 --- a/crates/wick/wick-component-codegen/src/generate/templates/wrapper_fn.rs +++ b/crates/wick/wick-component-codegen/src/generate/templates/wrapper_fn.rs @@ -17,17 +17,13 @@ pub(crate) fn gen_wrapper_fn(config: &mut config::Config, component: &Ident, op: .iter() .map(|i| { let port_name = &i.name; - let port_type = expand_type(config, Direction::In, false, &i.ty); + let port_type = expand_type(config, Direction::In, false, config.raw, &i.ty); quote! {(#port_name, #port_type)} }) .collect_vec(); let inputs = op.inputs().iter().map(|i| id(&snake(&i.name))).collect_vec(); let outputs_name = id(&op_outputs_name(op)); - let sanitized_input_names = if inputs.is_empty() { - quote! {config} - } else { - quote! {(config, #(#inputs,)*)} - }; + let op_args = quote! {(config, (#(#inputs),*))}; let raw = if config.raw { quote! {raw:true} @@ -40,12 +36,13 @@ pub(crate) fn gen_wrapper_fn(config: &mut config::Config, component: &Ident, op: let config_id = id(&generic_config_id()); quote! { - fn #wrapper_id(mut input: wasmrs_rx::BoxFlux) -> std::result::Result,Box> { + fn #wrapper_id(mut input: wasmrs_rx::BoxFlux) -> std::result::Result,wick_component::BoxError> { let (channel, rx) = wasmrs_rx::FluxChannel::::new_parts(); let outputs = #impl_name::#outputs_name::new(channel.clone()); runtime::spawn(#wrapper_name,async move { - let #sanitized_input_names = wick_component::payload_fan_out!(input, #raw, Box, #impl_name::#config_id, [#(#input_pairs,)*]); + #[allow(unused_parens)] + let #op_args = wick_component::payload_fan_out!(input, #raw, wick_component::AnyError, #impl_name::#config_id, [#(#input_pairs),*]); let config = match config.await { Ok(Ok(config)) => { config diff --git a/crates/wick/wick-component/Cargo.toml b/crates/wick/wick-component/Cargo.toml index bde361064..9cd80cdc7 100644 --- a/crates/wick/wick-component/Cargo.toml +++ b/crates/wick/wick-component/Cargo.toml @@ -37,11 +37,13 @@ chrono = { workspace = true, features = ["std"], optional = true } bytes = { workspace = true, optional = true } async-recursion = { workspace = true } tracing = { workspace = true } +anyhow = { workspace = true } [target.'cfg(target_family = "wasm")'.dependencies] wasmrs-guest = { workspace = true } [dev-dependencies] +wick-packet = { workspace = true, features = ["std", "rng"] } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } serde = { workspace = true } anyhow = { workspace = true } diff --git a/crates/wick/wick-component/src/adapters/binary/interleaved_pairs.rs b/crates/wick/wick-component/src/adapters/binary/interleaved_pairs.rs index 73a358819..ba3456901 100644 --- a/crates/wick/wick-component/src/adapters/binary/interleaved_pairs.rs +++ b/crates/wick/wick-component/src/adapters/binary/interleaved_pairs.rs @@ -20,7 +20,7 @@ macro_rules! binary_interleaved_pairs { #[cfg_attr(target_family = "wasm",async_trait::async_trait(?Send))] #[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)] impl $name::Operation for Component { - type Error = Box; + type Error = wick_component::AnyError; type Outputs = $name::Outputs; type Config = $name::Config; diff --git a/crates/wick/wick-component/src/adapters/binary/paired_right_stream.rs b/crates/wick/wick-component/src/adapters/binary/paired_right_stream.rs index 5eee5dd29..697c55b79 100644 --- a/crates/wick/wick-component/src/adapters/binary/paired_right_stream.rs +++ b/crates/wick/wick-component/src/adapters/binary/paired_right_stream.rs @@ -14,7 +14,7 @@ macro_rules! binary_paired_right_stream { ($name:ident) => { #[async_trait::async_trait(?Send)] impl $name::Operation for Component { - type Error = Box; + type Error = wick_component::AnyError; type Outputs = $name::Outputs; type Config = $name::Config; @@ -106,9 +106,9 @@ where continue; } tracing::debug!("received open bracket while already started"); - let _ = tx.send_result(Err(Box::new(wick_packet::Error::Component( - "Received open bracket while already started".to_owned(), - )))); + let _ = tx.send_result(Err( + wick_packet::Error::Component("Received open bracket while already started".to_owned()).into(), + )); continue; } diff --git a/crates/wick/wick-component/src/adapters/unary/simple.rs b/crates/wick/wick-component/src/adapters/unary/simple.rs index 3de4044c9..918687059 100644 --- a/crates/wick/wick-component/src/adapters/unary/simple.rs +++ b/crates/wick/wick-component/src/adapters/unary/simple.rs @@ -11,7 +11,7 @@ macro_rules! unary_simple { ($name:ident) => { #[async_trait::async_trait(?Send)] impl $name::Operation for Component { - type Error = Box; + type Error = wick_component::AnyError; type Outputs = $name::Outputs; type Config = $name::Config; diff --git a/crates/wick/wick-component/src/lib.rs b/crates/wick/wick-component/src/lib.rs index 23d462d04..fdc5b95c7 100644 --- a/crates/wick/wick-component/src/lib.rs +++ b/crates/wick/wick-component/src/lib.rs @@ -178,10 +178,16 @@ pub type BoxError = Box; // // -/// A stream of `Result`. -pub type WickStream = wasmrs_rx::BoxFlux; +/// A re-export of the `anyhow::Error` type. +pub type AnyError = anyhow::Error; +pub use anyhow; -/// Create a stream of `Result` that yields one value and ends. +// +// +/// A stream of `Result`. +pub type WickStream = wasmrs_rx::BoxFlux; + +/// Create a stream of `Result` that yields one value and ends. /// /// # Example /// @@ -193,11 +199,11 @@ pub type WickStream = wasmrs_rx::BoxFlux; /// assert_eq!(stream.next().await, None); /// ``` /// -pub fn once(value: T) -> impl Stream> { +pub fn once(value: T) -> impl Stream> { tokio_stream::once(Ok(value)) } -/// Create a stream of `Result` from an iterator of type T. +/// Create a stream of `Result` from an iterator of type T. /// /// This is a convenience function for creating a Result/TryStream from an iterator of `Ok` values. /// @@ -213,7 +219,7 @@ pub fn once(value: T) -> impl Stream> { /// assert_eq!(stream.next().await, None); /// ``` /// -pub fn iter(i: I) -> impl Stream> +pub fn iter(i: I) -> impl Stream> where I: IntoIterator, { diff --git a/crates/wick/wick-component/src/macros.rs b/crates/wick/wick-component/src/macros.rs index b4355fd04..6092b253b 100644 --- a/crates/wick/wick-component/src/macros.rs +++ b/crates/wick/wick-component/src/macros.rs @@ -25,7 +25,6 @@ macro_rules! handle_port { if $packet.is_done() { $tx.complete(); } else { - // let packet: Result<$ty, _> = $packet.deserialize().map_err(|e| e.into()); let _ = $tx.send($packet); } }}; @@ -40,135 +39,175 @@ macro_rules! handle_port { }}; } +#[doc(hidden)] +#[macro_export] +macro_rules! stream_senders { + ($error:ty, $name:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => { + $crate::paste::paste! { + struct $name { + $( + [<$port:snake>]: $crate::wasmrs_rx::FluxChannel<$($ty)*,$error> + ),* + } + impl $name { + #[allow(clippy::missing_const_for_fn,unreachable_pub,unused,unused_parens)] + pub fn receivers(&self) -> Option<($( + $crate::WickStream<$($ty)*> + ),*)> { + Some(( + $(Box::pin(self.[<$port:snake>].take_rx().ok()?)),* + )) + } + } + impl Default for $name { + fn default() -> Self { + $crate::paste::paste! { + Self { + $( + [<$port:snake>]: $crate::wasmrs_rx::FluxChannel::new() + ),* + } + } + } + } + } + }; +} + +#[doc(hidden)] +#[macro_export] +macro_rules! stream_receivers { + ($error:ty, $name:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => { + $crate::paste::paste! { + struct $name { + $( + [<$port:snake>]: $crate::wasmrs_rx::FluxReceiver<$($ty)*,$error> + ),* + } + impl $name { + fn new( + $( + [<$port:snake>]: $crate::wasmrs_rx::FluxReceiver<$($ty)*,$error> + ),* + ) -> Self { + $crate::paste::paste! { + Self { + $( + [<$port:snake>] + ),* + } + } + } + } + } + }; +} + #[doc(hidden)] #[macro_export] macro_rules! payload_fan_out { - ($stream:expr, raw:$raw:tt, $error:ty, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => { + (@handle_packet $payload: ident, $sender:ident, $config:ty) => { { - $crate::paste::paste! { - $( - #[allow(unused_parens)] - let ([<$port:snake _tx>],[<$port:snake _rx>]) = $crate::wasmrs_rx::FluxChannel::<_,$error>::new_parts(); - )* + let mut packet: $crate::wick_packet::Packet = $payload.into(); + + if let Some(config_tx) = $sender.take() { + if let Some(context) = packet.context() { + let config: Result<$crate::wick_packet::ContextTransport<$config>, _> = $crate::wasmrs_codec::messagepack::deserialize(&context).map_err(|e|format!("Cound not deserialize context: {}", e)); + let _ = config_tx.send(config.map($crate::flow_component::Context::from)); + } else { + packet = $crate::wick_packet::Packet::component_error("No context attached to first invocation packet"); + } } - $crate::runtime::spawn("payload_fan_out", async move { - use $crate::StreamExt; - loop { - if let Some(Ok(payload)) = $stream.next().await { - let packet: $crate::wick_packet::Packet = payload.into(); - match packet.port() { + packet + } + }; + (@handle_packet $payload: ident) => { + { + let packet: $crate::wick_packet::Packet = $payload.into(); + packet + } + }; + + (@route_packet $packet:ident, $channels:ident, raw:$raw:tt,[ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => { + match $packet.port() { + $( + $port => { + let tx = &$crate::paste::paste! { $channels.[<$port:snake>] }; + $crate::handle_port!(raw: $raw, $packet, tx, $port, $($ty)*) + } + ),* + $crate::wick_packet::Packet::FATAL_ERROR => + { + #[allow(unused)] + { + use $crate::wasmrs_rx::Observer; + let error = $packet.unwrap_err(); + $crate::paste::paste! { $( - $port=> { - let tx = &$crate::paste::paste! {[<$port:snake _tx>]}; - $crate::handle_port!(raw: $raw, packet, tx, $port, $($ty)*) - }, + $channels.[<$port:snake>].send_result(Err($crate::anyhow::anyhow!(error.clone()))).unwrap(); )* - $crate::wick_packet::Packet::FATAL_ERROR => { - let error = packet.unwrap_err(); - $crate::paste::paste! { - $( - [<$port:snake _tx>].send_result(Err(Box::new($crate::flow_component::ComponentError::message(error.msg())).into())).unwrap(); - )* - } - } - _ => { - // TODO: add tracing to warn when we're sent packets we aren't expecting - } } - } else { - break; } } - - }); - $crate::paste::paste! {($(Box::pin([<$port:snake _rx>])),*)} + _ => { + // TODO: add tracing to warn when we're sent packets we aren't expecting + } } - }; - ($stream:expr, raw:$raw:tt, $error:ty, $config:ty, [ ]) => { + ($stream:expr, raw:$raw:tt, $error:ty, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => { { - let (config_tx,config_rx) = $crate::runtime::oneshot(); + $crate::stream_senders!($error, Channels, [ $(($port, $($ty)+)),* ]); + #[allow(unused)] - $crate::runtime::spawn("payload_fan_out",async move { + let channels = Channels::default(); + + let output_streams = channels.receivers().unwrap(); + + $crate::runtime::spawn("payload_fan_out", async move { #[allow(unused)] use $crate::StreamExt; - let mut config_tx = Some(config_tx); loop { if let Some(Ok(payload)) = $stream.next().await { - let mut packet: $crate::wick_packet::Packet = payload.into(); - if let Some(config_tx) = config_tx.take() { - if let Some(context) = packet.context() { - let config: Result<$crate::wick_packet::ContextTransport<$config>, _> = $crate::wasmrs_codec::messagepack::deserialize(&context).map_err(|_e|$crate::flow_component::ComponentError::message("Cound not deserialize Context")); - let _ = config_tx.send(config.map($crate::flow_component::Context::from)); - } else { - packet = $crate::wick_packet::Packet::component_error("No context attached to first invocation packet"); - } - } + let packet = $crate::payload_fan_out!(@handle_packet payload); + + $crate::payload_fan_out!(@route_packet packet, channels, raw:$raw, [ $(($port, $($ty)+)),* ]); + } else { break; } } - }); - let config_mono = Box::pin(config_rx); - config_mono - } - }; + }); + output_streams + } + }; ($stream:expr, raw:$raw:tt, $error:ty, $config:ty, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => { { - $crate::paste::paste! { - $( - #[allow(unused_parens)] - let ([<$port:snake _tx>],[<$port:snake _rx>]) = $crate::wasmrs_rx::FluxChannel::<_,$error>::new_parts(); - )* - } - let (config_tx,config_rx) = $crate::runtime::oneshot(); + $crate::stream_senders!($error, Channels, [ $(($port, $($ty)+)),* ]); + #[allow(unused)] + let channels = Channels::default(); - $crate::runtime::spawn("payload_fan_out",async move { + let (config_tx,config_rx) = $crate::runtime::oneshot(); + let mut config_tx = Some(config_tx); + let config_mono = Box::pin(config_rx); + let output_streams = (config_mono, channels.receivers().unwrap()); + + $crate::runtime::spawn("payload_fan_out", async move { #[allow(unused)] use $crate::StreamExt; - let mut config_tx = Some(config_tx); loop { if let Some(Ok(payload)) = $stream.next().await { - let mut packet: $crate::wick_packet::Packet = payload.into(); - if let Some(config_tx) = config_tx.take() { - if let Some(context) = packet.context() { - let config: Result<$crate::wick_packet::ContextTransport<$config>, _> = $crate::wasmrs_codec::messagepack::deserialize(&context).map_err(|e|$crate::flow_component::ComponentError::message(&format!("Cound not deserialize context: {}", e))); - let _ = config_tx.send(config.map($crate::flow_component::Context::from)); - } else { - packet = $crate::wick_packet::Packet::component_error("No context attached to first invocation packet"); - } - } - - match packet.port() { - $( - $port=> { - let tx = &$crate::paste::paste! {[<$port:snake _tx>]}; - $crate::handle_port!(raw: $raw, packet, tx, $port, $($ty)*) - }, - )* - $crate::wick_packet::Packet::FATAL_ERROR => { - use $crate::wasmrs_rx::Observer; - let error = packet.unwrap_err(); - $crate::paste::paste! { - $( - [<$port:snake _tx>].send_result(Err(Box::new($crate::flow_component::ComponentError::message(error.msg())).into())).unwrap(); - )* - } - } - _ => { - // TODO: add tracing to warn when we're sent packets we aren't expecting - } - } + let packet = $crate::payload_fan_out!(@handle_packet payload, config_tx, $config); + $crate::payload_fan_out!(@route_packet packet, channels, raw:$raw, [ $(($port, $($ty)+)),* ]); } else { break; } } }); - let config_mono = Box::pin(config_rx); - $crate::paste::paste! {(config_mono, $(Box::pin([<$port:snake _rx>])),*)} - } + + output_streams + } }; } @@ -280,10 +319,10 @@ mod test { struct Config {} #[tokio::test] - async fn test_basic() -> Result<()> { + async fn test_fan_out() -> Result<()> { let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6)); stream.set_context(Default::default(), InherentData::unsafe_default()); - let (_config, mut foo_rx, mut bar_rx) = + let (_config, (mut foo_rx, mut bar_rx)) = payload_fan_out!(stream, raw: false, anyhow::Error, Config, [("foo", i32), ("bar", i32)]); assert_eq!(foo_rx.next().await.unwrap().unwrap(), 1); assert_eq!(bar_rx.next().await.unwrap().unwrap(), 2); @@ -294,4 +333,28 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_fan_out_no_config() -> Result<()> { + let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6)); + stream.set_context(Default::default(), InherentData::unsafe_default()); + let (mut foo_rx, mut bar_rx) = payload_fan_out!(stream, raw: false, anyhow::Error, [("foo", i32), ("bar", i32)]); + assert_eq!(foo_rx.next().await.unwrap().unwrap(), 1); + assert_eq!(bar_rx.next().await.unwrap().unwrap(), 2); + assert_eq!(foo_rx.next().await.unwrap().unwrap(), 3); + assert_eq!(bar_rx.next().await.unwrap().unwrap(), 4); + assert_eq!(foo_rx.next().await.unwrap().unwrap(), 5); + assert_eq!(bar_rx.next().await.unwrap().unwrap(), 6); + + Ok(()) + } + + #[tokio::test] + async fn test_fan_out_no_fields() -> Result<()> { + let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6)); + stream.set_context(Default::default(), InherentData::unsafe_default()); + let _config = payload_fan_out!(stream, raw: false, anyhow::Error, Config, []); + + Ok(()) + } } diff --git a/crates/wick/wick-config/src/error.rs b/crates/wick/wick-config/src/error.rs index 5ebe0825c..f5504ac59 100644 --- a/crates/wick/wick-config/src/error.rs +++ b/crates/wick/wick-config/src/error.rs @@ -2,8 +2,6 @@ use std::path::PathBuf; use thiserror::Error; -// type BoxedSyncSendError = Box; - /// Wick Manifest's Errors. #[derive(Error, Debug)] #[non_exhaustive] diff --git a/crates/wick/wick-packet/Cargo.toml b/crates/wick/wick-packet/Cargo.toml index 47f09dfd4..1c952fa3f 100644 --- a/crates/wick/wick-packet/Cargo.toml +++ b/crates/wick/wick-packet/Cargo.toml @@ -44,6 +44,7 @@ pin-project-lite = { workspace = true } base64-serde = { workspace = true } base64 = { workspace = true, default-features = false, features = ["alloc"] } tokio-stream = { workspace = true, default-features = false } +anyhow = { workspace = true } # # feature = rt-tokio tokio = { workspace = true, optional = true, features = ["rt"] } @@ -61,5 +62,8 @@ chrono = { workspace = true, optional = true, features = [ wasmrs-guest = { workspace = true } [dev-dependencies] -anyhow = { workspace = true } rstest = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt"] } +test-logger = { workspace = true } +tracing = { workspace = true } +wick-logger = { workspace = true } diff --git a/crates/wick/wick-packet/src/lib.rs b/crates/wick/wick-packet/src/lib.rs index 1114f41ce..cec39367a 100644 --- a/crates/wick/wick-packet/src/lib.rs +++ b/crates/wick/wick-packet/src/lib.rs @@ -145,7 +145,7 @@ pub use inherent::InherentData; #[cfg(feature = "invocation")] pub use invocation::Invocation; pub use metadata::{Flags, WickMetadata, CLOSE_BRACKET, DONE_FLAG, OPEN_BRACKET}; -pub use output::{Output, OutputIterator, Port, ValuePort}; +pub use output::{OutgoingPort, OutputIterator, Port, ValuePort}; pub use packet::{from_raw_wasmrs, from_wasmrs, packetstream_to_wasmrs, Packet, PacketError, PacketPayload}; pub use packet_stream::{into_packet, PacketSender, PacketStream}; pub use stream_map::StreamMap; @@ -160,5 +160,4 @@ pub use b64_bytes::Base64Bytes; #[cfg(feature = "rt-tokio")] pub use runtime::split_stream; -pub(crate) type BoxError = Box; pub(crate) type Result = std::result::Result; diff --git a/crates/wick/wick-packet/src/output.rs b/crates/wick/wick-packet/src/output.rs index a20463489..7d23b5800 100644 --- a/crates/wick/wick-packet/src/output.rs +++ b/crates/wick/wick-packet/src/output.rs @@ -5,16 +5,13 @@ use wasmrs_rx::{FluxChannel, Observer}; use crate::{Packet, PacketPayload}; -pub struct Output -where - T: serde::Serialize, -{ +pub struct OutgoingPort { channel: FluxChannel, name: String, _phantom: std::marker::PhantomData, } -impl std::fmt::Debug for Output<()> { +impl std::fmt::Debug for OutgoingPort { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Output").field("name", &self.name).finish() } @@ -46,23 +43,58 @@ pub trait Port: ConditionallySend { } } -pub trait ValuePort: Port +pub trait ValuePort: Port { + fn send(&mut self, value: T); + + fn send_result(&mut self, value: Result); +} + +impl ValuePort for OutgoingPort where - T: serde::Serialize, + T: serde::Serialize + ConditionallySend, { - fn send(&mut self, value: &T) { + fn send(&mut self, value: T) { self.send_packet(Packet::encode(self.name(), value)); } fn send_result(&mut self, value: Result) { match value { - Ok(value) => self.send(&value), + Ok(value) => self.send(value), + Err(err) => self.error(err.to_string().as_str()), + } + } +} + +impl ValuePort<&T> for OutgoingPort +where + T: serde::Serialize + ConditionallySend, +{ + fn send(&mut self, value: &T) { + self.send_packet(Packet::encode(self.name(), value)); + } + + fn send_result(&mut self, value: Result<&T, impl std::fmt::Display>) { + match value { + Ok(value) => self.send(value), + Err(err) => self.error(err.to_string().as_str()), + } + } +} + +impl ValuePort<&str> for OutgoingPort { + fn send(&mut self, value: &str) { + self.send_packet(Packet::encode(self.name(), value)); + } + + fn send_result(&mut self, value: Result<&str, impl std::fmt::Display>) { + match value { + Ok(value) => self.send(value), Err(err) => self.error(err.to_string().as_str()), } } } -impl Port for Output +impl Port for OutgoingPort where T: serde::Serialize + ConditionallySend, { @@ -82,9 +114,7 @@ where } } -impl ValuePort for Output where T: serde::Serialize + ConditionallySend {} - -impl Output +impl OutgoingPort where T: serde::Serialize, { @@ -126,3 +156,111 @@ impl<'a> IntoIterator for OutputIterator<'a> { self.outputs.into_iter() } } + +#[cfg(test)] +mod test { + use anyhow::Result; + use tokio::task::JoinHandle; + use tokio_stream::StreamExt; + + use super::*; + use crate::{packet_stream, PacketStream}; + + #[test_logger::test(tokio::test)] + async fn test_outputs() -> Result<()> { + struct Outputs { + a: OutgoingPort, + b: OutgoingPort, + c: OutgoingPort, + } + + let (stream, rx) = FluxChannel::new_parts(); + + let mut outputs = Outputs { + a: OutgoingPort::new("a", stream.clone()), + b: OutgoingPort::new("b", stream.clone()), + c: OutgoingPort::new("c", stream), + }; + + #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] + struct SomeStruct { + a: String, + } + let some_struct = SomeStruct { a: "hey".to_owned() }; + + outputs.a.send(&42); + outputs.a.send(42); + outputs.b.send("hey"); + outputs.b.send("hey".to_owned()); + let kinda_string = std::borrow::Cow::Borrowed("hey"); + outputs.b.send(kinda_string.as_ref()); + outputs.c.send(&some_struct.clone()); + outputs.c.send(&some_struct); + drop(outputs); + + let mut packets = rx.collect::>().await; + + let p: Packet = packets.remove(0).into(); + assert_eq!(p.decode::()?, 42); + let p: Packet = packets.remove(0).into(); + assert_eq!(p.decode::()?, 42); + let p: Packet = packets.remove(0).into(); + assert_eq!(p.decode::()?, "hey"); + let p: Packet = packets.remove(0).into(); + assert_eq!(p.decode::()?, "hey"); + let p: Packet = packets.remove(0).into(); + assert_eq!(p.decode::()?, "hey"); + let p: Packet = packets.remove(0).into(); + assert_eq!(p.decode::()?, some_struct); + let p: Packet = packets.remove(0).into(); + assert_eq!(p.decode::()?, some_struct); + + Ok(()) + } + + #[test_logger::test(tokio::test)] + async fn test_inputs() -> Result<()> { + struct Inputs { + #[allow(unused)] + task: JoinHandle<()>, + a: PacketStream, + b: PacketStream, + } + impl Inputs { + fn new(mut stream: PacketStream) -> Self { + let (a_tx, a_rx) = PacketStream::new_channels(); + let (b_tx, b_rx) = PacketStream::new_channels(); + let task = tokio::spawn(async move { + while let Some(next) = stream.next().await { + let _ = match next { + Ok(packet) => match packet.port() { + "a" => a_tx.send(packet), + "b" => b_tx.send(packet), + crate::Packet::FATAL_ERROR => { + let _ = a_tx.send(packet.clone()); + b_tx.send(packet.clone()) + } + _ => continue, + }, + Err(e) => { + let _ = a_tx.error(e.clone()); + b_tx.error(e) + } + }; + } + }); + + Self { task, a: a_rx, b: b_rx } + } + } + + let stream = packet_stream!(("a", 32), ("b", "Hey")); + + let mut inputs = Inputs::new(stream); + + assert_eq!(inputs.a.next().await.unwrap()?.decode::()?, 32); + assert_eq!(inputs.b.next().await.unwrap()?.decode::()?, "Hey"); + + Ok(()) + } +} diff --git a/crates/wick/wick-packet/src/packet.rs b/crates/wick/wick-packet/src/packet.rs index 75138d214..1aadda788 100644 --- a/crates/wick/wick-packet/src/packet.rs +++ b/crates/wick/wick-packet/src/packet.rs @@ -343,26 +343,9 @@ impl std::fmt::Display for PacketError { } } -#[must_use] -pub fn packetstream_to_wasmrs(index: u32, stream: PacketStream) -> BoxFlux { - let s = tokio_stream::StreamExt::map(stream, move |p| { +impl From> for Packet { + fn from(p: Result) -> Self { p.map_or_else( - |e| Err(PayloadError::application_error(e.to_string(), None)), - |p| { - let md = wasmrs::Metadata::new_extra(index, p.extra.encode()).encode(); - match p.payload { - PacketPayload::Ok(b) => Ok(wasmrs::RawPayload::new_data(Some(md), b.map(Into::into))), - PacketPayload::Err(e) => Err(wasmrs::PayloadError::application_error(e.msg(), Some(md))), - } - }, - ) - }); - Box::pin(s) -} - -pub fn from_raw_wasmrs(stream: BoxFlux) -> PacketStream { - let s = tokio_stream::StreamExt::map(stream, move |p| { - let p = p.map_or_else( |e| { if let Some(mut metadata) = e.metadata { let md = wasmrs::Metadata::decode(&mut metadata); @@ -398,16 +381,13 @@ pub fn from_raw_wasmrs(stream: BoxFlux) -> PacketStrea Packet::component_error("invalid wasmrs packet with no metadata.") } }, - ); - Ok(p) - }); - - PacketStream::new(Box::new(s)) + ) + } } -pub fn from_wasmrs(stream: BoxFlux) -> PacketStream { - let s = tokio_stream::StreamExt::map(stream, move |p| { - let p = p.map_or_else( +impl From> for Packet { + fn from(p: Result) -> Self { + p.map_or_else( |e| { let md = wasmrs::Metadata::decode(&mut e.metadata.unwrap()); @@ -425,9 +405,34 @@ pub fn from_wasmrs(stream: BoxFlux) -> PacketStream { let data = p.data; Packet::new_for_port(wmd.port(), PacketPayload::Ok(Some(data.into())), wmd.flags()) }, - ); - Ok(p) + ) + } +} + +#[must_use] +pub fn packetstream_to_wasmrs(index: u32, stream: PacketStream) -> BoxFlux { + let s = tokio_stream::StreamExt::map(stream, move |p| { + p.map_or_else( + |e| Err(PayloadError::application_error(e.to_string(), None)), + |p| { + let md = wasmrs::Metadata::new_extra(index, p.extra.encode()).encode(); + match p.payload { + PacketPayload::Ok(b) => Ok(wasmrs::RawPayload::new_data(Some(md), b.map(Into::into))), + PacketPayload::Err(e) => Err(wasmrs::PayloadError::application_error(e.msg(), Some(md))), + } + }, + ) }); + Box::pin(s) +} + +pub fn from_raw_wasmrs(stream: BoxFlux) -> PacketStream { + let s = tokio_stream::StreamExt::map(stream, move |p| Ok(p.into())); + PacketStream::new(Box::new(s)) +} + +pub fn from_wasmrs(stream: BoxFlux) -> PacketStream { + let s = tokio_stream::StreamExt::map(stream, move |p| Ok(p.into())); PacketStream::new(Box::new(s)) } diff --git a/crates/wick/wick-packet/src/packet_stream.rs b/crates/wick/wick-packet/src/packet_stream.rs index 54da475dc..eaffb61a7 100644 --- a/crates/wick/wick-packet/src/packet_stream.rs +++ b/crates/wick/wick-packet/src/packet_stream.rs @@ -6,7 +6,7 @@ use tokio_stream::Stream; use tracing::{span_enabled, Span}; use wasmrs_rx::FluxChannel; -use crate::{BoxError, ContextTransport, InherentData, Packet, Result, RuntimeConfig}; +use crate::{ContextTransport, InherentData, Packet, Result, RuntimeConfig}; pub type PacketSender = FluxChannel; @@ -163,7 +163,7 @@ impl Stream for PacketStream { pub fn into_packet, T: serde::Serialize>( name: N, -) -> Box) -> Result> { +) -> Box) -> Result> { let name = name.into(); Box::new(move |x| Ok(x.map_or_else(|e| Packet::err(&name, e.to_string()), |x| Packet::encode(&name, &x)))) } diff --git a/crates/wick/wick-runtime/src/components/scope_component.rs b/crates/wick/wick-runtime/src/components/scope_component.rs index d7d9f0139..24401cf17 100644 --- a/crates/wick/wick-runtime/src/components/scope_component.rs +++ b/crates/wick/wick-runtime/src/components/scope_component.rs @@ -42,7 +42,7 @@ impl Component for ScopeComponent { Box::pin(async move { let scope = Scope::for_id(&self.scope_id) - .ok_or_else(|| flow_component::ComponentError::message(&format!("scope '{}' not found", target_url)))?; + .ok_or_else(|| flow_component::ComponentError::msg(format!("scope '{}' not found", target_url)))?; let target_component = invocation.target.component_id().to_owned(); if target_component != scope.namespace() { diff --git a/crates/wick/wick-runtime/src/triggers.rs b/crates/wick/wick-runtime/src/triggers.rs index e0ee57502..3af1c2258 100644 --- a/crates/wick/wick-runtime/src/triggers.rs +++ b/crates/wick/wick-runtime/src/triggers.rs @@ -48,7 +48,8 @@ pub trait Trigger { async fn shutdown_gracefully(self) -> Result<(), RuntimeError>; /// Wait for the trigger to finish. - async fn wait_for_done(&self); + #[must_use = "this returns the output of the trigger"] + async fn wait_for_done(&self) -> StructuredOutput; } /// Runtime configuration necessary for a trigger to execute. diff --git a/crates/wick/wick-stdlib/src/macros.rs b/crates/wick/wick-stdlib/src/macros.rs index 98e1b6425..f15d3346d 100644 --- a/crates/wick/wick-stdlib/src/macros.rs +++ b/crates/wick/wick-stdlib/src/macros.rs @@ -4,7 +4,7 @@ macro_rules! request_response { inputs: {$($ikey:ident => $ity:ty),* $(,)?}, output: $okey:expr, }) => { - pub(crate) async fn $name(mut invocation: wick_packet::Invocation) -> Result> { + pub(crate) async fn $name(mut invocation: wick_packet::Invocation) -> Result { #[allow(unused_parens)] let ($(mut $ikey),*) = fan_out!(invocation.packets, $(stringify!($ikey)),*); let (tx, rx) = PacketStream::new_channels(); diff --git a/examples/components/wasi-fs/Cargo.lock b/examples/components/wasi-fs/Cargo.lock index 05b658347..81a0b9eb4 100644 --- a/examples/components/wasi-fs/Cargo.lock +++ b/examples/components/wasi-fs/Cargo.lock @@ -2385,6 +2385,7 @@ dependencies = [ name = "wick-component" version = "0.15.0" dependencies = [ + "anyhow", "async-recursion", "flow-component", "paste", @@ -2502,6 +2503,7 @@ dependencies = [ name = "wick-packet" version = "0.15.0" dependencies = [ + "anyhow", "base64 0.21.2", "base64-serde", "bytes 1.4.0", diff --git a/examples/http/middleware/request/Cargo.lock b/examples/http/middleware/request/Cargo.lock index f590c3436..17011da74 100644 --- a/examples/http/middleware/request/Cargo.lock +++ b/examples/http/middleware/request/Cargo.lock @@ -2356,6 +2356,7 @@ dependencies = [ name = "wick-component" version = "0.15.0" dependencies = [ + "anyhow", "async-recursion", "flow-component", "paste", @@ -2473,6 +2474,7 @@ dependencies = [ name = "wick-packet" version = "0.15.0" dependencies = [ + "anyhow", "base64 0.21.2", "base64-serde", "bytes 1.4.0", diff --git a/examples/http/wasm-http-call/wasm-component/.vscode/settings.json b/examples/http/wasm-http-call/wasm-component/.vscode/settings.json index 1cd2bd528..2b834d747 100644 --- a/examples/http/wasm-http-call/wasm-component/.vscode/settings.json +++ b/examples/http/wasm-http-call/wasm-component/.vscode/settings.json @@ -1,6 +1,5 @@ { "rust-analyzer.cargo.target": "wasm32-unknown-unknown", - "extensions": [ - "CandleCorp.wick-development-framework" - ] -} \ No newline at end of file + "rust-analyzer.cargo.features": ["localgen"], + "extensions": ["CandleCorp.wick-development-framework"] +} diff --git a/examples/http/wasm-http-call/wasm-component/Cargo.lock b/examples/http/wasm-http-call/wasm-component/Cargo.lock index 667d6895a..1c65d704d 100644 --- a/examples/http/wasm-http-call/wasm-component/Cargo.lock +++ b/examples/http/wasm-http-call/wasm-component/Cargo.lock @@ -2327,6 +2327,7 @@ dependencies = [ name = "wick-component" version = "0.15.0" dependencies = [ + "anyhow", "async-recursion", "flow-component", "paste", @@ -2444,6 +2445,7 @@ dependencies = [ name = "wick-packet" version = "0.15.0" dependencies = [ + "anyhow", "base64 0.21.2", "base64-serde", "bytes 1.4.0", diff --git a/examples/http/wasm-http-call/wasm-component/Cargo.toml b/examples/http/wasm-http-call/wasm-component/Cargo.toml index 0871fb41c..6d7cce429 100644 --- a/examples/http/wasm-http-call/wasm-component/Cargo.toml +++ b/examples/http/wasm-http-call/wasm-component/Cargo.toml @@ -15,6 +15,10 @@ panic = "abort" [lib] crate-type = ["cdylib"] +[features] +default = [] +localgen = [] + [dependencies] wick-component = { path = "../../../../crates/wick/wick-component" } serde = { version = "1", features = ["derive"] } diff --git a/examples/http/wasm-http-call/wasm-component/build.rs b/examples/http/wasm-http-call/wasm-component/build.rs index d450ca798..30844241c 100644 --- a/examples/http/wasm-http-call/wasm-component/build.rs +++ b/examples/http/wasm-http-call/wasm-component/build.rs @@ -1,5 +1,23 @@ fn main() -> Result<(), Box> { println!("cargo:rerun-if-changed=component.wick"); + #[cfg(not(feature = "localgen"))] wick_component_codegen::configure().generate("component.wick")?; + + #[cfg(feature = "localgen")] + { + wick_component_codegen::configure() + .out_dir("src/generated") + .generate("component.wick")?; + + let fmt = std::process::Command::new("cargo") + .args(["+nightly", "fmt", "--", "src/generated/mod.rs"]) + .status() + .expect("Failed to run cargo fmt on generated files."); + + if !fmt.success() { + // This can happen on minimally setup machines and is not a problem on its own. + println!("Could not format generated files"); + } + } Ok(()) } diff --git a/examples/http/wasm-http-call/wasm-component/src/generated/mod.rs b/examples/http/wasm-http-call/wasm-component/src/generated/mod.rs new file mode 100644 index 000000000..6e29698fb --- /dev/null +++ b/examples/http/wasm-http-call/wasm-component/src/generated/mod.rs @@ -0,0 +1,927 @@ +pub use async_trait::async_trait; +pub use wick_component::flow_component::Context; +#[allow(unused)] +pub(crate) use wick_component::WickStream; +#[allow(unused)] +pub(crate) use wick_component::*; +#[no_mangle] +#[cfg(target_family = "wasm")] +extern "C" fn __wasmrs_init(guest_buffer_size: u32, host_buffer_size: u32, max_host_frame_len: u32) { + wick_component::wasmrs_guest::init(guest_buffer_size, host_buffer_size, max_host_frame_len); + wick_component::wasmrs_guest::register_request_response("wick", "__setup", Box::new(__setup)); + wick_component::wasmrs_guest::register_request_channel("wick", "request", Box::new(Component::request_wrapper)); +} +#[cfg(target_family = "wasm")] +mod provided { + #[allow(unused)] + use super::*; + #[allow(unused)] + pub struct ClientComponent { + component: wick_packet::ComponentReference, + inherent: flow_component::InherentContext, + } + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + #[allow(clippy::exhaustive_structs)] + pub struct PostOpConfig { + #[serde(rename = "message")] + pub message: String, + } + impl Default for PostOpConfig { + fn default() -> Self { + Self { + message: Default::default(), + } + } + } + impl From for wick_packet::RuntimeConfig { + fn from(v: PostOpConfig) -> Self { + wick_component::to_value(v).unwrap().try_into().unwrap() + } + } + impl ClientComponent { + pub fn new(component: wick_packet::ComponentReference, inherent: flow_component::InherentContext) -> Self { + Self { component, inherent } + } + #[allow(unused)] + pub fn component(&self) -> &wick_packet::ComponentReference { + &self.component + } + #[allow(unused)] + pub fn post_op( + &self, + op_config: PostOpConfig, + id: impl wick_component::Stream> + 'static, + name: impl wick_component::Stream> + 'static, + ) -> std::result::Result< + (WickStream, WickStream), + wick_packet::Error, + > { + let id = id.map(wick_component::wick_packet::into_packet("id")); + let name = name.map(wick_component::wick_packet::into_packet("name")); + let stream = wick_component::empty(); + let stream = stream.merge(id).merge(name).chain(wick_component::iter_raw(vec![ + Ok(Packet::done("id")), + Ok(Packet::done("name")), + ])); + let stream = wick_packet::PacketStream::new(Box::pin(stream)); + let mut stream = self.post_op_raw(op_config, stream)?; + Ok(wick_component::payload_fan_out!( + stream, raw : false, wick_component::AnyError, [("response", + types::http::HttpResponse), ("body", wick_component::Value)] + )) + } + #[allow(unused)] + pub fn post_op_raw>( + &self, + op_config: PostOpConfig, + stream: T, + ) -> std::result::Result { + Ok(self.component.call( + "post_op", + stream.into(), + Some(op_config.into()), + self.inherent.clone().into(), + )?) + } + } +} +#[cfg(target_family = "wasm")] +pub use provided::*; +#[cfg(target_family = "wasm")] +mod imported { + #[allow(unused)] + use super::*; +} +#[cfg(target_family = "wasm")] +pub use imported::*; +#[allow(unused)] +#[cfg(target_family = "wasm")] +mod provided_wasm { + #[allow(unused)] + use super::*; + pub(crate) struct Provided { + pub client: ClientComponent, + } + pub(crate) trait ProvidedContext { + fn provided(&self) -> Provided; + } + impl ProvidedContext for wick_component::flow_component::Context + where + T: std::fmt::Debug, + { + fn provided(&self) -> Provided { + let config = get_config(); + let inherent = self.inherent.clone(); + Provided { + client: ClientComponent::new(config.provided.get("client").cloned().unwrap(), inherent.clone()), + } + } + } +} +#[cfg(target_family = "wasm")] +pub(crate) use provided_wasm::*; +#[cfg(target_family = "wasm")] +thread_local! { + static __CONFIG : std::cell::UnsafeCell < Option < SetupPayload >> = + std::cell::UnsafeCell::new(None); +} +#[derive(Debug, Clone, Default, ::serde::Serialize, ::serde::Deserialize, PartialEq)] +#[allow(clippy::exhaustive_structs)] +pub struct RootConfig {} +#[cfg(target_family = "wasm")] +#[derive(Debug, ::serde::Deserialize)] +pub(crate) struct SetupPayload { + #[allow(unused)] + pub(crate) provided: std::collections::HashMap, + #[allow(unused)] + pub(crate) imported: std::collections::HashMap, + #[allow(unused)] + pub(crate) config: RootConfig, +} +#[cfg(target_family = "wasm")] +fn __setup( + input: wasmrs_rx::BoxMono, +) -> Result, wick_component::BoxError> { + Ok(Box::pin(async move { + let payload = input.await?; + match wasmrs_codec::messagepack::deserialize::(&payload.data) { + Ok(input) => { + __CONFIG.with(|cell| { + #[allow(unsafe_code)] + unsafe { &mut *cell.get() }.replace(input); + }); + Ok(wasmrs::RawPayload::new_data(None, None)) + } + Err(e) => Err(wasmrs::PayloadError::application_error(e.to_string(), None)), + } + })) +} +#[allow(unused)] +#[cfg(target_family = "wasm")] +pub(crate) fn get_config() -> &'static SetupPayload { + __CONFIG.with(|cell| { + #[allow(unsafe_code)] + unsafe { &*cell.get() }.as_ref().unwrap() + }) +} +#[allow(unused)] +#[cfg(target_family = "wasm")] +pub(crate) fn get_root_config() -> &'static RootConfig { + __CONFIG.with(|cell| { + #[allow(unsafe_code)] + &unsafe { &*cell.get() }.as_ref().unwrap().config + }) +} +pub(crate) trait RootConfigContext { + fn root_config(&self) -> &'static RootConfig; +} +impl RootConfigContext for Context +where + T: std::fmt::Debug + wick_component::flow_component::LocalAwareSend, +{ + fn root_config(&self) -> &'static RootConfig { + #[cfg(target_family = "wasm")] + { + get_root_config() + } + #[cfg(not(target_family = "wasm"))] + { + unimplemented!("root_config is only available in wasm builds") + } + } +} +///Additional generated types +pub mod types { + #[allow(unused)] + use super::types; + pub mod http { + #[allow(unused)] + use super::http; + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + ///HTTP method enum + #[serde(into = "String", try_from = "wick_component::serde_util::enum_repr::StringOrNum")] + #[allow(clippy::exhaustive_enums)] + pub enum HttpMethod { + ///HTTP GET method + Get, + ///HTTP POST method + Post, + ///HTTP PUT method + Put, + ///HTTP DELETE method + Delete, + ///HTTP PATCH method + Patch, + ///HTTP HEAD method + Head, + ///HTTP OPTIONS method + Options, + ///HTTP TRACE method + Trace, + } + impl TryFrom for HttpMethod { + type Error = String; + fn try_from(value: wick_component::serde_util::enum_repr::StringOrNum) -> std::result::Result { + use std::str::FromStr; + match value { + wick_component::serde_util::enum_repr::StringOrNum::String(v) => Self::from_str(&v), + wick_component::serde_util::enum_repr::StringOrNum::Int(v) => Self::from_str(&v.to_string()), + wick_component::serde_util::enum_repr::StringOrNum::Float(v) => Self::from_str(&v.to_string()), + } + } + } + impl From for String { + fn from(value: HttpMethod) -> Self { + value.value().map_or_else(|| value.to_string(), |v| v.to_owned()) + } + } + impl HttpMethod { + #[allow(unused)] + ///Returns the value of the enum variant as a string. + #[must_use] + pub fn value(&self) -> Option<&'static str> { + #[allow(clippy::match_single_binding)] + match self { + Self::Get => None, + Self::Post => None, + Self::Put => None, + Self::Delete => None, + Self::Patch => None, + Self::Head => None, + Self::Options => None, + Self::Trace => None, + } + } + } + impl TryFrom for HttpMethod { + type Error = u32; + fn try_from(i: u32) -> Result { + #[allow(clippy::match_single_binding)] + match i { + _ => Err(i), + } + } + } + impl std::str::FromStr for HttpMethod { + type Err = String; + fn from_str(s: &str) -> Result { + #[allow(clippy::match_single_binding)] + match s { + "Get" => Ok(Self::Get), + "Post" => Ok(Self::Post), + "Put" => Ok(Self::Put), + "Delete" => Ok(Self::Delete), + "Patch" => Ok(Self::Patch), + "Head" => Ok(Self::Head), + "Options" => Ok(Self::Options), + "Trace" => Ok(Self::Trace), + _ => Err(s.to_owned()), + } + } + } + impl std::fmt::Display for HttpMethod { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + #[allow(clippy::match_single_binding)] + match self { + Self::Get => f.write_str("Get"), + Self::Post => f.write_str("Post"), + Self::Put => f.write_str("Put"), + Self::Delete => f.write_str("Delete"), + Self::Patch => f.write_str("Patch"), + Self::Head => f.write_str("Head"), + Self::Options => f.write_str("Options"), + Self::Trace => f.write_str("Trace"), + } + } + } + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + ///HTTP request + #[allow(clippy::exhaustive_structs)] + pub struct HttpRequest { + ///method from request line enum + #[serde(rename = "method")] + pub method: HttpMethod, + ///scheme from request line enum + #[serde(rename = "scheme")] + pub scheme: HttpScheme, + ///domain/port and any authentication from request line. optional + #[serde(rename = "authority")] + pub authority: String, + ///query parameters from request line. optional + #[serde(rename = "query_parameters")] + #[serde(default)] + #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")] + pub query_parameters: std::collections::HashMap>, + ///path from request line (not including query parameters) + #[serde(rename = "path")] + pub path: String, + ///full URI from request line + #[serde(rename = "uri")] + pub uri: String, + ///HTTP version enum + #[serde(rename = "version")] + pub version: HttpVersion, + ///All request headers. Duplicates are comma separated + #[serde(rename = "headers")] + #[serde(default)] + #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")] + pub headers: std::collections::HashMap>, + ///The remote address of the connected client + #[serde(rename = "remote_addr")] + pub remote_addr: String, + } + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + ///HTTP response + #[allow(clippy::exhaustive_structs)] + pub struct HttpResponse { + ///HTTP version enum + #[serde(rename = "version")] + pub version: HttpVersion, + ///status code enum + #[serde(rename = "status")] + pub status: StatusCode, + ///All response headers. Supports duplicates. + #[serde(rename = "headers")] + #[serde(default)] + #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")] + pub headers: std::collections::HashMap>, + } + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + ///HTTP scheme + #[serde(into = "String", try_from = "wick_component::serde_util::enum_repr::StringOrNum")] + #[allow(clippy::exhaustive_enums)] + pub enum HttpScheme { + ///HTTP scheme + Http, + ///HTTPS scheme + Https, + } + impl TryFrom for HttpScheme { + type Error = String; + fn try_from(value: wick_component::serde_util::enum_repr::StringOrNum) -> std::result::Result { + use std::str::FromStr; + match value { + wick_component::serde_util::enum_repr::StringOrNum::String(v) => Self::from_str(&v), + wick_component::serde_util::enum_repr::StringOrNum::Int(v) => Self::from_str(&v.to_string()), + wick_component::serde_util::enum_repr::StringOrNum::Float(v) => Self::from_str(&v.to_string()), + } + } + } + impl From for String { + fn from(value: HttpScheme) -> Self { + value.value().map_or_else(|| value.to_string(), |v| v.to_owned()) + } + } + impl HttpScheme { + #[allow(unused)] + ///Returns the value of the enum variant as a string. + #[must_use] + pub fn value(&self) -> Option<&'static str> { + #[allow(clippy::match_single_binding)] + match self { + Self::Http => None, + Self::Https => None, + } + } + } + impl TryFrom for HttpScheme { + type Error = u32; + fn try_from(i: u32) -> Result { + #[allow(clippy::match_single_binding)] + match i { + _ => Err(i), + } + } + } + impl std::str::FromStr for HttpScheme { + type Err = String; + fn from_str(s: &str) -> Result { + #[allow(clippy::match_single_binding)] + match s { + "Http" => Ok(Self::Http), + "Https" => Ok(Self::Https), + _ => Err(s.to_owned()), + } + } + } + impl std::fmt::Display for HttpScheme { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + #[allow(clippy::match_single_binding)] + match self { + Self::Http => f.write_str("Http"), + Self::Https => f.write_str("Https"), + } + } + } + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + ///HTTP version + #[serde(into = "String", try_from = "wick_component::serde_util::enum_repr::StringOrNum")] + #[allow(clippy::exhaustive_enums)] + pub enum HttpVersion { + ///HTTP 1.0 version + Http10, + ///HTTP 1.1 version + Http11, + ///HTTP 2.0 version + Http20, + } + impl TryFrom for HttpVersion { + type Error = String; + fn try_from(value: wick_component::serde_util::enum_repr::StringOrNum) -> std::result::Result { + use std::str::FromStr; + match value { + wick_component::serde_util::enum_repr::StringOrNum::String(v) => Self::from_str(&v), + wick_component::serde_util::enum_repr::StringOrNum::Int(v) => Self::from_str(&v.to_string()), + wick_component::serde_util::enum_repr::StringOrNum::Float(v) => Self::from_str(&v.to_string()), + } + } + } + impl From for String { + fn from(value: HttpVersion) -> Self { + value.value().map_or_else(|| value.to_string(), |v| v.to_owned()) + } + } + impl HttpVersion { + #[allow(unused)] + ///Returns the value of the enum variant as a string. + #[must_use] + pub fn value(&self) -> Option<&'static str> { + #[allow(clippy::match_single_binding)] + match self { + Self::Http10 => Some("1.0"), + Self::Http11 => Some("1.1"), + Self::Http20 => Some("2.0"), + } + } + } + impl TryFrom for HttpVersion { + type Error = u32; + fn try_from(i: u32) -> Result { + #[allow(clippy::match_single_binding)] + match i { + _ => Err(i), + } + } + } + impl std::str::FromStr for HttpVersion { + type Err = String; + fn from_str(s: &str) -> Result { + #[allow(clippy::match_single_binding)] + match s { + "1.0" => Ok(Self::Http10), + "Http10" => Ok(Self::Http10), + "1.1" => Ok(Self::Http11), + "Http11" => Ok(Self::Http11), + "2.0" => Ok(Self::Http20), + "Http20" => Ok(Self::Http20), + _ => Err(s.to_owned()), + } + } + } + impl std::fmt::Display for HttpVersion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + #[allow(clippy::match_single_binding)] + match self { + Self::Http10 => f.write_str("1.0"), + Self::Http11 => f.write_str("1.1"), + Self::Http20 => f.write_str("2.0"), + } + } + } + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + ///A response from pre-request middleware + #[serde(untagged)] + pub enum RequestMiddlewareResponse { + ///A HttpRequest value. + HttpRequest(HttpRequest), + ///A HttpResponse value. + HttpResponse(HttpResponse), + } + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + ///HTTP status code + #[serde(into = "String", try_from = "wick_component::serde_util::enum_repr::StringOrNum")] + #[allow(clippy::exhaustive_enums)] + pub enum StatusCode { + ///Continue status code + Continue, + ///SwitchingProtocols status code + SwitchingProtocols, + ///HTTP OK status code + Ok, + ///Created status code + Created, + ///Accepted status code + Accepted, + ///NonAuthoritativeInformation status code + NonAuthoritativeInformation, + ///NoContent status code + NoContent, + ///ResetContent status code + ResetContent, + ///PartialContent status code + PartialContent, + ///MultipleChoices status code + MultipleChoices, + ///MovedPermanently status code + MovedPermanently, + ///Found status code + Found, + ///SeeOther status code + SeeOther, + ///NotModified status code + NotModified, + ///TemporaryRedirect status code + TemporaryRedirect, + ///PermanentRedirect status code + PermanentRedirect, + ///BadRequest status code + BadRequest, + ///Unauthorized status code + Unauthorized, + ///PaymentRequired status code + PaymentRequired, + ///Forbidden status code + Forbidden, + ///NotFound status code + NotFound, + ///MethodNotAllowed status code + MethodNotAllowed, + ///NotAcceptable status code + NotAcceptable, + ///ProxyAuthenticationRequired status code + ProxyAuthenticationRequired, + ///RequestTimeout status code + RequestTimeout, + ///Conflict status code + Conflict, + ///Gone status code + Gone, + ///LengthRequired status code + LengthRequired, + ///PreconditionFailed status code + PreconditionFailed, + ///PayloadTooLarge status code + PayloadTooLarge, + ///URITooLong status code + UriTooLong, + ///UnsupportedMediaType status code + UnsupportedMediaType, + ///RangeNotSatisfiable status code + RangeNotSatisfiable, + ///ExpectationFailed status code + ExpectationFailed, + ///ImATeapot status code + ImATeapot, + ///UnprocessableEntity status code + UnprocessableEntity, + ///Locked status code + Locked, + ///FailedDependency status code + FailedDependency, + ///TooManyRequests status code + TooManyRequests, + ///InternalServerError status code + InternalServerError, + ///NotImplemented status code + NotImplemented, + ///BadGateway status code + BadGateway, + ///ServiceUnavailable status code + ServiceUnavailable, + ///GatewayTimeout status code + GatewayTimeout, + ///HTTPVersionNotSupported status code + HttpVersionNotSupported, + ///Indicates an unknown status code + Unknown, + } + impl TryFrom for StatusCode { + type Error = String; + fn try_from(value: wick_component::serde_util::enum_repr::StringOrNum) -> std::result::Result { + use std::str::FromStr; + match value { + wick_component::serde_util::enum_repr::StringOrNum::String(v) => Self::from_str(&v), + wick_component::serde_util::enum_repr::StringOrNum::Int(v) => Self::from_str(&v.to_string()), + wick_component::serde_util::enum_repr::StringOrNum::Float(v) => Self::from_str(&v.to_string()), + } + } + } + impl From for String { + fn from(value: StatusCode) -> Self { + value.value().map_or_else(|| value.to_string(), |v| v.to_owned()) + } + } + impl StatusCode { + #[allow(unused)] + ///Returns the value of the enum variant as a string. + #[must_use] + pub fn value(&self) -> Option<&'static str> { + #[allow(clippy::match_single_binding)] + match self { + Self::Continue => Some("100"), + Self::SwitchingProtocols => Some("101"), + Self::Ok => Some("200"), + Self::Created => Some("201"), + Self::Accepted => Some("202"), + Self::NonAuthoritativeInformation => Some("203"), + Self::NoContent => Some("204"), + Self::ResetContent => Some("205"), + Self::PartialContent => Some("206"), + Self::MultipleChoices => Some("300"), + Self::MovedPermanently => Some("301"), + Self::Found => Some("302"), + Self::SeeOther => Some("303"), + Self::NotModified => Some("304"), + Self::TemporaryRedirect => Some("307"), + Self::PermanentRedirect => Some("308"), + Self::BadRequest => Some("400"), + Self::Unauthorized => Some("401"), + Self::PaymentRequired => Some("402"), + Self::Forbidden => Some("403"), + Self::NotFound => Some("404"), + Self::MethodNotAllowed => Some("405"), + Self::NotAcceptable => Some("406"), + Self::ProxyAuthenticationRequired => Some("407"), + Self::RequestTimeout => Some("408"), + Self::Conflict => Some("409"), + Self::Gone => Some("410"), + Self::LengthRequired => Some("411"), + Self::PreconditionFailed => Some("412"), + Self::PayloadTooLarge => Some("413"), + Self::UriTooLong => Some("414"), + Self::UnsupportedMediaType => Some("415"), + Self::RangeNotSatisfiable => Some("416"), + Self::ExpectationFailed => Some("417"), + Self::ImATeapot => Some("418"), + Self::UnprocessableEntity => Some("422"), + Self::Locked => Some("423"), + Self::FailedDependency => Some("424"), + Self::TooManyRequests => Some("429"), + Self::InternalServerError => Some("500"), + Self::NotImplemented => Some("501"), + Self::BadGateway => Some("502"), + Self::ServiceUnavailable => Some("503"), + Self::GatewayTimeout => Some("504"), + Self::HttpVersionNotSupported => Some("505"), + Self::Unknown => Some("-1"), + } + } + } + impl TryFrom for StatusCode { + type Error = u32; + fn try_from(i: u32) -> Result { + #[allow(clippy::match_single_binding)] + match i { + _ => Err(i), + } + } + } + impl std::str::FromStr for StatusCode { + type Err = String; + fn from_str(s: &str) -> Result { + #[allow(clippy::match_single_binding)] + match s { + "100" => Ok(Self::Continue), + "Continue" => Ok(Self::Continue), + "101" => Ok(Self::SwitchingProtocols), + "SwitchingProtocols" => Ok(Self::SwitchingProtocols), + "200" => Ok(Self::Ok), + "Ok" => Ok(Self::Ok), + "201" => Ok(Self::Created), + "Created" => Ok(Self::Created), + "202" => Ok(Self::Accepted), + "Accepted" => Ok(Self::Accepted), + "203" => Ok(Self::NonAuthoritativeInformation), + "NonAuthoritativeInformation" => Ok(Self::NonAuthoritativeInformation), + "204" => Ok(Self::NoContent), + "NoContent" => Ok(Self::NoContent), + "205" => Ok(Self::ResetContent), + "ResetContent" => Ok(Self::ResetContent), + "206" => Ok(Self::PartialContent), + "PartialContent" => Ok(Self::PartialContent), + "300" => Ok(Self::MultipleChoices), + "MultipleChoices" => Ok(Self::MultipleChoices), + "301" => Ok(Self::MovedPermanently), + "MovedPermanently" => Ok(Self::MovedPermanently), + "302" => Ok(Self::Found), + "Found" => Ok(Self::Found), + "303" => Ok(Self::SeeOther), + "SeeOther" => Ok(Self::SeeOther), + "304" => Ok(Self::NotModified), + "NotModified" => Ok(Self::NotModified), + "307" => Ok(Self::TemporaryRedirect), + "TemporaryRedirect" => Ok(Self::TemporaryRedirect), + "308" => Ok(Self::PermanentRedirect), + "PermanentRedirect" => Ok(Self::PermanentRedirect), + "400" => Ok(Self::BadRequest), + "BadRequest" => Ok(Self::BadRequest), + "401" => Ok(Self::Unauthorized), + "Unauthorized" => Ok(Self::Unauthorized), + "402" => Ok(Self::PaymentRequired), + "PaymentRequired" => Ok(Self::PaymentRequired), + "403" => Ok(Self::Forbidden), + "Forbidden" => Ok(Self::Forbidden), + "404" => Ok(Self::NotFound), + "NotFound" => Ok(Self::NotFound), + "405" => Ok(Self::MethodNotAllowed), + "MethodNotAllowed" => Ok(Self::MethodNotAllowed), + "406" => Ok(Self::NotAcceptable), + "NotAcceptable" => Ok(Self::NotAcceptable), + "407" => Ok(Self::ProxyAuthenticationRequired), + "ProxyAuthenticationRequired" => Ok(Self::ProxyAuthenticationRequired), + "408" => Ok(Self::RequestTimeout), + "RequestTimeout" => Ok(Self::RequestTimeout), + "409" => Ok(Self::Conflict), + "Conflict" => Ok(Self::Conflict), + "410" => Ok(Self::Gone), + "Gone" => Ok(Self::Gone), + "411" => Ok(Self::LengthRequired), + "LengthRequired" => Ok(Self::LengthRequired), + "412" => Ok(Self::PreconditionFailed), + "PreconditionFailed" => Ok(Self::PreconditionFailed), + "413" => Ok(Self::PayloadTooLarge), + "PayloadTooLarge" => Ok(Self::PayloadTooLarge), + "414" => Ok(Self::UriTooLong), + "UriTooLong" => Ok(Self::UriTooLong), + "415" => Ok(Self::UnsupportedMediaType), + "UnsupportedMediaType" => Ok(Self::UnsupportedMediaType), + "416" => Ok(Self::RangeNotSatisfiable), + "RangeNotSatisfiable" => Ok(Self::RangeNotSatisfiable), + "417" => Ok(Self::ExpectationFailed), + "ExpectationFailed" => Ok(Self::ExpectationFailed), + "418" => Ok(Self::ImATeapot), + "ImATeapot" => Ok(Self::ImATeapot), + "422" => Ok(Self::UnprocessableEntity), + "UnprocessableEntity" => Ok(Self::UnprocessableEntity), + "423" => Ok(Self::Locked), + "Locked" => Ok(Self::Locked), + "424" => Ok(Self::FailedDependency), + "FailedDependency" => Ok(Self::FailedDependency), + "429" => Ok(Self::TooManyRequests), + "TooManyRequests" => Ok(Self::TooManyRequests), + "500" => Ok(Self::InternalServerError), + "InternalServerError" => Ok(Self::InternalServerError), + "501" => Ok(Self::NotImplemented), + "NotImplemented" => Ok(Self::NotImplemented), + "502" => Ok(Self::BadGateway), + "BadGateway" => Ok(Self::BadGateway), + "503" => Ok(Self::ServiceUnavailable), + "ServiceUnavailable" => Ok(Self::ServiceUnavailable), + "504" => Ok(Self::GatewayTimeout), + "GatewayTimeout" => Ok(Self::GatewayTimeout), + "505" => Ok(Self::HttpVersionNotSupported), + "HttpVersionNotSupported" => Ok(Self::HttpVersionNotSupported), + "-1" => Ok(Self::Unknown), + "Unknown" => Ok(Self::Unknown), + _ => Err(s.to_owned()), + } + } + } + impl std::fmt::Display for StatusCode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + #[allow(clippy::match_single_binding)] + match self { + Self::Continue => f.write_str("100"), + Self::SwitchingProtocols => f.write_str("101"), + Self::Ok => f.write_str("200"), + Self::Created => f.write_str("201"), + Self::Accepted => f.write_str("202"), + Self::NonAuthoritativeInformation => f.write_str("203"), + Self::NoContent => f.write_str("204"), + Self::ResetContent => f.write_str("205"), + Self::PartialContent => f.write_str("206"), + Self::MultipleChoices => f.write_str("300"), + Self::MovedPermanently => f.write_str("301"), + Self::Found => f.write_str("302"), + Self::SeeOther => f.write_str("303"), + Self::NotModified => f.write_str("304"), + Self::TemporaryRedirect => f.write_str("307"), + Self::PermanentRedirect => f.write_str("308"), + Self::BadRequest => f.write_str("400"), + Self::Unauthorized => f.write_str("401"), + Self::PaymentRequired => f.write_str("402"), + Self::Forbidden => f.write_str("403"), + Self::NotFound => f.write_str("404"), + Self::MethodNotAllowed => f.write_str("405"), + Self::NotAcceptable => f.write_str("406"), + Self::ProxyAuthenticationRequired => f.write_str("407"), + Self::RequestTimeout => f.write_str("408"), + Self::Conflict => f.write_str("409"), + Self::Gone => f.write_str("410"), + Self::LengthRequired => f.write_str("411"), + Self::PreconditionFailed => f.write_str("412"), + Self::PayloadTooLarge => f.write_str("413"), + Self::UriTooLong => f.write_str("414"), + Self::UnsupportedMediaType => f.write_str("415"), + Self::RangeNotSatisfiable => f.write_str("416"), + Self::ExpectationFailed => f.write_str("417"), + Self::ImATeapot => f.write_str("418"), + Self::UnprocessableEntity => f.write_str("422"), + Self::Locked => f.write_str("423"), + Self::FailedDependency => f.write_str("424"), + Self::TooManyRequests => f.write_str("429"), + Self::InternalServerError => f.write_str("500"), + Self::NotImplemented => f.write_str("501"), + Self::BadGateway => f.write_str("502"), + Self::ServiceUnavailable => f.write_str("503"), + Self::GatewayTimeout => f.write_str("504"), + Self::HttpVersionNotSupported => f.write_str("505"), + Self::Unknown => f.write_str("-1"), + } + } + } + } +} +///Types associated with the `request` operation +pub mod request { + #[allow(unused)] + use super::*; + #[derive(Debug, Clone, Default, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + #[allow(clippy::exhaustive_structs)] + pub struct Config {} + pub struct Outputs { + #[allow(unused)] + pub(crate) output: wick_packet::OutgoingPort, + } + impl wick_component::Broadcast for Outputs { + fn outputs_mut(&mut self) -> wick_packet::OutputIterator<'_> { + wick_packet::OutputIterator::new(vec![&mut self.output]) + } + } + impl wick_component::SingleOutput for Outputs { + fn single_output(&mut self) -> &mut dyn wick_packet::Port { + &mut self.output + } + } + impl Outputs { + pub fn new(channel: wasmrs_rx::FluxChannel) -> Self { + Self { + output: wick_packet::OutgoingPort::new("output", channel), + } + } + } + #[async_trait::async_trait(?Send)] + #[cfg(target_family = "wasm")] + pub trait Operation { + type Error; + type Outputs; + type Config: std::fmt::Debug; + #[allow(unused)] + async fn request( + id: WickStream, + name: WickStream, + outputs: Self::Outputs, + ctx: wick_component::flow_component::Context, + ) -> std::result::Result<(), Self::Error>; + } + #[async_trait::async_trait] + #[cfg(not(target_family = "wasm"))] + pub trait Operation { + type Error: Send; + type Outputs: Send; + type Config: std::fmt::Debug + Send; + #[allow(unused)] + async fn request( + id: WickStream, + name: WickStream, + outputs: Self::Outputs, + ctx: wick_component::flow_component::Context, + ) -> std::result::Result<(), Self::Error>; + } +} +#[derive(Default, Clone)] +///The struct that the component implementation hinges around +pub struct Component; +impl Component { + fn request_wrapper( + mut input: wasmrs_rx::BoxFlux, + ) -> std::result::Result, wick_component::BoxError> { + let (channel, rx) = wasmrs_rx::FluxChannel::::new_parts(); + let outputs = request::Outputs::new(channel.clone()); + runtime::spawn("request_wrapper", async move { + #[allow(unused_parens)] + let (config, (id, name)) = wick_component::payload_fan_out!( + input, raw : false, wick_component::AnyError, request::Config, + [("id", String), ("name", String)] + ); + let config = match config.await { + Ok(Ok(config)) => config, + Err(e) => { + let _ = channel + .send_result(wick_packet::Packet::component_error(format!("Component sent invalid context: {}", e)).into()); + return; + } + Ok(Err(e)) => { + let _ = channel + .send_result(wick_packet::Packet::component_error(format!("Component sent invalid context: {}", e)).into()); + return; + } + }; + use request::Operation; + if let Err(e) = Component::request(Box::pin(id), Box::pin(name), outputs, config).await { + let _ = channel.send_result(wick_packet::Packet::component_error(e.to_string()).into()); + } + }); + Ok(Box::pin(rx)) + } +} diff --git a/examples/http/wasm-http-call/wasm-component/src/lib.rs b/examples/http/wasm-http-call/wasm-component/src/lib.rs index ae78122ff..0e079e719 100644 --- a/examples/http/wasm-http-call/wasm-component/src/lib.rs +++ b/examples/http/wasm-http-call/wasm-component/src/lib.rs @@ -1,6 +1,13 @@ +#[cfg(feature = "localgen")] +mod generated; +#[cfg(feature = "localgen")] +use generated as wick; +#[cfg(not(feature = "localgen"))] mod wick { + #![allow(unused_imports, missing_debug_implementations, clippy::needless_pass_by_value)] wick_component::wick_import!(); } + use wick::*; #[async_trait::async_trait(?Send)] diff --git a/integration-tests/codegen-tests/src/import_types/mod.rs b/integration-tests/codegen-tests/src/import_types/mod.rs index c7a259e5a..6f21b3c3d 100644 --- a/integration-tests/codegen-tests/src/import_types/mod.rs +++ b/integration-tests/codegen-tests/src/import_types/mod.rs @@ -32,8 +32,8 @@ mod provided { #[allow(unused)] pub fn echo( &self, - input: impl wick_component::Stream> + 'static, - ) -> std::result::Result, wick_packet::Error> { + input: impl wick_component::Stream> + 'static, + ) -> std::result::Result<(WickStream), wick_packet::Error> { let input = input.map(wick_component::wick_packet::into_packet("input")); let stream = wick_component::empty(); let stream = stream @@ -42,7 +42,7 @@ mod provided { let stream = wick_packet::PacketStream::new(Box::pin(stream)); let mut stream = self.echo_raw(stream)?; Ok(wick_component::payload_fan_out!( - stream, raw : false, wick_component::BoxError, [("output", + stream, raw : false, wick_component::AnyError, [("output", types::http::HttpRequest)] )) } @@ -81,9 +81,9 @@ mod imported { #[allow(unused)] pub fn add( &self, - left: impl wick_component::Stream> + 'static, - right: impl wick_component::Stream> + 'static, - ) -> std::result::Result, wick_packet::Error> { + left: impl wick_component::Stream> + 'static, + right: impl wick_component::Stream> + 'static, + ) -> std::result::Result<(WickStream), wick_packet::Error> { let left = left.map(wick_component::wick_packet::into_packet("left")); let right = right.map(wick_component::wick_packet::into_packet("right")); let stream = wick_component::empty(); @@ -94,7 +94,7 @@ mod imported { let stream = wick_packet::PacketStream::new(Box::pin(stream)); let mut stream = self.add_raw(stream)?; Ok(wick_component::payload_fan_out!( - stream, raw : false, wick_component::BoxError, [("output", u64)] + stream, raw : false, wick_component::AnyError, [("output", u64)] )) } #[allow(unused)] @@ -111,8 +111,8 @@ mod imported { #[allow(unused)] pub fn error( &self, - input: impl wick_component::Stream> + 'static, - ) -> std::result::Result, wick_packet::Error> { + input: impl wick_component::Stream> + 'static, + ) -> std::result::Result<(WickStream), wick_packet::Error> { let input = input.map(wick_component::wick_packet::into_packet("input")); let stream = wick_component::empty(); let stream = stream @@ -121,7 +121,7 @@ mod imported { let stream = wick_packet::PacketStream::new(Box::pin(stream)); let mut stream = self.error_raw(stream)?; Ok(wick_component::payload_fan_out!( - stream, raw : false, wick_component::BoxError, [("output", String)] + stream, raw : false, wick_component::AnyError, [("output", String)] )) } #[allow(unused)] @@ -138,8 +138,8 @@ mod imported { #[allow(unused)] pub fn validate( &self, - input: impl wick_component::Stream> + 'static, - ) -> std::result::Result, wick_packet::Error> { + input: impl wick_component::Stream> + 'static, + ) -> std::result::Result<(WickStream), wick_packet::Error> { let input = input.map(wick_component::wick_packet::into_packet("input")); let stream = wick_component::empty(); let stream = stream @@ -148,7 +148,7 @@ mod imported { let stream = wick_packet::PacketStream::new(Box::pin(stream)); let mut stream = self.validate_raw(stream)?; Ok(wick_component::payload_fan_out!( - stream, raw : false, wick_component::BoxError, [("output", String)] + stream, raw : false, wick_component::AnyError, [("output", String)] )) } #[allow(unused)] @@ -2253,8 +2253,8 @@ pub mod echo { pub struct Config {} pub struct Outputs { #[allow(unused)] - pub(crate) output: wick_packet::Output, - pub(crate) time: wick_packet::Output, + pub(crate) output: wick_packet::OutgoingPort, + pub(crate) time: wick_packet::OutgoingPort, } impl wick_component::Broadcast for Outputs { fn outputs_mut(&mut self) -> wick_packet::OutputIterator<'_> { @@ -2264,15 +2264,15 @@ pub mod echo { impl Outputs { pub fn new(channel: wasmrs_rx::FluxChannel) -> Self { Self { - output: wick_packet::Output::new("output", channel.clone()), - time: wick_packet::Output::new("time", channel), + output: wick_packet::OutgoingPort::new("output", channel.clone()), + time: wick_packet::OutgoingPort::new("time", channel), } } } #[async_trait::async_trait(?Send)] #[cfg(target_family = "wasm")] pub trait Operation { - type Error: std::fmt::Display; + type Error; type Outputs; type Config: std::fmt::Debug; #[allow(unused)] @@ -2286,7 +2286,7 @@ pub mod echo { #[async_trait::async_trait] #[cfg(not(target_family = "wasm"))] pub trait Operation { - type Error: std::fmt::Display + Send; + type Error: Send; type Outputs: Send; type Config: std::fmt::Debug + Send; #[allow(unused)] @@ -2320,7 +2320,7 @@ pub mod testop { } pub struct Outputs { #[allow(unused)] - pub(crate) output: wick_packet::Output, + pub(crate) output: wick_packet::OutgoingPort, } impl wick_component::Broadcast for Outputs { fn outputs_mut(&mut self) -> wick_packet::OutputIterator<'_> { @@ -2335,14 +2335,14 @@ pub mod testop { impl Outputs { pub fn new(channel: wasmrs_rx::FluxChannel) -> Self { Self { - output: wick_packet::Output::new("output", channel), + output: wick_packet::OutgoingPort::new("output", channel), } } } #[async_trait::async_trait(?Send)] #[cfg(target_family = "wasm")] pub trait Operation { - type Error: std::fmt::Display; + type Error; type Outputs; type Config: std::fmt::Debug; #[allow(unused)] @@ -2355,7 +2355,7 @@ pub mod testop { #[async_trait::async_trait] #[cfg(not(target_family = "wasm"))] pub trait Operation { - type Error: std::fmt::Display + Send; + type Error: Send; type Outputs: Send; type Config: std::fmt::Debug + Send; #[allow(unused)] @@ -2372,17 +2372,15 @@ pub struct Component; impl Component { fn echo_wrapper( mut input: wasmrs_rx::BoxFlux, - ) -> std::result::Result< - wasmrs_rx::BoxFlux, - Box, - > { + ) -> std::result::Result, wick_component::BoxError> { let (channel, rx) = wasmrs_rx::FluxChannel::::new_parts(); let outputs = echo::Outputs::new(channel.clone()); runtime::spawn("echo_wrapper", async move { - let (config, input, time) = wick_component::payload_fan_out!( - input, raw : false, Box < dyn std::error::Error + Send + Sync >, - echo::Config, [("input", types::http::HttpRequest), ("time", - wick_component::datetime::DateTime),] + #[allow(unused_parens)] + let (config, (input, time)) = wick_component::payload_fan_out!( + input, raw : false, wick_component::AnyError, echo::Config, + [("input", types::http::HttpRequest), ("time", + wick_component::datetime::DateTime)] ); let config = match config.await { Ok(Ok(config)) => config, @@ -2406,16 +2404,14 @@ impl Component { } fn testop_wrapper( mut input: wasmrs_rx::BoxFlux, - ) -> std::result::Result< - wasmrs_rx::BoxFlux, - Box, - > { + ) -> std::result::Result, wick_component::BoxError> { let (channel, rx) = wasmrs_rx::FluxChannel::::new_parts(); let outputs = testop::Outputs::new(channel.clone()); runtime::spawn("testop_wrapper", async move { - let (config, message) = wick_component::payload_fan_out!( - input, raw : false, Box < dyn std::error::Error + Send + Sync >, - testop::Config, [("message", types::http::HttpResponse),] + #[allow(unused_parens)] + let (config, (message)) = wick_component::payload_fan_out!( + input, raw : false, wick_component::AnyError, testop::Config, + [("message", types::http::HttpResponse)] ); let config = match config.await { Ok(Ok(config)) => config,