Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for public extensions. #3503

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1639,7 +1639,7 @@ impl VdafOps {
Err(err) => {
debug!(
report.task_id = %task.id(),
report.metadata = ?report.metadata(),
report.id = ?report.metadata().id(),
?err,
"public share decoding failed",
);
Expand Down Expand Up @@ -1681,7 +1681,7 @@ impl VdafOps {
Err(error) => {
debug!(
report.task_id = %task.id(),
report.metadata = ?report.metadata(),
report.id = ?report.metadata().id(),
?error,
"Report decryption failed",
);
Expand All @@ -1695,20 +1695,20 @@ impl VdafOps {
)
.and_then(|plaintext_input_share| {
Ok((
plaintext_input_share.extensions().to_vec(),
plaintext_input_share.private_extensions().to_vec(),
A::InputShare::get_decoded_with_param(
&(&vdaf, Role::Leader.index().unwrap()),
plaintext_input_share.payload(),
)?,
))
});

let (extensions, leader_input_share) = match decoded_leader_input_share {
let (leader_private_extensions, leader_input_share) = match decoded_leader_input_share {
Ok(leader_input_share) => leader_input_share,
Err(err) => {
debug!(
report.task_id = %task.id(),
report.metadata = ?report.metadata(),
report.id = ?report.metadata().id(),
?err,
"Leader input share decoding failed",
);
Expand All @@ -1721,7 +1721,7 @@ impl VdafOps {
*task.id(),
report.metadata().clone(),
public_share,
extensions,
leader_private_extensions,
leader_input_share,
report.helper_encrypted_input_share().clone(),
);
Expand Down Expand Up @@ -1773,7 +1773,7 @@ impl VdafOps {

if existing_aggregation_job.last_request_hash() != Some(request_hash) {
if let Some(log_forbidden_mutations) = log_forbidden_mutations {
let original_report_metadatas: Vec<_> = tx
let original_report_ids: Vec<_> = tx
.get_report_aggregations_for_aggregation_job(
vdaf,
&Role::Helper,
Expand All @@ -1782,18 +1782,18 @@ impl VdafOps {
)
.await?
.iter()
.map(|ra| ra.report_metadata())
.map(|ra| *ra.report_id())
.collect();
let mutating_request_report_metadatas: Vec<_> = req
let mutating_request_report_ids: Vec<_> = req
.prepare_inits()
.iter()
.map(|pi| pi.report_share().metadata().clone())
.map(|pi| *pi.report_share().metadata().id())
.collect();
let event = AggregationJobInitForbiddenMutationEvent {
task_id: *task_id,
aggregation_job_id: *aggregation_job_id,
original_request_hash: existing_aggregation_job.last_request_hash(),
original_report_metadatas,
original_report_ids,
original_batch_id: format!(
"{:?}",
existing_aggregation_job.partial_batch_identifier()
Expand All @@ -1803,7 +1803,7 @@ impl VdafOps {
.get_encoded()
.map_err(|e| datastore::Error::User(e.into()))?,
mutating_request_hash: Some(request_hash),
mutating_request_report_metadatas,
mutating_request_report_ids,
mutating_request_batch_id: format!(
"{:?}",
req.batch_selector().batch_identifier()
Expand Down Expand Up @@ -2013,7 +2013,7 @@ impl VdafOps {
.map_err(|err| {
debug!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
report_id = ?prepare_init.report_share().metadata().id(),
?err,
"Couldn't encode input share AAD"
);
Expand All @@ -2040,7 +2040,7 @@ impl VdafOps {
.map_err(|error| {
debug!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
report_id = ?prepare_init.report_share().metadata().id(),
?error,
"Couldn't decrypt helper's report share"
);
Expand All @@ -2056,7 +2056,7 @@ impl VdafOps {
.map_err(|error| {
debug!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
report_id = ?prepare_init.report_share().metadata().id(),
?error, "Couldn't decode helper's plaintext input share",
);
metrics.aggregate_step_failure_counter.add(
Expand All @@ -2071,14 +2071,14 @@ impl VdafOps {

// Build map of extension type to extension data, checking for duplicates.
let mut extensions = HashMap::new();
if !plaintext_input_share.extensions().iter().all(|extension| {
if !plaintext_input_share.private_extensions().iter().chain(prepare_init.report_share().metadata().public_extensions()).all(|extension| {
extensions
.insert(*extension.extension_type(), extension.extension_data())
.is_none()
}) {
debug!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
report_id = ?prepare_init.report_share().metadata().id(),
"Received report share with duplicate extensions",
);
metrics
Expand All @@ -2095,7 +2095,7 @@ impl VdafOps {
if !valid_taskprov_extension_present {
debug!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
report_id = ?prepare_init.report_share().metadata().id(),
"Taskprov task received report with missing or malformed \
taskprov extension",
);
Expand All @@ -2112,7 +2112,7 @@ impl VdafOps {
// taskprov not enabled, but the taskprov extension is present.
debug!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
report_id = ?prepare_init.report_share().metadata().id(),
"Non-taskprov task received report with unexpected taskprov \
extension",
);
Expand All @@ -2133,7 +2133,7 @@ impl VdafOps {
.map_err(|error| {
debug!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
report_id = ?prepare_init.report_share().metadata().id(),
?error, "Couldn't decode helper's input share",
);
metrics
Expand All @@ -2150,7 +2150,7 @@ impl VdafOps {
.map_err(|error| {
debug!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
report_id = ?prepare_init.report_share().metadata().id(),
?error, "Couldn't decode public share",
);
metrics
Expand Down
16 changes: 10 additions & 6 deletions aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ where
vdaf: V,
aggregation_param: V::AggregationParam,
hpke_config: HpkeConfig,
extensions: Vec<Extension>,
private_extensions: Vec<Extension>,
}

impl<const VERIFY_KEY_SIZE: usize, V> PrepareInitGenerator<VERIFY_KEY_SIZE, V>
Expand All @@ -68,12 +68,12 @@ where
vdaf,
aggregation_param,
hpke_config,
extensions: Vec::new(),
private_extensions: Vec::new(),
}
}

pub(super) fn with_extensions(mut self, extensions: Vec<Extension>) -> Self {
self.extensions = extensions;
pub(super) fn with_private_extensions(mut self, extensions: Vec<Extension>) -> Self {
self.private_extensions = extensions;
self
}

Expand All @@ -88,6 +88,7 @@ where
.now()
.to_batch_interval_start(self.task.time_precision())
.unwrap(),
Vec::new(),
),
measurement,
)
Expand Down Expand Up @@ -120,6 +121,7 @@ where
.now()
.to_batch_interval_start(self.task.time_precision())
.unwrap(),
Vec::new(),
),
measurement,
)
Expand All @@ -142,7 +144,7 @@ where
report_metadata,
&self.hpke_config,
&transcript.public_share,
self.extensions.clone(),
self.private_extensions.clone(),
&transcript.helper_input_share,
);
(report_share, transcript)
Expand Down Expand Up @@ -397,7 +399,7 @@ async fn aggregation_job_init_unexpected_taskprov_extension() {
let prepare_init = test_case
.prepare_init_generator
.clone()
.with_extensions(Vec::from([Extension::new(
.with_private_extensions(Vec::from([Extension::new(
ExtensionType::Taskprov,
Vec::new(),
)]))
Expand Down Expand Up @@ -553,6 +555,7 @@ async fn aggregation_job_intolerable_clock_skew() {
.now()
.add(test_case.task.tolerable_clock_skew())
.unwrap(),
Vec::new(),
),
&0,
)
Expand All @@ -570,6 +573,7 @@ async fn aggregation_job_intolerable_clock_skew() {
.unwrap()
.add(&Duration::from_seconds(1))
.unwrap(),
Vec::new(),
),
&0,
)
Expand Down
Loading
Loading