Skip to content

Commit

Permalink
add write_all_flush method
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Jun 4, 2024
1 parent 66a822d commit 3a22cc2
Show file tree
Hide file tree
Showing 35 changed files with 188 additions and 125 deletions.
2 changes: 1 addition & 1 deletion g3bench/src/target/h1/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl BenchHttpArgs {

if let Some(data) = self.proxy_protocol.data() {
stream
.write_all(data)
.write_all(data) // no need to flush data
.await
.map_err(|e| anyhow!("failed to send proxy protocol data: {e:?}"))?;
}
Expand Down
2 changes: 1 addition & 1 deletion g3bench/src/target/h2/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl BenchH2Args {

if let Some(data) = self.proxy_protocol.data() {
stream
.write_all(data)
.write_all(data) // no need to flush data
.await
.map_err(|e| anyhow!("failed to write proxy protocol data: {e:?}"))?;
}
Expand Down
6 changes: 4 additions & 2 deletions g3bench/src/target/keyless/cloudflare/connection/simplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
use std::net::SocketAddr;

use futures_util::FutureExt;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};

use g3_io_ext::LimitedWriteExt;

use super::{KeylessLocalError, KeylessRequest, KeylessResponse, KeylessResponseError};

Expand Down Expand Up @@ -62,7 +64,7 @@ impl SimplexTransfer {
self.next_req_id = self.next_req_id.wrapping_add(1);

self.writer
.write_all(req.as_bytes())
.write_all_flush(req.as_bytes())
.await
.map_err(KeylessLocalError::WriteFailed)?;

Expand Down
2 changes: 1 addition & 1 deletion g3bench/src/target/keyless/cloudflare/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl KeylessCloudflareArgs {

if let Some(data) = self.proxy_protocol.data() {
stream
.write_all(data)
.write_all(data) // no need to flush data
.await
.map_err(|e| anyhow!("failed to write proxy protocol data: {e:?}"))?;
}
Expand Down
2 changes: 1 addition & 1 deletion g3bench/src/target/openssl/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl BenchOpensslArgs {

if let Some(data) = self.proxy_protocol.data() {
stream
.write_all(data)
.write_all(data) // no need to flush data
.await
.map_err(|e| anyhow!("failed to write proxy protocol data: {e:?}"))?;
}
Expand Down
2 changes: 1 addition & 1 deletion g3bench/src/target/rustls/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl BenchRustlsArgs {

if let Some(data) = self.proxy_protocol.data() {
stream
.write_all(data)
.write_all(data) // no need to flush data
.await
.map_err(|e| anyhow!("failed to write proxy protocol data: {e:?}"))?;
}
Expand Down
8 changes: 4 additions & 4 deletions g3keymess/src/serve/task/simplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/

use openssl::pkey::{PKey, Private};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::io::{AsyncRead, AsyncWrite, BufReader};
use tokio::sync::broadcast;

use g3_io_ext::LimitedBufReadExt;
use g3_io_ext::{LimitedBufReadExt, LimitedWriteExt};
use g3_types::ext::DurationExt;

use super::{KeylessTask, WrappedKeylessRequest};
Expand Down Expand Up @@ -161,10 +161,10 @@ impl KeylessTask {
RequestErrorLogContext { task_id: &self.id }.log(&self.ctx.request_logger, &rsp);

writer
.write_all(rsp.message())
.write_all_flush(rsp.message())
.await
.map_err(ServerTaskError::WriteFailed)?;
writer.flush().await.map_err(ServerTaskError::WriteFailed)?;

Ok(())
}
}
9 changes: 3 additions & 6 deletions g3proxy/src/escape/divert_tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use std::sync::Arc;
use anyhow::anyhow;
use async_trait::async_trait;
use slog::Logger;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::io::AsyncWrite;

use g3_daemon::stat::remote::ArcTcpConnectionTaskRemoteStats;
use g3_io_ext::LimitedWriteExt;
use g3_resolver::{ResolveError, ResolveLocalError};
use g3_types::collection::{SelectiveVec, SelectiveVecBuilder};
use g3_types::metrics::MetricsName;
Expand Down Expand Up @@ -190,11 +191,7 @@ impl DivertTcpEscaper {

let pp2_data = pp2_encoder.finalize();
writer
.write_all(pp2_data)
.await
.map_err(TcpConnectError::ProxyProtocolWriteFailed)?;
writer
.flush()
.write_all_flush(pp2_data)
.await
.map_err(TcpConnectError::ProxyProtocolWriteFailed)?;
Ok(pp2_data.len())
Expand Down
2 changes: 1 addition & 1 deletion g3proxy/src/escape/proxy_http/tcp_connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl ProxyHttpEscaper {
.encode_tcp(task_notes.client_addr(), task_notes.server_addr())
.map_err(TcpConnectError::ProxyProtocolEncodeError)?;
stream
.write_all(bytes)
.write_all(bytes) // no need to flush data
.await
.map_err(TcpConnectError::ProxyProtocolWriteFailed)?;
}
Expand Down
2 changes: 1 addition & 1 deletion g3proxy/src/escape/proxy_https/tcp_connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl ProxyHttpsEscaper {
.encode_tcp(task_notes.client_addr(), task_notes.server_addr())
.map_err(TcpConnectError::ProxyProtocolEncodeError)?;
stream
.write_all(bytes)
.write_all(bytes) // no need to flush data
.await
.map_err(TcpConnectError::ProxyProtocolWriteFailed)?;
}
Expand Down
5 changes: 5 additions & 0 deletions g3proxy/src/inspect/http/v1/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ where
LimitedCopyError::WriteFailed(e) => ServerTaskError::ClientTcpWriteFailed(e),
})?;
recv_body.save_connection().await;
} else {
clt_w
.flush()
.await
.map_err(ServerTaskError::ClientTcpWriteFailed)?;
}

Ok(())
Expand Down
13 changes: 7 additions & 6 deletions g3proxy/src/inspect/http/v1/forward/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use g3_icap_client::reqmod::IcapReqmodClient;
use g3_icap_client::respmod::h1::{
HttpResponseAdapter, RespmodAdaptationEndState, RespmodAdaptationRunState,
};
use g3_io_ext::{LimitedBufReadExt, LimitedCopy, LimitedCopyError};
use g3_io_ext::{LimitedBufReadExt, LimitedCopy, LimitedCopyError, LimitedWriteExt};
use g3_slog_types::{LtDateTime, LtDuration, LtHttpMethod, LtHttpUri, LtUuid};
use g3_types::net::HttpHeaderMap;

Expand Down Expand Up @@ -430,6 +430,11 @@ impl<'a, SC: ServerConfig> H1ForwardTask<'a, SC> {
LimitedCopyError::WriteFailed(e) => ServerTaskError::ClientTcpWriteFailed(e),
})?;
recv_body.save_connection().await;
} else {
clt_w
.flush()
.await
.map_err(ServerTaskError::ClientTcpWriteFailed)?;
}

Ok(())
Expand Down Expand Up @@ -769,11 +774,7 @@ impl<'a, SC: ServerConfig> H1ForwardTask<'a, SC> {
CW: AsyncWrite + Unpin,
{
clt_w
.write_all(&head_bytes)
.await
.map_err(ServerTaskError::ClientTcpWriteFailed)?;
clt_w
.flush()
.write_all_flush(&head_bytes)
.await
.map_err(ServerTaskError::ClientTcpWriteFailed)
}
Expand Down
5 changes: 5 additions & 0 deletions g3proxy/src/inspect/http/v1/upgrade/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ where
LimitedCopyError::WriteFailed(e) => ServerTaskError::ClientTcpWriteFailed(e),
})?;
recv_body.save_connection().await;
} else {
clt_w
.flush()
.await
.map_err(ServerTaskError::ClientTcpWriteFailed)?;
}

Ok(())
Expand Down
8 changes: 2 additions & 6 deletions g3proxy/src/inspect/smtp/ending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::Duration;
use anyhow::anyhow;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};

use g3_io_ext::{LineRecvBuf, RecvLineError};
use g3_io_ext::{LimitedWriteExt, LineRecvBuf, RecvLineError};
use g3_smtp_proto::command::Command;
use g3_smtp_proto::response::{ReplyCode, ResponseEncoder, ResponseParser};

Expand All @@ -39,11 +39,7 @@ impl EndQuitServer {
W: AsyncWrite + Unpin,
{
ups_w
.write_all(b"QUIT\r\n")
.await
.map_err(ServerTaskError::UpstreamWriteFailed)?;
ups_w
.flush()
.write_all_flush(b"QUIT\r\n")
.await
.map_err(ServerTaskError::UpstreamWriteFailed)?;

Expand Down
7 changes: 3 additions & 4 deletions g3proxy/src/inspect/smtp/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use std::io;
use std::net::IpAddr;

use anyhow::anyhow;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncWrite};

use g3_io_ext::{LineRecvBuf, RecvLineError};
use g3_io_ext::{LimitedWriteExt, LineRecvBuf, RecvLineError};
use g3_smtp_proto::command::Command;
use g3_smtp_proto::response::{ResponseEncoder, ResponseParser};

Expand Down Expand Up @@ -198,6 +198,5 @@ async fn send_cmd<W>(ups_w: &mut W, line: &[u8]) -> io::Result<()>
where
W: AsyncWrite + Unpin,
{
ups_w.write_all(line).await?;
ups_w.flush().await
ups_w.write_all_flush(line).await
}
16 changes: 4 additions & 12 deletions g3proxy/src/inspect/smtp/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

use std::net::IpAddr;

use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncWrite};

use g3_io_ext::LineRecvBuf;
use g3_io_ext::{LimitedWriteExt, LineRecvBuf};
use g3_smtp_proto::command::{Command, MailParam};
use g3_smtp_proto::response::{ReplyCode, ResponseEncoder, ResponseParser};

Expand Down Expand Up @@ -169,11 +169,7 @@ impl Forward {
.await?;

clt_w
.write_all(line)
.await
.map_err(ServerTaskError::ClientTcpWriteFailed)?;
clt_w
.flush()
.write_all_flush(line)
.await
.map_err(ServerTaskError::ClientTcpWriteFailed)?;

Expand Down Expand Up @@ -212,11 +208,7 @@ impl Forward {
match recv_buf.read_line(clt_r).await {
Ok(line) => {
ups_w
.write_all(line)
.await
.map_err(ServerTaskError::UpstreamWriteFailed)?;
ups_w
.flush()
.write_all_flush(line)
.await
.map_err(ServerTaskError::UpstreamWriteFailed)?;
recv_buf.consume_line();
Expand Down
7 changes: 3 additions & 4 deletions g3proxy/src/inspect/smtp/greeting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use anyhow::anyhow;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};

use g3_io_ext::{LineRecvBuf, OnceBufReader, RecvLineError};
use g3_io_ext::{LimitedWriteExt, LineRecvBuf, OnceBufReader, RecvLineError};
use g3_smtp_proto::response::{ReplyCode, ResponseEncoder, ResponseLineError, ResponseParser};
use g3_types::net::Host;

Expand Down Expand Up @@ -70,7 +70,7 @@ impl Greeting {
let msg = self.rsp.feed_line(line)?;
self.total_to_write += line.len();
clt_w
.write_all(line)
.write_all_flush(line)
.await
.map_err(GreetingError::ClientWriteFailed)?;

Expand Down Expand Up @@ -150,8 +150,7 @@ impl Greeting {
_ => return,
};
let rsp = ResponseEncoder::upstream_service_not_ready(self.local_ip, reason);
let _ = clt_w.write_all(rsp.as_bytes()).await;
let _ = clt_w.flush().await;
let _ = clt_w.write_all_flush(rsp.as_bytes()).await;
let _ = clt_w.shutdown().await;
}
}
Expand Down
14 changes: 3 additions & 11 deletions g3proxy/src/inspect/smtp/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use anyhow::anyhow;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::time::Instant;

use g3_io_ext::{LimitedCopy, LimitedCopyError};
use g3_io_ext::{LimitedCopy, LimitedCopyError, LimitedWriteExt};
use g3_smtp_proto::command::{Command, MailParam, RecipientParam};
use g3_smtp_proto::io::TextDataReader;
use g3_smtp_proto::response::{ReplyCode, ResponseEncoder, ResponseParser};
Expand Down Expand Up @@ -205,11 +205,7 @@ impl<'a, SC: ServerConfig> Transaction<'a, SC> {
.await?;

clt_w
.write_all(line)
.await
.map_err(ServerTaskError::ClientTcpWriteFailed)?;
clt_w
.flush()
.write_all_flush(line)
.await
.map_err(ServerTaskError::ClientTcpWriteFailed)?;

Expand Down Expand Up @@ -331,11 +327,7 @@ impl<'a, SC: ServerConfig> Transaction<'a, SC> {
UW: AsyncWrite + Unpin,
{
ups_w
.write_all(cmd_line)
.await
.map_err(ServerTaskError::UpstreamWriteFailed)?;
ups_w
.flush()
.write_all_flush(cmd_line)
.await
.map_err(ServerTaskError::UpstreamWriteFailed)?;

Expand Down
10 changes: 4 additions & 6 deletions g3proxy/src/module/http_forward/response/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};

use g3_ftp_client::FtpConnectError;
use g3_http::server::HttpRequestParseError;
use g3_io_ext::LimitedWriteExt;
use g3_types::net::ConnectError;

use crate::module::http_header;
Expand Down Expand Up @@ -477,8 +478,7 @@ impl HttpProxyClientResponse {
header.extend_from_slice(line.as_bytes());
}
header.extend_from_slice(b"\r\n");
writer.write_all(header.as_ref()).await?;
writer.flush().await?;
writer.write_all_flush(header.as_ref()).await?;
Ok(())
}

Expand Down Expand Up @@ -509,8 +509,7 @@ impl HttpProxyClientResponse {
W: AsyncWrite + Unpin,
{
let s = format!("{version:?} 100 Continue\r\n\r\n");
writer.write_all(s.as_bytes()).await?;
writer.flush().await?;
writer.write_all_flush(s.as_bytes()).await?;
Ok(())
}

Expand Down Expand Up @@ -546,8 +545,7 @@ impl HttpProxyClientResponse {
// append body
header.extend_from_slice(body.as_bytes());

writer.write_all(header.as_ref()).await?;
writer.flush().await?;
writer.write_all_flush(header.as_ref()).await?;
Ok(())
}

Expand Down
Loading

0 comments on commit 3a22cc2

Please sign in to comment.