Skip to content

Commit

Permalink
aggregator auth tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga committed Sep 22, 2023
1 parent fde4e09 commit 61b4668
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 212 deletions.
53 changes: 45 additions & 8 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,11 @@ pub struct Config {
/// New tasks will have this tolerable clock skew.
pub tolerable_clock_skew: Duration,

/// Defines the key used to deterministically derive the VDAF verify key for new tasks.
/// Defines the key used to derive the VDAF verify key for new tasks.
pub verify_key_init: VerifyKeyInit,

/// Authentication tokens used for requests from the leader.
pub auth_tokens: Vec<AuthenticationToken>,
}

// subscriber-01 only: the config now has mandatory fields, so default only makes sense as a helper
Expand All @@ -218,6 +221,7 @@ impl Default for Config {
report_expiry_age: None,
tolerable_clock_skew: Duration::from_minutes(60).unwrap(),
verify_key_init: random(),
auth_tokens: Vec::new(),
}
}
}
Expand Down Expand Up @@ -322,8 +326,13 @@ impl<C: Clock> Aggregator<C> {
task_aggregator
}
None if taskprov_task_config.is_some() => {
self.taskprov_opt_in(&Role::Leader, task_id, taskprov_task_config.unwrap())
.await?;
self.taskprov_opt_in(
&Role::Leader,
task_id,
taskprov_task_config.unwrap(),
auth_token.as_ref(),
)
.await?;

// Retry fetching the aggregator, since the last function would have just inserted
// its task.
Expand Down Expand Up @@ -366,8 +375,13 @@ impl<C: Clock> Aggregator<C> {
}

if taskprov_task_config.is_some() {
self.taskprov_authorize_request(&Role::Leader, task_id, taskprov_task_config.unwrap())
.await?;
self.taskprov_authorize_request(
&Role::Leader,
task_id,
taskprov_task_config.unwrap(),
auth_token.as_ref(),
)
.await?;
} else if !auth_token
.map(|t| task_aggregator.task.check_aggregator_auth_token(&t))
.unwrap_or(false)
Expand Down Expand Up @@ -492,8 +506,13 @@ impl<C: Clock> Aggregator<C> {
// Authorize the request and retrieve the collector's HPKE config. If this is a taskprov task, we
// have to use the peer aggregator's collector config rather than the main task.
let collector_hpke_config = if taskprov_task_config.is_some() {
self.taskprov_authorize_request(&Role::Leader, task_id, taskprov_task_config.unwrap())
.await?;
self.taskprov_authorize_request(
&Role::Leader,
task_id,
taskprov_task_config.unwrap(),
auth_token.as_ref(),
)
.await?;
&self.cfg.collector_hpke_config
} else {
if !auth_token
Expand Down Expand Up @@ -574,15 +593,22 @@ impl<C: Clock> Aggregator<C> {
peer_role: &Role,
task_id: &TaskId,
task_config: &TaskConfig,
aggregator_auth_token: Option<&AuthenticationToken>,
) -> Result<(), Error> {
self.taskprov_authorize_request(peer_role, task_id, task_config)
self.taskprov_authorize_request(peer_role, task_id, task_config, aggregator_auth_token)
.await?;

let aggregator_urls = task_config
.aggregator_endpoints()
.iter()
.map(|url| url.try_into())
.collect::<Result<Vec<Url>, _>>()?;
if aggregator_urls.len() < 2 {
return Err(Error::UnrecognizedMessage(
Some(*task_id),
"taskprov configuration is missing one or both aggregators",
));
}

// TODO(#1647): Check whether task config parameters are acceptable for privacy and
// availability of the system.
Expand Down Expand Up @@ -662,7 +688,18 @@ impl<C: Clock> Aggregator<C> {
peer_role: &Role,
task_id: &TaskId,
task_config: &TaskConfig,
aggregator_auth_token: Option<&AuthenticationToken>,
) -> Result<(), Error> {
let request_token = aggregator_auth_token.ok_or(Error::UnauthorizedRequest(*task_id))?;
if !self
.cfg
.auth_tokens
.iter()
.any(|token| token == request_token)
{
return Err(Error::UnauthorizedRequest(*task_id));
}

if self.clock.now() > *task_config.task_expiration() {
return Err(Error::InvalidTask(*task_id, OptOutReason::TaskExpired));
}
Expand Down
52 changes: 49 additions & 3 deletions aggregator/src/aggregator/http_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,10 @@ async fn aggregator_handler_with_aggregator<C: Clock>(
"hpke_config",
hpke_config_cors_preflight,
)
.post("upload", instrumented(api(upload::<C>)))
.with_route(trillium::Method::Options, "upload", upload_cors_preflight)
.post("aggregate", instrumented(api(aggregate::<C>)))
.post("collect", instrumented(api(collect_post::<C>)))
.post("aggregate_share", instrumented(api(aggregate_share::<C>)))
// TODO(#1728): remove these unnecessary routes, subscriber-01 is helper-only.
.get(
"collect/:task_id/:collection_job_id",
instrumented(api(collect_get::<C>)),
Expand All @@ -257,11 +257,57 @@ async fn aggregator_handler_with_aggregator<C: Clock>(
"collect/:task_id/:collection_job_id",
instrumented(api(collect_delete::<C>)),
)
.post("aggregate_share", instrumented(api(aggregate_share::<C>))),
.post("upload", instrumented(api(upload::<C>)))
.with_route(trillium::Method::Options, "upload", upload_cors_preflight),
StatusCounter::new(meter),
))
}

// pub fn authenticated<H: Handler, C: Clock>(handler: H) -> impl Handler {
// AuthenticatedHandler(handler, PhantomData::<C>)
// }

// #[derive(Handler)]
// struct AuthenticatedHandler<H, C: Clock>(#[handler(except = [run])] H, PhantomData<C>);

// impl<H: Handler, C: Clock> AuthenticatedHandler<H, C> {
// async fn run(&self, mut conn: Conn) -> Conn {
// let aggregator: Arc<Aggregator<C>> = conn.take_state().unwrap();

// let request_auth = {
// let bearer_token = match extract_bearer_token(&conn) {
// Ok(bearer_token) => bearer_token,
// Err(_) => todo!("bad request"),
// };

// match bearer_token {
// Some(bearer_token) => bearer_token,
// None => match conn.request_headers().get(DAP_AUTH_HEADER) {
// Some(dap_auth) => {
// match AuthenticationToken::new_dap_auth_token_from_bytes(dap_auth.as_ref())
// {
// Ok(dap_auth) => dap_auth,
// Err(_) => todo!("bad request"),
// }
// }
// None => todo!("unauthorized"),
// },
// }
// };

// if aggregator
// .cfg
// .auth_tokens
// .iter()
// .any(|token| *token == request_auth)
// {
// self.0.run(conn).await
// } else {
// conn.with_status(Status::Unauthorized).halt()
// }
// }
// }

/// API handler for the "/hpke_config" GET endpoint.
async fn hpke_config<C: Clock>(
conn: &mut Conn,
Expand Down
Loading

0 comments on commit 61b4668

Please sign in to comment.