Skip to content

Commit

Permalink
Specialize taskprov to only one peer aggregator
Browse files Browse the repository at this point in the history
Use a global aggregator auth token
  • Loading branch information
inahga committed Sep 25, 2023
1 parent febbd2d commit b91106d
Show file tree
Hide file tree
Showing 20 changed files with 420 additions and 1,359 deletions.
134 changes: 74 additions & 60 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
query_type::{CollectableQueryType, UploadableQueryType},
report_writer::{ReportWriteBatcher, WritableReport},
},
cache::{GlobalHpkeKeypairCache, PeerAggregatorCache},
cache::GlobalHpkeKeypairCache,
Operation,
};
use bytes::Bytes;
Expand All @@ -27,10 +27,10 @@ use janus_aggregator_core::{
},
query_type::AccumulableQueryType,
task::{self, Task, VerifyKey},
taskprov::{self, PeerAggregator},
taskprov::{self, VerifyKeyInit},
};
#[cfg(feature = "test-util")]
use janus_core::test_util::dummy_vdaf;
use janus_core::{hpke::generate_hpke_config_and_private_key, test_util::dummy_vdaf};
use janus_core::{
hpke::{self, aggregate_share_aad, input_share_aad, HpkeApplicationInfo, HpkeKeypair, Label},
http::response_to_problem_details,
Expand All @@ -46,6 +46,8 @@ use janus_messages::{
Duration, HpkeCiphertext, HpkeConfig, Interval, PartialBatchSelector, PrepareStep,
PrepareStepResult, Report, ReportIdChecksum, ReportShare, ReportShareError, Role, TaskId,
};
#[cfg(feature = "test-util")]
use janus_messages::{HpkeAeadId, HpkeConfigId, HpkeKdfId, HpkeKemId};
use opentelemetry::{
metrics::{Counter, Histogram, Meter, Unit},
KeyValue,
Expand Down Expand Up @@ -157,13 +159,10 @@ pub struct Aggregator<C: Clock> {

/// Cache of global HPKE keypairs and configs.
global_hpke_keypairs: GlobalHpkeKeypairCache,

/// Cache of taskprov peer aggregators.
peer_aggregators: PeerAggregatorCache,
}

/// Config represents a configuration for an Aggregator.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Config {
/// Defines the maximum size of a batch of uploaded reports which will be written in a single
/// transaction.
Expand All @@ -182,15 +181,45 @@ pub struct Config {
/// Defines how often to refresh the global HPKE configs cache. This affects how often an aggregator
/// becomes aware of key state changes.
pub global_hpke_configs_refresh_interval: StdDuration,

/// Collection results will be encrypted to this public key.
pub collector_hpke_config: HpkeConfig,

/// New tasks will have this report expiration age.
pub report_expiry_age: Option<Duration>,

/// New tasks will have this tolerable clock skew.
pub tolerable_clock_skew: Duration,

/// Defines the key used to derive the VDAF verify key for new tasks.
pub verify_key_init: VerifyKeyInit,

/// Authentication tokens used for requests from the leader.
pub auth_tokens: Vec<AuthenticationToken>,
}

// subscriber-01 only: the config now has mandatory fields, so default only makes sense as a helper
// impl inside unit tests.
#[cfg(feature = "test-util")]
impl Default for Config {
fn default() -> Self {
Self {
max_upload_batch_size: 1,
max_upload_batch_write_delay: StdDuration::ZERO,
batch_aggregation_shard_count: 1,
global_hpke_configs_refresh_interval: GlobalHpkeKeypairCache::DEFAULT_REFRESH_INTERVAL,
collector_hpke_config: generate_hpke_config_and_private_key(
HpkeConfigId::from(1),
HpkeKemId::X25519HkdfSha256,
HpkeKdfId::HkdfSha256,
HpkeAeadId::Aes128Gcm,
)
.config()
.clone(),
report_expiry_age: None,
tolerable_clock_skew: Duration::from_minutes(60).unwrap(),
verify_key_init: random(),
auth_tokens: Vec::new(),
}
}
}
Expand Down Expand Up @@ -231,8 +260,6 @@ impl<C: Clock> Aggregator<C> {
)
.await?;

let peer_aggregators = PeerAggregatorCache::new(&datastore).await?;

Ok(Self {
datastore,
clock,
Expand All @@ -243,7 +270,6 @@ impl<C: Clock> Aggregator<C> {
upload_decode_failure_counter,
aggregate_step_failure_counter,
global_hpke_keypairs,
peer_aggregators,
})
}

Expand Down Expand Up @@ -478,16 +504,14 @@ impl<C: Clock> Aggregator<C> {
// Authorize the request and retrieve the collector's HPKE config. If this is a taskprov task, we
// have to use the peer aggregator's collector config rather than the main task.
let collector_hpke_config = if taskprov_task_config.is_some() {
let (peer_aggregator, _) = self
.taskprov_authorize_request(
&Role::Leader,
task_id,
taskprov_task_config.unwrap(),
auth_token.as_ref(),
)
.await?;

peer_aggregator.collector_hpke_config()
self.taskprov_authorize_request(
&Role::Leader,
task_id,
taskprov_task_config.unwrap(),
auth_token.as_ref(),
)
.await?;
&self.cfg.collector_hpke_config
} else {
if !auth_token
.map(|t| task_aggregator.task.check_aggregator_auth_token(&t))
Expand Down Expand Up @@ -561,18 +585,29 @@ impl<C: Clock> Aggregator<C> {
}

/// Opts in or out of a taskprov task.
#[tracing::instrument(skip(self, aggregator_auth_token), err)]
#[tracing::instrument(skip(self), err)]
async fn taskprov_opt_in(
&self,
peer_role: &Role,
task_id: &TaskId,
task_config: &TaskConfig,
aggregator_auth_token: Option<&AuthenticationToken>,
) -> Result<(), Error> {
let (peer_aggregator, aggregator_urls) = self
.taskprov_authorize_request(peer_role, task_id, task_config, aggregator_auth_token)
self.taskprov_authorize_request(peer_role, task_id, task_config, aggregator_auth_token)
.await?;

let aggregator_urls = task_config
.aggregator_endpoints()
.iter()
.map(|url| url.try_into())
.collect::<Result<Vec<Url>, _>>()?;
if aggregator_urls.len() < 2 {
return Err(Error::UnrecognizedMessage(
Some(*task_id),
"taskprov configuration is missing one or both aggregators",
));
}

// TODO(#1647): Check whether task config parameters are acceptable for privacy and
// availability of the system.

Expand All @@ -595,8 +630,10 @@ impl<C: Clock> Aggregator<C> {
}
};

let vdaf_verify_keys =
Vec::from([peer_aggregator.derive_vdaf_verify_key(task_id, &vdaf_instance)]);
let vdaf_verify_keys = Vec::from([self
.cfg
.verify_key_init
.derive_vdaf_verify_key(task_id, &vdaf_instance)]);

let task = taskprov::Task::new(
*task_id,
Expand All @@ -607,10 +644,10 @@ impl<C: Clock> Aggregator<C> {
vdaf_verify_keys,
task_config.query_config().max_batch_query_count() as u64,
Some(*task_config.task_expiration()),
peer_aggregator.report_expiry_age().cloned(),
self.cfg.report_expiry_age,
task_config.query_config().min_batch_size() as u64,
*task_config.query_config().time_precision(),
*peer_aggregator.tolerable_clock_skew(),
self.cfg.tolerable_clock_skew,
)
.map_err(|err| Error::InvalidTask(*task_id, OptOutReason::TaskParameters(err)))?;
self.datastore
Expand All @@ -636,45 +673,27 @@ impl<C: Clock> Aggregator<C> {
}
})?;

info!(?task, ?peer_aggregator, "taskprov: opted into new task");
info!(?task, "taskprov: opted into new task");
Ok(())
}

/// Validate and authorize a taskprov request. Returns values necessary for determining whether
/// we can opt into the task. This function might return an opt-out error for conditions that
/// are relevant for all DAP workflows (e.g. task expiration).
#[tracing::instrument(skip(self, aggregator_auth_token), err)]
#[tracing::instrument(skip(self), err)]
async fn taskprov_authorize_request(
&self,
peer_role: &Role,
task_id: &TaskId,
task_config: &TaskConfig,
aggregator_auth_token: Option<&AuthenticationToken>,
) -> Result<(&PeerAggregator, Vec<Url>), Error> {
let aggregator_urls = task_config
.aggregator_endpoints()
) -> Result<(), Error> {
let request_token = aggregator_auth_token.ok_or(Error::UnauthorizedRequest(*task_id))?;
if !self
.cfg
.auth_tokens
.iter()
.map(|url| url.try_into())
.collect::<Result<Vec<Url>, _>>()?;
if aggregator_urls.len() < 2 {
return Err(Error::UnrecognizedMessage(
Some(*task_id),
"taskprov configuration is missing one or both aggregators",
));
}
let peer_aggregator_url = &aggregator_urls[peer_role.index().unwrap()];

let peer_aggregator = self
.peer_aggregators
.get(peer_aggregator_url, peer_role)
.ok_or(Error::InvalidTask(
*task_id,
OptOutReason::NoSuchPeer(*peer_role),
))?;

if !aggregator_auth_token
.map(|t| peer_aggregator.check_aggregator_auth_token(t))
.unwrap_or(false)
.any(|token| token == request_token)
{
return Err(Error::UnauthorizedRequest(*task_id));
}
Expand All @@ -683,13 +702,8 @@ impl<C: Clock> Aggregator<C> {
return Err(Error::InvalidTask(*task_id, OptOutReason::TaskExpired));
}

debug!(
?task_id,
?task_config,
?peer_aggregator,
"taskprov: authorized request"
);
Ok((peer_aggregator, aggregator_urls))
debug!(?task_id, ?task_config, "taskprov: authorized request");
Ok(())
}

#[cfg(feature = "test-util")]
Expand Down
Loading

0 comments on commit b91106d

Please sign in to comment.