Skip to content

Commit

Permalink
Switch to tracing spans for stats logging
Browse files Browse the repository at this point in the history
- Also add basic bearer auth in standalone ip service
  • Loading branch information
helius-kurt committed Jul 25, 2024
1 parent e6ae9dd commit 039eaba
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 21 deletions.
2 changes: 1 addition & 1 deletion validator-firewall/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ publish = false
anyhow = "1"
async-trait = "0.1.80"
axum = "0.7.5"
axum-auth = "0.7"
tower-http = { version = "0.5.2", features = ["validate-request", "auth"] }
aya = "0.12"
aya-log = "0.2"
cadence = "1.4.0"
Expand Down
70 changes: 57 additions & 13 deletions validator-firewall/src/ip_service_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use cidr::Ipv4Cidr;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::debug;
use tower_http::auth::AddAuthorizationLayer;
use tracing::{debug, info, warn};

pub struct IPState {
pub gossip_nodes: Arc<RwLock<HashSet<Ipv4Cidr>>>,
Expand Down Expand Up @@ -54,14 +55,21 @@ impl IPState {
}
}

pub fn create_router(state: Arc<IPState>) -> Router {
async fn get_nodes(state: State<Arc<IPState>>) -> impl IntoResponse {
pub fn create_router(state: Arc<IPState>, token: Option<String>) -> Router {
async fn get_allowed_nodes(state: State<Arc<IPState>>) -> impl IntoResponse {
let nodes = state.get_combined_nodes().await;
let nodes: Vec<String> = nodes.iter().map(|node| node.to_string()).collect();
let body = serde_json::to_string(&nodes).unwrap();
(StatusCode::OK, body)
}

async fn get_http_nodes(state: State<Arc<IPState>>) -> impl IntoResponse {
let nodes = state.http_nodes.read().await;
let nodes: Vec<String> = nodes.iter().map(|node| node.to_string()).collect();
let body = serde_json::to_string(&nodes).unwrap();
(StatusCode::OK, body)
}

async fn add_http_node(
state: State<Arc<IPState>>,
Json(payload): Json<Ipv4Cidr>,
Expand All @@ -71,6 +79,13 @@ pub fn create_router(state: Arc<IPState>) -> Router {
(StatusCode::CREATED, payload.to_string())
}

async fn get_blocked_nodes(state: State<Arc<IPState>>) -> impl IntoResponse {
let nodes = state.blocked_nodes.read().await;
let nodes: Vec<String> = nodes.iter().map(|node| node.to_string()).collect();
let body = serde_json::to_string(&nodes).unwrap();
(StatusCode::OK, body)
}

async fn add_blocked_node(
state: State<Arc<IPState>>,
Json(payload): Json<Ipv4Cidr>,
Expand Down Expand Up @@ -99,14 +114,43 @@ pub fn create_router(state: Arc<IPState>) -> Router {
}

let app = Router::new()
.route("/", get(get_nodes))
.route("/nodes", get(get_nodes))
.route("/node", post(add_http_node).delete(remove_http_node))
.route(
"/blocked",
post(add_blocked_node).delete(remove_blocked_node),
)
.with_state(state);

app
.route("/", get(get_allowed_nodes))
.route("/nodes", get(get_allowed_nodes))
.with_state(state.clone());
return if let Some(token) = token {
info!("Adding authentication layer with token: {}", token);
Router::new()
.route(
"/nodes/allowed",
post(add_http_node)
.delete(remove_http_node)
.get(get_http_nodes),
)
.route(
"/nodes/blocked",
post(add_blocked_node)
.delete(remove_blocked_node)
.get(get_blocked_nodes),
)
.route_layer(AddAuthorizationLayer::bearer(&token))
.with_state(state.clone())
.merge(app)
} else {
warn!("No authentication configured for write layer.");
Router::new()
.route(
"/nodes/allowed",
post(add_http_node)
.delete(remove_http_node)
.get(get_http_nodes),
)
.route(
"/nodes/blocked",
post(add_blocked_node)
.delete(remove_blocked_node)
.get(get_blocked_nodes),
)
.with_state(state.clone())
.merge(app)
};
}
9 changes: 7 additions & 2 deletions validator-firewall/src/ip_service_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ struct IpServiceConfig {
rpc_endpoint: String,
#[clap(short, long)]
static_overrides: Option<PathBuf>,
#[clap(short, long)]
bearer_token: Option<String>,
#[clap(short, long, default_value = "11525")]
port: u16,
}

#[tokio::main]
Expand Down Expand Up @@ -102,10 +106,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
app_state.add_blocked_node(node.clone()).await;
}

let listener = tokio::net::TcpListener::bind("0.0.0.0:11525")
info!("Starting IP service on port {}", config.port.clone());
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", config.port))
.await
.unwrap();
let app = create_router(app_state.clone());
let app = create_router(app_state.clone(), config.bearer_token);

Ok(axum::serve(listener, app.into_make_service())
.await
Expand Down
26 changes: 21 additions & 5 deletions validator-firewall/src/stats_service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use aya::maps::{Map, MapData, MapIter, PerCpuHashMap, PerCpuValues};
use log::info;
use std::net::Ipv4Addr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tracing::info;
use validator_firewall_common::StatType::{All, Blocked};
use validator_firewall_common::{ConnectionStats, StatType};

Expand Down Expand Up @@ -73,11 +73,19 @@ impl StatsService {
log_limit -= 1;
}
}

let rate = (all_sum - all_last_sum) / all_las_eval_time.elapsed().as_secs().max(1);
let delta = all_sum - all_last_sum;

info!(
traffic_type = "All",
rate = rate,
delta = delta,
total = all_sum,
"All traffic summary: {} pkts last_interval {} pkts {} pkts/s",
all_sum,
all_sum - all_last_sum,
(all_sum - all_last_sum) / all_las_eval_time.elapsed().as_secs().max(1)
delta,
rate
);
all_last_sum = all_sum;
all_las_eval_time = std::time::Instant::now();
Expand All @@ -91,11 +99,19 @@ impl StatsService {
log_limit -= 1;
}
}

let rate =
(blocked_sum - blocked_last_sum) / blocked_las_eval_time.elapsed().as_secs().max(1);
let delta = blocked_sum - blocked_last_sum;
info!(
traffic_type = "Blocked",
rate = rate,
delta = delta,
total = blocked_sum,
"Blocked traffic summary: {} pkts last_interval {} pkts {} pkts/s",
blocked_sum,
blocked_sum - blocked_last_sum,
(blocked_sum - blocked_last_sum) / blocked_las_eval_time.elapsed().as_secs().max(1)
delta,
rate
);
blocked_last_sum = blocked_sum;
blocked_las_eval_time = std::time::Instant::now();
Expand Down

0 comments on commit 039eaba

Please sign in to comment.