Skip to content

Commit

Permalink
Merge pull request #167 from junkurihara/develop
Browse files Browse the repository at this point in the history
0.8.1
  • Loading branch information
junkurihara authored Jul 10, 2024
2 parents 8c9b542 + e3af5c4 commit 03ea24c
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 43 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## 0.9.0 (Unreleased)

## 0.8.1

### Improvement

- Refactor: lots of minor improvements
- Deps

## 0.8.0

### Important Changes
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace.package]
version = "0.8.0"
version = "0.8.1"
authors = ["Jun Kurihara"]
homepage = "https://github.com/junkurihara/rust-rpxy"
repository = "https://github.com/junkurihara/rust-rpxy"
Expand Down
6 changes: 3 additions & 3 deletions rpxy-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [
mimalloc = { version = "*", default-features = false }
anyhow = "1.0.86"
rustc-hash = "2.0.0"
serde = { version = "1.0.203", default-features = false, features = ["derive"] }
serde = { version = "1.0.204", default-features = false, features = ["derive"] }
tokio = { version = "1.38.0", default-features = false, features = [
"net",
"rt-multi-thread",
"time",
"sync",
"macros",
] }
async-trait = "0.1.80"
async-trait = "0.1.81"


# config
clap = { version = "4.5.7", features = ["std", "cargo", "wrap_help"] }
clap = { version = "4.5.9", features = ["std", "cargo", "wrap_help"] }
toml = { version = "0.8.14", default-features = false, features = ["parse"] }
hot_reload = "0.1.5"

Expand Down
6 changes: 3 additions & 3 deletions rpxy-certs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ tracing = { version = "0.1.40" }
derive_builder = { version = "0.20.0" }
thiserror = { version = "1.0.61" }
hot_reload = { version = "0.1.5" }
async-trait = { version = "0.1.80" }
rustls = { version = "0.23.10", default-features = false, features = [
async-trait = { version = "0.1.81" }
rustls = { version = "0.23.11", default-features = false, features = [
"std",
"aws_lc_rs",
] }
rustls-pemfile = { version = "2.1.2" }
rustls-webpki = { version = "0.102.4", default-features = false, features = [
rustls-webpki = { version = "0.102.5", default-features = false, features = [
"std",
"aws_lc_rs",
] }
Expand Down
24 changes: 13 additions & 11 deletions rpxy-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ publish.workspace = true
default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"]
http3-quinn = ["socket2", "quinn", "h3", "h3-quinn", "rpxy-certs/http3"]
http3-s2n = [
"h3",
"s2n-quic",
"s2n-quic-core",
"s2n-quic-rustls",
"s2n-quic-h3",
"rpxy-certs/http3",
"h3",
]
cache = ["http-cache-semantics", "lru", "sha2", "base64"]
sticky-cookie = ["base64", "sha2", "chrono"]
Expand All @@ -45,7 +45,7 @@ tokio = { version = "1.38.0", default-features = false, features = [
"fs",
] }
pin-project-lite = "0.2.14"
async-trait = "0.1.80"
async-trait = "0.1.81"

# Error handling
anyhow = "1.0.86"
Expand All @@ -54,8 +54,8 @@ thiserror = "1.0.61"
# http for both server and client
http = "1.1.0"
http-body-util = "0.1.2"
hyper = { version = "1.3.1", default-features = false }
hyper-util = { version = "0.1.5", features = ["full"] }
hyper = { version = "1.4.1", default-features = false }
hyper-util = { version = "0.1.6", features = ["full"] }
futures-util = { version = "0.3.30", default-features = false }
futures-channel = { version = "0.3.30", default-features = false }

Expand All @@ -76,22 +76,24 @@ hyper-rustls = { git = "https://github.com/junkurihara/hyper-rustls", branch = "
# tls and cert management for server
rpxy-certs = { path = "../rpxy-certs/", default-features = false }
hot_reload = "0.1.5"
rustls = { version = "0.23.10", default-features = false }
rustls = { version = "0.23.11", default-features = false }
tokio-rustls = { version = "0.26.0", features = ["early-data"] }

# logging
tracing = { version = "0.1.40" }

# http/3
quinn = { version = "0.11.2", optional = true }
h3 = { version = "0.0.5", optional = true }
h3-quinn = { version = "0.0.6", optional = true }
s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true }
s2n-quic = { version = "1.41.0", default-features = false, features = [
h3 = { version = "0.0.6", features = ["tracing"], optional = true }
h3-quinn = { version = "0.0.7", optional = true }
s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", features = [
"tracing",
], optional = true }
s2n-quic = { version = "1.42.0", default-features = false, features = [
"provider-tls-rustls",
], optional = true }
s2n-quic-core = { version = "0.41.0", default-features = false, optional = true }
s2n-quic-rustls = { version = "0.41.0", optional = true }
s2n-quic-core = { version = "0.42.0", default-features = false, optional = true }
s2n-quic-rustls = { version = "0.42.0", optional = true }
##########
# for UDP socket wit SO_REUSEADDR when h3 with quinn
socket2 = { version = "0.5.7", features = ["all"], optional = true }
Expand Down
3 changes: 3 additions & 0 deletions rpxy-lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub enum RpxyError {
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
#[error("H3 error: {0}")]
H3Error(#[from] h3::Error),
// #[cfg(feature = "http3-s2n")]
// #[error("H3 error: {0}")]
// H3Error(#[from] s2n_quic_h3::h3::Error),
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
#[error("Exceeds max request body size for HTTP/3")]
H3TooLargeBody,
Expand Down
10 changes: 5 additions & 5 deletions rpxy-lib/src/proxy/proxy_h3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use hyper_util::client::legacy::connect::Connect;
use std::net::SocketAddr;

#[cfg(feature = "http3-quinn")]
use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream};
use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, quic::OpenStreams, server::RequestStream};
#[cfg(all(feature = "http3-s2n", not(feature = "http3-quinn")))]
use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream};
use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, quic::OpenStreams, server::RequestStream};

impl<T> Proxy<T>
where
Expand All @@ -28,9 +28,9 @@ where
) -> RpxyResult<()>
where
C: ConnectionQuic<Bytes>,
<C as ConnectionQuic<Bytes>>::BidiStream: BidiStream<Bytes> + Send + 'static,
<<C as ConnectionQuic<Bytes>>::BidiStream as BidiStream<Bytes>>::RecvStream: Send,
<<C as ConnectionQuic<Bytes>>::BidiStream as BidiStream<Bytes>>::SendStream: Send,
<C as OpenStreams<Bytes>>::BidiStream: BidiStream<Bytes> + Send + 'static,
<<C as OpenStreams<Bytes>>::BidiStream as BidiStream<Bytes>>::RecvStream: Send,
<<C as OpenStreams<Bytes>>::BidiStream as BidiStream<Bytes>>::SendStream: Send,
{
let mut h3_conn = h3::server::Connection::<_, Bytes>::new(quic_connection).await?;
info!(
Expand Down
10 changes: 7 additions & 3 deletions submodules/s2n-quic-h3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ publish = false
[dependencies]
bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false }
h3 = "0.0.5"
h3 = { version = "0.0.6", features = ["tracing"] }
# s2n-quic = { path = "../s2n-quic" }
# s2n-quic-core = { path = "../s2n-quic-core" }
s2n-quic = { version = "1.41.0" }
s2n-quic-core = { version = "0.41.0" }
s2n-quic = { version = "1.42.0" }
s2n-quic-core = { version = "0.42.0" }
tracing = { version = "0.1.40", optional = true }

[features]
tracing = ["dep:tracing"]
60 changes: 43 additions & 17 deletions submodules/s2n-quic-h3/src/s2n_quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use std::{
task::{self, Poll},
};

#[cfg(feature = "tracing")]
use tracing::instrument;

pub struct Connection {
conn: s2n_quic::connection::Handle,
bidi_acceptor: s2n_quic::connection::BidirectionalStreamAcceptor,
Expand Down Expand Up @@ -66,27 +69,27 @@ impl<B> quic::Connection<B> for Connection
where
B: Buf,
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type OpenStreams = OpenStreams;
type Error = ConnectionError;
type AcceptError = ConnectionError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_accept_recv(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::RecvStream>, Self::Error>> {
) -> Poll<Result<Option<Self::RecvStream>, Self::AcceptError>> {
let recv = match ready!(self.recv_acceptor.poll_accept_receive_stream(cx))? {
Some(x) => x,
None => return Poll::Ready(Ok(None)),
};
Poll::Ready(Ok(Some(Self::RecvStream::new(recv))))
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_accept_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::BidiStream>, Self::Error>> {
) -> Poll<Result<Option<Self::BidiStream>, Self::AcceptError>> {
let (recv, send) = match ready!(self.bidi_acceptor.poll_accept_bidirectional_stream(cx))? {
Some(x) => x.split(),
None => return Poll::Ready(Ok(None)),
Expand All @@ -97,28 +100,41 @@ where
})))
}

fn opener(&self) -> Self::OpenStreams {
OpenStreams {
conn: self.conn.clone(),
}
}
}

impl<B> quic::OpenStreams<B> for Connection
where
B: Buf,
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type OpenError = ConnectionError;


#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into()
}

fn opener(&self) -> Self::OpenStreams {
OpenStreams {
conn: self.conn.clone(),
}
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self.conn.close(
code.value()
Expand All @@ -138,25 +154,27 @@ where
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type Error = ConnectionError;
type OpenError = ConnectionError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self.conn.close(
code.value()
Expand Down Expand Up @@ -271,6 +289,7 @@ impl quic::RecvStream for RecvStream {
type Buf = Bytes;
type Error = ReadError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_data(
&mut self,
cx: &mut task::Context<'_>,
Expand All @@ -279,13 +298,15 @@ impl quic::RecvStream for RecvStream {
Ok(buf).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn stop_sending(&mut self, error_code: u64) {
let _ = self.stream.stop_sending(
s2n_quic::application::Error::new(error_code)
.expect("s2n-quic supports error codes up to 2^62-1"),
);
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn recv_id(&self) -> StreamId {
self.stream.id().try_into().expect("invalid stream id")
}
Expand Down Expand Up @@ -369,6 +390,7 @@ where
{
type Error = SendStreamError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
// try to flush the current chunk if we have one
Expand Down Expand Up @@ -409,6 +431,7 @@ where
// Poll::Ready(Ok(()))
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
if self.buf.is_some() {
return Err(Self::Error::NotReady);
Expand All @@ -427,19 +450,22 @@ where
// Ok(())
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// ensure all chunks are flushed to the QUIC stream before finishing
ready!(self.poll_ready(cx))?;
self.stream.finish()?;
Ok(()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn reset(&mut self, reset_code: u64) {
let _ = self
.stream
.reset(reset_code.try_into().unwrap_or_else(|_| VarInt::MAX.into()));
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn send_id(&self) -> StreamId {
self.stream.id().try_into().expect("invalid stream id")
}
Expand Down

0 comments on commit 03ea24c

Please sign in to comment.