Skip to content

Commit

Permalink
Expose upload metrics through aggregator API (#2537)
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga committed Feb 5, 2024
1 parent cf22c8d commit 82588e0
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 69 deletions.
40 changes: 20 additions & 20 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3496,7 +3496,7 @@ mod tests {
.eq_report(&vdaf, leader_task.current_hpke_key(), &report));
assert_eq!(
got_counter,
TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 0, 1, 0, 0)
Some(TaskUploadCounter::new(0, 0, 0, 0, 0, 1, 0, 0))
)
}

Expand Down Expand Up @@ -3552,7 +3552,7 @@ mod tests {
.unwrap();
assert_eq!(
got_counters,
TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 0, 100, 0, 0),
Some(TaskUploadCounter::new(0, 0, 0, 0, 0, 100, 0, 0))
);
}

Expand Down Expand Up @@ -3616,8 +3616,8 @@ mod tests {
.unwrap();
assert_eq!(
got_counters,
TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 1, 0, 0, 0),
);
Some(TaskUploadCounter::new(0, 0, 0, 0, 1, 0, 0, 0))
)
}

#[tokio::test]
Expand Down Expand Up @@ -3657,8 +3657,8 @@ mod tests {
.unwrap();
assert_eq!(
got_counters,
TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 0, 1, 0, 0),
);
Some(TaskUploadCounter::new(0, 0, 0, 0, 0, 1, 0, 0))
)
}

#[tokio::test]
Expand Down Expand Up @@ -3706,8 +3706,8 @@ mod tests {
.unwrap();
assert_eq!(
got_counters,
TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 0, 0, 1, 0),
);
Some(TaskUploadCounter::new(0, 0, 0, 0, 0, 0, 1, 0))
)
}

#[tokio::test]
Expand Down Expand Up @@ -3784,8 +3784,8 @@ mod tests {
.unwrap();
assert_eq!(
got_counters,
TaskUploadCounter::new(*task.id(), 1, 0, 0, 0, 0, 0, 0, 0),
);
Some(TaskUploadCounter::new(1, 0, 0, 0, 0, 0, 0, 0))
)
}

#[tokio::test]
Expand Down Expand Up @@ -3916,8 +3916,8 @@ mod tests {
.unwrap();
assert_eq!(
got_counters,
TaskUploadCounter::new(*task.id(), 0, 0, 0, 0, 0, 0, 0, 1),
);
Some(TaskUploadCounter::new(0, 0, 0, 0, 0, 0, 0, 1))
)
}

#[tokio::test]
Expand Down Expand Up @@ -3972,8 +3972,8 @@ mod tests {
.unwrap();
assert_eq!(
got_counters,
TaskUploadCounter::new(*task.id(), 0, 0, 0, 1, 0, 0, 0, 0),
);
Some(TaskUploadCounter::new(0, 0, 0, 1, 0, 0, 0, 0))
)
}

#[tokio::test]
Expand Down Expand Up @@ -4028,8 +4028,8 @@ mod tests {
.unwrap();
assert_eq!(
got_counters,
TaskUploadCounter::new(*task.id(), 0, 0, 1, 0, 0, 0, 0, 0),
);
Some(TaskUploadCounter::new(0, 0, 1, 0, 0, 0, 0, 0))
)
}

#[tokio::test]
Expand Down Expand Up @@ -4083,8 +4083,8 @@ mod tests {
.unwrap();
assert_eq!(
got_counters,
TaskUploadCounter::new(*task.id(), 0, 1, 0, 0, 0, 0, 0, 0),
);
Some(TaskUploadCounter::new(0, 1, 0, 0, 0, 0, 0, 0))
)
}

#[tokio::test]
Expand Down Expand Up @@ -4152,8 +4152,8 @@ mod tests {
.unwrap();
assert_eq!(
got_counters,
TaskUploadCounter::new(*task.id(), 0, 1, 0, 0, 0, 0, 0, 0),
);
Some(TaskUploadCounter::new(0, 1, 0, 0, 0, 0, 0, 0))
)
}

pub(crate) fn generate_helper_report_share<V: vdaf::Client<16>>(
Expand Down
4 changes: 4 additions & 0 deletions aggregator_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ pub fn aggregator_api_handler<C: Clock>(
.post("/tasks", instrumented(api(post_task::<C>)))
.get("/tasks/:task_id", instrumented(api(get_task::<C>)))
.delete("/tasks/:task_id", instrumented(api(delete_task::<C>)))
.get(
"/tasks/:task_id/metrics/uploads",
instrumented(api(get_task_upload_metrics::<C>)),
)
.get(
"/tasks/:task_id/metrics",
instrumented(api(get_task_metrics::<C>)),
Expand Down
5 changes: 4 additions & 1 deletion aggregator_api/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use derivative::Derivative;
use janus_aggregator_core::{
datastore::models::{GlobalHpkeKeypair, HpkeKeyState},
datastore::models::{GlobalHpkeKeypair, HpkeKeyState, TaskUploadCounter},
task::{AggregatorTask, QueryType},
taskprov::{PeerAggregator, VerifyKeyInit},
};
Expand Down Expand Up @@ -173,6 +173,9 @@ pub(crate) struct GetTaskMetricsResp {
pub(crate) report_aggregations: u64,
}

#[derive(Serialize)]
pub(crate) struct GetTaskUploadMetricsResp(pub(crate) TaskUploadCounter);

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct GlobalHpkeConfigResp {
pub(crate) config: HpkeConfig,
Expand Down
20 changes: 17 additions & 3 deletions aggregator_api/src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{
models::{
AggregatorApiConfig, AggregatorRole, DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp,
GetTaskMetricsResp, GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PostTaskReq,
PostTaskprovPeerAggregatorReq, PutGlobalHpkeConfigReq, SupportedVdaf, TaskResp,
TaskprovPeerAggregatorResp,
GetTaskMetricsResp, GetTaskUploadMetricsResp, GlobalHpkeConfigResp,
PatchGlobalHpkeConfigReq, PostTaskReq, PostTaskprovPeerAggregatorReq,
PutGlobalHpkeConfigReq, SupportedVdaf, TaskResp, TaskprovPeerAggregatorResp,
},
Config, ConnExt, Error,
};
Expand Down Expand Up @@ -276,6 +276,20 @@ pub(super) async fn get_task_metrics<C: Clock>(
}))
}

pub(super) async fn get_task_upload_metrics<C: Clock>(
conn: &mut Conn,
State(ds): State<Arc<Datastore<C>>>,
) -> Result<Json<GetTaskUploadMetricsResp>, Error> {
let task_id = conn.task_id_param()?;
Ok(Json(GetTaskUploadMetricsResp(
ds.run_tx("get_task_upload_metrics", |tx| {
Box::pin(async move { tx.get_task_upload_counter(&task_id).await })
})
.await?
.ok_or(Error::NotFound)?,
)))
}

pub(super) async fn get_global_hpke_configs<C: Clock>(
_: &mut Conn,
State(ds): State<Arc<Datastore<C>>>,
Expand Down
114 changes: 110 additions & 4 deletions aggregator_api/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::{
aggregator_api_handler,
models::{
DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, GetTaskMetricsResp, GlobalHpkeConfigResp,
PatchGlobalHpkeConfigReq, PostTaskReq, PostTaskprovPeerAggregatorReq,
PutGlobalHpkeConfigReq, TaskResp, TaskprovPeerAggregatorResp,
DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, GetTaskMetricsResp,
GetTaskUploadMetricsResp, GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PostTaskReq,
PostTaskprovPeerAggregatorReq, PutGlobalHpkeConfigReq, TaskResp,
TaskprovPeerAggregatorResp,
},
Config, CONTENT_TYPE,
};
Expand All @@ -14,7 +15,7 @@ use janus_aggregator_core::{
datastore::{
models::{
AggregationJob, AggregationJobState, HpkeKeyState, LeaderStoredReport,
ReportAggregation, ReportAggregationState,
ReportAggregation, ReportAggregationState, TaskUploadCounter, TaskUploadIncrementor,
},
test_util::{ephemeral_datastore, EphemeralDatastore},
Datastore,
Expand Down Expand Up @@ -865,6 +866,78 @@ async fn get_task_metrics() {
);
}

#[tokio::test]
async fn get_task_upload_metrics() {
let (handler, _ephemeral_datastore, ds) = setup_api_test().await;
let task_id = ds
.run_unnamed_tx(|tx| {
Box::pin(async move {
let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake)
.build()
.leader_view()
.unwrap();
let task_id = *task.id();
tx.put_aggregator_task(&task).await?;

for case in [
(TaskUploadIncrementor::ReportDecryptFailure, 2),
(TaskUploadIncrementor::ReportExpired, 4),
(TaskUploadIncrementor::ReportOutdatedKey, 6),
(TaskUploadIncrementor::ReportSuccess, 100),
(TaskUploadIncrementor::ReportTooEarly, 25),
(TaskUploadIncrementor::TaskExpired, 12),
] {
let ord = thread_rng().gen_range(0..32);
try_join_all(
(0..case.1)
.map(|_| tx.increment_task_upload_counter(&task_id, ord, &case.0)),
)
.await
.unwrap();
}

Ok(task_id)
})
})
.await
.unwrap();

// Verify: requesting metrics on a task returns the correct result.
assert_response!(
get(&format!("/tasks/{}/metrics/uploads", &task_id))
.with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}"))
.with_request_header("Accept", CONTENT_TYPE)
.run_async(&handler)
.await,
Status::Ok,
serde_json::to_string(&GetTaskUploadMetricsResp(TaskUploadCounter::new(
0, 0, 2, 4, 6, 100, 25, 12
)))
.unwrap(),
);

// Verify: requesting metrics on a nonexistent task returns NotFound.
assert_response!(
get(&format!("/tasks/{}/metrics/uploads", &random::<TaskId>()))
.with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}"))
.with_request_header("Accept", CONTENT_TYPE)
.run_async(&handler)
.await,
Status::NotFound,
"",
);

// Verify: unauthorized requests are denied appropriately.
assert_response!(
get(&format!("/tasks/{}/metrics/uploads", &task_id))
.with_request_header("Accept", CONTENT_TYPE)
.run_async(&handler)
.await,
Status::Unauthorized,
"",
);
}

#[tokio::test]
async fn get_global_hpke_configs() {
let (handler, _ephemeral_datastore, ds) = setup_api_test().await;
Expand Down Expand Up @@ -1984,3 +2057,36 @@ fn get_task_metrics_resp_serialization() {
],
)
}

#[test]
fn get_task_upload_metrics_serialization() {
assert_ser_tokens(
&GetTaskUploadMetricsResp(TaskUploadCounter::new(0, 1, 2, 3, 4, 5, 6, 7)),
&[
Token::NewtypeStruct {
name: "GetTaskUploadMetricsResp",
},
Token::Struct {
name: "TaskUploadCounter",
len: 8,
},
Token::Str("interval_collected"),
Token::U64(0),
Token::Str("report_decode_failure"),
Token::U64(1),
Token::Str("report_decrypt_failure"),
Token::U64(2),
Token::Str("report_expired"),
Token::U64(3),
Token::Str("report_outdated_key"),
Token::U64(4),
Token::Str("report_success"),
Token::U64(5),
Token::Str("report_too_early"),
Token::U64(6),
Token::Str("task_expired"),
Token::U64(7),
Token::StructEnd,
],
)
}
46 changes: 26 additions & 20 deletions aggregator_core/src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4760,39 +4760,45 @@ impl<C: Clock> Transaction<'_, C> {
check_single_row_mutation(self.execute(&stmt, &[&aggregator_url, &role]).await?)
}

/// Get the [`TaskUploadCounter`] for a task. This is aggregated across all shards.
/// Get the [`TaskUploadCounter`] for a task. This is aggregated across all shards. Returns
/// `None` if the task doesn't exist.
#[tracing::instrument(skip(self), err)]
pub async fn get_task_upload_counter(
&self,
task_id: &TaskId,
) -> Result<TaskUploadCounter, Error> {
) -> Result<Option<TaskUploadCounter>, Error> {
let stmt = self
.prepare_cached(
"SELECT
COALESCE(SUM(interval_collected)::BIGINT, 0) AS interval_collected,
COALESCE(SUM(report_decode_failure)::BIGINT, 0) AS report_decode_failure,
COALESCE(SUM(report_decrypt_failure)::BIGINT, 0) AS report_decrypt_failure,
COALESCE(SUM(report_expired)::BIGINT, 0) AS report_expired,
COALESCE(SUM(report_outdated_key)::BIGINT, 0) AS report_outdated_key,
COALESCE(SUM(report_success)::BIGINT, 0) AS report_success,
COALESCE(SUM(report_too_early)::BIGINT, 0) AS report_too_early,
COALESCE(SUM(task_expired)::BIGINT, 0) AS task_expired
SUM(interval_collected)::BIGINT AS interval_collected,
SUM(report_decode_failure)::BIGINT AS report_decode_failure,
SUM(report_decrypt_failure)::BIGINT AS report_decrypt_failure,
SUM(report_expired)::BIGINT AS report_expired,
SUM(report_outdated_key)::BIGINT AS report_outdated_key,
SUM(report_success)::BIGINT AS report_success,
SUM(report_too_early)::BIGINT AS report_too_early,
SUM(task_expired)::BIGINT AS task_expired
FROM task_upload_counters
WHERE task_id = (SELECT id FROM tasks WHERE task_id = $1)",
)
.await?;

let row = self.query_one(&stmt, &[task_id.as_ref()]).await?;
Ok(TaskUploadCounter {
task_id: *task_id,
interval_collected: row.get_bigint_and_convert("interval_collected")?,
report_decode_failure: row.get_bigint_and_convert("report_decode_failure")?,
report_decrypt_failure: row.get_bigint_and_convert("report_decrypt_failure")?,
report_expired: row.get_bigint_and_convert("report_expired")?,
report_outdated_key: row.get_bigint_and_convert("report_outdated_key")?,
report_success: row.get_bigint_and_convert("report_success")?,
report_too_early: row.get_bigint_and_convert("report_too_early")?,
task_expired: row.get_bigint_and_convert("task_expired")?,
let interval_collected = row.get_nullable_bigint_and_convert("interval_collected")?;
Ok(match interval_collected {
Some(interval_collected) => Some(TaskUploadCounter {
interval_collected,
// The remaining columns should exist if the first one did, due to a DEFAULT 0
// clause, so we don't need to treat these as nullable.
report_decode_failure: row.get_bigint_and_convert("report_decode_failure")?,
report_decrypt_failure: row.get_bigint_and_convert("report_decrypt_failure")?,
report_expired: row.get_bigint_and_convert("report_expired")?,
report_outdated_key: row.get_bigint_and_convert("report_outdated_key")?,
report_success: row.get_bigint_and_convert("report_success")?,
report_too_early: row.get_bigint_and_convert("report_too_early")?,
task_expired: row.get_bigint_and_convert("task_expired")?,
}),
None => None,
})
}

Expand Down
Loading

0 comments on commit 82588e0

Please sign in to comment.