Skip to content

Commit

Permalink
g3tiles: use common stream task wrapper stats
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq committed May 30, 2024
1 parent b3d0871 commit 1d839e0
Show file tree
Hide file tree
Showing 12 changed files with 53 additions and 229 deletions.
4 changes: 2 additions & 2 deletions g3tiles/src/module/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use tokio::io::{AsyncRead, AsyncWrite};

mod stats;
pub(crate) use stats::{
StreamBackendDurationRecorder, StreamBackendDurationStats, StreamBackendStats,
StreamServerStats,
StreamAcceptTaskCltWrapperStats, StreamBackendDurationRecorder, StreamBackendDurationStats,
StreamBackendStats, StreamRelayTaskCltWrapperStats, StreamServerStats,
};

mod error;
Expand Down
3 changes: 3 additions & 0 deletions g3tiles/src/module/stream/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
mod server;
pub(crate) use server::StreamServerStats;

mod task;
pub(crate) use task::{StreamAcceptTaskCltWrapperStats, StreamRelayTaskCltWrapperStats};

mod backend;
pub(crate) use backend::{
StreamBackendDurationRecorder, StreamBackendDurationStats, StreamBackendStats,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 ByteDance and/or its affiliates.
* Copyright 2024 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,41 +16,72 @@

use std::sync::Arc;

use g3_daemon::stat::task::TcpStreamConnectionStats;
use g3_daemon::stat::task::{TcpStreamConnectionStats, TcpStreamTaskStats};
use g3_io_ext::{LimitedReaderStats, LimitedWriterStats};

use crate::module::stream::StreamServerStats;

#[derive(Clone)]
pub(crate) struct OpensslAcceptTaskCltWrapperStats {
pub(crate) struct StreamAcceptTaskCltWrapperStats {
server: Arc<StreamServerStats>,
conn: Arc<TcpStreamConnectionStats>,
}

impl OpensslAcceptTaskCltWrapperStats {
impl StreamAcceptTaskCltWrapperStats {
pub(crate) fn new(
server: &Arc<StreamServerStats>,
conn: &Arc<TcpStreamConnectionStats>,
) -> Self {
OpensslAcceptTaskCltWrapperStats {
StreamAcceptTaskCltWrapperStats {
server: Arc::clone(server),
conn: Arc::clone(conn),
}
}
}

impl LimitedReaderStats for OpensslAcceptTaskCltWrapperStats {
impl LimitedReaderStats for StreamAcceptTaskCltWrapperStats {
fn add_read_bytes(&self, size: usize) {
let size = size as u64;
self.conn.read.add_bytes(size);
self.server.add_read(size);
}
}

impl LimitedWriterStats for OpensslAcceptTaskCltWrapperStats {
impl LimitedWriterStats for StreamAcceptTaskCltWrapperStats {
fn add_write_bytes(&self, size: usize) {
let size = size as u64;
self.conn.write.add_bytes(size);
self.server.add_write(size);
}
}

#[derive(Clone)]
pub(crate) struct StreamRelayTaskCltWrapperStats {
server: Arc<StreamServerStats>,
task: Arc<TcpStreamTaskStats>,
}

impl StreamRelayTaskCltWrapperStats {
pub(crate) fn new(server: &Arc<StreamServerStats>, task: &Arc<TcpStreamTaskStats>) -> Self {
StreamRelayTaskCltWrapperStats {
server: Arc::clone(server),
task: Arc::clone(task),
}
}
}

impl LimitedReaderStats for StreamRelayTaskCltWrapperStats {
fn add_read_bytes(&self, size: usize) {
let size = size as u64;
self.task.clt.read.add_bytes(size);
self.server.add_read(size);
}
}

impl LimitedWriterStats for StreamRelayTaskCltWrapperStats {
fn add_write_bytes(&self, size: usize) {
let size = size as u64;
self.task.clt.write.add_bytes(size);
self.server.add_write(size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ use g3_types::limit::GaugeSemaphorePermit;
use g3_types::route::HostMatch;

use super::{CommonTaskContext, OpensslRelayTask};
use crate::module::stream::StreamAcceptTaskCltWrapperStats;
use crate::serve::openssl_proxy::OpensslHost;

mod stats;
use stats::OpensslAcceptTaskCltWrapperStats;

pub(crate) struct OpensslAcceptTask {
ctx: CommonTaskContext,
hosts: Arc<HostMatch<Arc<OpensslHost>>>,
Expand All @@ -55,7 +53,7 @@ impl OpensslAcceptTask {

let pre_handshake_stats = Arc::new(TcpStreamConnectionStats::default());
let wrapper_stats =
OpensslAcceptTaskCltWrapperStats::new(&self.ctx.server_stats, &pre_handshake_stats);
StreamAcceptTaskCltWrapperStats::new(&self.ctx.server_stats, &pre_handshake_stats);

let limit_config = self.ctx.server_config.tcp_sock_speed_limit;
let stream = LimitedStream::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use g3_io_ext::{LimitedCopy, LimitedCopyConfig, LimitedCopyError, LimitedStream}
use g3_openssl::SslStream;
use g3_types::limit::GaugeSemaphorePermit;

use super::{CommonTaskContext, OpensslRelayTaskCltWrapperStats};
use super::CommonTaskContext;
use crate::backend::ArcBackend;
use crate::config::server::ServerConfig;
use crate::log::task::tcp_connect::TaskLogForTcpConnect;
use crate::module::stream::StreamRelayTaskCltWrapperStats;
use crate::serve::openssl_proxy::OpensslHost;
use crate::serve::{ServerTaskError, ServerTaskNotes, ServerTaskResult, ServerTaskStage};

Expand Down Expand Up @@ -229,7 +230,7 @@ impl OpensslRelayTask {
// reset io stats
// TODO add host level stats
let clt_wrapper_stats =
OpensslRelayTaskCltWrapperStats::new(&self.ctx.server_stats, &self.task_stats);
StreamRelayTaskCltWrapperStats::new(&self.ctx.server_stats, &self.task_stats);
ssl_stream
.get_mut()
.reset_stats(Arc::new(clt_wrapper_stats));
Expand Down
23 changes: 0 additions & 23 deletions g3tiles/src/serve/openssl_proxy/task/relay/mod.rs

This file was deleted.

53 changes: 0 additions & 53 deletions g3tiles/src/serve/openssl_proxy/task/relay/stats.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@ use g3_types::net::Host;
use g3_types::route::HostMatch;

use super::{CommonTaskContext, RustlsRelayTask};
use crate::module::stream::StreamAcceptTaskCltWrapperStats;
use crate::serve::rustls_proxy::RustlsHost;

mod stats;
use stats::RustlsAcceptTaskCltWrapperStats;

pub(crate) struct RustlsAcceptTask {
ctx: CommonTaskContext,
alive_permit: Option<GaugeSemaphorePermit>,
Expand All @@ -58,7 +56,7 @@ impl RustlsAcceptTask {

let pre_handshake_stats = Arc::new(TcpStreamConnectionStats::default());
let wrapper_stats =
RustlsAcceptTaskCltWrapperStats::new(&self.ctx.server_stats, &pre_handshake_stats);
StreamAcceptTaskCltWrapperStats::new(&self.ctx.server_stats, &pre_handshake_stats);

let limit_config = self.ctx.server_config.tcp_sock_speed_limit;
let stream = LimitedStream::new(
Expand Down
56 changes: 0 additions & 56 deletions g3tiles/src/serve/rustls_proxy/task/accept/stats.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use g3_daemon::stat::task::{TcpStreamConnectionStats, TcpStreamTaskStats};
use g3_io_ext::{LimitedCopy, LimitedCopyConfig, LimitedCopyError, LimitedStream};
use g3_types::limit::GaugeSemaphorePermit;

use super::{CommonTaskContext, RustlsRelayTaskCltWrapperStats};
use super::CommonTaskContext;
use crate::backend::ArcBackend;
use crate::config::server::ServerConfig;
use crate::log::task::tcp_connect::TaskLogForTcpConnect;
use crate::module::stream::StreamRelayTaskCltWrapperStats;
use crate::serve::rustls_proxy::RustlsHost;
use crate::serve::{ServerTaskError, ServerTaskNotes, ServerTaskResult, ServerTaskStage};

Expand Down Expand Up @@ -231,7 +232,7 @@ impl RustlsRelayTask {
// reset io stats
// TODO add host level stats
let clt_wrapper_stats =
RustlsRelayTaskCltWrapperStats::new(&self.ctx.server_stats, &self.task_stats);
StreamRelayTaskCltWrapperStats::new(&self.ctx.server_stats, &self.task_stats);
tls_stream
.get_mut()
.0
Expand Down
23 changes: 0 additions & 23 deletions g3tiles/src/serve/rustls_proxy/task/relay/mod.rs

This file was deleted.

Loading

0 comments on commit 1d839e0

Please sign in to comment.