diff --git a/lib/g3-h2/src/body/from_chunked_transfer.rs b/lib/g3-h2/src/body/from_chunked_transfer.rs index 385cd0fa..099fc1c4 100644 --- a/lib/g3-h2/src/body/from_chunked_transfer.rs +++ b/lib/g3-h2/src/body/from_chunked_transfer.rs @@ -24,7 +24,7 @@ use std::task::{ready, Context, Poll}; use thiserror::Error; use tokio::io::AsyncBufRead; -use g3_http::{ChunkedDecodeReader, TrailerReadError, TrailerReader}; +use g3_http::{ChunkedDataDecodeReader, TrailerReadError, TrailerReader}; use g3_io_ext::LimitedCopyConfig; use super::{H2StreamBodyEncodeTransferError, ROwnedH2BodyEncodeTransfer}; @@ -91,22 +91,27 @@ where io::Error::new(io::ErrorKind::InvalidData, "too large trailer"), ), })?; - self.send_stream - .send_trailers(headers.to_h2_map()) - .map_err(H2StreamFromChunkedTransferError::SendTrailerFailed)?; + if headers.is_empty() { + self.send_stream + .send_data(Bytes::new(), true) + .map_err(H2StreamFromChunkedTransferError::SendTrailerFailed)?; + } else { + self.send_stream + .send_trailers(headers.to_h2_map()) + .map_err(H2StreamFromChunkedTransferError::SendTrailerFailed)?; + } Poll::Ready(Ok(())) } } enum TransferState<'a, R> { - Data(ROwnedH2BodyEncodeTransfer<'a, ChunkedDecodeReader<'a, R>>), + Data(ROwnedH2BodyEncodeTransfer<'a, ChunkedDataDecodeReader<'a, R>>), Trailer(TrailerTransfer<'a, R>), End, } pub struct H2StreamFromChunkedTransfer<'a, R> { state: TransferState<'a, R>, - has_trailer: bool, trailer_max_size: usize, active: bool, } @@ -118,13 +123,11 @@ impl<'a, R> H2StreamFromChunkedTransfer<'a, R> { copy_config: &LimitedCopyConfig, body_line_max_size: usize, trailer_max_size: usize, - has_trailer: bool, ) -> Self { - let decoder = ChunkedDecodeReader::new(reader, body_line_max_size); + let decoder = ChunkedDataDecodeReader::new(reader, body_line_max_size); let encode = ROwnedH2BodyEncodeTransfer::new(decoder, send_stream, copy_config); H2StreamFromChunkedTransfer { state: TransferState::Data(encode), - has_trailer, trailer_max_size, active: false, } @@ -205,22 +208,15 @@ where let TransferState::Data(encode) = old_state else { unreachable!() }; - if self.has_trailer { - let (reader, send_stream) = encode.into_io(); - let reader = reader.into_reader(); - self.state = TransferState::Trailer(TrailerTransfer::new( - reader, - send_stream, - self.trailer_max_size, - )); - self.poll(cx) - } else { - let (_, send_stream) = encode.into_io(); - send_stream - .send_data(Bytes::new(), true) - .map_err(H2StreamFromChunkedTransferError::SendDataFailed)?; - Poll::Ready(Ok(())) - } + + let (reader, send_stream) = encode.into_io(); + // read trailer (maybe empty) and send + self.state = TransferState::Trailer(TrailerTransfer::new( + reader.into_reader(), + send_stream, + self.trailer_max_size, + )); + self.poll(cx) } } } diff --git a/lib/g3-h2/src/body/to_chunked_transfer.rs b/lib/g3-h2/src/body/to_chunked_transfer.rs index 35958ca7..a7f28d1d 100644 --- a/lib/g3-h2/src/body/to_chunked_transfer.rs +++ b/lib/g3-h2/src/body/to_chunked_transfer.rs @@ -42,7 +42,6 @@ enum TransferStage { } struct ChunkedEncodeTransferInternal { - has_trailer: bool, yield_size: usize, this_chunk_size: usize, chunk: Option, @@ -57,9 +56,8 @@ struct ChunkedEncodeTransferInternal { } impl ChunkedEncodeTransferInternal { - fn new(has_trailer: bool, yield_size: usize) -> Self { + fn new(yield_size: usize) -> Self { ChunkedEncodeTransferInternal { - has_trailer, yield_size, this_chunk_size: 0, chunk: None, @@ -74,9 +72,8 @@ impl ChunkedEncodeTransferInternal { } } - fn with_chunk(has_trailer: bool, yield_size: usize, chunk: Bytes) -> Self { + fn with_chunk(yield_size: usize, chunk: Bytes) -> Self { ChunkedEncodeTransferInternal { - has_trailer, yield_size, this_chunk_size: 0, chunk: Some(chunk), @@ -208,9 +205,9 @@ impl ChunkedEncodeTransferInternal { self.active = true; self.static_header.clear(); if self.total_write == 0 { - let _ = write!(&mut self.static_header, "0\r\n\r\n"); + let _ = write!(&mut self.static_header, "0\r\n"); } else { - let _ = write!(&mut self.static_header, "\r\n0\r\n\r\n"); + let _ = write!(&mut self.static_header, "\r\n0\r\n"); } self.static_offset = 0; self.this_chunk_size = 0; @@ -228,11 +225,7 @@ impl ChunkedEncodeTransferInternal { self.total_write += nw as u64; } if self.read_data_finished { - if self.has_trailer { - self.transfer_stage = TransferStage::Trailer; - } else { - self.transfer_stage = TransferStage::End; - } + self.transfer_stage = TransferStage::Trailer; return self.poll_transfer(cx, recv_stream, writer); } @@ -294,30 +287,24 @@ pub struct H2StreamToChunkedTransfer<'a, W> { } impl<'a, W> H2StreamToChunkedTransfer<'a, W> { - pub fn new( - recv_stream: &'a mut RecvStream, - writer: &'a mut W, - has_trailer: bool, - yield_size: usize, - ) -> Self { + pub fn new(recv_stream: &'a mut RecvStream, writer: &'a mut W, yield_size: usize) -> Self { H2StreamToChunkedTransfer { recv_stream, writer, - internal: ChunkedEncodeTransferInternal::new(has_trailer, yield_size), + internal: ChunkedEncodeTransferInternal::new(yield_size), } } pub fn with_chunk( recv_stream: &'a mut RecvStream, writer: &'a mut W, - has_trailer: bool, yield_size: usize, chunk: Bytes, ) -> Self { H2StreamToChunkedTransfer { recv_stream, writer, - internal: ChunkedEncodeTransferInternal::with_chunk(has_trailer, yield_size, chunk), + internal: ChunkedEncodeTransferInternal::with_chunk(yield_size, chunk), } } diff --git a/lib/g3-http/src/body/chunked_decoder.rs b/lib/g3-http/src/body/chunked_decoder.rs index fae19f07..4aebb53a 100644 --- a/lib/g3-http/src/body/chunked_decoder.rs +++ b/lib/g3-http/src/body/chunked_decoder.rs @@ -23,7 +23,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; use crate::parse::HttpChunkedLine; -struct ChunkedDecodeReaderInternal { +struct ChunkedDataDecodeReaderInternal { body_line_max_size: usize, chunk_header: Vec, this_chunk_size: u64, @@ -33,9 +33,9 @@ struct ChunkedDecodeReaderInternal { poll_chunk_end: bool, } -impl ChunkedDecodeReaderInternal { +impl ChunkedDataDecodeReaderInternal { fn new(body_line_max_size: usize) -> Self { - ChunkedDecodeReaderInternal { + ChunkedDataDecodeReaderInternal { body_line_max_size, chunk_header: Vec::with_capacity(32), this_chunk_size: 0, @@ -46,6 +46,10 @@ impl ChunkedDecodeReaderInternal { } } + fn finished(&self) -> bool { + self.poll_chunk_end && self.this_chunk_size == 0 + } + fn poll_decode( &mut self, cx: &mut Context<'_>, @@ -192,7 +196,8 @@ impl ChunkedDecodeReaderInternal { self.this_chunk_size = chunk_line.chunk_size; self.left_chunk_size = chunk_line.chunk_size; if self.left_chunk_size == 0 { - self.poll_chunk_end_r = true; + self.poll_chunk_end = true; + return Poll::Ready(Ok(())); } self.chunk_header.clear(); } @@ -200,25 +205,30 @@ impl ChunkedDecodeReaderInternal { } } -pub struct ChunkedDecodeReader<'a, R> { +/// Decode chunked data, and leave the trailer fields +pub struct ChunkedDataDecodeReader<'a, R> { reader: &'a mut R, - internal: ChunkedDecodeReaderInternal, + internal: ChunkedDataDecodeReaderInternal, } -impl<'a, R> ChunkedDecodeReader<'a, R> { +impl<'a, R> ChunkedDataDecodeReader<'a, R> { pub fn new(reader: &'a mut R, body_line_max_size: usize) -> Self { - ChunkedDecodeReader { + ChunkedDataDecodeReader { reader, - internal: ChunkedDecodeReaderInternal::new(body_line_max_size), + internal: ChunkedDataDecodeReaderInternal::new(body_line_max_size), } } pub fn into_reader(self) -> &'a mut R { self.reader } + + pub fn finished(&self) -> bool { + self.internal.finished() + } } -impl<'a, R> AsyncRead for ChunkedDecodeReader<'a, R> +impl<'a, R> AsyncRead for ChunkedDataDecodeReader<'a, R> where R: AsyncBufRead + Unpin, { @@ -242,3 +252,43 @@ where } } } + +#[cfg(test)] +mod test { + use super::*; + use bytes::Bytes; + use tokio::io::{AsyncReadExt, BufReader, Result}; + use tokio_util::io::StreamReader; + + #[tokio::test] + async fn read_single_chunked() { + let body_len: usize = 9; + let content = b"5\r\ntest\n\r\n4\r\nbody\r\n0\r\n\r\nXXX"; + let stream = tokio_stream::iter(vec![Result::Ok(Bytes::from_static(content))]); + let stream = StreamReader::new(stream); + let mut buf_stream = BufReader::new(stream); + let mut body_deocder = ChunkedDataDecodeReader::new(&mut buf_stream, 1024); + + let mut buf = [0u8; 32]; + let len = body_deocder.read(&mut buf).await.unwrap(); + assert_eq!(len, body_len); + assert_eq!(&buf[0..len], b"test\nbody"); + assert!(body_deocder.finished()); + } + + #[tokio::test] + async fn read_single_tailer() { + let body_len: usize = 9; + let content = b"5\r\ntest\n\r\n4\r\nbody\r\n0\r\nA: B\r\n\r\nXXX"; + let stream = tokio_stream::iter(vec![Result::Ok(Bytes::from_static(content))]); + let stream = StreamReader::new(stream); + let mut buf_stream = BufReader::new(stream); + let mut body_deocder = ChunkedDataDecodeReader::new(&mut buf_stream, 1024); + + let mut buf = [0u8; 32]; + let len = body_deocder.read(&mut buf).await.unwrap(); + assert_eq!(len, body_len); + assert_eq!(&buf[0..len], b"test\nbody"); + assert!(body_deocder.finished()); + } +} diff --git a/lib/g3-http/src/body/mod.rs b/lib/g3-http/src/body/mod.rs index dd87b3f4..fbd04c3d 100644 --- a/lib/g3-http/src/body/mod.rs +++ b/lib/g3-http/src/body/mod.rs @@ -35,7 +35,7 @@ mod chunked_encoder; pub use chunked_encoder::ChunkedNoTrailerEncodeTransfer; mod chunked_decoder; -pub use chunked_decoder::ChunkedDecodeReader; +pub use chunked_decoder::ChunkedDataDecodeReader; mod trailer_reader; pub use trailer_reader::{TrailerReadError, TrailerReader}; diff --git a/lib/g3-http/src/lib.rs b/lib/g3-http/src/lib.rs index b85fdc53..9be267c6 100644 --- a/lib/g3-http/src/lib.rs +++ b/lib/g3-http/src/lib.rs @@ -21,7 +21,7 @@ pub use parse::{ mod body; pub use body::{ - ChunkedDecodeReader, ChunkedNoTrailerEncodeTransfer, ChunkedTransfer, HttpBodyReader, + ChunkedDataDecodeReader, ChunkedNoTrailerEncodeTransfer, ChunkedTransfer, HttpBodyReader, HttpBodyType, PreviewData, PreviewDataState, PreviewError, TrailerReadError, TrailerReader, }; diff --git a/lib/g3-icap-client/src/reqmod/h2/bidirectional.rs b/lib/g3-icap-client/src/reqmod/h2/bidirectional.rs index e2e1b32b..b1fc761c 100644 --- a/lib/g3-icap-client/src/reqmod/h2/bidirectional.rs +++ b/lib/g3-icap-client/src/reqmod/h2/bidirectional.rs @@ -144,7 +144,6 @@ impl<'a, I: IdleCheck> BidirectionalRecvHttpRequest<'a, I> { ) .await?; let trailers = self.icap_rsp.take_trailers(); - let has_trailer = !trailers.is_empty(); http_req.set_trailer(trailers); let final_req = orig_http_request.adapt_to(&http_req); @@ -159,7 +158,6 @@ impl<'a, I: IdleCheck> BidirectionalRecvHttpRequest<'a, I> { &self.copy_config, self.http_body_line_max_size, self.http_trailer_max_size, - has_trailer, ); let idle_duration = self.idle_checker.idle_duration(); diff --git a/lib/g3-icap-client/src/reqmod/h2/forward_body.rs b/lib/g3-icap-client/src/reqmod/h2/forward_body.rs index 7858a0e3..b537e07b 100644 --- a/lib/g3-icap-client/src/reqmod/h2/forward_body.rs +++ b/lib/g3-icap-client/src/reqmod/h2/forward_body.rs @@ -61,7 +61,6 @@ impl H2RequestAdapter { ) -> Result { let http_header = http_request.serialize_for_adapter(); let icap_header = self.build_forward_all_request(&http_request, http_header.len()); - let has_trailer = http_request.headers().contains_key(http::header::TRAILER); let icap_w = &mut self.icap_connection.0; icap_w @@ -72,7 +71,6 @@ impl H2RequestAdapter { let mut body_transfer = H2StreamToChunkedTransfer::new( &mut clt_body, &mut self.icap_connection.0, - has_trailer, self.copy_config.yield_size(), ); let bidirectional_transfer = BidirectionalRecvIcapResponse { diff --git a/lib/g3-icap-client/src/reqmod/h2/mod.rs b/lib/g3-icap-client/src/reqmod/h2/mod.rs index 85e90837..a555a043 100644 --- a/lib/g3-icap-client/src/reqmod/h2/mod.rs +++ b/lib/g3-icap-client/src/reqmod/h2/mod.rs @@ -195,7 +195,6 @@ pub struct ReqmodRecvHttpResponseBody { copy_config: LimitedCopyConfig, http_body_line_max_size: usize, http_trailer_max_size: usize, - has_trailer: bool, } impl ReqmodRecvHttpResponseBody { @@ -209,7 +208,6 @@ impl ReqmodRecvHttpResponseBody { &self.copy_config, self.http_body_line_max_size, self.http_trailer_max_size, - self.has_trailer, ) } diff --git a/lib/g3-icap-client/src/reqmod/h2/preview.rs b/lib/g3-icap-client/src/reqmod/h2/preview.rs index f3d53106..13e34c96 100644 --- a/lib/g3-icap-client/src/reqmod/h2/preview.rs +++ b/lib/g3-icap-client/src/reqmod/h2/preview.rs @@ -90,7 +90,6 @@ impl H2RequestAdapter { let http_header = http_request.serialize_for_adapter(); let icap_header = self.build_preview_request(&http_request, http_header.len(), preview_size); - let has_trailer = http_request.headers().contains_key(http::header::TRAILER); let icap_w = &mut self.icap_connection.0; icap_w @@ -130,14 +129,12 @@ impl H2RequestAdapter { H2StreamToChunkedTransfer::new( &mut clt_body, &mut self.icap_connection.0, - has_trailer, self.copy_config.yield_size(), ) } else { H2StreamToChunkedTransfer::with_chunk( &mut clt_body, &mut self.icap_connection.0, - has_trailer, self.copy_config.yield_size(), initial_body_data, ) diff --git a/lib/g3-icap-client/src/reqmod/h2/recv_request.rs b/lib/g3-icap-client/src/reqmod/h2/recv_request.rs index c9194d61..3cd9b239 100644 --- a/lib/g3-icap-client/src/reqmod/h2/recv_request.rs +++ b/lib/g3-icap-client/src/reqmod/h2/recv_request.rs @@ -242,7 +242,6 @@ impl H2RequestAdapter { ) .await?; let trailers = icap_rsp.take_trailers(); - let has_trailer = !trailers.is_empty(); http_req.set_trailer(trailers); let final_req = orig_http_request.adapt_to(&http_req); @@ -257,7 +256,6 @@ impl H2RequestAdapter { &self.copy_config, self.http_body_line_max_size, self.http_trailer_max_size, - has_trailer, ); let idle_duration = self.idle_checker.idle_duration(); diff --git a/lib/g3-icap-client/src/reqmod/h2/recv_response.rs b/lib/g3-icap-client/src/reqmod/h2/recv_response.rs index cd9deabe..3f4c20bb 100644 --- a/lib/g3-icap-client/src/reqmod/h2/recv_response.rs +++ b/lib/g3-icap-client/src/reqmod/h2/recv_response.rs @@ -46,7 +46,6 @@ impl H2RequestAdapter { let mut http_rsp = HttpAdapterErrorResponse::parse(&mut self.icap_connection.1, http_header_size).await?; let trailers = icap_rsp.take_trailers(); - let has_trailer = !trailers.is_empty(); http_rsp.set_trailer(trailers); let recv_body = ReqmodRecvHttpResponseBody { icap_client: self.icap_client, @@ -55,7 +54,6 @@ impl H2RequestAdapter { copy_config: self.copy_config, http_body_line_max_size: self.http_body_line_max_size, http_trailer_max_size: self.http_trailer_max_size, - has_trailer, }; Ok((http_rsp, recv_body)) } diff --git a/lib/g3-icap-client/src/respmod/h2/bidirectional.rs b/lib/g3-icap-client/src/respmod/h2/bidirectional.rs index 3eb904c5..6334dbcc 100644 --- a/lib/g3-icap-client/src/respmod/h2/bidirectional.rs +++ b/lib/g3-icap-client/src/respmod/h2/bidirectional.rs @@ -137,7 +137,6 @@ impl<'a, I: IdleCheck> BidirectionalRecvHttpResponse<'a, I> { { let mut http_rsp = HttpAdaptedResponse::parse(self.icap_reader, http_header_size).await?; let trailers = self.icap_rsp.take_trailers(); - let has_trailer = !trailers.is_empty(); http_rsp.set_trailer(trailers); let final_rsp = orig_http_response.adapt_to(&http_rsp); @@ -153,7 +152,6 @@ impl<'a, I: IdleCheck> BidirectionalRecvHttpResponse<'a, I> { &self.copy_config, self.http_body_line_max_size, self.http_trailer_max_size, - has_trailer, ); let idle_duration = self.idle_checker.idle_duration(); diff --git a/lib/g3-icap-client/src/respmod/h2/forward_body.rs b/lib/g3-icap-client/src/respmod/h2/forward_body.rs index 4a8aa86f..57fbe6f5 100644 --- a/lib/g3-icap-client/src/respmod/h2/forward_body.rs +++ b/lib/g3-icap-client/src/respmod/h2/forward_body.rs @@ -72,7 +72,6 @@ impl H2ResponseAdapter { &http_response, http_rsp_header.len(), ); - let has_trailer = http_response.headers().contains_key(http::header::TRAILER); let icap_w = &mut self.icap_connection.0; icap_w @@ -87,7 +86,6 @@ impl H2ResponseAdapter { let mut body_transfer = H2StreamToChunkedTransfer::new( &mut ups_body, &mut self.icap_connection.0, - has_trailer, self.copy_config.yield_size(), ); let bidirectional_transfer = BidirectionalRecvIcapResponse { diff --git a/lib/g3-icap-client/src/respmod/h2/preview.rs b/lib/g3-icap-client/src/respmod/h2/preview.rs index 23cb87ac..2f7a5e0c 100644 --- a/lib/g3-icap-client/src/respmod/h2/preview.rs +++ b/lib/g3-icap-client/src/respmod/h2/preview.rs @@ -109,7 +109,6 @@ impl H2ResponseAdapter { http_rsp_header.len(), preview_size, ); - let has_trailer = http_request.headers().contains_key(http::header::TRAILER); let icap_w = &mut self.icap_connection.0; icap_w @@ -148,14 +147,12 @@ impl H2ResponseAdapter { H2StreamToChunkedTransfer::new( &mut ups_body, &mut self.icap_connection.0, - has_trailer, self.copy_config.yield_size(), ) } else { H2StreamToChunkedTransfer::with_chunk( &mut ups_body, &mut self.icap_connection.0, - has_trailer, self.copy_config.yield_size(), initial_body_data, ) diff --git a/lib/g3-icap-client/src/respmod/h2/recv_response.rs b/lib/g3-icap-client/src/respmod/h2/recv_response.rs index 94b12d5c..70778ac4 100644 --- a/lib/g3-icap-client/src/respmod/h2/recv_response.rs +++ b/lib/g3-icap-client/src/respmod/h2/recv_response.rs @@ -205,7 +205,6 @@ impl H2ResponseAdapter { let mut http_rsp = HttpAdaptedResponse::parse(&mut self.icap_connection.1, http_header_size).await?; let trailers = icap_rsp.take_trailers(); - let has_trailer = !trailers.is_empty(); http_rsp.set_trailer(trailers); let final_rsp = orig_http_response.adapt_to(&http_rsp); @@ -221,7 +220,6 @@ impl H2ResponseAdapter { &self.copy_config, self.http_body_line_max_size, self.http_trailer_max_size, - has_trailer, ); let idle_duration = self.idle_checker.idle_duration();