Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TCP proxy metrics server #13

Merged
merged 3 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
proxy.socket
2 changes: 2 additions & 0 deletions proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::env;
#[derive(Debug, Clone)]
pub struct Config {
pub proxy_addr: String,
pub proxy_namespace: String,
pub prometheus_addr: String,
pub ssl_crt_path: String,
pub ssl_key_path: String,
Expand All @@ -13,6 +14,7 @@ impl Config {
pub fn new() -> Self {
Self {
proxy_addr: env::var("PROXY_ADDR").expect("PROXY_ADDR must be set"),
proxy_namespace: env::var("PROXY_NAMESPACE").expect("PROXY_NAMESPACE must be set"),
prometheus_addr: env::var("PROMETHEUS_ADDR").expect("PROMETHEUS_ADDR must be set"),
ssl_crt_path: env::var("SSL_CRT_PATH").expect("SSL_CRT_PATH must be set"),
ssl_key_path: env::var("SSL_KEY_PATH").expect("SSL_KEY_PATH must be set"),
Expand Down
14 changes: 11 additions & 3 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl Metrics {
pub fn new() -> Self {
let total_packages_bytes = register_int_counter_vec!(
opts!("node_proxy_total_packages_bytes", "Total bytes transferred",),
&["consumer"]
&["consumer", "namespace", "instance"]
)
.unwrap();

Expand All @@ -111,9 +111,17 @@ impl Metrics {
}
}

pub fn count_total_packages_bytes(&self, consumer: &Consumer, value: usize) {
pub fn count_total_packages_bytes(
&self,
consumer: &Consumer,
namespace: &str,
instance: &str,
value: usize,
) {
let consumer = &consumer.to_string();

self.total_packages_bytes
.with_label_values(&[&consumer.to_string()])
.with_label_values(&[consumer, namespace, instance])
.inc_by(value as u64)
}
}
Expand Down
113 changes: 61 additions & 52 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::{
};
use tracing::error;

use crate::{config::Config, Consumer, State};
use crate::{config::Config, State};

pub struct ProxyApp {
client_connector: TransportConnector,
Expand All @@ -37,58 +37,18 @@ impl ProxyApp {
state,
}
}

async fn duplex(
&self,
state: State,
consumer: Consumer,
mut server_session: Stream,
mut client_session: Stream,
) {
let mut upstream_buf = [0; 1024];
let mut downstream_buf = [0; 1024];
loop {
let downstream_read = server_session.read(&mut upstream_buf);
let upstream_read = client_session.read(&mut downstream_buf);
let event: DuplexEvent;
select! {
n = downstream_read => event
= DuplexEvent::DownstreamRead(n.unwrap()),
n = upstream_read => event
= DuplexEvent::UpstreamRead(n.unwrap()),
}

match event {
DuplexEvent::DownstreamRead(n) => {
state.metrics.count_total_packages_bytes(&consumer, n);

client_session.write_all(&upstream_buf[0..n]).await.unwrap();
client_session.flush().await.unwrap();
}
DuplexEvent::UpstreamRead(n) => {
state.metrics.count_total_packages_bytes(&consumer, n);

server_session
.write_all(&downstream_buf[0..n])
.await
.unwrap();
server_session.flush().await.unwrap();
}
}
}
}
}

#[async_trait]
impl ServerApp for ProxyApp {
async fn process_new(
self: &Arc<Self>,
io: Stream,
mut io_server: Stream,
_shutdown: &ShutdownWatch,
) -> Option<Stream> {
let state = self.state.read().await.clone();

let hostname = io.get_ssl()?.servername(NameType::HOST_NAME);
let hostname = io_server.get_ssl()?.servername(NameType::HOST_NAME);
if hostname.is_none() {
error!("hostname is not present in the certificate");
return None;
Expand All @@ -104,15 +64,16 @@ impl ServerApp for ProxyApp {
let token = captures.get(1)?.as_str().to_string();
let network = captures.get(2)?.as_str().to_string();
let version = captures.get(3)?.as_str().to_string();

let namespace = self.config.proxy_namespace.clone();

let consumer = state.get_consumer(&network, &version, &token)?;

let node_host = format!(
let instance = format!(
"node-{network}-{version}.{}:{}",
self.config.node_dns, self.config.node_port
);

let lookup_result = lookup_host(node_host).await;
let lookup_result = lookup_host(&instance).await;
if let Err(err) = lookup_result {
error!(error = err.to_string(), "fail to lookup ip");
return None;
Expand All @@ -122,12 +83,60 @@ impl ServerApp for ProxyApp {

let proxy_to = BasicPeer::new(&node_addr.to_string());

let client_session = self.client_connector.new_stream(&proxy_to).await;

match client_session {
Ok(client_session) => {
self.duplex(state, consumer, io, client_session).await;
None
let io_client = self.client_connector.new_stream(&proxy_to).await;

match io_client {
Ok(mut io_client) => {
let mut upstream_buf = [0; 1024];
let mut downstream_buf = [0; 1024];

loop {
let downstream_read = io_server.read(&mut upstream_buf);
let upstream_read = io_client.read(&mut downstream_buf);
let event: DuplexEvent;

select! {
n = downstream_read => {
if let Err(err) = &n {
error!(error=err.to_string(), "Downstream error");
return None;
}
event = DuplexEvent::DownstreamRead(n.unwrap())
},
n = upstream_read => {
if let Err(err) = &n {
error!(error=err.to_string(), "Upstream error");
return None;
}
event = DuplexEvent::UpstreamRead(n.unwrap())
},
}

match event {
DuplexEvent::DownstreamRead(0) => {
return None;
}
DuplexEvent::UpstreamRead(0) => {
return None;
}
DuplexEvent::DownstreamRead(n) => {
state
.metrics
.count_total_packages_bytes(&consumer, &namespace, &instance, n);

io_client.write_all(&upstream_buf[0..n]).await.unwrap();
io_client.flush().await.unwrap();
}
DuplexEvent::UpstreamRead(n) => {
state
.metrics
.count_total_packages_bytes(&consumer, &namespace, &instance, n);

io_server.write_all(&downstream_buf[0..n]).await.unwrap();
io_server.flush().await.unwrap();
}
Comment on lines +122 to +137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where the byte rate limiter should act right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

n is the quantity

}
}
}
Err(e) => {
error!("failed to create client session: {}", e);
Expand Down
Loading