Skip to content

Commit

Permalink
Fix testrun cleanup bug
Browse files Browse the repository at this point in the history
- introduce a new column last_assigned which is different than
  created_at so that stale testruns get cleaned up based on
  last_assigned
- created_at is still useful for determining the "oldest" testrun
  to be picked up
  • Loading branch information
dynco-nym committed Nov 20, 2024
1 parent a5cac0f commit 644ffb3
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 38 deletions.
5 changes: 5 additions & 0 deletions nym-node-status-api/migrations/002_last_assigned_utc.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE testruns
ADD COLUMN last_assigned_utc INTEGER;

ALTER TABLE testruns
RENAME COLUMN timestamp_utc TO created_utc;
3 changes: 2 additions & 1 deletion nym-node-status-api/src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,11 @@ pub struct TestRunDto {
pub id: i64,
pub gateway_id: i64,
pub status: i64,
pub timestamp_utc: i64,
pub created_utc: i64,
pub ip_address: String,
pub log: String,
pub assigned_agent: Option<String>,
pub last_assigned_utc: Option<i64>,
}

impl TestRunDto {
Expand Down
43 changes: 22 additions & 21 deletions nym-node-status-api/src/db/queries/testruns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
db::models::{TestRunDto, TestRunStatus},
testruns::now_utc,
};
use anyhow::Context;
use chrono::Duration;
use nym_crypto::asymmetric::ed25519::PublicKey;
use sqlx::{pool::PoolConnection, Sqlite};
Expand All @@ -19,58 +18,60 @@ pub(crate) async fn get_in_progress_testrun_by_id(
id as "id!",
gateway_id as "gateway_id!",
status as "status!",
timestamp_utc as "timestamp_utc!",
created_utc as "created_utc!",
ip_address as "ip_address!",
log as "log!",
assigned_agent
assigned_agent,
last_assigned_utc
FROM testruns
WHERE
id = ?
AND
status = ?
ORDER BY timestamp_utc"#,
ORDER BY created_utc"#,
testrun_id,
TestRunStatus::InProgress as i64,
)
.fetch_one(conn.as_mut())
.await.map_err(|e| {
anyhow::anyhow!("Couldn't retrieve testrun {testrun_id}: {e}")
})

.await
.map_err(|e| anyhow::anyhow!("Couldn't retrieve testrun {testrun_id}: {e}"))
}

pub(crate) async fn get_testruns_assigned_to_agent(
pub(crate) async fn testrun_in_progress_assigned_to_agent(
conn: &mut PoolConnection<Sqlite>,
agent_key: PublicKey,
) -> anyhow::Result<TestRunDto> {
agent_key: &PublicKey,
) -> sqlx::Result<TestRunDto> {
let agent_key = agent_key.to_base58_string();
sqlx::query_as!(
TestRunDto,
r#"SELECT
id as "id!",
gateway_id as "gateway_id!",
status as "status!",
timestamp_utc as "timestamp_utc!",
created_utc as "created_utc!",
ip_address as "ip_address!",
log as "log!",
assigned_agent
assigned_agent,
last_assigned_utc
FROM testruns
WHERE
assigned_agent = ?
AND
status = ?
ORDER BY timestamp_utc"#,
ORDER BY created_utc"#,
agent_key,
TestRunStatus::InProgress as i64,
)
.fetch_one(conn.as_mut())
.await
.context(format!("No testruns in progress for agent {agent_key}"))
}

pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> anyhow::Result<u64> {
pub(crate) async fn update_testruns_assigned_before(
db: &DbPool,
max_age: Duration,
) -> anyhow::Result<u64> {
let mut conn = db.acquire().await?;
let previous_run = now_utc() - age;
let previous_run = now_utc() - max_age;
let cutoff_timestamp = previous_run.timestamp();

let res = sqlx::query!(
Expand All @@ -82,7 +83,7 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an
WHERE
status = ?
AND
timestamp_utc < ?
last_assigned_utc < ?
"#,
TestRunStatus::Queued as i64,
TestRunStatus::InProgress as i64,
Expand All @@ -93,8 +94,8 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an

let stale_testruns = res.rows_affected();
if stale_testruns > 0 {
tracing::debug!(
"Refreshed {} stale testruns, scheduled before {} but not yet finished",
tracing::info!(
"Refreshed {} stale testruns, assigned before {} but not yet finished",
stale_testruns,
previous_run
);
Expand All @@ -119,7 +120,7 @@ pub(crate) async fn assign_oldest_testrun(
SELECT rowid
FROM testruns
WHERE status = ?
ORDER BY timestamp_utc asc
ORDER BY created_utc asc
LIMIT 1
)
RETURNING
Expand Down
25 changes: 18 additions & 7 deletions nym-node-status-api/src/http/api/testruns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,15 @@ async fn request_testrun(
Json(request): Json<get_testrun::GetTestrunRequest>,
) -> HttpResult<Json<TestrunAssignment>> {
// TODO dz log agent's network probe version

authenticate(&request, &state)?;
state
.check_last_request_time(
.update_last_request_time(
&request.payload.agent_public_key,
&request.payload.timestamp,
)
.await?;

let agent_pubkey = request.payload.agent_public_key;

tracing::debug!("Agent {} requested testrun", agent_pubkey);

let db = state.db_pool();
Expand All @@ -54,6 +52,19 @@ async fn request_testrun(
.await
.map_err(HttpError::internal_with_logging)?;

if let Ok(testrun) =
db::queries::testruns::testrun_in_progress_assigned_to_agent(&mut conn, &agent_pubkey).await
{
tracing::warn!(
"Testrun {} already in progress for agent {:?}, rejecting",
testrun.id,
testrun.assigned_agent
);
return Err(HttpError::invalid_input(
"Testrun already in progress for this agent",
));
};

return match db::queries::testruns::assign_oldest_testrun(&mut conn, agent_pubkey).await {
Ok(res) => {
if let Some(testrun) = res {
Expand Down Expand Up @@ -97,10 +108,10 @@ async fn submit_testrun(
.ok_or_else(HttpError::unauthorized)?;

let assigned_testrun =
queries::testruns::get_testruns_assigned_to_agent(&mut conn, agent_pubkey)
queries::testruns::testrun_in_progress_assigned_to_agent(&mut conn, &agent_pubkey)
.await
.map_err(|err| {
tracing::warn!("{err}");
tracing::warn!("No testruns in progress for agent {agent_pubkey}: {err}");
HttpError::invalid_input("Invalid testrun submitted")
})?;
if submitted_testrun_id != assigned_testrun.id {
Expand Down Expand Up @@ -188,11 +199,11 @@ fn authenticate(request: &get_testrun::GetTestrunRequest, state: &AppState) -> H
Ok(())
}

fn verify_message<T>(public_key: &PublicKey, message: &T, signature: &Signature) -> HttpResult<()>
fn verify_message<T>(public_key: &PublicKey, payload: &T, signature: &Signature) -> HttpResult<()>
where
T: serde::Serialize,
{
bincode::serialize(message)
bincode::serialize(payload)
.map_err(HttpError::invalid_input)
.and_then(|serialized| {
public_key
Expand Down
16 changes: 13 additions & 3 deletions nym-node-status-api/src/http/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
error::{HttpError, HttpResult},
models::{DailyStats, Gateway, Mixnode, SummaryHistory},
},
testruns::now_utc,
};

#[derive(Debug, Clone)]
Expand All @@ -18,6 +19,7 @@ pub(crate) struct AppState {
cache: HttpCache,
agent_key_list: Vec<PublicKey>,
/// last time agent requested a testrun
// if performance becomes a problem, consider a faster hashmap like `scc``
agent_last_request_times: Arc<RwLock<HashMap<String, i64>>>,
}

Expand All @@ -43,13 +45,21 @@ impl AppState {
self.agent_key_list.contains(agent_pubkey)
}

/// Only updates if request time is valid. Otherwise return error
pub(crate) async fn check_last_request_time(
/// Only updates if request is not a replay. Otherwise return error
pub(crate) async fn update_last_request_time(
&self,
agent_key: &PublicKey,
request_time: &i64,
) -> HttpResult<()> {
// if entry exists with a newer time than this request's submit time,
// if a request took longer than N minutes to reach NS API, something is very wrong
let cutoff_duration = chrono::Duration::minutes(1);
let cutoff_timestamp = (now_utc() - cutoff_duration).timestamp();
if *request_time < cutoff_timestamp {
tracing::warn!("Request older than {}s, rejecting", cutoff_timestamp);
return Err(HttpError::unauthorized());
}

// if a previous entry has a newer time than this request's submit time,
// it's a repeated request
let agent_key = agent_key.to_base58_string();
let request_times = self.agent_last_request_times.read().await;
Expand Down
4 changes: 2 additions & 2 deletions nym-node-status-api/src/testruns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ async fn run(pool: &DbPool) -> anyhow::Result<()> {

#[instrument(level = "debug", skip_all)]
async fn refresh_stale_testruns(pool: &DbPool, refresh_interval: Duration) -> anyhow::Result<()> {
let chrono_duration = chrono::Duration::from_std(refresh_interval)?;
crate::db::queries::testruns::update_testruns_older_than(pool, chrono_duration).await?;
let refresh_interval = chrono::Duration::from_std(refresh_interval)?;
crate::db::queries::testruns::update_testruns_assigned_before(pool, refresh_interval).await?;

Ok(())
}
9 changes: 5 additions & 4 deletions nym-node-status-api/src/testruns/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) async fn try_queue_testrun(
LIMIT 1"#,
identity_key,
)
// TODO dz shoudl call .fetch_one
// TODO dz should call .fetch_one
// TODO dz replace this in other queries as well
.fetch(conn.as_mut())
.try_collect::<Vec<_>>()
Expand All @@ -53,10 +53,11 @@ pub(crate) async fn try_queue_testrun(
id as "id!",
gateway_id as "gateway_id!",
status as "status!",
timestamp_utc as "timestamp_utc!",
created_utc as "created_utc!",
ip_address as "ip_address!",
log as "log!",
assigned_agent
assigned_agent,
last_assigned_utc
FROM testruns
WHERE gateway_id = ? AND status != 2
ORDER BY id DESC
Expand Down Expand Up @@ -90,7 +91,7 @@ pub(crate) async fn try_queue_testrun(
);

let id = sqlx::query!(
"INSERT INTO testruns (gateway_id, status, ip_address, timestamp_utc, log) VALUES (?, ?, ?, ?, ?)",
"INSERT INTO testruns (gateway_id, status, ip_address, created_utc, log) VALUES (?, ?, ?, ?, ?)",
gateway_id,
status,
ip_address,
Expand Down

0 comments on commit 644ffb3

Please sign in to comment.