Skip to content

Commit

Permalink
g3proxy: support smtp STARTTLS interception
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed May 17, 2024
1 parent b9f26bc commit a5fd2f5
Show file tree
Hide file tree
Showing 10 changed files with 432 additions and 35 deletions.
2 changes: 2 additions & 0 deletions g3proxy/src/inspect/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use crate::serve::ServerTaskError;
pub(crate) enum InterceptionError {
#[error("tls: {0}")]
Tls(super::tls::TlsInterceptionError),
#[error("start tls: {0}")]
StartTls(super::tls::TlsInterceptionError),
#[error("http1: {0}")]
H1(super::http::H1InterceptionError),
#[error("http2: {0}")]
Expand Down
4 changes: 4 additions & 0 deletions g3proxy/src/inspect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub(crate) mod stream;
pub(crate) mod tls;
use tls::TlsInterceptionContext;

pub(crate) mod start_tls;
use start_tls::StartTlsProtocol;

pub(crate) mod http;
mod websocket;

Expand Down Expand Up @@ -291,6 +294,7 @@ pub(crate) enum StreamInspection<SC: ServerConfig> {
TlsModern(tls::TlsInterceptObject<SC>),
#[cfg(feature = "vendored-tongsuo")]
TlsTlcp(tls::TlsInterceptObject<SC>),
StartTls(start_tls::StartTlsInterceptObject<SC>),
H1(http::H1InterceptObject<SC>),
H2(http::H2InterceptObject<SC>),
Websocket(websocket::H1WebsocketInterceptObject<SC>),
Expand Down
9 changes: 8 additions & 1 deletion g3proxy/src/inspect/smtp/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ pub(super) enum ForwardNextAction {
pub(super) struct Forward {
local_ip: IpAddr,
allow_odmr: bool,
allow_starttls: bool,
auth_end: bool,
}

impl Forward {
pub(super) fn new(local_ip: IpAddr, allow_odmr: bool) -> Self {
pub(super) fn new(local_ip: IpAddr, allow_odmr: bool, allow_starttls: bool) -> Self {
Forward {
local_ip,
allow_odmr,
allow_starttls,
auth_end: false,
}
}
Expand Down Expand Up @@ -93,6 +95,11 @@ impl Forward {
return Some(ResponseEncoder::AUTHENTICATION_REQUIRED);
}
}
Command::StartTls => {
if !self.allow_starttls {
return Some(ResponseEncoder::COMMAND_NOT_IMPLEMENTED);
}
}
_ => {}
};
valid_cmd = cmd;
Expand Down
10 changes: 9 additions & 1 deletion g3proxy/src/inspect/smtp/initiation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@ use crate::serve::{ServerTaskError, ServerTaskResult};
#[derive(Default)]
pub(super) struct InitializedExtensions {
odmr: bool,
starttls: bool,
}

impl InitializedExtensions {
pub(super) fn allow_odmr(&self, config: &SmtpInterceptionConfig) -> bool {
self.odmr && config.allow_on_demand_mail_relay
}

pub(super) fn allow_starttls(&self) -> bool {
self.starttls
}
}

pub(super) struct Initiation<'a> {
Expand Down Expand Up @@ -219,7 +224,10 @@ impl<'a> Initiation<'a> {
// Enhanced-Status-Codes, RFC2034, add status code preface to response
"ENHANCEDSTATUSCODES" => false,
// STARTTLS, RFC3207, add STARTTLS command
"STARTTLS" => true,
"STARTTLS" => {
self.server_ext.starttls = true;
true
}
// No Soliciting, RFC3865, add a MAIL param key
"NO-SOLICITING" => true,
// Message Tracking, RFC3885, add a MAIL MTRK param key
Expand Down
85 changes: 61 additions & 24 deletions g3proxy/src/inspect/smtp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ use tokio::io::AsyncWriteExt;

use g3_dpi::ProtocolInspectPolicy;
use g3_io_ext::OnceBufReader;
use g3_slog_types::{LtHost, LtUuid};
use g3_slog_types::{LtHost, LtUpstreamAddr, LtUuid};
use g3_smtp_proto::response::{ReplyCode, ResponseEncoder};
use g3_types::net::Host;
use g3_types::net::{Host, UpstreamAddr};

use super::StartTlsProtocol;
use crate::config::server::ServerConfig;
use crate::inspect::{BoxAsyncRead, BoxAsyncWrite, StreamInspectContext};
use crate::inspect::{BoxAsyncRead, BoxAsyncWrite, StreamInspectContext, StreamInspection};
use crate::serve::{ServerTaskError, ServerTaskResult};

mod ext;
Expand All @@ -48,7 +49,7 @@ macro_rules! intercept_log {
"intercept_type" => "SMTP",
"task_id" => LtUuid($obj.ctx.server_task_id()),
"depth" => $obj.ctx.inspection_depth,
"upstream_host" => $obj.upstream_host.as_ref().map(LtHost),
"upstream" => LtUpstreamAddr(&$obj.upstream),
"client_host" => $obj.client_host.as_ref().map(LtHost),
)
};
Expand All @@ -63,17 +64,20 @@ struct SmtpIo {

pub(crate) struct SmtpInterceptObject<SC: ServerConfig> {
io: Option<SmtpIo>,
pub(crate) ctx: StreamInspectContext<SC>,
upstream_host: Option<Host>,
ctx: StreamInspectContext<SC>,
upstream: UpstreamAddr,
client_host: Option<Host>,
}

impl<SC: ServerConfig> SmtpInterceptObject<SC> {
pub(crate) fn new(ctx: StreamInspectContext<SC>) -> Self {
impl<SC> SmtpInterceptObject<SC>
where
SC: ServerConfig + Send + Sync + 'static,
{
pub(crate) fn new(ctx: StreamInspectContext<SC>, upstream: UpstreamAddr) -> Self {
SmtpInterceptObject {
io: None,
ctx,
upstream_host: None,
upstream,
client_host: None,
}
}
Expand All @@ -94,19 +98,26 @@ impl<SC: ServerConfig> SmtpInterceptObject<SC> {
self.io = Some(io);
}

pub(crate) async fn intercept(mut self) -> ServerTaskResult<()> {
pub(crate) async fn intercept(mut self) -> ServerTaskResult<Option<StreamInspection<SC>>> {
match self.ctx.smtp_inspect_policy() {
ProtocolInspectPolicy::Bypass => self.do_bypass().await,
ProtocolInspectPolicy::Intercept => {
if let Err(e) = self.do_intercept().await {
ProtocolInspectPolicy::Bypass => {
self.do_bypass().await?;
Ok(None)
}
ProtocolInspectPolicy::Intercept => match self.do_intercept().await {
Ok(obj) => {
intercept_log!(self, "finished");
Ok(obj)
}
Err(e) => {
intercept_log!(self, "{e}");
Err(e)
} else {
intercept_log!(self, "finished");
Ok(())
}
},
ProtocolInspectPolicy::Block => {
self.do_block().await?;
Ok(None)
}
ProtocolInspectPolicy::Block => self.do_block().await,
}
}

Expand Down Expand Up @@ -151,7 +162,7 @@ impl<SC: ServerConfig> SmtpInterceptObject<SC> {
EndWaitClient::new(local_ip).run_to_end(clt_r, clt_w).await
}

async fn do_intercept(&mut self) -> ServerTaskResult<()> {
async fn do_intercept(&mut self) -> ServerTaskResult<Option<StreamInspection<SC>>> {
let SmtpIo {
clt_r,
mut clt_w,
Expand All @@ -174,13 +185,16 @@ impl<SC: ServerConfig> SmtpInterceptObject<SC> {
}
};
let (code, host) = greeting.into_parts();
self.upstream_host = Some(host);
self.upstream.set_host(host);
if code == ReplyCode::NO_SERVICE {
let timeout = interception_config.quit_wait_timeout;
tokio::spawn(async move {
let _ = EndQuitServer::run_to_end(ups_r, ups_w, timeout).await;
});
return EndWaitClient::new(local_ip).run_to_end(clt_r, clt_w).await;
return EndWaitClient::new(local_ip)
.run_to_end(clt_r, clt_w)
.await
.map(|_| None);
}

self.start_initiation(clt_r, clt_w, ups_r, ups_w).await
Expand All @@ -192,7 +206,7 @@ impl<SC: ServerConfig> SmtpInterceptObject<SC> {
mut clt_w: BoxAsyncWrite,
mut ups_r: BoxAsyncRead,
mut ups_w: BoxAsyncWrite,
) -> ServerTaskResult<()> {
) -> ServerTaskResult<Option<StreamInspection<SC>>> {
let local_ip = self.ctx.task_notes.server_addr.ip();
let interception_config = self.ctx.smtp_interception();

Expand All @@ -205,13 +219,34 @@ impl<SC: ServerConfig> SmtpInterceptObject<SC> {

let allow_odmr = server_ext.allow_odmr(interception_config);

let mut forward = Forward::new(local_ip, allow_odmr);
let mut forward = Forward::new(local_ip, allow_odmr, server_ext.allow_starttls());
let next_action = forward
.relay(&mut clt_r, &mut clt_w, &mut ups_r, &mut ups_w)
.await?;
match next_action {
ForwardNextAction::StartTls => {
// TODO
return if let Some(tls_interception) = self.ctx.tls_interception() {
let mut start_tls_obj = crate::inspect::start_tls::StartTlsInterceptObject::new(
self.ctx.clone(),
self.upstream.clone(),
tls_interception,
StartTlsProtocol::Smtp,
);
start_tls_obj.set_io(clt_r, clt_w, ups_r, ups_w);
Ok(Some(StreamInspection::StartTls(start_tls_obj)))
} else {
crate::inspect::stream::transit_transparent(
clt_r,
clt_w,
ups_r,
ups_w,
&self.ctx.server_config,
&self.ctx.server_quit_policy,
self.ctx.user(),
)
.await
.map(|_| None)
}
}
ForwardNextAction::ReverseConnection => {
return crate::inspect::stream::transit_transparent(
Expand All @@ -223,7 +258,8 @@ impl<SC: ServerConfig> SmtpInterceptObject<SC> {
&self.ctx.server_quit_policy,
self.ctx.user(),
)
.await;
.await
.map(|_| None);
}
ForwardNextAction::MailTransport(_param) => {
// TODO
Expand All @@ -240,5 +276,6 @@ impl<SC: ServerConfig> SmtpInterceptObject<SC> {
self.ctx.user(),
)
.await
.map(|_| None)
}
}
Loading

0 comments on commit a5fd2f5

Please sign in to comment.