diff --git a/Cargo.lock b/Cargo.lock index 0713d02..1f1593d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1140,9 +1140,11 @@ dependencies = [ name = "ic-http-gateway" version = "0.0.0" dependencies = [ + "bytes", "candid", "futures", "http 1.1.0", + "http-body", "ic-agent", "ic-http-certification", "ic-response-verification", diff --git a/Cargo.toml b/Cargo.toml index a918da6..e351acc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,8 @@ codegen-units = 1 thiserror = "1" futures = "0.3" http = "1" +http-body = "1" +bytes = "1" base64 = "0.22" lazy_static = "1" serde = "1" diff --git a/packages/ic-http-gateway/Cargo.toml b/packages/ic-http-gateway/Cargo.toml index be4e04f..f1a474b 100644 --- a/packages/ic-http-gateway/Cargo.toml +++ b/packages/ic-http-gateway/Cargo.toml @@ -23,6 +23,8 @@ homepage.workspace = true thiserror.workspace = true futures.workspace = true http.workspace = true +http-body.workspace = true +bytes.workspace = true ic-agent.workspace = true ic-utils.workspace = true diff --git a/packages/ic-http-gateway/src/protocol/handler.rs b/packages/ic-http-gateway/src/protocol/handler.rs index 946a207..04a925a 100644 --- a/packages/ic-http-gateway/src/protocol/handler.rs +++ b/packages/ic-http-gateway/src/protocol/handler.rs @@ -18,9 +18,16 @@ use ic_utils::{ }; fn convert_request(request: CanisterRequest) -> HttpGatewayResult { + let uri = request.uri(); + let mut url = uri.path().to_string(); + if let Some(query) = uri.query() { + url.push('?'); + url.push_str(query); + } + Ok(HttpRequest { method: request.method().to_string(), - url: request.uri().to_string(), + url, headers: request .headers() .into_iter() @@ -46,7 +53,7 @@ pub async fn process_request( request: CanisterRequest, canister_id: Principal, allow_skip_verification: bool, -) -> HttpGatewayResult> { +) -> HttpGatewayResult { let http_request = convert_request(request)?; let canister = HttpRequestCanister::create(agent, canister_id); @@ -246,7 +253,7 @@ pub async fn process_request( }) } -fn handle_agent_error<'a>(error: AgentError) -> HttpGatewayResult> { +fn handle_agent_error(error: AgentError) -> HttpGatewayResult { match error { // Turn all `DestinationInvalid`s into 404 AgentError::CertifiedReject(RejectResponse { diff --git a/packages/ic-http-gateway/src/request/http_gateway_request_builder.rs b/packages/ic-http-gateway/src/request/http_gateway_request_builder.rs index 84caf3c..62eb631 100644 --- a/packages/ic-http-gateway/src/request/http_gateway_request_builder.rs +++ b/packages/ic-http-gateway/src/request/http_gateway_request_builder.rs @@ -31,13 +31,16 @@ impl<'a> HttpGatewayRequestBuilder<'a> { } } - pub fn unsafe_allow_skip_verification(mut self) -> Self { - self.allow_skip_verification = true; + pub fn unsafe_set_allow_skip_verification( + &mut self, + allow_skip_verification: bool, + ) -> &mut Self { + self.allow_skip_verification = allow_skip_verification; self } - pub async fn send(self) -> HttpGatewayResult> { + pub async fn send(self) -> HttpGatewayResult { process_request( self.args.agent, self.args.request_args.canister_request, diff --git a/packages/ic-http-gateway/src/response/http_gateway_response.rs b/packages/ic-http-gateway/src/response/http_gateway_response.rs index 99436a8..0c80676 100644 --- a/packages/ic-http-gateway/src/response/http_gateway_response.rs +++ b/packages/ic-http-gateway/src/response/http_gateway_response.rs @@ -1,5 +1,7 @@ +use bytes::Bytes; use futures::Stream; use http::Response; +use http_body::{Body, Frame, SizeHint}; use ic_agent::AgentError; use std::{ fmt::{Debug, Formatter}, @@ -7,14 +9,14 @@ use std::{ task::{Context, Poll}, }; -pub type CanisterResponse<'a> = Response>; +pub type CanisterResponse = Response; /// A response from the HTTP gateway. #[derive(Debug)] -pub struct HttpGatewayResponse<'a> { +pub struct HttpGatewayResponse { /// The certified response, excluding uncertified headers. /// If response verification v1 is used, the original, uncertified headers are returned. - pub canister_response: CanisterResponse<'a>, + pub canister_response: CanisterResponse, /// Additional metadata regarding the response. pub metadata: HttpGatewayResponseMetadata, @@ -22,25 +24,67 @@ pub struct HttpGatewayResponse<'a> { /// The body of an HTTP gateway response. #[derive(Debug)] -pub enum HttpGatewayResponseBody<'a> { +pub enum HttpGatewayResponseBody { /// A byte array representing the response body. Bytes(Vec), /// A stream of response body chunks. - Stream(ResponseBodyStream<'a>), + Stream(ResponseBodyStream), +} + +impl Body for HttpGatewayResponseBody { + type Data = Bytes; + type Error = AgentError; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.get_mut() { + HttpGatewayResponseBody::Bytes(bytes) => { + Poll::Ready(Some(Ok(Frame::data(Bytes::from(bytes.clone()))))) + } + HttpGatewayResponseBody::Stream(stream) => Stream::poll_next(Pin::new(stream), cx), + } + } + + fn is_end_stream(&self) -> bool { + match self { + HttpGatewayResponseBody::Bytes(_) => true, + HttpGatewayResponseBody::Stream(_) => false, + } + } + + fn size_hint(&self) -> SizeHint { + match self { + HttpGatewayResponseBody::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64), + HttpGatewayResponseBody::Stream(stream) => { + let (lower, upper) = stream.size_hint(); + + let mut size_hint = SizeHint::new(); + size_hint.set_lower(lower as u64); + + if let Some(upper) = upper { + size_hint.set_upper(upper as u64); + } + + size_hint + } + } + } } /// An item in a response body stream. -pub type ResponseBodyStreamItem = Result, AgentError>; +pub type ResponseBodyStreamItem = Result, AgentError>; /// A stream of response body chunks. -pub struct ResponseBodyStream<'a> { - inner: Pin + 'a>>, +pub struct ResponseBodyStream { + inner: Pin + 'static>>, } // Trait bound added for cloning. -impl<'a> ResponseBodyStream<'a> { - pub fn new(stream: impl Stream + 'a) -> Self { +impl ResponseBodyStream { + pub fn new(stream: impl Stream + 'static) -> Self { Self { inner: Box::pin(stream), } @@ -48,14 +92,14 @@ impl<'a> ResponseBodyStream<'a> { } // Debug implementation remains the same -impl<'a> Debug for ResponseBodyStream<'a> { +impl Debug for ResponseBodyStream { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("ResponseBodyStream").finish() } } // Stream implementation remains the same -impl<'a> Stream for ResponseBodyStream<'a> { +impl Stream for ResponseBodyStream { type Item = ResponseBodyStreamItem; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/packages/ic-http-gateway/src/response/response_handler.rs b/packages/ic-http-gateway/src/response/response_handler.rs index 62fe3e3..a6f8711 100644 --- a/packages/ic-http-gateway/src/response/response_handler.rs +++ b/packages/ic-http-gateway/src/response/response_handler.rs @@ -1,5 +1,7 @@ use crate::{HttpGatewayResponseBody, ResponseBodyStream}; +use bytes::Bytes; use futures::{stream, Stream, StreamExt, TryStreamExt}; +use http_body::Frame; use ic_agent::{Agent, AgentError}; use ic_utils::{ call::SyncCall, @@ -20,10 +22,10 @@ static STREAM_CALLBACK_BUFFER: usize = 2; pub type AgentResponseAny = AgentResponse; -pub async fn get_body_and_streaming_body<'a, 'b>( - agent: &'a Agent, - response: &'b AgentResponseAny, -) -> Result, AgentError> { +pub async fn get_body_and_streaming_body( + agent: &Agent, + response: &AgentResponseAny, +) -> Result { // if we already have the full body, we can return it early let Some(StreamingStrategy::Callback(callback_strategy)) = response.streaming_strategy.clone() else { @@ -71,16 +73,16 @@ pub async fn get_body_and_streaming_body<'a, 'b>( Ok(HttpGatewayResponseBody::Bytes(streamed_body)) } -fn create_body_stream<'a>( +fn create_body_stream( agent: Agent, callback: HttpRequestStreamingCallbackAny, token: Option, initial_body: Vec, -) -> ResponseBodyStream<'a> { - let chunks_stream = - create_stream(agent, callback, token).map(|chunk| chunk.map(|(body, _)| body)); +) -> ResponseBodyStream { + let chunks_stream = create_stream(agent, callback, token) + .map(|chunk| chunk.map(|(body, _)| Frame::data(Bytes::from(body)))); - let body_stream = stream::once(async move { Ok(initial_body) }) + let body_stream = stream::once(async move { Ok(Frame::data(Bytes::from(initial_body))) }) .chain(chunks_stream) .take(MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT) .map(|x| async move { x })