Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
DAP-05 ping-pong topology
Browse files Browse the repository at this point in the history
This change implements the DAP-05 ping-pong topology in which
aggregators take turns preprocessing prepare shares into prepare
messages. While this topology first appeared in DAP-05, this
implementation follows DAP-06.

This change depends on the implementation of the VDAF ping-pong topology
added to crate `prio` in [1], which in turn conforms to the
specification in VDAF-07.

In the ping-pong topology, each DAP-layer step of aggregation now spans
two VDAF rounds. An aggregator will use the prepare message it gets from
its peer to advance by one VDAF round, and then can use the prepare
share it just computed along with the peer's prepare share to advance by
another. This incurs some changes to what intermediate values are stored
by aggregators.

In the case where a leader is continuing/waiting, it will have computed
a prepare state, a prepare message for the current round and a prepare
share for the next round. The naive implementation would store all three
objects in the database, significantly increasing the per-report storage
use. To mitigate this, the leader stores a
`prio::topology::ping_pong::PingPongTransition`, which will contain a
prepare state and a prepare message (both of which are generally much
smaller than prepare shares), from which the next prepare state and
importantly prepare share can be recomputed.

On the helper side, there's no way around storing the prepare share: we
store the most recently computed `PrepareResp` so that we can handle
aggregation jobs idempotently. But to avoid storing prepare messages
twice, the continuing/waiting helper stores just a prepare state and a
`PingPongMessage`.

The main benefit of this change is to reduce how many round trips
between aggregators are needed to prepare reports. Quite a few tests
used Prio3 but depended on having the leader or helper in the `Waiting`
state after running aggregation initialization. Accordingly, those tests
are changed to run Poplar1, which now takes 2 rounds.

[1]: divviup/libprio-rs#683

Co-authored-by: Tim Geoghegan <timg@divviup.org>

Part of #1669
branlwyd authored and tgeoghegan committed Sep 13, 2023
1 parent 0914505 commit 07d8ce8
Showing 27 changed files with 3,921 additions and 2,191 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ version = "0.6.0"

[workspace.dependencies]
anyhow = "1"
base64 = "0.21.3"
# Disable default features to disable compatibility with the old `time` crate, and we also don't
# (yet) need other default features.
# https://docs.rs/chrono/latest/chrono/#duration
@@ -42,7 +43,9 @@ janus_messages = { version = "0.6", path = "messages" }
k8s-openapi = { version = "0.18.0", features = ["v1_24"] } # keep this version in sync with what is referenced by the indirect dependency via `kube`
kube = { version = "0.82.2", default-features = false, features = ["client", "rustls-tls"] }
opentelemetry = { version = "0.20", features = ["metrics"] }
prio = { version = "0.15.0", features = ["multithreaded"] }
# TODO(timg): go back to a released version of prio
#prio = { version = "0.15.0", features = ["multithreaded"] }
prio = { git = "https://github.com/divviup/libprio-rs", branch = "timg/ping-pong-topology", features = ["multithreaded", "experimental"] }
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.106"
serde_test = "1.0.175"
2 changes: 1 addition & 1 deletion aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ test-util = [
async-trait = "0.1"
anyhow.workspace = true
backoff = { version = "0.4.0", features = ["tokio"] }
base64 = "0.21.4"
base64.workspace = true
bytes = "1.5.0"
chrono.workspace = true
clap = { version = "4.4.2", features = ["derive", "env"] }
285 changes: 167 additions & 118 deletions aggregator/src/aggregator.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion aggregator/src/aggregator/accumulator.rs
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ pub struct Accumulator<
aggregations: HashMap<Q::BatchIdentifier, BatchData<SEED_SIZE, Q, A>>,
}

#[derive(Debug)]
#[derive(Clone, Debug)]
struct BatchData<
const SEED_SIZE: usize,
Q: AccumulableQueryType,
Loading

0 comments on commit 07d8ce8

Please sign in to comment.