Skip to content

Commit

Permalink
Use a global aggregator auth token
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga committed Sep 25, 2023
1 parent fde4e09 commit cac0c67
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 230 deletions.
67 changes: 51 additions & 16 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@ use janus_aggregator_core::{
taskprov::{self, VerifyKeyInit},
};
#[cfg(feature = "test-util")]
use janus_core::test_util::dummy_vdaf;
use janus_core::{hpke::generate_hpke_config_and_private_key, test_util::dummy_vdaf};
use janus_core::{
hpke::{
self, aggregate_share_aad, generate_hpke_config_and_private_key, input_share_aad,
HpkeApplicationInfo, HpkeKeypair, Label,
},
hpke::{self, aggregate_share_aad, input_share_aad, HpkeApplicationInfo, HpkeKeypair, Label},
http::response_to_problem_details,
task::{AuthenticationToken, VdafInstance, PRIO3_VERIFY_KEY_LENGTH},
time::{Clock, DurationExt, IntervalExt, TimeExt},
Expand All @@ -46,10 +43,11 @@ use janus_messages::{
taskprov::TaskConfig,
AggregateContinueReq, AggregateContinueResp, AggregateInitializeReq, AggregateInitializeResp,
AggregateShareReq, AggregateShareResp, BatchSelector, CollectReq, CollectResp, CollectionJobId,
Duration, HpkeAeadId, HpkeCiphertext, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, Interval,
PartialBatchSelector, PrepareStep, PrepareStepResult, Report, ReportIdChecksum, ReportShare,
ReportShareError, Role, TaskId,
Duration, HpkeCiphertext, HpkeConfig, Interval, PartialBatchSelector, PrepareStep,
PrepareStepResult, Report, ReportIdChecksum, ReportShare, ReportShareError, Role, TaskId,
};
#[cfg(feature = "test-util")]
use janus_messages::{HpkeAeadId, HpkeConfigId, HpkeKdfId, HpkeKemId};
use opentelemetry::{
metrics::{Counter, Histogram, Meter, Unit},
KeyValue,
Expand Down Expand Up @@ -193,8 +191,11 @@ pub struct Config {
/// New tasks will have this tolerable clock skew.
pub tolerable_clock_skew: Duration,

/// Defines the key used to deterministically derive the VDAF verify key for new tasks.
/// Defines the key used to derive the VDAF verify key for new tasks.
pub verify_key_init: VerifyKeyInit,

/// Authentication tokens used for requests from the leader.
pub auth_tokens: Vec<AuthenticationToken>,
}

// subscriber-01 only: the config now has mandatory fields, so default only makes sense as a helper
Expand All @@ -218,6 +219,7 @@ impl Default for Config {
report_expiry_age: None,
tolerable_clock_skew: Duration::from_minutes(60).unwrap(),
verify_key_init: random(),
auth_tokens: Vec::new(),
}
}
}
Expand Down Expand Up @@ -322,8 +324,13 @@ impl<C: Clock> Aggregator<C> {
task_aggregator
}
None if taskprov_task_config.is_some() => {
self.taskprov_opt_in(&Role::Leader, task_id, taskprov_task_config.unwrap())
.await?;
self.taskprov_opt_in(
&Role::Leader,
task_id,
taskprov_task_config.unwrap(),
auth_token.as_ref(),
)
.await?;

// Retry fetching the aggregator, since the last function would have just inserted
// its task.
Expand Down Expand Up @@ -366,8 +373,13 @@ impl<C: Clock> Aggregator<C> {
}

if taskprov_task_config.is_some() {
self.taskprov_authorize_request(&Role::Leader, task_id, taskprov_task_config.unwrap())
.await?;
self.taskprov_authorize_request(
&Role::Leader,
task_id,
taskprov_task_config.unwrap(),
auth_token.as_ref(),
)
.await?;
} else if !auth_token
.map(|t| task_aggregator.task.check_aggregator_auth_token(&t))
.unwrap_or(false)
Expand Down Expand Up @@ -492,8 +504,13 @@ impl<C: Clock> Aggregator<C> {
// Authorize the request and retrieve the collector's HPKE config. If this is a taskprov task, we
// have to use the peer aggregator's collector config rather than the main task.
let collector_hpke_config = if taskprov_task_config.is_some() {
self.taskprov_authorize_request(&Role::Leader, task_id, taskprov_task_config.unwrap())
.await?;
self.taskprov_authorize_request(
&Role::Leader,
task_id,
taskprov_task_config.unwrap(),
auth_token.as_ref(),
)
.await?;
&self.cfg.collector_hpke_config
} else {
if !auth_token
Expand Down Expand Up @@ -574,15 +591,22 @@ impl<C: Clock> Aggregator<C> {
peer_role: &Role,
task_id: &TaskId,
task_config: &TaskConfig,
aggregator_auth_token: Option<&AuthenticationToken>,
) -> Result<(), Error> {
self.taskprov_authorize_request(peer_role, task_id, task_config)
self.taskprov_authorize_request(peer_role, task_id, task_config, aggregator_auth_token)
.await?;

let aggregator_urls = task_config
.aggregator_endpoints()
.iter()
.map(|url| url.try_into())
.collect::<Result<Vec<Url>, _>>()?;
if aggregator_urls.len() < 2 {
return Err(Error::UnrecognizedMessage(
Some(*task_id),
"taskprov configuration is missing one or both aggregators",
));
}

// TODO(#1647): Check whether task config parameters are acceptable for privacy and
// availability of the system.
Expand Down Expand Up @@ -662,7 +686,18 @@ impl<C: Clock> Aggregator<C> {
peer_role: &Role,
task_id: &TaskId,
task_config: &TaskConfig,
aggregator_auth_token: Option<&AuthenticationToken>,
) -> Result<(), Error> {
let request_token = aggregator_auth_token.ok_or(Error::UnauthorizedRequest(*task_id))?;
if !self
.cfg
.auth_tokens
.iter()
.any(|token| token == request_token)
{
return Err(Error::UnauthorizedRequest(*task_id));
}

if self.clock.now() > *task_config.task_expiration() {
return Err(Error::InvalidTask(*task_id, OptOutReason::TaskExpired));
}
Expand Down
7 changes: 4 additions & 3 deletions aggregator/src/aggregator/http_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,10 @@ async fn aggregator_handler_with_aggregator<C: Clock>(
"hpke_config",
hpke_config_cors_preflight,
)
.post("upload", instrumented(api(upload::<C>)))
.with_route(trillium::Method::Options, "upload", upload_cors_preflight)
.post("aggregate", instrumented(api(aggregate::<C>)))
.post("collect", instrumented(api(collect_post::<C>)))
.post("aggregate_share", instrumented(api(aggregate_share::<C>)))
// TODO(#1728): remove these unnecessary routes, subscriber-01 is helper-only.
.get(
"collect/:task_id/:collection_job_id",
instrumented(api(collect_get::<C>)),
Expand All @@ -257,7 +257,8 @@ async fn aggregator_handler_with_aggregator<C: Clock>(
"collect/:task_id/:collection_job_id",
instrumented(api(collect_delete::<C>)),
)
.post("aggregate_share", instrumented(api(aggregate_share::<C>))),
.post("upload", instrumented(api(upload::<C>)))
.with_route(trillium::Method::Options, "upload", upload_cors_preflight),
StatusCounter::new(meter),
))
}
Expand Down
Loading

0 comments on commit cac0c67

Please sign in to comment.