From bd9a09fc830e38403aa61d8f24b12b1fef5071f3 Mon Sep 17 00:00:00 2001 From: David Cook Date: Wed, 7 Dec 2022 10:00:24 -0600 Subject: [PATCH] Add per-task configuration for Daphne compatibility (#818) --- aggregator/src/aggregator.rs | 22 +++++-- .../src/aggregator/aggregation_job_driver.rs | 1 + aggregator/src/datastore.rs | 16 +++-- aggregator/src/task.rs | 48 ++++++++++++--- client/src/lib.rs | 18 +++++- core/src/hpke.rs | 61 +++++++++++++++++-- db/schema.sql | 25 ++++---- integration_tests/src/client.rs | 8 ++- integration_tests/tests/common/mod.rs | 2 + integration_tests/tests/divviup_ts.rs | 2 +- integration_tests/tests/janus.rs | 38 +++++++++++- .../src/bin/janus_interop_aggregator.rs | 1 + .../src/bin/janus_interop_client.rs | 1 + interop_binaries/src/lib.rs | 4 ++ 14 files changed, 207 insertions(+), 40 deletions(-) diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index f8f3f697c..5c19a9d1c 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -1323,6 +1323,7 @@ impl VdafOps { report.task_id(), report.metadata(), report.public_share(), + task.input_share_aad_public_share_length_prefix(), ), ) { Ok(leader_decrypted_input_share) => leader_decrypted_input_share, @@ -1501,6 +1502,7 @@ impl VdafOps { task.id(), report_share.metadata(), report_share.public_share(), + task.input_share_aad_public_share_length_prefix(), ), ) .map_err(|error| { @@ -3500,6 +3502,7 @@ mod tests { task.id(), &report_metadata, &public_share.get_encoded(), + false, ); let leader_ciphertext = hpke::seal( @@ -4318,8 +4321,12 @@ mod tests { ); let mut input_share_bytes = input_share.get_encoded(); input_share_bytes.push(0); // can no longer be decoded. - let aad = - associated_data_for_report_share(task.id(), &report_metadata_2, &encoded_public_share); + let aad = associated_data_for_report_share( + task.id(), + &report_metadata_2, + &encoded_public_share, + false, + ); let report_share_2 = generate_helper_report_share_for_plaintext( report_metadata_2, &hpke_key.0, @@ -4415,7 +4422,8 @@ mod tests { .unwrap(), Vec::new(), ); - let aad = associated_data_for_report_share(task.id(), &report_metadata_6, &public_share_6); + let aad = + associated_data_for_report_share(task.id(), &report_metadata_6, &public_share_6, false); let report_share_6 = generate_helper_report_share_for_plaintext( report_metadata_6, &hpke_key.0, @@ -7940,8 +7948,12 @@ mod tests { for<'a> &'a V::AggregateShare: Into>, { let encoded_public_share = public_share.get_encoded(); - let associated_data = - associated_data_for_report_share(task_id, report_metadata, &encoded_public_share); + let associated_data = associated_data_for_report_share( + task_id, + report_metadata, + &encoded_public_share, + false, + ); generate_helper_report_share_for_plaintext( report_metadata.clone(), cfg, diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index eab6b4330..20c41550a 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -2199,6 +2199,7 @@ mod tests { task_id, report_metadata, &public_share.get_encoded(), + false, ), ) .unwrap(); diff --git a/aggregator/src/datastore.rs b/aggregator/src/datastore.rs index b77735e77..cf335528f 100644 --- a/aggregator/src/datastore.rs +++ b/aggregator/src/datastore.rs @@ -291,8 +291,9 @@ impl Transaction<'_, C> { .prepare_cached( "INSERT INTO tasks (task_id, aggregator_role, aggregator_endpoints, query_type, vdaf, max_batch_query_count, task_expiration, min_batch_size, time_precision, - tolerable_clock_skew, collector_hpke_config) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)", + tolerable_clock_skew, collector_hpke_config, + input_share_aad_public_share_length_prefix) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)", ) .await?; self.tx @@ -313,6 +314,8 @@ impl Transaction<'_, C> { /* tolerable_clock_skew */ &i64::try_from(task.tolerable_clock_skew().as_seconds())?, /* collector_hpke_config */ &task.collector_hpke_config().get_encoded(), + /* input_share_aad_public_share_length_prefix */ + &task.input_share_aad_public_share_length_prefix(), ], ) .await?; @@ -517,7 +520,8 @@ impl Transaction<'_, C> { .prepare_cached( "SELECT aggregator_role, aggregator_endpoints, query_type, vdaf, max_batch_query_count, task_expiration, min_batch_size, time_precision, - tolerable_clock_skew, collector_hpke_config + tolerable_clock_skew, collector_hpke_config, + input_share_aad_public_share_length_prefix FROM tasks WHERE task_id = $1", ) .await?; @@ -594,7 +598,8 @@ impl Transaction<'_, C> { .prepare_cached( "SELECT task_id, aggregator_role, aggregator_endpoints, query_type, vdaf, max_batch_query_count, task_expiration, min_batch_size, time_precision, - tolerable_clock_skew, collector_hpke_config + tolerable_clock_skew, collector_hpke_config, + input_share_aad_public_share_length_prefix FROM tasks", ) .await?; @@ -741,6 +746,8 @@ impl Transaction<'_, C> { let tolerable_clock_skew = Duration::from_seconds(row.get_bigint_and_convert("tolerable_clock_skew")?); let collector_hpke_config = HpkeConfig::get_decoded(row.get("collector_hpke_config"))?; + let input_share_aad_public_share_length_prefix = + row.get("input_share_aad_public_share_length_prefix"); // Aggregator authentication tokens. let mut aggregator_auth_tokens = Vec::new(); @@ -827,6 +834,7 @@ impl Transaction<'_, C> { aggregator_auth_tokens, collector_auth_tokens, hpke_configs, + input_share_aad_public_share_length_prefix, )?) } diff --git a/aggregator/src/task.rs b/aggregator/src/task.rs index f9974a646..9bc123dd5 100644 --- a/aggregator/src/task.rs +++ b/aggregator/src/task.rs @@ -111,6 +111,8 @@ pub struct Task { collector_auth_tokens: Vec, /// HPKE configurations & private keys used by this aggregator to decrypt client reports. hpke_keys: HashMap, + /// Configuration option to add a length prefix for the public share in the input share AAD. + input_share_aad_public_share_length_prefix: bool, } impl Task { @@ -131,6 +133,7 @@ impl Task { aggregator_auth_tokens: Vec, collector_auth_tokens: Vec, hpke_keys: I, + input_share_aad_public_share_length_prefix: bool, ) -> Result { // Ensure provided aggregator endpoints end with a slash, as we will be joining additional // path segments into these endpoints & the Url::join implementation is persnickety about @@ -161,6 +164,7 @@ impl Task { aggregator_auth_tokens, collector_auth_tokens, hpke_keys, + input_share_aad_public_share_length_prefix, }; task.validate()?; Ok(task) @@ -331,6 +335,12 @@ impl Task { let secret_bytes = self.vdaf_verify_keys.first().unwrap(); VerifyKey::try_from(secret_bytes).map_err(|_| Error::AggregatorVerifyKeySize) } + + /// Fetch the configuration setting specifying whether an additional length prefix should be + /// added to the input share AAD, before the public share. + pub fn input_share_aad_public_share_length_prefix(&self) -> bool { + self.input_share_aad_public_share_length_prefix + } } fn fmt_vector_of_urls(urls: &Vec, f: &mut Formatter<'_>) -> fmt::Result { @@ -360,6 +370,7 @@ struct SerializedTask { aggregator_auth_tokens: Vec, // in unpadded base64url collector_auth_tokens: Vec, // in unpadded base64url hpke_keys: Vec, // in unpadded base64url + input_share_aad_public_share_length_prefix: bool, } impl Serialize for Task { @@ -402,6 +413,8 @@ impl Serialize for Task { aggregator_auth_tokens, collector_auth_tokens, hpke_keys, + input_share_aad_public_share_length_prefix: self + .input_share_aad_public_share_length_prefix, } .serialize(serializer) } @@ -482,6 +495,7 @@ impl<'de> Deserialize<'de> for Task { aggregator_auth_tokens, collector_auth_tokens, hpke_keys, + serialized_task.input_share_aad_public_share_length_prefix, ) .map_err(D::Error::custom) } @@ -635,6 +649,7 @@ pub mod test_util { (aggregator_config_0, aggregator_private_key_0), (aggregator_config_1, aggregator_private_key_1), ]), + false, ) .unwrap(), ) @@ -741,6 +756,17 @@ pub mod test_util { }) } + /// Selects the input share AAD format. + pub fn with_input_share_aad_public_share_length_prefix( + self, + input_share_aad_public_share_length_prefix: bool, + ) -> Self { + Self(Task { + input_share_aad_public_share_length_prefix, + ..self.0 + }) + } + /// Consumes this task builder & produces a [`Task`] with the given specifications. pub fn build(self) -> Task { self.0.validate().unwrap(); @@ -772,14 +798,15 @@ mod tests { #[test] fn task_serialization() { - roundtrip_encoding( - TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Prio3Aes128Count, - Role::Leader, - ) - .build(), - ); + let mut task = TaskBuilder::new( + QueryType::TimeInterval, + VdafInstance::Prio3Aes128Count, + Role::Leader, + ) + .build(); + roundtrip_encoding(task.clone()); + task.input_share_aad_public_share_length_prefix = true; + roundtrip_encoding(task); } #[test] @@ -804,6 +831,7 @@ mod tests { Vec::from([generate_auth_token()]), Vec::new(), Vec::from([generate_test_hpke_config_and_private_key()]), + false, ) .unwrap_err(); @@ -827,6 +855,7 @@ mod tests { Vec::from([generate_auth_token()]), Vec::from([generate_auth_token()]), Vec::from([generate_test_hpke_config_and_private_key()]), + false, ) .unwrap(); @@ -850,6 +879,7 @@ mod tests { Vec::from([generate_auth_token()]), Vec::new(), Vec::from([generate_test_hpke_config_and_private_key()]), + false, ) .unwrap(); @@ -873,6 +903,7 @@ mod tests { Vec::from([generate_auth_token()]), Vec::from([generate_auth_token()]), Vec::from([generate_test_hpke_config_and_private_key()]), + false, ) .unwrap_err(); } @@ -898,6 +929,7 @@ mod tests { Vec::from([generate_auth_token()]), Vec::from([generate_auth_token()]), Vec::from([generate_test_hpke_config_and_private_key()]), + false, ) .unwrap(); diff --git a/client/src/lib.rs b/client/src/lib.rs index f10e987a6..801c4fd90 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -65,15 +65,24 @@ pub struct ClientParameters { time_precision: Duration, /// Parameters to use when retrying HTTP requests. http_request_retry_parameters: ExponentialBackoff, + /// Configuration setting to add an additional length prefix to the input share AAD, before + /// the public share. + input_share_aad_public_share_length_prefix: bool, } impl ClientParameters { /// Creates a new set of client task parameters. - pub fn new(task_id: TaskId, aggregator_endpoints: Vec, time_precision: Duration) -> Self { + pub fn new( + task_id: TaskId, + aggregator_endpoints: Vec, + time_precision: Duration, + input_share_aad_public_share_length_prefix: bool, + ) -> Self { Self::new_with_backoff( task_id, aggregator_endpoints, time_precision, + input_share_aad_public_share_length_prefix, http_request_exponential_backoff(), ) } @@ -83,6 +92,7 @@ impl ClientParameters { task_id: TaskId, mut aggregator_endpoints: Vec, time_precision: Duration, + input_share_aad_public_share_length_prefix: bool, http_request_retry_parameters: ExponentialBackoff, ) -> Self { // Ensure provided aggregator endpoints end with a slash, as we will be joining additional @@ -97,6 +107,7 @@ impl ClientParameters { aggregator_endpoints, time_precision, http_request_retry_parameters, + input_share_aad_public_share_length_prefix, } } @@ -222,6 +233,7 @@ where &self.parameters.task_id, &report_metadata, &public_share, + self.parameters.input_share_aad_public_share_length_prefix, ); let encrypted_input_shares: Vec = [ @@ -305,6 +317,7 @@ mod tests { random(), Vec::from([server_url.clone(), server_url]), Duration::from_seconds(1), + false, test_http_request_exponential_backoff(), ), vdaf_client, @@ -324,6 +337,7 @@ mod tests { "http://helper_endpoint".parse().unwrap(), ]), Duration::from_seconds(1), + false, ); assert_eq!( @@ -424,7 +438,7 @@ mod tests { install_test_trace_subscriber(); let client_parameters = - ClientParameters::new(random(), Vec::new(), Duration::from_seconds(0)); + ClientParameters::new(random(), Vec::new(), Duration::from_seconds(0), false); let client = Client::new( client_parameters, Prio3::new_aes128_count(2).unwrap(), diff --git a/core/src/hpke.rs b/core/src/hpke.rs index ead546d1a..d085fb0c0 100644 --- a/core/src/hpke.rs +++ b/core/src/hpke.rs @@ -6,7 +6,7 @@ use janus_messages::{ query_type::QueryType, HpkeAeadId, HpkeCiphertext, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey, ReportMetadata, Role, TaskId, }; -use prio::codec::Encode; +use prio::codec::{encode_u32_items, Encode}; use std::{fmt::Debug, str::FromStr}; #[derive(Debug, thiserror::Error)] @@ -40,11 +40,16 @@ pub fn associated_data_for_report_share( task_id: &TaskId, report_metadata: &ReportMetadata, public_share: &[u8], + tweak: bool, ) -> Vec { let mut associated_data = Vec::new(); task_id.encode(&mut associated_data); report_metadata.encode(&mut associated_data); - associated_data.extend(public_share); + if tweak { + encode_u32_items(&mut associated_data, &(), public_share); + } else { + associated_data.extend(public_share); + } associated_data } @@ -221,12 +226,15 @@ pub mod test_util { #[cfg(test)] mod tests { - use super::{test_util::generate_test_hpke_config_and_private_key, HpkeApplicationInfo, Label}; - use crate::hpke::{open, seal, HpkePrivateKey}; + use super::{ + associated_data_for_report_share, open, seal, + test_util::generate_test_hpke_config_and_private_key, HpkeApplicationInfo, HpkePrivateKey, + Label, + }; use hpke_dispatch::{Kem, Keypair}; use janus_messages::{ HpkeAeadId, HpkeCiphertext, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey, - Role, + ReportId, ReportMetadata, Role, TaskId, Time, }; use serde::Deserialize; use std::collections::HashSet; @@ -479,4 +487,47 @@ mod tests { // total of 2 * 2 * 3 = 12 unique combinations of algorithms. assert_eq!(algorithms_tested.len(), 12); } + + #[test] + fn report_share_aad() { + let task_id = TaskId::from([1; 32]); + let report_id = ReportId::from([2; 16]); + let time = Time::from_seconds_since_epoch(1_000_000_000); + let report_metadata = ReportMetadata::new(report_id, time, Vec::new()); + let public_share = b"public share"; + assert_eq!( + associated_data_for_report_share(&task_id, &report_metadata, &public_share[..], false), + hex::decode(concat!( + // Task ID + "0101010101010101010101010101010101010101010101010101010101010101", + // Report ID + "02020202020202020202020202020202", + // Time + "000000003b9aca00", + // Length of extensions (0) + "0000", + // Public share + "7075626c6963207368617265", + )) + .unwrap(), + ); + assert_eq!( + associated_data_for_report_share(&task_id, &report_metadata, &public_share[..], true), + hex::decode(concat!( + // Task ID + "0101010101010101010101010101010101010101010101010101010101010101", + // Report ID + "02020202020202020202020202020202", + // Time + "000000003b9aca00", + // Length of extensions (0) + "0000", + // Length of public share (12) + "0000000c", + // Public share + "7075626c6963207368617265", + )) + .unwrap(), + ); + } } diff --git a/db/schema.sql b/db/schema.sql index a154c37c5..d9566b33f 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -11,18 +11,19 @@ CREATE TYPE AGGREGATOR_ROLE AS ENUM( -- Corresponds to a DAP task, containing static data associated with the task. CREATE TABLE tasks( - id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only - task_id BYTEA UNIQUE NOT NULL, -- 32-byte TaskID as defined by the DAP specification - aggregator_role AGGREGATOR_ROLE NOT NULL, -- the role of this aggregator for this task - aggregator_endpoints TEXT[] NOT NULL, -- aggregator HTTPS endpoints, leader first - query_type JSONB NOT NULL, -- the query type in use for this task, along with its parameters - vdaf JSON NOT NULL, -- the VDAF instance in use for this task, along with its parameters - max_batch_query_count BIGINT NOT NULL, -- the maximum number of times a given batch may be collected - task_expiration TIMESTAMP NOT NULL, -- the time after which client reports are no longer accepted - min_batch_size BIGINT NOT NULL, -- the minimum number of reports in a batch to allow it to be collected - time_precision BIGINT NOT NULL, -- the duration to which clients are expected to round their report timestamps, in seconds - tolerable_clock_skew BIGINT NOT NULL, -- the maximum acceptable clock skew to allow between client and aggregator, in seconds - collector_hpke_config BYTEA NOT NULL -- the HPKE config of the collector (encoded HpkeConfig message) + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BYTEA UNIQUE NOT NULL, -- 32-byte TaskID as defined by the DAP specification + aggregator_role AGGREGATOR_ROLE NOT NULL, -- the role of this aggregator for this task + aggregator_endpoints TEXT[] NOT NULL, -- aggregator HTTPS endpoints, leader first + query_type JSONB NOT NULL, -- the query type in use for this task, along with its parameters + vdaf JSON NOT NULL, -- the VDAF instance in use for this task, along with its parameters + max_batch_query_count BIGINT NOT NULL, -- the maximum number of times a given batch may be collected + task_expiration TIMESTAMP NOT NULL, -- the time after which client reports are no longer accepted + min_batch_size BIGINT NOT NULL, -- the minimum number of reports in a batch to allow it to be collected + time_precision BIGINT NOT NULL, -- the duration to which clients are expected to round their report timestamps, in seconds + tolerable_clock_skew BIGINT NOT NULL, -- the maximum acceptable clock skew to allow between client and aggregator, in seconds + collector_hpke_config BYTEA NOT NULL, -- the HPKE config of the collector (encoded HpkeConfig message) + input_share_aad_public_share_length_prefix BOOLEAN NOT NULL DEFAULT false -- selects which input share AAD format should be used (true: includes a length prefix for the public share, akin to DAP-03, false: no length prefix for the public share) ); -- The aggregator authentication tokens used by a given task. diff --git a/integration_tests/src/client.rs b/integration_tests/src/client.rs index 0a1a21dfe..f2f9021e2 100644 --- a/integration_tests/src/client.rs +++ b/integration_tests/src/client.rs @@ -218,8 +218,12 @@ where aggregator_endpoints: Vec, vdaf: V, ) -> Result, janus_client::Error> { - let client_parameters = - ClientParameters::new(*task.id(), aggregator_endpoints, *task.time_precision()); + let client_parameters = ClientParameters::new( + *task.id(), + aggregator_endpoints, + *task.time_precision(), + task.input_share_aad_public_share_length_prefix(), + ); let http_client = default_http_client()?; let leader_config = aggregator_hpke_config(&client_parameters, &Role::Leader, task.id(), &http_client) diff --git a/integration_tests/tests/common/mod.rs b/integration_tests/tests/common/mod.rs index 8456571b6..cb921f02b 100644 --- a/integration_tests/tests/common/mod.rs +++ b/integration_tests/tests/common/mod.rs @@ -25,6 +25,7 @@ use tokio::time::{self, sleep}; pub fn test_task_builders( vdaf: VdafInstance, query_type: QueryType, + input_share_aad_public_share_length_prefix: bool, ) -> (HpkePrivateKey, TaskBuilder, TaskBuilder) { let endpoint_random_value = hex::encode(random::<[u8; 4]>()); let (collector_hpke_config, collector_private_key) = @@ -35,6 +36,7 @@ pub fn test_task_builders( Url::parse(&format!("http://helper-{endpoint_random_value}:8080/")).unwrap(), ])) .with_query_type(query_type) + .with_input_share_aad_public_share_length_prefix(input_share_aad_public_share_length_prefix) .with_min_batch_size(46) .with_collector_hpke_config(collector_hpke_config); let helper_task = leader_task diff --git a/integration_tests/tests/divviup_ts.rs b/integration_tests/tests/divviup_ts.rs index 2e5b37da4..aafea31c2 100644 --- a/integration_tests/tests/divviup_ts.rs +++ b/integration_tests/tests/divviup_ts.rs @@ -17,7 +17,7 @@ use common::{submit_measurements_and_verify_aggregate, test_task_builders}; async fn run_divviup_ts_integration_test(container_client: &Cli, vdaf: VdafInstance) { let (collector_private_key, leader_task, helper_task) = - test_task_builders(vdaf, janus_aggregator::task::QueryType::TimeInterval); + test_task_builders(vdaf, janus_aggregator::task::QueryType::TimeInterval, false); let leader_task = leader_task.build(); let network = generate_network_name(); let leader = Janus::new_in_container(container_client, &network, &leader_task).await; diff --git a/integration_tests/tests/janus.rs b/integration_tests/tests/janus.rs index 38680b20b..b5da99ac9 100644 --- a/integration_tests/tests/janus.rs +++ b/integration_tests/tests/janus.rs @@ -52,9 +52,10 @@ impl<'a> JanusPair<'a> { container_client: &'a Cli, vdaf: VdafInstance, query_type: QueryType, + input_share_aad_public_share_length_prefix: bool, ) -> JanusPair<'a> { let (collector_private_key, leader_task, helper_task) = - test_task_builders(vdaf, query_type); + test_task_builders(vdaf, query_type, input_share_aad_public_share_length_prefix); // The environment variables should either all be present, or all be absent let (leader_task, leader, helper) = match ( @@ -153,6 +154,7 @@ async fn janus_janus_count() { &container_client, VdafInstance::Prio3Aes128Count, QueryType::TimeInterval, + false, ) .await; @@ -178,6 +180,7 @@ async fn janus_janus_sum_16() { &container_client, VdafInstance::Prio3Aes128Sum { bits: 16 }, QueryType::TimeInterval, + false, ) .await; @@ -205,6 +208,7 @@ async fn janus_janus_histogram_4_buckets() { &container_client, VdafInstance::Prio3Aes128Histogram { buckets }, QueryType::TimeInterval, + false, ) .await; @@ -230,6 +234,7 @@ async fn janus_janus_count_vec_15() { &container_client, VdafInstance::Prio3Aes128CountVec { length: 15 }, QueryType::TimeInterval, + false, ) .await; @@ -255,6 +260,7 @@ async fn janus_janus_fixed_size() { &container_client, VdafInstance::Prio3Aes128Count, QueryType::FixedSize { max_batch_size: 50 }, + false, ) .await; @@ -268,3 +274,33 @@ async fn janus_janus_fixed_size() { ) .await; } + +/// This test runs an aggregation using an alternate input share AAD construction in both the +/// client and the aggregators. +#[tokio::test(flavor = "multi_thread")] +async fn janus_janus_aad_tweak() { + install_test_trace_subscriber(); + + // Start servers. + let container_client = container_client(); + let janus_pair = JanusPair::new( + &container_client, + VdafInstance::Prio3Aes128Count, + QueryType::TimeInterval, + true, + ) + .await; + assert!(janus_pair + .leader_task + .input_share_aad_public_share_length_prefix()); + + // Run the behavioral test. + submit_measurements_and_verify_aggregate( + (janus_pair.leader.port(), janus_pair.helper.port()), + &janus_pair.leader_task, + &janus_pair.collector_private_key, + &ClientBackend::InProcess, + janus_pair.leader.batch_discovery(), + ) + .await; +} diff --git a/interop_binaries/src/bin/janus_interop_aggregator.rs b/interop_binaries/src/bin/janus_interop_aggregator.rs index f8d0bb8a2..d775169e8 100644 --- a/interop_binaries/src/bin/janus_interop_aggregator.rs +++ b/interop_binaries/src/bin/janus_interop_aggregator.rs @@ -141,6 +141,7 @@ async fn handle_add_task( Vec::from([leader_authentication_token]), collector_authentication_tokens, [(hpke_config, private_key)], + request.input_share_aad_public_share_length_prefix, ) .context("error constructing task")?; diff --git a/interop_binaries/src/bin/janus_interop_client.rs b/interop_binaries/src/bin/janus_interop_client.rs index accfca3eb..d49ab7a95 100644 --- a/interop_binaries/src/bin/janus_interop_client.rs +++ b/interop_binaries/src/bin/janus_interop_client.rs @@ -89,6 +89,7 @@ where task_id, Vec::::from([request.leader, request.helper]), time_precision, + false, ); let leader_hpke_config = janus_client::aggregator_hpke_config( diff --git a/interop_binaries/src/lib.rs b/interop_binaries/src/lib.rs index d2982e257..6b43b1947 100644 --- a/interop_binaries/src/lib.rs +++ b/interop_binaries/src/lib.rs @@ -197,6 +197,8 @@ pub struct AggregatorAddTaskRequest { pub time_precision: u64, // in seconds pub collector_hpke_config: String, // in unpadded base64url pub task_expiration: u64, // in seconds since the epoch + #[serde(default)] + pub input_share_aad_public_share_length_prefix: bool, } #[derive(Debug, Serialize, Deserialize)] @@ -246,6 +248,8 @@ impl From for AggregatorAddTaskRequest { URL_SAFE_NO_PAD, ), task_expiration: task.task_expiration().as_seconds_since_epoch(), + input_share_aad_public_share_length_prefix: task + .input_share_aad_public_share_length_prefix(), } } }