Skip to content

Commit

Permalink
feat(volo-thrift): use afit and rpitit to optimize codec (#230)
Browse files Browse the repository at this point in the history
  • Loading branch information
PureWhiteWu authored Oct 17, 2023
1 parent 5b06e2c commit fc050e4
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 14 deletions.
1 change: 0 additions & 1 deletion volo-thrift/src/codec/default/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl<D: ZeroCopyDecoder> FramedDecoder<D> {
/// https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#compatibility
pub const HEADER_DETECT_LENGTH: usize = 6;

#[async_trait::async_trait]
impl<D> ZeroCopyDecoder for FramedDecoder<D>
where
D: ZeroCopyDecoder,
Expand Down
9 changes: 4 additions & 5 deletions volo-thrift/src/codec/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
//! [Kitex]: https://github.com/cloudwego/kitex
//! [TTHeader]: https://www.cloudwego.io/docs/kitex/reference/transport_protocol_ttheader/
//! [Framed]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport
use std::future::Future;

use bytes::Bytes;
use linkedbytes::LinkedBytes;
use pilota::thrift::{DecodeError, EncodeError, TransportError};
Expand Down Expand Up @@ -73,7 +75,6 @@ pub trait ZeroCopyEncoder: Send + Sync + 'static {
/// [`ZeroCopyDecoder`] tries to decode a message without copying large data, so the [`Bytes`] in
/// the [`decode`] method is not designed to be reused, and the implementation can use
/// `Bytes::split_to` to get a [`Bytes`] and hand it to the user directly.
#[async_trait::async_trait]
pub trait ZeroCopyDecoder: Send + Sync + 'static {
/// If the outer decoder is framed, it can reads all the payload into a [`Bytes`] and
/// call this function for better performance.
Expand All @@ -85,15 +86,15 @@ pub trait ZeroCopyDecoder: Send + Sync + 'static {

/// The [`DefaultDecoder`] will always call `decode_async`, so the most outer decoder
/// must implement this function.
async fn decode_async<
fn decode_async<
Msg: Send + EntryMessage,
Cx: ThriftContext,
R: AsyncRead + Unpin + Send + Sync,
>(
&mut self,
cx: &mut Cx,
reader: &mut BufReader<R>,
) -> Result<Option<ThriftMessage<Msg>>, DecodeError>;
) -> impl Future<Output = Result<Option<ThriftMessage<Msg>>, DecodeError>> + Send;
}

/// [`MakeZeroCopyCodec`] is used to create a [`ZeroCopyEncoder`] and a [`ZeroCopyDecoder`].
Expand All @@ -112,7 +113,6 @@ pub struct DefaultEncoder<E, W> {
linked_bytes: LinkedBytes,
}

#[async_trait::async_trait]
impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
for DefaultEncoder<E, W>
{
Expand Down Expand Up @@ -184,7 +184,6 @@ pub struct DefaultDecoder<D, R> {
reader: BufReader<R>,
}

#[async_trait::async_trait]
impl<D: ZeroCopyDecoder, R: AsyncRead + Unpin + Send + Sync + 'static> Decoder
for DefaultDecoder<D, R>
{
Expand Down
1 change: 0 additions & 1 deletion volo-thrift/src/codec/default/thrift.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ impl Default for ThriftCodec {
}
}

#[async_trait::async_trait]
impl ZeroCopyDecoder for ThriftCodec {
#[inline]
fn decode<Msg: Send + EntryMessage, Cx: ThriftContext>(
Expand Down
1 change: 0 additions & 1 deletion volo-thrift/src/codec/default/ttheader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl<D: ZeroCopyDecoder> TTHeaderDecoder<D> {
/// https://www.cloudwego.io/docs/kitex/reference/transport_protocol_ttheader/
pub const HEADER_DETECT_LENGTH: usize = 6;

#[async_trait::async_trait]
impl<D> ZeroCopyDecoder for TTHeaderDecoder<D>
where
D: ZeroCopyDecoder,
Expand Down
12 changes: 6 additions & 6 deletions volo-thrift/src/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::future::Future;

use tokio::io::{AsyncRead, AsyncWrite};

use crate::{context::ThriftContext, EntryMessage, ThriftMessage};
Expand All @@ -12,24 +14,22 @@ pub use default::DefaultMakeCodec;
/// Returning an Ok(None) indicates the EOF has been reached.
///
/// Note: [`Decoder`] should be designed to be ready for reuse.
#[async_trait::async_trait]
pub trait Decoder: Send + 'static {
async fn decode<Msg: Send + EntryMessage, Cx: ThriftContext>(
fn decode<Msg: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
) -> Result<Option<ThriftMessage<Msg>>, crate::Error>;
) -> impl Future<Output = Result<Option<ThriftMessage<Msg>>, crate::Error>> + Send;
}

/// [`Encoder`] writes a [`ThriftMessage`] to an [`AsyncWrite`] and flushes the data.
///
/// Note: [`Encoder`] should be designed to be ready for reuse.
#[async_trait::async_trait]
pub trait Encoder: Send + 'static {
async fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
) -> Result<(), crate::Error>;
) -> impl Future<Output = Result<(), crate::Error>> + Send;
}

/// [`MakeCodec`] receives an [`AsyncRead`] and an [`AsyncWrite`] and returns a
Expand Down

0 comments on commit fc050e4

Please sign in to comment.