Skip to content

Commit

Permalink
fix trailer encoding when convert h2 to h1
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed May 14, 2024
1 parent 373e64d commit 702cd1e
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 77 deletions.
46 changes: 21 additions & 25 deletions lib/g3-h2/src/body/from_chunked_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::task::{ready, Context, Poll};
use thiserror::Error;
use tokio::io::AsyncBufRead;

use g3_http::{ChunkedDecodeReader, TrailerReadError, TrailerReader};
use g3_http::{ChunkedDataDecodeReader, TrailerReadError, TrailerReader};
use g3_io_ext::LimitedCopyConfig;

use super::{H2StreamBodyEncodeTransferError, ROwnedH2BodyEncodeTransfer};
Expand Down Expand Up @@ -91,22 +91,27 @@ where
io::Error::new(io::ErrorKind::InvalidData, "too large trailer"),
),
})?;
self.send_stream
.send_trailers(headers.to_h2_map())
.map_err(H2StreamFromChunkedTransferError::SendTrailerFailed)?;
if headers.is_empty() {
self.send_stream
.send_data(Bytes::new(), true)
.map_err(H2StreamFromChunkedTransferError::SendTrailerFailed)?;
} else {
self.send_stream
.send_trailers(headers.to_h2_map())
.map_err(H2StreamFromChunkedTransferError::SendTrailerFailed)?;
}
Poll::Ready(Ok(()))
}
}

enum TransferState<'a, R> {
Data(ROwnedH2BodyEncodeTransfer<'a, ChunkedDecodeReader<'a, R>>),
Data(ROwnedH2BodyEncodeTransfer<'a, ChunkedDataDecodeReader<'a, R>>),
Trailer(TrailerTransfer<'a, R>),
End,
}

pub struct H2StreamFromChunkedTransfer<'a, R> {
state: TransferState<'a, R>,
has_trailer: bool,
trailer_max_size: usize,
active: bool,
}
Expand All @@ -118,13 +123,11 @@ impl<'a, R> H2StreamFromChunkedTransfer<'a, R> {
copy_config: &LimitedCopyConfig,
body_line_max_size: usize,
trailer_max_size: usize,
has_trailer: bool,
) -> Self {
let decoder = ChunkedDecodeReader::new(reader, body_line_max_size);
let decoder = ChunkedDataDecodeReader::new(reader, body_line_max_size);
let encode = ROwnedH2BodyEncodeTransfer::new(decoder, send_stream, copy_config);
H2StreamFromChunkedTransfer {
state: TransferState::Data(encode),
has_trailer,
trailer_max_size,
active: false,
}
Expand Down Expand Up @@ -205,22 +208,15 @@ where
let TransferState::Data(encode) = old_state else {
unreachable!()
};
if self.has_trailer {
let (reader, send_stream) = encode.into_io();
let reader = reader.into_reader();
self.state = TransferState::Trailer(TrailerTransfer::new(
reader,
send_stream,
self.trailer_max_size,
));
self.poll(cx)
} else {
let (_, send_stream) = encode.into_io();
send_stream
.send_data(Bytes::new(), true)
.map_err(H2StreamFromChunkedTransferError::SendDataFailed)?;
Poll::Ready(Ok(()))
}

let (reader, send_stream) = encode.into_io();
// read trailer and send
self.state = TransferState::Trailer(TrailerTransfer::new(
reader.into_reader(),
send_stream,
self.trailer_max_size,
));
self.poll(cx)
}
}
}
Expand Down
29 changes: 8 additions & 21 deletions lib/g3-h2/src/body/to_chunked_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ enum TransferStage {
}

struct ChunkedEncodeTransferInternal {
has_trailer: bool,
yield_size: usize,
this_chunk_size: usize,
chunk: Option<Bytes>,
Expand All @@ -57,9 +56,8 @@ struct ChunkedEncodeTransferInternal {
}

impl ChunkedEncodeTransferInternal {
fn new(has_trailer: bool, yield_size: usize) -> Self {
fn new(yield_size: usize) -> Self {
ChunkedEncodeTransferInternal {
has_trailer,
yield_size,
this_chunk_size: 0,
chunk: None,
Expand All @@ -74,9 +72,8 @@ impl ChunkedEncodeTransferInternal {
}
}

fn with_chunk(has_trailer: bool, yield_size: usize, chunk: Bytes) -> Self {
fn with_chunk(yield_size: usize, chunk: Bytes) -> Self {
ChunkedEncodeTransferInternal {
has_trailer,
yield_size,
this_chunk_size: 0,
chunk: Some(chunk),
Expand Down Expand Up @@ -208,9 +205,9 @@ impl ChunkedEncodeTransferInternal {
self.active = true;
self.static_header.clear();
if self.total_write == 0 {
let _ = write!(&mut self.static_header, "0\r\n\r\n");
let _ = write!(&mut self.static_header, "0\r\n");
} else {
let _ = write!(&mut self.static_header, "\r\n0\r\n\r\n");
let _ = write!(&mut self.static_header, "\r\n0\r\n");
}
self.static_offset = 0;
self.this_chunk_size = 0;
Expand All @@ -228,11 +225,7 @@ impl ChunkedEncodeTransferInternal {
self.total_write += nw as u64;
}
if self.read_data_finished {
if self.has_trailer {
self.transfer_stage = TransferStage::Trailer;
} else {
self.transfer_stage = TransferStage::End;
}
self.transfer_stage = TransferStage::Trailer;
return self.poll_transfer(cx, recv_stream, writer);
}

Expand Down Expand Up @@ -294,30 +287,24 @@ pub struct H2StreamToChunkedTransfer<'a, W> {
}

impl<'a, W> H2StreamToChunkedTransfer<'a, W> {
pub fn new(
recv_stream: &'a mut RecvStream,
writer: &'a mut W,
has_trailer: bool,
yield_size: usize,
) -> Self {
pub fn new(recv_stream: &'a mut RecvStream, writer: &'a mut W, yield_size: usize) -> Self {
H2StreamToChunkedTransfer {
recv_stream,
writer,
internal: ChunkedEncodeTransferInternal::new(has_trailer, yield_size),
internal: ChunkedEncodeTransferInternal::new(yield_size),
}
}

pub fn with_chunk(
recv_stream: &'a mut RecvStream,
writer: &'a mut W,
has_trailer: bool,
yield_size: usize,
chunk: Bytes,
) -> Self {
H2StreamToChunkedTransfer {
recv_stream,
writer,
internal: ChunkedEncodeTransferInternal::with_chunk(has_trailer, yield_size, chunk),
internal: ChunkedEncodeTransferInternal::with_chunk(yield_size, chunk),
}
}

Expand Down
70 changes: 60 additions & 10 deletions lib/g3-http/src/body/chunked_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};

use crate::parse::HttpChunkedLine;

struct ChunkedDecodeReaderInternal {
struct ChunkedDataDecodeReaderInternal {
body_line_max_size: usize,
chunk_header: Vec<u8>,
this_chunk_size: u64,
Expand All @@ -33,9 +33,9 @@ struct ChunkedDecodeReaderInternal {
poll_chunk_end: bool,
}

impl ChunkedDecodeReaderInternal {
impl ChunkedDataDecodeReaderInternal {
fn new(body_line_max_size: usize) -> Self {
ChunkedDecodeReaderInternal {
ChunkedDataDecodeReaderInternal {
body_line_max_size,
chunk_header: Vec::with_capacity(32),
this_chunk_size: 0,
Expand All @@ -46,6 +46,10 @@ impl ChunkedDecodeReaderInternal {
}
}

fn finished(&self) -> bool {
self.poll_chunk_end && self.this_chunk_size == 0
}

fn poll_decode<R>(
&mut self,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -192,33 +196,39 @@ impl ChunkedDecodeReaderInternal {
self.this_chunk_size = chunk_line.chunk_size;
self.left_chunk_size = chunk_line.chunk_size;
if self.left_chunk_size == 0 {
self.poll_chunk_end_r = true;
self.poll_chunk_end = true;
return Poll::Ready(Ok(()));
}
self.chunk_header.clear();
}
}
}
}

pub struct ChunkedDecodeReader<'a, R> {
/// Decode chunked data, and leave the trailer fields
pub struct ChunkedDataDecodeReader<'a, R> {
reader: &'a mut R,
internal: ChunkedDecodeReaderInternal,
internal: ChunkedDataDecodeReaderInternal,
}

impl<'a, R> ChunkedDecodeReader<'a, R> {
impl<'a, R> ChunkedDataDecodeReader<'a, R> {
pub fn new(reader: &'a mut R, body_line_max_size: usize) -> Self {
ChunkedDecodeReader {
ChunkedDataDecodeReader {
reader,
internal: ChunkedDecodeReaderInternal::new(body_line_max_size),
internal: ChunkedDataDecodeReaderInternal::new(body_line_max_size),
}
}

pub fn into_reader(self) -> &'a mut R {
self.reader
}

pub fn finished(&self) -> bool {
self.internal.finished()
}
}

impl<'a, R> AsyncRead for ChunkedDecodeReader<'a, R>
impl<'a, R> AsyncRead for ChunkedDataDecodeReader<'a, R>
where
R: AsyncBufRead + Unpin,
{
Expand All @@ -242,3 +252,43 @@ where
}
}
}

#[cfg(test)]
mod test {
use super::*;
use bytes::Bytes;
use tokio::io::{AsyncReadExt, BufReader, Result};
use tokio_util::io::StreamReader;

#[tokio::test]
async fn read_single_chunked() {
let body_len: usize = 9;
let content = b"5\r\ntest\n\r\n4\r\nbody\r\n0\r\n\r\nXXX";
let stream = tokio_stream::iter(vec![Result::Ok(Bytes::from_static(content))]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let mut body_deocder = ChunkedDataDecodeReader::new(&mut buf_stream, 1024);

let mut buf = [0u8; 32];
let len = body_deocder.read(&mut buf).await.unwrap();
assert_eq!(len, body_len);
assert_eq!(&buf[0..len], b"test\nbody");
assert!(body_deocder.finished());
}

#[tokio::test]
async fn read_single_tailer() {
let body_len: usize = 9;
let content = b"5\r\ntest\n\r\n4\r\nbody\r\n0\r\nA: B\r\nXXX";
let stream = tokio_stream::iter(vec![Result::Ok(Bytes::from_static(content))]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let mut body_deocder = ChunkedDataDecodeReader::new(&mut buf_stream, 1024);

let mut buf = [0u8; 32];
let len = body_deocder.read(&mut buf).await.unwrap();
assert_eq!(len, body_len);
assert_eq!(&buf[0..len], b"test\nbody");
assert!(body_deocder.finished());
}
}
2 changes: 1 addition & 1 deletion lib/g3-http/src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod chunked_encoder;
pub use chunked_encoder::ChunkedNoTrailerEncodeTransfer;

mod chunked_decoder;
pub use chunked_decoder::ChunkedDecodeReader;
pub use chunked_decoder::ChunkedDataDecodeReader;

mod trailer_reader;
pub use trailer_reader::{TrailerReadError, TrailerReader};
2 changes: 1 addition & 1 deletion lib/g3-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use parse::{

mod body;
pub use body::{
ChunkedDecodeReader, ChunkedNoTrailerEncodeTransfer, ChunkedTransfer, HttpBodyReader,
ChunkedDataDecodeReader, ChunkedNoTrailerEncodeTransfer, ChunkedTransfer, HttpBodyReader,
HttpBodyType, PreviewData, PreviewDataState, PreviewError, TrailerReadError, TrailerReader,
};

Expand Down
2 changes: 0 additions & 2 deletions lib/g3-icap-client/src/reqmod/h2/bidirectional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ impl<'a, I: IdleCheck> BidirectionalRecvHttpRequest<'a, I> {
)
.await?;
let trailers = self.icap_rsp.take_trailers();
let has_trailer = !trailers.is_empty();
http_req.set_trailer(trailers);

let final_req = orig_http_request.adapt_to(&http_req);
Expand All @@ -159,7 +158,6 @@ impl<'a, I: IdleCheck> BidirectionalRecvHttpRequest<'a, I> {
&self.copy_config,
self.http_body_line_max_size,
self.http_trailer_max_size,
has_trailer,
);

let idle_duration = self.idle_checker.idle_duration();
Expand Down
2 changes: 0 additions & 2 deletions lib/g3-icap-client/src/reqmod/h2/forward_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ impl<I: IdleCheck> H2RequestAdapter<I> {
) -> Result<ReqmodAdaptationEndState, H2ReqmodAdaptationError> {
let http_header = http_request.serialize_for_adapter();
let icap_header = self.build_forward_all_request(&http_request, http_header.len());
let has_trailer = http_request.headers().contains_key(http::header::TRAILER);

let icap_w = &mut self.icap_connection.0;
icap_w
Expand All @@ -72,7 +71,6 @@ impl<I: IdleCheck> H2RequestAdapter<I> {
let mut body_transfer = H2StreamToChunkedTransfer::new(
&mut clt_body,
&mut self.icap_connection.0,
has_trailer,
self.copy_config.yield_size(),
);
let bidirectional_transfer = BidirectionalRecvIcapResponse {
Expand Down
1 change: 0 additions & 1 deletion lib/g3-icap-client/src/reqmod/h2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ impl ReqmodRecvHttpResponseBody {
&self.copy_config,
self.http_body_line_max_size,
self.http_trailer_max_size,
self.has_trailer,
)
}

Expand Down
3 changes: 0 additions & 3 deletions lib/g3-icap-client/src/reqmod/h2/preview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ impl<I: IdleCheck> H2RequestAdapter<I> {
let http_header = http_request.serialize_for_adapter();
let icap_header =
self.build_preview_request(&http_request, http_header.len(), preview_size);
let has_trailer = http_request.headers().contains_key(http::header::TRAILER);

let icap_w = &mut self.icap_connection.0;
icap_w
Expand Down Expand Up @@ -130,14 +129,12 @@ impl<I: IdleCheck> H2RequestAdapter<I> {
H2StreamToChunkedTransfer::new(
&mut clt_body,
&mut self.icap_connection.0,
has_trailer,
self.copy_config.yield_size(),
)
} else {
H2StreamToChunkedTransfer::with_chunk(
&mut clt_body,
&mut self.icap_connection.0,
has_trailer,
self.copy_config.yield_size(),
initial_body_data,
)
Expand Down
2 changes: 0 additions & 2 deletions lib/g3-icap-client/src/reqmod/h2/recv_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ impl<I: IdleCheck> H2RequestAdapter<I> {
)
.await?;
let trailers = icap_rsp.take_trailers();
let has_trailer = !trailers.is_empty();
http_req.set_trailer(trailers);

let final_req = orig_http_request.adapt_to(&http_req);
Expand All @@ -257,7 +256,6 @@ impl<I: IdleCheck> H2RequestAdapter<I> {
&self.copy_config,
self.http_body_line_max_size,
self.http_trailer_max_size,
has_trailer,
);

let idle_duration = self.idle_checker.idle_duration();
Expand Down
Loading

0 comments on commit 702cd1e

Please sign in to comment.