From 4b9f54f8d0e1967962c7b6bfaae18130eae06b5d Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 25 Nov 2024 16:20:47 +0800 Subject: [PATCH 1/6] g3proxy: add support for periodic TcpConnect task log --- .../doc/configuration/servers/http_proxy.rst | 3 + g3proxy/doc/configuration/servers/index.rst | 44 +++ .../doc/configuration/servers/sni_proxy.rst | 3 + .../doc/configuration/servers/socks_proxy.rst | 3 + .../doc/configuration/servers/tcp_stream.rst | 3 + .../doc/configuration/servers/tcp_tproxy.rst | 3 + .../doc/configuration/servers/tls_stream.rst | 3 + g3proxy/doc/log/task/index.rst | 18 ++ g3proxy/src/config/server/http_proxy.rs | 20 ++ g3proxy/src/config/server/sni_proxy/mod.rs | 20 ++ g3proxy/src/config/server/socks_proxy.rs | 20 ++ g3proxy/src/config/server/tcp_stream.rs | 20 ++ g3proxy/src/config/server/tcp_tproxy.rs | 20 ++ g3proxy/src/config/server/tls_stream.rs | 20 ++ g3proxy/src/inspect/http/v2/mod.rs | 13 +- g3proxy/src/inspect/imap/mod.rs | 28 +- g3proxy/src/inspect/mod.rs | 1 + g3proxy/src/inspect/smtp/mod.rs | 44 +-- g3proxy/src/inspect/stream/mod.rs | 280 +++++++++++------- g3proxy/src/inspect/websocket/h1.rs | 26 +- g3proxy/src/inspect/websocket/h2.rs | 26 +- g3proxy/src/log/task/tcp_connect.rs | 89 +++++- .../src/serve/http_proxy/task/connect/task.rs | 65 ++-- .../src/serve/sni_proxy/task/relay/task.rs | 63 ++-- .../socks_proxy/task/tcp_connect/task.rs | 65 ++-- g3proxy/src/serve/tcp_stream/common.rs | 8 - g3proxy/src/serve/tcp_stream/task.rs | 63 ++-- g3proxy/src/serve/tcp_tproxy/common.rs | 5 - g3proxy/src/serve/tcp_tproxy/task.rs | 63 ++-- g3proxy/src/serve/tls_stream/common.rs | 8 - g3proxy/src/serve/tls_stream/task.rs | 63 ++-- lib/g3-io-ext/src/lib.rs | 2 + lib/g3-io-ext/src/time/mod.rs | 18 ++ lib/g3-io-ext/src/time/optional_interval.rs | 59 ++++ 34 files changed, 824 insertions(+), 365 deletions(-) create mode 100644 lib/g3-io-ext/src/time/mod.rs create mode 100644 lib/g3-io-ext/src/time/optional_interval.rs diff --git a/g3proxy/doc/configuration/servers/http_proxy.rst b/g3proxy/doc/configuration/servers/http_proxy.rst index 0dc5215e6..040d4b9ba 100644 --- a/g3proxy/doc/configuration/servers/http_proxy.rst +++ b/g3proxy/doc/configuration/servers/http_proxy.rst @@ -23,6 +23,9 @@ The following common keys are supported: * :ref:`tcp_misc_opts ` * :ref:`task_idle_check_duration ` * :ref:`task_idle_max_count ` +* :ref:`flush_task_log_on_created ` +* :ref:`flush_task_log_on_connected ` +* :ref:`task_log_flush_interval ` * :ref:`extra_metrics_tags ` The auth scheme supported by the server is determined by the type of the specified user group. diff --git a/g3proxy/doc/configuration/servers/index.rst b/g3proxy/doc/configuration/servers/index.rst index 8fa64ad68..8e5b34e19 100644 --- a/g3proxy/doc/configuration/servers/index.rst +++ b/g3proxy/doc/configuration/servers/index.rst @@ -303,6 +303,50 @@ The task will be closed if the idle check return IDLE the times as this value. **default**: 1 +.. _conf_server_common_flush_task_log_on_created: + +flush_task_log_on_created +------------------------- + +**optional**, **type**: bool + +Log when task get created. + +**default**: false + +.. versionadded:: 1.11.0 + +.. _conf_server_common_flush_task_log_on_connected: + +flush_task_log_on_connected +--------------------------- + +**optional**, **type**: bool + +Log when upstream connected. + +**default**: false + +.. versionadded:: 1.11.0 + +.. _conf_server_common_task_log_flush_interval: + +task_log_flush_interval +----------------------- + +**optional**, **type**: :ref:`humanize duration ` + +Enable periodic task log and set the flush interval. + +.. note:: + + There will be no periodic task log if protocol inspection is enabled, as intercept and inspect logs will be available + in this case. + +**default**: not set + +.. versionadded:: 1.11.0 + .. _conf_server_common_extra_metrics_tags: extra_metrics_tags diff --git a/g3proxy/doc/configuration/servers/sni_proxy.rst b/g3proxy/doc/configuration/servers/sni_proxy.rst index 022cb2213..bf4e0abcd 100644 --- a/g3proxy/doc/configuration/servers/sni_proxy.rst +++ b/g3proxy/doc/configuration/servers/sni_proxy.rst @@ -18,6 +18,9 @@ The following common keys are supported: * :ref:`tcp_misc_opts ` * :ref:`task_idle_check_duration ` * :ref:`task_idle_max_count ` +* :ref:`flush_task_log_on_created ` +* :ref:`flush_task_log_on_connected ` +* :ref:`task_log_flush_interval ` * :ref:`extra_metrics_tags ` listen diff --git a/g3proxy/doc/configuration/servers/socks_proxy.rst b/g3proxy/doc/configuration/servers/socks_proxy.rst index 2b9be4d36..fdf33d189 100644 --- a/g3proxy/doc/configuration/servers/socks_proxy.rst +++ b/g3proxy/doc/configuration/servers/socks_proxy.rst @@ -26,6 +26,9 @@ The following common keys are supported: * :ref:`udp_misc_opts ` * :ref:`task_idle_check_duration ` * :ref:`task_idle_max_count ` +* :ref:`flush_task_log_on_created ` +* :ref:`flush_task_log_on_connected ` +* :ref:`task_log_flush_interval ` * :ref:`extra_metrics_tags ` The auth type supported by the server is determined by the type of the specified user group. diff --git a/g3proxy/doc/configuration/servers/tcp_stream.rst b/g3proxy/doc/configuration/servers/tcp_stream.rst index a6b67afb1..fd3172a15 100644 --- a/g3proxy/doc/configuration/servers/tcp_stream.rst +++ b/g3proxy/doc/configuration/servers/tcp_stream.rst @@ -18,6 +18,9 @@ The following common keys are supported: * :ref:`tcp_misc_opts ` * :ref:`task_idle_check_duration ` * :ref:`task_idle_max_count ` +* :ref:`flush_task_log_on_created ` +* :ref:`flush_task_log_on_connected ` +* :ref:`task_log_flush_interval ` * :ref:`extra_metrics_tags ` listen diff --git a/g3proxy/doc/configuration/servers/tcp_tproxy.rst b/g3proxy/doc/configuration/servers/tcp_tproxy.rst index d27b4d396..63c7a21d8 100644 --- a/g3proxy/doc/configuration/servers/tcp_tproxy.rst +++ b/g3proxy/doc/configuration/servers/tcp_tproxy.rst @@ -22,6 +22,9 @@ The following common keys are supported: * :ref:`tcp_misc_opts ` * :ref:`task_idle_check_duration ` * :ref:`task_idle_max_count ` +* :ref:`flush_task_log_on_created ` +* :ref:`flush_task_log_on_connected ` +* :ref:`task_log_flush_interval ` * :ref:`extra_metrics_tags ` listen diff --git a/g3proxy/doc/configuration/servers/tls_stream.rst b/g3proxy/doc/configuration/servers/tls_stream.rst index 7db86a073..6da6acdee 100644 --- a/g3proxy/doc/configuration/servers/tls_stream.rst +++ b/g3proxy/doc/configuration/servers/tls_stream.rst @@ -23,6 +23,9 @@ The following common keys are supported: * :ref:`tcp_misc_opts ` * :ref:`task_idle_check_duration ` * :ref:`task_idle_max_count ` +* :ref:`flush_task_log_on_created ` +* :ref:`flush_task_log_on_connected ` +* :ref:`task_log_flush_interval ` * :ref:`extra_metrics_tags ` listen diff --git a/g3proxy/doc/log/task/index.rst b/g3proxy/doc/log/task/index.rst index 35a043add..f5b6227b9 100644 --- a/g3proxy/doc/log/task/index.rst +++ b/g3proxy/doc/log/task/index.rst @@ -41,6 +41,24 @@ UUID of the task. The *task_id* will appear in other logs such as escape log if they have any association with this task. +task_event +---------- + +**optional**, **type**: string + +Show the event that trigger this log. + +The event can be + + - created + - connected + - periodic + - finished + +This field can be omitted if the value is *finished*. + +.. versionadded:: 1.11.0 + stage ----- diff --git a/g3proxy/src/config/server/http_proxy.rs b/g3proxy/src/config/server/http_proxy.rs index 5d39d40a7..2f4d51cac 100644 --- a/g3proxy/src/config/server/http_proxy.rs +++ b/g3proxy/src/config/server/http_proxy.rs @@ -83,6 +83,9 @@ pub(crate) struct HttpProxyServerConfig { pub(crate) timeout: HttpProxyServerTimeoutConfig, pub(crate) task_idle_check_duration: Duration, pub(crate) task_idle_max_count: i32, + pub(crate) flush_task_log_on_created: bool, + pub(crate) flush_task_log_on_connected: bool, + pub(crate) task_log_flush_interval: Option, pub(crate) tcp_copy: LimitedCopyConfig, pub(crate) tcp_misc_opts: TcpMiscSockOpts, pub(crate) req_hdr_max_size: usize, @@ -126,6 +129,9 @@ impl HttpProxyServerConfig { timeout: HttpProxyServerTimeoutConfig::default(), task_idle_check_duration: IDLE_CHECK_DEFAULT_DURATION, task_idle_max_count: 1, + flush_task_log_on_created: false, + flush_task_log_on_connected: false, + task_log_flush_interval: None, tcp_copy: Default::default(), tcp_misc_opts: Default::default(), req_hdr_max_size: 65536, // 64KiB @@ -292,6 +298,20 @@ impl HttpProxyServerConfig { g3_yaml::value::as_i32(v).context(format!("invalid i32 value for key {k}"))?; Ok(()) } + "flush_task_log_on_created" => { + self.flush_task_log_on_created = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "flush_task_log_on_connected" => { + self.flush_task_log_on_connected = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "task_log_flush_interval" => { + let interval = g3_yaml::humanize::as_duration(v) + .context(format!("invalid humanize duration value for key {k}"))?; + self.task_log_flush_interval = Some(interval); + Ok(()) + } "req_header_recv_timeout" => { self.timeout.recv_req_header = g3_yaml::humanize::as_duration(v) .context(format!("invalid humanize duration value for key {k}"))?; diff --git a/g3proxy/src/config/server/sni_proxy/mod.rs b/g3proxy/src/config/server/sni_proxy/mod.rs index 9e77ed2ea..0ce802346 100644 --- a/g3proxy/src/config/server/sni_proxy/mod.rs +++ b/g3proxy/src/config/server/sni_proxy/mod.rs @@ -49,6 +49,9 @@ pub(crate) struct SniProxyServerConfig { pub(crate) tcp_sock_speed_limit: TcpSockSpeedLimitConfig, pub(crate) task_idle_check_duration: Duration, pub(crate) task_idle_max_count: i32, + pub(crate) flush_task_log_on_created: bool, + pub(crate) flush_task_log_on_connected: bool, + pub(crate) task_log_flush_interval: Option, pub(crate) tcp_copy: LimitedCopyConfig, pub(crate) tcp_misc_opts: TcpMiscSockOpts, pub(crate) tls_max_client_hello_size: u32, @@ -75,6 +78,9 @@ impl SniProxyServerConfig { tcp_sock_speed_limit: TcpSockSpeedLimitConfig::default(), task_idle_check_duration: Duration::from_secs(300), task_idle_max_count: 1, + flush_task_log_on_created: false, + flush_task_log_on_connected: false, + task_log_flush_interval: None, tcp_copy: Default::default(), tcp_misc_opts: Default::default(), tls_max_client_hello_size: 1 << 16, @@ -179,6 +185,20 @@ impl SniProxyServerConfig { g3_yaml::value::as_i32(v).context(format!("invalid i32 value for key {k}"))?; Ok(()) } + "flush_task_log_on_created" => { + self.flush_task_log_on_created = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "flush_task_log_on_connected" => { + self.flush_task_log_on_connected = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "task_log_flush_interval" => { + let interval = g3_yaml::humanize::as_duration(v) + .context(format!("invalid humanize duration value for key {k}"))?; + self.task_log_flush_interval = Some(interval); + Ok(()) + } "request_wait_timeout" => { self.request_wait_timeout = g3_yaml::humanize::as_duration(v) .context(format!("invalid humanize duration value for key {k}"))?; diff --git a/g3proxy/src/config/server/socks_proxy.rs b/g3proxy/src/config/server/socks_proxy.rs index 7b70ded81..53d089b16 100644 --- a/g3proxy/src/config/server/socks_proxy.rs +++ b/g3proxy/src/config/server/socks_proxy.rs @@ -82,6 +82,9 @@ pub(crate) struct SocksProxyServerConfig { pub(crate) timeout: SocksProxyServerTimeoutConfig, pub(crate) task_idle_check_duration: Duration, pub(crate) task_idle_max_count: i32, + pub(crate) flush_task_log_on_created: bool, + pub(crate) flush_task_log_on_connected: bool, + pub(crate) task_log_flush_interval: Option, pub(crate) tcp_copy: LimitedCopyConfig, pub(crate) udp_relay: LimitedUdpRelayConfig, pub(crate) tcp_misc_opts: TcpMiscSockOpts, @@ -114,6 +117,9 @@ impl SocksProxyServerConfig { timeout: SocksProxyServerTimeoutConfig::default(), task_idle_check_duration: IDLE_CHECK_DEFAULT_DURATION, task_idle_max_count: 1, + flush_task_log_on_created: false, + flush_task_log_on_connected: false, + task_log_flush_interval: None, tcp_copy: Default::default(), udp_relay: Default::default(), tcp_misc_opts: Default::default(), @@ -295,6 +301,20 @@ impl SocksProxyServerConfig { g3_yaml::value::as_i32(v).context(format!("invalid i32 value for key {k}"))?; Ok(()) } + "flush_task_log_on_created" => { + self.flush_task_log_on_created = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "flush_task_log_on_connected" => { + self.flush_task_log_on_connected = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "task_log_flush_interval" => { + let interval = g3_yaml::humanize::as_duration(v) + .context(format!("invalid humanize duration value for key {k}"))?; + self.task_log_flush_interval = Some(interval); + Ok(()) + } "transmute_udp_echo_ip" | "auto_reply_local_ip_map" => { if let Yaml::Hash(_) = v { let map = g3_yaml::value::as_hashmap( diff --git a/g3proxy/src/config/server/tcp_stream.rs b/g3proxy/src/config/server/tcp_stream.rs index 372b1365a..8fae5b159 100644 --- a/g3proxy/src/config/server/tcp_stream.rs +++ b/g3proxy/src/config/server/tcp_stream.rs @@ -52,6 +52,9 @@ pub(crate) struct TcpStreamServerConfig { pub(crate) tcp_sock_speed_limit: TcpSockSpeedLimitConfig, pub(crate) task_idle_check_duration: Duration, pub(crate) task_idle_max_count: i32, + pub(crate) flush_task_log_on_created: bool, + pub(crate) flush_task_log_on_connected: bool, + pub(crate) task_log_flush_interval: Option, pub(crate) tcp_copy: LimitedCopyConfig, pub(crate) tcp_misc_opts: TcpMiscSockOpts, pub(crate) extra_metrics_tags: Option>, @@ -75,6 +78,9 @@ impl TcpStreamServerConfig { tcp_sock_speed_limit: TcpSockSpeedLimitConfig::default(), task_idle_check_duration: Duration::from_secs(300), task_idle_max_count: 1, + flush_task_log_on_created: false, + flush_task_log_on_connected: false, + task_log_flush_interval: None, tcp_copy: Default::default(), tcp_misc_opts: Default::default(), extra_metrics_tags: None, @@ -205,6 +211,20 @@ impl TcpStreamServerConfig { g3_yaml::value::as_i32(v).context(format!("invalid i32 value for key {k}"))?; Ok(()) } + "flush_task_log_on_created" => { + self.flush_task_log_on_created = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "flush_task_log_on_connected" => { + self.flush_task_log_on_connected = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "task_log_flush_interval" => { + let interval = g3_yaml::humanize::as_duration(v) + .context(format!("invalid humanize duration value for key {k}"))?; + self.task_log_flush_interval = Some(interval); + Ok(()) + } _ => Err(anyhow!("invalid key {k}")), } } diff --git a/g3proxy/src/config/server/tcp_tproxy.rs b/g3proxy/src/config/server/tcp_tproxy.rs index fa6e70088..4ebebd595 100644 --- a/g3proxy/src/config/server/tcp_tproxy.rs +++ b/g3proxy/src/config/server/tcp_tproxy.rs @@ -44,6 +44,9 @@ pub(crate) struct TcpTProxyServerConfig { pub(crate) tcp_sock_speed_limit: TcpSockSpeedLimitConfig, pub(crate) task_idle_check_duration: Duration, pub(crate) task_idle_max_count: i32, + pub(crate) flush_task_log_on_created: bool, + pub(crate) flush_task_log_on_connected: bool, + pub(crate) task_log_flush_interval: Option, pub(crate) tcp_copy: LimitedCopyConfig, pub(crate) tcp_misc_opts: TcpMiscSockOpts, pub(crate) extra_metrics_tags: Option>, @@ -63,6 +66,9 @@ impl TcpTProxyServerConfig { tcp_sock_speed_limit: TcpSockSpeedLimitConfig::default(), task_idle_check_duration: Duration::from_secs(300), task_idle_max_count: 1, + flush_task_log_on_created: false, + flush_task_log_on_connected: false, + task_log_flush_interval: None, tcp_copy: Default::default(), tcp_misc_opts: Default::default(), extra_metrics_tags: None, @@ -155,6 +161,20 @@ impl TcpTProxyServerConfig { g3_yaml::value::as_i32(v).context(format!("invalid i32 value for key {k}"))?; Ok(()) } + "flush_task_log_on_created" => { + self.flush_task_log_on_created = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "flush_task_log_on_connected" => { + self.flush_task_log_on_connected = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "task_log_flush_interval" => { + let interval = g3_yaml::humanize::as_duration(v) + .context(format!("invalid humanize duration value for key {k}"))?; + self.task_log_flush_interval = Some(interval); + Ok(()) + } _ => Err(anyhow!("invalid key {k}")), } } diff --git a/g3proxy/src/config/server/tls_stream.rs b/g3proxy/src/config/server/tls_stream.rs index e899c5c92..c0e485d58 100644 --- a/g3proxy/src/config/server/tls_stream.rs +++ b/g3proxy/src/config/server/tls_stream.rs @@ -55,6 +55,9 @@ pub(crate) struct TlsStreamServerConfig { pub(crate) tcp_sock_speed_limit: TcpSockSpeedLimitConfig, pub(crate) task_idle_check_duration: Duration, pub(crate) task_idle_max_count: i32, + pub(crate) flush_task_log_on_created: bool, + pub(crate) flush_task_log_on_connected: bool, + pub(crate) task_log_flush_interval: Option, pub(crate) tcp_copy: LimitedCopyConfig, pub(crate) tcp_misc_opts: TcpMiscSockOpts, pub(crate) extra_metrics_tags: Option>, @@ -80,6 +83,9 @@ impl TlsStreamServerConfig { tcp_sock_speed_limit: TcpSockSpeedLimitConfig::default(), task_idle_check_duration: Duration::from_secs(300), task_idle_max_count: 1, + flush_task_log_on_created: false, + flush_task_log_on_connected: false, + task_log_flush_interval: None, tcp_copy: Default::default(), tcp_misc_opts: Default::default(), extra_metrics_tags: None, @@ -224,6 +230,20 @@ impl TlsStreamServerConfig { g3_yaml::value::as_i32(v).context(format!("invalid i32 value for key {k}"))?; Ok(()) } + "flush_task_log_on_created" => { + self.flush_task_log_on_created = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "flush_task_log_on_connected" => { + self.flush_task_log_on_connected = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "task_log_flush_interval" => { + let interval = g3_yaml::humanize::as_duration(v) + .context(format!("invalid humanize duration value for key {k}"))?; + self.task_log_flush_interval = Some(interval); + Ok(()) + } _ => Err(anyhow!("invalid key {k}")), } } diff --git a/g3proxy/src/inspect/http/v2/mod.rs b/g3proxy/src/inspect/http/v2/mod.rs index 95897b8de..8927306d1 100644 --- a/g3proxy/src/inspect/http/v2/mod.rs +++ b/g3proxy/src/inspect/http/v2/mod.rs @@ -235,16 +235,9 @@ where ups_w, } = self.io.take().unwrap(); - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.ctx.user(), - ) - .await + self.ctx + .transit_transparent(clt_r, clt_w, ups_r, ups_w) + .await } async fn do_block(&mut self) -> Result<(), H2InterceptionError> { diff --git a/g3proxy/src/inspect/imap/mod.rs b/g3proxy/src/inspect/imap/mod.rs index e129175ef..84ff29284 100644 --- a/g3proxy/src/inspect/imap/mod.rs +++ b/g3proxy/src/inspect/imap/mod.rs @@ -226,16 +226,9 @@ where ups_w, } = self.io.take().unwrap(); - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.ctx.user(), - ) - .await + self.ctx + .transit_transparent(clt_r, clt_w, ups_r, ups_w) + .await } async fn do_block(&mut self) -> ServerTaskResult<()> { @@ -365,17 +358,10 @@ where start_tls_obj.set_io(clt_r, clt_w, ups_r, ups_w); Ok(Some(StreamInspection::StartTls(start_tls_obj))) } else { - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.ctx.user(), - ) - .await - .map(|_| None) + self.ctx + .transit_transparent(clt_r, clt_w, ups_r, ups_w) + .await + .map(|_| None) } } InitiationStatus::Authenticated => { diff --git a/g3proxy/src/inspect/mod.rs b/g3proxy/src/inspect/mod.rs index 3f0f5c5b6..c677b3164 100644 --- a/g3proxy/src/inspect/mod.rs +++ b/g3proxy/src/inspect/mod.rs @@ -38,6 +38,7 @@ mod error; pub(crate) use error::InterceptionError; pub(crate) mod stream; +pub(crate) use stream::StreamTransitTask; pub(crate) mod tls; use tls::TlsInterceptionContext; diff --git a/g3proxy/src/inspect/smtp/mod.rs b/g3proxy/src/inspect/smtp/mod.rs index 5abd19ed1..64226e4ec 100644 --- a/g3proxy/src/inspect/smtp/mod.rs +++ b/g3proxy/src/inspect/smtp/mod.rs @@ -229,16 +229,9 @@ where ups_w, } = self.io.take().unwrap(); - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.ctx.user(), - ) - .await + self.ctx + .transit_transparent(clt_r, clt_w, ups_r, ups_w) + .await } async fn do_block(&mut self) -> ServerTaskResult<()> { @@ -362,31 +355,18 @@ where start_tls_obj.set_io(clt_r, clt_w, ups_r, ups_w); Ok(Some(StreamInspection::StartTls(start_tls_obj))) } else { - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.ctx.user(), - ) - .await - .map(|_| None) + self.ctx + .transit_transparent(clt_r, clt_w, ups_r, ups_w) + .await + .map(|_| None) } } ForwardNextAction::ReverseConnection => { - return crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.ctx.user(), - ) - .await - .map(|_| None); + return self + .ctx + .transit_transparent(clt_r, clt_w, ups_r, ups_w) + .await + .map(|_| None); } ForwardNextAction::SetExtensions(ext) => server_ext = ext, ForwardNextAction::MailTransport(param) => { diff --git a/g3proxy/src/inspect/stream/mod.rs b/g3proxy/src/inspect/stream/mod.rs index 6fd8fc93d..e85167abe 100644 --- a/g3proxy/src/inspect/stream/mod.rs +++ b/g3proxy/src/inspect/stream/mod.rs @@ -14,14 +14,14 @@ * limitations under the License. */ -use std::sync::Arc; +use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::time::Instant; use g3_daemon::server::ServerQuitPolicy; use g3_dpi::{MaybeProtocol, ProtocolInspectionConfig, ProtocolInspector}; -use g3_io_ext::{LimitedCopy, LimitedCopyError}; +use g3_io_ext::{LimitedCopy, LimitedCopyConfig, LimitedCopyError, OptionalInterval}; use g3_types::net::UpstreamAddr; use super::{StreamInspectContext, StreamInspection}; @@ -32,107 +32,116 @@ use crate::serve::{ServerTaskError, ServerTaskForbiddenError, ServerTaskResult}; mod object; pub(crate) use object::StreamInspectObject; -pub(crate) async fn transit_transparent( - mut clt_r: CR, - mut clt_w: CW, - mut ups_r: UR, - mut ups_w: UW, - server_config: &Arc, - server_quit_policy: &Arc, - user: Option<&Arc>, -) -> ServerTaskResult<()> -where - CR: AsyncRead + Unpin, - CW: AsyncWrite + Unpin, - UR: AsyncRead + Unpin, - UW: AsyncWrite + Unpin, - SC: ServerConfig, -{ - let copy_config = server_config.limited_copy_config(); - let clt_to_ups = LimitedCopy::new(&mut clt_r, &mut ups_w, ©_config); - let ups_to_clt = LimitedCopy::new(&mut ups_r, &mut clt_w, ©_config); - - transit_transparent2( - clt_to_ups, - ups_to_clt, - server_config, - server_quit_policy, - user, - ) - .await -} +pub(crate) trait StreamTransitTask { + fn copy_config(&self) -> LimitedCopyConfig; + fn idle_check_interval(&self) -> Duration; + fn max_idle_count(&self) -> i32; + fn log_periodic(&self); + fn log_flush_interval(&self) -> Option; + fn quit_policy(&self) -> &ServerQuitPolicy; + fn user(&self) -> Option<&User>; -pub(crate) async fn transit_transparent2<'a, CR, CW, UR, UW, SC>( - mut clt_to_ups: LimitedCopy<'a, CR, UW>, - mut ups_to_clt: LimitedCopy<'a, UR, CW>, - server_config: &'a Arc, - server_quit_policy: &'a Arc, - user: Option<&'a Arc>, -) -> ServerTaskResult<()> -where - CR: AsyncRead + Unpin, - CW: AsyncWrite + Unpin, - UR: AsyncRead + Unpin, - UW: AsyncWrite + Unpin, - SC: ServerConfig, -{ - let idle_duration = server_config.task_idle_check_duration(); - let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); - let mut idle_count = 0; - loop { - tokio::select! { - biased; - - r = &mut clt_to_ups => { - let _ = ups_to_clt.write_flush().await; - return match r { - Ok(_) => Err(ServerTaskError::ClosedByClient), - Err(LimitedCopyError::ReadFailed(e)) => Err(ServerTaskError::ClientTcpReadFailed(e)), - Err(LimitedCopyError::WriteFailed(e)) => Err(ServerTaskError::UpstreamWriteFailed(e)), - }; - } - r = &mut ups_to_clt => { - let _ = clt_to_ups.write_flush().await; - return match r { - Ok(_) => Err(ServerTaskError::ClosedByUpstream), - Err(LimitedCopyError::ReadFailed(e)) => Err(ServerTaskError::UpstreamReadFailed(e)), - Err(LimitedCopyError::WriteFailed(e)) => Err(ServerTaskError::ClientTcpWriteFailed(e)), - }; - } - _ = idle_interval.tick() => { - if clt_to_ups.is_idle() && ups_to_clt.is_idle() { - idle_count += 1; + async fn transit_transparent( + &self, + mut clt_r: CR, + mut clt_w: CW, + mut ups_r: UR, + mut ups_w: UW, + ) -> ServerTaskResult<()> + where + CR: AsyncRead + Unpin, + CW: AsyncWrite + Unpin, + UR: AsyncRead + Unpin, + UW: AsyncWrite + Unpin, + { + let copy_config = self.copy_config(); + let clt_to_ups = LimitedCopy::new(&mut clt_r, &mut ups_w, ©_config); + let ups_to_clt = LimitedCopy::new(&mut ups_r, &mut clt_w, ©_config); - let quit = if let Some(user) = user { - if user.is_blocked() { - return Err(ServerTaskError::CanceledAsUserBlocked); + self.transit_transparent2(clt_to_ups, ups_to_clt).await + } + + async fn transit_transparent2<'a, CR, CW, UR, UW>( + &self, + mut clt_to_ups: LimitedCopy<'a, CR, UW>, + mut ups_to_clt: LimitedCopy<'a, UR, CW>, + ) -> ServerTaskResult<()> + where + CR: AsyncRead + Unpin, + CW: AsyncWrite + Unpin, + UR: AsyncRead + Unpin, + UW: AsyncWrite + Unpin, + { + let idle_duration = self.idle_check_interval(); + let mut idle_interval = + tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); + let mut log_interval = self + .log_flush_interval() + .map(|log_interval| { + let interval = + tokio::time::interval_at(Instant::now() + log_interval, log_interval); + OptionalInterval::with(interval) + }) + .unwrap_or_default(); + let mut idle_count = 0; + loop { + tokio::select! { + biased; + + r = &mut clt_to_ups => { + let _ = ups_to_clt.write_flush().await; + return match r { + Ok(_) => Err(ServerTaskError::ClosedByClient), + Err(LimitedCopyError::ReadFailed(e)) => Err(ServerTaskError::ClientTcpReadFailed(e)), + Err(LimitedCopyError::WriteFailed(e)) => Err(ServerTaskError::UpstreamWriteFailed(e)), + }; + } + r = &mut ups_to_clt => { + let _ = clt_to_ups.write_flush().await; + return match r { + Ok(_) => Err(ServerTaskError::ClosedByUpstream), + Err(LimitedCopyError::ReadFailed(e)) => Err(ServerTaskError::UpstreamReadFailed(e)), + Err(LimitedCopyError::WriteFailed(e)) => Err(ServerTaskError::ClientTcpWriteFailed(e)), + }; + } + _ = log_interval.tick() => { + self.log_periodic(); + } + _ = idle_interval.tick() => { + if clt_to_ups.is_idle() && ups_to_clt.is_idle() { + idle_count += 1; + + let quit = if let Some(user) = self.user() { + if user.is_blocked() { + return Err(ServerTaskError::CanceledAsUserBlocked); + } + idle_count >= user.task_max_idle_count() + } else { + idle_count >= self.max_idle_count() + }; + + if quit { + return Err(ServerTaskError::Idle(idle_duration, idle_count)); } - idle_count >= user.task_max_idle_count() } else { - idle_count >= server_config.task_max_idle_count() - }; + idle_count = 0; - if quit { - return Err(ServerTaskError::Idle(idle_duration, idle_count)); + clt_to_ups.reset_active(); + ups_to_clt.reset_active(); } - } else { - idle_count = 0; - - clt_to_ups.reset_active(); - ups_to_clt.reset_active(); - } - if let Some(user) = user { - if user.is_blocked() { - return Err(ServerTaskError::CanceledAsUserBlocked); + if let Some(user) = self.user() { + if user.is_blocked() { + return Err(ServerTaskError::CanceledAsUserBlocked); + } } - } - if server_quit_policy.force_quit() { - return Err(ServerTaskError::CanceledAsServerQuit) + if self.quit_policy().force_quit() { + return Err(ServerTaskError::CanceledAsServerQuit) + } } - } - }; + }; + } } } @@ -168,7 +177,7 @@ where impl StreamInspectContext where - SC: ServerConfig + Send + Sync + 'static, + SC: ServerConfig, { #[inline] fn protocol_inspection(&self) -> &ProtocolInspectionConfig { @@ -230,12 +239,12 @@ where self.transit_transparent(clt_r, clt_w, ups_r, ups_w).await } - pub(super) async fn transit_transparent( + pub(crate) async fn transit_transparent( &self, - clt_r: CR, - clt_w: CW, - ups_r: UR, - ups_w: UW, + mut clt_r: CR, + mut clt_w: CW, + mut ups_r: UR, + mut ups_w: UW, ) -> ServerTaskResult<()> where CR: AsyncRead + Unpin, @@ -243,16 +252,69 @@ where UR: AsyncRead + Unpin, UW: AsyncWrite + Unpin, { - transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.server_config, - &self.server_quit_policy, - self.user(), - ) - .await + let copy_config = self.server_config.limited_copy_config(); + let mut clt_to_ups = LimitedCopy::new(&mut clt_r, &mut ups_w, ©_config); + let mut ups_to_clt = LimitedCopy::new(&mut ups_r, &mut clt_w, ©_config); + + let idle_duration = self.server_config.task_idle_check_duration(); + let mut idle_interval = + tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); + let mut idle_count = 0; + loop { + tokio::select! { + biased; + + r = &mut clt_to_ups => { + let _ = ups_to_clt.write_flush().await; + return match r { + Ok(_) => Err(ServerTaskError::ClosedByClient), + Err(LimitedCopyError::ReadFailed(e)) => Err(ServerTaskError::ClientTcpReadFailed(e)), + Err(LimitedCopyError::WriteFailed(e)) => Err(ServerTaskError::UpstreamWriteFailed(e)), + }; + } + r = &mut ups_to_clt => { + let _ = clt_to_ups.write_flush().await; + return match r { + Ok(_) => Err(ServerTaskError::ClosedByUpstream), + Err(LimitedCopyError::ReadFailed(e)) => Err(ServerTaskError::UpstreamReadFailed(e)), + Err(LimitedCopyError::WriteFailed(e)) => Err(ServerTaskError::ClientTcpWriteFailed(e)), + }; + } + _ = idle_interval.tick() => { + if clt_to_ups.is_idle() && ups_to_clt.is_idle() { + idle_count += 1; + + let quit = if let Some(user) = self.user() { + if user.is_blocked() { + return Err(ServerTaskError::CanceledAsUserBlocked); + } + idle_count >= user.task_max_idle_count() + } else { + idle_count >= self.server_config.task_max_idle_count() + }; + + if quit { + return Err(ServerTaskError::Idle(idle_duration, idle_count)); + } + } else { + idle_count = 0; + + clt_to_ups.reset_active(); + ups_to_clt.reset_active(); + } + + if let Some(user) = self.user() { + if user.is_blocked() { + return Err(ServerTaskError::CanceledAsUserBlocked); + } + } + + if self.server_quit_policy.force_quit() { + return Err(ServerTaskError::CanceledAsServerQuit) + } + } + }; + } } } diff --git a/g3proxy/src/inspect/websocket/h1.rs b/g3proxy/src/inspect/websocket/h1.rs index 090380702..a76ad2f86 100644 --- a/g3proxy/src/inspect/websocket/h1.rs +++ b/g3proxy/src/inspect/websocket/h1.rs @@ -192,16 +192,9 @@ impl H1WebsocketInterceptObject { ups_w, } = self.io.take().unwrap(); - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.ctx.user(), - ) - .await + self.ctx + .transit_transparent(clt_r, clt_w, ups_r, ups_w) + .await } async fn do_block(&mut self) -> ServerTaskResult<()> { @@ -237,15 +230,8 @@ impl H1WebsocketInterceptObject { ups_w, } = self.io.take().unwrap(); - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.ctx.user(), - ) - .await + self.ctx + .transit_transparent(clt_r, clt_w, ups_r, ups_w) + .await } } diff --git a/g3proxy/src/inspect/websocket/h2.rs b/g3proxy/src/inspect/websocket/h2.rs index c8bdc7fc6..b09147b57 100644 --- a/g3proxy/src/inspect/websocket/h2.rs +++ b/g3proxy/src/inspect/websocket/h2.rs @@ -172,16 +172,9 @@ impl H2WebsocketInterceptObject { let ups_r = H2StreamReader::new(ups_r); let ups_w = H2StreamWriter::new(ups_w); - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.ctx.user(), - ) - .await + self.ctx + .transit_transparent(clt_r, clt_w, ups_r, ups_w) + .await } async fn do_block( @@ -211,15 +204,8 @@ impl H2WebsocketInterceptObject { let ups_r = H2StreamReader::new(ups_r); let ups_w = H2StreamWriter::new(ups_w); - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.ctx.user(), - ) - .await + self.ctx + .transit_transparent(clt_r, clt_w, ups_r, ups_w) + .await } } diff --git a/g3proxy/src/log/task/tcp_connect.rs b/g3proxy/src/log/task/tcp_connect.rs index 78e8c7272..2f84a30b9 100644 --- a/g3proxy/src/log/task/tcp_connect.rs +++ b/g3proxy/src/log/task/tcp_connect.rs @@ -14,8 +14,6 @@ * limitations under the License. */ -use std::time::Duration; - use slog::{slog_info, Logger}; use g3_slog_types::{LtDateTime, LtDuration, LtIpAddr, LtUpstreamAddr, LtUuid}; @@ -28,7 +26,6 @@ pub(crate) struct TaskLogForTcpConnect<'a> { pub(crate) upstream: &'a UpstreamAddr, pub(crate) task_notes: &'a ServerTaskNotes, pub(crate) tcp_notes: &'a TcpConnectTaskNotes, - pub(crate) total_time: Duration, pub(crate) client_rd_bytes: u64, pub(crate) client_wr_bytes: u64, pub(crate) remote_rd_bytes: u64, @@ -36,6 +33,89 @@ pub(crate) struct TaskLogForTcpConnect<'a> { } impl TaskLogForTcpConnect<'_> { + pub(crate) fn log_created(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "TcpConnect", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "created", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "server_addr" => self.task_notes.server_addr(), + "client_addr" => self.task_notes.client_addr(), + "upstream" => LtUpstreamAddr(self.upstream), + "wait_time" => LtDuration(self.task_notes.wait_time), + ) + } + + pub(crate) fn log_connected(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "TcpConnect", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "connected", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "server_addr" => self.task_notes.server_addr(), + "client_addr" => self.task_notes.client_addr(), + "upstream" => LtUpstreamAddr(self.upstream), + "escaper" => self.tcp_notes.escaper.as_str(), + "next_bind_ip" => self.tcp_notes.bind.ip().map(LtIpAddr), + "next_bound_addr" => self.tcp_notes.local, + "next_peer_addr" => self.tcp_notes.next, + "next_expire" => self.tcp_notes.expire.as_ref().map(LtDateTime), + "tcp_connect_tries" => self.tcp_notes.tries, + "tcp_connect_spend" => LtDuration(self.tcp_notes.duration), + "wait_time" => LtDuration(self.task_notes.wait_time), + "ready_time" => LtDuration(self.task_notes.ready_time), + ) + } + + pub(crate) fn log_periodic(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "TcpConnect", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "periodic", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "server_addr" => self.task_notes.server_addr(), + "client_addr" => self.task_notes.client_addr(), + "upstream" => LtUpstreamAddr(self.upstream), + "escaper" => self.tcp_notes.escaper.as_str(), + "next_bind_ip" => self.tcp_notes.bind.ip().map(LtIpAddr), + "next_bound_addr" => self.tcp_notes.local, + "next_peer_addr" => self.tcp_notes.next, + "next_expire" => self.tcp_notes.expire.as_ref().map(LtDateTime), + "tcp_connect_tries" => self.tcp_notes.tries, + "tcp_connect_spend" => LtDuration(self.tcp_notes.duration), + "wait_time" => LtDuration(self.task_notes.wait_time), + "ready_time" => LtDuration(self.task_notes.ready_time), + "c_rd_bytes" => self.client_rd_bytes, + "c_wr_bytes" => self.client_wr_bytes, + "r_rd_bytes" => self.remote_rd_bytes, + "r_wr_bytes" => self.remote_wr_bytes, + ) + } + pub(crate) fn log(&self, logger: &Logger, e: &ServerTaskError) { if let Some(user_ctx) = self.task_notes.user_ctx() { if user_ctx.skip_log() { @@ -46,6 +126,7 @@ impl TaskLogForTcpConnect<'_> { slog_info!(logger, "{}", e; "task_type" => "TcpConnect", "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "finished", "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -62,7 +143,7 @@ impl TaskLogForTcpConnect<'_> { "reason" => e.brief(), "wait_time" => LtDuration(self.task_notes.wait_time), "ready_time" => LtDuration(self.task_notes.ready_time), - "total_time" => LtDuration(self.total_time), + "total_time" => LtDuration(self.task_notes.time_elapsed()), "c_rd_bytes" => self.client_rd_bytes, "c_wr_bytes" => self.client_wr_bytes, "r_rd_bytes" => self.remote_rd_bytes, diff --git a/g3proxy/src/serve/http_proxy/task/connect/task.rs b/g3proxy/src/serve/http_proxy/task/connect/task.rs index 7714b9199..bb1aec871 100644 --- a/g3proxy/src/serve/http_proxy/task/connect/task.rs +++ b/g3proxy/src/serve/http_proxy/task/connect/task.rs @@ -15,21 +15,23 @@ */ use std::sync::Arc; +use std::time::Duration; use http::Version; -use log::debug; use tokio::io::{AsyncRead, AsyncWrite}; +use g3_daemon::server::ServerQuitPolicy; use g3_daemon::stat::task::TcpStreamTaskStats; -use g3_io_ext::{LimitedReader, LimitedWriter}; +use g3_io_ext::{LimitedCopyConfig, LimitedReader, LimitedWriter}; use g3_types::acl::AclAction; use g3_types::net::{ProxyRequestType, UpstreamAddr}; use super::protocol::{HttpClientWriter, HttpProxyRequest}; use super::{CommonTaskContext, TcpConnectTaskCltWrapperStats}; use crate::audit::AuditContext; +use crate::auth::User; use crate::config::server::ServerConfig; -use crate::inspect::StreamInspectContext; +use crate::inspect::{StreamInspectContext, StreamTransitTask}; use crate::log::task::tcp_connect::TaskLogForTcpConnect; use crate::module::http_forward::HttpProxyClientResponse; use crate::module::tcp_connect::{ @@ -328,13 +330,6 @@ impl HttpProxyConnectTask { } fn pre_start(&self) { - debug!( - "HttpProxy/CONNECT: new client from {} to {} server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.server_config.server_type(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.task_http_connect.add_task(); self.ctx.server_stats.task_http_connect.inc_alive_task(); @@ -344,6 +339,10 @@ impl HttpProxyConnectTask { s.req_alive.add_http_connect(); }); } + + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&mut self) { @@ -365,7 +364,6 @@ impl HttpProxyConnectTask { upstream: &self.upstream, task_notes: &self.task_notes, tcp_notes: &self.tcp_notes, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.clt.read.get_bytes(), client_wr_bytes: self.task_stats.clt.write.get_bytes(), remote_rd_bytes: self.task_stats.ups.read.get_bytes(), @@ -411,6 +409,10 @@ impl HttpProxyConnectTask { UR: AsyncRead + Send + Sync + Unpin + 'static, UW: AsyncWrite + Send + Sync + Unpin + 'static, { + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } + self.task_notes.stage = ServerTaskStage::Replying; self.reply_ok(&mut clt_w).await?; @@ -473,16 +475,7 @@ impl HttpProxyConnectTask { } } - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.task_notes.user_ctx().map(|ctx| ctx.user()), - ) - .await + self.transit_transparent(clt_r, clt_w, ups_r, ups_w).await } fn update_clt( @@ -538,3 +531,33 @@ impl HttpProxyConnectTask { (clt_r, clt_w) } } + +impl StreamTransitTask for HttpProxyConnectTask { + fn copy_config(&self) -> LimitedCopyConfig { + self.ctx.server_config.tcp_copy + } + + fn idle_check_interval(&self) -> Duration { + self.ctx.server_config.task_idle_check_duration + } + + fn max_idle_count(&self) -> i32 { + self.ctx.server_config.task_idle_max_count + } + + fn log_periodic(&self) { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } + + fn log_flush_interval(&self) -> Option { + self.ctx.server_config.task_log_flush_interval + } + + fn quit_policy(&self) -> &ServerQuitPolicy { + self.ctx.server_quit_policy.as_ref() + } + + fn user(&self) -> Option<&User> { + self.task_notes.user_ctx().map(|ctx| ctx.user().as_ref()) + } +} diff --git a/g3proxy/src/serve/sni_proxy/task/relay/task.rs b/g3proxy/src/serve/sni_proxy/task/relay/task.rs index c36dbf7e0..330017791 100644 --- a/g3proxy/src/serve/sni_proxy/task/relay/task.rs +++ b/g3proxy/src/serve/sni_proxy/task/relay/task.rs @@ -18,18 +18,20 @@ use std::sync::Arc; use std::time::Duration; use bytes::BytesMut; -use log::debug; use tokio::io::{AsyncRead, AsyncWrite}; +use g3_daemon::server::ServerQuitPolicy; use g3_daemon::stat::task::{TcpStreamConnectionStats, TcpStreamTaskStats}; use g3_dpi::Protocol; -use g3_io_ext::{FlexBufReader, LimitedCopy, LimitedReader, LimitedWriter, OnceBufReader}; +use g3_io_ext::{ + FlexBufReader, LimitedCopy, LimitedCopyConfig, LimitedReader, LimitedWriter, OnceBufReader, +}; use g3_types::net::UpstreamAddr; use super::CommonTaskContext; use crate::audit::AuditContext; -use crate::config::server::ServerConfig; -use crate::inspect::{StreamInspectContext, StreamInspection}; +use crate::auth::User; +use crate::inspect::{StreamInspectContext, StreamInspection, StreamTransitTask}; use crate::log::task::tcp_connect::TaskLogForTcpConnect; use crate::module::tcp_connect::{TcpConnectTaskConf, TcpConnectTaskNotes}; use crate::serve::tcp_stream::TcpStreamTaskCltWrapperStats; @@ -71,7 +73,6 @@ impl TcpStreamTask { upstream: &self.upstream, task_notes: &self.task_notes, tcp_notes: &self.tcp_notes, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.clt.read.get_bytes(), client_wr_bytes: self.task_stats.clt.write.get_bytes(), remote_rd_bytes: self.task_stats.ups.read.get_bytes(), @@ -99,15 +100,11 @@ impl TcpStreamTask { } fn pre_start(&self) { - debug!( - "SniProxy: new client from {} to {} server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.server_config.server_type(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.add_task(); self.ctx.server_stats.inc_alive_task(); + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&self) { @@ -170,6 +167,9 @@ impl TcpStreamTask { UR: AsyncRead + Send + Sync + Unpin + 'static, UW: AsyncWrite + Send + Sync + Unpin + 'static, { + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } self.task_notes.mark_relaying(); self.relay(clt_r, clt_r_buf, clt_w, ups_r, ups_w).await } @@ -264,13 +264,36 @@ impl TcpStreamTask { let clt_to_ups = LimitedCopy::with_data(&mut clt_r, &mut ups_w, ©_config, clt_r_buf.into()); let ups_to_clt = LimitedCopy::new(&mut ups_r, &mut clt_w, ©_config); - crate::inspect::stream::transit_transparent2( - clt_to_ups, - ups_to_clt, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - None, - ) - .await + self.transit_transparent2(clt_to_ups, ups_to_clt).await + } +} + +impl StreamTransitTask for TcpStreamTask { + fn copy_config(&self) -> LimitedCopyConfig { + self.ctx.server_config.tcp_copy + } + + fn idle_check_interval(&self) -> Duration { + self.ctx.server_config.task_idle_check_duration + } + + fn max_idle_count(&self) -> i32 { + self.ctx.server_config.task_idle_max_count + } + + fn log_periodic(&self) { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } + + fn log_flush_interval(&self) -> Option { + self.ctx.server_config.task_log_flush_interval + } + + fn quit_policy(&self) -> &ServerQuitPolicy { + self.ctx.server_quit_policy.as_ref() + } + + fn user(&self) -> Option<&User> { + None } } diff --git a/g3proxy/src/serve/socks_proxy/task/tcp_connect/task.rs b/g3proxy/src/serve/socks_proxy/task/tcp_connect/task.rs index 16ec073b6..35875800e 100644 --- a/g3proxy/src/serve/socks_proxy/task/tcp_connect/task.rs +++ b/g3proxy/src/serve/socks_proxy/task/tcp_connect/task.rs @@ -16,20 +16,22 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; +use std::time::Duration; -use log::debug; use tokio::io::{AsyncRead, AsyncWrite}; +use g3_daemon::server::ServerQuitPolicy; use g3_daemon::stat::task::TcpStreamTaskStats; -use g3_io_ext::{LimitedReader, LimitedWriter}; +use g3_io_ext::{LimitedCopyConfig, LimitedReader, LimitedWriter}; use g3_socks::{v4a, v5, SocksVersion}; use g3_types::acl::AclAction; use g3_types::net::{ProxyRequestType, UpstreamAddr}; use super::{CommonTaskContext, TcpConnectTaskCltWrapperStats}; use crate::audit::AuditContext; +use crate::auth::User; use crate::config::server::ServerConfig; -use crate::inspect::StreamInspectContext; +use crate::inspect::{StreamInspectContext, StreamTransitTask}; use crate::log::task::tcp_connect::TaskLogForTcpConnect; use crate::module::tcp_connect::{TcpConnectTaskConf, TcpConnectTaskNotes}; use crate::serve::{ @@ -81,7 +83,6 @@ impl SocksProxyTcpConnectTask { upstream: &self.upstream, task_notes: &self.task_notes, tcp_notes: &self.tcp_notes, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.clt.read.get_bytes(), client_wr_bytes: self.task_stats.clt.write.get_bytes(), remote_rd_bytes: self.task_stats.ups.read.get_bytes(), @@ -107,13 +108,6 @@ impl SocksProxyTcpConnectTask { } fn pre_start(&self) { - debug!( - "Socks/TcpConnect: new client from {} to {} server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.server_config.server_type(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.task_tcp_connect.add_task(); self.ctx.server_stats.task_tcp_connect.inc_alive_task(); @@ -123,6 +117,10 @@ impl SocksProxyTcpConnectTask { s.req_alive.add_socks_tcp_connect(); }); } + + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&mut self) { @@ -325,6 +323,10 @@ impl SocksProxyTcpConnectTask { UR: AsyncRead + Send + Sync + Unpin + 'static, UW: AsyncWrite + Send + Sync + Unpin + 'static, { + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } + self.task_notes.stage = ServerTaskStage::Replying; match self.socks_version { SocksVersion::V4a => { @@ -410,16 +412,7 @@ impl SocksProxyTcpConnectTask { } } - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - self.task_notes.user_ctx().map(|ctx| ctx.user()), - ) - .await + self.transit_transparent(clt_r, clt_w, ups_r, ups_w).await } fn update_clt(&mut self, clt_r: &mut LimitedReader, clt_w: &mut LimitedWriter) @@ -461,3 +454,33 @@ impl SocksProxyTcpConnectTask { clt_w.reset_stats(wrapper_stats); } } + +impl StreamTransitTask for SocksProxyTcpConnectTask { + fn copy_config(&self) -> LimitedCopyConfig { + self.ctx.server_config.tcp_copy + } + + fn idle_check_interval(&self) -> Duration { + self.ctx.server_config.task_idle_check_duration + } + + fn max_idle_count(&self) -> i32 { + self.ctx.server_config.task_idle_max_count + } + + fn log_periodic(&self) { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } + + fn log_flush_interval(&self) -> Option { + self.ctx.server_config.task_log_flush_interval + } + + fn quit_policy(&self) -> &ServerQuitPolicy { + self.ctx.server_quit_policy.as_ref() + } + + fn user(&self) -> Option<&User> { + self.task_notes.user_ctx().map(|ctx| ctx.user().as_ref()) + } +} diff --git a/g3proxy/src/serve/tcp_stream/common.rs b/g3proxy/src/serve/tcp_stream/common.rs index eaf45e8ca..1a1528781 100644 --- a/g3proxy/src/serve/tcp_stream/common.rs +++ b/g3proxy/src/serve/tcp_stream/common.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use std::net::SocketAddr; use std::sync::Arc; use slog::Logger; @@ -36,10 +35,3 @@ pub(super) struct CommonTaskContext { pub(super) tls_client_config: Option>, pub(super) task_logger: Logger, } - -impl CommonTaskContext { - #[inline] - pub(crate) fn client_addr(&self) -> SocketAddr { - self.cc_info.client_addr() - } -} diff --git a/g3proxy/src/serve/tcp_stream/task.rs b/g3proxy/src/serve/tcp_stream/task.rs index 8587e0008..08c76c6f1 100644 --- a/g3proxy/src/serve/tcp_stream/task.rs +++ b/g3proxy/src/serve/tcp_stream/task.rs @@ -17,18 +17,18 @@ use std::sync::Arc; use std::time::Duration; -use log::debug; use tokio::io::{AsyncRead, AsyncWrite}; +use g3_daemon::server::ServerQuitPolicy; use g3_daemon::stat::task::TcpStreamTaskStats; -use g3_io_ext::{LimitedReader, LimitedWriter}; +use g3_io_ext::{LimitedCopyConfig, LimitedReader, LimitedWriter}; use g3_types::net::UpstreamAddr; use super::common::CommonTaskContext; use super::stats::TcpStreamTaskCltWrapperStats; use crate::audit::AuditContext; -use crate::config::server::ServerConfig; -use crate::inspect::StreamInspectContext; +use crate::auth::User; +use crate::inspect::{StreamInspectContext, StreamTransitTask}; use crate::log::task::tcp_connect::TaskLogForTcpConnect; use crate::module::tcp_connect::{TcpConnectTaskConf, TcpConnectTaskNotes, TlsConnectTaskConf}; use crate::serve::{ServerTaskError, ServerTaskNotes, ServerTaskResult, ServerTaskStage}; @@ -64,7 +64,6 @@ impl TcpStreamTask { upstream: &self.upstream, task_notes: &self.task_notes, tcp_notes: &self.tcp_notes, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.clt.read.get_bytes(), client_wr_bytes: self.task_stats.clt.write.get_bytes(), remote_rd_bytes: self.task_stats.ups.read.get_bytes(), @@ -89,15 +88,11 @@ impl TcpStreamTask { } fn pre_start(&self) { - debug!( - "TcpStream: new client from {} to {} server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.server_config.server_type(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.add_task(); self.ctx.server_stats.inc_alive_task(); + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&self) { @@ -175,6 +170,9 @@ impl TcpStreamTask { UR: AsyncRead + Send + Sync + Unpin + 'static, UW: AsyncWrite + Send + Sync + Unpin + 'static, { + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } self.task_notes.mark_relaying(); self.relay(clt_r, clt_w, ups_r, ups_w).await } @@ -211,16 +209,7 @@ impl TcpStreamTask { ) .await } else { - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - None, - ) - .await + self.transit_transparent(clt_r, clt_w, ups_r, ups_w).await } } @@ -253,3 +242,33 @@ impl TcpStreamTask { (clt_r, clt_w) } } + +impl StreamTransitTask for TcpStreamTask { + fn copy_config(&self) -> LimitedCopyConfig { + self.ctx.server_config.tcp_copy + } + + fn idle_check_interval(&self) -> Duration { + self.ctx.server_config.task_idle_check_duration + } + + fn max_idle_count(&self) -> i32 { + self.ctx.server_config.task_idle_max_count + } + + fn log_periodic(&self) { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } + + fn log_flush_interval(&self) -> Option { + self.ctx.server_config.task_log_flush_interval + } + + fn quit_policy(&self) -> &ServerQuitPolicy { + self.ctx.server_quit_policy.as_ref() + } + + fn user(&self) -> Option<&User> { + None + } +} diff --git a/g3proxy/src/serve/tcp_tproxy/common.rs b/g3proxy/src/serve/tcp_tproxy/common.rs index 3fe9379e3..a36512b1f 100644 --- a/g3proxy/src/serve/tcp_tproxy/common.rs +++ b/g3proxy/src/serve/tcp_tproxy/common.rs @@ -36,11 +36,6 @@ pub(super) struct CommonTaskContext { } impl CommonTaskContext { - #[inline] - pub(super) fn client_addr(&self) -> SocketAddr { - self.cc_info.client_addr() - } - #[inline] pub(super) fn target_addr(&self) -> SocketAddr { self.cc_info.server_addr() diff --git a/g3proxy/src/serve/tcp_tproxy/task.rs b/g3proxy/src/serve/tcp_tproxy/task.rs index 77c9bd6c1..657592c4d 100644 --- a/g3proxy/src/serve/tcp_tproxy/task.rs +++ b/g3proxy/src/serve/tcp_tproxy/task.rs @@ -17,18 +17,18 @@ use std::sync::Arc; use std::time::Duration; -use log::debug; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; +use g3_daemon::server::ServerQuitPolicy; use g3_daemon::stat::task::TcpStreamTaskStats; -use g3_io_ext::{LimitedReader, LimitedWriter}; +use g3_io_ext::{LimitedCopyConfig, LimitedReader, LimitedWriter}; use g3_types::net::UpstreamAddr; use super::common::CommonTaskContext; use crate::audit::AuditContext; -use crate::config::server::ServerConfig; -use crate::inspect::StreamInspectContext; +use crate::auth::User; +use crate::inspect::{StreamInspectContext, StreamTransitTask}; use crate::log::task::tcp_connect::TaskLogForTcpConnect; use crate::module::tcp_connect::{TcpConnectTaskConf, TcpConnectTaskNotes}; use crate::serve::tcp_stream::TcpStreamTaskCltWrapperStats; @@ -62,7 +62,6 @@ impl TProxyStreamTask { upstream: &self.upstream, task_notes: &self.task_notes, tcp_notes: &self.tcp_notes, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.clt.read.get_bytes(), client_wr_bytes: self.task_stats.clt.write.get_bytes(), remote_rd_bytes: self.task_stats.ups.read.get_bytes(), @@ -82,15 +81,11 @@ impl TProxyStreamTask { } fn pre_start(&self) { - debug!( - "TProxyStream: new transparent connection from {} to {} via server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.target_addr(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.add_task(); self.ctx.server_stats.inc_alive_task(); + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&self) { @@ -137,6 +132,9 @@ impl TProxyStreamTask { R: AsyncRead + Send + Sync + Unpin + 'static, W: AsyncWrite + Send + Sync + Unpin + 'static, { + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } self.task_notes.mark_relaying(); self.relay(clt_stream, ups_r, ups_w).await } @@ -172,16 +170,7 @@ impl TProxyStreamTask { ) .await } else { - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - None, - ) - .await + self.transit_transparent(clt_r, clt_w, ups_r, ups_w).await } } @@ -214,3 +203,33 @@ impl TProxyStreamTask { (clt_r, clt_w) } } + +impl StreamTransitTask for TProxyStreamTask { + fn copy_config(&self) -> LimitedCopyConfig { + self.ctx.server_config.tcp_copy + } + + fn idle_check_interval(&self) -> Duration { + self.ctx.server_config.task_idle_check_duration + } + + fn max_idle_count(&self) -> i32 { + self.ctx.server_config.task_idle_max_count + } + + fn log_periodic(&self) { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } + + fn log_flush_interval(&self) -> Option { + self.ctx.server_config.task_log_flush_interval + } + + fn quit_policy(&self) -> &ServerQuitPolicy { + self.ctx.server_quit_policy.as_ref() + } + + fn user(&self) -> Option<&User> { + None + } +} diff --git a/g3proxy/src/serve/tls_stream/common.rs b/g3proxy/src/serve/tls_stream/common.rs index 904411187..daf7000e1 100644 --- a/g3proxy/src/serve/tls_stream/common.rs +++ b/g3proxy/src/serve/tls_stream/common.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use std::net::SocketAddr; use std::sync::Arc; use slog::Logger; @@ -36,10 +35,3 @@ pub(super) struct CommonTaskContext { pub(super) tls_client_config: Option>, pub(super) task_logger: Logger, } - -impl CommonTaskContext { - #[inline] - pub(super) fn client_addr(&self) -> SocketAddr { - self.cc_info.client_addr() - } -} diff --git a/g3proxy/src/serve/tls_stream/task.rs b/g3proxy/src/serve/tls_stream/task.rs index 3c797ec5c..883b2bc22 100644 --- a/g3proxy/src/serve/tls_stream/task.rs +++ b/g3proxy/src/serve/tls_stream/task.rs @@ -17,19 +17,19 @@ use std::sync::Arc; use std::time::Duration; -use log::debug; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio_rustls::server::TlsStream; +use g3_daemon::server::ServerQuitPolicy; use g3_daemon::stat::task::TcpStreamTaskStats; -use g3_io_ext::{AsyncStream, LimitedReader, LimitedWriter}; +use g3_io_ext::{AsyncStream, LimitedCopyConfig, LimitedReader, LimitedWriter}; use g3_types::net::UpstreamAddr; use super::common::CommonTaskContext; use crate::audit::AuditContext; -use crate::config::server::ServerConfig; -use crate::inspect::StreamInspectContext; +use crate::auth::User; +use crate::inspect::{StreamInspectContext, StreamTransitTask}; use crate::log::task::tcp_connect::TaskLogForTcpConnect; use crate::module::tcp_connect::{TcpConnectTaskConf, TcpConnectTaskNotes, TlsConnectTaskConf}; use crate::serve::tcp_stream::TcpStreamTaskCltWrapperStats; @@ -66,7 +66,6 @@ impl TlsStreamTask { upstream: &self.upstream, task_notes: &self.task_notes, tcp_notes: &self.tcp_notes, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.clt.read.get_bytes(), client_wr_bytes: self.task_stats.clt.write.get_bytes(), remote_rd_bytes: self.task_stats.ups.read.get_bytes(), @@ -86,15 +85,11 @@ impl TlsStreamTask { } fn pre_start(&self) { - debug!( - "TlsStream: new client from {} to {} server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.server_config.server_type(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.add_task(); self.ctx.server_stats.inc_alive_task(); + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&self) { @@ -165,6 +160,9 @@ impl TlsStreamTask { R: AsyncRead + Send + Sync + Unpin + 'static, W: AsyncWrite + Send + Sync + Unpin + 'static, { + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } self.task_notes.mark_relaying(); self.relay(clt_stream, ups_r, ups_w).await } @@ -200,16 +198,7 @@ impl TlsStreamTask { ) .await } else { - crate::inspect::stream::transit_transparent( - clt_r, - clt_w, - ups_r, - ups_w, - &self.ctx.server_config, - &self.ctx.server_quit_policy, - None, - ) - .await + self.transit_transparent(clt_r, clt_w, ups_r, ups_w).await } } @@ -242,3 +231,33 @@ impl TlsStreamTask { (clt_r, clt_w) } } + +impl StreamTransitTask for TlsStreamTask { + fn copy_config(&self) -> LimitedCopyConfig { + self.ctx.server_config.tcp_copy + } + + fn idle_check_interval(&self) -> Duration { + self.ctx.server_config.task_idle_check_duration + } + + fn max_idle_count(&self) -> i32 { + self.ctx.server_config.task_idle_max_count + } + + fn log_periodic(&self) { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } + + fn log_flush_interval(&self) -> Option { + self.ctx.server_config.task_log_flush_interval + } + + fn quit_policy(&self) -> &ServerQuitPolicy { + self.ctx.server_quit_policy.as_ref() + } + + fn user(&self) -> Option<&User> { + None + } +} diff --git a/lib/g3-io-ext/src/lib.rs b/lib/g3-io-ext/src/lib.rs index 8565b1b19..0b481f43d 100644 --- a/lib/g3-io-ext/src/lib.rs +++ b/lib/g3-io-ext/src/lib.rs @@ -18,6 +18,7 @@ mod cache; mod io; mod limit; mod listen; +mod time; mod udp; pub use cache::{ @@ -27,6 +28,7 @@ pub use cache::{ pub use io::*; pub use limit::*; pub use listen::*; +pub use time::*; pub use udp::*; pub mod haproxy; diff --git a/lib/g3-io-ext/src/time/mod.rs b/lib/g3-io-ext/src/time/mod.rs new file mode 100644 index 000000000..fb88e78e6 --- /dev/null +++ b/lib/g3-io-ext/src/time/mod.rs @@ -0,0 +1,18 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod optional_interval; +pub use optional_interval::OptionalInterval; diff --git a/lib/g3-io-ext/src/time/optional_interval.rs b/lib/g3-io-ext/src/time/optional_interval.rs new file mode 100644 index 000000000..2fbde63f9 --- /dev/null +++ b/lib/g3-io-ext/src/time/optional_interval.rs @@ -0,0 +1,59 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use tokio::time::{Instant, Interval}; + +#[derive(Default)] +pub struct OptionalInterval { + inner: Option, +} + +impl OptionalInterval { + pub fn with(inner: Interval) -> Self { + OptionalInterval { inner: Some(inner) } + } + + pub async fn tick(&mut self) -> Instant { + match &mut self.inner { + Some(interval) => interval.tick().await, + None => std::future::pending().await, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[tokio::test] + async fn never() { + let mut f = OptionalInterval::default(); + let r = tokio::time::timeout(Duration::from_millis(10), f.tick()).await; + assert!(r.is_err()); + } + + #[tokio::test] + async fn normal() { + let interval = Duration::from_millis(8); + let mut f = OptionalInterval::with(tokio::time::interval_at( + Instant::now() + interval, + interval, + )); + let r = tokio::time::timeout(Duration::from_millis(10), f.tick()).await; + assert!(r.is_ok()); + } +} From d1366243a7b7e5bd04f42ae48669dcc7e6254510 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 25 Nov 2024 16:55:55 +0800 Subject: [PATCH 2/6] g3proxy: add support for periodic HttpForward task log --- .../doc/configuration/servers/http_rproxy.rst | 3 + g3proxy/src/config/server/http_rproxy/mod.rs | 20 ++++ g3proxy/src/log/task/http_forward.rs | 104 ++++++++++++++++++ .../src/serve/http_proxy/task/forward/task.rs | 86 +++++++++++---- .../serve/http_rproxy/task/forward/task.rs | 39 +++++-- 5 files changed, 219 insertions(+), 33 deletions(-) diff --git a/g3proxy/doc/configuration/servers/http_rproxy.rst b/g3proxy/doc/configuration/servers/http_rproxy.rst index c1e44aba3..ae19462ad 100644 --- a/g3proxy/doc/configuration/servers/http_rproxy.rst +++ b/g3proxy/doc/configuration/servers/http_rproxy.rst @@ -20,6 +20,9 @@ The following common keys are supported: * :ref:`tcp_misc_opts ` * :ref:`task_idle_check_duration ` * :ref:`task_idle_max_count ` +* :ref:`flush_task_log_on_created ` +* :ref:`flush_task_log_on_connected ` +* :ref:`task_log_flush_interval ` * :ref:`extra_metrics_tags ` The auth scheme supported by the server is determined by the type of the specified user group. diff --git a/g3proxy/src/config/server/http_rproxy/mod.rs b/g3proxy/src/config/server/http_rproxy/mod.rs index 7e58514fb..28c831e2f 100644 --- a/g3proxy/src/config/server/http_rproxy/mod.rs +++ b/g3proxy/src/config/server/http_rproxy/mod.rs @@ -76,6 +76,9 @@ pub(crate) struct HttpRProxyServerConfig { pub(crate) timeout: HttpRProxyServerTimeoutConfig, pub(crate) task_idle_check_duration: Duration, pub(crate) task_idle_max_count: i32, + pub(crate) flush_task_log_on_created: bool, + pub(crate) flush_task_log_on_connected: bool, + pub(crate) task_log_flush_interval: Option, pub(crate) tcp_copy: LimitedCopyConfig, pub(crate) tcp_misc_opts: TcpMiscSockOpts, pub(crate) req_hdr_max_size: usize, @@ -113,6 +116,9 @@ impl HttpRProxyServerConfig { timeout: HttpRProxyServerTimeoutConfig::default(), task_idle_check_duration: IDLE_CHECK_DEFAULT_DURATION, task_idle_max_count: 1, + flush_task_log_on_created: false, + flush_task_log_on_connected: false, + task_log_flush_interval: None, tcp_copy: Default::default(), tcp_misc_opts: Default::default(), req_hdr_max_size: 65536, // 64KiB @@ -232,6 +238,20 @@ impl HttpRProxyServerConfig { g3_yaml::value::as_i32(v).context(format!("invalid i32 value for key {k}"))?; Ok(()) } + "flush_task_log_on_created" => { + self.flush_task_log_on_created = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "flush_task_log_on_connected" => { + self.flush_task_log_on_connected = g3_yaml::value::as_bool(v)?; + Ok(()) + } + "task_log_flush_interval" => { + let interval = g3_yaml::humanize::as_duration(v) + .context(format!("invalid humanize duration value for key {k}"))?; + self.task_log_flush_interval = Some(interval); + Ok(()) + } "req_header_recv_timeout" => { self.timeout.recv_req_header = g3_yaml::humanize::as_duration(v) .context(format!("invalid humanize duration value for key {k}"))?; diff --git a/g3proxy/src/log/task/http_forward.rs b/g3proxy/src/log/task/http_forward.rs index 3096aa4f9..e172fccdd 100644 --- a/g3proxy/src/log/task/http_forward.rs +++ b/g3proxy/src/log/task/http_forward.rs @@ -41,6 +41,109 @@ pub(crate) struct TaskLogForHttpForward<'a> { } impl TaskLogForHttpForward<'_> { + pub(crate) fn log_created(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "HttpForward", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "created", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "server_addr" => self.task_notes.server_addr(), + "client_addr" => self.task_notes.client_addr(), + "upstream" => LtUpstreamAddr(self.upstream), + "pipeline_wait" => LtDuration(self.http_notes.pipeline_wait), + "method" => LtHttpMethod(&self.http_notes.method), + "uri" => LtHttpUri::new(&self.http_notes.uri, self.http_notes.uri_log_max_chars), + "user_agent" => self.http_user_agent, + "wait_time" => LtDuration(self.task_notes.wait_time), + ) + } + + pub(crate) fn log_connected(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "HttpForward", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "connected", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "server_addr" => self.task_notes.server_addr(), + "client_addr" => self.task_notes.client_addr(), + "upstream" => LtUpstreamAddr(self.upstream), + "escaper" => self.tcp_notes.escaper.as_str(), + "next_bind_ip" => self.tcp_notes.bind.ip().map(LtIpAddr), + "next_bound_addr" => self.tcp_notes.local, + "next_peer_addr" => self.tcp_notes.next, + "next_expire" => self.tcp_notes.expire.as_ref().map(LtDateTime), + "tcp_connect_tries" => self.tcp_notes.tries, + "tcp_connect_spend" => LtDuration(self.tcp_notes.duration), + "pipeline_wait" => LtDuration(self.http_notes.pipeline_wait), + "reuse_connection" => self.http_notes.reuse_connection, + "method" => LtHttpMethod(&self.http_notes.method), + "uri" => LtHttpUri::new(&self.http_notes.uri, self.http_notes.uri_log_max_chars), + "user_agent" => self.http_user_agent, + "wait_time" => LtDuration(self.task_notes.wait_time), + "ready_time" => LtDuration(self.task_notes.ready_time), + ) + } + + pub(crate) fn log_periodic(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "HttpForward", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "periodic", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "server_addr" => self.task_notes.server_addr(), + "client_addr" => self.task_notes.client_addr(), + "upstream" => LtUpstreamAddr(self.upstream), + "escaper" => self.tcp_notes.escaper.as_str(), + "next_bind_ip" => self.tcp_notes.bind.ip().map(LtIpAddr), + "next_bound_addr" => self.tcp_notes.local, + "next_peer_addr" => self.tcp_notes.next, + "next_expire" => self.tcp_notes.expire.as_ref().map(LtDateTime), + "tcp_connect_tries" => self.tcp_notes.tries, + "tcp_connect_spend" => LtDuration(self.tcp_notes.duration), + "pipeline_wait" => LtDuration(self.http_notes.pipeline_wait), + "reuse_connection" => self.http_notes.reuse_connection, + "method" => LtHttpMethod(&self.http_notes.method), + "uri" => LtHttpUri::new(&self.http_notes.uri, self.http_notes.uri_log_max_chars), + "user_agent" => self.http_user_agent, + "rsp_status" => self.http_notes.rsp_status, + "origin_status" => self.http_notes.origin_status, + "wait_time" => LtDuration(self.task_notes.wait_time), + "ready_time" => LtDuration(self.task_notes.ready_time), + "dur_req_send_hdr" => LtDuration(self.http_notes.dur_req_send_hdr), + "dur_req_send_all" => LtDuration(self.http_notes.dur_req_send_all), + "dur_rsp_recv_hdr" => LtDuration(self.http_notes.dur_rsp_recv_hdr), + "dur_rsp_recv_all" => LtDuration(self.http_notes.dur_rsp_recv_all), + "c_rd_bytes" => self.client_rd_bytes, + "c_wr_bytes" => self.client_wr_bytes, + "r_rd_bytes" => self.remote_rd_bytes, + "r_wr_bytes" => self.remote_wr_bytes, + ) + } + pub(crate) fn log(&self, logger: &Logger, e: &ServerTaskError) { if let Some(user_ctx) = self.task_notes.user_ctx() { if user_ctx.skip_log() { @@ -51,6 +154,7 @@ impl TaskLogForHttpForward<'_> { slog_info!(logger, "{}", e; "task_type" => "HttpForward", "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "finished", "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), diff --git a/g3proxy/src/serve/http_proxy/task/forward/task.rs b/g3proxy/src/serve/http_proxy/task/forward/task.rs index e37a9b078..fb2ca0fe7 100644 --- a/g3proxy/src/serve/http_proxy/task/forward/task.rs +++ b/g3proxy/src/serve/http_proxy/task/forward/task.rs @@ -20,7 +20,6 @@ use std::time::Duration; use anyhow::anyhow; use futures_util::FutureExt; use http::header; -use log::debug; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::time::Instant; @@ -36,7 +35,7 @@ use g3_icap_client::respmod::h1::{ }; use g3_io_ext::{ GlobalLimitGroup, LimitedBufReadExt, LimitedCopy, LimitedCopyError, LimitedReadExt, - LimitedWriteExt, + LimitedWriteExt, OptionalInterval, }; use g3_types::acl::AclAction; use g3_types::net::{HttpHeaderMap, ProxyRequestType, UpstreamAddr}; @@ -224,6 +223,18 @@ impl<'a> HttpProxyForwardTask<'a> { } } + fn get_log_interval(&self) -> OptionalInterval { + self.ctx + .server_config + .task_log_flush_interval + .map(|log_interval| { + let log_interval = + tokio::time::interval_at(Instant::now() + log_interval, log_interval); + OptionalInterval::with(log_interval) + }) + .unwrap_or_default() + } + pub(crate) async fn run( &mut self, clt_r: &mut Option>, @@ -247,13 +258,6 @@ impl<'a> HttpProxyForwardTask<'a> { } fn pre_start(&self) { - debug!( - "HttpProxy/FORWARD: new client from {} to {} server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.server_config.server_type(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.task_http_forward.add_task(); self.ctx.server_stats.task_http_forward.inc_alive_task(); @@ -263,6 +267,10 @@ impl<'a> HttpProxyForwardTask<'a> { s.req_alive.add_http_forward(self.is_https); }); } + + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&mut self) { @@ -737,6 +745,11 @@ impl<'a> HttpProxyForwardTask<'a> { }; } } + + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } + ups_c.0.prepare_new(&self.task_notes, &self.upstream); if audit_task { @@ -804,7 +817,7 @@ impl<'a> HttpProxyForwardTask<'a> { ups_c: BoxHttpForwardConnection, ) -> ServerTaskResult> where - CDR: AsyncRead + Unpin, + CDR: AsyncRead + Send + Unpin, CDW: AsyncWrite + Send + Unpin, { if self.req.body_type().is_none() { @@ -848,6 +861,8 @@ impl<'a> HttpProxyForwardTask<'a> { ) .boxed(); + let mut log_interval = self.get_log_interval(); + let mut rsp_header: Option = None; loop { tokio::select! { @@ -896,6 +911,9 @@ impl<'a> HttpProxyForwardTask<'a> { } } } + _ = log_interval.tick() => { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } } } drop(adaptation_fut); @@ -1117,7 +1135,7 @@ impl<'a> HttpProxyForwardTask<'a> { mut ups_c: BoxHttpForwardConnection, ) -> ServerTaskResult> where - R: AsyncBufRead + Unpin, + R: AsyncBufRead + Send + Unpin, W: AsyncWrite + Send + Unpin, { let ups_w = &mut ups_c.0; @@ -1183,6 +1201,7 @@ impl<'a> HttpProxyForwardTask<'a> { let idle_duration = self.ctx.server_config.task_idle_check_duration; let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); + let mut log_interval = self.get_log_interval(); let mut idle_count = 0; loop { tokio::select! { @@ -1216,6 +1235,9 @@ impl<'a> HttpProxyForwardTask<'a> { self.http_notes.mark_req_send_all(); break; } + _ = log_interval.tick() => { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } _ = idle_interval.tick() => { if clt_to_ups.is_idle() { idle_count += 1; @@ -1358,7 +1380,7 @@ impl<'a> HttpProxyForwardTask<'a> { adaptation_respond_shared_headers: Option, ) -> ServerTaskResult<()> where - R: AsyncBufRead + Unpin, + R: AsyncBufRead + Send + Unpin, W: AsyncWrite + Send + Unpin, { if self.should_close { @@ -1435,22 +1457,34 @@ impl<'a> HttpProxyForwardTask<'a> { adaptation_state: &mut RespmodAdaptationRunState, ) -> ServerTaskResult<()> where - R: AsyncBufRead + Unpin, + R: AsyncBufRead + Send + Unpin, W: AsyncWrite + Send + Unpin, { - match icap_adapter + let mut log_interval = self.get_log_interval(); + let mut adaptation_fut = icap_adapter .xfer(adaptation_state, self.req, rsp_header, ups_r, clt_w) - .await - { - Ok(RespmodAdaptationEndState::OriginalTransferred) => { - self.http_notes.rsp_status = rsp_header.code; - Ok(()) - } - Ok(RespmodAdaptationEndState::AdaptedTransferred(adapted_rsp)) => { - self.http_notes.rsp_status = adapted_rsp.code; - Ok(()) + .boxed(); + loop { + tokio::select! { + biased; + + _ = log_interval.tick() => { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } + r = &mut adaptation_fut => { + return match r { + Ok(RespmodAdaptationEndState::OriginalTransferred) => { + self.http_notes.rsp_status = rsp_header.code; + Ok(()) + } + Ok(RespmodAdaptationEndState::AdaptedTransferred(adapted_rsp)) => { + self.http_notes.rsp_status = adapted_rsp.code; + Ok(()) + } + Err(e) => Err(e.into()), + } + } } - Err(e) => Err(e.into()), } } @@ -1504,6 +1538,7 @@ impl<'a> HttpProxyForwardTask<'a> { let idle_duration = self.ctx.server_config.task_idle_check_duration; let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); + let mut log_interval = self.get_log_interval(); let mut idle_count = 0; loop { tokio::select! { @@ -1525,6 +1560,9 @@ impl<'a> HttpProxyForwardTask<'a> { Err(LimitedCopyError::WriteFailed(e)) => Err(ServerTaskError::ClientTcpWriteFailed(e)), }; } + _ = log_interval.tick() => { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } _ = idle_interval.tick() => { if ups_to_clt.is_idle() { idle_count += 1; diff --git a/g3proxy/src/serve/http_rproxy/task/forward/task.rs b/g3proxy/src/serve/http_rproxy/task/forward/task.rs index d6fae7ec5..4b9e4b4f9 100644 --- a/g3proxy/src/serve/http_rproxy/task/forward/task.rs +++ b/g3proxy/src/serve/http_rproxy/task/forward/task.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use futures_util::FutureExt; use http::header; -use log::debug; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::time::Instant; @@ -27,7 +26,7 @@ use g3_http::server::HttpProxyClientRequest; use g3_http::{HttpBodyReader, HttpBodyType}; use g3_io_ext::{ GlobalLimitGroup, LimitedBufReadExt, LimitedCopy, LimitedCopyError, LimitedReadExt, - LimitedWriteExt, + LimitedWriteExt, OptionalInterval, }; use g3_types::acl::AclAction; @@ -205,6 +204,18 @@ impl<'a> HttpRProxyForwardTask<'a> { } } + fn get_log_interval(&self) -> OptionalInterval { + self.ctx + .server_config + .task_log_flush_interval + .map(|log_interval| { + let log_interval = + tokio::time::interval_at(Instant::now() + log_interval, log_interval); + OptionalInterval::with(log_interval) + }) + .unwrap_or_default() + } + pub(crate) async fn run( &mut self, clt_r: &mut Option>, @@ -228,13 +239,6 @@ impl<'a> HttpRProxyForwardTask<'a> { } fn pre_start(&self) { - debug!( - "HttpRProxy/FORWARD: new client from {} to {} server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.server_config.server_type(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.task_http_forward.add_task(); self.ctx.server_stats.task_http_forward.inc_alive_task(); @@ -244,6 +248,10 @@ impl<'a> HttpRProxyForwardTask<'a> { s.req_alive.add_http_forward(self.is_https); }); } + + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&mut self) { @@ -620,6 +628,11 @@ impl<'a> HttpRProxyForwardTask<'a> { }; } } + + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } + ups_c .0 .prepare_new(&self.task_notes, self.host.config.upstream()); @@ -812,6 +825,7 @@ impl<'a> HttpRProxyForwardTask<'a> { let idle_duration = self.ctx.server_config.task_idle_check_duration; let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); + let mut log_interval = self.get_log_interval(); let mut idle_count = 0; loop { tokio::select! { @@ -845,6 +859,9 @@ impl<'a> HttpRProxyForwardTask<'a> { self.http_notes.mark_req_send_all(); break; } + _ = log_interval.tick() => { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } _ = idle_interval.tick() => { if clt_to_ups.is_idle() { idle_count += 1; @@ -1029,6 +1046,7 @@ impl<'a> HttpRProxyForwardTask<'a> { let idle_duration = self.ctx.server_config.task_idle_check_duration; let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); + let mut log_interval = self.get_log_interval(); let mut idle_count = 0; loop { tokio::select! { @@ -1050,6 +1068,9 @@ impl<'a> HttpRProxyForwardTask<'a> { Err(LimitedCopyError::WriteFailed(e)) => Err(ServerTaskError::ClientTcpWriteFailed(e)), }; } + _ = log_interval.tick() => { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } _ = idle_interval.tick() => { if ups_to_clt.is_idle() { idle_count += 1; From 8a555830ad7ee6727ef60497b9906c96e2c82e7a Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 25 Nov 2024 17:08:17 +0800 Subject: [PATCH 3/6] g3proxy: add support for periodic FtpOverHttp task log --- g3proxy/src/log/task/ftp_over_http.rs | 104 ++++++++++++++++++ g3proxy/src/serve/http_proxy/task/ftp/task.rs | 39 +++++-- 2 files changed, 134 insertions(+), 9 deletions(-) diff --git a/g3proxy/src/log/task/ftp_over_http.rs b/g3proxy/src/log/task/ftp_over_http.rs index 1978ff2e8..022699f8f 100644 --- a/g3proxy/src/log/task/ftp_over_http.rs +++ b/g3proxy/src/log/task/ftp_over_http.rs @@ -39,6 +39,109 @@ pub(crate) struct TaskLogForFtpOverHttp<'a> { } impl TaskLogForFtpOverHttp<'_> { + pub(crate) fn log_created(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "FtpOverHttp", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "created", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "server_addr" => self.task_notes.server_addr(), + "client_addr" => self.task_notes.client_addr(), + "upstream" => LtUpstreamAddr(self.ftp_notes.upstream()), + "method" => LtHttpMethod(&self.ftp_notes.method), + "uri" => LtHttpUri::new(&self.ftp_notes.uri, self.ftp_notes.uri_log_max_chars), + "user_agent" => self.http_user_agent, + "wait_time" => LtDuration(self.task_notes.wait_time), + ) + } + + pub(crate) fn log_connected(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "FtpOverHttp", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "connected", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "server_addr" => self.task_notes.server_addr(), + "client_addr" => self.task_notes.client_addr(), + "upstream" => LtUpstreamAddr(self.ftp_notes.upstream()), + "escaper" => self.ftp_notes.control_tcp_notes.escaper.as_str(), + "next_bind_ip" => self.ftp_notes.control_tcp_notes.bind.ip().map(LtIpAddr), + "next_expire" => self.ftp_notes.control_tcp_notes.expire.as_ref().map(LtDateTime), + "ftp_c_bound_addr" => self.ftp_notes.control_tcp_notes.local, + "ftp_c_peer_addr" => self.ftp_notes.control_tcp_notes.next, + "ftp_c_connect_tries" => self.ftp_notes.control_tcp_notes.tries, + "ftp_c_connect_spend" => LtDuration(self.ftp_notes.control_tcp_notes.duration), + "method" => LtHttpMethod(&self.ftp_notes.method), + "uri" => LtHttpUri::new(&self.ftp_notes.uri, self.ftp_notes.uri_log_max_chars), + "user_agent" => self.http_user_agent, + "rsp_status" => self.ftp_notes.rsp_status, + "wait_time" => LtDuration(self.task_notes.wait_time), + "ready_time" => LtDuration(self.task_notes.ready_time), + "ftp_c_rd_bytes" => self.ftp_c_rd_bytes, + "ftp_c_wr_bytes" => self.ftp_c_wr_bytes, + ) + } + + pub(crate) fn log_periodic(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "FtpOverHttp", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "periodic", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "server_addr" => self.task_notes.server_addr(), + "client_addr" => self.task_notes.client_addr(), + "upstream" => LtUpstreamAddr(self.ftp_notes.upstream()), + "escaper" => self.ftp_notes.control_tcp_notes.escaper.as_str(), + "next_bind_ip" => self.ftp_notes.control_tcp_notes.bind.ip().map(LtIpAddr), + "next_expire" => self.ftp_notes.control_tcp_notes.expire.as_ref().map(LtDateTime), + "ftp_c_bound_addr" => self.ftp_notes.control_tcp_notes.local, + "ftp_c_peer_addr" => self.ftp_notes.control_tcp_notes.next, + "ftp_c_connect_tries" => self.ftp_notes.control_tcp_notes.tries, + "ftp_c_connect_spend" => LtDuration(self.ftp_notes.control_tcp_notes.duration), + "ftp_d_bound_addr" => self.ftp_notes.transfer_tcp_notes.local, + "ftp_d_peer_addr" => self.ftp_notes.transfer_tcp_notes.next, + "ftp_d_connect_tries" => self.ftp_notes.transfer_tcp_notes.tries, + "ftp_d_connect_spend" => LtDuration(self.ftp_notes.transfer_tcp_notes.duration), + "method" => LtHttpMethod(&self.ftp_notes.method), + "uri" => LtHttpUri::new(&self.ftp_notes.uri, self.ftp_notes.uri_log_max_chars), + "user_agent" => self.http_user_agent, + "rsp_status" => self.ftp_notes.rsp_status, + "wait_time" => LtDuration(self.task_notes.wait_time), + "ready_time" => LtDuration(self.task_notes.ready_time), + "total_time" => LtDuration(self.total_time), + "c_rd_bytes" => self.client_rd_bytes, + "c_wr_bytes" => self.client_wr_bytes, + "ftp_c_rd_bytes" => self.ftp_c_rd_bytes, + "ftp_c_wr_bytes" => self.ftp_c_wr_bytes, + "ftp_d_rd_bytes" => self.ftp_d_rd_bytes, + "ftp_d_wr_bytes" => self.ftp_d_wr_bytes, + ) + } + pub(crate) fn log(&self, logger: &Logger, e: &ServerTaskError) { if let Some(user_ctx) = self.task_notes.user_ctx() { if user_ctx.skip_log() { @@ -49,6 +152,7 @@ impl TaskLogForFtpOverHttp<'_> { slog_info!(logger, "{}", e; "task_type" => "FtpOverHttp", "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "finished", "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), diff --git a/g3proxy/src/serve/http_proxy/task/ftp/task.rs b/g3proxy/src/serve/http_proxy/task/ftp/task.rs index cacd90e63..a1e94f526 100644 --- a/g3proxy/src/serve/http_proxy/task/ftp/task.rs +++ b/g3proxy/src/serve/http_proxy/task/ftp/task.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use anyhow::anyhow; use http::Method; -use log::debug; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::time::Instant; @@ -29,7 +28,7 @@ use g3_ftp_client::{ }; use g3_http::server::HttpProxyClientRequest; use g3_http::{HttpBodyDecodeReader, HttpBodyReader, HttpBodyType}; -use g3_io_ext::{GlobalLimitGroup, LimitedCopy, LimitedCopyError, SizedReader}; +use g3_io_ext::{GlobalLimitGroup, LimitedCopy, LimitedCopyError, OptionalInterval, SizedReader}; use g3_types::acl::AclAction; use g3_types::net::ProxyRequestType; @@ -110,6 +109,18 @@ impl<'a> FtpOverHttpTask<'a> { } } + fn get_log_interval(&self) -> OptionalInterval { + self.ctx + .server_config + .task_log_flush_interval + .map(|log_interval| { + let log_interval = + tokio::time::interval_at(Instant::now() + log_interval, log_interval); + OptionalInterval::with(log_interval) + }) + .unwrap_or_default() + } + pub(crate) async fn run( &mut self, clt_r: &mut HttpClientReader, @@ -132,13 +143,6 @@ impl<'a> FtpOverHttpTask<'a> { } fn pre_start(&self) { - debug!( - "HttpProxy/FtpOverHttp: new client from {} to {} server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.server_config.server_type(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.task_ftp_over_http.add_task(); self.ctx.server_stats.task_ftp_over_http.inc_alive_task(); @@ -148,6 +152,10 @@ impl<'a> FtpOverHttpTask<'a> { s.req_alive.add_ftp_over_http(); }); } + + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&mut self) { @@ -650,6 +658,11 @@ impl<'a> FtpOverHttpTask<'a> { .connection_provider() .connect_context() .fetch_control_tcp_notes(&mut self.ftp_notes.control_tcp_notes); + + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } + Ok(client) } Err((e, ftp_connection_provider)) => { @@ -1184,6 +1197,7 @@ impl<'a> FtpOverHttpTask<'a> { let idle_duration = self.ctx.server_config.task_idle_check_duration; let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); + let mut log_interval = self.get_log_interval(); let mut idle_count = 0; loop { tokio::select! { @@ -1226,6 +1240,9 @@ impl<'a> FtpOverHttpTask<'a> { Err(_) => Err(ServerTaskError::UpstreamAppTimeout("timeout to wait transfer end")), }; } + _ = log_interval.tick() => { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } _ = idle_interval.tick() => { if data_copy.is_idle() { idle_count += 1; @@ -1397,6 +1414,7 @@ impl<'a> FtpOverHttpTask<'a> { let idle_duration = self.ctx.server_config.task_idle_check_duration; let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); + let mut log_interval = self.get_log_interval(); let mut idle_count = 0; loop { @@ -1428,6 +1446,9 @@ impl<'a> FtpOverHttpTask<'a> { anyhow!("unexpected server end reply after {} bytes sent)", data_copy.copied_size()) )); } + _ = log_interval.tick() => { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } _ = idle_interval.tick() => { if data_copy.is_idle() { idle_count += 1; From ac02af5cac825e921d4643c599f17bfd26899a8f Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 25 Nov 2024 17:22:02 +0800 Subject: [PATCH 4/6] g3proxy: add support for periodic UDP task log --- g3proxy/src/log/task/udp_associate.rs | 86 +++++++++++++++++- g3proxy/src/log/task/udp_connect.rs | 89 +++++++++++++++++++ .../socks_proxy/task/udp_associate/task.rs | 35 +++++--- .../socks_proxy/task/udp_connect/task.rs | 35 +++++--- 4 files changed, 221 insertions(+), 24 deletions(-) diff --git a/g3proxy/src/log/task/udp_associate.rs b/g3proxy/src/log/task/udp_associate.rs index 654b4105b..98d94beef 100644 --- a/g3proxy/src/log/task/udp_associate.rs +++ b/g3proxy/src/log/task/udp_associate.rs @@ -19,11 +19,12 @@ use std::time::Duration; use slog::{slog_info, Logger}; -use crate::module::udp_relay::UdpRelayTaskNotes; -use crate::serve::{ServerTaskError, ServerTaskNotes}; use g3_slog_types::{LtDateTime, LtDuration, LtUpstreamAddr, LtUuid}; use g3_types::net::UpstreamAddr; +use crate::module::udp_relay::UdpRelayTaskNotes; +use crate::serve::{ServerTaskError, ServerTaskNotes}; + pub(crate) struct TaskLogForUdpAssociate<'a> { pub(crate) task_notes: &'a ServerTaskNotes, pub(crate) tcp_server_addr: SocketAddr, @@ -44,6 +45,86 @@ pub(crate) struct TaskLogForUdpAssociate<'a> { } impl TaskLogForUdpAssociate<'_> { + pub(crate) fn log_created(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "UdpAssociate", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "created", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "tcp_server_addr" => self.tcp_server_addr, + "tcp_client_addr" => self.tcp_client_addr, + "wait_time" => LtDuration(self.task_notes.wait_time), + ) + } + + pub(crate) fn log_connected(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "UdpAssociate", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "connected", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "tcp_server_addr" => self.tcp_server_addr, + "tcp_client_addr" => self.tcp_client_addr, + "udp_listen_addr" => self.udp_listen_addr, + "udp_client_addr" => self.udp_client_addr, + "initial_peer" => LtUpstreamAddr(self.initial_peer), + "escaper" => self.udp_notes.escaper.as_str(), + "wait_time" => LtDuration(self.task_notes.wait_time), + "ready_time" => LtDuration(self.task_notes.ready_time), + "c_rd_bytes" => self.client_rd_bytes, + "c_rd_packets" => self.client_rd_packets, + ) + } + + pub(crate) fn log_periodic(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "UdpAssociate", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "periodic", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "tcp_server_addr" => self.tcp_server_addr, + "tcp_client_addr" => self.tcp_client_addr, + "udp_listen_addr" => self.udp_listen_addr, + "udp_client_addr" => self.udp_client_addr, + "initial_peer" => LtUpstreamAddr(self.initial_peer), + "escaper" => self.udp_notes.escaper.as_str(), + "wait_time" => LtDuration(self.task_notes.wait_time), + "ready_time" => LtDuration(self.task_notes.ready_time), + "c_rd_bytes" => self.client_rd_bytes, + "c_rd_packets" => self.client_rd_packets, + "c_wr_bytes" => self.client_wr_bytes, + "c_wr_packets" => self.client_wr_packets, + "r_rd_bytes" => self.remote_rd_bytes, + "r_rd_packets" => self.remote_rd_packets, + "r_wr_bytes" => self.remote_wr_bytes, + "r_wr_packets" => self.remote_wr_packets, + ) + } + pub(crate) fn log(&self, logger: &Logger, e: &ServerTaskError) { if let Some(user_ctx) = self.task_notes.user_ctx() { if user_ctx.skip_log() { @@ -54,6 +135,7 @@ impl TaskLogForUdpAssociate<'_> { slog_info!(logger, "{}", e; "task_type" => "UdpAssociate", "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "finished", "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), diff --git a/g3proxy/src/log/task/udp_connect.rs b/g3proxy/src/log/task/udp_connect.rs index e2439b25e..6c5992da4 100644 --- a/g3proxy/src/log/task/udp_connect.rs +++ b/g3proxy/src/log/task/udp_connect.rs @@ -45,6 +45,94 @@ pub(crate) struct TaskLogForUdpConnect<'a> { } impl TaskLogForUdpConnect<'_> { + pub(crate) fn log_created(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "UdpConnect", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "created", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "tcp_server_addr" => self.tcp_server_addr, + "tcp_client_addr" => self.tcp_client_addr, + "wait_time" => LtDuration(self.task_notes.wait_time), + ) + } + + pub(crate) fn log_connected(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "UdpConnect", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "connected", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "tcp_server_addr" => self.tcp_server_addr, + "tcp_client_addr" => self.tcp_client_addr, + "udp_listen_addr" => self.udp_listen_addr, + "udp_client_addr" => self.udp_client_addr, + "upstream" => self.upstream.map(LtUpstreamAddr), + "escaper" => self.udp_notes.escaper.as_str(), + "next_bind_ip" => self.udp_notes.bind.ip().map(LtIpAddr), + "next_bound_addr" => self.udp_notes.local, + "next_peer_addr" => self.udp_notes.next, + "next_expire" => self.udp_notes.expire.as_ref().map(LtDateTime), + "wait_time" => LtDuration(self.task_notes.wait_time), + "ready_time" => LtDuration(self.task_notes.ready_time), + "c_rd_bytes" => self.client_rd_bytes, + "c_rd_packets" => self.client_rd_packets, + ) + } + + pub(crate) fn log_periodic(&self, logger: &Logger) { + if let Some(user_ctx) = self.task_notes.user_ctx() { + if user_ctx.skip_log() { + return; + } + } + + slog_info!(logger, ""; + "task_type" => "UdpConnect", + "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "periodic", + "stage" => self.task_notes.stage.brief(), + "start_at" => LtDateTime(&self.task_notes.start_at), + "user" => self.task_notes.raw_user_name(), + "tcp_server_addr" => self.tcp_server_addr, + "tcp_client_addr" => self.tcp_client_addr, + "udp_listen_addr" => self.udp_listen_addr, + "udp_client_addr" => self.udp_client_addr, + "upstream" => self.upstream.map(LtUpstreamAddr), + "escaper" => self.udp_notes.escaper.as_str(), + "next_bind_ip" => self.udp_notes.bind.ip().map(LtIpAddr), + "next_bound_addr" => self.udp_notes.local, + "next_peer_addr" => self.udp_notes.next, + "next_expire" => self.udp_notes.expire.as_ref().map(LtDateTime), + "wait_time" => LtDuration(self.task_notes.wait_time), + "ready_time" => LtDuration(self.task_notes.ready_time), + "c_rd_bytes" => self.client_rd_bytes, + "c_rd_packets" => self.client_rd_packets, + "c_wr_bytes" => self.client_wr_bytes, + "c_wr_packets" => self.client_wr_packets, + "r_rd_bytes" => self.remote_rd_bytes, + "r_rd_packets" => self.remote_rd_packets, + "r_wr_bytes" => self.remote_wr_bytes, + "r_wr_packets" => self.remote_wr_packets, + ) + } + pub(crate) fn log(&self, logger: &Logger, e: &ServerTaskError) { if let Some(user_ctx) = self.task_notes.user_ctx() { if user_ctx.skip_log() { @@ -55,6 +143,7 @@ impl TaskLogForUdpConnect<'_> { slog_info!(logger, "{}", e; "task_type" => "UdpConnect", "task_id" => LtUuid(&self.task_notes.id), + "task_event" => "finished", "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), diff --git a/g3proxy/src/serve/socks_proxy/task/udp_associate/task.rs b/g3proxy/src/serve/socks_proxy/task/udp_associate/task.rs index e44082fa0..9a1c1a2f4 100644 --- a/g3proxy/src/serve/socks_proxy/task/udp_associate/task.rs +++ b/g3proxy/src/serve/socks_proxy/task/udp_associate/task.rs @@ -18,16 +18,15 @@ use std::future::poll_fn; use std::net::SocketAddr; use std::sync::Arc; -use log::debug; use slog::Logger; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; use tokio::net::UdpSocket; use tokio::time::Instant; use g3_io_ext::{ - LimitedUdpRecv, LimitedUdpSend, UdpRecvHalf, UdpRelayClientRecv, UdpRelayClientSend, - UdpRelayClientToRemote, UdpRelayError, UdpRelayRemoteRecv, UdpRelayRemoteSend, - UdpRelayRemoteToClient, UdpSendHalf, + LimitedUdpRecv, LimitedUdpSend, OptionalInterval, UdpRecvHalf, UdpRelayClientRecv, + UdpRelayClientSend, UdpRelayClientToRemote, UdpRelayError, UdpRelayRemoteRecv, + UdpRelayRemoteSend, UdpRelayRemoteToClient, UdpSendHalf, }; use g3_socks::v5::Socks5Reply; use g3_types::acl::AclAction; @@ -112,13 +111,6 @@ impl SocksProxyUdpAssociateTask { } fn pre_start(&self) { - debug!( - "SocksProxy/UdpAssociate: new client from {} to {} server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.server_config.server_type(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.task_udp_associate.add_task(); self.ctx.server_stats.task_udp_associate.inc_alive_task(); @@ -126,6 +118,10 @@ impl SocksProxyUdpAssociateTask { user_ctx.req_stats().req_total.add_socks_udp_associate(); user_ctx.req_stats().req_alive.add_socks_udp_associate(); } + + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&mut self) { @@ -279,6 +275,16 @@ impl SocksProxyUdpAssociateTask { let idle_duration = self.ctx.server_config.task_idle_check_duration; let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); + let mut log_interval = self + .ctx + .server_config + .task_log_flush_interval + .map(|log_interval| { + let interval = + tokio::time::interval_at(Instant::now() + log_interval, log_interval); + OptionalInterval::with(interval) + }) + .unwrap_or_default(); let mut idle_count = 0; let mut buf: [u8; 4] = [0; 4]; loop { @@ -325,6 +331,9 @@ impl SocksProxyUdpAssociateTask { } Err(UdpRelayError::ClientError(e)) => Err(e.into()), }; + } + _ = log_interval.tick() => { + self.get_log_context().log_periodic(&self.ctx.task_logger); } _ = idle_interval.tick() => { if c_to_r.is_idle() && r_to_c.is_idle() { @@ -495,6 +504,10 @@ impl SocksProxyUdpAssociateTask { .await?; self.task_notes.stage = ServerTaskStage::Connected; + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } + poll_fn(|cx| ups_w.poll_send_packet(cx, &buf[buf_off..buf_nr], &self.initial_peer)).await?; let clt_w = Socks5UdpAssociateClientSend::new(clt_w, udp_client_addr); diff --git a/g3proxy/src/serve/socks_proxy/task/udp_connect/task.rs b/g3proxy/src/serve/socks_proxy/task/udp_connect/task.rs index d091a1b94..89f721774 100644 --- a/g3proxy/src/serve/socks_proxy/task/udp_connect/task.rs +++ b/g3proxy/src/serve/socks_proxy/task/udp_connect/task.rs @@ -18,16 +18,15 @@ use std::future::poll_fn; use std::net::SocketAddr; use std::sync::Arc; -use log::debug; use slog::Logger; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; use tokio::net::UdpSocket; use tokio::time::Instant; use g3_io_ext::{ - LimitedUdpRecv, LimitedUdpSend, UdpCopyClientRecv, UdpCopyClientSend, UdpCopyClientToRemote, - UdpCopyError, UdpCopyRemoteRecv, UdpCopyRemoteSend, UdpCopyRemoteToClient, UdpRecvHalf, - UdpSendHalf, + LimitedUdpRecv, LimitedUdpSend, OptionalInterval, UdpCopyClientRecv, UdpCopyClientSend, + UdpCopyClientToRemote, UdpCopyError, UdpCopyRemoteRecv, UdpCopyRemoteSend, + UdpCopyRemoteToClient, UdpRecvHalf, UdpSendHalf, }; use g3_socks::v5::Socks5Reply; use g3_types::acl::AclAction; @@ -112,13 +111,6 @@ impl SocksProxyUdpConnectTask { } fn pre_start(&self) { - debug!( - "SocksProxy/UdpConnect: new client from {} to {} server {}, using escaper {}", - self.ctx.client_addr(), - self.ctx.server_config.server_type(), - self.ctx.server_config.name(), - self.ctx.server_config.escaper - ); self.ctx.server_stats.task_udp_connect.add_task(); self.ctx.server_stats.task_udp_connect.inc_alive_task(); @@ -126,6 +118,10 @@ impl SocksProxyUdpConnectTask { user_ctx.req_stats().req_total.add_socks_udp_connect(); user_ctx.req_stats().req_alive.add_socks_udp_connect(); } + + if self.ctx.server_config.flush_task_log_on_created { + self.get_log_context().log_created(&self.ctx.task_logger); + } } fn pre_stop(&mut self) { @@ -329,6 +325,16 @@ impl SocksProxyUdpConnectTask { let idle_duration = self.ctx.server_config.task_idle_check_duration; let mut idle_interval = tokio::time::interval_at(Instant::now() + idle_duration, idle_duration); + let mut log_interval = self + .ctx + .server_config + .task_log_flush_interval + .map(|log_interval| { + let interval = + tokio::time::interval_at(Instant::now() + log_interval, log_interval); + OptionalInterval::with(interval) + }) + .unwrap_or_default(); let mut idle_count = 0; let mut buf: [u8; 4] = [0; 4]; loop { @@ -376,6 +382,9 @@ impl SocksProxyUdpConnectTask { Err(UdpCopyError::ClientError(e)) => Err(e.into()), }; } + _ = log_interval.tick() => { + self.get_log_context().log_periodic(&self.ctx.task_logger); + } _ = idle_interval.tick() => { if c_to_r.is_idle() && r_to_c.is_idle() { idle_count += 1; @@ -546,6 +555,10 @@ impl SocksProxyUdpConnectTask { .await?; self.task_notes.stage = ServerTaskStage::Connected; + if self.ctx.server_config.flush_task_log_on_connected { + self.get_log_context().log_connected(&self.ctx.task_logger); + } + poll_fn(|cx| ups_w.poll_send_packet(cx, &buf[buf_off..buf_nr])).await?; let clt_w = Socks5UdpConnectClientSend::new(clt_w, upstream); From 6fdb372f476cbb453dea94d2e19f89976f2c9316 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 25 Nov 2024 17:33:06 +0800 Subject: [PATCH 5/6] g3proxy: use common TaskEvent type in task log --- g3proxy/src/log/task/ftp_over_http.rs | 16 +++++++--------- g3proxy/src/log/task/http_forward.rs | 15 +++++++-------- g3proxy/src/log/task/mod.rs | 18 ++++++++++++++++++ g3proxy/src/log/task/tcp_connect.rs | 10 ++++++---- g3proxy/src/log/task/udp_associate.rs | 14 +++++++------- g3proxy/src/log/task/udp_connect.rs | 14 +++++++------- .../src/serve/http_proxy/task/forward/task.rs | 1 - g3proxy/src/serve/http_proxy/task/ftp/task.rs | 1 - .../src/serve/http_rproxy/task/forward/task.rs | 1 - .../socks_proxy/task/udp_associate/task.rs | 1 - .../serve/socks_proxy/task/udp_connect/task.rs | 1 - 11 files changed, 52 insertions(+), 40 deletions(-) diff --git a/g3proxy/src/log/task/ftp_over_http.rs b/g3proxy/src/log/task/ftp_over_http.rs index 022699f8f..460cdd746 100644 --- a/g3proxy/src/log/task/ftp_over_http.rs +++ b/g3proxy/src/log/task/ftp_over_http.rs @@ -14,14 +14,13 @@ * limitations under the License. */ -use std::time::Duration; - use slog::{slog_info, Logger}; use g3_slog_types::{ LtDateTime, LtDuration, LtHttpMethod, LtHttpUri, LtIpAddr, LtUpstreamAddr, LtUuid, }; +use super::TaskEvent; use crate::module::ftp_over_http::FtpOverHttpTaskNotes; use crate::serve::{ServerTaskError, ServerTaskNotes}; @@ -29,7 +28,6 @@ pub(crate) struct TaskLogForFtpOverHttp<'a> { pub(crate) task_notes: &'a ServerTaskNotes, pub(crate) ftp_notes: &'a FtpOverHttpTaskNotes, pub(crate) http_user_agent: Option<&'a str>, - pub(crate) total_time: Duration, pub(crate) client_rd_bytes: u64, pub(crate) client_wr_bytes: u64, pub(crate) ftp_c_rd_bytes: u64, @@ -49,7 +47,7 @@ impl TaskLogForFtpOverHttp<'_> { slog_info!(logger, ""; "task_type" => "FtpOverHttp", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "created", + "task_event" => TaskEvent::Created.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -73,7 +71,7 @@ impl TaskLogForFtpOverHttp<'_> { slog_info!(logger, ""; "task_type" => "FtpOverHttp", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "connected", + "task_event" => TaskEvent::Connected.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -108,7 +106,7 @@ impl TaskLogForFtpOverHttp<'_> { slog_info!(logger, ""; "task_type" => "FtpOverHttp", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "periodic", + "task_event" => TaskEvent::Periodic.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -132,7 +130,7 @@ impl TaskLogForFtpOverHttp<'_> { "rsp_status" => self.ftp_notes.rsp_status, "wait_time" => LtDuration(self.task_notes.wait_time), "ready_time" => LtDuration(self.task_notes.ready_time), - "total_time" => LtDuration(self.total_time), + "total_time" => LtDuration(self.task_notes.time_elapsed()), "c_rd_bytes" => self.client_rd_bytes, "c_wr_bytes" => self.client_wr_bytes, "ftp_c_rd_bytes" => self.ftp_c_rd_bytes, @@ -152,7 +150,7 @@ impl TaskLogForFtpOverHttp<'_> { slog_info!(logger, "{}", e; "task_type" => "FtpOverHttp", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "finished", + "task_event" => TaskEvent::Finished.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -177,7 +175,7 @@ impl TaskLogForFtpOverHttp<'_> { "rsp_status" => self.ftp_notes.rsp_status, "wait_time" => LtDuration(self.task_notes.wait_time), "ready_time" => LtDuration(self.task_notes.ready_time), - "total_time" => LtDuration(self.total_time), + "total_time" => LtDuration(self.task_notes.time_elapsed()), "c_rd_bytes" => self.client_rd_bytes, "c_wr_bytes" => self.client_wr_bytes, "ftp_c_rd_bytes" => self.ftp_c_rd_bytes, diff --git a/g3proxy/src/log/task/http_forward.rs b/g3proxy/src/log/task/http_forward.rs index e172fccdd..9e8539d8c 100644 --- a/g3proxy/src/log/task/http_forward.rs +++ b/g3proxy/src/log/task/http_forward.rs @@ -14,8 +14,6 @@ * limitations under the License. */ -use std::time::Duration; - use slog::{slog_info, Logger}; use g3_slog_types::{ @@ -23,6 +21,7 @@ use g3_slog_types::{ }; use g3_types::net::UpstreamAddr; +use super::TaskEvent; use crate::module::http_forward::HttpForwardTaskNotes; use crate::module::tcp_connect::TcpConnectTaskNotes; use crate::serve::{ServerTaskError, ServerTaskNotes}; @@ -33,7 +32,6 @@ pub(crate) struct TaskLogForHttpForward<'a> { pub(crate) http_notes: &'a HttpForwardTaskNotes, pub(crate) http_user_agent: Option<&'a str>, pub(crate) tcp_notes: &'a TcpConnectTaskNotes, - pub(crate) total_time: Duration, pub(crate) client_rd_bytes: u64, pub(crate) client_wr_bytes: u64, pub(crate) remote_rd_bytes: u64, @@ -51,7 +49,7 @@ impl TaskLogForHttpForward<'_> { slog_info!(logger, ""; "task_type" => "HttpForward", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "created", + "task_event" => TaskEvent::Created.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -76,7 +74,7 @@ impl TaskLogForHttpForward<'_> { slog_info!(logger, ""; "task_type" => "HttpForward", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "connected", + "task_event" => TaskEvent::Connected.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -110,7 +108,7 @@ impl TaskLogForHttpForward<'_> { slog_info!(logger, ""; "task_type" => "HttpForward", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "periodic", + "task_event" => TaskEvent::Periodic.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -133,6 +131,7 @@ impl TaskLogForHttpForward<'_> { "origin_status" => self.http_notes.origin_status, "wait_time" => LtDuration(self.task_notes.wait_time), "ready_time" => LtDuration(self.task_notes.ready_time), + "total_time" => LtDuration(self.task_notes.time_elapsed()), "dur_req_send_hdr" => LtDuration(self.http_notes.dur_req_send_hdr), "dur_req_send_all" => LtDuration(self.http_notes.dur_req_send_all), "dur_rsp_recv_hdr" => LtDuration(self.http_notes.dur_rsp_recv_hdr), @@ -154,7 +153,7 @@ impl TaskLogForHttpForward<'_> { slog_info!(logger, "{}", e; "task_type" => "HttpForward", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "finished", + "task_event" => TaskEvent::Finished.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -182,7 +181,7 @@ impl TaskLogForHttpForward<'_> { "dur_req_send_all" => LtDuration(self.http_notes.dur_req_send_all), "dur_rsp_recv_hdr" => LtDuration(self.http_notes.dur_rsp_recv_hdr), "dur_rsp_recv_all" => LtDuration(self.http_notes.dur_rsp_recv_all), - "total_time" => LtDuration(self.total_time), + "total_time" => LtDuration(self.task_notes.time_elapsed()), "c_rd_bytes" => self.client_rd_bytes, "c_wr_bytes" => self.client_wr_bytes, "r_rd_bytes" => self.remote_rd_bytes, diff --git a/g3proxy/src/log/task/mod.rs b/g3proxy/src/log/task/mod.rs index fbf6f0add..6339509f8 100644 --- a/g3proxy/src/log/task/mod.rs +++ b/g3proxy/src/log/task/mod.rs @@ -52,3 +52,21 @@ pub(crate) fn get_shared_logger( )) }) } + +enum TaskEvent { + Created, + Connected, + Periodic, + Finished, +} + +impl TaskEvent { + fn as_str(&self) -> &'static str { + match self { + TaskEvent::Created => "created", + TaskEvent::Connected => "connected", + TaskEvent::Periodic => "periodic", + TaskEvent::Finished => "finished", + } + } +} diff --git a/g3proxy/src/log/task/tcp_connect.rs b/g3proxy/src/log/task/tcp_connect.rs index 2f84a30b9..a282ff78b 100644 --- a/g3proxy/src/log/task/tcp_connect.rs +++ b/g3proxy/src/log/task/tcp_connect.rs @@ -19,6 +19,7 @@ use slog::{slog_info, Logger}; use g3_slog_types::{LtDateTime, LtDuration, LtIpAddr, LtUpstreamAddr, LtUuid}; use g3_types::net::UpstreamAddr; +use super::TaskEvent; use crate::module::tcp_connect::TcpConnectTaskNotes; use crate::serve::{ServerTaskError, ServerTaskNotes}; @@ -43,7 +44,7 @@ impl TaskLogForTcpConnect<'_> { slog_info!(logger, ""; "task_type" => "TcpConnect", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "created", + "task_event" => TaskEvent::Created.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -64,7 +65,7 @@ impl TaskLogForTcpConnect<'_> { slog_info!(logger, ""; "task_type" => "TcpConnect", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "connected", + "task_event" => TaskEvent::Connected.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -93,7 +94,7 @@ impl TaskLogForTcpConnect<'_> { slog_info!(logger, ""; "task_type" => "TcpConnect", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "periodic", + "task_event" => TaskEvent::Periodic.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -109,6 +110,7 @@ impl TaskLogForTcpConnect<'_> { "tcp_connect_spend" => LtDuration(self.tcp_notes.duration), "wait_time" => LtDuration(self.task_notes.wait_time), "ready_time" => LtDuration(self.task_notes.ready_time), + "total_time" => LtDuration(self.task_notes.time_elapsed()), "c_rd_bytes" => self.client_rd_bytes, "c_wr_bytes" => self.client_wr_bytes, "r_rd_bytes" => self.remote_rd_bytes, @@ -126,7 +128,7 @@ impl TaskLogForTcpConnect<'_> { slog_info!(logger, "{}", e; "task_type" => "TcpConnect", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "finished", + "task_event" => TaskEvent::Finished.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), diff --git a/g3proxy/src/log/task/udp_associate.rs b/g3proxy/src/log/task/udp_associate.rs index 98d94beef..7395f63be 100644 --- a/g3proxy/src/log/task/udp_associate.rs +++ b/g3proxy/src/log/task/udp_associate.rs @@ -15,13 +15,13 @@ */ use std::net::SocketAddr; -use std::time::Duration; use slog::{slog_info, Logger}; use g3_slog_types::{LtDateTime, LtDuration, LtUpstreamAddr, LtUuid}; use g3_types::net::UpstreamAddr; +use super::TaskEvent; use crate::module::udp_relay::UdpRelayTaskNotes; use crate::serve::{ServerTaskError, ServerTaskNotes}; @@ -33,7 +33,6 @@ pub(crate) struct TaskLogForUdpAssociate<'a> { pub(crate) udp_client_addr: Option, pub(crate) initial_peer: &'a UpstreamAddr, pub(crate) udp_notes: &'a UdpRelayTaskNotes, - pub(crate) total_time: Duration, pub(crate) client_rd_bytes: u64, pub(crate) client_rd_packets: u64, pub(crate) client_wr_bytes: u64, @@ -55,7 +54,7 @@ impl TaskLogForUdpAssociate<'_> { slog_info!(logger, ""; "task_type" => "UdpAssociate", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "created", + "task_event" => TaskEvent::Created.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -75,7 +74,7 @@ impl TaskLogForUdpAssociate<'_> { slog_info!(logger, ""; "task_type" => "UdpAssociate", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "connected", + "task_event" => TaskEvent::Connected.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -102,7 +101,7 @@ impl TaskLogForUdpAssociate<'_> { slog_info!(logger, ""; "task_type" => "UdpAssociate", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "periodic", + "task_event" => TaskEvent::Periodic.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -114,6 +113,7 @@ impl TaskLogForUdpAssociate<'_> { "escaper" => self.udp_notes.escaper.as_str(), "wait_time" => LtDuration(self.task_notes.wait_time), "ready_time" => LtDuration(self.task_notes.ready_time), + "total_time" => LtDuration(self.task_notes.time_elapsed()), "c_rd_bytes" => self.client_rd_bytes, "c_rd_packets" => self.client_rd_packets, "c_wr_bytes" => self.client_wr_bytes, @@ -135,7 +135,7 @@ impl TaskLogForUdpAssociate<'_> { slog_info!(logger, "{}", e; "task_type" => "UdpAssociate", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "finished", + "task_event" => TaskEvent::Finished.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -148,7 +148,7 @@ impl TaskLogForUdpAssociate<'_> { "reason" => e.brief(), "wait_time" => LtDuration(self.task_notes.wait_time), "ready_time" => LtDuration(self.task_notes.ready_time), - "total_time" => LtDuration(self.total_time), + "total_time" => LtDuration(self.task_notes.time_elapsed()), "c_rd_bytes" => self.client_rd_bytes, "c_rd_packets" => self.client_rd_packets, "c_wr_bytes" => self.client_wr_bytes, diff --git a/g3proxy/src/log/task/udp_connect.rs b/g3proxy/src/log/task/udp_connect.rs index 6c5992da4..a9569b98a 100644 --- a/g3proxy/src/log/task/udp_connect.rs +++ b/g3proxy/src/log/task/udp_connect.rs @@ -15,13 +15,13 @@ */ use std::net::SocketAddr; -use std::time::Duration; use slog::{slog_info, Logger}; use g3_slog_types::{LtDateTime, LtDuration, LtIpAddr, LtUpstreamAddr, LtUuid}; use g3_types::net::UpstreamAddr; +use super::TaskEvent; use crate::module::udp_connect::UdpConnectTaskNotes; use crate::serve::{ServerTaskError, ServerTaskNotes}; @@ -33,7 +33,6 @@ pub(crate) struct TaskLogForUdpConnect<'a> { pub(crate) udp_client_addr: Option, pub(crate) upstream: Option<&'a UpstreamAddr>, pub(crate) udp_notes: &'a UdpConnectTaskNotes, - pub(crate) total_time: Duration, pub(crate) client_rd_bytes: u64, pub(crate) client_rd_packets: u64, pub(crate) client_wr_bytes: u64, @@ -55,7 +54,7 @@ impl TaskLogForUdpConnect<'_> { slog_info!(logger, ""; "task_type" => "UdpConnect", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "created", + "task_event" => TaskEvent::Created.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -75,7 +74,7 @@ impl TaskLogForUdpConnect<'_> { slog_info!(logger, ""; "task_type" => "UdpConnect", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "connected", + "task_event" => TaskEvent::Connected.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -106,7 +105,7 @@ impl TaskLogForUdpConnect<'_> { slog_info!(logger, ""; "task_type" => "UdpConnect", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "periodic", + "task_event" => TaskEvent::Periodic.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -122,6 +121,7 @@ impl TaskLogForUdpConnect<'_> { "next_expire" => self.udp_notes.expire.as_ref().map(LtDateTime), "wait_time" => LtDuration(self.task_notes.wait_time), "ready_time" => LtDuration(self.task_notes.ready_time), + "total_time" => LtDuration(self.task_notes.time_elapsed()), "c_rd_bytes" => self.client_rd_bytes, "c_rd_packets" => self.client_rd_packets, "c_wr_bytes" => self.client_wr_bytes, @@ -143,7 +143,7 @@ impl TaskLogForUdpConnect<'_> { slog_info!(logger, "{}", e; "task_type" => "UdpConnect", "task_id" => LtUuid(&self.task_notes.id), - "task_event" => "finished", + "task_event" => TaskEvent::Finished.as_str(), "stage" => self.task_notes.stage.brief(), "start_at" => LtDateTime(&self.task_notes.start_at), "user" => self.task_notes.raw_user_name(), @@ -160,7 +160,7 @@ impl TaskLogForUdpConnect<'_> { "reason" => e.brief(), "wait_time" => LtDuration(self.task_notes.wait_time), "ready_time" => LtDuration(self.task_notes.ready_time), - "total_time" => LtDuration(self.total_time), + "total_time" => LtDuration(self.task_notes.time_elapsed()), "c_rd_bytes" => self.client_rd_bytes, "c_rd_packets" => self.client_rd_packets, "c_wr_bytes" => self.client_wr_bytes, diff --git a/g3proxy/src/serve/http_proxy/task/forward/task.rs b/g3proxy/src/serve/http_proxy/task/forward/task.rs index fb2ca0fe7..dd5d8fe8d 100644 --- a/g3proxy/src/serve/http_proxy/task/forward/task.rs +++ b/g3proxy/src/serve/http_proxy/task/forward/task.rs @@ -215,7 +215,6 @@ impl<'a> HttpProxyForwardTask<'a> { http_notes: &self.http_notes, http_user_agent, tcp_notes: &self.tcp_notes, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.clt.read.get_bytes(), client_wr_bytes: self.task_stats.clt.write.get_bytes(), remote_rd_bytes: self.task_stats.ups.read.get_bytes(), diff --git a/g3proxy/src/serve/http_proxy/task/ftp/task.rs b/g3proxy/src/serve/http_proxy/task/ftp/task.rs index a1e94f526..ce7c55912 100644 --- a/g3proxy/src/serve/http_proxy/task/ftp/task.rs +++ b/g3proxy/src/serve/http_proxy/task/ftp/task.rs @@ -99,7 +99,6 @@ impl<'a> FtpOverHttpTask<'a> { task_notes: &self.task_notes, ftp_notes: &self.ftp_notes, http_user_agent, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.http_client.read.get_bytes(), client_wr_bytes: self.task_stats.http_client.write.get_bytes(), ftp_c_rd_bytes: self.task_stats.ftp_server.control_read.get_bytes(), diff --git a/g3proxy/src/serve/http_rproxy/task/forward/task.rs b/g3proxy/src/serve/http_rproxy/task/forward/task.rs index 4b9e4b4f9..d4d8d661f 100644 --- a/g3proxy/src/serve/http_rproxy/task/forward/task.rs +++ b/g3proxy/src/serve/http_rproxy/task/forward/task.rs @@ -196,7 +196,6 @@ impl<'a> HttpRProxyForwardTask<'a> { http_notes: &self.http_notes, http_user_agent, tcp_notes: &self.tcp_notes, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.clt.read.get_bytes(), client_wr_bytes: self.task_stats.clt.write.get_bytes(), remote_rd_bytes: self.task_stats.ups.read.get_bytes(), diff --git a/g3proxy/src/serve/socks_proxy/task/udp_associate/task.rs b/g3proxy/src/serve/socks_proxy/task/udp_associate/task.rs index 9a1c1a2f4..de791825d 100644 --- a/g3proxy/src/serve/socks_proxy/task/udp_associate/task.rs +++ b/g3proxy/src/serve/socks_proxy/task/udp_associate/task.rs @@ -81,7 +81,6 @@ impl SocksProxyUdpAssociateTask { udp_client_addr: self.udp_client_addr, initial_peer: &self.initial_peer, udp_notes: &self.udp_notes, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.clt.recv.get_bytes(), client_rd_packets: self.task_stats.clt.recv.get_packets(), client_wr_bytes: self.task_stats.clt.send.get_bytes(), diff --git a/g3proxy/src/serve/socks_proxy/task/udp_connect/task.rs b/g3proxy/src/serve/socks_proxy/task/udp_connect/task.rs index 89f721774..2feba9427 100644 --- a/g3proxy/src/serve/socks_proxy/task/udp_connect/task.rs +++ b/g3proxy/src/serve/socks_proxy/task/udp_connect/task.rs @@ -81,7 +81,6 @@ impl SocksProxyUdpConnectTask { udp_client_addr: self.udp_client_addr, upstream: self.upstream.as_ref(), udp_notes: &self.udp_notes, - total_time: self.task_notes.time_elapsed(), client_rd_bytes: self.task_stats.clt.recv.get_bytes(), client_rd_packets: self.task_stats.clt.recv.get_packets(), client_wr_bytes: self.task_stats.clt.send.get_bytes(), From 38af3f83215bf78ecfa1fd7950ee9835d160df8f Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 25 Nov 2024 17:38:00 +0800 Subject: [PATCH 6/6] update doc --- g3proxy/doc/log/task/http_forward.rst | 2 +- g3proxy/doc/log/task/index.rst | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/g3proxy/doc/log/task/http_forward.rst b/g3proxy/doc/log/task/http_forward.rst index 6f3c45600..8e13d7618 100644 --- a/g3proxy/doc/log/task/http_forward.rst +++ b/g3proxy/doc/log/task/http_forward.rst @@ -19,7 +19,7 @@ Show the time spent from the receive of the http request header to the creation reuse_connection ---------------- -**required**, **type**: bool +**optional**, **type**: bool Show if this task reuse old remote connection. diff --git a/g3proxy/doc/log/task/index.rst b/g3proxy/doc/log/task/index.rst index f5b6227b9..65e919df2 100644 --- a/g3proxy/doc/log/task/index.rst +++ b/g3proxy/doc/log/task/index.rst @@ -126,7 +126,7 @@ The selected escaper name. reason ------ -**required**, **type**: enum string +**optional**, **type**: enum string The brief reason why the task ends. @@ -155,9 +155,9 @@ and the remote channel have been established. The value may be empty if the task total_time ---------- -**required**, **type**: time duration string +**optional**, **type**: time duration string -Show the time from the creation of the task to the end of the task. +Show the time from the creation of the task to the time of this log. Sub Types =========