diff --git a/g3proxy/src/escape/proxy_float/peer/http/http_connect/mod.rs b/g3proxy/src/escape/proxy_float/peer/http/http_connect/mod.rs index 06b2faff8..27638e55f 100644 --- a/g3proxy/src/escape/proxy_float/peer/http/http_connect/mod.rs +++ b/g3proxy/src/escape/proxy_float/peer/http/http_connect/mod.rs @@ -18,13 +18,13 @@ use std::sync::Arc; use anyhow::anyhow; use tokio::io::{AsyncRead, AsyncWrite, BufReader}; -use tokio::net::tcp; +use tokio::net::TcpStream; use g3_daemon::stat::remote::{ ArcTcpConnectionTaskRemoteStats, TcpConnectionTaskRemoteStatsWrapper, }; use g3_http::connect::{HttpConnectRequest, HttpConnectResponse}; -use g3_io_ext::{LimitedReader, LimitedWriter}; +use g3_io_ext::{LimitedReader, LimitedStream, LimitedWriter}; use g3_openssl::SslConnector; use g3_types::net::{Host, OpensslClientConfig}; @@ -40,41 +40,30 @@ impl ProxyFloatHttpPeer { &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result< - ( - BufReader>, - LimitedWriter, - ), - TcpConnectError, - > { - let (r, mut w) = self.tcp_new_connection(tcp_notes, task_notes).await?; + ) -> Result>, TcpConnectError> { + let mut stream = self.tcp_new_connection(tcp_notes, task_notes).await?; let req = HttpConnectRequest::new(&tcp_notes.upstream, &self.shared_config.append_http_headers); - req.send(&mut w) + req.send(&mut stream) .await .map_err(TcpConnectError::NegotiationWriteFailed)?; - let mut r = BufReader::new(r); - let _ = HttpConnectResponse::recv(&mut r, self.http_connect_rsp_hdr_max_size).await?; + let mut buf_stream = BufReader::new(stream); + let _ = + HttpConnectResponse::recv(&mut buf_stream, self.http_connect_rsp_hdr_max_size).await?; // TODO detect and set outgoing_addr and target_addr for supported remote proxies // set with the registered public ip by default - Ok((r, w)) + Ok(buf_stream) } pub(super) async fn timed_http_connect_tcp_connect_to<'a>( &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result< - ( - BufReader>, - LimitedWriter, - ), - TcpConnectError, - > { + ) -> Result>, TcpConnectError> { tokio::time::timeout( self.escaper_config.peer_negotiation_timeout, self.http_connect_tcp_connect_to(tcp_notes, task_notes), @@ -89,12 +78,12 @@ impl ProxyFloatHttpPeer { task_notes: &'a ServerTaskNotes, task_stats: ArcTcpConnectionTaskRemoteStats, ) -> TcpConnectResult { - let (mut r, mut w) = self + let mut buf_stream = self .timed_http_connect_tcp_connect_to(tcp_notes, task_notes) .await?; // add in read buffered data - let r_buffer_size = r.buffer().len() as u64; + let r_buffer_size = buf_stream.buffer().len() as u64; task_stats.add_read_bytes(r_buffer_size); let mut wrapper_stats = TcpConnectRemoteWrapperStats::new(&self.escaper_stats, task_stats); let user_stats = self.fetch_user_upstream_io_stats(task_notes); @@ -105,9 +94,9 @@ impl ProxyFloatHttpPeer { let wrapper_stats = Arc::new(wrapper_stats); // reset underlying io stats - r.get_mut().reset_stats(wrapper_stats.clone() as _); - w.reset_stats(wrapper_stats as _); + buf_stream.get_mut().reset_stats(wrapper_stats.clone()); + let (r, w) = tokio::io::split(buf_stream); Ok((Box::new(r), Box::new(w))) } @@ -119,14 +108,14 @@ impl ProxyFloatHttpPeer { tls_name: &'a Host, tls_application: TlsApplication, ) -> Result { - let (ups_r, ups_w) = self + let buf_stream = self .timed_http_connect_tcp_connect_to(tcp_notes, task_notes) .await?; let ssl = tls_config .build_ssl(tls_name, tcp_notes.upstream.port()) .map_err(TcpConnectError::InternalTlsClientError)?; - let connector = SslConnector::new(ssl, tokio::io::join(ups_r, ups_w)) + let connector = SslConnector::new(ssl, buf_stream.into_inner()) .map_err(|e| TcpConnectError::InternalTlsClientError(anyhow::Error::new(e)))?; match tokio::time::timeout(tls_config.handshake_timeout, connector.connect()).await { diff --git a/g3proxy/src/escape/proxy_float/peer/http/http_forward/mod.rs b/g3proxy/src/escape/proxy_float/peer/http/http_forward/mod.rs index 21af5daab..b62581d61 100644 --- a/g3proxy/src/escape/proxy_float/peer/http/http_forward/mod.rs +++ b/g3proxy/src/escape/proxy_float/peer/http/http_forward/mod.rs @@ -44,7 +44,8 @@ impl ProxyFloatHttpPeer { task_notes: &'a ServerTaskNotes, task_stats: ArcHttpForwardTaskRemoteStats, ) -> Result { - let (ups_r, mut ups_w) = self.tcp_new_connection(tcp_notes, task_notes).await?; + let stream = self.tcp_new_connection(tcp_notes, task_notes).await?; + let (ups_r, mut ups_w) = stream.into_split_tcp(); let mut w_wrapper_stats = HttpForwardRemoteWrapperStats::new(&self.escaper_stats, &task_stats); diff --git a/g3proxy/src/escape/proxy_float/peer/http/tcp_connect/mod.rs b/g3proxy/src/escape/proxy_float/peer/http/tcp_connect/mod.rs index 79f7ceeaa..f4e00d737 100644 --- a/g3proxy/src/escape/proxy_float/peer/http/tcp_connect/mod.rs +++ b/g3proxy/src/escape/proxy_float/peer/http/tcp_connect/mod.rs @@ -16,10 +16,10 @@ use std::net::{IpAddr, SocketAddr}; -use tokio::net::{tcp, TcpStream}; +use tokio::net::TcpStream; use tokio::time::Instant; -use g3_io_ext::{LimitedReader, LimitedWriter}; +use g3_io_ext::LimitedStream; use g3_types::net::ConnectError; use super::ProxyFloatHttpPeer; @@ -105,30 +105,18 @@ impl ProxyFloatHttpPeer { &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result< - ( - LimitedReader, - LimitedWriter, - ), - TcpConnectError, - > { + ) -> Result, TcpConnectError> { let stream = self.tcp_connect_to(tcp_notes, task_notes).await?; - let (r, w) = stream.into_split(); let limit_config = &self.shared_config.tcp_conn_speed_limit; - let r = LimitedReader::new( - r, + let stream = LimitedStream::new( + stream, limit_config.shift_millis, limit_config.max_south, - self.escaper_stats.clone() as _, - ); - let w = LimitedWriter::new( - w, - limit_config.shift_millis, limit_config.max_north, - self.escaper_stats.clone() as _, + self.escaper_stats.clone(), ); - Ok((r, w)) + Ok(stream) } } diff --git a/g3proxy/src/escape/proxy_float/peer/https/http_connect/mod.rs b/g3proxy/src/escape/proxy_float/peer/https/http_connect/mod.rs index f1e179538..06e070183 100644 --- a/g3proxy/src/escape/proxy_float/peer/https/http_connect/mod.rs +++ b/g3proxy/src/escape/proxy_float/peer/https/http_connect/mod.rs @@ -37,29 +37,30 @@ impl ProxyFloatHttpsPeer { &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result<(BufReader, impl AsyncWrite), TcpConnectError> { - let (r, mut w) = self.tls_handshake_with(tcp_notes, task_notes).await?; + ) -> Result, TcpConnectError> { + let mut stream = self.tls_handshake_with(tcp_notes, task_notes).await?; let req = HttpConnectRequest::new(&tcp_notes.upstream, &self.shared_config.append_http_headers); - req.send(&mut w) + req.send(&mut stream) .await .map_err(TcpConnectError::NegotiationWriteFailed)?; - let mut r = BufReader::new(r); - let _ = HttpConnectResponse::recv(&mut r, self.http_connect_rsp_hdr_max_size).await?; + let mut buf_stream = BufReader::new(stream); + let _ = + HttpConnectResponse::recv(&mut buf_stream, self.http_connect_rsp_hdr_max_size).await?; // TODO detect and set outgoing_addr and target_addr for supported remote proxies // set with the registered public ip by default - Ok((r, w)) + Ok(buf_stream) } pub(super) async fn timed_http_connect_tcp_connect_to<'a>( &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result<(BufReader, impl AsyncWrite), TcpConnectError> { + ) -> Result, TcpConnectError> { tokio::time::timeout( self.escaper_config.peer_negotiation_timeout, self.http_connect_tcp_connect_to(tcp_notes, task_notes), @@ -74,13 +75,13 @@ impl ProxyFloatHttpsPeer { task_notes: &'a ServerTaskNotes, task_stats: ArcTcpConnectionTaskRemoteStats, ) -> TcpConnectResult { - let (r, w) = self + let buf_stream = self .timed_http_connect_tcp_connect_to(tcp_notes, task_notes) .await?; // add task and user stats // add in read buffered data - let r_buffer_size = r.buffer().len() as u64; + let r_buffer_size = buf_stream.buffer().len() as u64; task_stats.add_read_bytes(r_buffer_size); let mut wrapper_stats = TcpConnectionTaskRemoteStatsWrapper::new(task_stats); let user_stats = self.fetch_user_upstream_io_stats(task_notes); @@ -90,6 +91,7 @@ impl ProxyFloatHttpsPeer { wrapper_stats.push_other_stats(user_stats); let wrapper_stats = Arc::new(wrapper_stats); + let (r, w) = tokio::io::split(buf_stream); let r = LimitedReader::new_unlimited(r, wrapper_stats.clone() as _); let w = LimitedWriter::new_unlimited(w, wrapper_stats as _); @@ -104,14 +106,14 @@ impl ProxyFloatHttpsPeer { tls_name: &'a Host, tls_application: TlsApplication, ) -> Result { - let (ups_r, ups_w) = self + let buf_stream = self .timed_http_connect_tcp_connect_to(tcp_notes, task_notes) .await?; let ssl = tls_config .build_ssl(tls_name, tcp_notes.upstream.port()) .map_err(TcpConnectError::InternalTlsClientError)?; - let connector = SslConnector::new(ssl, tokio::io::join(ups_r, ups_w)) + let connector = SslConnector::new(ssl, buf_stream.into_inner()) .map_err(|e| TcpConnectError::InternalTlsClientError(anyhow::Error::new(e)))?; match tokio::time::timeout(tls_config.handshake_timeout, connector.connect()).await { @@ -160,7 +162,6 @@ impl ProxyFloatHttpsPeer { TlsApplication::TcpStream, ) .await?; - let (ups_r, ups_w) = tokio::io::split(tls_stream); // add task and user stats diff --git a/g3proxy/src/escape/proxy_float/peer/https/http_forward/mod.rs b/g3proxy/src/escape/proxy_float/peer/https/http_forward/mod.rs index c00076a88..0ee0657ae 100644 --- a/g3proxy/src/escape/proxy_float/peer/https/http_forward/mod.rs +++ b/g3proxy/src/escape/proxy_float/peer/https/http_forward/mod.rs @@ -40,7 +40,8 @@ impl ProxyFloatHttpsPeer { task_notes: &'a ServerTaskNotes, task_stats: ArcHttpForwardTaskRemoteStats, ) -> Result { - let (ups_r, ups_w) = self.tls_handshake_with(tcp_notes, task_notes).await?; + let stream = self.tls_handshake_with(tcp_notes, task_notes).await?; + let (ups_r, ups_w) = tokio::io::split(stream); // add task and user stats let mut wrapper_stats = HttpForwardTaskRemoteWrapperStats::new(task_stats); diff --git a/g3proxy/src/escape/proxy_float/peer/https/tls_handshake/mod.rs b/g3proxy/src/escape/proxy_float/peer/https/tls_handshake/mod.rs index 40954ce33..3a7fc5887 100644 --- a/g3proxy/src/escape/proxy_float/peer/https/tls_handshake/mod.rs +++ b/g3proxy/src/escape/proxy_float/peer/https/tls_handshake/mod.rs @@ -31,7 +31,7 @@ impl ProxyFloatHttpsPeer { &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result<(impl AsyncRead, impl AsyncWrite), TcpConnectError> { + ) -> Result { let stream = self.tcp_new_connection(tcp_notes, task_notes).await?; let ssl = self @@ -42,10 +42,7 @@ impl ProxyFloatHttpsPeer { .map_err(|e| TcpConnectError::InternalTlsClientError(anyhow::Error::new(e)))?; match tokio::time::timeout(self.tls_config.handshake_timeout, connector.connect()).await { - Ok(Ok(stream)) => { - let (r, w) = tokio::io::split(stream); - Ok((r, w)) - } + Ok(Ok(stream)) => Ok(stream), Ok(Err(e)) => { let e = anyhow::Error::new(e); let tls_peer = UpstreamAddr::from_ip_and_port(self.addr.ip(), self.addr.port()); diff --git a/g3proxy/src/escape/proxy_http/http_connect/mod.rs b/g3proxy/src/escape/proxy_http/http_connect/mod.rs index 023f4bb2f..93bf25391 100644 --- a/g3proxy/src/escape/proxy_http/http_connect/mod.rs +++ b/g3proxy/src/escape/proxy_http/http_connect/mod.rs @@ -18,13 +18,13 @@ use std::sync::Arc; use anyhow::anyhow; use tokio::io::{AsyncRead, AsyncWrite, BufReader}; -use tokio::net::tcp; +use tokio::net::TcpStream; use g3_daemon::stat::remote::{ ArcTcpConnectionTaskRemoteStats, TcpConnectionTaskRemoteStatsWrapper, }; use g3_http::connect::{HttpConnectRequest, HttpConnectResponse}; -use g3_io_ext::{LimitedReader, LimitedWriter}; +use g3_io_ext::{LimitedReader, LimitedStream, LimitedWriter}; use g3_openssl::SslConnector; use g3_types::net::{Host, OpensslClientConfig}; @@ -40,14 +40,8 @@ impl ProxyHttpEscaper { &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result< - ( - BufReader>, - LimitedWriter, - ), - TcpConnectError, - > { - let (r, mut w) = self.tcp_new_connection(tcp_notes, task_notes).await?; + ) -> Result>, TcpConnectError> { + let mut stream = self.tcp_new_connection(tcp_notes, task_notes).await?; let mut req = HttpConnectRequest::new(&tcp_notes.upstream, &self.config.append_http_headers); @@ -59,30 +53,25 @@ impl ProxyHttpEscaper { } } - req.send(&mut w) + req.send(&mut stream) .await .map_err(TcpConnectError::NegotiationWriteFailed)?; - let mut r = BufReader::new(r); + let mut buf_stream = BufReader::new(stream); let _ = - HttpConnectResponse::recv(&mut r, self.config.http_connect_rsp_hdr_max_size).await?; + HttpConnectResponse::recv(&mut buf_stream, self.config.http_connect_rsp_hdr_max_size) + .await?; // TODO detect and set outgoing_addr and target_addr for supported remote proxies - Ok((r, w)) + Ok(buf_stream) } pub(super) async fn timed_http_connect_tcp_connect_to<'a>( &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result< - ( - BufReader>, - LimitedWriter, - ), - TcpConnectError, - > { + ) -> Result>, TcpConnectError> { tokio::time::timeout( self.config.peer_negotiation_timeout, self.http_connect_tcp_connect_to(tcp_notes, task_notes), @@ -97,12 +86,12 @@ impl ProxyHttpEscaper { task_notes: &'a ServerTaskNotes, task_stats: ArcTcpConnectionTaskRemoteStats, ) -> TcpConnectResult { - let (mut r, mut w) = self + let mut buf_stream = self .timed_http_connect_tcp_connect_to(tcp_notes, task_notes) .await?; // add in read buffered data - let r_buffer_size = r.buffer().len() as u64; + let r_buffer_size = buf_stream.buffer().len() as u64; task_stats.add_read_bytes(r_buffer_size); let mut wrapper_stats = TcpConnectRemoteWrapperStats::new(&self.stats, task_stats); let user_stats = self.fetch_user_upstream_io_stats(task_notes); @@ -113,9 +102,9 @@ impl ProxyHttpEscaper { let wrapper_stats = Arc::new(wrapper_stats); // reset underlying io stats - r.get_mut().reset_stats(wrapper_stats.clone() as _); - w.reset_stats(wrapper_stats as _); + buf_stream.get_mut().reset_stats(wrapper_stats.clone()); + let (r, w) = tokio::io::split(buf_stream); Ok((Box::new(r), Box::new(w))) } @@ -127,16 +116,14 @@ impl ProxyHttpEscaper { tls_name: &'a Host, tls_application: TlsApplication, ) -> Result { - let (ups_r, ups_w) = self + let buf_stream = self .timed_http_connect_tcp_connect_to(tcp_notes, task_notes) .await?; - // the buffer in ups_r should be empty as this is a tls connection - let ssl = tls_config .build_ssl(tls_name, tcp_notes.upstream.port()) .map_err(TcpConnectError::InternalTlsClientError)?; - let connector = SslConnector::new(ssl, tokio::io::join(ups_r, ups_w)) + let connector = SslConnector::new(ssl, buf_stream.into_inner()) .map_err(|e| TcpConnectError::InternalTlsClientError(anyhow::Error::new(e)))?; match tokio::time::timeout(tls_config.handshake_timeout, connector.connect()).await { diff --git a/g3proxy/src/escape/proxy_http/http_forward/mod.rs b/g3proxy/src/escape/proxy_http/http_forward/mod.rs index 0a6185dc6..2ae6d527f 100644 --- a/g3proxy/src/escape/proxy_http/http_forward/mod.rs +++ b/g3proxy/src/escape/proxy_http/http_forward/mod.rs @@ -41,7 +41,8 @@ impl ProxyHttpEscaper { task_notes: &'a ServerTaskNotes, task_stats: ArcHttpForwardTaskRemoteStats, ) -> Result { - let (ups_r, mut ups_w) = self.tcp_new_connection(tcp_notes, task_notes).await?; + let stream = self.tcp_new_connection(tcp_notes, task_notes).await?; + let (ups_r, mut ups_w) = stream.into_split_tcp(); // add task and user stats let mut w_wrapper_stats = HttpForwardRemoteWrapperStats::new(&self.stats, &task_stats); diff --git a/g3proxy/src/escape/proxy_http/tcp_connect/mod.rs b/g3proxy/src/escape/proxy_http/tcp_connect/mod.rs index f84aef84f..128c450dc 100644 --- a/g3proxy/src/escape/proxy_http/tcp_connect/mod.rs +++ b/g3proxy/src/escape/proxy_http/tcp_connect/mod.rs @@ -17,11 +17,11 @@ use std::net::{IpAddr, SocketAddr}; use tokio::io::AsyncWriteExt; -use tokio::net::{tcp, TcpSocket, TcpStream}; +use tokio::net::{TcpSocket, TcpStream}; use tokio::task::JoinSet; use tokio::time::Instant; -use g3_io_ext::{LimitedReader, LimitedWriter}; +use g3_io_ext::LimitedStream; use g3_types::net::{ConnectError, Host, ProxyProtocolEncoder}; use super::ProxyHttpEscaper; @@ -292,28 +292,16 @@ impl ProxyHttpEscaper { &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result< - ( - LimitedReader, - LimitedWriter, - ), - TcpConnectError, - > { + ) -> Result, TcpConnectError> { let stream = self.tcp_connect_to(tcp_notes, task_notes).await?; - let (r, w) = stream.into_split(); let limit_config = &self.config.general.tcp_sock_speed_limit; - let r = LimitedReader::new( - r, + let mut stream = LimitedStream::new( + stream, limit_config.shift_millis, limit_config.max_south, - self.stats.clone() as _, - ); - let mut w = LimitedWriter::new( - w, - limit_config.shift_millis, limit_config.max_north, - self.stats.clone() as _, + self.stats.clone(), ); if let Some(version) = self.config.use_proxy_protocol { @@ -321,11 +309,12 @@ impl ProxyHttpEscaper { let bytes = encoder .encode_tcp(task_notes.client_addr(), task_notes.server_addr()) .map_err(TcpConnectError::ProxyProtocolEncodeError)?; - w.write_all(bytes) + stream + .write_all(bytes) .await .map_err(TcpConnectError::ProxyProtocolWriteFailed)?; } - Ok((r, w)) + Ok(stream) } } diff --git a/g3proxy/src/escape/proxy_https/http_connect/mod.rs b/g3proxy/src/escape/proxy_https/http_connect/mod.rs index fc30a8bce..4b6eafb84 100644 --- a/g3proxy/src/escape/proxy_https/http_connect/mod.rs +++ b/g3proxy/src/escape/proxy_https/http_connect/mod.rs @@ -37,8 +37,8 @@ impl ProxyHttpsEscaper { &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result<(BufReader, impl AsyncWrite), TcpConnectError> { - let (r, mut w) = self.tls_handshake_to_remote(tcp_notes, task_notes).await?; + ) -> Result, TcpConnectError> { + let mut stream = self.tls_handshake_to_remote(tcp_notes, task_notes).await?; let mut req = HttpConnectRequest::new(&tcp_notes.upstream, &self.config.append_http_headers); @@ -50,24 +50,25 @@ impl ProxyHttpsEscaper { } } - req.send(&mut w) + req.send(&mut stream) .await .map_err(TcpConnectError::NegotiationWriteFailed)?; - let mut r = BufReader::new(r); + let mut buf_stream = BufReader::new(stream); let _ = - HttpConnectResponse::recv(&mut r, self.config.http_connect_rsp_hdr_max_size).await?; + HttpConnectResponse::recv(&mut buf_stream, self.config.http_connect_rsp_hdr_max_size) + .await?; // TODO detect and set outgoing_addr and target_addr for supported remote proxies - Ok((r, w)) + Ok(buf_stream) } pub(super) async fn timed_http_connect_tcp_connect_to<'a>( &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result<(BufReader, impl AsyncWrite), TcpConnectError> { + ) -> Result, TcpConnectError> { tokio::time::timeout( self.config.peer_negotiation_timeout, self.http_connect_tcp_connect_to(tcp_notes, task_notes), @@ -82,13 +83,13 @@ impl ProxyHttpsEscaper { task_notes: &'a ServerTaskNotes, task_stats: ArcTcpConnectionTaskRemoteStats, ) -> TcpConnectResult { - let (r, w) = self + let buf_stream = self .timed_http_connect_tcp_connect_to(tcp_notes, task_notes) .await?; // add task and user stats // add in read buffered data - let r_buffer_size = r.buffer().len() as u64; + let r_buffer_size = buf_stream.buffer().len() as u64; task_stats.add_read_bytes(r_buffer_size); let mut wrapper_stats = TcpConnectionTaskRemoteStatsWrapper::new(task_stats); let user_stats = self.fetch_user_upstream_io_stats(task_notes); @@ -98,6 +99,7 @@ impl ProxyHttpsEscaper { wrapper_stats.push_other_stats(user_stats); let wrapper_stats = Arc::new(wrapper_stats); + let (r, w) = tokio::io::split(buf_stream); let r = LimitedReader::new_unlimited(r, wrapper_stats.clone() as _); let w = LimitedWriter::new_unlimited(w, wrapper_stats as _); @@ -112,16 +114,14 @@ impl ProxyHttpsEscaper { tls_name: &'a Host, tls_application: TlsApplication, ) -> Result { - let (ups_r, ups_w) = self + let buf_stream = self .timed_http_connect_tcp_connect_to(tcp_notes, task_notes) .await?; - // the buffer in ups_r should be empty as this is a tls connection - let ssl = tls_config .build_ssl(tls_name, tcp_notes.upstream.port()) .map_err(TcpConnectError::InternalTlsClientError)?; - let connector = SslConnector::new(ssl, tokio::io::join(ups_r, ups_w)) + let connector = SslConnector::new(ssl, buf_stream.into_inner()) .map_err(|e| TcpConnectError::InternalTlsClientError(anyhow::Error::new(e)))?; match tokio::time::timeout(tls_config.handshake_timeout, connector.connect()).await { diff --git a/g3proxy/src/escape/proxy_https/http_forward/mod.rs b/g3proxy/src/escape/proxy_https/http_forward/mod.rs index 308290b3a..e52774dc5 100644 --- a/g3proxy/src/escape/proxy_https/http_forward/mod.rs +++ b/g3proxy/src/escape/proxy_https/http_forward/mod.rs @@ -40,7 +40,8 @@ impl ProxyHttpsEscaper { task_notes: &'a ServerTaskNotes, task_stats: ArcHttpForwardTaskRemoteStats, ) -> Result { - let (ups_r, ups_w) = self.tls_handshake_to_remote(tcp_notes, task_notes).await?; + let stream = self.tls_handshake_to_remote(tcp_notes, task_notes).await?; + let (ups_r, ups_w) = tokio::io::split(stream); // add task and user stats let mut wrapper_stats = HttpForwardTaskRemoteWrapperStats::new(task_stats); diff --git a/g3proxy/src/escape/proxy_https/tls_handshake/mod.rs b/g3proxy/src/escape/proxy_https/tls_handshake/mod.rs index 1002c9ce9..25ba5a656 100644 --- a/g3proxy/src/escape/proxy_https/tls_handshake/mod.rs +++ b/g3proxy/src/escape/proxy_https/tls_handshake/mod.rs @@ -29,7 +29,7 @@ impl ProxyHttpsEscaper { &'a self, tcp_notes: &'a mut TcpConnectTaskNotes, task_notes: &'a ServerTaskNotes, - ) -> Result<(impl AsyncRead, impl AsyncWrite), TcpConnectError> { + ) -> Result { let (peer, ups_s) = self.tcp_new_connection(tcp_notes, task_notes).await?; let tls_name = self.config.tls_name.as_ref().unwrap_or_else(|| peer.host()); @@ -41,10 +41,7 @@ impl ProxyHttpsEscaper { .map_err(|e| TcpConnectError::InternalTlsClientError(anyhow::Error::new(e)))?; match tokio::time::timeout(self.tls_config.handshake_timeout, connector.connect()).await { - Ok(Ok(stream)) => { - let (r, w) = tokio::io::split(stream); - Ok((r, w)) - } + Ok(Ok(stream)) => Ok(stream), Ok(Err(e)) => { let e = anyhow::Error::new(e); EscapeLogForTlsHandshake {