Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ic-http-gateway): fix body streaming implementation #14

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ resolver = "2"

members = [
"examples/http-gateway/canister/src/custom_assets",
"examples/http-gateway/rust",
"packages/ic-http-gateway",
]

Expand All @@ -28,12 +29,15 @@ thiserror = "1"
futures = "0.3"
http = "1"
http-body = "1"
http-body-util = "0.1"
bytes = "1"
base64 = "0.22"
lazy_static = "1"
serde = "1"
serde_cbor = "0.11"
tokio = { version = "1", features = ["full"] }
hyper = { version = "1", features = ["full"] }
hyper-util = "0.1"

ic-cdk = "0.13"
ic-cdk-macros = "0.13"
Expand Down
19 changes: 19 additions & 0 deletions examples/http-gateway/rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "http_gateway_rust"
version.workspace = true
authors.workspace = true
edition.workspace = true
repository.workspace = true
homepage.workspace = true
license.workspace = true

[dependencies]
tokio.workspace = true
hyper.workspace = true
hyper-util.workspace = true
http-body-util.workspace = true

ic-http-gateway.workspace = true
ic-agent.workspace = true

pocket-ic.workspace = true
116 changes: 116 additions & 0 deletions examples/http-gateway/rust/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use http_body_util::BodyExt;
use hyper::{body::Incoming, server::conn::http2, service::service_fn, Request, Response};
use hyper_util::rt::TokioIo;
use ic_agent::Agent;
use ic_http_gateway::{HttpGatewayClient, HttpGatewayRequestArgs, HttpGatewayResponseBody};
use pocket_ic::PocketIcBuilder;
use std::{convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::{fs::File, io::AsyncReadExt, net::TcpListener, task};

pub async fn load_custom_assets_wasm() -> Vec<u8> {
load_wasm("http_gateway_canister_custom_assets").await
}

async fn load_wasm(canister: &str) -> Vec<u8> {
let file_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("../../../.dfx/local/canisters")
.join(canister)
.join(format!("{}.wasm.gz", canister));

load_file(file_path).await
}

async fn load_file(file_path: PathBuf) -> Vec<u8> {
let mut file = File::open(&file_path).await.unwrap();

let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await.unwrap();

buffer
}

fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
let wasm_bytes = rt.block_on(async { load_custom_assets_wasm().await });

let pic = PocketIcBuilder::new()
.with_nns_subnet()
.with_application_subnet()
.build();

let canister_id = pic.create_canister();
pic.add_cycles(canister_id, 2_000_000_000_000);
pic.install_canister(canister_id, wasm_bytes, vec![], None);

let url = pic.auto_progress();

let agent = Agent::builder().with_url(url).build().unwrap();
rt.block_on(async {
agent.fetch_root_key().await.unwrap();
});

let http_gateway = HttpGatewayClient::builder()
.with_agent(agent)
.build()
.unwrap();

rt.block_on(async {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = TcpListener::bind(addr).await.unwrap();

println!("Listening on: {}", addr);

loop {
let (stream, _) = listener.accept().await.unwrap();
let io = TokioIo::new(stream);

let http_gateway_clone = Arc::new(http_gateway.clone());

let service = service_fn(move |req: Request<Incoming>| {
let http_gateway_clone = Arc::clone(&http_gateway_clone);

async move {
let canister_request = Request::builder().uri(req.uri()).method(req.method());
let collected_req = req.collect().await.unwrap().to_bytes().to_vec();
let canister_request = canister_request.body(collected_req).unwrap();

let gateway_response = http_gateway_clone
.request(HttpGatewayRequestArgs {
canister_id,
canister_request,
})
.send()
.await;

Ok::<Response<HttpGatewayResponseBody>, Infallible>(
gateway_response.canister_response,
)
}
});

let local = task::LocalSet::new();
local
.run_until(async move {
if let Err(err) = http2::Builder::new(LocalExec)
.serve_connection(io, service)
.await
{
eprintln!("Error serving connection: {:?}", err);
}
})
.await;
}
});
}

#[derive(Clone, Copy, Debug)]
struct LocalExec;

impl<F> hyper::rt::Executor<F> for LocalExec
where
F: std::future::Future + 'static,
{
fn execute(&self, fut: F) {
tokio::task::spawn_local(fut);
}
}
1 change: 1 addition & 0 deletions packages/ic-http-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ thiserror.workspace = true
futures.workspace = true
http.workspace = true
http-body.workspace = true
http-body-util.workspace = true
bytes.workspace = true

ic-agent.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions packages/ic-http-gateway/src/client/http_gateway_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use crate::{
};
use ic_agent::Agent;

#[derive(Clone)]
pub struct HttpGatewayClientArgs {
pub agent: Agent,
}

#[derive(Clone)]
pub struct HttpGatewayClient {
agent: Agent,
}
Expand Down
18 changes: 12 additions & 6 deletions packages/ic-http-gateway/src/protocol/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
};
use candid::Principal;
use http::{Response, StatusCode};
use http_body_util::{BodyExt, Either, Full};
use ic_agent::{
agent::{RejectCode, RejectResponse},
Agent, AgentError,
Expand All @@ -18,7 +19,9 @@ use ic_utils::{
};

fn create_err_response(status_code: StatusCode, msg: &str) -> CanisterResponse {
let mut response = Response::new(HttpGatewayResponseBody::Bytes(msg.as_bytes().to_vec()));
let mut response = Response::new(HttpGatewayResponseBody::Right(Full::from(
msg.as_bytes().to_vec(),
)));
*response.status_mut() = status_code;

response
Expand Down Expand Up @@ -72,7 +75,7 @@ pub async fn process_request(
metadata: HttpGatewayResponseMetadata {
upgraded_to_update_call: false,
response_verification_version: None,
internal_error: Some(e.into()),
internal_error: Some(e),
},
}
}
Expand Down Expand Up @@ -176,7 +179,10 @@ pub async fn process_request(
// strategy. Performing verification for those requests would required to join all the chunks
// and this could cause memory issues and possibly create DOS attack vectors.
match &response_body {
HttpGatewayResponseBody::Bytes(body) => {
Either::Right(body) => {
// this unwrap should never panic because `Either::Right` will always have a full body
let body = body.clone().collect().await.unwrap().to_bytes().to_vec();

let validation_result = validate(
agent,
&canister_id,
Expand All @@ -188,7 +194,7 @@ pub async fn process_request(
.iter()
.map(|HeaderField(k, v)| (k.to_string(), v.to_string()))
.collect(),
body: body.to_owned(),
body,
upgrade: None,
},
allow_skip_verification,
Expand Down Expand Up @@ -327,7 +333,7 @@ fn handle_agent_error(error: &AgentError) -> CanisterResponse {
reject_code: RejectCode::DestinationInvalid,
reject_message,
..
}) => create_err_response(StatusCode::NOT_FOUND, &reject_message),
}) => create_err_response(StatusCode::NOT_FOUND, reject_message),

// If the result is a Replica error, returns the 500 code and message. There is no information
// leak here because a user could use `dfx` to get the same reply.
Expand All @@ -343,7 +349,7 @@ fn handle_agent_error(error: &AgentError) -> CanisterResponse {
reject_code: RejectCode::DestinationInvalid,
reject_message,
..
}) => create_err_response(StatusCode::NOT_FOUND, &reject_message),
}) => create_err_response(StatusCode::NOT_FOUND, reject_message),

// If the result is a Replica error, returns the 500 code and message. There is no information
// leak here because a user could use `dfx` to get the same reply.
Expand Down
Loading
Loading