Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[experimental] Implement one-helper, "ping-pong" aggregation. #1234

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ version = "0.4.6"
# (yet) need other default features.
# https://docs.rs/chrono/latest/chrono/#duration
chrono = { version = "0.4", default-features = false }
itertools = "0.10"
janus_aggregator = { version = "0.4", path = "aggregator" }
janus_aggregator_api = { version = "0.4", path = "aggregator_api" }
janus_aggregator_core = { version = "0.4", path = "aggregator_core" }
Expand All @@ -39,7 +40,8 @@ janus_interop_binaries = { version = "0.4", path = "interop_binaries" }
janus_messages = { version = "0.4", path = "messages" }
k8s-openapi = { version = "0.18.0", features = ["v1_24"] } # keep this version in sync with what is referenced by the indirect dependency via `kube`
kube = { version = "0.82.1", default-features = false, features = ["client", "rustls-tls"] }
prio = { version = "0.12.1", features = ["multithreaded"] }
# prio = { version = "0.12.0", features = ["multithreaded"] } # XXX: go back to a released version of prio once https://github.com/divviup/libprio-rs/commit/54a46230615d28c7e131d0595cc558e1619b8071 is released.
prio = { git = "https://github.com/divviup/libprio-rs.git", rev = "54a46230615d28c7e131d0595cc558e1619b8071", features = ["multithreaded", "experimental"] }
trillium = "0.2.8"
trillium-api = { version = "0.2.0-rc.2", default-features = false }
trillium-caching-headers = "0.2.1"
Expand Down
2 changes: 1 addition & 1 deletion aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ uuid = { version = "1.3.1", features = ["v4"] }
[dev-dependencies]
assert_matches = "1"
hyper = "0.14.26"
itertools = "0.10.5"
itertools.workspace = true
janus_aggregator = { path = ".", features = ["fpvec_bounded_l2", "test-util"] }
janus_aggregator_core = { workspace = true, features = ["test-util"] }
mockito = "1.0.2"
Expand Down
444 changes: 231 additions & 213 deletions aggregator/src/aggregator.rs

Large diffs are not rendered by default.

133 changes: 71 additions & 62 deletions aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::aggregator::{aggregator_handler, tests::generate_helper_report_share, Config};
use crate::aggregator::{aggregator_handler, tests::generate_helper_report_init, Config};
use janus_aggregator_core::{
datastore::{
test_util::{ephemeral_datastore, EphemeralDatastore},
Expand All @@ -13,87 +13,94 @@ use janus_core::{
};
use janus_messages::{
query_type::TimeInterval, AggregationJobId, AggregationJobInitializeReq, PartialBatchSelector,
ReportMetadata, ReportShare, Role,
ReportMetadata, ReportPrepInit, Role,
};
use prio::codec::Encode;
use prio::{codec::Encode, vdaf};
use rand::random;
use std::sync::Arc;
use trillium::{Handler, KnownHeaderName, Status};
use trillium_testing::{prelude::put, TestConn};

pub(super) struct ReportShareGenerator {
pub(super) struct ReportInitGenerator<const SEED_SIZE: usize, V>
where
V: vdaf::Vdaf,
{
clock: MockClock,
task: Task,
aggregation_param: dummy_vdaf::AggregationParam,
vdaf: dummy_vdaf::Vdaf,
vdaf: V,
aggregation_param: V::AggregationParam,
}

impl ReportShareGenerator {
impl<const SEED_SIZE: usize, V> ReportInitGenerator<SEED_SIZE, V>
where
V: vdaf::Vdaf + vdaf::Aggregator<SEED_SIZE, 16> + vdaf::Client<16>,
{
pub(super) fn new(
clock: MockClock,
task: Task,
aggregation_param: dummy_vdaf::AggregationParam,
vdaf: V,
aggregation_param: V::AggregationParam,
) -> Self {
Self {
clock,
task,
vdaf,
aggregation_param,
vdaf: dummy_vdaf::Vdaf::new(),
}
}

fn with_vdaf(mut self, vdaf: dummy_vdaf::Vdaf) -> Self {
self.vdaf = vdaf;
self
}

pub(super) fn next(&self) -> (ReportShare, VdafTranscript<0, dummy_vdaf::Vdaf>) {
self.next_with_metadata(ReportMetadata::new(
random(),
self.clock
.now()
.to_batch_interval_start(self.task.time_precision())
.unwrap(),
))
pub(super) fn next(
&self,
measurement: &V::Measurement,
) -> (ReportPrepInit, VdafTranscript<SEED_SIZE, V>) {
self.next_with_metadata(
ReportMetadata::new(
random(),
self.clock
.now()
.to_batch_interval_start(self.task.time_precision())
.unwrap(),
),
measurement,
)
}

pub(super) fn next_with_metadata(
&self,
report_metadata: ReportMetadata,
) -> (ReportShare, VdafTranscript<0, dummy_vdaf::Vdaf>) {
measurement: &V::Measurement,
) -> (ReportPrepInit, VdafTranscript<SEED_SIZE, V>) {
let transcript = run_vdaf(
&self.vdaf,
self.task.primary_vdaf_verify_key().unwrap().as_bytes(),
&self.aggregation_param,
report_metadata.id(),
&(),
measurement,
);
let report_share = generate_helper_report_share::<dummy_vdaf::Vdaf>(
let report_init = generate_helper_report_init::<SEED_SIZE, V>(
*self.task.id(),
report_metadata,
self.task.current_hpke_key().config(),
&transcript.public_share,
&transcript,
Vec::new(),
&transcript.input_shares[1],
);

(report_share, transcript)
(report_init, transcript)
}
}

pub(super) struct AggregationJobInitTestCase {
pub(super) struct AggregationJobInitTestCase<const SEED_SIZE: usize, V: vdaf::Vdaf> {
pub(super) clock: MockClock,
pub(super) task: Task,
pub(super) report_share_generator: ReportShareGenerator,
pub(super) report_shares: Vec<ReportShare>,
pub(super) report_init_generator: ReportInitGenerator<SEED_SIZE, V>,
pub(super) report_inits: Vec<ReportPrepInit>,
pub(super) aggregation_job_id: AggregationJobId,
pub(super) aggregation_param: dummy_vdaf::AggregationParam,
pub(super) aggregation_param: V::AggregationParam,
pub(super) handler: Box<dyn Handler>,
pub(super) datastore: Arc<Datastore<MockClock>>,
_ephemeral_datastore: EphemeralDatastore,
}

pub(super) async fn setup_aggregate_init_test() -> AggregationJobInitTestCase {
pub(super) async fn setup_aggregate_init_test() -> AggregationJobInitTestCase<0, dummy_vdaf::Vdaf> {
install_test_trace_subscriber();

let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper).build();
Expand All @@ -108,19 +115,23 @@ pub(super) async fn setup_aggregate_init_test() -> AggregationJobInitTestCase {

let aggregation_param = dummy_vdaf::AggregationParam(0);

let report_share_generator =
ReportShareGenerator::new(clock.clone(), task.clone(), aggregation_param);
let report_init_generator = ReportInitGenerator::new(
clock.clone(),
task.clone(),
dummy_vdaf::Vdaf::new(),
aggregation_param,
);

let report_shares = Vec::from([
report_share_generator.next().0,
report_share_generator.next().0,
let report_inits = Vec::from([
report_init_generator.next(&0).0,
report_init_generator.next(&0).0,
]);

let aggregation_job_id = random();
let aggregation_job_init_req = AggregationJobInitializeReq::new(
aggregation_param.get_encoded(),
PartialBatchSelector::new_time_interval(),
report_shares.clone(),
report_inits.clone(),
);

let response = put_aggregation_job(
Expand All @@ -135,8 +146,8 @@ pub(super) async fn setup_aggregate_init_test() -> AggregationJobInitTestCase {
AggregationJobInitTestCase {
clock,
task,
report_shares,
report_share_generator,
report_inits,
report_init_generator,
aggregation_job_id,
aggregation_param,
handler: Box::new(handler),
Expand Down Expand Up @@ -173,7 +184,7 @@ async fn aggregation_job_mutation_aggregation_job() {
let mutated_aggregation_job_init_req = AggregationJobInitializeReq::new(
dummy_vdaf::AggregationParam(1).get_encoded(),
PartialBatchSelector::new_time_interval(),
test_case.report_shares,
test_case.report_inits,
);

let response = put_aggregation_job(
Expand All @@ -192,28 +203,28 @@ async fn aggregation_job_mutation_report_shares() {

// Put the aggregation job again, mutating the associated report shares' metadata such that
// uniqueness constraints on client_reports are violated
for mutated_report_shares in [
for mutated_report_inits in [
// Omit a report share that was included previously
Vec::from(&test_case.report_shares[0..test_case.report_shares.len() - 1]),
Vec::from(&test_case.report_inits[0..test_case.report_inits.len() - 1]),
// Include a different report share than was included previously
[
&test_case.report_shares[0..test_case.report_shares.len() - 1],
&[test_case.report_share_generator.next().0],
&test_case.report_inits[0..test_case.report_inits.len() - 1],
&[test_case.report_init_generator.next(&0).0],
]
.concat(),
// Include an extra report share than was included previously
[
test_case.report_shares.as_slice(),
&[test_case.report_share_generator.next().0],
test_case.report_inits.as_slice(),
&[test_case.report_init_generator.next(&0).0],
]
.concat(),
// Reverse the order of the reports
test_case.report_shares.into_iter().rev().collect(),
test_case.report_inits.into_iter().rev().collect(),
] {
let mutated_aggregation_job_init_req = AggregationJobInitializeReq::new(
test_case.aggregation_param.get_encoded(),
PartialBatchSelector::new_time_interval(),
mutated_report_shares,
mutated_report_inits,
);
let response = put_aggregation_job(
&test_case.task,
Expand All @@ -230,26 +241,24 @@ async fn aggregation_job_mutation_report_shares() {
async fn aggregation_job_mutation_report_aggregations() {
let test_case = setup_aggregate_init_test().await;

// Generate some new reports using the existing reports' metadata, but varying the input shares
// such that the prepare state computed during aggregation initialization won't match the first
// aggregation job.
let mutated_report_shares_generator = test_case
.report_share_generator
.with_vdaf(dummy_vdaf::Vdaf::new().with_input_share(dummy_vdaf::InputShare(1)));
let mutated_report_shares = test_case
.report_shares
// Generate some new reports using the existing reports' metadata, but varying the measurement
// values such that the prepare state computed during aggregation initialization won't match the
// first aggregation job.
let mutated_report_inits = test_case
.report_inits
.iter()
.map(|s| {
mutated_report_shares_generator
.next_with_metadata(s.metadata().clone())
test_case
.report_init_generator
.next_with_metadata(s.report_share().metadata().clone(), &1)
.0
})
.collect();

let mutated_aggregation_job_init_req = AggregationJobInitializeReq::new(
test_case.aggregation_param.get_encoded(),
PartialBatchSelector::new_time_interval(),
mutated_report_shares,
mutated_report_inits,
);
let response = put_aggregation_job(
&test_case.task,
Expand Down
Loading