From ff53cd6e1db968d36818cf07e610d4d6ab346e03 Mon Sep 17 00:00:00 2001 From: dynco-nym <173912580+dynco-nym@users.noreply.github.com> Date: Wed, 13 Nov 2024 12:05:36 +0100 Subject: [PATCH 1/9] Agents authenticate with NSAPI --- Cargo.lock | 18 ++- Cargo.toml | 2 +- common/models/Cargo.toml | 1 + common/models/src/ns_api.rs | 8 ++ nym-node-status-agent/.gitignore | 1 + nym-node-status-agent/Cargo.toml | 5 + nym-node-status-agent/gen_keypair.sh | 4 + nym-node-status-agent/run.sh | 13 +- nym-node-status-agent/src/cli.rs | 118 ----------------- .../src/cli/generate_keypair.rs | 59 +++++++++ nym-node-status-agent/src/cli/mod.rs | 74 +++++++++++ nym-node-status-agent/src/cli/run_probe.rs | 119 ++++++++++++++++++ nym-node-status-agent/src/main.rs | 15 --- nym-node-status-api/Cargo.toml | 1 + nym-node-status-api/launch_node_status_api.sh | 1 + nym-node-status-api/src/cli/mod.rs | 29 ++++- nym-node-status-api/src/http/api/testruns.rs | 65 +++++++--- nym-node-status-api/src/http/error.rs | 7 ++ nym-node-status-api/src/http/server.rs | 4 +- nym-node-status-api/src/http/state.rs | 9 +- nym-node-status-api/src/main.rs | 5 + nym-node-status-api/src/testruns/mod.rs | 3 +- 22 files changed, 394 insertions(+), 167 deletions(-) create mode 100755 nym-node-status-agent/gen_keypair.sh delete mode 100644 nym-node-status-agent/src/cli.rs create mode 100644 nym-node-status-agent/src/cli/generate_keypair.rs create mode 100644 nym-node-status-agent/src/cli/mod.rs create mode 100644 nym-node-status-agent/src/cli/run_probe.rs diff --git a/Cargo.lock b/Cargo.lock index 4896aab4066..bbd6905ab5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3822,9 +3822,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libm" @@ 
-5010,6 +5010,7 @@ dependencies = [ name = "nym-common-models" version = "0.1.0" dependencies = [ + "nym-crypto", "serde", ] @@ -6055,8 +6056,11 @@ dependencies = [ "clap 4.5.20", "nym-bin-common", "nym-common-models", + "nym-crypto", + "rand", "reqwest 0.12.4", "serde_json", + "tempfile", "tokio", "tokio-util", "tracing", @@ -6077,6 +6081,7 @@ dependencies = [ "moka", "nym-bin-common", "nym-common-models", + "nym-crypto", "nym-explorer-client", "nym-network-defaults", "nym-node-requests", @@ -8277,9 +8282,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.34" +version = "0.38.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" dependencies = [ "bitflags 2.5.0", "errno", @@ -9514,12 +9519,13 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.10.1" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", "fastrand 2.1.1", + "once_cell", "rustix", "windows-sys 0.52.0", ] diff --git a/Cargo.toml b/Cargo.toml index 668450aaf92..7735cb8f59e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -329,7 +329,7 @@ syn = "1" sysinfo = "0.30.13" tap = "1.0.1" tar = "0.4.42" -tempfile = "3.5.0" +tempfile = "3.14" thiserror = "1.0.64" time = "0.3.30" tokio = "1.39" diff --git a/common/models/Cargo.toml b/common/models/Cargo.toml index acb6e35682d..123f3f7fd25 100644 --- a/common/models/Cargo.toml +++ b/common/models/Cargo.toml @@ -12,3 +12,4 @@ readme.workspace = true [dependencies] serde = { workspace = true, features = ["derive"] } +nym-crypto = { path = "../crypto", features = ["asymmetric", "serde"] } diff --git a/common/models/src/ns_api.rs b/common/models/src/ns_api.rs 
index 5d875420e28..263628eb40b 100644 --- a/common/models/src/ns_api.rs +++ b/common/models/src/ns_api.rs @@ -1,3 +1,4 @@ +use nym_crypto::asymmetric::ed25519::{PublicKey, Signature}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Deserialize, Serialize)] @@ -5,3 +6,10 @@ pub struct TestrunAssignment { pub testrun_id: i64, pub gateway_identity_key: String, } + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SubmitResults { + pub message: String, + pub signature: Signature, + pub public_key: PublicKey, +} diff --git a/nym-node-status-agent/.gitignore b/nym-node-status-agent/.gitignore index bf462f3e052..4bb044ec5ef 100644 --- a/nym-node-status-agent/.gitignore +++ b/nym-node-status-agent/.gitignore @@ -1 +1,2 @@ nym-gateway-probe +keys/ diff --git a/nym-node-status-agent/Cargo.toml b/nym-node-status-agent/Cargo.toml index f7d8543a1f3..9a61cd5084b 100644 --- a/nym-node-status-agent/Cargo.toml +++ b/nym-node-status-agent/Cargo.toml @@ -19,9 +19,14 @@ anyhow = { workspace = true} clap = { workspace = true, features = ["derive", "env"] } nym-bin-common = { path = "../common/bin-common", features = ["models"]} nym-common-models = { path = "../common/models" } +nym-crypto = { path = "../common/crypto", features = ["asymmetric", "rand"] } +rand = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process"] } tokio-util = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } reqwest = { workspace = true, features = ["json"] } serde_json = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/nym-node-status-agent/gen_keypair.sh b/nym-node-status-agent/gen_keypair.sh new file mode 100755 index 00000000000..b93bd269315 --- /dev/null +++ b/nym-node-status-agent/gen_keypair.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +mkdir -p keys +cargo run --package nym-node-status-agent -- generate-keypair --path keys/private diff 
--git a/nym-node-status-agent/run.sh b/nym-node-status-agent/run.sh index 675e39c109e..46b997d3c0e 100755 --- a/nym-node-status-agent/run.sh +++ b/nym-node-status-agent/run.sh @@ -4,7 +4,7 @@ set -eu environment="qa" -source ../envs/${environment}.env +probe_git_ref="0dd5dacdda92b1ddd51cd30a3399515e45613371" export RUST_LOG="debug" @@ -13,13 +13,17 @@ gateway_probe_src=$(dirname $(dirname "$crate_root"))/nym-vpn-client/nym-vpn-cor echo "gateway_probe_src=$gateway_probe_src" echo "crate_root=$crate_root" +export RUST_LOG="debug" +export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1" +export NODE_STATUS_AGENT_SERVER_PORT="8000" export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe" +export NODE_STATUS_AGENT_AUTH_KEY="BjyC9SsHAZUzPRkQR4sPTvVrp4GgaquTh5YfSJksvvWT" # build & copy over GW probe function copy_gw_probe() { pushd $gateway_probe_src - git switch main - git pull + git fetch -a + git checkout $probe_git_ref cargo build --release --package nym-gateway-probe cp target/release/nym-gateway-probe "$crate_root" $crate_root/nym-gateway-probe --version @@ -45,9 +49,6 @@ function swarm() { echo "All agents completed" } -export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1" -export NODE_STATUS_AGENT_SERVER_PORT="8000" - copy_gw_probe swarm 8 diff --git a/nym-node-status-agent/src/cli.rs b/nym-node-status-agent/src/cli.rs deleted file mode 100644 index a7bbef5ebe8..00000000000 --- a/nym-node-status-agent/src/cli.rs +++ /dev/null @@ -1,118 +0,0 @@ -use anyhow::bail; -use clap::{Parser, Subcommand}; -use nym_bin_common::bin_info; -use nym_common_models::ns_api::TestrunAssignment; -use std::sync::OnceLock; -use tracing::instrument; - -use crate::probe::GwProbe; - -// Helper for passing LONG_VERSION to clap -fn pretty_build_info_static() -> &'static str { - static PRETTY_BUILD_INFORMATION: OnceLock = OnceLock::new(); - PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print()) -} - -#[derive(Parser, Debug)] -#[clap(author = "Nymtech", version, 
long_version = pretty_build_info_static(), about)] -pub(crate) struct Args { - #[command(subcommand)] - pub(crate) command: Command, - #[arg(short, long, env = "NODE_STATUS_AGENT_SERVER_ADDRESS")] - pub(crate) server_address: String, - - #[arg(short = 'p', long, env = "NODE_STATUS_AGENT_SERVER_PORT")] - pub(crate) server_port: u16, - // TODO dz accept keypair for identification / auth -} - -#[derive(Subcommand, Debug)] -pub(crate) enum Command { - RunProbe { - /// path of binary to run - #[arg(long, env = "NODE_STATUS_AGENT_PROBE_PATH")] - probe_path: String, - }, -} - -impl Args { - pub(crate) async fn execute(&self) -> anyhow::Result<()> { - match &self.command { - Command::RunProbe { probe_path } => self.run_probe(probe_path).await?, - } - - Ok(()) - } - - async fn run_probe(&self, probe_path: &str) -> anyhow::Result<()> { - let server_address = format!("{}:{}", &self.server_address, self.server_port); - - let probe = GwProbe::new(probe_path.to_string()); - - let version = probe.version().await; - tracing::info!("Probe version:\n{}", version); - - if let Some(testrun) = request_testrun(&server_address).await? 
{ - let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key)); - - submit_results(&server_address, testrun.testrun_id, log).await?; - } else { - tracing::info!("No testruns available, exiting") - } - - Ok(()) - } -} - -const URL_BASE: &str = "internal/testruns"; - -#[instrument(level = "debug", skip_all)] -async fn request_testrun(server_addr: &str) -> anyhow::Result> { - let target_url = format!("{}/{}", server_addr, URL_BASE); - let client = reqwest::Client::new(); - let res = client.get(target_url).send().await?; - let status = res.status(); - let response_text = res.text().await?; - - if status.is_client_error() { - bail!("{}: {}", status, response_text); - } else if status.is_server_error() { - if matches!(status, reqwest::StatusCode::SERVICE_UNAVAILABLE) - && response_text.contains("No testruns available") - { - return Ok(None); - } else { - bail!("{}: {}", status, response_text); - } - } - - serde_json::from_str(&response_text) - .map(|testrun| { - tracing::info!("Received testrun assignment: {:?}", testrun); - testrun - }) - .map_err(|err| { - tracing::error!("err"); - err.into() - }) -} - -#[instrument(level = "debug", skip(probe_outcome))] -async fn submit_results( - server_addr: &str, - testrun_id: i64, - probe_outcome: String, -) -> anyhow::Result<()> { - let target_url = format!("{}/{}/{}", server_addr, URL_BASE, testrun_id); - let client = reqwest::Client::new(); - - let res = client - .post(target_url) - .body(probe_outcome) - .send() - .await - .and_then(|response| response.error_for_status())?; - - tracing::debug!("Submitted results: {})", res.status()); - Ok(()) -} diff --git a/nym-node-status-agent/src/cli/generate_keypair.rs b/nym-node-status-agent/src/cli/generate_keypair.rs new file mode 100644 index 00000000000..ba86db838db --- /dev/null +++ b/nym-node-status-agent/src/cli/generate_keypair.rs @@ -0,0 +1,59 @@ +use std::{fs::File, io::Write, path::Path}; + +use tracing::info; + +pub(crate) fn generate_key_pair(path: impl AsRef) -> 
anyhow::Result<()> { + let priv_key_path = path.as_ref(); + let mut rng = rand::thread_rng(); + let keypair = nym_crypto::asymmetric::identity::KeyPair::new(&mut rng); + info!("Generated keypair as Base58-encoded string"); + + let mut private_key_file = File::create(priv_key_path)?; + private_key_file.write_all(keypair.private_key().to_base58_string().as_bytes())?; + + let pub_key_path = priv_key_path.with_extension("public"); + let mut public_key_file = File::create(&pub_key_path)?; + public_key_file.write_all(keypair.public_key().to_base58_string().as_bytes())?; + + info!( + "Saved Base58-encoded keypair, private key to {}, public key to {}", + priv_key_path.display(), + pub_key_path.display() + ); + info!("Public key should be whitelisted with NS API"); + + Ok(()) +} + +#[cfg(test)] +mod test { + use nym_crypto::asymmetric::ed25519::PrivateKey; + use tempfile::TempDir; + + use super::*; + + use std::{ + fs::{self}, + path::PathBuf, + }; + + #[test] + fn can_generate_valid_keypair() { + let tmp_dir = TempDir::new().unwrap(); + let pkey_file = PathBuf::from_iter(&[ + tmp_dir.path().to_path_buf(), + PathBuf::from("agent-key-private"), + ]); + generate_key_pair(&pkey_file).expect("Failed to generate keypair"); + + let pkey_raw = fs::read_to_string(&pkey_file).expect("Failed to read file"); + let key = PrivateKey::from_base58_string(pkey_raw).expect("Failed to load key"); + + let msg = "hello, world"; + + let signature = key.sign(msg); + key.public_key() + .verify(msg, &signature) + .expect("Failed to verify signature"); + } +} diff --git a/nym-node-status-agent/src/cli/mod.rs b/nym-node-status-agent/src/cli/mod.rs new file mode 100644 index 00000000000..5b7b7beadbe --- /dev/null +++ b/nym-node-status-agent/src/cli/mod.rs @@ -0,0 +1,74 @@ +use crate::probe::GwProbe; +use clap::{Parser, Subcommand}; +use nym_bin_common::bin_info; +use std::sync::OnceLock; + +pub(crate) mod generate_keypair; +pub(crate) mod run_probe; + +// Helper for passing LONG_VERSION to clap +fn 
pretty_build_info_static() -> &'static str { + static PRETTY_BUILD_INFORMATION: OnceLock = OnceLock::new(); + PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print()) +} + +#[derive(Parser, Debug)] +#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)] +pub(crate) struct Args { + #[command(subcommand)] + pub(crate) command: Command, +} + +#[derive(Subcommand, Debug)] +pub(crate) enum Command { + RunProbe { + #[arg(short, long, env = "NODE_STATUS_AGENT_SERVER_ADDRESS")] + server_address: String, + + #[arg(short = 'p', long, env = "NODE_STATUS_AGENT_SERVER_PORT")] + server_port: u16, + + /// base58-encoded private key + #[arg(long, env = "NODE_STATUS_AGENT_AUTH_KEY")] + ns_api_auth_key: String, + + /// path of binary to run + #[arg(long, env = "NODE_STATUS_AGENT_PROBE_PATH")] + probe_path: String, + }, + + GenerateKeypair { + #[arg(long)] + path: Option, + }, +} + +impl Args { + pub(crate) async fn execute(&self) -> anyhow::Result<()> { + match &self.command { + Command::RunProbe { + server_address, + server_port, + ns_api_auth_key, + probe_path, + } => run_probe::run_probe( + server_address, + server_port.to_owned(), + ns_api_auth_key, + probe_path, + ) + .await + .inspect_err(|err| { + tracing::error!("{err}"); + })?, + Command::GenerateKeypair { path } => { + let path = path + .to_owned() + .unwrap_or_else(|| String::from("private-key")); + generate_keypair::generate_key_pair(path)? 
+ } + } + + Ok(()) + } +} diff --git a/nym-node-status-agent/src/cli/run_probe.rs b/nym-node-status-agent/src/cli/run_probe.rs new file mode 100644 index 00000000000..3c8ce13509f --- /dev/null +++ b/nym-node-status-agent/src/cli/run_probe.rs @@ -0,0 +1,119 @@ +use anyhow::{bail, Context}; +use nym_common_models::ns_api::{SubmitResults, TestrunAssignment}; +use nym_crypto::asymmetric::ed25519::{PrivateKey, PublicKey}; +use tracing::instrument; + +use crate::cli::GwProbe; + +const URL_BASE: &str = "internal/testruns"; + +pub(crate) async fn run_probe( + server_address: &str, + server_port: u16, + ns_api_auth_key: &str, + probe_path: &str, +) -> anyhow::Result<()> { + let server_address = format!("{}:{}", server_address, server_port); + test_ns_api_conn(&server_address).await?; + + let auth_key = PrivateKey::from_base58_string(ns_api_auth_key) + .context("Couldn't parse auth key, exiting")?; + + let probe = GwProbe::new(probe_path.to_string()); + + let version = probe.version().await; + tracing::info!("Probe version:\n{}", version); + + if let Some(testrun) = request_testrun(auth_key.public_key(), &server_address).await? 
{ + let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key)); + + submit_results(auth_key, &server_address, testrun.testrun_id, log).await?; + } else { + tracing::info!("No testruns available, exiting") + } + + Ok(()) +} + +async fn test_ns_api_conn(server_addr: &str) -> anyhow::Result<()> { + reqwest::get(server_addr) + .await + .map(|res| { + tracing::info!( + "Testing connection to NS API at {server_addr}: {}", + res.status() + ); + }) + .map_err(|err| anyhow::anyhow!("Couldn't connect to server on {}: {}", server_addr, err)) +} + +#[instrument(level = "debug", skip_all)] +pub(crate) async fn request_testrun( + auth_key: PublicKey, + server_addr: &str, +) -> anyhow::Result> { + let target_url = format!("{}/{}", server_addr, URL_BASE); + let client = reqwest::Client::new(); + let res = client + .get(target_url) + .body(auth_key.to_base58_string()) + .send() + .await?; + let status = res.status(); + let response_text = res.text().await?; + + if status.is_client_error() { + bail!("{}: {}", status, response_text); + } else if status.is_server_error() { + if matches!(status, reqwest::StatusCode::SERVICE_UNAVAILABLE) + && response_text.contains("No testruns available") + { + return Ok(None); + } else { + bail!("{}: {}", status, response_text); + } + } + + serde_json::from_str(&response_text) + .map(|testrun| { + tracing::info!("Received testrun assignment: {:?}", testrun); + testrun + }) + .map_err(|err| { + tracing::error!("err"); + err.into() + }) +} + +#[instrument(level = "debug", skip(auth_key, probe_outcome))] +pub(crate) async fn submit_results( + auth_key: PrivateKey, + server_addr: &str, + testrun_id: i64, + probe_outcome: String, +) -> anyhow::Result<()> { + let target_url = format!("{}/{}/{}", server_addr, URL_BASE, testrun_id); + + let results = sign_message(auth_key, probe_outcome); + + let client = reqwest::Client::new(); + let res = client + .post(target_url) + .json(&results) + .send() + .await + .and_then(|response| 
response.error_for_status())?; + + tracing::debug!("Submitted results: {})", res.status()); + Ok(()) +} + +fn sign_message(key: PrivateKey, probe_outcome: String) -> SubmitResults { + let signature = key.sign(&probe_outcome); + + SubmitResults { + message: probe_outcome, + signature, + public_key: key.public_key(), + } +} diff --git a/nym-node-status-agent/src/main.rs b/nym-node-status-agent/src/main.rs index 133828b7fa1..d3078753fac 100644 --- a/nym-node-status-agent/src/main.rs +++ b/nym-node-status-agent/src/main.rs @@ -11,26 +11,11 @@ async fn main() -> anyhow::Result<()> { setup_tracing(); let args = Args::parse(); - let server_addr = format!("{}:{}", args.server_address, args.server_port); - test_ns_api_conn(&server_addr).await?; - args.execute().await?; Ok(()) } -async fn test_ns_api_conn(server_addr: &str) -> anyhow::Result<()> { - reqwest::get(server_addr) - .await - .map(|res| { - tracing::info!( - "Testing connection to NS API at {server_addr}: {}", - res.status() - ); - }) - .map_err(|err| anyhow::anyhow!("Couldn't connect to server on {}: {}", server_addr, err)) -} - pub(crate) fn setup_tracing() { fn directive_checked(directive: impl Into) -> Directive { directive diff --git a/nym-node-status-api/Cargo.toml b/nym-node-status-api/Cargo.toml index 511ab91b591..102f06c3bb7 100644 --- a/nym-node-status-api/Cargo.toml +++ b/nym-node-status-api/Cargo.toml @@ -23,6 +23,7 @@ futures-util = { workspace = true } moka = { workspace = true, features = ["future"] } nym-bin-common = { path = "../common/bin-common", features = ["models"]} nym-common-models = { path = "../common/models" } +nym-crypto = { path = "../common/crypto", features = ["asymmetric", "serde"] } nym-explorer-client = { path = "../explorer-api/explorer-client" } nym-network-defaults = { path = "../common/network-defaults" } nym-validator-client = { path = "../common/client-libs/validator-client" } diff --git a/nym-node-status-api/launch_node_status_api.sh 
b/nym-node-status-api/launch_node_status_api.sh index 5d926754128..20c15167493 100755 --- a/nym-node-status-api/launch_node_status_api.sh +++ b/nym-node-status-api/launch_node_status_api.sh @@ -7,6 +7,7 @@ export RUST_LOG=${RUST_LOG:-debug} export NYM_API_CLIENT_TIMEOUT=60 export EXPLORER_CLIENT_TIMEOUT=60 export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=60 +export NODE_STATUS_API_AGENT_KEY_LIST="H4z8kx5Kkf5JMQHhxaW1MwYndjKCDHC7HsVhHTFfBZ4J,AKYZeKaJTzJrWtidk3BZz7wsm4GapaTY4GwQYVL43cmc" export ENVIRONMENT="qa.env" diff --git a/nym-node-status-api/src/cli/mod.rs b/nym-node-status-api/src/cli/mod.rs index 84ee86577fb..0ca2b9bf48e 100644 --- a/nym-node-status-api/src/cli/mod.rs +++ b/nym-node-status-api/src/cli/mod.rs @@ -1,5 +1,6 @@ use clap::Parser; use nym_bin_common::bin_info; +use nym_crypto::asymmetric::ed25519::PublicKey; use reqwest::Url; use std::{sync::OnceLock, time::Duration}; @@ -56,7 +57,7 @@ pub(crate) struct Cli { #[clap( long, - default_value = "600", + default_value = "300", env = "NODE_STATUS_API_MONITOR_REFRESH_INTERVAL" )] #[arg(value_parser = parse_duration)] @@ -64,11 +65,35 @@ pub(crate) struct Cli { #[clap( long, - default_value = "600", + default_value = "300", env = "NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL" )] #[arg(value_parser = parse_duration)] pub(crate) testruns_refresh_interval: Duration, + + #[clap(long, env = "NODE_STATUS_API_AGENT_KEY_LIST")] + #[arg(value_parser = parse_key_list)] + agent_key_list: KeyList, +} + +impl Cli { + pub(crate) fn agent_key_list(&self) -> &Vec { + &self.agent_key_list.0 + } +} + +// We need a list of keys from clap. But if we define CLI argument as Vec, +// clap interprets that as type T which can be given as a CLI argument multiple +// times (so all of them are stored in a Vec). Thus we wrap Vec in a newtype +// pattern to have a list of keys and make clap happy. 
+#[derive(Debug, Clone)] +struct KeyList(Vec); + +fn parse_key_list(arg: &str) -> anyhow::Result { + arg.split(',') + .map(|value| PublicKey::from_base58_string(value.trim()).map_err(anyhow::Error::from)) + .collect::>>() + .map(KeyList) } fn parse_duration(arg: &str) -> Result { diff --git a/nym-node-status-api/src/http/api/testruns.rs b/nym-node-status-api/src/http/api/testruns.rs index b13d62ba882..02074f4f3b2 100644 --- a/nym-node-status-api/src/http/api/testruns.rs +++ b/nym-node-status-api/src/http/api/testruns.rs @@ -4,6 +4,8 @@ use axum::{ extract::{Path, State}, Router, }; +use nym_common_models::ns_api::SubmitResults; +use nym_crypto::asymmetric::ed25519::PublicKey; use reqwest::StatusCode; use crate::db::models::TestRunStatus; @@ -28,10 +30,14 @@ pub(crate) fn routes() -> Router { } #[tracing::instrument(level = "debug", skip_all)] -async fn request_testrun(State(state): State) -> HttpResult> { - // TODO dz log agent's key +async fn request_testrun( + State(state): State, + body: String, +) -> HttpResult> { // TODO dz log agent's network probe version - tracing::debug!("Agent requested testrun"); + let agent_pubkey = authenticate_agent(&body, &state)?; + + tracing::debug!("Agent {} requested testrun", agent_pubkey); let db = state.db_pool(); let mut conn = db @@ -42,10 +48,11 @@ async fn request_testrun(State(state): State) -> HttpResult { if let Some(testrun) = res { - tracing::debug!( - "🏃‍ Assigned testrun row_id {} gateway {} to agent", + tracing::info!( + "🏃‍ Assigned testrun row_id {} gateway {} to agent {}", &testrun.testrun_id, - testrun.gateway_identity_key + testrun.gateway_identity_key, + agent_pubkey ); Ok(Json(testrun)) } else { @@ -57,13 +64,20 @@ async fn request_testrun(State(state): State) -> HttpResult, State(state): State, - body: String, + Json(probe_results): Json, ) -> HttpResult { + let agent_pubkey = authenticate_agent(&probe_results.public_key.to_base58_string(), &state)?; + agent_pubkey + .verify(&probe_results.message, 
&probe_results.signature) + .map_err(|_| { + tracing::warn!("Message verification failed, rejecting"); + HttpError::unauthorized() + })?; + let db = state.db_pool(); let mut conn = db .acquire() @@ -84,27 +98,31 @@ async fn submit_testrun( HttpError::internal_with_logging("No gateway found for testrun") })?; tracing::debug!( - "Agent submitted testrun {} for gateway {} ({} bytes)", + "Agent {} submitted testrun {} for gateway {} ({} bytes)", + agent_pubkey, testrun_id, gw_identity, - body.len(), + &probe_results.message.len(), ); // TODO dz this should be part of a single transaction: commit after everything is done queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete) .await .map_err(HttpError::internal_with_logging)?; - queries::testruns::update_gateway_last_probe_log(&mut conn, testrun.gateway_id, &body) - .await - .map_err(HttpError::internal_with_logging)?; - let result = get_result_from_log(&body); + queries::testruns::update_gateway_last_probe_log( + &mut conn, + testrun.gateway_id, + &probe_results.message, + ) + .await + .map_err(HttpError::internal_with_logging)?; + let result = get_result_from_log(&probe_results.message); queries::testruns::update_gateway_last_probe_result(&mut conn, testrun.gateway_id, &result) .await .map_err(HttpError::internal_with_logging)?; queries::testruns::update_gateway_score(&mut conn, testrun.gateway_id) .await .map_err(HttpError::internal_with_logging)?; - // TODO dz log gw identity key tracing::info!( "✅ Testrun row_id {} for gateway {} complete", @@ -115,6 +133,23 @@ async fn submit_testrun( Ok(StatusCode::CREATED) } +fn authenticate_agent(base58_pubkey: &str, state: &AppState) -> HttpResult { + let agent_pubkey = PublicKey::from_base58_string(base58_pubkey).map_err(|_| { + if base58_pubkey.is_empty() { + tracing::warn!("Auth key missing from request body, rejecting"); + } else { + tracing::warn!("Failed to deserialize key from request body, rejecting"); + } + HttpError::unauthorized() + 
})?; + if !state.is_registered(&agent_pubkey) { + tracing::warn!("Public key {} not registered, rejecting", agent_pubkey); + return Err(HttpError::unauthorized()); + } + + Ok(agent_pubkey) +} + fn get_result_from_log(log: &str) -> String { let re = regex::Regex::new(r"\n\{\s").unwrap(); let result: Vec<_> = re.splitn(log, 2).collect(); diff --git a/nym-node-status-api/src/http/error.rs b/nym-node-status-api/src/http/error.rs index a7fe9d98ab5..021c993827b 100644 --- a/nym-node-status-api/src/http/error.rs +++ b/nym-node-status-api/src/http/error.rs @@ -15,6 +15,13 @@ impl HttpError { } } + pub(crate) fn unauthorized() -> Self { + Self { + message: serde_json::json!({"message": "Make sure your public key si registered with NS API"}).to_string(), + status: axum::http::StatusCode::UNAUTHORIZED, + } + } + pub(crate) fn internal_with_logging(msg: impl Display) -> Self { tracing::error!("{}", msg.to_string()); Self::internal() diff --git a/nym-node-status-api/src/http/server.rs b/nym-node-status-api/src/http/server.rs index 17d5d64ab01..714a4836e4f 100644 --- a/nym-node-status-api/src/http/server.rs +++ b/nym-node-status-api/src/http/server.rs @@ -1,5 +1,6 @@ use axum::Router; use core::net::SocketAddr; +use nym_crypto::asymmetric::ed25519::PublicKey; use tokio::{net::TcpListener, task::JoinHandle}; use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; @@ -14,10 +15,11 @@ pub(crate) async fn start_http_api( db_pool: DbPool, http_port: u16, nym_http_cache_ttl: u64, + agent_key_list: Vec, ) -> anyhow::Result { let router_builder = RouterBuilder::with_default_routes(); - let state = AppState::new(db_pool, nym_http_cache_ttl); + let state = AppState::new(db_pool, nym_http_cache_ttl, agent_key_list); let router = router_builder.with_state(state); // TODO dz do we need this to be configurable? 
diff --git a/nym-node-status-api/src/http/state.rs b/nym-node-status-api/src/http/state.rs index 6bccea39a12..2001244ef45 100644 --- a/nym-node-status-api/src/http/state.rs +++ b/nym-node-status-api/src/http/state.rs @@ -1,6 +1,7 @@ use std::{sync::Arc, time::Duration}; use moka::{future::Cache, Entry}; +use nym_crypto::asymmetric::ed25519::PublicKey; use tokio::sync::RwLock; use crate::{ @@ -12,13 +13,15 @@ use crate::{ pub(crate) struct AppState { db_pool: DbPool, cache: HttpCache, + agent_key_list: Vec, } impl AppState { - pub(crate) fn new(db_pool: DbPool, cache_ttl: u64) -> Self { + pub(crate) fn new(db_pool: DbPool, cache_ttl: u64, agent_key_list: Vec) -> Self { Self { db_pool, cache: HttpCache::new(cache_ttl), + agent_key_list, } } @@ -29,6 +32,10 @@ impl AppState { pub(crate) fn cache(&self) -> &HttpCache { &self.cache } + + pub(crate) fn is_registered(&self, agent_pubkey: &PublicKey) -> bool { + self.agent_key_list.contains(agent_pubkey) + } } static GATEWAYS_LIST_KEY: &str = "gateways"; diff --git a/nym-node-status-api/src/main.rs b/nym-node-status-api/src/main.rs index b2c68b391bb..facb4485b92 100644 --- a/nym-node-status-api/src/main.rs +++ b/nym-node-status-api/src/main.rs @@ -14,6 +14,9 @@ async fn main() -> anyhow::Result<()> { let args = cli::Cli::parse(); + let agent_key_list = args.agent_key_list(); + tracing::info!("Registered {} agent keys", agent_key_list.len()); + let connection_url = args.database_url.clone(); tracing::debug!("Using config:\n{:#?}", args); @@ -31,12 +34,14 @@ async fn main() -> anyhow::Result<()> { .await; tracing::info!("Started monitor task"); }); + testruns::spawn(storage.pool_owned(), args.testruns_refresh_interval).await; let shutdown_handles = http::server::start_http_api( storage.pool_owned(), args.http_port, args.nym_http_cache_ttl, + agent_key_list.to_owned(), ) .await .expect("Failed to start server"); diff --git a/nym-node-status-api/src/testruns/mod.rs b/nym-node-status-api/src/testruns/mod.rs index 
f487523f360..7ac916fc8e5 100644 --- a/nym-node-status-api/src/testruns/mod.rs +++ b/nym-node-status-api/src/testruns/mod.rs @@ -1,4 +1,5 @@ use crate::db::models::GatewayIdentityDto; + use crate::db::DbPool; use futures_util::TryStreamExt; use std::time::Duration; @@ -24,8 +25,6 @@ pub(crate) async fn spawn(pool: DbPool, refresh_interval: Duration) { }); } -// TODO dz make number of max agents configurable - #[instrument(level = "debug", name = "testrun_queue", skip_all)] async fn run(pool: &DbPool) -> anyhow::Result<()> { tracing::info!("Spawning testruns..."); From f5ab647a7ab1d6928a15187404bc7485e2ceaa2b Mon Sep 17 00:00:00 2001 From: dynco-nym <173912580+dynco-nym@users.noreply.github.com> Date: Mon, 18 Nov 2024 12:17:42 +0100 Subject: [PATCH 2/9] /submit with better auth - also adjust agent run script to authenticate, even in parallel --- common/models/src/ns_api.rs | 3 +- nym-node-status-agent/run.sh | 44 +++++++--- .../src/cli/generate_keypair.rs | 1 - nym-node-status-agent/src/cli/run_probe.rs | 1 - nym-node-status-api/launch_node_status_api.sh | 14 ++- .../migrations/001_assigned_agent.sql | 2 + nym-node-status-api/src/cli/mod.rs | 27 +----- nym-node-status-api/src/db/models.rs | 10 +++ .../src/db/queries/testruns.rs | 45 +++++++++- nym-node-status-api/src/http/api/testruns.rs | 85 +++++++++++++------ nym-node-status-api/src/http/error.rs | 2 +- nym-node-status-api/src/main.rs | 7 +- nym-node-status-api/src/testruns/mod.rs | 2 +- nym-node-status-api/src/testruns/queue.rs | 3 +- 14 files changed, 168 insertions(+), 78 deletions(-) create mode 100644 nym-node-status-api/migrations/001_assigned_agent.sql diff --git a/common/models/src/ns_api.rs b/common/models/src/ns_api.rs index 263628eb40b..306fd2ab93d 100644 --- a/common/models/src/ns_api.rs +++ b/common/models/src/ns_api.rs @@ -1,4 +1,4 @@ -use nym_crypto::asymmetric::ed25519::{PublicKey, Signature}; +use nym_crypto::asymmetric::ed25519::Signature; use serde::{Deserialize, Serialize}; #[derive(Debug, 
Clone, Deserialize, Serialize)] @@ -11,5 +11,4 @@ pub struct TestrunAssignment { pub struct SubmitResults { pub message: String, pub signature: Signature, - pub public_key: PublicKey, } diff --git a/nym-node-status-agent/run.sh b/nym-node-status-agent/run.sh index 46b997d3c0e..af1fa2353eb 100755 --- a/nym-node-status-agent/run.sh +++ b/nym-node-status-agent/run.sh @@ -1,23 +1,46 @@ #!/bin/bash set -eu - environment="qa" probe_git_ref="0dd5dacdda92b1ddd51cd30a3399515e45613371" -export RUST_LOG="debug" - crate_root=$(dirname $(realpath "$0")) -gateway_probe_src=$(dirname $(dirname "$crate_root"))/nym-vpn-client/nym-vpn-core +monorepo_root=$(dirname "${crate_root}") +echo "Expecting nym-vpn-client repo at a sibling level of nym monorepo dir" +gateway_probe_src=$(dirname "${monorepo_root}")/nym-vpn-client/nym-vpn-core echo "gateway_probe_src=$gateway_probe_src" echo "crate_root=$crate_root" +set -a +source "${monorepo_root}/envs/${environment}.env" +set +a + export RUST_LOG="debug" export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1" export NODE_STATUS_AGENT_SERVER_PORT="8000" export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe" -export NODE_STATUS_AGENT_AUTH_KEY="BjyC9SsHAZUzPRkQR4sPTvVrp4GgaquTh5YfSJksvvWT" + +# each key is required for a separate agent when running in parallel: their +# public counterparts need to be registered with NS API +private_keys=("BjyC9SsHAZUzPRkQR4sPTvVrp4GgaquTh5YfSJksvvWT" + "4RqSKydrEuyGF8Xtwyauvja62SAjqxFPYQzW2neZdkQD" + "CfudaSaaLTkgR8rkBijUnYocdFciWTbKqkSNYevepnbn" + "Dd3fDyPUg4edTpiCAkSweE17NdWJ7gAchbtqAeSj3MBc" + "HAtfcfnpw5XdpcVzAH6Qsxp6Zf75oe2W54HjTD8ngVbU" + "8aqWP8wZyhX5W8gfrvyh1SmS6CEgfLAR95DBhWXRUpTm" + "234U1PMkWpAsn7hD98g1D8hfRFkJJS91T2sJBQDyXmqx" + "5qUUFu83fgqpACsr3dC6iwGJxhTqN4JJDTecT2QSqwEe" + "4Pp7Cd9G3aMku9biFcxRMEja8RBMbBRGZuDAZt1yTS7H" + "4U136QykP8G831EZSDNLLvgWCGYA8naYtT8BQ9kLeL5B" +) + +workers=${1:-1} +if ((workers > ${#private_keys[@]})); then + echo "Error: ${workers} is larger than the 
number of private keys available (${#private_keys[@]})." + exit 1 +fi +echo "Running $workers workers in parallel" # build & copy over GW probe function copy_gw_probe() { @@ -36,11 +59,9 @@ function build_agent() { function swarm() { local workers=$1 - echo "Running $workers in parallel" - - build_agent - for ((i = 1; i <= $workers; i++)); do + for ((i = 1; i <= workers; i++)); do + export NODE_STATUS_AGENT_AUTH_KEY=${private_keys[i]} ../target/release/nym-node-status-agent run-probe & done @@ -50,7 +71,6 @@ function swarm() { } copy_gw_probe +build_agent -swarm 8 - -# cargo run -- run-probe +swarm $workers diff --git a/nym-node-status-agent/src/cli/generate_keypair.rs b/nym-node-status-agent/src/cli/generate_keypair.rs index ba86db838db..e89bf3e8d51 100644 --- a/nym-node-status-agent/src/cli/generate_keypair.rs +++ b/nym-node-status-agent/src/cli/generate_keypair.rs @@ -1,5 +1,4 @@ use std::{fs::File, io::Write, path::Path}; - use tracing::info; pub(crate) fn generate_key_pair(path: impl AsRef) -> anyhow::Result<()> { diff --git a/nym-node-status-agent/src/cli/run_probe.rs b/nym-node-status-agent/src/cli/run_probe.rs index 3c8ce13509f..aee1116f4ac 100644 --- a/nym-node-status-agent/src/cli/run_probe.rs +++ b/nym-node-status-agent/src/cli/run_probe.rs @@ -114,6 +114,5 @@ fn sign_message(key: PrivateKey, probe_outcome: String) -> SubmitResults { SubmitResults { message: probe_outcome, signature, - public_key: key.public_key(), } } diff --git a/nym-node-status-api/launch_node_status_api.sh b/nym-node-status-api/launch_node_status_api.sh index 20c15167493..83bd3dac933 100755 --- a/nym-node-status-api/launch_node_status_api.sh +++ b/nym-node-status-api/launch_node_status_api.sh @@ -6,8 +6,18 @@ export RUST_LOG=${RUST_LOG:-debug} export NYM_API_CLIENT_TIMEOUT=60 export EXPLORER_CLIENT_TIMEOUT=60 -export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=60 -export 
NODE_STATUS_API_AGENT_KEY_LIST="H4z8kx5Kkf5JMQHhxaW1MwYndjKCDHC7HsVhHTFfBZ4J,AKYZeKaJTzJrWtidk3BZz7wsm4GapaTY4GwQYVL43cmc" +export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=120 +# public keys corresponding to the agents NS API is expecting to be contacted from +export NODE_STATUS_API_AGENT_KEY_LIST="H4z8kx5Kkf5JMQHhxaW1MwYndjKCDHC7HsVhHTFfBZ4J, +5c2GW61135DEr73DxGrR4DR22BLEujvm1k8GYEjRB9at, +3PSFDH2iSJ61KoDNyJpAiw42xS5smV5iBXWnRGTmk2du, +2AH7pJL5PErbSFhZdu3uH8cKa1h1tyCUfSRUm6E5EBz8, +6wQ9ifPFm2EB73BrwpGSd3Ek7GFA5kiAMQDP2ox6JKZw, +G1tevJBnzaQ6zCUsFsxtGJf45BqCTDgzpEz6Sgxks8EH, +FwjL2nGrtgQQ48fPqAUzUZ8UkQZtMtgehqTqj4PQopvh, +Eujj4GmvwQBgHZaNSyqUbjMFSsnXWPSjEYUPgAsKmx1A, +5ZnfSGxW6EKcFxB8jftb9V3f897VpwpZtf7kCPYzB595, +H9kuRd8BGjEUD8Grh5U9YUPN5ZaQmSYz8U44R72AffKM" export ENVIRONMENT="qa.env" diff --git a/nym-node-status-api/migrations/001_assigned_agent.sql b/nym-node-status-api/migrations/001_assigned_agent.sql new file mode 100644 index 00000000000..96ec895924e --- /dev/null +++ b/nym-node-status-api/migrations/001_assigned_agent.sql @@ -0,0 +1,2 @@ +ALTER TABLE testruns +ADD COLUMN assigned_agent VARCHAR; diff --git a/nym-node-status-api/src/cli/mod.rs b/nym-node-status-api/src/cli/mod.rs index 0ca2b9bf48e..fa3d3c12bfd 100644 --- a/nym-node-status-api/src/cli/mod.rs +++ b/nym-node-status-api/src/cli/mod.rs @@ -1,6 +1,5 @@ use clap::Parser; use nym_bin_common::bin_info; -use nym_crypto::asymmetric::ed25519::PublicKey; use reqwest::Url; use std::{sync::OnceLock, time::Duration}; @@ -71,29 +70,9 @@ pub(crate) struct Cli { #[arg(value_parser = parse_duration)] pub(crate) testruns_refresh_interval: Duration, - #[clap(long, env = "NODE_STATUS_API_AGENT_KEY_LIST")] - #[arg(value_parser = parse_key_list)] - agent_key_list: KeyList, -} - -impl Cli { - pub(crate) fn agent_key_list(&self) -> &Vec { - &self.agent_key_list.0 - } -} - -// We need a list of keys from clap. 
But if we define CLI argument as Vec, -// clap interprets that as type T which can be given as a CLI argument multiple -// times (so all of them are stored in a Vec). Thus we wrap Vec in a newtype -// pattern to have a list of keys and make clap happy. -#[derive(Debug, Clone)] -struct KeyList(Vec); - -fn parse_key_list(arg: &str) -> anyhow::Result { - arg.split(',') - .map(|value| PublicKey::from_base58_string(value.trim()).map_err(anyhow::Error::from)) - .collect::>>() - .map(KeyList) + #[clap(env = "NODE_STATUS_API_AGENT_KEY_LIST")] + #[arg(value_delimiter = ',')] + pub(crate) agent_key_list: Vec, } fn parse_duration(arg: &str) -> Result { diff --git a/nym-node-status-api/src/db/models.rs b/nym-node-status-api/src/db/models.rs index a5511787f9a..9e9f5aa8076 100644 --- a/nym-node-status-api/src/db/models.rs +++ b/nym-node-status-api/src/db/models.rs @@ -2,6 +2,7 @@ use crate::{ http::{self, models::SummaryHistory}, monitor::NumericalCheckedCast, }; +use nym_crypto::asymmetric::ed25519::PublicKey; use nym_node_requests::api::v1::node::models::NodeDescription; use serde::{Deserialize, Serialize}; use strum_macros::{EnumString, FromRepr}; @@ -309,6 +310,15 @@ pub struct TestRunDto { pub timestamp_utc: i64, pub ip_address: String, pub log: String, + pub assigned_agent: Option, +} + +impl TestRunDto { + pub(crate) fn assigned_agent_key(&self) -> Option { + self.assigned_agent + .as_ref() + .and_then(|value| PublicKey::from_base58_string(value).ok()) + } } #[derive(Debug, Clone, strum_macros::Display, EnumString, FromRepr, PartialEq)] diff --git a/nym-node-status-api/src/db/queries/testruns.rs b/nym-node-status-api/src/db/queries/testruns.rs index 91a3a86b139..38582e2a5b9 100644 --- a/nym-node-status-api/src/db/queries/testruns.rs +++ b/nym-node-status-api/src/db/queries/testruns.rs @@ -6,6 +6,7 @@ use crate::{ }; use anyhow::Context; use chrono::Duration; +use nym_crypto::asymmetric::ed25519::PublicKey; use sqlx::{pool::PoolConnection, Sqlite}; pub(crate) async fn 
get_in_progress_testrun_by_id( @@ -20,7 +21,8 @@ pub(crate) async fn get_in_progress_testrun_by_id( status as "status!", timestamp_utc as "timestamp_utc!", ip_address as "ip_address!", - log as "log!" + log as "log!", + assigned_agent FROM testruns WHERE id = ? @@ -35,6 +37,35 @@ pub(crate) async fn get_in_progress_testrun_by_id( .context(format!("Couldn't retrieve testrun {testrun_id}")) } +pub(crate) async fn get_testruns_assigned_to_agent( + conn: &mut PoolConnection, + agent_key: PublicKey, +) -> anyhow::Result { + let agent_key = agent_key.to_base58_string(); + sqlx::query_as!( + TestRunDto, + r#"SELECT + id as "id!", + gateway_id as "gateway_id!", + status as "status!", + timestamp_utc as "timestamp_utc!", + ip_address as "ip_address!", + log as "log!", + assigned_agent + FROM testruns + WHERE + assigned_agent = ? + AND + status = ? + ORDER BY timestamp_utc"#, + agent_key, + TestRunStatus::InProgress as i64, + ) + .fetch_one(conn.as_mut()) + .await + .context(format!("No testruns in progress for agent {agent_key}")) +} + pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> anyhow::Result { let mut conn = db.acquire().await?; let previous_run = now_utc() - age; @@ -44,7 +75,8 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an r#"UPDATE testruns SET - status = ? + status = ?, + assigned_agent = NULL WHERE status = ? AND @@ -69,13 +101,17 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an Ok(stale_testruns) } -pub(crate) async fn get_oldest_testrun_and_make_it_pending( +pub(crate) async fn assign_oldest_testrun( conn: &mut PoolConnection, + agent_key: PublicKey, ) -> anyhow::Result> { + let agent_key = agent_key.to_base58_string(); // find & mark as "In progress" in the same transaction to avoid race conditions let returning = sqlx::query!( r#"UPDATE testruns - SET status = ? + SET + status = ?, + assigned_agent = ? 
WHERE rowid = ( SELECT rowid @@ -89,6 +125,7 @@ pub(crate) async fn get_oldest_testrun_and_make_it_pending( gateway_id "#, TestRunStatus::InProgress as i64, + agent_key, TestRunStatus::Queued as i64, ) .fetch_optional(conn.as_mut()) diff --git a/nym-node-status-api/src/http/api/testruns.rs b/nym-node-status-api/src/http/api/testruns.rs index 02074f4f3b2..5e19105e53c 100644 --- a/nym-node-status-api/src/http/api/testruns.rs +++ b/nym-node-status-api/src/http/api/testruns.rs @@ -45,7 +45,7 @@ async fn request_testrun( .await .map_err(HttpError::internal_with_logging)?; - return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(&mut conn).await { + return match db::queries::testruns::assign_oldest_testrun(&mut conn, agent_pubkey).await { Ok(res) => { if let Some(testrun) = res { tracing::info!( @@ -66,67 +66,96 @@ async fn request_testrun( #[tracing::instrument(level = "debug", skip_all)] async fn submit_testrun( - Path(testrun_id): Path, + Path(submitted_testrun_id): Path, State(state): State, Json(probe_results): Json, ) -> HttpResult { - let agent_pubkey = authenticate_agent(&probe_results.public_key.to_base58_string(), &state)?; - agent_pubkey - .verify(&probe_results.message, &probe_results.signature) - .map_err(|_| { - tracing::warn!("Message verification failed, rejecting"); - HttpError::unauthorized() - })?; - let db = state.db_pool(); let mut conn = db .acquire() .await .map_err(HttpError::internal_with_logging)?; - let testrun = queries::testruns::get_in_progress_testrun_by_id(&mut conn, testrun_id) - .await - .map_err(|e| { - tracing::error!("{e}"); - HttpError::not_found(testrun_id) + let submitted_testrun = + queries::testruns::get_in_progress_testrun_by_id(&mut conn, submitted_testrun_id) + .await + .map_err(|e| { + tracing::warn!("testrun_id {} not found: {}", submitted_testrun_id, e); + HttpError::not_found(submitted_testrun_id) + })?; + let agent_pubkey = submitted_testrun + .assigned_agent_key() + 
.ok_or_else(HttpError::unauthorized)?; + + let assigned_testrun = + queries::testruns::get_testruns_assigned_to_agent(&mut conn, agent_pubkey) + .await + .map_err(|err| { + tracing::warn!("{err}"); + HttpError::invalid_input("Invalid testrun submitted") + })?; + if submitted_testrun_id != assigned_testrun.id { + tracing::warn!( + "Agent {} submitted testrun {} but {} was expected", + agent_pubkey, + submitted_testrun_id, + assigned_testrun.id + ); + return Err(HttpError::invalid_input("Invalid testrun submitted")); + } + + agent_pubkey + .verify(&probe_results.message, &probe_results.signature) + .map_err(|_| { + tracing::warn!("Message verification failed, rejecting"); + HttpError::unauthorized() })?; - let gw_identity = db::queries::select_gateway_identity(&mut conn, testrun.gateway_id) + let gw_identity = db::queries::select_gateway_identity(&mut conn, assigned_testrun.gateway_id) .await .map_err(|_| { // should never happen: - HttpError::internal_with_logging("No gateway found for testrun") + HttpError::internal_with_logging(format!( + "No gateway found for testrun {submitted_testrun_id}" + )) })?; tracing::debug!( "Agent {} submitted testrun {} for gateway {} ({} bytes)", agent_pubkey, - testrun_id, + submitted_testrun_id, gw_identity, &probe_results.message.len(), ); - // TODO dz this should be part of a single transaction: commit after everything is done - queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete) - .await - .map_err(HttpError::internal_with_logging)?; + queries::testruns::update_testrun_status( + &mut conn, + submitted_testrun_id, + TestRunStatus::Complete, + ) + .await + .map_err(HttpError::internal_with_logging)?; queries::testruns::update_gateway_last_probe_log( &mut conn, - testrun.gateway_id, + assigned_testrun.gateway_id, &probe_results.message, ) .await .map_err(HttpError::internal_with_logging)?; let result = get_result_from_log(&probe_results.message); - 
queries::testruns::update_gateway_last_probe_result(&mut conn, testrun.gateway_id, &result) - .await - .map_err(HttpError::internal_with_logging)?; - queries::testruns::update_gateway_score(&mut conn, testrun.gateway_id) + queries::testruns::update_gateway_last_probe_result( + &mut conn, + assigned_testrun.gateway_id, + &result, + ) + .await + .map_err(HttpError::internal_with_logging)?; + queries::testruns::update_gateway_score(&mut conn, assigned_testrun.gateway_id) .await .map_err(HttpError::internal_with_logging)?; tracing::info!( "✅ Testrun row_id {} for gateway {} complete", - testrun.id, + assigned_testrun.id, gw_identity ); diff --git a/nym-node-status-api/src/http/error.rs b/nym-node-status-api/src/http/error.rs index 021c993827b..4ce1ca1ce1b 100644 --- a/nym-node-status-api/src/http/error.rs +++ b/nym-node-status-api/src/http/error.rs @@ -17,7 +17,7 @@ impl HttpError { pub(crate) fn unauthorized() -> Self { Self { - message: serde_json::json!({"message": "Make sure your public key si registered with NS API"}).to_string(), + message: serde_json::json!({"message": "Make sure your public key is registered with NS API"}).to_string(), status: axum::http::StatusCode::UNAUTHORIZED, } } diff --git a/nym-node-status-api/src/main.rs b/nym-node-status-api/src/main.rs index facb4485b92..0833c6a7b4c 100644 --- a/nym-node-status-api/src/main.rs +++ b/nym-node-status-api/src/main.rs @@ -1,4 +1,5 @@ use clap::Parser; +use nym_crypto::asymmetric::ed25519::PublicKey; use nym_task::signal::wait_for_signal; mod cli; @@ -14,7 +15,11 @@ async fn main() -> anyhow::Result<()> { let args = cli::Cli::parse(); - let agent_key_list = args.agent_key_list(); + let agent_key_list = args + .agent_key_list + .iter() + .map(|value| PublicKey::from_base58_string(value.trim()).map_err(anyhow::Error::from)) + .collect::>>()?; tracing::info!("Registered {} agent keys", agent_key_list.len()); let connection_url = args.database_url.clone(); diff --git a/nym-node-status-api/src/testruns/mod.rs 
b/nym-node-status-api/src/testruns/mod.rs index 7ac916fc8e5..522060850a9 100644 --- a/nym-node-status-api/src/testruns/mod.rs +++ b/nym-node-status-api/src/testruns/mod.rs @@ -71,7 +71,7 @@ async fn run(pool: &DbPool) -> anyhow::Result<()> { testruns_created += 1; } } - tracing::debug!("{} testruns queued in total", testruns_created); + tracing::info!("{} testruns queued in total", testruns_created); Ok(()) } diff --git a/nym-node-status-api/src/testruns/queue.rs b/nym-node-status-api/src/testruns/queue.rs index 88804fff1b1..ec9676ed70f 100644 --- a/nym-node-status-api/src/testruns/queue.rs +++ b/nym-node-status-api/src/testruns/queue.rs @@ -55,7 +55,8 @@ pub(crate) async fn try_queue_testrun( status as "status!", timestamp_utc as "timestamp_utc!", ip_address as "ip_address!", - log as "log!" + log as "log!", + assigned_agent FROM testruns WHERE gateway_id = ? AND status != 2 ORDER BY id DESC From e4df6416f5c468b2eb483dde0920e54dc4259114 Mon Sep 17 00:00:00 2001 From: dynco-nym <173912580+dynco-nym@users.noreply.github.com> Date: Tue, 19 Nov 2024 23:41:13 +0100 Subject: [PATCH 3/9] /request better authentication - moved agent API calls to Client struct --- Cargo.lock | 4 + common/models/src/ns_api.rs | 17 +- nym-node-status-agent/Cargo.toml | 3 + nym-node-status-agent/run.sh | 6 +- nym-node-status-agent/src/cli/run_probe.rs | 173 ++++++++++-------- nym-node-status-api/Cargo.toml | 1 + nym-node-status-api/launch_node_status_api.sh | 4 +- .../src/db/queries/testruns.rs | 6 +- nym-node-status-api/src/http/api/testruns.rs | 60 +++--- 9 files changed, 166 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bbd6905ab5a..c9c60e4298b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6053,12 +6053,15 @@ name = "nym-node-status-agent" version = "0.1.6" dependencies = [ "anyhow", + "bincode", + "chrono", "clap 4.5.20", "nym-bin-common", "nym-common-models", "nym-crypto", "rand", "reqwest 0.12.4", + "serde", "serde_json", "tempfile", "tokio", @@ -6073,6 
+6076,7 @@ version = "0.1.6" dependencies = [ "anyhow", "axum 0.7.7", + "bincode", "chrono", "clap 4.5.20", "cosmwasm-std", diff --git a/common/models/src/ns_api.rs b/common/models/src/ns_api.rs index 306fd2ab93d..f0c37c91afb 100644 --- a/common/models/src/ns_api.rs +++ b/common/models/src/ns_api.rs @@ -1,6 +1,21 @@ -use nym_crypto::asymmetric::ed25519::Signature; +use nym_crypto::asymmetric::ed25519::{PublicKey, Signature}; use serde::{Deserialize, Serialize}; +pub mod get_testrun { + use super::*; + #[derive(Debug, Clone, Deserialize, Serialize)] + pub struct Payload { + pub agent_public_key: PublicKey, + pub timestamp: i64, + } + + #[derive(Debug, Clone, Deserialize, Serialize)] + pub struct GetTestrunRequest { + pub payload: Payload, + pub signature: Signature, + } +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct TestrunAssignment { pub testrun_id: i64, diff --git a/nym-node-status-agent/Cargo.toml b/nym-node-status-agent/Cargo.toml index 9a61cd5084b..8bf6148cd22 100644 --- a/nym-node-status-agent/Cargo.toml +++ b/nym-node-status-agent/Cargo.toml @@ -16,6 +16,8 @@ readme.workspace = true [dependencies] anyhow = { workspace = true} +bincode = { workspace = true } +chrono = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } nym-bin-common = { path = "../common/bin-common", features = ["models"]} nym-common-models = { path = "../common/models" } @@ -26,6 +28,7 @@ tokio-util = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } reqwest = { workspace = true, features = ["json"] } +serde = { workspace = true } serde_json = { workspace = true } [dev-dependencies] diff --git a/nym-node-status-agent/run.sh b/nym-node-status-agent/run.sh index af1fa2353eb..a0cf362e167 100755 --- a/nym-node-status-agent/run.sh +++ b/nym-node-status-agent/run.sh @@ -1,9 +1,9 @@ #!/bin/bash set -eu -environment="qa" +export ENVIRONMENT=${ENVIRONMENT:-"sandbox"} 
-probe_git_ref="0dd5dacdda92b1ddd51cd30a3399515e45613371" +probe_git_ref="nym-vpn-core-v1.0.0-rc.6" crate_root=$(dirname $(realpath "$0")) monorepo_root=$(dirname "${crate_root}") @@ -13,7 +13,7 @@ echo "gateway_probe_src=$gateway_probe_src" echo "crate_root=$crate_root" set -a -source "${monorepo_root}/envs/${environment}.env" +source "${monorepo_root}/envs/${ENVIRONMENT}.env" set +a export RUST_LOG="debug" diff --git a/nym-node-status-agent/src/cli/run_probe.rs b/nym-node-status-agent/src/cli/run_probe.rs index aee1116f4ac..17c7d2a36ac 100644 --- a/nym-node-status-agent/src/cli/run_probe.rs +++ b/nym-node-status-agent/src/cli/run_probe.rs @@ -1,33 +1,34 @@ use anyhow::{bail, Context}; -use nym_common_models::ns_api::{SubmitResults, TestrunAssignment}; -use nym_crypto::asymmetric::ed25519::{PrivateKey, PublicKey}; +use nym_common_models::ns_api::{get_testrun, SubmitResults, TestrunAssignment}; +use nym_crypto::asymmetric::ed25519::{PrivateKey, Signature}; +use std::fmt::Display; use tracing::instrument; use crate::cli::GwProbe; -const URL_BASE: &str = "internal/testruns"; +const INTERNAL_TESTRUNS: &str = "internal/testruns"; pub(crate) async fn run_probe( - server_address: &str, + server_ip: &str, server_port: u16, ns_api_auth_key: &str, probe_path: &str, ) -> anyhow::Result<()> { - let server_address = format!("{}:{}", server_address, server_port); - test_ns_api_conn(&server_address).await?; - let auth_key = PrivateKey::from_base58_string(ns_api_auth_key) .context("Couldn't parse auth key, exiting")?; + let ns_api_client = Client::new(server_ip, server_port, auth_key); let probe = GwProbe::new(probe_path.to_string()); let version = probe.version().await; tracing::info!("Probe version:\n{}", version); - if let Some(testrun) = request_testrun(auth_key.public_key(), &server_address).await? { + if let Some(testrun) = ns_api_client.request_testrun().await? 
{ let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key)); - submit_results(auth_key, &server_address, testrun.testrun_id, log).await?; + ns_api_client + .submit_results(testrun.testrun_id, log) + .await?; } else { tracing::info!("No testruns available, exiting") } @@ -35,84 +36,102 @@ pub(crate) async fn run_probe( Ok(()) } -async fn test_ns_api_conn(server_addr: &str) -> anyhow::Result<()> { - reqwest::get(server_addr) - .await - .map(|res| { - tracing::info!( - "Testing connection to NS API at {server_addr}: {}", - res.status() - ); - }) - .map_err(|err| anyhow::anyhow!("Couldn't connect to server on {}: {}", server_addr, err)) +struct Client { + server_address: String, + client: reqwest::Client, + auth_key: PrivateKey, } -#[instrument(level = "debug", skip_all)] -pub(crate) async fn request_testrun( - auth_key: PublicKey, - server_addr: &str, -) -> anyhow::Result> { - let target_url = format!("{}/{}", server_addr, URL_BASE); - let client = reqwest::Client::new(); - let res = client - .get(target_url) - .body(auth_key.to_base58_string()) - .send() - .await?; - let status = res.status(); - let response_text = res.text().await?; - - if status.is_client_error() { - bail!("{}: {}", status, response_text); - } else if status.is_server_error() { - if matches!(status, reqwest::StatusCode::SERVICE_UNAVAILABLE) - && response_text.contains("No testruns available") - { - return Ok(None); - } else { - bail!("{}: {}", status, response_text); +impl Client { + pub fn new(server_ip: &str, server_port: u16, auth_key: PrivateKey) -> Self { + let server_address = format!("{}:{}", server_ip, server_port); + let client = reqwest::Client::new(); + + Self { + server_address, + client, + auth_key, } } - serde_json::from_str(&response_text) - .map(|testrun| { - tracing::info!("Received testrun assignment: {:?}", testrun); - testrun - }) - .map_err(|err| { - tracing::error!("err"); - err.into() - }) -} + #[instrument(level = "debug", skip_all)] + pub(crate) async fn 
request_testrun(&self) -> anyhow::Result> { + let target_url = self.api_with_subpath(None::); -#[instrument(level = "debug", skip(auth_key, probe_outcome))] -pub(crate) async fn submit_results( - auth_key: PrivateKey, - server_addr: &str, - testrun_id: i64, - probe_outcome: String, -) -> anyhow::Result<()> { - let target_url = format!("{}/{}/{}", server_addr, URL_BASE, testrun_id); + let payload = get_testrun::Payload { + agent_public_key: self.auth_key.public_key(), + timestamp: chrono::offset::Utc::now().timestamp(), + }; + let signature = self.sign_message(&payload)?; + let request = get_testrun::GetTestrunRequest { payload, signature }; + + let res = self.client.get(target_url).json(&request).send().await?; + let status = res.status(); + let response_text = res.text().await?; - let results = sign_message(auth_key, probe_outcome); + if status.is_client_error() { + bail!("{}: {}", status, response_text); + } else if status.is_server_error() { + if matches!(status, reqwest::StatusCode::SERVICE_UNAVAILABLE) + && response_text.contains("No testruns available") + { + return Ok(None); + } else { + bail!("{}: {}", status, response_text); + } + } - let client = reqwest::Client::new(); - let res = client - .post(target_url) - .json(&results) - .send() - .await - .and_then(|response| response.error_for_status())?; + serde_json::from_str(&response_text) + .map(|testrun| { + tracing::info!("Received testrun assignment: {:?}", testrun); + testrun + }) + .map_err(|err| { + tracing::error!("err"); + err.into() + }) + } - tracing::debug!("Submitted results: {})", res.status()); - Ok(()) -} + #[instrument(level = "debug", skip(self, probe_outcome))] + pub(crate) async fn submit_results( + &self, + testrun_id: i64, + probe_outcome: String, + ) -> anyhow::Result<()> { + let target_url = self.api_with_subpath(Some(testrun_id)); + + let signature = self.sign_message(&probe_outcome)?; + let submit_results = SubmitResults { + message: probe_outcome, + signature, + }; + + let res = 
self + .client + .post(target_url) + .json(&submit_results) + .send() + .await + .and_then(|response| response.error_for_status())?; + + tracing::debug!("Submitted results: {})", res.status()); + Ok(()) + } -fn sign_message(key: PrivateKey, probe_outcome: String) -> SubmitResults { - let signature = key.sign(&probe_outcome); + fn sign_message(&self, message: &T) -> anyhow::Result + where + T: serde::Serialize, + { + let serialized = bincode::serialize(message)?; + let signed = self.auth_key.sign(&serialized); + Ok(signed) + } - SubmitResults { - message: probe_outcome, - signature, + fn api_with_subpath(&self, subpath: Option) -> String { + if let Some(subpath) = subpath { + format!("{}/{}/{}", self.server_address, INTERNAL_TESTRUNS, subpath) + } else { + format!("{}/{}", self.server_address, INTERNAL_TESTRUNS) + } } } diff --git a/nym-node-status-api/Cargo.toml b/nym-node-status-api/Cargo.toml index 102f06c3bb7..576239ce39f 100644 --- a/nym-node-status-api/Cargo.toml +++ b/nym-node-status-api/Cargo.toml @@ -15,6 +15,7 @@ rust-version.workspace = true [dependencies] anyhow = { workspace = true } axum = { workspace = true, features = ["tokio", "macros"] } +bincode = { workspace = true } chrono = { workspace = true } clap = { workspace = true, features = ["cargo", "derive", "env", "string"] } cosmwasm-std = { workspace = true } diff --git a/nym-node-status-api/launch_node_status_api.sh b/nym-node-status-api/launch_node_status_api.sh index 83bd3dac933..26a97fcbabf 100755 --- a/nym-node-status-api/launch_node_status_api.sh +++ b/nym-node-status-api/launch_node_status_api.sh @@ -19,12 +19,12 @@ Eujj4GmvwQBgHZaNSyqUbjMFSsnXWPSjEYUPgAsKmx1A, 5ZnfSGxW6EKcFxB8jftb9V3f897VpwpZtf7kCPYzB595, H9kuRd8BGjEUD8Grh5U9YUPN5ZaQmSYz8U44R72AffKM" -export ENVIRONMENT="qa.env" +export ENVIRONMENT=${ENVIRONMENT:-"sandbox"} function run_bare() { # export necessary env vars set -a - source ../envs/$ENVIRONMENT + source ../envs/${ENVIRONMENT}.env set +a export RUST_LOG=debug diff --git 
a/nym-node-status-api/src/db/queries/testruns.rs b/nym-node-status-api/src/db/queries/testruns.rs index 38582e2a5b9..a3adf15b190 100644 --- a/nym-node-status-api/src/db/queries/testruns.rs +++ b/nym-node-status-api/src/db/queries/testruns.rs @@ -33,8 +33,10 @@ pub(crate) async fn get_in_progress_testrun_by_id( TestRunStatus::InProgress as i64, ) .fetch_one(conn.as_mut()) - .await - .context(format!("Couldn't retrieve testrun {testrun_id}")) + .await.map_err(|e| { + anyhow::anyhow!("Couldn't retrieve testrun {testrun_id}: {e}") + }) + } pub(crate) async fn get_testruns_assigned_to_agent( diff --git a/nym-node-status-api/src/http/api/testruns.rs b/nym-node-status-api/src/http/api/testruns.rs index 5e19105e53c..43122041dcb 100644 --- a/nym-node-status-api/src/http/api/testruns.rs +++ b/nym-node-status-api/src/http/api/testruns.rs @@ -4,8 +4,8 @@ use axum::{ extract::{Path, State}, Router, }; -use nym_common_models::ns_api::SubmitResults; -use nym_crypto::asymmetric::ed25519::PublicKey; +use nym_common_models::ns_api::{get_testrun, SubmitResults}; +use nym_crypto::asymmetric::ed25519::{PublicKey, Signature}; use reqwest::StatusCode; use crate::db::models::TestRunStatus; @@ -32,10 +32,12 @@ pub(crate) fn routes() -> Router { #[tracing::instrument(level = "debug", skip_all)] async fn request_testrun( State(state): State, - body: String, + Json(request): Json, ) -> HttpResult> { // TODO dz log agent's network probe version - let agent_pubkey = authenticate_agent(&body, &state)?; + + authenticate(&request, &state)?; + let agent_pubkey = request.payload.agent_public_key; tracing::debug!("Agent {} requested testrun", agent_pubkey); @@ -104,12 +106,11 @@ async fn submit_testrun( return Err(HttpError::invalid_input("Invalid testrun submitted")); } - agent_pubkey - .verify(&probe_results.message, &probe_results.signature) - .map_err(|_| { - tracing::warn!("Message verification failed, rejecting"); - HttpError::unauthorized() - })?; + verify_message( + &agent_pubkey, + 
&probe_results.message, + &probe_results.signature, + )?; let gw_identity = db::queries::select_gateway_identity(&mut conn, assigned_testrun.gateway_id) .await @@ -162,21 +163,34 @@ async fn submit_testrun( Ok(StatusCode::CREATED) } -fn authenticate_agent(base58_pubkey: &str, state: &AppState) -> HttpResult { - let agent_pubkey = PublicKey::from_base58_string(base58_pubkey).map_err(|_| { - if base58_pubkey.is_empty() { - tracing::warn!("Auth key missing from request body, rejecting"); - } else { - tracing::warn!("Failed to deserialize key from request body, rejecting"); - } - HttpError::unauthorized() - })?; - if !state.is_registered(&agent_pubkey) { - tracing::warn!("Public key {} not registered, rejecting", agent_pubkey); +// TODO dz this should be middleware +fn authenticate(request: &get_testrun::GetTestrunRequest, state: &AppState) -> HttpResult<()> { + if !state.is_registered(&request.payload.agent_public_key) { + tracing::warn!("Public key not registered with NS API, rejecting"); return Err(HttpError::unauthorized()); - } + }; + + verify_message( + &request.payload.agent_public_key, + &request.payload, + &request.signature, + ) + .inspect_err(|_| tracing::warn!("Signature verification failed, rejecting"))?; + + Ok(()) +} - Ok(agent_pubkey) +fn verify_message(public_key: &PublicKey, message: &T, signature: &Signature) -> HttpResult<()> +where + T: serde::Serialize, +{ + bincode::serialize(message) + .map_err(HttpError::invalid_input) + .and_then(|serialized| { + public_key + .verify(serialized, signature) + .map_err(|_| HttpError::unauthorized()) + }) } fn get_result_from_log(log: &str) -> String { From 1b52856fe580a78eac90d77300e1b40b5c948ac2 Mon Sep 17 00:00:00 2001 From: dynco-nym <173912580+dynco-nym@users.noreply.github.com> Date: Wed, 20 Nov 2024 00:51:57 +0100 Subject: [PATCH 4/9] Replay protection --- nym-node-status-api/src/http/api/testruns.rs | 10 ++++- nym-node-status-api/src/http/state.rs | 43 +++++++++++++++++++- 2 files changed, 50 
insertions(+), 3 deletions(-) diff --git a/nym-node-status-api/src/http/api/testruns.rs b/nym-node-status-api/src/http/api/testruns.rs index 43122041dcb..d1825148830 100644 --- a/nym-node-status-api/src/http/api/testruns.rs +++ b/nym-node-status-api/src/http/api/testruns.rs @@ -37,6 +37,13 @@ async fn request_testrun( // TODO dz log agent's network probe version authenticate(&request, &state)?; + state + .check_last_request_time( + &request.payload.agent_public_key, + &request.payload.timestamp, + ) + .await?; + let agent_pubkey = request.payload.agent_public_key; tracing::debug!("Agent {} requested testrun", agent_pubkey); @@ -58,7 +65,7 @@ async fn request_testrun( ); Ok(Json(testrun)) } else { - tracing::debug!("No testruns available for agent"); + tracing::debug!("No testruns available for agent {}", agent_pubkey); Err(HttpError::no_testruns_available()) } } @@ -164,6 +171,7 @@ async fn submit_testrun( } // TODO dz this should be middleware +#[tracing::instrument(level = "debug", skip_all)] fn authenticate(request: &get_testrun::GetTestrunRequest, state: &AppState) -> HttpResult<()> { if !state.is_registered(&request.payload.agent_public_key) { tracing::warn!("Public key not registered with NS API, rejecting"); diff --git a/nym-node-status-api/src/http/state.rs b/nym-node-status-api/src/http/state.rs index 2001244ef45..77c1d6684cb 100644 --- a/nym-node-status-api/src/http/state.rs +++ b/nym-node-status-api/src/http/state.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use moka::{future::Cache, Entry}; use nym_crypto::asymmetric::ed25519::PublicKey; @@ -6,7 +6,10 @@ use tokio::sync::RwLock; use crate::{ db::DbPool, - http::models::{DailyStats, Gateway, Mixnode, SummaryHistory}, + http::{ + error::{HttpError, HttpResult}, + models::{DailyStats, Gateway, Mixnode, SummaryHistory}, + }, }; #[derive(Debug, Clone)] @@ -14,6 +17,8 @@ pub(crate) struct AppState { db_pool: DbPool, cache: HttpCache, 
agent_key_list: Vec, + /// last time agent requested a testrun + agent_last_request_times: Arc>>, } impl AppState { @@ -22,6 +27,7 @@ impl AppState { db_pool, cache: HttpCache::new(cache_ttl), agent_key_list, + agent_last_request_times: Arc::new(RwLock::new(HashMap::new())), } } @@ -36,6 +42,39 @@ impl AppState { pub(crate) fn is_registered(&self, agent_pubkey: &PublicKey) -> bool { self.agent_key_list.contains(agent_pubkey) } + + /// Only updates if request time is valid. Otherwise return error + pub(crate) async fn check_last_request_time( + &self, + agent_key: &PublicKey, + request_time: &i64, + ) -> HttpResult<()> { + // if entry exists with a newer time than this request's submit time, + // it's a repeated request + let agent_key = agent_key.to_base58_string(); + let request_times = self.agent_last_request_times.read().await; + if let Some(previous_request_time) = request_times.get(&agent_key) { + if request_time <= previous_request_time { + tracing::warn!( + "Request has timestamp {} but previous request was at {}, rejecting", + request_time, + previous_request_time + ); + return Err(HttpError::unauthorized()); + } + } + drop(request_times); + + // otherwise this is a newer (or a first) request + self.agent_last_request_times + .write() + .await + .entry(agent_key) + .and_modify(|value| *value = *request_time) + .or_insert(*request_time); + + Ok(()) + } } static GATEWAYS_LIST_KEY: &str = "gateways"; From 65f4c08050961dd7cc3af72da75695266d593834 Mon Sep 17 00:00:00 2001 From: dynco-nym <173912580+dynco-nym@users.noreply.github.com> Date: Wed, 20 Nov 2024 12:19:02 +0100 Subject: [PATCH 5/9] Fix testrun cleanup bug - introduce a new column last_assigned which is different than created_at so that stale testruns get cleaned up based on last_assigned - created_at is still useful for determining the "oldest" testrun to be picked up --- .../migrations/002_last_assigned_utc.sql | 5 +++ nym-node-status-api/src/db/models.rs | 3 +- .../src/db/queries/testruns.rs | 43 
++++++++++--------- nym-node-status-api/src/http/api/testruns.rs | 25 ++++++++--- nym-node-status-api/src/http/state.rs | 16 +++++-- nym-node-status-api/src/testruns/mod.rs | 4 +- nym-node-status-api/src/testruns/queue.rs | 9 ++-- 7 files changed, 67 insertions(+), 38 deletions(-) create mode 100644 nym-node-status-api/migrations/002_last_assigned_utc.sql diff --git a/nym-node-status-api/migrations/002_last_assigned_utc.sql b/nym-node-status-api/migrations/002_last_assigned_utc.sql new file mode 100644 index 00000000000..edd91053cf0 --- /dev/null +++ b/nym-node-status-api/migrations/002_last_assigned_utc.sql @@ -0,0 +1,5 @@ +ALTER TABLE testruns +ADD COLUMN last_assigned_utc INTEGER; + +ALTER TABLE testruns +RENAME COLUMN timestamp_utc TO created_utc; diff --git a/nym-node-status-api/src/db/models.rs b/nym-node-status-api/src/db/models.rs index 9e9f5aa8076..08bd913d7eb 100644 --- a/nym-node-status-api/src/db/models.rs +++ b/nym-node-status-api/src/db/models.rs @@ -307,10 +307,11 @@ pub struct TestRunDto { pub id: i64, pub gateway_id: i64, pub status: i64, - pub timestamp_utc: i64, + pub created_utc: i64, pub ip_address: String, pub log: String, pub assigned_agent: Option, + pub last_assigned_utc: Option, } impl TestRunDto { diff --git a/nym-node-status-api/src/db/queries/testruns.rs b/nym-node-status-api/src/db/queries/testruns.rs index a3adf15b190..275e39f90e6 100644 --- a/nym-node-status-api/src/db/queries/testruns.rs +++ b/nym-node-status-api/src/db/queries/testruns.rs @@ -4,7 +4,6 @@ use crate::{ db::models::{TestRunDto, TestRunStatus}, testruns::now_utc, }; -use anyhow::Context; use chrono::Duration; use nym_crypto::asymmetric::ed25519::PublicKey; use sqlx::{pool::PoolConnection, Sqlite}; @@ -19,30 +18,29 @@ pub(crate) async fn get_in_progress_testrun_by_id( id as "id!", gateway_id as "gateway_id!", status as "status!", - timestamp_utc as "timestamp_utc!", + created_utc as "created_utc!", ip_address as "ip_address!", log as "log!", - assigned_agent + 
assigned_agent, + last_assigned_utc FROM testruns WHERE id = ? AND status = ? - ORDER BY timestamp_utc"#, + ORDER BY created_utc"#, testrun_id, TestRunStatus::InProgress as i64, ) .fetch_one(conn.as_mut()) - .await.map_err(|e| { - anyhow::anyhow!("Couldn't retrieve testrun {testrun_id}: {e}") - }) - + .await + .map_err(|e| anyhow::anyhow!("Couldn't retrieve testrun {testrun_id}: {e}")) } -pub(crate) async fn get_testruns_assigned_to_agent( +pub(crate) async fn testrun_in_progress_assigned_to_agent( conn: &mut PoolConnection, - agent_key: PublicKey, -) -> anyhow::Result { + agent_key: &PublicKey, +) -> sqlx::Result { let agent_key = agent_key.to_base58_string(); sqlx::query_as!( TestRunDto, @@ -50,27 +48,30 @@ pub(crate) async fn get_testruns_assigned_to_agent( id as "id!", gateway_id as "gateway_id!", status as "status!", - timestamp_utc as "timestamp_utc!", + created_utc as "created_utc!", ip_address as "ip_address!", log as "log!", - assigned_agent + assigned_agent, + last_assigned_utc FROM testruns WHERE assigned_agent = ? AND status = ? - ORDER BY timestamp_utc"#, + ORDER BY created_utc"#, agent_key, TestRunStatus::InProgress as i64, ) .fetch_one(conn.as_mut()) .await - .context(format!("No testruns in progress for agent {agent_key}")) } -pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> anyhow::Result { +pub(crate) async fn update_testruns_assigned_before( + db: &DbPool, + max_age: Duration, +) -> anyhow::Result { let mut conn = db.acquire().await?; - let previous_run = now_utc() - age; + let previous_run = now_utc() - max_age; let cutoff_timestamp = previous_run.timestamp(); let res = sqlx::query!( @@ -82,7 +83,7 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an WHERE status = ? AND - timestamp_utc < ? + last_assigned_utc < ? 
"#, TestRunStatus::Queued as i64, TestRunStatus::InProgress as i64, @@ -93,8 +94,8 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an let stale_testruns = res.rows_affected(); if stale_testruns > 0 { - tracing::debug!( - "Refreshed {} stale testruns, scheduled before {} but not yet finished", + tracing::info!( + "Refreshed {} stale testruns, assigned before {} but not yet finished", stale_testruns, previous_run ); @@ -119,7 +120,7 @@ pub(crate) async fn assign_oldest_testrun( SELECT rowid FROM testruns WHERE status = ? - ORDER BY timestamp_utc asc + ORDER BY created_utc asc LIMIT 1 ) RETURNING diff --git a/nym-node-status-api/src/http/api/testruns.rs b/nym-node-status-api/src/http/api/testruns.rs index d1825148830..369ac6c4aa3 100644 --- a/nym-node-status-api/src/http/api/testruns.rs +++ b/nym-node-status-api/src/http/api/testruns.rs @@ -35,17 +35,15 @@ async fn request_testrun( Json(request): Json, ) -> HttpResult> { // TODO dz log agent's network probe version - authenticate(&request, &state)?; state - .check_last_request_time( + .update_last_request_time( &request.payload.agent_public_key, &request.payload.timestamp, ) .await?; let agent_pubkey = request.payload.agent_public_key; - tracing::debug!("Agent {} requested testrun", agent_pubkey); let db = state.db_pool(); @@ -54,6 +52,19 @@ async fn request_testrun( .await .map_err(HttpError::internal_with_logging)?; + if let Ok(testrun) = + db::queries::testruns::testrun_in_progress_assigned_to_agent(&mut conn, &agent_pubkey).await + { + tracing::warn!( + "Testrun {} already in progress for agent {:?}, rejecting", + testrun.id, + testrun.assigned_agent + ); + return Err(HttpError::invalid_input( + "Testrun already in progress for this agent", + )); + }; + return match db::queries::testruns::assign_oldest_testrun(&mut conn, agent_pubkey).await { Ok(res) => { if let Some(testrun) = res { @@ -97,10 +108,10 @@ async fn submit_testrun( .ok_or_else(HttpError::unauthorized)?; let 
assigned_testrun = - queries::testruns::get_testruns_assigned_to_agent(&mut conn, agent_pubkey) + queries::testruns::testrun_in_progress_assigned_to_agent(&mut conn, &agent_pubkey) .await .map_err(|err| { - tracing::warn!("{err}"); + tracing::warn!("No testruns in progress for agent {agent_pubkey}: {err}"); HttpError::invalid_input("Invalid testrun submitted") })?; if submitted_testrun_id != assigned_testrun.id { @@ -188,11 +199,11 @@ fn authenticate(request: &get_testrun::GetTestrunRequest, state: &AppState) -> H Ok(()) } -fn verify_message(public_key: &PublicKey, message: &T, signature: &Signature) -> HttpResult<()> +fn verify_message(public_key: &PublicKey, payload: &T, signature: &Signature) -> HttpResult<()> where T: serde::Serialize, { - bincode::serialize(message) + bincode::serialize(payload) .map_err(HttpError::invalid_input) .and_then(|serialized| { public_key diff --git a/nym-node-status-api/src/http/state.rs b/nym-node-status-api/src/http/state.rs index 77c1d6684cb..fb273027ac2 100644 --- a/nym-node-status-api/src/http/state.rs +++ b/nym-node-status-api/src/http/state.rs @@ -10,6 +10,7 @@ use crate::{ error::{HttpError, HttpResult}, models::{DailyStats, Gateway, Mixnode, SummaryHistory}, }, + testruns::now_utc, }; #[derive(Debug, Clone)] @@ -18,6 +19,7 @@ pub(crate) struct AppState { cache: HttpCache, agent_key_list: Vec, /// last time agent requested a testrun + // if performance becomes a problem, consider a faster hashmap like `scc`` agent_last_request_times: Arc>>, } @@ -43,13 +45,21 @@ impl AppState { self.agent_key_list.contains(agent_pubkey) } - /// Only updates if request time is valid. Otherwise return error - pub(crate) async fn check_last_request_time( + /// Only updates if request is not a replay. 
Otherwise return error + pub(crate) async fn update_last_request_time( &self, agent_key: &PublicKey, request_time: &i64, ) -> HttpResult<()> { - // if entry exists with a newer time than this request's submit time, + // if a request took longer than N minutes to reach NS API, something is very wrong + let cutoff_duration = chrono::Duration::minutes(1); + let cutoff_timestamp = (now_utc() - cutoff_duration).timestamp(); + if *request_time < cutoff_timestamp { + tracing::warn!("Request older than {}s, rejecting", cutoff_timestamp); + return Err(HttpError::unauthorized()); + } + + // if a previous entry has a newer time than this request's submit time, // it's a repeated request let agent_key = agent_key.to_base58_string(); let request_times = self.agent_last_request_times.read().await; diff --git a/nym-node-status-api/src/testruns/mod.rs b/nym-node-status-api/src/testruns/mod.rs index 522060850a9..61246fa2dc7 100644 --- a/nym-node-status-api/src/testruns/mod.rs +++ b/nym-node-status-api/src/testruns/mod.rs @@ -78,8 +78,8 @@ async fn run(pool: &DbPool) -> anyhow::Result<()> { #[instrument(level = "debug", skip_all)] async fn refresh_stale_testruns(pool: &DbPool, refresh_interval: Duration) -> anyhow::Result<()> { - let chrono_duration = chrono::Duration::from_std(refresh_interval)?; - crate::db::queries::testruns::update_testruns_older_than(pool, chrono_duration).await?; + let refresh_interval = chrono::Duration::from_std(refresh_interval)?; + crate::db::queries::testruns::update_testruns_assigned_before(pool, refresh_interval).await?; Ok(()) } diff --git a/nym-node-status-api/src/testruns/queue.rs b/nym-node-status-api/src/testruns/queue.rs index ec9676ed70f..a0e7a1422f1 100644 --- a/nym-node-status-api/src/testruns/queue.rs +++ b/nym-node-status-api/src/testruns/queue.rs @@ -28,7 +28,7 @@ pub(crate) async fn try_queue_testrun( LIMIT 1"#, identity_key, ) - // TODO dz shoudl call .fetch_one + // TODO dz should call .fetch_one // TODO dz replace this in other queries 
as well .fetch(conn.as_mut()) .try_collect::>() @@ -53,10 +53,11 @@ pub(crate) async fn try_queue_testrun( id as "id!", gateway_id as "gateway_id!", status as "status!", - timestamp_utc as "timestamp_utc!", + created_utc as "created_utc!", ip_address as "ip_address!", log as "log!", - assigned_agent + assigned_agent, + last_assigned_utc FROM testruns WHERE gateway_id = ? AND status != 2 ORDER BY id DESC @@ -90,7 +91,7 @@ pub(crate) async fn try_queue_testrun( ); let id = sqlx::query!( - "INSERT INTO testruns (gateway_id, status, ip_address, timestamp_utc, log) VALUES (?, ?, ?, ?, ?)", + "INSERT INTO testruns (gateway_id, status, ip_address, created_utc, log) VALUES (?, ?, ?, ?, ?)", gateway_id, status, ip_address, From 03f40a9ce2462275eaa58792fc35d351db442886 Mon Sep 17 00:00:00 2001 From: dynco-nym <173912580+dynco-nym@users.noreply.github.com> Date: Wed, 20 Nov 2024 15:25:19 +0100 Subject: [PATCH 6/9] Uniform request authentication --- Cargo.lock | 2 + common/models/Cargo.toml | 4 +- common/models/src/ns_api.rs | 82 +++++++++++++++++-- nym-node-status-agent/src/cli/run_probe.rs | 19 +++-- nym-node-status-api/src/db/models.rs | 9 -- .../src/db/queries/testruns.rs | 35 ++------ nym-node-status-api/src/http/api/testruns.rs | 74 ++++++----------- nym-node-status-api/src/http/error.rs | 6 -- 8 files changed, 124 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9c60e4298b..2107724c62f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5010,6 +5010,8 @@ dependencies = [ name = "nym-common-models" version = "0.1.0" dependencies = [ + "anyhow", + "bincode", "nym-crypto", "serde", ] diff --git a/common/models/Cargo.toml b/common/models/Cargo.toml index 123f3f7fd25..3192a83b532 100644 --- a/common/models/Cargo.toml +++ b/common/models/Cargo.toml @@ -11,5 +11,7 @@ rust-version.workspace = true readme.workspace = true [dependencies] -serde = { workspace = true, features = ["derive"] } +anyhow = { workspace = true } +bincode = { workspace = true } 
nym-crypto = { path = "../crypto", features = ["asymmetric", "serde"] } +serde = { workspace = true, features = ["derive"] } diff --git a/common/models/src/ns_api.rs b/common/models/src/ns_api.rs index f0c37c91afb..6f4b00a888c 100644 --- a/common/models/src/ns_api.rs +++ b/common/models/src/ns_api.rs @@ -1,4 +1,4 @@ -use nym_crypto::asymmetric::ed25519::{PublicKey, Signature}; +use nym_crypto::asymmetric::ed25519::{PublicKey, Signature, SignatureError}; use serde::{Deserialize, Serialize}; pub mod get_testrun { @@ -14,16 +14,88 @@ pub mod get_testrun { pub payload: Payload, pub signature: Signature, } + + impl SignedRequest for GetTestrunRequest { + type Payload = Payload; + + fn public_key(&self) -> &PublicKey { + &self.payload.agent_public_key + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn payload(&self) -> &Self::Payload { + &self.payload + } + } } #[derive(Debug, Clone, Deserialize, Serialize)] pub struct TestrunAssignment { pub testrun_id: i64, + pub assigned_at_utc: i64, pub gateway_identity_key: String, } -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct SubmitResults { - pub message: String, - pub signature: Signature, +pub mod submit_results { + use super::*; + #[derive(Debug, Clone, Deserialize, Serialize)] + pub struct Payload { + pub probe_result: String, + pub agent_public_key: PublicKey, + pub assigned_at_utc: i64, + } + + #[derive(Debug, Clone, Deserialize, Serialize)] + pub struct SubmitResults { + pub payload: Payload, + pub signature: Signature, + } + + impl SignedRequest for SubmitResults { + type Payload = Payload; + + fn public_key(&self) -> &PublicKey { + &self.payload.agent_public_key + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn payload(&self) -> &Self::Payload { + &self.payload + } + } +} + +pub trait SignedRequest { + type Payload: serde::Serialize; + + fn public_key(&self) -> &PublicKey; + fn signature(&self) -> &Signature; + fn payload(&self) -> &Self::Payload; +} + +pub 
trait VerifiableRequest: SignedRequest { + type Error: From + From; + + fn verify_signature(&self) -> Result<(), Self::Error> { + bincode::serialize(self.payload()) + .map_err(Self::Error::from) + .and_then(|serialized| { + self.public_key() + .verify(serialized, self.signature()) + .map_err(Self::Error::from) + }) + } +} + +impl VerifiableRequest for T +where + T: SignedRequest, +{ + type Error = anyhow::Error; } diff --git a/nym-node-status-agent/src/cli/run_probe.rs b/nym-node-status-agent/src/cli/run_probe.rs index 17c7d2a36ac..3ffd90d46c4 100644 --- a/nym-node-status-agent/src/cli/run_probe.rs +++ b/nym-node-status-agent/src/cli/run_probe.rs @@ -1,5 +1,5 @@ use anyhow::{bail, Context}; -use nym_common_models::ns_api::{get_testrun, SubmitResults, TestrunAssignment}; +use nym_common_models::ns_api::{get_testrun, submit_results, TestrunAssignment}; use nym_crypto::asymmetric::ed25519::{PrivateKey, Signature}; use std::fmt::Display; use tracing::instrument; @@ -27,7 +27,7 @@ pub(crate) async fn run_probe( let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key)); ns_api_client - .submit_results(testrun.testrun_id, log) + .submit_results(testrun.testrun_id, log, testrun.assigned_at_utc) .await?; } else { tracing::info!("No testruns available, exiting") @@ -92,19 +92,22 @@ impl Client { }) } - #[instrument(level = "debug", skip(self, probe_outcome))] + #[instrument(level = "debug", skip(self, probe_result))] pub(crate) async fn submit_results( &self, testrun_id: i64, - probe_outcome: String, + probe_result: String, + assigned_at_utc: i64, ) -> anyhow::Result<()> { let target_url = self.api_with_subpath(Some(testrun_id)); - let signature = self.sign_message(&probe_outcome)?; - let submit_results = SubmitResults { - message: probe_outcome, - signature, + let payload = submit_results::Payload { + probe_result, + agent_public_key: self.auth_key.public_key(), + assigned_at_utc, }; + let signature = self.sign_message(&payload)?; + let submit_results = 
submit_results::SubmitResults { payload, signature }; let res = self .client diff --git a/nym-node-status-api/src/db/models.rs b/nym-node-status-api/src/db/models.rs index 08bd913d7eb..4062e1451dc 100644 --- a/nym-node-status-api/src/db/models.rs +++ b/nym-node-status-api/src/db/models.rs @@ -2,7 +2,6 @@ use crate::{ http::{self, models::SummaryHistory}, monitor::NumericalCheckedCast, }; -use nym_crypto::asymmetric::ed25519::PublicKey; use nym_node_requests::api::v1::node::models::NodeDescription; use serde::{Deserialize, Serialize}; use strum_macros::{EnumString, FromRepr}; @@ -314,14 +313,6 @@ pub struct TestRunDto { pub last_assigned_utc: Option, } -impl TestRunDto { - pub(crate) fn assigned_agent_key(&self) -> Option { - self.assigned_agent - .as_ref() - .and_then(|value| PublicKey::from_base58_string(value).ok()) - } -} - #[derive(Debug, Clone, strum_macros::Display, EnumString, FromRepr, PartialEq)] #[repr(u8)] pub(crate) enum TestRunStatus { diff --git a/nym-node-status-api/src/db/queries/testruns.rs b/nym-node-status-api/src/db/queries/testruns.rs index 275e39f90e6..465444c8ec1 100644 --- a/nym-node-status-api/src/db/queries/testruns.rs +++ b/nym-node-status-api/src/db/queries/testruns.rs @@ -8,35 +8,6 @@ use chrono::Duration; use nym_crypto::asymmetric::ed25519::PublicKey; use sqlx::{pool::PoolConnection, Sqlite}; -pub(crate) async fn get_in_progress_testrun_by_id( - conn: &mut PoolConnection, - testrun_id: i64, -) -> anyhow::Result { - sqlx::query_as!( - TestRunDto, - r#"SELECT - id as "id!", - gateway_id as "gateway_id!", - status as "status!", - created_utc as "created_utc!", - ip_address as "ip_address!", - log as "log!", - assigned_agent, - last_assigned_utc - FROM testruns - WHERE - id = ? - AND - status = ? 
- ORDER BY created_utc"#, - testrun_id, - TestRunStatus::InProgress as i64, - ) - .fetch_one(conn.as_mut()) - .await - .map_err(|e| anyhow::anyhow!("Couldn't retrieve testrun {testrun_id}: {e}")) -} - pub(crate) async fn testrun_in_progress_assigned_to_agent( conn: &mut PoolConnection, agent_key: &PublicKey, @@ -109,12 +80,14 @@ pub(crate) async fn assign_oldest_testrun( agent_key: PublicKey, ) -> anyhow::Result> { let agent_key = agent_key.to_base58_string(); + let now = now_utc().timestamp(); // find & mark as "In progress" in the same transaction to avoid race conditions let returning = sqlx::query!( r#"UPDATE testruns SET status = ?, - assigned_agent = ? + assigned_agent = ?, + last_assigned_utc = ? WHERE rowid = ( SELECT rowid @@ -129,6 +102,7 @@ pub(crate) async fn assign_oldest_testrun( "#, TestRunStatus::InProgress as i64, agent_key, + now, TestRunStatus::Queued as i64, ) .fetch_optional(conn.as_mut()) @@ -151,6 +125,7 @@ pub(crate) async fn assign_oldest_testrun( Ok(Some(TestrunAssignment { testrun_id: testrun.id, gateway_identity_key: gw_identity.gateway_identity_key, + assigned_at_utc: now, })) } else { Ok(None) diff --git a/nym-node-status-api/src/http/api/testruns.rs b/nym-node-status-api/src/http/api/testruns.rs index 369ac6c4aa3..bc965186d08 100644 --- a/nym-node-status-api/src/http/api/testruns.rs +++ b/nym-node-status-api/src/http/api/testruns.rs @@ -4,8 +4,7 @@ use axum::{ extract::{Path, State}, Router, }; -use nym_common_models::ns_api::{get_testrun, SubmitResults}; -use nym_crypto::asymmetric::ed25519::{PublicKey, Signature}; +use nym_common_models::ns_api::{get_testrun, submit_results, VerifiableRequest}; use reqwest::StatusCode; use crate::db::models::TestRunStatus; @@ -88,47 +87,41 @@ async fn request_testrun( async fn submit_testrun( Path(submitted_testrun_id): Path, State(state): State, - Json(probe_results): Json, + Json(submitted_result): Json, ) -> HttpResult { + authenticate(&submitted_result, &state)?; + let db = state.db_pool(); let 
mut conn = db .acquire() .await .map_err(HttpError::internal_with_logging)?; - let submitted_testrun = - queries::testruns::get_in_progress_testrun_by_id(&mut conn, submitted_testrun_id) - .await - .map_err(|e| { - tracing::warn!("testrun_id {} not found: {}", submitted_testrun_id, e); - HttpError::not_found(submitted_testrun_id) - })?; - let agent_pubkey = submitted_testrun - .assigned_agent_key() - .ok_or_else(HttpError::unauthorized)?; - + let submitter_pubkey = submitted_result.payload.agent_public_key; let assigned_testrun = - queries::testruns::testrun_in_progress_assigned_to_agent(&mut conn, &agent_pubkey) + queries::testruns::testrun_in_progress_assigned_to_agent(&mut conn, &submitter_pubkey) .await .map_err(|err| { - tracing::warn!("No testruns in progress for agent {agent_pubkey}: {err}"); + tracing::warn!("No testruns in progress for agent {submitter_pubkey}: {err}"); HttpError::invalid_input("Invalid testrun submitted") })?; if submitted_testrun_id != assigned_testrun.id { tracing::warn!( "Agent {} submitted testrun {} but {} was expected", - agent_pubkey, + submitter_pubkey, submitted_testrun_id, assigned_testrun.id ); return Err(HttpError::invalid_input("Invalid testrun submitted")); } - - verify_message( - &agent_pubkey, - &probe_results.message, - &probe_results.signature, - )?; + if Some(submitted_result.payload.assigned_at_utc) != assigned_testrun.last_assigned_utc { + tracing::warn!( + "Submitted testrun timestamp mismatch: {} != {:?}, rejecting", + submitted_result.payload.assigned_at_utc, + assigned_testrun.last_assigned_utc + ); + return Err(HttpError::invalid_input("Invalid testrun submitted")); + } let gw_identity = db::queries::select_gateway_identity(&mut conn, assigned_testrun.gateway_id) .await @@ -140,10 +133,10 @@ async fn submit_testrun( })?; tracing::debug!( "Agent {} submitted testrun {} for gateway {} ({} bytes)", - agent_pubkey, + submitter_pubkey, submitted_testrun_id, gw_identity, - &probe_results.message.len(), + 
&submitted_result.payload.probe_result.len(), ); queries::testruns::update_testrun_status( @@ -156,11 +149,11 @@ async fn submit_testrun( queries::testruns::update_gateway_last_probe_log( &mut conn, assigned_testrun.gateway_id, - &probe_results.message, + &submitted_result.payload.probe_result, ) .await .map_err(HttpError::internal_with_logging)?; - let result = get_result_from_log(&probe_results.message); + let result = get_result_from_log(&submitted_result.payload.probe_result); queries::testruns::update_gateway_last_probe_result( &mut conn, assigned_testrun.gateway_id, @@ -183,35 +176,20 @@ async fn submit_testrun( // TODO dz this should be middleware #[tracing::instrument(level = "debug", skip_all)] -fn authenticate(request: &get_testrun::GetTestrunRequest, state: &AppState) -> HttpResult<()> { - if !state.is_registered(&request.payload.agent_public_key) { +fn authenticate(request: &impl VerifiableRequest, state: &AppState) -> HttpResult<()> { + if !state.is_registered(request.public_key()) { tracing::warn!("Public key not registered with NS API, rejecting"); return Err(HttpError::unauthorized()); }; - verify_message( - &request.payload.agent_public_key, - &request.payload, - &request.signature, - ) - .inspect_err(|_| tracing::warn!("Signature verification failed, rejecting"))?; + request.verify_signature().map_err(|_| { + tracing::warn!("Signature verification failed, rejecting"); + HttpError::unauthorized() + })?; Ok(()) } -fn verify_message(public_key: &PublicKey, payload: &T, signature: &Signature) -> HttpResult<()> -where - T: serde::Serialize, -{ - bincode::serialize(payload) - .map_err(HttpError::invalid_input) - .and_then(|serialized| { - public_key - .verify(serialized, signature) - .map_err(|_| HttpError::unauthorized()) - }) -} - fn get_result_from_log(log: &str) -> String { let re = regex::Regex::new(r"\n\{\s").unwrap(); let result: Vec<_> = re.splitn(log, 2).collect(); diff --git a/nym-node-status-api/src/http/error.rs 
b/nym-node-status-api/src/http/error.rs index 4ce1ca1ce1b..ed37966742d 100644 --- a/nym-node-status-api/src/http/error.rs +++ b/nym-node-status-api/src/http/error.rs @@ -40,12 +40,6 @@ impl HttpError { status: axum::http::StatusCode::SERVICE_UNAVAILABLE, } } - pub(crate) fn not_found(msg: impl Display) -> Self { - Self { - message: serde_json::json!({"message": msg.to_string()}).to_string(), - status: axum::http::StatusCode::NOT_FOUND, - } - } } impl axum::response::IntoResponse for HttpError { From 6c4333a9623e2ceec1a30395206dc2818fdc4b21 Mon Sep 17 00:00:00 2001 From: dynco-nym <173912580+dynco-nym@users.noreply.github.com> Date: Wed, 20 Nov 2024 15:42:07 +0100 Subject: [PATCH 7/9] Suppress ts-rs serde warnings --- common/cosmwasm-smart-contracts/vesting-contract/Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/cosmwasm-smart-contracts/vesting-contract/Cargo.toml b/common/cosmwasm-smart-contracts/vesting-contract/Cargo.toml index bdea746ff25..9edf0ce49c2 100644 --- a/common/cosmwasm-smart-contracts/vesting-contract/Cargo.toml +++ b/common/cosmwasm-smart-contracts/vesting-contract/Cargo.toml @@ -15,7 +15,9 @@ mixnet-contract-common = { path = "../mixnet-contract", package = "nym-mixnet-co contracts-common = { path = "../contracts-common", package = "nym-contracts-common", version = "0.5.0" } serde = { workspace = true, features = ["derive"] } thiserror = { workspace = true } -ts-rs = { workspace = true, optional = true} +# without this feature, cargo clippy emits a ton of incompatibility warnings +# https://docs.rs/ts-rs/latest/ts_rs/#serde-compatability +ts-rs = { workspace = true, optional = true, features = ["no-serde-warnings"] } [features] schema = ["cw2"] From bacf68697c46435d9bd544bcc7350f36208d1d86 Mon Sep 17 00:00:00 2001 From: dynco-nym <173912580+dynco-nym@users.noreply.github.com> Date: Wed, 20 Nov 2024 17:43:36 +0100 Subject: [PATCH 8/9] Update cargo version --- Cargo.lock | 4 ++-- nym-node-status-agent/Cargo.toml 
| 2 +- nym-node-status-api/Cargo.toml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2107724c62f..d308c4ff771 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6052,7 +6052,7 @@ dependencies = [ [[package]] name = "nym-node-status-agent" -version = "0.1.6" +version = "1.0.0-rc.1" dependencies = [ "anyhow", "bincode", @@ -6074,7 +6074,7 @@ dependencies = [ [[package]] name = "nym-node-status-api" -version = "0.1.6" +version = "1.0.0-rc.1" dependencies = [ "anyhow", "axum 0.7.7", diff --git a/nym-node-status-agent/Cargo.toml b/nym-node-status-agent/Cargo.toml index 8bf6148cd22..db64eeec993 100644 --- a/nym-node-status-agent/Cargo.toml +++ b/nym-node-status-agent/Cargo.toml @@ -4,7 +4,7 @@ [package] name = "nym-node-status-agent" -version = "0.1.6" +version = "1.0.0-rc.1" authors.workspace = true repository.workspace = true homepage.workspace = true diff --git a/nym-node-status-api/Cargo.toml b/nym-node-status-api/Cargo.toml index 576239ce39f..df853bcb5f2 100644 --- a/nym-node-status-api/Cargo.toml +++ b/nym-node-status-api/Cargo.toml @@ -3,7 +3,7 @@ [package] name = "nym-node-status-api" -version = "0.1.6" +version = "1.0.0-rc.1" authors.workspace = true repository.workspace = true homepage.workspace = true From d474a2840f3b351b0e5f42ecb67d54036e3f98db Mon Sep 17 00:00:00 2001 From: dynco-nym <173912580+dynco-nym@users.noreply.github.com> Date: Thu, 21 Nov 2024 00:18:42 +0100 Subject: [PATCH 9/9] All agents use the same key - remove assigned_agent column - remove logic which would stop agents with the same key from connecting - as a safety measure, add a cap on the total no.
of agents --- nym-node-status-agent/run.sh | 22 +---- .../src/cli/generate_keypair.rs | 5 +- .../migrations/001_assigned_agent.sql | 2 - ...gned_utc.sql => 001_last_assigned_utc.sql} | 4 +- nym-node-status-api/src/cli/mod.rs | 7 ++ nym-node-status-api/src/db/models.rs | 1 - .../src/db/queries/testruns.rs | 41 +++++---- nym-node-status-api/src/http/api/testruns.rs | 83 ++++++++++--------- nym-node-status-api/src/http/server.rs | 3 +- nym-node-status-api/src/http/state.rs | 62 +++----------- nym-node-status-api/src/main.rs | 1 + nym-node-status-api/src/testruns/queue.rs | 1 - 12 files changed, 98 insertions(+), 134 deletions(-) delete mode 100644 nym-node-status-api/migrations/001_assigned_agent.sql rename nym-node-status-api/migrations/{002_last_assigned_utc.sql => 001_last_assigned_utc.sql} (100%) diff --git a/nym-node-status-agent/run.sh b/nym-node-status-agent/run.sh index a0cf362e167..0b384660319 100755 --- a/nym-node-status-agent/run.sh +++ b/nym-node-status-agent/run.sh @@ -16,30 +16,13 @@ set -a source "${monorepo_root}/envs/${ENVIRONMENT}.env" set +a -export RUST_LOG="debug" +export RUST_LOG="info" export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1" export NODE_STATUS_AGENT_SERVER_PORT="8000" export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe" - -# each key is required for a separate agent when running in parallel: their -# public counterparts need to be registered with NS API -private_keys=("BjyC9SsHAZUzPRkQR4sPTvVrp4GgaquTh5YfSJksvvWT" - "4RqSKydrEuyGF8Xtwyauvja62SAjqxFPYQzW2neZdkQD" - "CfudaSaaLTkgR8rkBijUnYocdFciWTbKqkSNYevepnbn" - "Dd3fDyPUg4edTpiCAkSweE17NdWJ7gAchbtqAeSj3MBc" - "HAtfcfnpw5XdpcVzAH6Qsxp6Zf75oe2W54HjTD8ngVbU" - "8aqWP8wZyhX5W8gfrvyh1SmS6CEgfLAR95DBhWXRUpTm" - "234U1PMkWpAsn7hD98g1D8hfRFkJJS91T2sJBQDyXmqx" - "5qUUFu83fgqpACsr3dC6iwGJxhTqN4JJDTecT2QSqwEe" - "4Pp7Cd9G3aMku9biFcxRMEja8RBMbBRGZuDAZt1yTS7H" - "4U136QykP8G831EZSDNLLvgWCGYA8naYtT8BQ9kLeL5B" -) +export 
NODE_STATUS_AGENT_AUTH_KEY="BjyC9SsHAZUzPRkQR4sPTvVrp4GgaquTh5YfSJksvvWT" workers=${1:-1} -if ((workers > ${#private_keys[@]})); then - echo "Error: ${workers} is larger than the number of private keys available (${#private_keys[@]})." - exit 1 -fi echo "Running $workers workers in parallel" # build & copy over GW probe @@ -61,7 +44,6 @@ function swarm() { local workers=$1 for ((i = 1; i <= workers; i++)); do - export NODE_STATUS_AGENT_AUTH_KEY=${private_keys[i]} ../target/release/nym-node-status-agent run-probe & done diff --git a/nym-node-status-agent/src/cli/generate_keypair.rs b/nym-node-status-agent/src/cli/generate_keypair.rs index e89bf3e8d51..e8360af47fa 100644 --- a/nym-node-status-agent/src/cli/generate_keypair.rs +++ b/nym-node-status-agent/src/cli/generate_keypair.rs @@ -31,10 +31,7 @@ mod test { use super::*; - use std::{ - fs::{self}, - path::PathBuf, - }; + use std::{fs, path::PathBuf}; #[test] fn can_generate_valid_keypair() { diff --git a/nym-node-status-api/migrations/001_assigned_agent.sql b/nym-node-status-api/migrations/001_assigned_agent.sql deleted file mode 100644 index 96ec895924e..00000000000 --- a/nym-node-status-api/migrations/001_assigned_agent.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE testruns -ADD COLUMN assigned_agent VARCHAR; diff --git a/nym-node-status-api/migrations/002_last_assigned_utc.sql b/nym-node-status-api/migrations/001_last_assigned_utc.sql similarity index 100% rename from nym-node-status-api/migrations/002_last_assigned_utc.sql rename to nym-node-status-api/migrations/001_last_assigned_utc.sql index edd91053cf0..2a801763e40 100644 --- a/nym-node-status-api/migrations/002_last_assigned_utc.sql +++ b/nym-node-status-api/migrations/001_last_assigned_utc.sql @@ -1,5 +1,5 @@ ALTER TABLE testruns -ADD COLUMN last_assigned_utc INTEGER; +RENAME COLUMN timestamp_utc TO created_utc; ALTER TABLE testruns -RENAME COLUMN timestamp_utc TO created_utc; +ADD COLUMN last_assigned_utc INTEGER; diff --git 
a/nym-node-status-api/src/cli/mod.rs b/nym-node-status-api/src/cli/mod.rs index fa3d3c12bfd..10485ab4953 100644 --- a/nym-node-status-api/src/cli/mod.rs +++ b/nym-node-status-api/src/cli/mod.rs @@ -73,6 +73,13 @@ pub(crate) struct Cli { #[clap(env = "NODE_STATUS_API_AGENT_KEY_LIST")] #[arg(value_delimiter = ',')] pub(crate) agent_key_list: Vec, + + #[clap( + long, + default_value_t = 40, + env = "NYM_NODE_STATUS_API_NYM_HTTP_CACHE_TTL" + )] + pub(crate) max_agent_count: i64, } fn parse_duration(arg: &str) -> Result { diff --git a/nym-node-status-api/src/db/models.rs b/nym-node-status-api/src/db/models.rs index 4062e1451dc..596f634f2e7 100644 --- a/nym-node-status-api/src/db/models.rs +++ b/nym-node-status-api/src/db/models.rs @@ -309,7 +309,6 @@ pub struct TestRunDto { pub created_utc: i64, pub ip_address: String, pub log: String, - pub assigned_agent: Option, pub last_assigned_utc: Option, } diff --git a/nym-node-status-api/src/db/queries/testruns.rs b/nym-node-status-api/src/db/queries/testruns.rs index 465444c8ec1..dc1e20ee2ee 100644 --- a/nym-node-status-api/src/db/queries/testruns.rs +++ b/nym-node-status-api/src/db/queries/testruns.rs @@ -5,14 +5,29 @@ use crate::{ testruns::now_utc, }; use chrono::Duration; -use nym_crypto::asymmetric::ed25519::PublicKey; use sqlx::{pool::PoolConnection, Sqlite}; -pub(crate) async fn testrun_in_progress_assigned_to_agent( +pub(crate) async fn count_testruns_in_progress( conn: &mut PoolConnection, - agent_key: &PublicKey, -) -> sqlx::Result { - let agent_key = agent_key.to_base58_string(); +) -> anyhow::Result { + sqlx::query_scalar!( + r#"SELECT + COUNT(id) as "count: i64" + FROM testruns + WHERE + status = ? 
+ "#, + TestRunStatus::InProgress as i64, + ) + .fetch_one(conn.as_mut()) + .await + .map_err(anyhow::Error::from) +} + +pub(crate) async fn get_in_progress_testrun_by_id( + conn: &mut PoolConnection, + testrun_id: i64, +) -> anyhow::Result { sqlx::query_as!( TestRunDto, r#"SELECT @@ -22,19 +37,20 @@ pub(crate) async fn testrun_in_progress_assigned_to_agent( created_utc as "created_utc!", ip_address as "ip_address!", log as "log!", - assigned_agent, last_assigned_utc FROM testruns WHERE - assigned_agent = ? + id = ? AND status = ? - ORDER BY created_utc"#, - agent_key, + ORDER BY created_utc + LIMIT 1"#, + testrun_id, TestRunStatus::InProgress as i64, ) .fetch_one(conn.as_mut()) .await + .map_err(|e| anyhow::anyhow!("Couldn't retrieve testrun {testrun_id}: {e}")) } pub(crate) async fn update_testruns_assigned_before( @@ -49,8 +65,7 @@ pub(crate) async fn update_testruns_assigned_before( r#"UPDATE testruns SET - status = ?, - assigned_agent = NULL + status = ? WHERE status = ? AND @@ -77,16 +92,13 @@ pub(crate) async fn update_testruns_assigned_before( pub(crate) async fn assign_oldest_testrun( conn: &mut PoolConnection, - agent_key: PublicKey, ) -> anyhow::Result> { - let agent_key = agent_key.to_base58_string(); let now = now_utc().timestamp(); // find & mark as "In progress" in the same transaction to avoid race conditions let returning = sqlx::query!( r#"UPDATE testruns SET status = ?, - assigned_agent = ?, last_assigned_utc = ? 
WHERE rowid = ( @@ -101,7 +113,6 @@ pub(crate) async fn assign_oldest_testrun( gateway_id "#, TestRunStatus::InProgress as i64, - agent_key, now, TestRunStatus::Queued as i64, ) diff --git a/nym-node-status-api/src/http/api/testruns.rs b/nym-node-status-api/src/http/api/testruns.rs index bc965186d08..6f5ab862eab 100644 --- a/nym-node-status-api/src/http/api/testruns.rs +++ b/nym-node-status-api/src/http/api/testruns.rs @@ -9,6 +9,7 @@ use reqwest::StatusCode; use crate::db::models::TestRunStatus; use crate::db::queries; +use crate::testruns::now_utc; use crate::{ db, http::{ @@ -35,15 +36,9 @@ async fn request_testrun( ) -> HttpResult> { // TODO dz log agent's network probe version authenticate(&request, &state)?; - state - .update_last_request_time( - &request.payload.agent_public_key, - &request.payload.timestamp, - ) - .await?; + is_fresh(&request.payload.timestamp)?; - let agent_pubkey = request.payload.agent_public_key; - tracing::debug!("Agent {} requested testrun", agent_pubkey); + tracing::debug!("Agent requested testrun"); let db = state.db_pool(); let mut conn = db @@ -51,31 +46,29 @@ async fn request_testrun( .await .map_err(HttpError::internal_with_logging)?; - if let Ok(testrun) = - db::queries::testruns::testrun_in_progress_assigned_to_agent(&mut conn, &agent_pubkey).await - { + let active_testruns = db::queries::testruns::count_testruns_in_progress(&mut conn) + .await + .map_err(HttpError::internal_with_logging)?; + if active_testruns >= state.agent_max_count() { tracing::warn!( - "Testrun {} already in progress for agent {:?}, rejecting", - testrun.id, - testrun.assigned_agent + "{}/{} testruns in progress, rejecting", + active_testruns, + state.agent_max_count() ); - return Err(HttpError::invalid_input( - "Testrun already in progress for this agent", - )); - }; + return Err(HttpError::no_testruns_available()); + } - return match db::queries::testruns::assign_oldest_testrun(&mut conn, agent_pubkey).await { + return match 
db::queries::testruns::assign_oldest_testrun(&mut conn).await { Ok(res) => { if let Some(testrun) = res { tracing::info!( - "🏃‍ Assigned testrun row_id {} gateway {} to agent {}", + "🏃‍ Assigned testrun row_id {} gateway {} to agent", &testrun.testrun_id, testrun.gateway_identity_key, - agent_pubkey ); Ok(Json(testrun)) } else { - tracing::debug!("No testruns available for agent {}", agent_pubkey); + tracing::debug!("No testruns available"); Err(HttpError::no_testruns_available()) } } @@ -97,23 +90,17 @@ async fn submit_testrun( .await .map_err(HttpError::internal_with_logging)?; - let submitter_pubkey = submitted_result.payload.agent_public_key; let assigned_testrun = - queries::testruns::testrun_in_progress_assigned_to_agent(&mut conn, &submitter_pubkey) + queries::testruns::get_in_progress_testrun_by_id(&mut conn, submitted_testrun_id) .await .map_err(|err| { - tracing::warn!("No testruns in progress for agent {submitter_pubkey}: {err}"); + tracing::warn!( + "No testruns in progress for testrun_id {}: {}", + submitted_testrun_id, + err + ); HttpError::invalid_input("Invalid testrun submitted") })?; - if submitted_testrun_id != assigned_testrun.id { - tracing::warn!( - "Agent {} submitted testrun {} but {} was expected", - submitter_pubkey, - submitted_testrun_id, - assigned_testrun.id - ); - return Err(HttpError::invalid_input("Invalid testrun submitted")); - } if Some(submitted_result.payload.assigned_at_utc) != assigned_testrun.last_assigned_utc { tracing::warn!( "Submitted testrun timestamp mismatch: {} != {:?}, rejecting", @@ -132,8 +119,7 @@ async fn submit_testrun( )) })?; tracing::debug!( - "Agent {} submitted testrun {} for gateway {} ({} bytes)", - submitter_pubkey, + "Agent submitted testrun {} for gateway {} ({} bytes)", submitted_testrun_id, gw_identity, &submitted_result.payload.probe_result.len(), @@ -165,10 +151,20 @@ async fn submit_testrun( .await .map_err(HttpError::internal_with_logging)?; + let created_at = 
chrono::DateTime::from_timestamp(assigned_testrun.created_utc, 0) + .map(|d| d.to_rfc3339()) + .unwrap_or_default(); + let last_assigned = assigned_testrun + .last_assigned_utc + .and_then(|d| chrono::DateTime::from_timestamp(d, 0)) + .map(|d| d.to_rfc3339()) + .unwrap_or_default(); tracing::info!( - "✅ Testrun row_id {} for gateway {} complete", + "✅ Testrun row_id {} for gateway {} complete (last assigned at {}, created at {})", assigned_testrun.id, - gw_identity + gw_identity, + last_assigned, + created_at ); Ok(StatusCode::CREATED) @@ -190,6 +186,17 @@ fn authenticate(request: &impl VerifiableRequest, state: &AppState) -> HttpResul Ok(()) } +fn is_fresh(request_time: &i64) -> HttpResult<()> { + // if a request took longer than N minutes to reach NS API, something is very wrong + let freshness_cutoff = chrono::Duration::minutes(1); + let cutoff_timestamp = (now_utc() - freshness_cutoff).timestamp(); + if *request_time < cutoff_timestamp { + tracing::warn!("Request older than {}s, rejecting", cutoff_timestamp); + return Err(HttpError::unauthorized()); + } + Ok(()) +} + fn get_result_from_log(log: &str) -> String { let re = regex::Regex::new(r"\n\{\s").unwrap(); let result: Vec<_> = re.splitn(log, 2).collect(); diff --git a/nym-node-status-api/src/http/server.rs b/nym-node-status-api/src/http/server.rs index 714a4836e4f..c8be0d63a1c 100644 --- a/nym-node-status-api/src/http/server.rs +++ b/nym-node-status-api/src/http/server.rs @@ -16,10 +16,11 @@ pub(crate) async fn start_http_api( http_port: u16, nym_http_cache_ttl: u64, agent_key_list: Vec, + agent_max_count: i64, ) -> anyhow::Result { let router_builder = RouterBuilder::with_default_routes(); - let state = AppState::new(db_pool, nym_http_cache_ttl, agent_key_list); + let state = AppState::new(db_pool, nym_http_cache_ttl, agent_key_list, agent_max_count); let router = router_builder.with_state(state); // TODO dz do we need this to be configurable? 
diff --git a/nym-node-status-api/src/http/state.rs b/nym-node-status-api/src/http/state.rs index fb273027ac2..393da5c0b87 100644 --- a/nym-node-status-api/src/http/state.rs +++ b/nym-node-status-api/src/http/state.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use moka::{future::Cache, Entry}; use nym_crypto::asymmetric::ed25519::PublicKey; @@ -6,11 +6,7 @@ use tokio::sync::RwLock; use crate::{ db::DbPool, - http::{ - error::{HttpError, HttpResult}, - models::{DailyStats, Gateway, Mixnode, SummaryHistory}, - }, - testruns::now_utc, + http::models::{DailyStats, Gateway, Mixnode, SummaryHistory}, }; #[derive(Debug, Clone)] @@ -18,18 +14,21 @@ pub(crate) struct AppState { db_pool: DbPool, cache: HttpCache, agent_key_list: Vec, - /// last time agent requested a testrun - // if performance becomes a problem, consider a faster hashmap like `scc`` - agent_last_request_times: Arc>>, + agent_max_count: i64, } impl AppState { - pub(crate) fn new(db_pool: DbPool, cache_ttl: u64, agent_key_list: Vec) -> Self { + pub(crate) fn new( + db_pool: DbPool, + cache_ttl: u64, + agent_key_list: Vec, + agent_max_count: i64, + ) -> Self { Self { db_pool, cache: HttpCache::new(cache_ttl), agent_key_list, - agent_last_request_times: Arc::new(RwLock::new(HashMap::new())), + agent_max_count, } } @@ -45,45 +44,8 @@ impl AppState { self.agent_key_list.contains(agent_pubkey) } - /// Only updates if request is not a replay. 
Otherwise return error - pub(crate) async fn update_last_request_time( - &self, - agent_key: &PublicKey, - request_time: &i64, - ) -> HttpResult<()> { - // if a request took longer than N minutes to reach NS API, something is very wrong - let cutoff_duration = chrono::Duration::minutes(1); - let cutoff_timestamp = (now_utc() - cutoff_duration).timestamp(); - if *request_time < cutoff_timestamp { - tracing::warn!("Request older than {}s, rejecting", cutoff_timestamp); - return Err(HttpError::unauthorized()); - } - - // if a previous entry has a newer time than this request's submit time, - // it's a repeated request - let agent_key = agent_key.to_base58_string(); - let request_times = self.agent_last_request_times.read().await; - if let Some(previous_request_time) = request_times.get(&agent_key) { - if request_time <= previous_request_time { - tracing::warn!( - "Request has timestamp {} but previous request was at {}, rejecting", - request_time, - previous_request_time - ); - return Err(HttpError::unauthorized()); - } - } - drop(request_times); - - // otherwise this is a newer (or a first) request - self.agent_last_request_times - .write() - .await - .entry(agent_key) - .and_modify(|value| *value = *request_time) - .or_insert(*request_time); - - Ok(()) + pub(crate) fn agent_max_count(&self) -> i64 { + self.agent_max_count } } diff --git a/nym-node-status-api/src/main.rs b/nym-node-status-api/src/main.rs index 0833c6a7b4c..6fcdf344732 100644 --- a/nym-node-status-api/src/main.rs +++ b/nym-node-status-api/src/main.rs @@ -47,6 +47,7 @@ async fn main() -> anyhow::Result<()> { args.http_port, args.nym_http_cache_ttl, agent_key_list.to_owned(), + args.max_agent_count, ) .await .expect("Failed to start server"); diff --git a/nym-node-status-api/src/testruns/queue.rs b/nym-node-status-api/src/testruns/queue.rs index a0e7a1422f1..7ce0a851ef4 100644 --- a/nym-node-status-api/src/testruns/queue.rs +++ b/nym-node-status-api/src/testruns/queue.rs @@ -56,7 +56,6 @@ pub(crate) 
async fn try_queue_testrun( created_utc as "created_utc!", ip_address as "ip_address!", log as "log!", - assigned_agent, last_assigned_utc FROM testruns WHERE gateway_id = ? AND status != 2