Skip to content

Commit

Permalink
add HttpBodyDecodeReader for h1
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Jun 13, 2024
1 parent 6c37a25 commit cde910c
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 21 deletions.
298 changes: 298 additions & 0 deletions lib/g3-http/src/body/decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
/*
* Copyright 2024 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};

use g3_types::net::HttpHeaderMap;

use crate::{ChunkedDataDecodeReader, HttpBodyReader, TrailerReadError, TrailerReader};

enum HttpBodyDecodeState<'a, R> {
Plain(HttpBodyReader<'a, R>),
Chunked(ChunkedDataDecodeReader<'a, R>),
}

pub struct HttpBodyDecodeReader<'a, R> {
read_data_done: bool,
finished: bool,
decode_state: Option<HttpBodyDecodeState<'a, R>>,
}

impl<'a, R> HttpBodyDecodeReader<'a, R>
where
R: AsyncBufRead + Unpin,
{
fn new(state: HttpBodyDecodeState<'a, R>) -> Self {
HttpBodyDecodeReader {
read_data_done: false,
finished: false,
decode_state: Some(state),
}
}

pub fn new_read_until_end(stream: &'a mut R) -> Self {
HttpBodyDecodeReader::new(HttpBodyDecodeState::Plain(
HttpBodyReader::new_read_until_end(stream),
))
}

pub fn new_fixed_length(stream: &'a mut R, content_length: u64) -> Self {
HttpBodyDecodeReader::new(HttpBodyDecodeState::Plain(
HttpBodyReader::new_fixed_length(stream, content_length),
))
}

pub fn new_chunked(stream: &'a mut R, body_line_max_size: usize) -> Self {
HttpBodyDecodeReader::new(HttpBodyDecodeState::Chunked(ChunkedDataDecodeReader::new(
stream,
body_line_max_size,
)))
}

pub async fn trailer(
&mut self,
max_size: usize,
) -> Result<Option<HttpHeaderMap>, TrailerReadError> {
if !self.read_data_done {
return Err(TrailerReadError::ReadError(io::Error::other(
"data has not been read out yet",
)));
}
if self.finished {
return Ok(None);
}
let Some(state) = self.decode_state.take() else {
return Ok(None);
};

match state {
HttpBodyDecodeState::Plain(_) => Ok(None),
HttpBodyDecodeState::Chunked(decoder) => {
let headers = TrailerReader::new(decoder.into_reader(), max_size).await?;
self.finished = true;
if headers.is_empty() {
Ok(None)
} else {
Ok(Some(headers))
}
}
}
}

pub fn finished(&self) -> bool {
self.finished
}
}

impl<'a, R> AsyncRead for HttpBodyDecodeReader<'a, R>
where
R: AsyncBufRead + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if self.read_data_done {
return Poll::Ready(Ok(()));
}

let Some(reader) = self.decode_state.as_mut() else {
return Poll::Ready(Ok(()));
};

let prev_len = buf.filled().len();
match reader {
HttpBodyDecodeState::Plain(r) => {
ready!(Pin::new(r).poll_read(cx, buf))?;
if buf.filled().len() == prev_len {
self.read_data_done = true;
self.finished = true;
}
}
HttpBodyDecodeState::Chunked(c) => {
ready!(Pin::new(c).poll_read(cx, buf))?;
if buf.filled().len() == prev_len {
self.read_data_done = true;
}
}
}
Poll::Ready(Ok(()))
}
}

#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use tokio::io::{AsyncReadExt, BufReader, Result};
use tokio_util::io::StreamReader;

#[tokio::test]
async fn read_single_to_end() {
let content = b"test body";
let stream = tokio_stream::iter(vec![Result::Ok(Bytes::from_static(content))]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let mut body_reader = HttpBodyDecodeReader::new_read_until_end(&mut buf_stream);

let mut buf = [0u8; 16];
let len = body_reader.read(&mut buf).await.unwrap();
assert_eq!(len, content.len());
assert_eq!(&buf[0..len], content);
let len = body_reader.read(&mut buf).await.unwrap();
assert_eq!(len, 0);
assert!(body_reader.finished());
}

#[tokio::test]
async fn read_split_to_end() {
let content1 = b"test body";
let content2 = b"hello world";
let stream = tokio_stream::iter(vec![
Result::Ok(Bytes::from_static(content1)),
Result::Ok(Bytes::from_static(content2)),
]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let mut body_reader = HttpBodyDecodeReader::new_read_until_end(&mut buf_stream);

let mut buf = [0u8; 32];
let len = body_reader.read(&mut buf).await.unwrap();
assert_eq!(len, content1.len());
assert_eq!(&buf[0..len], content1);
let len = body_reader.read(&mut buf).await.unwrap();
assert_eq!(len, content2.len());
assert_eq!(&buf[0..len], content2);
let len = body_reader.read(&mut buf).await.unwrap();
assert_eq!(len, 0);
assert!(body_reader.finished());
}

#[tokio::test]
async fn read_single_content_length() {
let body_len: usize = 9;
let content = b"test bodyxxxx";
let stream = tokio_stream::iter(vec![Result::Ok(Bytes::from_static(content))]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let mut body_reader =
HttpBodyDecodeReader::new_fixed_length(&mut buf_stream, body_len as u64);

let mut buf = [0u8; 16];
let len = body_reader.read(&mut buf).await.unwrap();
assert_eq!(len, body_len);
assert_eq!(&buf[0..len], &content[0..len]);
let len = body_reader.read(&mut buf).await.unwrap();
assert_eq!(len, 0);
assert!(body_reader.finished());
}

#[tokio::test]
async fn read_split_content_length() {
let body_len: usize = 20;
let content1 = b"hello world";
let content2 = b"test bodyxxxx";
let stream = tokio_stream::iter(vec![
Result::Ok(Bytes::from_static(content1)),
Result::Ok(Bytes::from_static(content2)),
]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let mut body_reader =
HttpBodyDecodeReader::new_fixed_length(&mut buf_stream, body_len as u64);

let mut buf = [0u8; 32];
let len = body_reader.read(&mut buf).await.unwrap();
assert_eq!(len, content1.len());
assert_eq!(&buf[0..len], content1);
let len = body_reader.read(&mut buf).await.unwrap();
assert_eq!(len, body_len - content1.len());
assert_eq!(&buf[0..len], &content2[0..len]);
let len = body_reader.read(&mut buf).await.unwrap();
assert_eq!(len, 0);
assert!(body_reader.finished());
}

#[tokio::test]
async fn read_single_chunked() {
let body_len: usize = 9;
let content = b"5\r\ntest\n\r\n4\r\nbody\r\n0\r\n\r\nXXX";
let stream = tokio_stream::iter(vec![Result::Ok(Bytes::from_static(content))]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let mut body_reader = HttpBodyDecodeReader::new_chunked(&mut buf_stream, 1024);

let mut buf = Vec::with_capacity(32);
tokio::io::copy(&mut body_reader, &mut buf).await.unwrap();
assert_eq!(buf.len(), body_len);
assert_eq!(&buf, b"test\nbody");
assert!(!body_reader.finished());
let header = body_reader.trailer(1024).await.unwrap();
assert!(header.is_none());
assert!(body_reader.finished());
}

#[tokio::test]
async fn read_split_chunked() {
let body_len: usize = 9;
let content1 = b"5\r\ntest\n\r\n4\r";
let content2 = b"\nbody\r\n0\r\n\r\nXXX";
let stream = tokio_stream::iter(vec![
Result::Ok(Bytes::from_static(content1)),
Result::Ok(Bytes::from_static(content2)),
]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let mut body_reader = HttpBodyDecodeReader::new_chunked(&mut buf_stream, 1024);

let mut buf = Vec::with_capacity(32);
tokio::io::copy(&mut body_reader, &mut buf).await.unwrap();
assert_eq!(buf.len(), body_len);
assert_eq!(&buf, b"test\nbody");
assert!(!body_reader.finished());
let header = body_reader.trailer(1024).await.unwrap();
assert!(header.is_none());
assert!(body_reader.finished());
}

#[tokio::test]
async fn read_single_trailer() {
let body_len: usize = 9;
let content = b"5\r\ntest\n\r\n4\r\nbody\r\n0\r\nA: B\r\n\r\nXX";
let stream = tokio_stream::iter(vec![Result::Ok(Bytes::from_static(content))]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let mut body_reader = HttpBodyDecodeReader::new_chunked(&mut buf_stream, 1024);

let mut buf = Vec::with_capacity(32);
tokio::io::copy(&mut body_reader, &mut buf).await.unwrap();
assert_eq!(buf.len(), body_len);
assert_eq!(&buf, b"test\nbody");
assert!(!body_reader.finished());
let header = body_reader.trailer(1024).await.unwrap();
assert!(header.is_some());
assert!(body_reader.finished());

let headers = header.unwrap();
let v = headers.get("a").unwrap();
assert_eq!(v.as_bytes(), b"B");
}
}
3 changes: 3 additions & 0 deletions lib/g3-http/src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub enum HttpBodyType {
mod reader;
pub use reader::HttpBodyReader;

mod decoder;
pub use decoder::HttpBodyDecodeReader;

mod preview;
pub use preview::{PreviewData, PreviewDataState, PreviewError};

Expand Down
36 changes: 36 additions & 0 deletions lib/g3-http/src/body/trailer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl TrailerReaderInternal {
let value = HttpHeaderValue::from_str(header.value).map_err(|_| {
TrailerReadError::InvalidHeaderLine(HttpLineParseError::InvalidHeaderValue)
})?;
self.cached_line.clear();
self.headers.append(name, value);
}
}
Expand Down Expand Up @@ -154,3 +155,38 @@ where
me.internal.poll_read(cx, Pin::new(&mut me.reader))
}
}

#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use tokio::io::{BufReader, Result};
use tokio_util::io::StreamReader;

#[tokio::test]
async fn empty() {
let content = b"\r\nXX";
let stream = tokio_stream::iter(vec![Result::Ok(Bytes::from_static(content))]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let trailer_reader = TrailerReader::new(&mut buf_stream, 1024);

let headers = trailer_reader.await.unwrap();
assert!(headers.is_empty());
}

#[tokio::test]
async fn single() {
let content = b"A: B\r\n\r\nXX";
let stream = tokio_stream::iter(vec![Result::Ok(Bytes::from_static(content))]);
let stream = StreamReader::new(stream);
let mut buf_stream = BufReader::new(stream);
let trailer_reader = TrailerReader::new(&mut buf_stream, 1024);

let headers = trailer_reader.await.unwrap();
assert!(!headers.is_empty());

let v = headers.get("a").unwrap();
assert_eq!(v.as_bytes(), b"B");
}
}
5 changes: 3 additions & 2 deletions lib/g3-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ pub use parse::{

mod body;
pub use body::{
ChunkedDataDecodeReader, H1BodyToChunkedTransfer, HttpBodyReader, HttpBodyType, PreviewData,
PreviewDataState, PreviewError, StreamToChunkedTransfer, TrailerReadError, TrailerReader,
ChunkedDataDecodeReader, H1BodyToChunkedTransfer, HttpBodyDecodeReader, HttpBodyReader,
HttpBodyType, PreviewData, PreviewDataState, PreviewError, StreamToChunkedTransfer,
TrailerReadError, TrailerReader,
};

pub mod client;
Expand Down
Loading

0 comments on commit cde910c

Please sign in to comment.