diff --git a/Cargo.lock b/Cargo.lock index 6ad025fe..368a24d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1419,6 +1419,17 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -4963,6 +4974,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" +[[package]] +name = "uhttp_sse" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6ff93345ba2206230b1bb1aa3ece1a63dd9443b7531024575d16a0680a59444" + [[package]] name = "unicase" version = "2.7.0" @@ -6131,6 +6148,7 @@ name = "wick-http-client" version = "0.2.0" dependencies = [ "anyhow", + "eventsource-stream", "flow-component", "futures 0.3.28", "liquid-json", @@ -6141,6 +6159,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "uhttp_sse", "url", "wick-config", "wick-interface-http", @@ -6525,6 +6544,7 @@ dependencies = [ "async-trait", "bytes 1.4.0", "chrono", + "eventsource-stream", "futures 0.3.28", "hyper", "hyper-reverse-proxy", @@ -6541,6 +6561,7 @@ dependencies = [ "structured-output", "thiserror", "tokio", + "tokio-stream", "tracing", "url", "uuid", diff --git a/Cargo.toml b/Cargo.toml index afa8f348..72a97460 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -196,6 +196,9 @@ dyn-clone = { version = "1.0", default-features = false } either = { version = "1.9.0", default-features = false } env_logger = { version = "0.10", default-features = false } escargot = { version = "0.5", default-features = false } +eventsource-stream = { version = "0.2", default-features = false, features = [ + "std", +] } futures = { version = "0.3", default-features = false, features = ["std"] } getrandom = { version = "0.2", default-features = false } getset = { version = "0.1", default-features = false } @@ -287,6 +290,7 @@ tracing-log = { version = "0.1", default-features = false } tracing-opentelemetry = { version = "0.19.0", default-features = false } tracing-subscriber = { version = "0.3", default-features = false } trycmd = { version = "0.14", default-features = false } +uhttp_sse = { version = "0.5.1" } url = { version = "2.3", default-features = false } uuid = { version = "1.1", default-features = false } wasm-encoder = { version = "0.31", default-features = false } diff --git a/crates/components/wick-http-client/Cargo.toml b/crates/components/wick-http-client/Cargo.toml index 1808e02d..a108d4ce 100644 --- a/crates/components/wick-http-client/Cargo.toml +++ b/crates/components/wick-http-client/Cargo.toml @@ -22,6 +22,8 @@ tokio = { workspace = true } tracing = { workspace = true } liquid-json = { workspace = true, features = ["serde"] } anyhow = { workspace = true } +eventsource-stream = { workspace = true } +uhttp_sse = { workspace = true } # serde = { workspace = true, features = ["derive"] } # diff --git a/crates/components/wick-http-client/src/component.rs b/crates/components/wick-http-client/src/component.rs index 0460353d..5cdf4ebf 100644 --- a/crates/components/wick-http-client/src/component.rs +++ b/crates/components/wick-http-client/src/component.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::anyhow; +use eventsource_stream::Eventsource; use flow_component::{BoxFuture, Component, ComponentError, LocalScope}; use futures::{Stream, StreamExt, TryStreamExt}; use reqwest::header::CONTENT_TYPE; @@ -17,6 +18,7 @@ use wick_config::config::components::{ }; use wick_config::config::{Codec, HttpMethod, LiquidJsonConfig, Metadata, UrlResource}; use wick_config::{ConfigValidation, Resolver}; +use wick_interface_http::types::HttpEvent; use wick_interface_types::{ComponentSignature, OperationSignatures}; use wick_packet::{ Base64Bytes, @@ -326,15 +328,28 @@ async fn handle( invocation.trace(|| debug!(status=%response.status(), "http:client:response_status")); - let codec = response.headers().get(CONTENT_TYPE).map_or(Codec::Raw, |value| { + let content_type = response.headers().get(CONTENT_TYPE); + let event_stream = content_type.map_or(false, |t| t == "text/event-stream"); + + let codec = content_type.map_or(Codec::Raw, |value| { let value = value.to_str().unwrap(); let (value, _other) = value.split_once(';').unwrap_or((value, "")); - match value { - "application/json" => Codec::Json, - "application/x-www-form-urlencoded" => Codec::FormData, - _ => Codec::Raw, + if value.starts_with("text/") { + if event_stream { + Codec::Json + } else { + Codec::Text + } + } else { + match value { + "application/json" => Codec::Json, + "application/x-www-form-urlencoded" => Codec::FormData, + "application/xhtml+xml" => Codec::Text, + _ => Codec::Raw, + } } }); + let (our_response, body_stream) = match crate::conversions::to_wick_response(response) { Ok(r) => r, Err(e) => { @@ -348,7 +363,8 @@ async fn handle( handles.push(tokio::spawn(output_task( invocation.span.clone(), codec, - Box::pin(body_stream), + body_stream, + event_stream, tx.clone(), ))); } @@ -359,67 +375,81 @@ async fn handle( Ok(()) } -fn output_task( +fn output_task> + Send + Unpin + 'static>( span: Span, codec: Codec, - mut body_stream: std::pin::Pin> + Send + 'static>>, + body_stream: T, + event_stream: bool, tx: PacketSender, ) -> BoxFuture<'static, ()> { let task = async move { - match codec { - Codec::Json => { - let bytes: Vec = match body_stream.try_collect().await { - Ok(r) => r, - Err(e) => { - let _ = tx.error(wick_packet::Error::component_error(e.to_string())); - return; + if let Err(e) = output_task_inner(span, codec, body_stream, event_stream, tx.clone()).await { + let _ = tx.send(Packet::err("body", e.to_string())); + } + }; + Box::pin(task) +} + +async fn output_task_inner> + Send + Unpin + 'static>( + span: Span, + codec: Codec, + mut body_stream: T, + event_stream: bool, + tx: PacketSender, +) -> Result<(), anyhow::Error> { + match codec { + Codec::Json => { + if event_stream { + let mut stream = body_stream.map(Into::into).eventsource(); + while let Some(event) = stream.next().await { + if let Err(e) = event { + tx.send(Packet::err("body", e.to_string()))?; + break; } - }; + let event = event.unwrap(); + let wick_event = HttpEvent { + event: event.event, + data: event.data, + id: event.id, + retry: event.retry.map(|d| d.as_millis() as _), + }; + + span.in_scope(|| debug!("{} {}", format!("{:?}", wick_event), "http:client:response_body:event")); + tx.send(Packet::encode("body", wick_event))?; + } + } else { + let bytes: Vec = body_stream.try_collect().await?; let bytes = bytes.concat(); - let json: Value = match serde_json::from_slice(&bytes) { - Ok(r) => r, - Err(e) => { - let _ = tx.error(wick_packet::Error::component_error(e.to_string())); - return; - } - }; + let json: Value = serde_json::from_slice(&bytes)?; span.in_scope(|| trace!(%json, "http:client:response_body")); - let _ = tx.send(Packet::encode("body", json)); + tx.send(Packet::encode("body", json))?; } - Codec::Raw => { - let _ = tx.send(Packet::open_bracket("body")); - while let Some(Ok(bytes)) = body_stream.next().await { - span.in_scope(|| debug!(len = bytes.len(), "http:client:response_body")); + } + Codec::Raw => { + tx.send(Packet::open_bracket("body"))?; + while let Some(bytes) = body_stream.next().await { + let bytes = bytes?; - let _ = tx.send(Packet::encode("body", bytes)); - } - let _ = tx.send(Packet::close_bracket("body")); - } - Codec::FormData => unreachable!("Form data on the response is not supported."), - Codec::Text => { - let bytes: Vec = match body_stream.try_collect().await { - Ok(r) => r, - Err(e) => { - let _ = tx.error(wick_packet::Error::component_error(e.to_string())); - return; - } - }; - let bytes = bytes.concat(); + span.in_scope(|| debug!(len = bytes.len(), "http:client:response_body")); - let text = match String::from_utf8(bytes) { - Ok(r) => r, - Err(e) => { - let _ = tx.error(wick_packet::Error::component_error(e.to_string())); - return; - } - }; - span.in_scope(|| trace!(%text, "response body")); - let _ = tx.send(Packet::encode("body", text)); + tx.send(Packet::encode("body", bytes))?; } + tx.send(Packet::close_bracket("body"))?; } - }; - Box::pin(task) + Codec::FormData => unreachable!("Form data on the response is not supported."), + Codec::Text => { + tx.send(Packet::open_bracket("body"))?; + while let Some(bytes) = body_stream.next().await { + let bytes = bytes?; + let text = String::from_utf8(bytes.into())?; + span.in_scope(|| debug!(%text, "http:client:response_body")); + tx.send(Packet::encode("body", text))?; + } + tx.send(Packet::close_bracket("body"))?; + } + } + Ok(()) } impl ConfigValidation for HttpClientComponent { diff --git a/crates/interfaces/wick-interface-http/component.yaml b/crates/interfaces/wick-interface-http/component.yaml index 5499b122..705b5e36 100644 --- a/crates/interfaces/wick-interface-http/component.yaml +++ b/crates/interfaces/wick-interface-http/component.yaml @@ -3,7 +3,7 @@ name: http kind: wick/types@v1 metadata: - version: 0.4.0 + version: 0.5.0 package: registry: host: registry.candle.dev @@ -242,3 +242,19 @@ types: types: - HttpRequest - HttpResponse + - name: HttpEvent + kind: wick/type/struct@v1 + description: HTTP server side event + fields: + - name: event + type: string + description: The event name if given + - name: data + type: string + description: The event data + - name: id + type: string + description: The event id if given + - name: retry + type: u64? + description: Retry duration if given diff --git a/crates/wick/wick-trigger-http/Cargo.toml b/crates/wick/wick-trigger-http/Cargo.toml index a7d59774..fc7e59d5 100644 --- a/crates/wick/wick-trigger-http/Cargo.toml +++ b/crates/wick/wick-trigger-http/Cargo.toml @@ -44,7 +44,8 @@ bytes = { workspace = true } openapiv3 = { workspace = true } percent-encoding = { workspace = true } liquid = { workspace = true } - +eventsource-stream = { workspace = true } +tokio-stream = { workspace = true } [dev-dependencies] diff --git a/crates/wick/wick-trigger-http/src/http/component_utils.rs b/crates/wick/wick-trigger-http/src/http/component_utils.rs index aa900762..5724df6b 100644 --- a/crates/wick/wick-trigger-http/src/http/component_utils.rs +++ b/crates/wick/wick-trigger-http/src/http/component_utils.rs @@ -1,15 +1,17 @@ use std::collections::HashMap; use futures::stream::StreamExt; -use hyper::header::CONTENT_LENGTH; +use hyper::header::{CONTENT_LENGTH, CONTENT_TYPE}; use hyper::http::response::Builder; use hyper::http::{HeaderName, HeaderValue}; use hyper::{Body, Response, StatusCode}; use serde_json::{Map, Value}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use tokio::sync::oneshot; use tracing::Span; use uuid::Uuid; use wick_config::config::Codec; -use wick_interface_http::types as wick_http; +use wick_interface_http::types::{self as wick_http}; use wick_packet::{ packets, Base64Bytes, @@ -153,46 +155,167 @@ pub(super) async fn respond( .unwrap(), ); } - let mut stream = stream.unwrap(); - let mut builder = Response::builder(); - let mut body = bytes::BytesMut::new(); - while let Some(packet) = stream.next().await { - match packet { - Ok(p) => { - if p.port() == "response" { - if let PacketPayload::Err(e) = p.payload() { - return Err(HttpError::OutputStream(p.port().to_owned(), e.msg().to_owned())); - } - if p.is_done() { - continue; - } - let response: wick_interface_http::types::HttpResponse = p - .decode() - .map_err(|e| HttpError::Deserialize("response".to_owned(), e.to_string()))?; - builder = convert_response(builder, response)?; - } else if p.port() == "body" { - if let PacketPayload::Err(e) = p.payload() { - return Err(HttpError::OutputStream(p.port().to_owned(), e.msg().to_owned())); - } - if !p.has_data() { - continue; + let stream = stream.unwrap(); + let builder = Response::builder(); + + let (handle, response, mut body_stream) = split_stream(stream); + + let response = match response.await { + Ok(response) => response?, + Err(e) => { + handle.abort(); + return Ok( + Builder::new() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(e.to_string())) + .unwrap(), + ); + } + }; + + let mut builder = convert_response(builder, response)?; + let event_stream = builder + .headers_ref() + .and_then(|h| h.get(CONTENT_TYPE)) + .map_or(false, |v| v == "text/event-stream"); + + let res = if event_stream { + let body_stream = + tokio_stream::wrappers::UnboundedReceiverStream::new(body_stream).filter_map(move |p| async move { + if !p.has_data() { + return None; + } + Some(match codec { + Codec::Json => p + .decode::() + .map_err(|e| HttpError::Bytes(e.to_string())) + .map(|v| to_sse_string_bytes(&v)), + Codec::Raw => p + .decode::() + .map_err(|e| HttpError::Bytes(e.to_string())) + .map(Into::into), + Codec::Text => p + .decode::() + .map_err(|e| HttpError::Utf8Text(e.to_string())) + .map(Into::into), + Codec::FormData => unreachable!("FormData is not supported as a decoder for HTTP responses"), + }) + }); + let body = Body::wrap_stream(body_stream); + builder.body(body).unwrap() + } else { + let mut body = bytes::BytesMut::new(); + while let Some(p) = body_stream.recv().await { + if let PacketPayload::Err(e) = p.payload() { + return Err(HttpError::OutputStream(p.port().to_owned(), e.msg().to_owned())); + } + if !p.has_data() { + continue; + } + if codec == Codec::Json { + let response: Value = p.decode().map_err(|e| HttpError::Codec(codec, e.to_string()))?; + let as_str = response.to_string(); + let bytes = as_str.as_bytes(); + body.extend_from_slice(bytes); + } else { + let response: Base64Bytes = p.decode().map_err(|e| HttpError::Bytes(e.to_string()))?; + body.extend_from_slice(&response); + } + } + builder = reset_header(builder, CONTENT_LENGTH, body.len()); + builder.body(body.freeze().into()).unwrap() + }; + + Ok(res) +} + +fn split_stream( + mut stream: PacketStream, +) -> ( + tokio::task::JoinHandle<()>, + oneshot::Receiver>, + UnboundedReceiver, +) { + let (body_tx, body_rx) = unbounded_channel(); + let (res_tx, res_rx) = oneshot::channel(); + let mut res_tx = Some(res_tx); + + let handle = tokio::spawn(async move { + while let Some(packet) = stream.next().await { + match packet { + Ok(p) => { + if p.port() == "response" { + if p.is_done() { + continue; + } + let Some(sender) = res_tx.take() else { + // we only respect the first packet to the response port. + continue; + }; + if let PacketPayload::Err(e) = p.payload() { + let _ = sender.send(Err(HttpError::OutputStream(p.port().to_owned(), e.msg().to_owned()))); + break; + } + let response: Result = p + .decode() + .map_err(|e| HttpError::Deserialize("response".to_owned(), e.to_string())); + let _ = sender.send(response); + } else if p.port() == "body" { + let _ = body_tx.send(p); + } else if let PacketPayload::Err(e) = p.payload { + if let Some(sender) = res_tx.take() { + let _ = sender.send(Err(HttpError::OperationError(e.to_string()))); + } + warn!(?e, "http:stream:error"); + break; } - if codec == Codec::Json { - let response: Value = p.decode().map_err(|e| HttpError::Codec(codec, e.to_string()))?; - let as_str = response.to_string(); - let bytes = as_str.as_bytes(); - body.extend_from_slice(bytes); - } else { - let response: Base64Bytes = p.decode().map_err(|e| HttpError::Bytes(e.to_string()))?; - body.extend_from_slice(&response); + } + Err(e) => { + if let Some(sender) = res_tx.take() { + let _ = sender.send(Err(HttpError::OperationError(e.to_string()))); } + warn!(?e, "http:stream:error"); + break; } } - Err(e) => return Err(HttpError::OperationError(e.to_string())), } + }); + + (handle, res_rx, body_rx) +} + +fn to_sse_string_bytes(event: &wick_http::HttpEvent) -> Vec { + let mut sse_string = String::new(); + + if !event.event.is_empty() { + sse_string.push_str("event: "); + sse_string.push_str(&event.event); + sse_string.push('\n'); + } + + // Splitting data by newline to ensure each line is prefixed with "data: " + for line in event.data.split('\n') { + sse_string.push_str("data: "); + sse_string.push_str(line); + sse_string.push('\n'); + } + + if !event.id.is_empty() { + sse_string.push_str("id: "); + sse_string.push_str(&event.id); + sse_string.push('\n'); + } + + if let Some(ref retry) = event.retry { + sse_string.push_str("retry: "); + sse_string.push_str(&retry.to_string()); + sse_string.push('\n'); } - builder = reset_header(builder, CONTENT_LENGTH, body.len()); - Ok(builder.body(body.freeze().into()).unwrap()) + + // Adding the required empty line to separate events + sse_string.push('\n'); + + sse_string.into() } fn reset_header(mut builder: Builder, header: HeaderName, value: impl Into) -> Builder { diff --git a/crates/wick/wick-trigger-http/src/http/error.rs b/crates/wick/wick-trigger-http/src/http/error.rs index 012f3e9f..5d25dcdd 100644 --- a/crates/wick/wick-trigger-http/src/http/error.rs +++ b/crates/wick/wick-trigger-http/src/http/error.rs @@ -34,9 +34,15 @@ pub enum HttpError { #[error("Could not serialize output into '{0}' codec: {1}")] Codec(Codec, String), - #[error("Could not read output as base64 bytes: {0}")] + #[error("Could not decode stream item as base64 bytes: {0}")] Bytes(String), + #[error("Could not decode stream item as a utf-8 string: {0}")] + Utf8Text(String), + + #[error("Could not decode stream item as HttpEvent: {0}")] + HttpEvent(String), + #[error("Invalid header name: {0}")] InvalidHeaderName(String), diff --git a/crates/wick/wick-trigger-http/src/http/routers/raw.rs b/crates/wick/wick-trigger-http/src/http/routers/raw.rs index 427db816..89b62613 100644 --- a/crates/wick/wick-trigger-http/src/http/routers/raw.rs +++ b/crates/wick/wick-trigger-http/src/http/routers/raw.rs @@ -1,9 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; -use std::task::{Context, Poll}; use futures::{StreamExt, TryStreamExt}; -use hyper::service::Service; use hyper::{Body, Request, Response}; use serde_json::Value; use tracing::{Instrument, Span}; @@ -152,20 +150,6 @@ impl RawHandler { } } -impl Service> for RawHandler { - type Response = Response; - type Error = HttpError; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, request: Request) -> Self::Future { - Box::pin(self.clone().serve(request)) - } -} - pub(crate) fn register_raw_router(index: usize, router_config: &RawRouterConfig) -> Result { trace!(index, "registering raw router"); let middleware = resolve_middleware_components(router_config)?; diff --git a/integration-tests/codegen-tests/src/import_types/mod.rs b/integration-tests/codegen-tests/src/import_types/mod.rs index b12f7b10..b8d6284e 100644 --- a/integration-tests/codegen-tests/src/import_types/mod.rs +++ b/integration-tests/codegen-tests/src/import_types/mod.rs @@ -16,7 +16,7 @@ extern "C" fn __wasmrs_init(guest_buffer_size: u32, host_buffer_size: u32, max_h pub(crate) mod provided { #[allow(unused)] use super::*; - pub(crate) mod dep1_component { + pub(crate) mod dep1 { use super::*; pub(crate) mod echo { use super::*; @@ -50,6 +50,17 @@ pub(crate) mod provided { } } #[allow(unused)] + pub fn new_parts() -> (Self, wasmrs_rx::FluxReceiver) { + let (channel, rx) = wasmrs_rx::FluxChannel::new_parts(); + ( + Self { + input: wick_packet::OutgoingPort::new("input", channel.clone()), + channel, + }, + rx, + ) + } + #[allow(unused)] pub fn with_channel(channel: wasmrs_rx::FluxChannel) -> Self { Self { input: wick_packet::OutgoingPort::new("input", channel.clone()), @@ -122,17 +133,17 @@ pub(crate) mod provided { #[allow(unused)] pub fn echo( &self, - op_config: dep1_component::echo::Config, - mut inputs: impl Into, - ) -> std::result::Result { + op_config: dep1::echo::Config, + mut inputs: impl Into, + ) -> std::result::Result { let mut stream = self.echo_raw(op_config, inputs.into().channel.take_rx().unwrap().boxed())?; - let (_, outputs) = dep1_component::echo::process_incoming(stream); + let (_, outputs) = dep1::echo::process_incoming(stream); Ok(outputs) } #[allow(unused)] pub fn echo_packets( &self, - op_config: dep1_component::echo::Config, + op_config: dep1::echo::Config, stream: wick_packet::PacketStream, ) -> std::result::Result { Ok(wick_packet::from_wasmrs( @@ -142,7 +153,7 @@ pub(crate) mod provided { #[allow(unused)] pub fn echo_raw( &self, - op_config: dep1_component::echo::Config, + op_config: dep1::echo::Config, stream: wick_component::wasmrs_rx::BoxFlux, ) -> std::result::Result< wick_component::wasmrs_rx::BoxFlux, @@ -162,7 +173,7 @@ pub use provided::*; pub(crate) mod imported { #[allow(unused)] use super::*; - pub(crate) mod imported_component_component { + pub(crate) mod imported_component { use super::*; pub(crate) mod add { use super::*; @@ -193,6 +204,18 @@ pub(crate) mod imported { } } #[allow(unused)] + pub fn new_parts() -> (Self, wasmrs_rx::FluxReceiver) { + let (channel, rx) = wasmrs_rx::FluxChannel::new_parts(); + ( + Self { + left: wick_packet::OutgoingPort::new("left", channel.clone()), + right: wick_packet::OutgoingPort::new("right", channel.clone()), + channel, + }, + rx, + ) + } + #[allow(unused)] pub fn with_channel(channel: wasmrs_rx::FluxChannel) -> Self { Self { left: wick_packet::OutgoingPort::new("left", channel.clone()), @@ -279,6 +302,17 @@ pub(crate) mod imported { } } #[allow(unused)] + pub fn new_parts() -> (Self, wasmrs_rx::FluxReceiver) { + let (channel, rx) = wasmrs_rx::FluxChannel::new_parts(); + ( + Self { + input: wick_packet::OutgoingPort::new("input", channel.clone()), + channel, + }, + rx, + ) + } + #[allow(unused)] pub fn with_channel(channel: wasmrs_rx::FluxChannel) -> Self { Self { input: wick_packet::OutgoingPort::new("input", channel.clone()), @@ -362,6 +396,17 @@ pub(crate) mod imported { } } #[allow(unused)] + pub fn new_parts() -> (Self, wasmrs_rx::FluxReceiver) { + let (channel, rx) = wasmrs_rx::FluxChannel::new_parts(); + ( + Self { + input: wick_packet::OutgoingPort::new("input", channel.clone()), + channel, + }, + rx, + ) + } + #[allow(unused)] pub fn with_channel(channel: wasmrs_rx::FluxChannel) -> Self { Self { input: wick_packet::OutgoingPort::new("input", channel.clone()), @@ -430,17 +475,17 @@ pub(crate) mod imported { #[allow(unused)] pub fn add( &self, - op_config: imported_component_component::add::Config, - mut inputs: impl Into, - ) -> std::result::Result { + op_config: imported_component::add::Config, + mut inputs: impl Into, + ) -> std::result::Result { let mut stream = self.add_raw(op_config, inputs.into().channel.take_rx().unwrap().boxed())?; - let (_, outputs) = imported_component_component::add::process_incoming(stream); + let (_, outputs) = imported_component::add::process_incoming(stream); Ok(outputs) } #[allow(unused)] pub fn add_packets( &self, - op_config: imported_component_component::add::Config, + op_config: imported_component::add::Config, stream: wick_packet::PacketStream, ) -> std::result::Result { Ok(wick_packet::from_wasmrs( @@ -450,7 +495,7 @@ pub(crate) mod imported { #[allow(unused)] pub fn add_raw( &self, - op_config: imported_component_component::add::Config, + op_config: imported_component::add::Config, stream: wick_component::wasmrs_rx::BoxFlux, ) -> std::result::Result< wick_component::wasmrs_rx::BoxFlux, @@ -465,17 +510,17 @@ pub(crate) mod imported { #[allow(unused)] pub fn error( &self, - op_config: imported_component_component::error::Config, - mut inputs: impl Into, - ) -> std::result::Result { + op_config: imported_component::error::Config, + mut inputs: impl Into, + ) -> std::result::Result { let mut stream = self.error_raw(op_config, inputs.into().channel.take_rx().unwrap().boxed())?; - let (_, outputs) = imported_component_component::error::process_incoming(stream); + let (_, outputs) = imported_component::error::process_incoming(stream); Ok(outputs) } #[allow(unused)] pub fn error_packets( &self, - op_config: imported_component_component::error::Config, + op_config: imported_component::error::Config, stream: wick_packet::PacketStream, ) -> std::result::Result { Ok(wick_packet::from_wasmrs( @@ -485,7 +530,7 @@ pub(crate) mod imported { #[allow(unused)] pub fn error_raw( &self, - op_config: imported_component_component::error::Config, + op_config: imported_component::error::Config, stream: wick_component::wasmrs_rx::BoxFlux, ) -> std::result::Result< wick_component::wasmrs_rx::BoxFlux, @@ -500,17 +545,17 @@ pub(crate) mod imported { #[allow(unused)] pub fn validate( &self, - op_config: imported_component_component::validate::Config, - mut inputs: impl Into, - ) -> std::result::Result { + op_config: imported_component::validate::Config, + mut inputs: impl Into, + ) -> std::result::Result { let mut stream = self.validate_raw(op_config, inputs.into().channel.take_rx().unwrap().boxed())?; - let (_, outputs) = imported_component_component::validate::process_incoming(stream); + let (_, outputs) = imported_component::validate::process_incoming(stream); Ok(outputs) } #[allow(unused)] pub fn validate_packets( &self, - op_config: imported_component_component::validate::Config, + op_config: imported_component::validate::Config, stream: wick_packet::PacketStream, ) -> std::result::Result { Ok(wick_packet::from_wasmrs(self.validate_raw( @@ -521,7 +566,7 @@ pub(crate) mod imported { #[allow(unused)] pub fn validate_raw( &self, - op_config: imported_component_component::validate::Config, + op_config: imported_component::validate::Config, stream: wick_component::wasmrs_rx::BoxFlux, ) -> std::result::Result< wick_component::wasmrs_rx::BoxFlux, @@ -707,6 +752,25 @@ pub mod types { #[allow(unused)] use super::aaa; #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + ///HTTP server side event + #[allow(clippy::exhaustive_structs)] + pub struct HttpEvent { + ///The event name if given + #[serde(rename = "event")] + pub event: String, + ///The event data + #[serde(rename = "data")] + pub data: String, + ///The event id if given + #[serde(rename = "id")] + pub id: String, + ///Retry duration if given + #[serde(rename = "retry")] + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub retry: Option, + } + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] ///HTTP method enum #[serde(into = "String", try_from = "wick_component::serde_util::enum_repr::StringOrNum")] #[allow(clippy::exhaustive_enums)] @@ -1344,6 +1408,25 @@ pub mod types { #[allow(unused)] use super::zzz; #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + ///HTTP server side event + #[allow(clippy::exhaustive_structs)] + pub struct HttpEvent { + ///The event name if given + #[serde(rename = "event")] + pub event: String, + ///The event data + #[serde(rename = "data")] + pub data: String, + ///The event id if given + #[serde(rename = "id")] + pub id: String, + ///Retry duration if given + #[serde(rename = "retry")] + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub retry: Option, + } + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] ///HTTP method enum #[serde(into = "String", try_from = "wick_component::serde_util::enum_repr::StringOrNum")] #[allow(clippy::exhaustive_enums)] @@ -1981,6 +2064,25 @@ pub mod types { #[allow(unused)] use super::http; #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] + ///HTTP server side event + #[allow(clippy::exhaustive_structs)] + pub struct HttpEvent { + ///The event name if given + #[serde(rename = "event")] + pub event: String, + ///The event data + #[serde(rename = "data")] + pub data: String, + ///The event id if given + #[serde(rename = "id")] + pub id: String, + ///Retry duration if given + #[serde(rename = "retry")] + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub retry: Option, + } + #[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize, PartialEq)] ///HTTP method enum #[serde(into = "String", try_from = "wick_component::serde_util::enum_repr::StringOrNum")] #[allow(clippy::exhaustive_enums)] @@ -2654,6 +2756,18 @@ pub mod echo { } } #[allow(unused)] + pub fn new_parts() -> (Self, wasmrs_rx::FluxReceiver) { + let (channel, rx) = wasmrs_rx::FluxChannel::new_parts(); + ( + Self { + output: wick_packet::OutgoingPort::new("output", channel.clone()), + time: wick_packet::OutgoingPort::new("time", channel.clone()), + channel, + }, + rx, + ) + } + #[allow(unused)] pub fn with_channel(channel: wasmrs_rx::FluxChannel) -> Self { Self { output: wick_packet::OutgoingPort::new("output", channel.clone()), @@ -2790,6 +2904,17 @@ pub mod testop { } } #[allow(unused)] + pub fn new_parts() -> (Self, wasmrs_rx::FluxReceiver) { + let (channel, rx) = wasmrs_rx::FluxChannel::new_parts(); + ( + Self { + output: wick_packet::OutgoingPort::new("output", channel.clone()), + channel, + }, + rx, + ) + } + #[allow(unused)] pub fn with_channel(channel: wasmrs_rx::FluxChannel) -> Self { Self { output: wick_packet::OutgoingPort::new("output", channel.clone()),