Skip to content

Commit

Permalink
g3-icap-client: decode http body in RESPMOD if length is known
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Nov 26, 2024
1 parent 19acd0b commit 0e94eb6
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 96 deletions.
9 changes: 8 additions & 1 deletion lib/g3-http/src/client/adaptation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct HttpAdaptedResponse {
pub status: StatusCode,
pub reason: String,
pub headers: HttpHeaderMap,
pub content_length: Option<u64>,
}

impl HttpAdaptedResponse {
Expand All @@ -39,6 +40,7 @@ impl HttpAdaptedResponse {
status,
reason,
headers: HttpHeaderMap::default(),
content_length: None,
}
}

Expand Down Expand Up @@ -138,7 +140,12 @@ impl HttpAdaptedResponse {
// ignored hop-by-hop options
return Ok(());
}
"transfer-encoding" | "content-length" => {
"content-length" => {
let content_length = u64::from_str(header.value)
.map_err(|_| HttpResponseParseError::InvalidContentLength)?;
self.content_length = Some(content_length);
}
"transfer-encoding" => {
// this will always be chunked encoding
return Ok(());
}
Expand Down
73 changes: 48 additions & 25 deletions lib/g3-http/src/client/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,32 +67,55 @@ impl HttpForwardRemoteResponse {

pub fn adapt_to_chunked(&self, adapted: HttpAdaptedResponse) -> Self {
let mut hop_by_hop_headers = self.hop_by_hop_headers.clone();
if !self.chunked_transfer {
if let Some(mut v) = hop_by_hop_headers.remove(header::TRANSFER_ENCODING) {
v.set_static_value("chunked");
hop_by_hop_headers.insert(header::TRANSFER_ENCODING, v);
} else {
hop_by_hop_headers.insert(
header::TRANSFER_ENCODING,
HttpHeaderValue::from_static("chunked"),
);
match adapted.content_length {
Some(content_length) => {
hop_by_hop_headers.remove(header::TRANSFER_ENCODING);
HttpForwardRemoteResponse {
version: adapted.version,
code: adapted.status.as_u16(),
reason: adapted.reason,
end_to_end_headers: adapted.headers,
hop_by_hop_headers,
original_connection_name: self.original_connection_name.clone(),
extra_connection_headers: self.extra_connection_headers.clone(),
origin_header_size: self.origin_header_size,
keep_alive: self.keep_alive,
content_length,
chunked_transfer: false,
has_transfer_encoding: false,
has_content_length: true,
has_keep_alive: self.has_keep_alive,
}
}
None => {
if !self.chunked_transfer {
if let Some(mut v) = hop_by_hop_headers.remove(header::TRANSFER_ENCODING) {
v.set_static_value("chunked");
hop_by_hop_headers.insert(header::TRANSFER_ENCODING, v);
} else {
hop_by_hop_headers.insert(
header::TRANSFER_ENCODING,
HttpHeaderValue::from_static("chunked"),
);
}
}
HttpForwardRemoteResponse {
version: adapted.version,
code: adapted.status.as_u16(),
reason: adapted.reason,
end_to_end_headers: adapted.headers,
hop_by_hop_headers,
original_connection_name: self.original_connection_name.clone(),
extra_connection_headers: self.extra_connection_headers.clone(),
origin_header_size: self.origin_header_size,
keep_alive: self.keep_alive,
content_length: 0,
chunked_transfer: true,
has_transfer_encoding: true,
has_content_length: false,
has_keep_alive: self.has_keep_alive,
}
}
}
HttpForwardRemoteResponse {
version: adapted.version,
code: adapted.status.as_u16(),
reason: adapted.reason,
end_to_end_headers: adapted.headers,
hop_by_hop_headers,
original_connection_name: self.original_connection_name.clone(),
extra_connection_headers: self.extra_connection_headers.clone(),
origin_header_size: self.origin_header_size,
keep_alive: self.keep_alive,
content_length: 0,
chunked_transfer: true,
has_transfer_encoding: true,
has_content_length: false,
has_keep_alive: false,
}
}

Expand Down
79 changes: 52 additions & 27 deletions lib/g3-http/src/client/transparent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,59 @@ impl HttpTransparentResponse {

pub fn adapt_to_chunked(&self, adapted: HttpAdaptedResponse) -> Self {
let mut hop_by_hop_headers = self.hop_by_hop_headers.clone();
if !self.chunked_transfer {
if let Some(mut v) = hop_by_hop_headers.remove(header::TRANSFER_ENCODING) {
v.set_static_value("chunked");
hop_by_hop_headers.insert(header::TRANSFER_ENCODING, v);
} else {
hop_by_hop_headers.insert(
header::TRANSFER_ENCODING,
HttpHeaderValue::from_static("chunked"),
);
match adapted.content_length {
Some(content_length) => {
hop_by_hop_headers.remove(header::TRANSFER_ENCODING);
HttpTransparentResponse {
version: adapted.version,
code: adapted.status.as_u16(),
reason: adapted.reason,
end_to_end_headers: adapted.headers,
hop_by_hop_headers,
original_connection_name: self.original_connection_name.clone(),
extra_connection_headers: self.extra_connection_headers.clone(),
origin_header_size: self.origin_header_size,
keep_alive: self.keep_alive,
connection_upgrade: self.connection_upgrade,
upgrade: self.upgrade.clone(),
content_length,
chunked_transfer: false,
has_transfer_encoding: false,
has_content_length: true,
has_keep_alive: self.has_keep_alive,
}
}
None => {
if !self.chunked_transfer {
if let Some(mut v) = hop_by_hop_headers.remove(header::TRANSFER_ENCODING) {
v.set_static_value("chunked");
hop_by_hop_headers.insert(header::TRANSFER_ENCODING, v);
} else {
hop_by_hop_headers.insert(
header::TRANSFER_ENCODING,
HttpHeaderValue::from_static("chunked"),
);
}
}
HttpTransparentResponse {
version: adapted.version,
code: adapted.status.as_u16(),
reason: adapted.reason,
end_to_end_headers: adapted.headers,
hop_by_hop_headers,
original_connection_name: self.original_connection_name.clone(),
extra_connection_headers: self.extra_connection_headers.clone(),
origin_header_size: self.origin_header_size,
keep_alive: self.keep_alive,
connection_upgrade: self.connection_upgrade,
upgrade: self.upgrade.clone(),
content_length: 0,
chunked_transfer: true,
has_transfer_encoding: true,
has_content_length: false,
has_keep_alive: self.has_keep_alive,
}
}
}
HttpTransparentResponse {
version: adapted.version,
code: adapted.status.as_u16(),
reason: adapted.reason,
end_to_end_headers: adapted.headers,
hop_by_hop_headers,
original_connection_name: self.original_connection_name.clone(),
extra_connection_headers: self.extra_connection_headers.clone(),
origin_header_size: self.origin_header_size,
keep_alive: self.keep_alive,
connection_upgrade: self.connection_upgrade,
upgrade: self.upgrade.clone(),
content_length: 0,
chunked_transfer: true,
has_transfer_encoding: true,
has_content_length: false,
has_keep_alive: false,
}
}

Expand Down
91 changes: 65 additions & 26 deletions lib/g3-icap-client/src/respmod/h1/bidirectional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

use std::sync::Arc;

use tokio::io::AsyncBufRead;
use anyhow::anyhow;
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite};
use tokio::time::Instant;

use g3_http::{H1BodyToChunkedTransfer, HttpBodyReader};
use g3_http::{H1BodyToChunkedTransfer, HttpBodyDecodeReader, HttpBodyReader};
use g3_io_ext::{IdleCheck, LimitedBufReadExt, LimitedCopy, LimitedCopyConfig, LimitedCopyError};

use super::{
Expand Down Expand Up @@ -119,7 +120,6 @@ impl<I: IdleCheck> BidirectionalRecvIcapResponse<'_, I> {
}

pub(super) struct BidirectionalRecvHttpResponse<'a, I: IdleCheck> {
pub(super) icap_reader: &'a mut IcapClientReader,
pub(super) http_body_line_max_size: usize,
pub(super) copy_config: LimitedCopyConfig,
pub(super) idle_checker: &'a I,
Expand All @@ -129,17 +129,19 @@ impl<I: IdleCheck> BidirectionalRecvHttpResponse<'_, I> {
pub(super) async fn transfer<H, UR, CW>(
self,
state: &mut RespmodAdaptationRunState,
mut ups_body_transfer: &mut H1BodyToChunkedTransfer<'_, UR, IcapClientWriter>,
ups_body_transfer: &mut H1BodyToChunkedTransfer<'_, UR, IcapClientWriter>,
http_header_size: usize,
orig_http_response: &H,
icap_reader: &mut IcapClientReader,
clt_writer: &mut CW,
) -> Result<RespmodAdaptationEndState<H>, H1RespmodAdaptationError>
where
H: HttpResponseForAdaptation,
UR: AsyncBufRead + Unpin,
CW: HttpResponseClientWriter<H> + Unpin,
{
let http_rsp = HttpAdaptedResponse::parse(self.icap_reader, http_header_size).await?;
let http_rsp = HttpAdaptedResponse::parse(icap_reader, http_header_size).await?;
let body_content_length = http_rsp.content_length;

let final_rsp = orig_http_response.adapt_to_chunked(http_rsp);
state.mark_clt_send_start();
Expand All @@ -149,11 +151,58 @@ impl<I: IdleCheck> BidirectionalRecvHttpResponse<'_, I> {
.map_err(H1RespmodAdaptationError::HttpClientWriteFailed)?;
state.mark_clt_send_header();

let mut adp_body_reader =
HttpBodyReader::new_chunked(self.icap_reader, self.http_body_line_max_size);
let mut adp_body_transfer =
LimitedCopy::new(&mut adp_body_reader, clt_writer, &self.copy_config);
match body_content_length {
Some(0) => Err(H1RespmodAdaptationError::InvalidHttpBodyFromIcapServer(
anyhow!("Content-Length is 0 but the ICAP server response contains http-body"),
)),
Some(expected) => {
let mut clt_body_reader =
HttpBodyDecodeReader::new_chunked(icap_reader, self.http_body_line_max_size);
let mut clt_body_transfer =
LimitedCopy::new(&mut clt_body_reader, clt_writer, &self.copy_config);
self.do_transfer(ups_body_transfer, &mut clt_body_transfer)
.await?;

state.mark_clt_send_all();
let copied = clt_body_transfer.copied_size();
if ups_body_transfer.finished() && clt_body_reader.trailer(128).await.is_ok() {
state.icap_io_finished = true;
}

if copied != expected {
return Err(H1RespmodAdaptationError::InvalidHttpBodyFromIcapServer(
anyhow!("Content-Length is {expected} but decoded length is {copied}"),
));
}
Ok(RespmodAdaptationEndState::AdaptedTransferred(final_rsp))
}
None => {
let mut clt_body_reader =
HttpBodyReader::new_chunked(icap_reader, self.http_body_line_max_size);
let mut clt_body_transfer =
LimitedCopy::new(&mut clt_body_reader, clt_writer, &self.copy_config);
self.do_transfer(ups_body_transfer, &mut clt_body_transfer)
.await?;

state.mark_clt_send_all();
state.icap_io_finished =
ups_body_transfer.finished() && clt_body_transfer.finished();

Ok(RespmodAdaptationEndState::AdaptedTransferred(final_rsp))
}
}
}

async fn do_transfer<UR, IR, CW>(
self,
mut ups_body_transfer: &mut H1BodyToChunkedTransfer<'_, UR, IcapClientWriter>,
mut clt_body_transfer: &mut LimitedCopy<'_, IR, CW>,
) -> Result<(), H1RespmodAdaptationError>
where
UR: AsyncBufRead + Unpin,
IR: AsyncRead + Unpin,
CW: AsyncWrite + Unpin,
{
let idle_duration = self.idle_checker.idle_duration();
let mut idle_interval =
tokio::time::interval_at(Instant::now() + idle_duration, idle_duration);
Expand All @@ -164,12 +213,8 @@ impl<I: IdleCheck> BidirectionalRecvHttpResponse<'_, I> {
r = &mut ups_body_transfer => {
return match r {
Ok(_) => {
match adp_body_transfer.await {
Ok(_) => {
state.mark_clt_send_all();
state.icap_io_finished = true;
Ok(RespmodAdaptationEndState::AdaptedTransferred(final_rsp))
}
match clt_body_transfer.await {
Ok(_) => Ok(()),
Err(LimitedCopyError::ReadFailed(e)) => Err(H1RespmodAdaptationError::IcapServerReadFailed(e)),
Err(LimitedCopyError::WriteFailed(e)) => Err(H1RespmodAdaptationError::HttpClientWriteFailed(e)),
}
Expand All @@ -178,21 +223,15 @@ impl<I: IdleCheck> BidirectionalRecvHttpResponse<'_, I> {
Err(LimitedCopyError::WriteFailed(e)) => Err(H1RespmodAdaptationError::IcapServerWriteFailed(e)),
};
}
r = &mut adp_body_transfer => {
r = &mut clt_body_transfer => {
return match r {
Ok(_) => {
state.mark_clt_send_all();
if ups_body_transfer.finished() {
state.icap_io_finished = true;
}
Ok(RespmodAdaptationEndState::AdaptedTransferred(final_rsp))
}
Ok(_) => Ok(()),
Err(LimitedCopyError::ReadFailed(e)) => Err(H1RespmodAdaptationError::IcapServerReadFailed(e)),
Err(LimitedCopyError::WriteFailed(e)) => Err(H1RespmodAdaptationError::HttpClientWriteFailed(e)),
};
}
_ = idle_interval.tick() => {
if ups_body_transfer.is_idle() && adp_body_transfer.is_idle() {
if ups_body_transfer.is_idle() && clt_body_transfer.is_idle() {
idle_count += 1;

let quit = self.idle_checker.check_quit(idle_count);
Expand All @@ -203,7 +242,7 @@ impl<I: IdleCheck> BidirectionalRecvHttpResponse<'_, I> {
} else {
Err(H1RespmodAdaptationError::IcapServerWriteIdle)
}
} else if adp_body_transfer.no_cached_data() {
} else if clt_body_transfer.no_cached_data() {
Err(H1RespmodAdaptationError::IcapServerReadIdle)
} else {
Err(H1RespmodAdaptationError::HttpClientWriteIdle)
Expand All @@ -213,7 +252,7 @@ impl<I: IdleCheck> BidirectionalRecvHttpResponse<'_, I> {
idle_count = 0;

ups_body_transfer.reset_active();
adp_body_transfer.reset_active();
clt_body_transfer.reset_active();
}

if let Some(reason) = self.idle_checker.check_force_quit() {
Expand Down
2 changes: 2 additions & 0 deletions lib/g3-icap-client/src/respmod/h1/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub enum H1RespmodAdaptationError {
InvalidIcapServerResponse(#[from] IcapRespmodParseError),
#[error("invalid http error response from icap server: {0}")]
InvalidIcapServerHttpResponse(#[from] HttpResponseParseError),
#[error("invalid http body from icap server: {0:?}")]
InvalidHttpBodyFromIcapServer(anyhow::Error),
#[error("error response from icap server: {0} ({1} {2})")]
IcapServerErrorResponse(IcapErrorReason, u16, String),
#[error("read from http upstream failed: {0:?}")]
Expand Down
2 changes: 1 addition & 1 deletion lib/g3-icap-client/src/respmod/h1/forward_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ impl<I: IdleCheck> HttpResponseAdapter<I> {
} else {
let icap_keepalive = rsp.keep_alive;
let bidirectional_transfer = BidirectionalRecvHttpResponse {
icap_reader: &mut self.icap_connection.1,
http_body_line_max_size: self.http_body_line_max_size,
copy_config: self.copy_config,
idle_checker: &self.idle_checker,
Expand All @@ -133,6 +132,7 @@ impl<I: IdleCheck> HttpResponseAdapter<I> {
&mut body_transfer,
header_size,
http_response,
&mut self.icap_connection.1,
clt_writer,
)
.await?;
Expand Down
Loading

0 comments on commit 0e94eb6

Please sign in to comment.