diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs
index 27db08daf..a17711eec 100644
--- a/aggregator/src/aggregator.rs
+++ b/aggregator/src/aggregator.rs
@@ -3648,7 +3648,7 @@ mod tests {
             .eq_report(&vdaf, leader_task.current_hpke_key(), &report));
         assert_eq!(
             got_counter,
-            TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 0, 1, 0, 0)
+            Some(TaskUploadCounter::new(0, 0, 0, 0, 0, 1, 0, 0))
         )
     }
 
@@ -3704,7 +3704,7 @@ mod tests {
             .unwrap();
         assert_eq!(
             got_counters,
-            TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 0, 100, 0, 0),
+            Some(TaskUploadCounter::new(0, 0, 0, 0, 0, 100, 0, 0))
         );
     }
 
@@ -3768,8 +3768,8 @@ mod tests {
             .unwrap();
         assert_eq!(
             got_counters,
-            TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 1, 0, 0, 0),
-        );
+            Some(TaskUploadCounter::new(0, 0, 0, 0, 1, 0, 0, 0))
+        )
     }
 
     #[tokio::test]
@@ -3809,8 +3809,8 @@ mod tests {
             .unwrap();
         assert_eq!(
             got_counters,
-            TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 0, 1, 0, 0),
-        );
+            Some(TaskUploadCounter::new(0, 0, 0, 0, 0, 1, 0, 0))
+        )
     }
 
     #[tokio::test]
@@ -3858,8 +3858,8 @@ mod tests {
             .unwrap();
         assert_eq!(
             got_counters,
-            TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 0, 0, 1, 0),
-        );
+            Some(TaskUploadCounter::new(0, 0, 0, 0, 0, 0, 1, 0))
+        )
     }
 
     #[tokio::test]
@@ -3936,8 +3936,8 @@ mod tests {
             .unwrap();
         assert_eq!(
             got_counters,
-            TaskUploadCounter::new(*task.id(), 1, 0, 0, 0, 0, 0, 0, 0),
-        );
+            Some(TaskUploadCounter::new(1, 0, 0, 0, 0, 0, 0, 0))
+        )
    }
 
     #[tokio::test]
@@ -4068,8 +4068,8 @@ mod tests {
             .unwrap();
         assert_eq!(
             got_counters,
-            TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 0, 0, 0, 1),
-        );
+            Some(TaskUploadCounter::new(0, 0, 0, 0, 0, 0, 0, 1))
+        )
     }
 
     #[tokio::test]
@@ -4124,8 +4124,8 @@ mod tests {
             .unwrap();
         assert_eq!(
             got_counters,
-            TaskUploadCounter::new(*task.id(), 0, 0, 0, 1, 0, 0, 0, 0),
-        );
+            Some(TaskUploadCounter::new(0, 0, 0, 1, 0, 0, 0, 0))
+        )
     }
 
     #[tokio::test]
@@ -4180,8 +4180,8 @@ mod tests {
             .unwrap();
         assert_eq!(
             got_counters,
-            TaskUploadCounter::new(*task.id(), 0, 0, 1, 0, 0, 0, 0, 0),
-        );
+            Some(TaskUploadCounter::new(0, 0, 1, 0, 0, 0, 0, 0))
+        )
     }
 
     #[tokio::test]
@@ -4235,8 +4235,8 @@ mod tests {
             .unwrap();
         assert_eq!(
             got_counters,
-            TaskUploadCounter::new(*task.id(), 0, 1, 0, 0, 0, 0, 0, 0),
-        );
+            Some(TaskUploadCounter::new(0, 1, 0, 0, 0, 0, 0, 0))
+        )
     }
 
     #[tokio::test]
@@ -4304,8 +4304,8 @@ mod tests {
             .unwrap();
         assert_eq!(
             got_counters,
-            TaskUploadCounter::new(*task.id(), 0, 1, 0, 0, 0, 0, 0, 0),
-        );
+            Some(TaskUploadCounter::new(0, 1, 0, 0, 0, 0, 0, 0))
+        )
     }
 
     pub(crate) fn generate_helper_report_share<V: vdaf::Client<16>>(
diff --git a/aggregator_api/src/lib.rs b/aggregator_api/src/lib.rs
index 8f72da4a7..db725fd24 100644
--- a/aggregator_api/src/lib.rs
+++ b/aggregator_api/src/lib.rs
@@ -93,6 +93,10 @@ pub fn aggregator_api_handler<C: Clock>(
         .post("/tasks", instrumented(api(post_task::<C>)))
         .get("/tasks/:task_id", instrumented(api(get_task::<C>)))
         .delete("/tasks/:task_id", instrumented(api(delete_task::<C>)))
+        .get(
+            "/tasks/:task_id/metrics/uploads",
+            instrumented(api(get_task_upload_metrics::<C>)),
+        )
         .get(
             "/tasks/:task_id/metrics",
             instrumented(api(get_task_metrics::<C>)),
diff --git a/aggregator_api/src/models.rs b/aggregator_api/src/models.rs
index 735fb74b2..04fe5ca14 100644
--- a/aggregator_api/src/models.rs
+++ b/aggregator_api/src/models.rs
@@ -1,7 +1,7 @@
 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
 use derivative::Derivative;
 use janus_aggregator_core::{
-    datastore::models::{GlobalHpkeKeypair, HpkeKeyState},
+    datastore::models::{GlobalHpkeKeypair, HpkeKeyState, TaskUploadCounter},
     task::{AggregatorTask, QueryType},
     taskprov::{PeerAggregator, VerifyKeyInit},
 };
@@ -173,6 +173,9 @@ pub(crate) struct GetTaskMetricsResp {
     pub(crate) report_aggregations: u64,
 }
 
+#[derive(Serialize)]
+pub(crate) struct GetTaskUploadMetricsResp(pub(crate) TaskUploadCounter);
+
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub(crate) struct GlobalHpkeConfigResp {
     pub(crate) config: HpkeConfig,
diff --git a/aggregator_api/src/routes.rs b/aggregator_api/src/routes.rs
index f6d4b8347..8b699a1a5 100644
--- a/aggregator_api/src/routes.rs
+++ b/aggregator_api/src/routes.rs
@@ -1,9 +1,9 @@
 use crate::{
     models::{
         AggregatorApiConfig, AggregatorRole, DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp,
-        GetTaskMetricsResp, GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PostTaskReq,
-        PostTaskprovPeerAggregatorReq, PutGlobalHpkeConfigReq, SupportedVdaf, TaskResp,
-        TaskprovPeerAggregatorResp,
+        GetTaskMetricsResp, GetTaskUploadMetricsResp, GlobalHpkeConfigResp,
+        PatchGlobalHpkeConfigReq, PostTaskReq, PostTaskprovPeerAggregatorReq,
+        PutGlobalHpkeConfigReq, SupportedVdaf, TaskResp, TaskprovPeerAggregatorResp,
     },
     Config, ConnExt, Error,
 };
@@ -276,6 +276,20 @@ pub(super) async fn get_task_metrics<C: Clock>(
     }))
 }
 
+pub(super) async fn get_task_upload_metrics<C: Clock>(
+    conn: &mut Conn,
+    State(ds): State<Arc<Datastore<C>>>,
+) -> Result<Json<GetTaskUploadMetricsResp>, Error> {
+    let task_id = conn.task_id_param()?;
+    Ok(Json(GetTaskUploadMetricsResp(
+        ds.run_tx("get_task_upload_metrics", |tx| {
+            Box::pin(async move { tx.get_task_upload_counter(&task_id).await })
+        })
+        .await?
+        .ok_or(Error::NotFound)?,
+    )))
+}
+
 pub(super) async fn get_global_hpke_configs<C: Clock>(
     _: &mut Conn,
     State(ds): State<Arc<Datastore<C>>>,
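For context before the tests: the handler above resolves the task ID from the URL, reads the summed counter in a single transaction, and maps a missing counter to Error::NotFound. Below is a minimal client-side sketch (illustrative only, not part of this patch). It assumes the reqwest, tokio, and serde_json crates; the base URL, bearer token, and task ID are placeholders, and the Accept value mirrors the CONTENT_TYPE constant used by the aggregator API tests (check aggregator_api/src/lib.rs for the exact string in your version).

// Illustrative sketch, not part of the patch: fetch upload metrics for a task.
// All identifiers below marked "placeholder" or "assumed" are not from the patch.
const ACCEPT: &str = "application/vnd.janus.aggregator+json;version=0.1"; // assumed media type

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let base_url = "https://aggregator.example.com/aggregator-api"; // placeholder
    let task_id = "<unpadded-base64url-task-id>"; // placeholder
    let metrics: serde_json::Value = reqwest::Client::new()
        // Route added in aggregator_api/src/lib.rs above.
        .get(format!("{base_url}/tasks/{task_id}/metrics/uploads"))
        .header("Authorization", "Bearer <token>") // placeholder token
        .header("Accept", ACCEPT)
        .send()
        .await?
        .error_for_status()? // an unknown task surfaces here as 404 Not Found
        .json()
        .await?;
    println!("{metrics}");
    Ok(())
}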
diff --git a/aggregator_api/src/tests.rs b/aggregator_api/src/tests.rs
index 94e02a045..b41070e30 100644
--- a/aggregator_api/src/tests.rs
+++ b/aggregator_api/src/tests.rs
@@ -1,9 +1,10 @@
 use crate::{
     aggregator_api_handler,
     models::{
-        DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, GetTaskMetricsResp, GlobalHpkeConfigResp,
-        PatchGlobalHpkeConfigReq, PostTaskReq, PostTaskprovPeerAggregatorReq,
-        PutGlobalHpkeConfigReq, TaskResp, TaskprovPeerAggregatorResp,
+        DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, GetTaskMetricsResp,
+        GetTaskUploadMetricsResp, GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PostTaskReq,
+        PostTaskprovPeerAggregatorReq, PutGlobalHpkeConfigReq, TaskResp,
+        TaskprovPeerAggregatorResp,
     },
     Config, CONTENT_TYPE,
 };
@@ -12,7 +13,10 @@ use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
 use futures::future::try_join_all;
 use janus_aggregator_core::{
     datastore::{
-        models::{AggregationJob, AggregationJobState, HpkeKeyState, LeaderStoredReport},
+        models::{
+            AggregationJob, AggregationJobState, HpkeKeyState, LeaderStoredReport,
+            TaskUploadCounter, TaskUploadIncrementor,
+        },
         test_util::{ephemeral_datastore, EphemeralDatastore},
         Datastore,
     },
@@ -855,6 +859,78 @@ async fn get_task_metrics() {
     );
 }
 
+#[tokio::test]
+async fn get_task_upload_metrics() {
+    let (handler, _ephemeral_datastore, ds) = setup_api_test().await;
+    let task_id = ds
+        .run_unnamed_tx(|tx| {
+            Box::pin(async move {
+                let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake)
+                    .build()
+                    .leader_view()
+                    .unwrap();
+                let task_id = *task.id();
+                tx.put_aggregator_task(&task).await?;
+
+                for case in [
+                    (TaskUploadIncrementor::ReportDecryptFailure, 2),
+                    (TaskUploadIncrementor::ReportExpired, 4),
+                    (TaskUploadIncrementor::ReportOutdatedKey, 6),
+                    (TaskUploadIncrementor::ReportSuccess, 100),
+                    (TaskUploadIncrementor::ReportTooEarly, 25),
+                    (TaskUploadIncrementor::TaskExpired, 12),
+                ] {
+                    let ord = thread_rng().gen_range(0..32);
+                    try_join_all(
+                        (0..case.1)
+                            .map(|_| tx.increment_task_upload_counter(&task_id, ord, &case.0)),
+                    )
+                    .await
+                    .unwrap();
+                }
+
+                Ok(task_id)
+            })
+        })
+        .await
+        .unwrap();
+
+    // Verify: requesting metrics on a task returns the correct result.
+    assert_response!(
+        get(&format!("/tasks/{}/metrics/uploads", &task_id))
+            .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}"))
+            .with_request_header("Accept", CONTENT_TYPE)
+            .run_async(&handler)
+            .await,
+        Status::Ok,
+        serde_json::to_string(&GetTaskUploadMetricsResp(TaskUploadCounter::new(
+            0, 0, 2, 4, 6, 100, 25, 12
+        )))
+        .unwrap(),
+    );
+
+    // Verify: requesting metrics on a nonexistent task returns NotFound.
+    assert_response!(
+        get(&format!("/tasks/{}/metrics/uploads", &random::<TaskId>()))
+            .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}"))
+            .with_request_header("Accept", CONTENT_TYPE)
+            .run_async(&handler)
+            .await,
+        Status::NotFound,
+        "",
+    );
+
+    // Verify: unauthorized requests are denied appropriately.
+    assert_response!(
+        get(&format!("/tasks/{}/metrics/uploads", &task_id))
+            .with_request_header("Accept", CONTENT_TYPE)
+            .run_async(&handler)
+            .await,
+        Status::Unauthorized,
+        "",
+    );
+}
+
 #[tokio::test]
 async fn get_global_hpke_configs() {
     let (handler, _ephemeral_datastore, ds) = setup_api_test().await;
@@ -1974,3 +2050,36 @@ fn get_task_metrics_resp_serialization() {
         ],
     )
 }
+
+#[test]
+fn get_task_upload_metrics_serialization() {
+    assert_ser_tokens(
+        &GetTaskUploadMetricsResp(TaskUploadCounter::new(0, 1, 2, 3, 4, 5, 6, 7)),
+        &[
+            Token::NewtypeStruct {
+                name: "GetTaskUploadMetricsResp",
+            },
+            Token::Struct {
+                name: "TaskUploadCounter",
+                len: 8,
+            },
+            Token::Str("interval_collected"),
+            Token::U64(0),
+            Token::Str("report_decode_failure"),
+            Token::U64(1),
+            Token::Str("report_decrypt_failure"),
+            Token::U64(2),
+            Token::Str("report_expired"),
+            Token::U64(3),
+            Token::Str("report_outdated_key"),
+            Token::U64(4),
+            Token::Str("report_success"),
+            Token::U64(5),
+            Token::Str("report_too_early"),
+            Token::U64(6),
+            Token::Str("task_expired"),
+            Token::U64(7),
+            Token::StructEnd,
+        ],
+    )
+}
diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs
index 10deeda88..e00afeb55 100644
--- a/aggregator_core/src/datastore.rs
+++ b/aggregator_core/src/datastore.rs
@@ -4839,39 +4839,45 @@ impl<C: Clock> Transaction<'_, C> {
         check_single_row_mutation(self.execute(&stmt, &[&aggregator_url, &role]).await?)
     }
 
-    /// Get the [`TaskUploadCounter`] for a task. This is aggregated across all shards.
+    /// Get the [`TaskUploadCounter`] for a task. This is aggregated across all shards. Returns
+    /// `None` if the task doesn't exist.
     #[tracing::instrument(skip(self), err)]
     pub async fn get_task_upload_counter(
         &self,
         task_id: &TaskId,
-    ) -> Result<TaskUploadCounter, Error> {
+    ) -> Result<Option<TaskUploadCounter>, Error> {
         let stmt = self
             .prepare_cached(
                 "SELECT
-                    COALESCE(SUM(interval_collected)::BIGINT, 0) AS interval_collected,
-                    COALESCE(SUM(report_decode_failure)::BIGINT, 0) AS report_decode_failure,
-                    COALESCE(SUM(report_decrypt_failure)::BIGINT, 0) AS report_decrypt_failure,
-                    COALESCE(SUM(report_expired)::BIGINT, 0) AS report_expired,
-                    COALESCE(SUM(report_outdated_key)::BIGINT, 0) AS report_outdated_key,
-                    COALESCE(SUM(report_success)::BIGINT, 0) AS report_success,
-                    COALESCE(SUM(report_too_early)::BIGINT, 0) AS report_too_early,
-                    COALESCE(SUM(task_expired)::BIGINT, 0) AS task_expired
+                    SUM(interval_collected)::BIGINT AS interval_collected,
+                    SUM(report_decode_failure)::BIGINT AS report_decode_failure,
+                    SUM(report_decrypt_failure)::BIGINT AS report_decrypt_failure,
+                    SUM(report_expired)::BIGINT AS report_expired,
+                    SUM(report_outdated_key)::BIGINT AS report_outdated_key,
+                    SUM(report_success)::BIGINT AS report_success,
+                    SUM(report_too_early)::BIGINT AS report_too_early,
+                    SUM(task_expired)::BIGINT AS task_expired
                 FROM task_upload_counters
                 WHERE task_id = (SELECT id FROM tasks WHERE task_id = $1)",
             )
             .await?;
 
         let row = self.query_one(&stmt, &[task_id.as_ref()]).await?;
-        Ok(TaskUploadCounter {
-            task_id: *task_id,
-            interval_collected: row.get_bigint_and_convert("interval_collected")?,
-            report_decode_failure: row.get_bigint_and_convert("report_decode_failure")?,
-            report_decrypt_failure: row.get_bigint_and_convert("report_decrypt_failure")?,
-            report_expired: row.get_bigint_and_convert("report_expired")?,
-            report_outdated_key: row.get_bigint_and_convert("report_outdated_key")?,
-            report_success: row.get_bigint_and_convert("report_success")?,
-            report_too_early: row.get_bigint_and_convert("report_too_early")?,
-            task_expired: row.get_bigint_and_convert("task_expired")?,
+        let interval_collected = row.get_nullable_bigint_and_convert("interval_collected")?;
+        Ok(match interval_collected {
+            Some(interval_collected) => Some(TaskUploadCounter {
+                interval_collected,
+                // The remaining columns should exist if the first one did, due to a DEFAULT 0
+                // clause, so we don't need to treat these as nullable.
+                report_decode_failure: row.get_bigint_and_convert("report_decode_failure")?,
+                report_decrypt_failure: row.get_bigint_and_convert("report_decrypt_failure")?,
+                report_expired: row.get_bigint_and_convert("report_expired")?,
+                report_outdated_key: row.get_bigint_and_convert("report_outdated_key")?,
+                report_success: row.get_bigint_and_convert("report_success")?,
+                report_too_early: row.get_bigint_and_convert("report_too_early")?,
+                task_expired: row.get_bigint_and_convert("task_expired")?,
+            }),
+            None => None,
         })
     }
 
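A note on the SQL change above: with the COALESCE(..., 0) wrappers removed, SUM() over zero rows yields SQL NULL, so a NULL interval_collected column is how the getter distinguishes "no counter rows at all" from a genuine zero count. As the roundtrip datastore test further below shows, this also means None is returned for a task that exists but has no recorded uploads yet, since shard rows are first written by increment_task_upload_counter. A consuming sketch (illustrative only, not part of this patch; the janus_core and janus_messages paths are assumed):

// Illustrative sketch, not part of the patch: consuming the Option-returning getter.
use janus_aggregator_core::datastore::Transaction;
use janus_core::time::Clock; // assumed path for the Clock trait
use janus_messages::TaskId; // assumed path for TaskId

async fn print_upload_counters<C: Clock>(tx: &Transaction<'_, C>, task_id: &TaskId) {
    match tx.get_task_upload_counter(task_id).await {
        // Some: at least one shard row exists; counts are summed across shards.
        Ok(Some(counter)) => println!("{task_id:?}: {counter:?}"),
        // None: no rows for this task (unknown task, or no uploads recorded yet).
        Ok(None) => println!("{task_id:?}: no upload counters recorded"),
        Err(err) => eprintln!("datastore error: {err}"),
    }
}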
diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs
index e4a812dbd..c9aeeec1e 100644
--- a/aggregator_core/src/datastore/models.rs
+++ b/aggregator_core/src/datastore/models.rs
@@ -2000,8 +2000,6 @@ impl GlobalHpkeKeypair {
 /// Per-task counts of uploaded reports and upload attempts.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
 pub struct TaskUploadCounter {
-    pub(crate) task_id: TaskId,
-
     pub(crate) interval_collected: u64,
     pub(crate) report_decode_failure: u64,
     pub(crate) report_decrypt_failure: u64,
@@ -2020,7 +2018,6 @@ impl TaskUploadCounter {
     #[allow(clippy::too_many_arguments)]
     #[cfg(feature = "test-util")]
     pub fn new(
-        task_id: TaskId,
         interval_collected: u64,
         report_decode_failure: u64,
         report_decrypt_failure: u64,
@@ -2031,7 +2028,6 @@ impl TaskUploadCounter {
         task_expired: u64,
     ) -> Self {
         Self {
-            task_id,
             interval_collected,
             report_decode_failure,
             report_decrypt_failure,
diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs
index 843d04e35..b6bcf66fd 100644
--- a/aggregator_core/src/datastore/tests.rs
+++ b/aggregator_core/src/datastore/tests.rs
@@ -7477,20 +7477,7 @@ async fn roundtrip_task_upload_counter(ephemeral_datastore: EphemeralDatastore)
         let task_id = *task.id();
         Box::pin(async move {
             let counter = tx.get_task_upload_counter(&task_id).await.unwrap();
-            assert_eq!(
-                counter,
-                TaskUploadCounter {
-                    task_id,
-                    interval_collected: 0,
-                    report_decode_failure: 0,
-                    report_decrypt_failure: 0,
-                    report_expired: 0,
-                    report_success: 0,
-                    report_too_early: 0,
-                    report_outdated_key: 0,
-                    task_expired: 0,
-                }
-            );
+            assert_eq!(counter, None);
 
             for case in [
                 (TaskUploadIncrementor::IntervalCollected, 2),
@@ -7514,8 +7501,7 @@ async fn roundtrip_task_upload_counter(ephemeral_datastore: EphemeralDatastore)
             let counter = tx.get_task_upload_counter(&task_id).await.unwrap();
             assert_eq!(
                 counter,
-                TaskUploadCounter {
-                    task_id,
+                Some(TaskUploadCounter {
                     interval_collected: 2,
                     report_decode_failure: 4,
                     report_decrypt_failure: 6,
@@ -7524,7 +7510,7 @@ async fn roundtrip_task_upload_counter(ephemeral_datastore: EphemeralDatastore)
                     report_success: 100,
                     report_too_early: 25,
                     task_expired: 12,
-                }
+                })
             );
 
             Ok(())
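Taken together with the serde token test above: GetTaskUploadMetricsResp is a newtype, so it serializes as its inner TaskUploadCounter, and the new endpoint's response body is a flat JSON object with one integer per counter, in field-declaration order. For the counter constructed as TaskUploadCounter::new(0, 1, 2, 3, 4, 5, 6, 7), the body would be:

{"interval_collected": 0, "report_decode_failure": 1, "report_decrypt_failure": 2, "report_expired": 3, "report_outdated_key": 4, "report_success": 5, "report_too_early": 6, "task_expired": 7}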