Skip to content

Commit

Permalink
Implement draft-ietf-ppm-dap-taskprov-01.
Browse files Browse the repository at this point in the history
  • Loading branch information
branlwyd committed Nov 27, 2024
1 parent 9db619b commit 89cb727
Show file tree
Hide file tree
Showing 22 changed files with 672 additions and 922 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ url.workspace = true
uuid = { workspace = true, features = ["serde"] }

[dev-dependencies]
janus_aggregator = { path = ".", features = ["fpvec_bounded_l2", "test-util"] }
janus_aggregator = { workspace = true, features = ["fpvec_bounded_l2", "test-util"] }
janus_aggregator_core = { workspace = true, features = ["test-util"] }
mockito = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["testing"] }
Expand Down
49 changes: 16 additions & 33 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use janus_aggregator_core::{
},
Datastore, Error as DatastoreError, Transaction,
},
task::{self, AggregatorTask, VerifyKey},
task::{self, AggregatorTask, BatchMode, VerifyKey},
taskprov::PeerAggregator,
};
#[cfg(feature = "fpvec_bounded_l2")]
Expand All @@ -65,7 +65,7 @@ use janus_core::{
};
use janus_messages::{
batch_mode::{LeaderSelected, TimeInterval},
taskprov::{DpMechanism, TaskConfig},
taskprov::TaskConfig,
AggregateShare, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq,
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep,
BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, ExtensionType, HpkeConfig,
Expand Down Expand Up @@ -691,44 +691,26 @@ impl<C: Clock> Aggregator<C> {
// TODO(#1647): Check whether task config parameters are acceptable for privacy and
// availability of the system.

if let DpMechanism::Unrecognized { .. } =
task_config.vdaf_config().dp_config().dp_mechanism()
{
if !self
.cfg
.taskprov_config
.ignore_unknown_differential_privacy_mechanism
{
return Err(Error::InvalidTask(
*task_id,
OptOutReason::InvalidParameter("unrecognized DP mechanism".into()),
));
}
}

let vdaf_instance =
task_config
.vdaf_config()
.vdaf_type()
.try_into()
.map_err(|err: &str| {
Error::InvalidTask(*task_id, OptOutReason::InvalidParameter(err.to_string()))
})?;
let vdaf_instance = task_config.vdaf_config().try_into().map_err(|err: &str| {
Error::InvalidTask(*task_id, OptOutReason::InvalidParameter(err.to_string()))
})?;

let vdaf_verify_key = peer_aggregator.derive_vdaf_verify_key(task_id, &vdaf_instance);

let task_end = task_config.task_start().add(task_config.task_duration())?;

let task = Arc::new(
AggregatorTask::new(
*task_id,
leader_url,
task_config.query_config().query().try_into()?,
BatchMode::try_from(*task_config.batch_mode())?,
vdaf_instance,
vdaf_verify_key,
None, // TODO(#3636): update taskprov implementation to specify task start
Some(*task_config.task_end()),
Some(*task_config.task_start()),
Some(task_end),
peer_aggregator.report_expiry_age().cloned(),
task_config.query_config().min_batch_size() as u64,
*task_config.query_config().time_precision(),
u64::from(*task_config.min_batch_size()),
*task_config.time_precision(),
*peer_aggregator.tolerable_clock_skew(),
task::AggregatorTaskParameters::TaskprovHelper,
)
Expand Down Expand Up @@ -795,7 +777,8 @@ impl<C: Clock> Aggregator<C> {
return Err(Error::UnauthorizedRequest(*task_id));
}

if self.clock.now() > *task_config.task_end() {
let task_end = task_config.task_start().add(task_config.task_duration())?;
if self.clock.now() > task_end {
return Err(Error::InvalidTask(*task_id, OptOutReason::TaskEnded));
}

Expand Down Expand Up @@ -2098,7 +2081,7 @@ impl VdafOps {

if require_taskprov_extension {
let valid_taskprov_extension_present = extensions
.get(&ExtensionType::Taskprov)
.get(&ExtensionType::Taskbind)
.map(|data| data.is_empty())
.unwrap_or(false);
if !valid_taskprov_extension_present {
Expand All @@ -2117,7 +2100,7 @@ impl VdafOps {
);
return Err(ReportError::InvalidMessage);
}
} else if extensions.contains_key(&ExtensionType::Taskprov) {
} else if extensions.contains_key(&ExtensionType::Taskbind) {
// taskprov not enabled, but the taskprov extension is present.
debug!(
task_id = %task.id(),
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ async fn aggregation_job_init_unexpected_taskprov_extension() {
.prepare_init_generator
.clone()
.with_private_extensions(Vec::from([Extension::new(
ExtensionType::Taskprov,
ExtensionType::Taskbind,
Vec::new(),
)]))
.next(&0)
Expand Down
66 changes: 32 additions & 34 deletions aggregator/src/aggregator/http_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{
use crate::aggregator::problem_details::{ProblemDetailsConnExt, ProblemDocument};
use async_trait::async_trait;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use janus_aggregator_core::{datastore::Datastore, instrumented};
use janus_aggregator_core::{datastore::Datastore, instrumented, taskprov::taskprov_task_id};
use janus_core::{
auth_tokens::{AuthenticationToken, DAP_AUTH_HEADER},
http::extract_bearer_token,
Expand All @@ -25,10 +25,9 @@ use opentelemetry::{
KeyValue,
};
use prio::codec::Encode;
use ring::digest::{digest, SHA256};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::{borrow::Cow, time::Duration as StdDuration};
use std::{io::Cursor, sync::Arc};
use tracing::warn;
use trillium::{Conn, Handler, KnownHeaderName, Status};
use trillium_api::{api, State, TryFromConn};
Expand Down Expand Up @@ -757,38 +756,37 @@ fn parse_taskprov_header<C: Clock>(
task_id: &TaskId,
conn: &Conn,
) -> Result<Option<TaskConfig>, Error> {
if aggregator.cfg.taskprov_config.enabled {
match conn.request_headers().get(TASKPROV_HEADER) {
Some(taskprov_header) => {
let task_config_encoded =
&URL_SAFE_NO_PAD.decode(taskprov_header).map_err(|_| {
Error::InvalidMessage(
Some(*task_id),
"taskprov header could not be decoded",
)
})?;

if task_id.as_ref() != digest(&SHA256, task_config_encoded).as_ref() {
Err(Error::InvalidMessage(
Some(*task_id),
"derived taskprov task ID does not match task config",
))
} else {
// TODO(#1684): Parsing the taskprov header like this before we've been able
// to actually authenticate the client is undesireable. We should rework this
// such that the authorization header is handled before parsing the untrusted
// input.
Ok(Some(
TaskConfig::decode(&mut Cursor::new(task_config_encoded))
.map_err(Error::MessageDecode)?,
))
}
}
None => Ok(None),
}
} else {
Ok(None)
if !aggregator.cfg.taskprov_config.enabled {
return Ok(None);
}

let taskprov_header = match conn.request_headers().get(TASKPROV_HEADER) {
Some(taskprov_header) => taskprov_header,
None => return Ok(None),
};

let task_config_encoded = URL_SAFE_NO_PAD.decode(taskprov_header).map_err(|_| {
Error::InvalidMessage(
Some(*task_id),
"taskprov header could not be base64-decoded",
)
})?;

// Compute expected task ID & verify it matches the task ID from the request.
let expected_task_id = taskprov_task_id(&task_config_encoded);
if task_id != &expected_task_id {
return Err(Error::InvalidMessage(
Some(*task_id),
"derived taskprov task ID does not match task config",
));
}

// TODO(#1684): Parsing the taskprov header like this before we've been able to actually
// authenticate the client is undesireable. We should rework this such that the authorization
// header is handled before parsing the untrusted input.
Ok(Some(
TaskConfig::get_decoded(&task_config_encoded).map_err(Error::MessageDecode)?,
))
}

struct BodyBytes(Vec<u8>);
Expand Down
5 changes: 1 addition & 4 deletions aggregator/src/aggregator/http_handlers/tests/hpke_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,7 @@ async fn hpke_config_with_taskprov() {
.unwrap();

let cfg = Config {
taskprov_config: TaskprovConfig {
enabled: true,
ignore_unknown_differential_privacy_mechanism: false,
},
taskprov_config: TaskprovConfig { enabled: true },
hpke_config_signing_key: Some(hpke_config_signing_key()),
..Default::default()
};
Expand Down
Loading

0 comments on commit 89cb727

Please sign in to comment.