Skip to content

Commit

Permalink
feat: added event-stream handling to http client and raw router
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Oct 13, 2023
1 parent cefb885 commit 496c902
Show file tree
Hide file tree
Showing 10 changed files with 444 additions and 130 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ dyn-clone = { version = "1.0", default-features = false }
either = { version = "1.9.0", default-features = false }
env_logger = { version = "0.10", default-features = false }
escargot = { version = "0.5", default-features = false }
eventsource-stream = { version = "0.2", default-features = false, features = [
"std",
] }
futures = { version = "0.3", default-features = false, features = ["std"] }
getrandom = { version = "0.2", default-features = false }
getset = { version = "0.1", default-features = false }
Expand Down Expand Up @@ -287,6 +290,7 @@ tracing-log = { version = "0.1", default-features = false }
tracing-opentelemetry = { version = "0.19.0", default-features = false }
tracing-subscriber = { version = "0.3", default-features = false }
trycmd = { version = "0.14", default-features = false }
uhttp_sse = { version = "0.5.1" }
url = { version = "2.3", default-features = false }
uuid = { version = "1.1", default-features = false }
wasm-encoder = { version = "0.31", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions crates/components/wick-http-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ tokio = { workspace = true }
tracing = { workspace = true }
liquid-json = { workspace = true, features = ["serde"] }
anyhow = { workspace = true }
eventsource-stream = { workspace = true }
uhttp_sse = { workspace = true }
#
serde = { workspace = true, features = ["derive"] }
#
Expand Down
136 changes: 83 additions & 53 deletions crates/components/wick-http-client/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use eventsource_stream::Eventsource;
use flow_component::{BoxFuture, Component, ComponentError, LocalScope};
use futures::{Stream, StreamExt, TryStreamExt};
use reqwest::header::CONTENT_TYPE;
Expand All @@ -17,6 +18,7 @@ use wick_config::config::components::{
};
use wick_config::config::{Codec, HttpMethod, LiquidJsonConfig, Metadata, UrlResource};
use wick_config::{ConfigValidation, Resolver};
use wick_interface_http::types::HttpEvent;
use wick_interface_types::{ComponentSignature, OperationSignatures};
use wick_packet::{
Base64Bytes,
Expand Down Expand Up @@ -326,15 +328,28 @@ async fn handle(

invocation.trace(|| debug!(status=%response.status(), "http:client:response_status"));

let codec = response.headers().get(CONTENT_TYPE).map_or(Codec::Raw, |value| {
let content_type = response.headers().get(CONTENT_TYPE);
let event_stream = content_type.map_or(false, |t| t == "text/event-stream");

let codec = content_type.map_or(Codec::Raw, |value| {
let value = value.to_str().unwrap();
let (value, _other) = value.split_once(';').unwrap_or((value, ""));
match value {
"application/json" => Codec::Json,
"application/x-www-form-urlencoded" => Codec::FormData,
_ => Codec::Raw,
if value.starts_with("text/") {
if event_stream {
Codec::Json
} else {
Codec::Text
}
} else {
match value {
"application/json" => Codec::Json,
"application/x-www-form-urlencoded" => Codec::FormData,
"application/xhtml+xml" => Codec::Text,
_ => Codec::Raw,
}
}
});

let (our_response, body_stream) = match crate::conversions::to_wick_response(response) {
Ok(r) => r,
Err(e) => {
Expand All @@ -348,7 +363,8 @@ async fn handle(
handles.push(tokio::spawn(output_task(
invocation.span.clone(),
codec,
Box::pin(body_stream),
body_stream,
event_stream,
tx.clone(),
)));
}
Expand All @@ -359,67 +375,81 @@ async fn handle(
Ok(())
}

fn output_task(
fn output_task<T: Stream<Item = Result<Base64Bytes, reqwest::Error>> + Send + Unpin + 'static>(
span: Span,
codec: Codec,
mut body_stream: std::pin::Pin<Box<impl Stream<Item = Result<Base64Bytes, reqwest::Error>> + Send + 'static>>,
body_stream: T,
event_stream: bool,
tx: PacketSender,
) -> BoxFuture<'static, ()> {
let task = async move {
match codec {
Codec::Json => {
let bytes: Vec<Base64Bytes> = match body_stream.try_collect().await {
Ok(r) => r,
Err(e) => {
let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
return;
if let Err(e) = output_task_inner(span, codec, body_stream, event_stream, tx.clone()).await {
let _ = tx.send(Packet::err("body", e.to_string()));
}
};
Box::pin(task)
}

async fn output_task_inner<T: Stream<Item = Result<Base64Bytes, reqwest::Error>> + Send + Unpin + 'static>(
span: Span,
codec: Codec,
mut body_stream: T,
event_stream: bool,
tx: PacketSender,
) -> Result<(), anyhow::Error> {
match codec {
Codec::Json => {
if event_stream {
let mut stream = body_stream.map(Into::into).eventsource();
while let Some(event) = stream.next().await {
if let Err(e) = event {
tx.send(Packet::err("body", e.to_string()))?;
break;
}
};
let event = event.unwrap();
let wick_event = HttpEvent {
event: event.event,
data: event.data,
id: event.id,
retry: event.retry.map(|d| d.as_millis() as _),
};

span.in_scope(|| debug!("{} {}", format!("{:?}", wick_event), "http:client:response_body:event"));
tx.send(Packet::encode("body", wick_event))?;
}
} else {
let bytes: Vec<Base64Bytes> = body_stream.try_collect().await?;
let bytes = bytes.concat();

let json: Value = match serde_json::from_slice(&bytes) {
Ok(r) => r,
Err(e) => {
let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
return;
}
};
let json: Value = serde_json::from_slice(&bytes)?;
span.in_scope(|| trace!(%json, "http:client:response_body"));
let _ = tx.send(Packet::encode("body", json));
tx.send(Packet::encode("body", json))?;
}
Codec::Raw => {
let _ = tx.send(Packet::open_bracket("body"));
while let Some(Ok(bytes)) = body_stream.next().await {
span.in_scope(|| debug!(len = bytes.len(), "http:client:response_body"));
}
Codec::Raw => {
tx.send(Packet::open_bracket("body"))?;
while let Some(bytes) = body_stream.next().await {
let bytes = bytes?;

let _ = tx.send(Packet::encode("body", bytes));
}
let _ = tx.send(Packet::close_bracket("body"));
}
Codec::FormData => unreachable!("Form data on the response is not supported."),
Codec::Text => {
let bytes: Vec<Base64Bytes> = match body_stream.try_collect().await {
Ok(r) => r,
Err(e) => {
let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
return;
}
};
let bytes = bytes.concat();
span.in_scope(|| debug!(len = bytes.len(), "http:client:response_body"));

let text = match String::from_utf8(bytes) {
Ok(r) => r,
Err(e) => {
let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
return;
}
};
span.in_scope(|| trace!(%text, "response body"));
let _ = tx.send(Packet::encode("body", text));
tx.send(Packet::encode("body", bytes))?;
}
tx.send(Packet::close_bracket("body"))?;
}
};
Box::pin(task)
Codec::FormData => unreachable!("Form data on the response is not supported."),
Codec::Text => {
tx.send(Packet::open_bracket("body"))?;
while let Some(bytes) = body_stream.next().await {
let bytes = bytes?;
let text = String::from_utf8(bytes.into())?;
span.in_scope(|| debug!(%text, "http:client:response_body"));
tx.send(Packet::encode("body", text))?;
}
tx.send(Packet::close_bracket("body"))?;
}
}
Ok(())
}

impl ConfigValidation for HttpClientComponent {
Expand Down
16 changes: 16 additions & 0 deletions crates/interfaces/wick-interface-http/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,19 @@ types:
types:
- HttpRequest
- HttpResponse
- name: HttpEvent
kind: wick/type/struct@v1
description: HTTP server side event
fields:
- name: event
type: string
description: The event name if given
- name: data
type: string
description: The event data
- name: id
type: string
description: The event id if given
- name: retry
type: u64?
description: Retry duration if given
3 changes: 2 additions & 1 deletion crates/wick/wick-trigger-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ bytes = { workspace = true }
openapiv3 = { workspace = true }
percent-encoding = { workspace = true }
liquid = { workspace = true }

eventsource-stream = { workspace = true }
tokio-stream = { workspace = true }

[dev-dependencies]

Expand Down
Loading

0 comments on commit 496c902

Please sign in to comment.