From 1e7bbd0f5f9fcf2a4415285e1e55e924d40fc716 Mon Sep 17 00:00:00 2001 From: Fawad Shaikh Date: Fri, 6 Oct 2023 14:35:56 -0400 Subject: [PATCH 1/4] feat: added event stream support --- Cargo.lock | 84 ++++--------------- Cargo.toml | 3 + crates/components/wick-http-client/Cargo.toml | 2 + .../wick-http-client/src/component.rs | 77 +++++++++++++++++ .../components/wick-http-client/src/error.rs | 3 + .../v0/templates/typescript/partials/blank | 0 .../wick-config/definitions/v1/manifest.apex | 3 + crates/wick/wick-config/docs/v1.md | 1 + .../wick-config/json-schema/manifest.json | 3 +- .../wick-config/json-schema/v1/manifest.json | 2 +- .../wick-config/src/config/common/http.rs | 3 + .../src/config/components/http_client.rs | 2 + crates/wick/wick-config/src/v1.rs | 4 + crates/wick/wick-config/src/v1/conversions.rs | 2 + crates/wick/wick-config/typescript/v1.ts | 78 ++++++++++++++++- .../wick/configuration/reference/v1.md | 48 +++++++---- 16 files changed, 229 insertions(+), 86 deletions(-) create mode 100644 crates/wick/wick-config/definitions/v0/templates/typescript/partials/blank diff --git a/Cargo.lock b/Cargo.lock index f6cd8906e..40e2f8c75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,17 +26,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "ahash" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" -dependencies = [ - "getrandom", - "once_cell", - "version_check", -] - [[package]] name = "ahash" version = "0.8.3" @@ -559,15 +548,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "chumsky" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23170228b96236b5a7299057ac284a321457700bc8c41a4476052f0f4ba5349d" -dependencies = [ - "hashbrown 0.12.3", -] - [[package]] name = "clap" version = "4.4.1" @@ -1241,12 +1221,6 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" -[[package]] -name = "dyn-clone" -version = "1.0.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbfc4744c1b8f2a09adc0e55242f60b1af195d88596bd8700be74418c056c555" - [[package]] name = "ed25519" version = "2.2.2" @@ -1445,6 +1419,17 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -1854,9 +1839,6 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -dependencies = [ - "ahash 0.7.6", -] [[package]] name = "hashbrown" @@ -1864,7 +1846,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ - "ahash 0.8.3", + "ahash", ] [[package]] @@ -1873,7 +1855,7 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" dependencies = [ - "ahash 0.8.3", + "ahash", "allocator-api2", ] @@ -1942,12 +1924,6 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" -[[package]] -name = "hifijson" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85ef6b41c333e6dd2a4aaa59125a19b633cd17e7aaf372b2260809777bcdef4a" - [[package]] name = "hkdf" version = "0.12.3" @@ -2326,34 +2302,6 @@ dependencies = [ "cc", ] -[[package]] -name = "jaq-core" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb52eeac20f256459e909bd4a03bb8c4fab6a1fdbb8ed52d00f644152df48ece" -dependencies = [ - "ahash 0.7.6", - "dyn-clone", - "hifijson", - "indexmap 1.9.3", - "itertools 0.10.5", - "jaq-parse", - "log", - "once_cell", - "regex", - "serde_json", -] - -[[package]] -name = "jaq-parse" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0f97f01eb9e87af3cbcc843b0dfe693fc6b0a2b9093dc8980dd9fc682826b0" -dependencies = [ - "chumsky", - "serde", -] - [[package]] name = "jobserver" version = "0.1.26" @@ -4166,7 +4114,7 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd4cef4251aabbae751a3710927945901ee1d97ee96d757f6880ebb9a79bfd53" dependencies = [ - "ahash 0.8.3", + "ahash", "atoi", "byteorder", "bytes 1.4.0", @@ -5971,8 +5919,8 @@ dependencies = [ "flow-expression-parser", "futures 0.3.28", "human-panic", - "jaq-core", "liquid-json", + "loose-liquid", "markup-converter", "nkeys", "once_cell", @@ -6195,6 +6143,8 @@ name = "wick-http-client" version = "0.2.0" dependencies = [ "anyhow", + "bytes 1.4.0", + "eventsource-stream", "flow-component", "futures 0.3.28", "liquid-json", diff --git a/Cargo.toml b/Cargo.toml index ffd9ede24..066365f5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -185,6 +185,9 @@ dyn-clone = { version = "1.0", default-features = false } either = { version = "1.9.0", default-features = false } env_logger = { version = "0.10", default-features = false } escargot = { version = "0.5", default-features = false } +eventsource-stream = { version = "0.2", default-features = false, features = [ + "std", +] } futures = { version = "0.3", default-features = false, features = ["std"] } getrandom = { version = "0.2", default-features = false } getset = { version = "0.1", default-features = false } diff --git a/crates/components/wick-http-client/Cargo.toml b/crates/components/wick-http-client/Cargo.toml index 1808e02d9..de0de11f3 100644 --- a/crates/components/wick-http-client/Cargo.toml +++ b/crates/components/wick-http-client/Cargo.toml @@ -10,6 +10,7 @@ description = "SQL Database component for the wick project." [features] [dependencies] +bytes = { workspace = true } wick-packet = { workspace = true, features = ["rt-tokio", "invocation"] } wick-interface-http = { workspace = true } flow-component = { workspace = true, features = ["invocation"] } @@ -28,6 +29,7 @@ serde = { workspace = true, features = ["derive"] } futures = { workspace = true } thiserror = { workspace = true } serde_json = { workspace = true } +eventsource-stream = { workspace = true } # [dev-dependencies] diff --git a/crates/components/wick-http-client/src/component.rs b/crates/components/wick-http-client/src/component.rs index f0accdb5a..30690fac1 100644 --- a/crates/components/wick-http-client/src/component.rs +++ b/crates/components/wick-http-client/src/component.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::anyhow; +use eventsource_stream::Eventsource; use flow_component::{BoxFuture, Component, ComponentError, RuntimeCallback}; use futures::{Stream, StreamExt, TryStreamExt}; use reqwest::header::CONTENT_TYPE; @@ -281,6 +282,9 @@ async fn handle( } Codec::FormData => request_builder.form(&body), Codec::Text => request_builder.body(body.to_string()), + Codec::EventStream => { + unimplemented!("Event stream is not a valid client content-type") + } } } else { request_builder @@ -322,9 +326,11 @@ async fn handle( match value { "application/json" => Codec::Json, "application/x-www-form-urlencoded" => Codec::FormData, + "text/event-stream" => Codec::EventStream, _ => Codec::Raw, } }); + let (our_response, body_stream) = match crate::conversions::to_wick_response(response) { Ok(r) => r, Err(e) => { @@ -349,6 +355,18 @@ async fn handle( Ok(()) } +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +struct WickEvent { + /// The event name if given + event: String, + /// The event data + data: String, + /// The event id if given + id: String, + /// Retry duration if given + retry: Option, +} + fn output_task( span: Span, codec: Codec, @@ -357,6 +375,27 @@ fn output_task( ) -> BoxFuture<'static, ()> { let task = async move { match codec { + Codec::EventStream => { + let mut stream = body_stream.map(Into::into).eventsource(); + while let Some(event) = stream.next().await { + match event { + Ok(event) => { + let wick_event = WickEvent { + event: event.event, + data: event.data, + id: event.id, + retry: event.retry, + }; + span.in_scope(|| debug!("{} {}", format!("{:?}", wick_event), "http:client:response_body")); + let _ = tx.send(Packet::encode("body", wick_event)); + } + Err(e) => { + let _ = tx.error(wick_packet::Error::component_error(e.to_string())); + return; + } + } + } + } Codec::Json => { let bytes: Vec = match body_stream.try_collect().await { Ok(r) => r, @@ -659,5 +698,43 @@ mod test { Ok(()) } + + #[test_logger::test(tokio::test)] + async fn test_event_stream() -> Result<()> { + let (app_config, component_config) = get_config(); + let comp = get_component(&app_config, component_config); + + // Simulate an event stream + let event_stream = "data: {\"id\":\"1\",\"object\":\"event1\"}\n\n\ + data: {\"id\":\"2\",\"object\":\"event2\"}\n\n"; + let packets = packet_stream!(("input", event_stream)); + + // Replace "event_stream_op" with the actual operation id for event stream + let invocation = Invocation::test( + "test_event_stream", + Entity::local("event_stream_op"), + packets, + Default::default(), + )?; + let stream = comp + .handle(invocation, Default::default(), panic_callback()) + .await? + .filter(|p| futures::future::ready(p.as_ref().map_or(false, |p| p.has_data()))) + .collect::>() + .await; + + let packets = stream.into_iter().collect::, _>>()?; + for packet in packets { + if packet.port() == "body" { + let response: WickEvent = packet.decode().unwrap(); + assert!(response.id == "1" && response.event == "event1" || response.id == "2" && response.event == "event2"); + } else { + let response: HttpResponse = packet.decode().unwrap(); + assert_eq!(response.version, HttpVersion::Http11); + } + } + + Ok(()) + } } } diff --git a/crates/components/wick-http-client/src/error.rs b/crates/components/wick-http-client/src/error.rs index 70d27f52e..e53de0b46 100644 --- a/crates/components/wick-http-client/src/error.rs +++ b/crates/components/wick-http-client/src/error.rs @@ -20,4 +20,7 @@ pub enum Error { #[error("Proxy and baseurl must not be the same: {0}")] ProxyLoop(Url), + + #[error("Conversion error: {0}")] + Conversion(String), } diff --git a/crates/wick/wick-config/definitions/v0/templates/typescript/partials/blank b/crates/wick/wick-config/definitions/v0/templates/typescript/partials/blank new file mode 100644 index 000000000..e69de29bb diff --git a/crates/wick/wick-config/definitions/v1/manifest.apex b/crates/wick/wick-config/definitions/v1/manifest.apex index b28399b1f..4cf466007 100644 --- a/crates/wick/wick-config/definitions/v1/manifest.apex +++ b/crates/wick/wick-config/definitions/v1/manifest.apex @@ -1036,6 +1036,9 @@ enum Codec { "Raw text" Text = 3 as "text", + + "Eventstream" + EventStream = 4 as "event-stream", } "Supported HTTP methods" diff --git a/crates/wick/wick-config/docs/v1.md b/crates/wick/wick-config/docs/v1.md index 623c324ab..74bfe2c15 100644 --- a/crates/wick/wick-config/docs/v1.md +++ b/crates/wick/wick-config/docs/v1.md @@ -1894,6 +1894,7 @@ Any one of the following types: | Raw | unknown type | Raw bytes | | FormData | unknown type | Form Data | | Text | unknown type | Raw text | +| EventStream | unknown type | Eventstream | -------- diff --git a/crates/wick/wick-config/json-schema/manifest.json b/crates/wick/wick-config/json-schema/manifest.json index 4f7dad259..915ab9cb3 100644 --- a/crates/wick/wick-config/json-schema/manifest.json +++ b/crates/wick/wick-config/json-schema/manifest.json @@ -2980,7 +2980,8 @@ "Json", "Raw", "FormData", - "Text" + "Text", + "EventStream" ] }, "v1.HttpMethod": { diff --git a/crates/wick/wick-config/json-schema/v1/manifest.json b/crates/wick/wick-config/json-schema/v1/manifest.json index 8660dfa9b..7b5ebb172 100644 --- a/crates/wick/wick-config/json-schema/v1/manifest.json +++ b/crates/wick/wick-config/json-schema/v1/manifest.json @@ -2588,7 +2588,7 @@ "v1.Codec": { "$anchor": "v1.Codec", - "enum": ["Json", "Raw", "FormData", "Text"] + "enum": ["Json", "Raw", "FormData", "Text", "EventStream"] }, "v1.HttpMethod": { diff --git a/crates/wick/wick-config/src/config/common/http.rs b/crates/wick/wick-config/src/config/common/http.rs index e68658467..1ee415f2d 100644 --- a/crates/wick/wick-config/src/config/common/http.rs +++ b/crates/wick/wick-config/src/config/common/http.rs @@ -20,6 +20,8 @@ pub enum Codec { FormData = 2, /// Raw Text Data Text = 3, + /// Event Stream Data + EventStream = 4, } impl Default for Codec { @@ -35,6 +37,7 @@ impl std::fmt::Display for Codec { Codec::Raw => write!(f, "raw"), Codec::FormData => write!(f, "form-data"), Codec::Text => write!(f, "text"), + Codec::EventStream => write!(f, "event-stream"), } } } diff --git a/crates/wick/wick-config/src/config/components/http_client.rs b/crates/wick/wick-config/src/config/components/http_client.rs index 03be46970..e8e104f54 100644 --- a/crates/wick/wick-config/src/config/components/http_client.rs +++ b/crates/wick/wick-config/src/config/components/http_client.rs @@ -129,6 +129,7 @@ impl OperationConfig for HttpClientOperationDefinition { Some(Codec::Raw) => wick_interface_types::Type::Bytes, Some(Codec::FormData) => wick_interface_types::Type::Object, Some(Codec::Text) => wick_interface_types::Type::Object, + Some(Codec::EventStream) => wick_interface_types::Type::Object, None => wick_interface_types::Type::Object, }, ), @@ -151,6 +152,7 @@ impl From for wick_interface_types::OperationSign Some(Codec::Raw) => wick_interface_types::Type::Bytes, Some(Codec::FormData) => wick_interface_types::Type::Object, Some(Codec::Text) => wick_interface_types::Type::Object, + Some(Codec::EventStream) => wick_interface_types::Type::Object, None => wick_interface_types::Type::Object, }, ), diff --git a/crates/wick/wick-config/src/v1.rs b/crates/wick/wick-config/src/v1.rs index aa31d61be..21135db31 100644 --- a/crates/wick/wick-config/src/v1.rs +++ b/crates/wick/wick-config/src/v1.rs @@ -2013,6 +2013,8 @@ pub enum Codec { FormData = 2, /// Raw text Text = 3, + /// Eventstream + EventStream = 4, } impl Default for Codec { @@ -2028,6 +2030,7 @@ impl FromPrimitive for Codec { 1 => Self::Raw, 2 => Self::FormData, 3 => Self::Text, + 4 => Self::EventStream, _ => { return None; } @@ -2040,6 +2043,7 @@ impl FromPrimitive for Codec { 1 => Self::Raw, 2 => Self::FormData, 3 => Self::Text, + 4 => Self::EventStream, _ => { return None; } diff --git a/crates/wick/wick-config/src/v1/conversions.rs b/crates/wick/wick-config/src/v1/conversions.rs index 143c2f52c..f0d33ef9e 100644 --- a/crates/wick/wick-config/src/v1/conversions.rs +++ b/crates/wick/wick-config/src/v1/conversions.rs @@ -854,6 +854,7 @@ impl From for v1::Codec { config::common::Codec::Raw => Self::Raw, config::common::Codec::FormData => Self::FormData, config::common::Codec::Text => Self::Text, + config::common::Codec::EventStream => Self::EventStream, } } } @@ -865,6 +866,7 @@ impl From for config::common::Codec { v1::Codec::Raw => Self::Raw, v1::Codec::FormData => Self::FormData, v1::Codec::Text => Self::Text, + v1::Codec::EventStream => Self::EventStream, } } } diff --git a/crates/wick/wick-config/typescript/v1.ts b/crates/wick/wick-config/typescript/v1.ts index 0efe4b9d1..335a245aa 100644 --- a/crates/wick/wick-config/typescript/v1.ts +++ b/crates/wick/wick-config/typescript/v1.ts @@ -4583,6 +4583,10 @@ export class HttpClientComponent implements HasKind { _resource : string ; // The codec to use when encoding/decoding data. Can be overridden by individual operations. _codec : Codec| undefined = undefined; + // The proxy HTTP / HTTPS to use. + _proxy : Proxy| undefined = undefined; + // The timeout in seconds + _timeout : number| undefined = undefined; // Configuration necessary to provide when instantiating the component. _with : Field[] = []; // A list of operations to expose on this component. @@ -4609,6 +4613,22 @@ codec(value: Codec| undefined) : HttpClientComponent { getCodec() : Codec| undefined { return this._codec; + } +proxy(value: Proxy| undefined) : HttpClientComponent { + this._proxy = value; + return this; + } + getProxy() : Proxy| undefined { + return this._proxy; + + } +timeout(value: number| undefined) : HttpClientComponent { + this._timeout = value; + return this; + } + getTimeout() : number| undefined { + return this._timeout; + } with(value: Field[]) : HttpClientComponent { this._with = value; @@ -4634,7 +4654,61 @@ operations(value: HttpClientOperationDefinition[]) : HttpClientComponent { toJSON() : any { return { kind : "wick/component/http@v1", -resource: this._resource,codec: this._codec,with: this._with,operations: this._operations, } +resource: this._resource,codec: this._codec,proxy: this._proxy,timeout: this._timeout,with: this._with,operations: this._operations, } + + } +} + + + + + + + + +export class Proxy implements HasKind { + // The URL base to use. http, https are supported. + _resource : string =""; + // The username to use when authenticating with the proxy. + _username : string| undefined = undefined; + // The password to use when authenticating with the proxy. + _password : string| undefined = undefined; + constructor ( + ) { + } + +resource(value: string) : Proxy { + this._resource = value; + return this; + } + getResource() : string { + return this._resource; + + } +username(value: string| undefined) : Proxy { + this._username = value; + return this; + } + getUsername() : string| undefined { + return this._username; + + } +password(value: string| undefined) : Proxy { + this._password = value; + return this; + } + getPassword() : string| undefined { + return this._password; + + } + + getKind() : string { + return ""; + } + + toJSON() : any { + return { +resource: this._resource,username: this._username,password: this._password, } } } @@ -4757,7 +4831,7 @@ name: this._name,with: this._with,inputs: this._inputs,method: this._method,code export enum Codec { -Json = "Json",Raw = "Raw",FormData = "FormData",Text = "Text",} +Json = "Json",Raw = "Raw",FormData = "FormData",Text = "Text",EventStream = "EventStream",} diff --git a/docs/content/wick/configuration/reference/v1.md b/docs/content/wick/configuration/reference/v1.md index d7f28890c..74bfe2c15 100644 --- a/docs/content/wick/configuration/reference/v1.md +++ b/docs/content/wick/configuration/reference/v1.md @@ -86,7 +86,7 @@ See [liquid-json](https://crates.io/crates/liquid-json) and [liquid's website](h | Field name | Type | Description | Required? | Shortform? | |------------|------|-------------|-----------|------------| -| `kind` | `string` | must be `"wick/app@v1"` | Yes | || `name` | `string` |The application's name.||| +| `kind` | `string` | must be `"wick/app@v1"` | Yes | || `name` | `string` |The application's name.|Yes|| | `metadata` | [`Metadata`](#metadata) |Associated metadata for this application.||| | `package` | [`PackageDefinition`](#packagedefinition) |Details about the package for this application.||| | `resources` | [`ResourceBinding`](#resourcebinding)[] |Resources and configuration that the application and its components can access.||| @@ -210,8 +210,8 @@ Any one of the following types: | Field name | Type | Description | Required? | Shortform? | |------------|------|-------------|-----------|------------| -| `kind` | `string` | must be `"wick/resource/tcpport@v1"` | Yes | || `port` | [`LiquidTemplate`](#liquidtemplate) |The port to bind to.||| -| `address` | [`LiquidTemplate`](#liquidtemplate) |The address to bind to.||| +| `kind` | `string` | must be `"wick/resource/tcpport@v1"` | Yes | || `port` | [`LiquidTemplate`](#liquidtemplate) |The port to bind to.|Yes|| +| `address` | [`LiquidTemplate`](#liquidtemplate) |The address to bind to.|Yes|| @@ -227,8 +227,8 @@ Any one of the following types: | Field name | Type | Description | Required? | Shortform? | |------------|------|-------------|-----------|------------| -| `kind` | `string` | must be `"wick/resource/udpport@v1"` | Yes | || `port` | [`LiquidTemplate`](#liquidtemplate) |The port to bind to.||| -| `address` | [`LiquidTemplate`](#liquidtemplate) |The address to bind to.||| +| `kind` | `string` | must be `"wick/resource/udpport@v1"` | Yes | || `port` | [`LiquidTemplate`](#liquidtemplate) |The port to bind to.|Yes|| +| `address` | [`LiquidTemplate`](#liquidtemplate) |The address to bind to.|Yes|| @@ -1147,7 +1147,6 @@ Any one of the following types: ## TypeSignature - This type can be abbreviated with shortform syntax Any one of the following types: @@ -1562,6 +1561,7 @@ Any one of the following types: Any one of the following types: - [`SuccessPacket`](#successpacket) +- [`SignalPacket`](#signalpacket) - [`ErrorPacket`](#errorpacket) -------- @@ -1576,15 +1576,16 @@ Any one of the following types: Any one of the following types: - [`SuccessPacket`](#successpacket) +- [`SignalPacket`](#signalpacket) - [`PacketAssertionDef`](#packetassertiondef) - [`ErrorPacket`](#errorpacket) -------- -## SuccessPacket +## SignalPacket

-

A simplified representation of a Wick data packet & payload, used when writing tests.
+
A simplified representation of a Wick signal packet, used when writing tests.

@@ -1594,7 +1595,23 @@ Any one of the following types: | `name` | `string` |The name of the input or output this packet is going to or coming from.|Yes|| | `flags` | [`PacketFlags`](#packetflags) |Any flags set on the packet. Deprecated, use 'flag:' instead||| | `flag` | [`PacketFlag`](#packetflag) |The flag set on the packet.||| -| `value` | [`LiquidJsonValue`](#liquidjsonvalue) |The packet payload.||| + + + +-------- + +## SuccessPacket + +

+

A simplified representation of a Wick data packet & payload, used when writing tests.
+

+ + + +| Field name | Type | Description | Required? | Shortform? | +|------------|------|-------------|-----------|------------| +| `name` | `string` |The name of the input or output this packet is going to or coming from.|Yes|| +| `value` | [`LiquidJsonValue`](#liquidjsonvalue) |The packet payload.|Yes|| @@ -1719,7 +1736,7 @@ Any one of the following types: | Field name | Type | Description | Required? | Shortform? | |------------|------|-------------|-----------|------------| -| `kind` | `string` | must be `"wick/component/sql@v1"` | Yes | || `resource` | `string` |The connect string URL resource for the database.||| +| `kind` | `string` | must be `"wick/component/sql@v1"` | Yes | || `resource` | `string` |The connect string URL resource for the database.|Yes|| | `tls` | `bool` |Whether or not to use TLS.||| | `with` | [`Field`](#field)[] |Configuration necessary to provide when instantiating the component.||| | `operations` | [`SqlQueryKind`](#sqlquerykind)[] |A list of operations to expose on this component.||| @@ -1749,7 +1766,7 @@ Any one of the following types: | Field name | Type | Description | Required? | Shortform? | |------------|------|-------------|-----------|------------| -| `name` | `string` |The name of the operation.||| +| `name` | `string` |The name of the operation.|Yes|| | `with` | [`Field`](#field)[] |Any configuration required by the operation.||| | `inputs` | [`Field`](#field)[] |Types of the inputs to the operation.||| | `outputs` | [`Field`](#field)[] |Types of the outputs to the operation.||| @@ -1771,7 +1788,7 @@ Any one of the following types: | Field name | Type | Description | Required? | Shortform? | |------------|------|-------------|-----------|------------| -| `name` | `string` |The name of the operation.||| +| `name` | `string` |The name of the operation.|Yes|| | `with` | [`Field`](#field)[] |Any configuration required by the operation.||| | `inputs` | [`Field`](#field)[] |Types of the inputs to the operation.||| | `outputs` | [`Field`](#field)[] |Types of the outputs to the operation.||| @@ -1812,7 +1829,7 @@ Any one of the following types: | Field name | Type | Description | Required? | Shortform? | |------------|------|-------------|-----------|------------| -| `kind` | `string` | must be `"wick/component/http@v1"` | Yes | || `resource` | `string` |The URL base to use.||| +| `kind` | `string` | must be `"wick/component/http@v1"` | Yes | || `resource` | `string` |The URL base to use.|Yes|| | `codec` | [`Codec`](#codec) |The codec to use when encoding/decoding data. Can be overridden by individual operations.||| | `proxy` | [`Proxy`](#proxy) |The proxy HTTP / HTTPS to use.||| | `timeout` | `u16` |The timeout in seconds||| @@ -1848,10 +1865,10 @@ Any one of the following types: | Field name | Type | Description | Required? | Shortform? | |------------|------|-------------|-----------|------------| -| `name` | `string` |The name of the operation.||| +| `name` | `string` |The name of the operation.|Yes|| | `with` | [`Field`](#field)[] |Any configuration required by the operation.||| | `inputs` | [`Field`](#field)[] |Types of the inputs to the operation.||| -| `method` | [`HttpMethod`](#httpmethod) |The HTTP method to use.||| +| `method` | [`HttpMethod`](#httpmethod) |The HTTP method to use.|Yes|| | `codec` | [`Codec`](#codec) |The codec to use when encoding/decoding data.||| | `headers` | `{` `string` `: ` `string`[] `}` |Any headers to add to the request.||| | `body` | [`LiquidJsonValue`](#liquidjsonvalue) |The body to send, processed as a structured JSON liquid template.||| @@ -1877,6 +1894,7 @@ Any one of the following types: | Raw | unknown type | Raw bytes | | FormData | unknown type | Form Data | | Text | unknown type | Raw text | +| EventStream | unknown type | Eventstream | -------- From fd12266ca96911ffe92ce489a3424a95378c92d8 Mon Sep 17 00:00:00 2001 From: Fawad Shaikh Date: Mon, 9 Oct 2023 09:16:13 -0400 Subject: [PATCH 2/4] feat: streaming response support for http trigger --- Cargo.lock | 1 + .../wick-http-client/src/component.rs | 29 +--- crates/wick/wick-config/src/config/common.rs | 2 +- .../wick-config/src/config/common/http.rs | 72 ++++++++++ crates/wick/wick-runtime/Cargo.toml | 1 + .../src/triggers/http/component_utils.rs | 133 ++++++++++++++---- 6 files changed, 186 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 40e2f8c75..900918e8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6370,6 +6370,7 @@ dependencies = [ "chrono", "cron", "derive_builder", + "eventsource-stream", "flow-component", "flow-graph", "flow-graph-interpreter", diff --git a/crates/components/wick-http-client/src/component.rs b/crates/components/wick-http-client/src/component.rs index 30690fac1..da0e44964 100644 --- a/crates/components/wick-http-client/src/component.rs +++ b/crates/components/wick-http-client/src/component.rs @@ -16,7 +16,7 @@ use wick_config::config::components::{ HttpClientOperationDefinition, OperationConfig, }; -use wick_config::config::{Codec, HttpMethod, LiquidJsonConfig, Metadata, UrlResource}; +use wick_config::config::{Codec, HttpEvent, HttpMethod, LiquidJsonConfig, Metadata, UrlResource}; use wick_config::{ConfigValidation, Resolver}; use wick_interface_types::{ComponentSignature, OperationSignatures}; use wick_packet::{Base64Bytes, FluxChannel, Invocation, Observer, Packet, PacketSender, PacketStream, RuntimeConfig}; @@ -355,18 +355,6 @@ async fn handle( Ok(()) } -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -struct WickEvent { - /// The event name if given - event: String, - /// The event data - data: String, - /// The event id if given - id: String, - /// Retry duration if given - retry: Option, -} - fn output_task( span: Span, codec: Codec, @@ -380,12 +368,7 @@ fn output_task( while let Some(event) = stream.next().await { match event { Ok(event) => { - let wick_event = WickEvent { - event: event.event, - data: event.data, - id: event.id, - retry: event.retry, - }; + let wick_event = HttpEvent::new(Some(event.event), event.data, Some(event.id), event.retry); span.in_scope(|| debug!("{} {}", format!("{:?}", wick_event), "http:client:response_body")); let _ = tx.send(Packet::encode("body", wick_event)); } @@ -704,12 +687,10 @@ mod test { let (app_config, component_config) = get_config(); let comp = get_component(&app_config, component_config); - // Simulate an event stream let event_stream = "data: {\"id\":\"1\",\"object\":\"event1\"}\n\n\ data: {\"id\":\"2\",\"object\":\"event2\"}\n\n"; let packets = packet_stream!(("input", event_stream)); - // Replace "event_stream_op" with the actual operation id for event stream let invocation = Invocation::test( "test_event_stream", Entity::local("event_stream_op"), @@ -726,8 +707,10 @@ mod test { let packets = stream.into_iter().collect::, _>>()?; for packet in packets { if packet.port() == "body" { - let response: WickEvent = packet.decode().unwrap(); - assert!(response.id == "1" && response.event == "event1" || response.id == "2" && response.event == "event2"); + let response: HttpEvent = packet.decode().unwrap(); + let response_id = response.get_id().as_ref().unwrap(); + let response_event = response.get_event().as_ref().unwrap(); + assert!(response_id == "1" && response_event == "event1" || response_id == "2" && response_event == "event2"); } else { let response: HttpResponse = packet.decode().unwrap(); assert_eq!(response.version, HttpVersion::Http11); diff --git a/crates/wick/wick-config/src/config/common.rs b/crates/wick/wick-config/src/config/common.rs index 2f9d90837..c1e0bb53f 100644 --- a/crates/wick/wick-config/src/config/common.rs +++ b/crates/wick/wick-config/src/config/common.rs @@ -29,7 +29,7 @@ pub use self::error_behavior::ErrorBehavior; pub use self::exposed_resources::{ExposedVolume, ExposedVolumeBuilder}; pub use self::glob::Glob; pub use self::host_definition::{HostConfig, HostConfigBuilder, HttpConfig, HttpConfigBuilder}; -pub use self::http::{Codec, HttpMethod}; +pub use self::http::{Codec, HttpEvent, HttpMethod}; pub use self::import_definition::ImportDefinition; pub use self::interface::InterfaceDefinition; pub use self::liquid_json_config::LiquidJsonConfig; diff --git a/crates/wick/wick-config/src/config/common/http.rs b/crates/wick/wick-config/src/config/common/http.rs index 1ee415f2d..66951828e 100644 --- a/crates/wick/wick-config/src/config/common/http.rs +++ b/crates/wick/wick-config/src/config/common/http.rs @@ -1,3 +1,5 @@ +use std::fmt::Write; + #[derive(Debug, Clone, Copy, PartialEq, serde::Serialize)] /// Supported HTTP methods #[serde(rename_all = "kebab-case")] @@ -30,6 +32,76 @@ impl Default for Codec { } } +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +pub struct HttpEvent { + /// The event name if given + event: Option, + /// The event data + data: String, + /// The event id if given + id: Option, + /// Retry duration if given + retry: Option, +} + +impl HttpEvent { + #[must_use] + pub const fn new( + event: Option, + data: String, + id: Option, + retry: Option, + ) -> Self { + Self { event, data, id, retry } + } + + #[must_use] + pub const fn get_event(&self) -> &Option { + &self.event + } + #[must_use] + pub const fn get_data(&self) -> &String { + &self.data + } + #[must_use] + pub const fn get_id(&self) -> &Option { + &self.id + } + #[must_use] + pub const fn get_retry(&self) -> &Option { + &self.retry + } + + #[must_use] + pub fn to_sse_string(&self) -> String { + let mut sse_string = String::new(); + + if let Some(ref event) = self.event { + writeln!(sse_string, "event: {}", event).unwrap(); + } + + // Splitting data by newline to ensure each line is prefixed with "data: " + for line in self.data.split('\n') { + writeln!(sse_string, "data: {}", line).unwrap(); + } + + if let Some(ref id) = self.id { + writeln!(sse_string, "id: {}", id).unwrap(); + } + + if let Some(ref retry) = self.retry { + // Converting retry duration to milliseconds + let millis = retry.as_millis(); + writeln!(sse_string, "retry: {}", millis).unwrap(); + } + + // Adding the required empty line to separate events + sse_string.push_str("\n"); + + sse_string + } +} + impl std::fmt::Display for Codec { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/crates/wick/wick-runtime/Cargo.toml b/crates/wick/wick-runtime/Cargo.toml index 1b6b24772..9794f02f7 100644 --- a/crates/wick/wick-runtime/Cargo.toml +++ b/crates/wick/wick-runtime/Cargo.toml @@ -47,6 +47,7 @@ hyper-staticfile = { workspace = true } hyper-reverse-proxy = { workspace = true } url = { workspace = true } bytes = { workspace = true } +eventsource-stream = { workspace = true } openapiv3 = { workspace = true } percent-encoding = { workspace = true } liquid = { workspace = true } diff --git a/crates/wick/wick-runtime/src/triggers/http/component_utils.rs b/crates/wick/wick-runtime/src/triggers/http/component_utils.rs index 0ab59d0e6..5b1edc060 100644 --- a/crates/wick/wick-runtime/src/triggers/http/component_utils.rs +++ b/crates/wick/wick-runtime/src/triggers/http/component_utils.rs @@ -5,10 +5,12 @@ use hyper::http::response::Builder; use hyper::http::{HeaderName, HeaderValue}; use hyper::{Body, Response, StatusCode}; use serde_json::{Map, Value}; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::oneshot; use tokio_stream::StreamExt; use tracing::Span; use uuid::Uuid; -use wick_config::config::Codec; +use wick_config::config::{Codec, HttpEvent}; use wick_interface_http::types as wick_http; use wick_packet::{ packets, @@ -132,21 +134,12 @@ pub(super) async fn handle_response_middleware( } } -pub(super) async fn respond( +async fn stream_response( codec: Codec, - stream: Result, -) -> Result, HttpError> { - if let Err(e) = stream { - return Ok( - Builder::new() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(e.to_string())) - .unwrap(), - ); - } - let mut stream = stream.unwrap(); - let mut builder = Response::builder(); - let mut body = bytes::BytesMut::new(); + mut stream: PacketStream, + mut oneshot_channel: Option>, + tx_channel: tokio::sync::mpsc::UnboundedSender, HttpError>>, +) -> Result<(), HttpError> { while let Some(packet) = stream.next().await { match packet { Ok(p) => { @@ -159,8 +152,11 @@ pub(super) async fn respond( } let response: wick_interface_http::types::HttpResponse = p .decode() - .map_err(|e| HttpError::Deserialize("response".to_owned(), e.to_string()))?; - builder = convert_response(builder, response)?; + .map_err(|e| HttpError::Deserialize("response".to_owned(), e.to_string())) + .unwrap(); + let mut builder = Response::builder(); + builder = convert_response(builder, response).unwrap(); + let _ = oneshot_channel.take().unwrap().send(builder); } else if p.port() == "body" { if let PacketPayload::Err(e) = p.payload() { return Err(HttpError::OutputStream(p.port().to_owned(), e.msg().to_owned())); @@ -168,22 +164,103 @@ pub(super) async fn respond( if !p.has_data() { continue; } - if codec == Codec::Json { - let response: Value = p.decode().map_err(|e| HttpError::Codec(codec, e.to_string()))?; - let as_str = response.to_string(); - let bytes = as_str.as_bytes(); - body.extend_from_slice(bytes); - } else { - let response: Base64Bytes = p.decode().map_err(|e| HttpError::Bytes(e.to_string()))?; - body.extend_from_slice(&response); - } + let response: Value = p + .decode_value() + .map_err(|e| HttpError::Codec(codec, e.to_string())) + .unwrap(); + let http_event: HttpEvent = serde_json::from_value(response).unwrap(); + let as_str = http_event.to_sse_string(); + let bytes = as_str.as_bytes(); + let _ = tx_channel.send(Ok::<_, HttpError>(bytes.to_vec())); } } Err(e) => return Err(HttpError::OperationError(e.to_string())), } } - builder = reset_header(builder, CONTENT_LENGTH, body.len()); - Ok(builder.body(body.freeze().into()).unwrap()) + Ok(()) +} + +pub(super) async fn respond( + codec: Codec, + stream: Result, +) -> Result, HttpError> { + if let Err(e) = stream { + return Ok( + Builder::new() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(e.to_string())) + .unwrap(), + ); + } + let mut stream = stream.unwrap(); + if codec == Codec::EventStream { + let (tx, rx) = unbounded_channel(); + let (tx_one, rx_one) = oneshot::channel(); + let tx_one = Some(tx_one); + + tokio::spawn(async move { + let _ = stream_response(codec, stream, tx_one, tx).await; + }); + + let response = rx_one.await; + response.map_or_else( + |_| { + Ok( + Builder::new() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from("data: No response received\n\ndata: [DONE]")) + .unwrap(), + ) + }, + |response| { + let mut builder = response; + builder = reset_header(builder, CONTENT_LENGTH, 0); + builder = builder.header("content-type", "text/event-stream"); + let body = Body::wrap_stream(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)); + Ok(builder.body(body).unwrap()) + }, + ) + } else { + let mut body = bytes::BytesMut::new(); + let mut builder = Response::builder(); + while let Some(packet) = stream.next().await { + match packet { + Ok(p) => { + if p.port() == "response" { + if let PacketPayload::Err(e) = p.payload() { + return Err(HttpError::OutputStream(p.port().to_owned(), e.msg().to_owned())); + } + if p.is_done() { + continue; + } + let response: wick_interface_http::types::HttpResponse = p + .decode() + .map_err(|e| HttpError::Deserialize("response".to_owned(), e.to_string()))?; + builder = convert_response(builder, response)?; + } else if p.port() == "body" { + if let PacketPayload::Err(e) = p.payload() { + return Err(HttpError::OutputStream(p.port().to_owned(), e.msg().to_owned())); + } + if !p.has_data() { + continue; + } + if codec == Codec::Json { + let response: Value = p.decode().map_err(|e| HttpError::Codec(codec, e.to_string()))?; + let as_str: String = response.to_string(); + let bytes = as_str.as_bytes(); + body.extend_from_slice(bytes); + } else { + let response: Base64Bytes = p.decode().map_err(|e| HttpError::Bytes(e.to_string()))?; + body.extend_from_slice(&response); + } + } + } + Err(e) => return Err(HttpError::OperationError(e.to_string())), + } + } + builder = reset_header(builder, CONTENT_LENGTH, body.len()); + Ok(builder.body(body.freeze().into()).unwrap()) + } } fn reset_header(mut builder: Builder, header: HeaderName, value: impl Into) -> Builder { From c733486a1497034f4559a94bd33e33b3538527d0 Mon Sep 17 00:00:00 2001 From: Fawad Shaikh Date: Mon, 9 Oct 2023 14:42:04 -0400 Subject: [PATCH 3/4] feat: fixed empty id and 0 content length --- crates/wick/wick-config/src/config/common/http.rs | 4 +++- crates/wick/wick-runtime/src/triggers/http/component_utils.rs | 4 +--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/wick/wick-config/src/config/common/http.rs b/crates/wick/wick-config/src/config/common/http.rs index 66951828e..ea7e74177 100644 --- a/crates/wick/wick-config/src/config/common/http.rs +++ b/crates/wick/wick-config/src/config/common/http.rs @@ -86,7 +86,9 @@ impl HttpEvent { } if let Some(ref id) = self.id { - writeln!(sse_string, "id: {}", id).unwrap(); + if id != "" { + writeln!(sse_string, "id: {}", id).unwrap(); + } } if let Some(ref retry) = self.retry { diff --git a/crates/wick/wick-runtime/src/triggers/http/component_utils.rs b/crates/wick/wick-runtime/src/triggers/http/component_utils.rs index 5b1edc060..54b5971d4 100644 --- a/crates/wick/wick-runtime/src/triggers/http/component_utils.rs +++ b/crates/wick/wick-runtime/src/triggers/http/component_utils.rs @@ -213,9 +213,7 @@ pub(super) async fn respond( ) }, |response| { - let mut builder = response; - builder = reset_header(builder, CONTENT_LENGTH, 0); - builder = builder.header("content-type", "text/event-stream"); + let builder = response; let body = Body::wrap_stream(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)); Ok(builder.body(body).unwrap()) }, From acdaa5fbfae84a5335a64b8722f1e8a7fa357e4f Mon Sep 17 00:00:00 2001 From: Fawad Shaikh Date: Mon, 9 Oct 2023 16:31:29 -0400 Subject: [PATCH 4/4] feat: added HttpEvent type to http component --- .../wick-interface-http/component.yaml | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/crates/interfaces/wick-interface-http/component.yaml b/crates/interfaces/wick-interface-http/component.yaml index 5499b1226..a5d9ae1f1 100644 --- a/crates/interfaces/wick-interface-http/component.yaml +++ b/crates/interfaces/wick-interface-http/component.yaml @@ -3,7 +3,7 @@ name: http kind: wick/types@v1 metadata: - version: 0.4.0 + version: 0.5.0 package: registry: host: registry.candle.dev @@ -242,3 +242,19 @@ types: types: - HttpRequest - HttpResponse + - name: HttpEvent + kind: wick/type/struct@v1 + description: HTTP server side event + fields: + - name: event + type: string? + description: The event name if given + - name: data + type: string + description: The event data + - name: id + type: string? + description: The event id if given + - name: retry + type: u32? + description: Retry duration if given