Skip to content

Commit

Permalink
Changes to be committed:
Browse files Browse the repository at this point in the history
  • Loading branch information
gitworkflows authored Sep 23, 2024
1 parent 3da8146 commit 20a5b33
Show file tree
Hide file tree
Showing 14 changed files with 1,256 additions and 28 deletions.
210 changes: 210 additions & 0 deletions bongonet-core/src/apps/http_app.rs
Original file line number Diff line number Diff line change
@@ -1 +1,211 @@
// Copyright 2024 Khulnasoft, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The abstraction and implementation interface for service application logic
use async_trait::async_trait;
use bongonet_http::ResponseHeader;
use http::Response;
use log::{debug, error, trace};
use std::sync::Arc;

use crate::apps::HttpServerApp;
use crate::modules::http::{HttpModules, ModuleBuilder};
use crate::protocols::http::HttpTask;
use crate::protocols::http::ServerSession;
use crate::protocols::Stream;
use crate::server::ShutdownWatch;

/// This trait defines how to map a request to a response
#[async_trait]
pub trait ServeHttp {
/// Define the mapping from a request to a response.
/// Note that the request header is already read, but the implementation needs to read the
/// request body if any.
///
/// # Limitation
/// In this API, the entire response has to be generated before the end of this call.
/// So it is not suitable for streaming response or interactive communications.
/// Users need to implement their own [`super::HttpServerApp`] for those use cases.
async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
}

// TODO: remove this in favor of HttpServer?
#[async_trait]
impl<SV> HttpServerApp for SV
where
SV: ServeHttp + Send + Sync,
{
async fn process_new_http(
self: &Arc<Self>,
mut http: ServerSession,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
match http.read_request().await {
Ok(res) => match res {
false => {
debug!("Failed to read request header");
return None;
}
true => {
debug!("Successfully get a new request");
}
},
Err(e) => {
error!("HTTP server fails to read from downstream: {e}");
return None;
}
}
trace!("{:?}", http.req_header());
if *shutdown.borrow() {
http.set_keepalive(None);
} else {
http.set_keepalive(Some(60));
}
let new_response = self.response(&mut http).await;
let (parts, body) = new_response.into_parts();
let resp_header: ResponseHeader = parts.into();
match http.write_response_header(Box::new(resp_header)).await {
Ok(()) => {
debug!("HTTP response header done.");
}
Err(e) => {
error!(
"HTTP server fails to write to downstream: {e}, {}",
http.request_summary()
);
}
}
if !body.is_empty() {
// TODO: check if chunked encoding is needed
match http.write_response_body(body.into(), true).await {
Ok(_) => debug!("HTTP response written."),
Err(e) => error!(
"HTTP server fails to write to downstream: {e}, {}",
http.request_summary()
),
}
}
match http.finish().await {
Ok(c) => c,
Err(e) => {
error!("HTTP server fails to finish the request: {e}");
None
}
}
}
}

/// A helper struct for HTTP server with http modules embedded
pub struct HttpServer<SV> {
app: SV,
modules: HttpModules,
}

impl<SV> HttpServer<SV> {
/// Create a new [HttpServer] with the given app which implements [ServeHttp]
pub fn new_app(app: SV) -> Self {
HttpServer {
app,
modules: HttpModules::new(),
}
}

/// Add [ModuleBuilder] to this [HttpServer]
pub fn add_module(&mut self, module: ModuleBuilder) {
self.modules.add_module(module)
}
}

#[async_trait]
impl<SV> HttpServerApp for HttpServer<SV>
where
SV: ServeHttp + Send + Sync,
{
async fn process_new_http(
self: &Arc<Self>,
mut http: ServerSession,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
match http.read_request().await {
Ok(res) => match res {
false => {
debug!("Failed to read request header");
return None;
}
true => {
debug!("Successfully get a new request");
}
},
Err(e) => {
error!("HTTP server fails to read from downstream: {e}");
return None;
}
}
trace!("{:?}", http.req_header());
if *shutdown.borrow() {
http.set_keepalive(None);
} else {
http.set_keepalive(Some(60));
}
let mut module_ctx = self.modules.build_ctx();
let req = http.req_header_mut();
module_ctx.request_header_filter(req).await.ok()?;
let new_response = self.app.response(&mut http).await;
let (parts, body) = new_response.into_parts();
let mut resp_header: ResponseHeader = parts.into();
module_ctx
.response_header_filter(&mut resp_header, body.is_empty())
.await
.ok()?;

let task = HttpTask::Header(Box::new(resp_header), body.is_empty());
trace!("{task:?}");

match http.response_duplex_vec(vec![task]).await {
Ok(_) => {
debug!("HTTP response header done.");
}
Err(e) => {
error!(
"HTTP server fails to write to downstream: {e}, {}",
http.request_summary()
);
}
}

let mut body = Some(body.into());
module_ctx.response_body_filter(&mut body, true).ok()?;

let task = HttpTask::Body(body, true);

trace!("{task:?}");

// TODO: check if chunked encoding is needed
match http.response_duplex_vec(vec![task]).await {
Ok(_) => debug!("HTTP response written."),
Err(e) => error!(
"HTTP server fails to write to downstream: {e}, {}",
http.request_summary()
),
}
match http.finish().await {
Ok(c) => c,
Err(e) => {
error!("HTTP server fails to finish the request: {e}");
None
}
}
}
}
4 changes: 2 additions & 2 deletions bongonet-proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "bongonet-proxy"
version = "0.3.0"
authors = ["KhulnaSoft DevOps <info@khulnasoft.com>"]
authors = ["Yuchen Wu <yuchen@khulnasoft.com>"]
license = "Apache-2.0"
edition = "2021"
repository = "https://github.com/khulnasoft/bongonet"
Expand Down Expand Up @@ -64,4 +64,4 @@ boringssl = ["bongonet-core/boringssl", "bongonet-cache/boringssl"]
rustdoc-args = ["--cfg", "doc_async_trait"]

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(doc_async_trait)'] }
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(doc_async_trait)'] }
2 changes: 1 addition & 1 deletion bongonet-proxy/examples/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn main() {
let mut my_proxy = bongonet_proxy::http_proxy_service(
&my_server.configuration,
MyGateway {
req_metric: register_int_counter!("reg_counter", "Number of requests").unwrap(),
req_metric: register_int_counter!("req_counter", "Number of requests").unwrap(),
},
);
my_proxy.add_tcp("0.0.0.0:6191");
Expand Down
90 changes: 90 additions & 0 deletions bongonet-proxy/examples/grpc_web_module.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2024 Khulnasoft, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use clap::Parser;

use bongonet_core::server::Server;
use bongonet_core::upstreams::peer::HttpPeer;
use bongonet_core::Result;
use bongonet_core::{
modules::http::{
grpc_web::{GrpcWeb, GrpcWebBridge},
HttpModules,
},
prelude::Opt,
};
use bongonet_proxy::{ProxyHttp, Session};

/// This example shows how to use the gRPC-web bridge module
pub struct GrpcWebBridgeProxy;

#[async_trait]
impl ProxyHttp for GrpcWebBridgeProxy {
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}

fn init_downstream_modules(&self, modules: &mut HttpModules) {
// Add the gRPC web module
modules.add_module(Box::new(GrpcWeb))
}

async fn early_request_filter(
&self,
session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<()> {
let grpc = session
.downstream_modules_ctx
.get_mut::<GrpcWebBridge>()
.expect("GrpcWebBridge module added");

// initialize gRPC module for this request
grpc.init();
Ok(())
}

async fn upstream_peer(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
// this needs to be your gRPC server
let grpc_peer = Box::new(HttpPeer::new(
("1.1.1.1", 443),
true,
"one.one.one.one".to_string(),
));
Ok(grpc_peer)
}
}

// RUST_LOG=INFO cargo run --example grpc_web_module

fn main() {
env_logger::init();

// read command line arguments
let opt = Opt::parse();
let mut my_server = Server::new(Some(opt)).unwrap();
my_server.bootstrap();

let mut my_proxy =
bongonet_proxy::http_proxy_service(&my_server.configuration, GrpcWebBridgeProxy);
my_proxy.add_tcp("0.0.0.0:6194");

my_server.add_service(my_proxy);
my_server.run_forever();
}
Loading

0 comments on commit 20a5b33

Please sign in to comment.