Skip to content

Commit

Permalink
subscriber-01: Remove taskprov feature flag, always behave as if task…
Browse files Browse the repository at this point in the history
…prov is enabled (#1969)

* Remove taskprov feature flag, always behave as if taskprov is enabled

* Insert global key before running integration tests

* Clippy

* Fix comment
  • Loading branch information
inahga authored Sep 22, 2023
1 parent 21a0631 commit 64d46cf
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 389 deletions.
126 changes: 33 additions & 93 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ use crate::{
report_writer::{ReportWriteBatcher, WritableReport},
},
cache::{GlobalHpkeKeypairCache, PeerAggregatorCache},
config::TaskprovConfig,
Operation,
};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use bytes::Bytes;
use futures::future::try_join_all;
use http::{header::CONTENT_TYPE, Method};
Expand Down Expand Up @@ -184,8 +182,6 @@ pub struct Config {
/// Defines how often to refresh the global HPKE configs cache. This affects how often an aggregator
/// becomes aware of key state changes.
pub global_hpke_configs_refresh_interval: StdDuration,

pub taskprov_config: TaskprovConfig,
}

impl Default for Config {
Expand All @@ -195,7 +191,6 @@ impl Default for Config {
max_upload_batch_write_delay: StdDuration::ZERO,
batch_aggregation_shard_count: 1,
global_hpke_configs_refresh_interval: GlobalHpkeKeypairCache::DEFAULT_REFRESH_INTERVAL,
taskprov_config: TaskprovConfig::default(),
}
}
}
Expand Down Expand Up @@ -252,52 +247,13 @@ impl<C: Clock> Aggregator<C> {
})
}

async fn handle_hpke_config(&self, task_id_base64: Option<&[u8]>) -> Result<HpkeConfig, Error> {
// If we're running in taskprov mode, unconditionally provide the global keys and ignore
// the task_id parameter.
if self.cfg.taskprov_config.enabled {
self.global_hpke_keypairs
.configs()
.iter()
.max_by_key(|cfg| cfg.id())
.ok_or_else(|| {
Error::Internal("this server is missing its global HPKE config".into())
})
.cloned()
} else {
// Otherwise, try to get the task-specific key.
match task_id_base64 {
Some(task_id_base64) => {
let task_id_bytes = URL_SAFE_NO_PAD
.decode(task_id_base64)
.map_err(|_| Error::UnrecognizedMessage(None, "task_id"))?;
let task_id = TaskId::get_decoded(&task_id_bytes)
.map_err(|_| Error::UnrecognizedMessage(None, "task_id"))?;
let task_aggregator = self
.task_aggregator_for(&task_id)
.await?
.ok_or(Error::UnrecognizedTask(task_id))?;

match task_aggregator.handle_hpke_config() {
Some(hpke_config_list) => Ok(hpke_config_list),
// Assuming something hasn't gone horribly wrong with the database, this
// should only happen in the case where the system has been moved from taskprov
// mode to non-taskprov mode. Thus there's still taskprov tasks in the database.
// This isn't a supported use case, so the operator needs to delete these tasks
// or move the system back into taskprov mode.
None => Err(Error::Internal("task has no HPKE configs".to_string())),
}
}
// No task ID present, try to fall back to a global config.
None => self
.global_hpke_keypairs
.configs()
.iter()
.max_by_key(|cfg| cfg.id())
.ok_or(Error::MissingTaskId)
.cloned(),
}
}
async fn handle_hpke_config(&self) -> Result<HpkeConfig, Error> {
self.global_hpke_keypairs
.configs()
.iter()
.max_by_key(|cfg| cfg.id())
.ok_or_else(|| Error::Internal("this server is missing its global HPKE config".into()))
.cloned()
}

async fn handle_upload(&self, report_bytes: &[u8]) -> Result<(), Arc<Error>> {
Expand Down Expand Up @@ -341,7 +297,7 @@ impl<C: Clock> Aggregator<C> {
}
task_aggregator
}
None if self.cfg.taskprov_config.enabled && taskprov_task_config.is_some() => {
None if taskprov_task_config.is_some() => {
self.taskprov_opt_in(
&Role::Leader,
task_id,
Expand Down Expand Up @@ -390,7 +346,7 @@ impl<C: Clock> Aggregator<C> {
return Err(Error::UnrecognizedTask(*task_id));
}

if self.cfg.taskprov_config.enabled && taskprov_task_config.is_some() {
if taskprov_task_config.is_some() {
self.taskprov_authorize_request(
&Role::Leader,
task_id,
Expand Down Expand Up @@ -521,33 +477,32 @@ impl<C: Clock> Aggregator<C> {

// Authorize the request and retrieve the collector's HPKE config. If this is a taskprov task, we
// have to use the peer aggregator's collector config rather than the main task.
let collector_hpke_config =
if self.cfg.taskprov_config.enabled && taskprov_task_config.is_some() {
let (peer_aggregator, _) = self
.taskprov_authorize_request(
&Role::Leader,
task_id,
taskprov_task_config.unwrap(),
auth_token.as_ref(),
)
.await?;
let collector_hpke_config = if taskprov_task_config.is_some() {
let (peer_aggregator, _) = self
.taskprov_authorize_request(
&Role::Leader,
task_id,
taskprov_task_config.unwrap(),
auth_token.as_ref(),
)
.await?;

peer_aggregator.collector_hpke_config()
} else {
if !auth_token
.map(|t| task_aggregator.task.check_aggregator_auth_token(&t))
.unwrap_or(false)
{
return Err(Error::UnauthorizedRequest(*task_id));
}
peer_aggregator.collector_hpke_config()
} else {
if !auth_token
.map(|t| task_aggregator.task.check_aggregator_auth_token(&t))
.unwrap_or(false)
{
return Err(Error::UnauthorizedRequest(*task_id));
}

task_aggregator
.task
.collector_hpke_config()
.ok_or_else(|| {
Error::Internal("task is missing collector_hpke_config".to_string())
})?
};
task_aggregator
.task
.collector_hpke_config()
.ok_or_else(|| {
Error::Internal("task is missing collector_hpke_config".to_string())
})?
};

task_aggregator
.handle_aggregate_share(
Expand Down Expand Up @@ -817,21 +772,6 @@ impl<C: Clock> TaskAggregator<C> {
})
}

fn handle_hpke_config(&self) -> Option<HpkeConfig> {
// TODO(#239): consider deciding a better way to determine "primary" (e.g. most-recent) HPKE
// config/key -- right now it's the one with the maximal config ID, but that will run into
// trouble if we ever need to wrap-around, which we may since config IDs are effectively a u8.
Some(
self.task
.hpke_keys()
.iter()
.max_by_key(|(&id, _)| id)?
.1
.config()
.clone(),
)
}

async fn handle_upload(
&self,
clock: &C,
Expand Down
Loading

0 comments on commit 64d46cf

Please sign in to comment.