Skip to content

Commit

Permalink
Adopt prio 0.14.0 (#1673)
Browse files Browse the repository at this point in the history
The bulk of the changes here deal with the change to the representation
of `Prio3Histogram`. Since `prio` 0.14.x implements VDAF-06, taking this
change will break compatibility with DAP-04. This also breaks compatibility
with existing `divviup-api` versions, because it has to deal with the new
histogram representation. Integration tests with Daphne and divviup-ts
are disabled for the same reason.
  • Loading branch information
tgeoghegan authored Aug 16, 2023
1 parent e6d15bf commit 74aa957
Show file tree
Hide file tree
Showing 23 changed files with 110 additions and 237 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ janus_messages = { version = "0.5", path = "messages" }
k8s-openapi = { version = "0.18.0", features = ["v1_24"] } # keep this version in sync with what is referenced by the indirect dependency via `kube`
kube = { version = "0.82.2", default-features = false, features = ["client", "rustls-tls"] }
opentelemetry = { version = "0.19", features = ["metrics"] }
prio = { version = "0.12.2", features = ["multithreaded"] }
prio = { version = "0.14.0", features = ["multithreaded"] }
serde = { version = "1.0.183", features = ["derive"] }
serde_json = "1.0.103"
serde_test = "1.0.175"
Expand Down
4 changes: 2 additions & 2 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,8 +787,8 @@ impl<C: Clock> TaskAggregator<C> {
VdafOps::Prio3SumVec(Arc::new(vdaf), verify_key)
}

VdafInstance::Prio3Histogram { buckets } => {
let vdaf = Prio3::new_histogram(2, buckets)?;
VdafInstance::Prio3Histogram { length } => {
let vdaf = Prio3::new_histogram(2, *length)?;
let verify_key = task.primary_vdaf_verify_key()?;
VdafOps::Prio3Histogram(Arc::new(vdaf), verify_key)
}
Expand Down
13 changes: 5 additions & 8 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
.await
}

(task::QueryType::TimeInterval, VdafInstance::Prio3Histogram { buckets }) => {
let vdaf = Arc::new(Prio3::new_histogram(2, buckets)?);
(task::QueryType::TimeInterval, VdafInstance::Prio3Histogram { length }) => {
let vdaf = Arc::new(Prio3::new_histogram(2, *length)?);
self.create_aggregation_jobs_for_time_interval_task_no_param::<PRIO3_VERIFY_KEY_LENGTH, Prio3Histogram>(task, vdaf)
.await
}
Expand Down Expand Up @@ -404,9 +404,9 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
max_batch_size,
batch_time_window_size,
},
VdafInstance::Prio3Histogram { buckets },
VdafInstance::Prio3Histogram { length },
) => {
let vdaf = Arc::new(Prio3::new_histogram(2, buckets)?);
let vdaf = Arc::new(Prio3::new_histogram(2, *length)?);
let max_batch_size = *max_batch_size;
let batch_time_window_size = *batch_time_window_size;
self.create_aggregation_jobs_for_fixed_size_task_no_param::<
Expand Down Expand Up @@ -660,10 +660,7 @@ mod tests {
};
use janus_core::{
task::{VdafInstance, PRIO3_VERIFY_KEY_LENGTH},
test_util::{
dummy_vdaf::{self},
install_test_trace_subscriber,
},
test_util::{dummy_vdaf, install_test_trace_subscriber},
time::{Clock, DurationExt, IntervalExt, MockClock, TimeExt},
};
use janus_messages::{
Expand Down
14 changes: 2 additions & 12 deletions aggregator_core/src/datastore/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,8 @@ async fn roundtrip_task(ephemeral_datastore: EphemeralDatastore) {
(VdafInstance::Prio3CountVec { length: 64 }, Role::Helper),
(VdafInstance::Prio3Sum { bits: 64 }, Role::Helper),
(VdafInstance::Prio3Sum { bits: 32 }, Role::Helper),
(
VdafInstance::Prio3Histogram {
buckets: Vec::from([0, 100, 200, 400]),
},
Role::Leader,
),
(
VdafInstance::Prio3Histogram {
buckets: Vec::from([0, 25, 50, 75, 100]),
},
Role::Leader,
),
(VdafInstance::Prio3Histogram { length: 4 }, Role::Leader),
(VdafInstance::Prio3Histogram { length: 5 }, Role::Leader),
(VdafInstance::Poplar1 { bits: 8 }, Role::Helper),
(VdafInstance::Poplar1 { bits: 64 }, Role::Helper),
] {
Expand Down
6 changes: 3 additions & 3 deletions collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1001,8 +1001,8 @@ mod tests {
async fn successful_collect_prio3_histogram() {
install_test_trace_subscriber();
let mut server = mockito::Server::new_async().await;
let vdaf = Prio3::new_histogram(2, &[25, 50, 75, 100]).unwrap();
let transcript = run_vdaf(&vdaf, &random(), &(), &random(), &80);
let vdaf = Prio3::new_histogram(2, 4).unwrap();
let transcript = run_vdaf(&vdaf, &random(), &(), &random(), &3);
let collector = setup_collector(&mut server, vdaf);

let batch_interval = Interval::new(
Expand Down Expand Up @@ -1058,7 +1058,7 @@ mod tests {
),
chrono::Duration::seconds(3600),
),
Vec::from([0, 0, 0, 1, 0])
Vec::from([0, 0, 0, 1])
)
);

Expand Down
67 changes: 28 additions & 39 deletions core/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use rand::{distributions::Standard, prelude::Distribution};
use reqwest::Url;
use ring::constant_time;
use serde::{de::Error, Deserialize, Deserializer, Serialize};
use std::{fmt, str};
use std::str;

/// HTTP header where auth tokens are provided in messages between participants.
pub const DAP_AUTH_HEADER: &str = "DAP-Auth-Token";
Expand All @@ -30,11 +30,8 @@ pub enum VdafInstance {
Prio3Sum { bits: usize },
/// A vector of `Prio3` sums.
Prio3SumVec { bits: usize, length: usize },
/// A `Prio3` histogram.
Prio3Histogram {
#[derivative(Debug(format_with = "bucket_count"))]
buckets: Vec<u64>,
},
/// A `Prio3` histogram with `length` buckets in it.
Prio3Histogram { length: usize },
/// A `Prio3` 16-bit fixed point vector sum with bounded L2 norm.
#[cfg(feature = "fpvec_bounded_l2")]
Prio3FixedPoint16BitBoundedL2VecSum { length: usize },
Expand All @@ -61,6 +58,22 @@ pub enum VdafInstance {
FakeFailsPrepStep,
}

impl VdafInstance {
/// Returns the expected length of a VDAF verification key for a VDAF of this type.
pub fn verify_key_length(&self) -> usize {
match self {
#[cfg(feature = "test-util")]
VdafInstance::Fake
| VdafInstance::FakeFailsPrepInit
| VdafInstance::FakeFailsPrepStep => 0,

// All "real" VDAFs use a verify key of length 16 currently. (Poplar1 may not, but it's
// not yet done being specified, so choosing 16 bytes is fine for testing.)
_ => PRIO3_VERIFY_KEY_LENGTH,
}
}
}

impl TryFrom<&taskprov::VdafType> for VdafInstance {
type Error = &'static str;

Expand All @@ -71,7 +84,10 @@ impl TryFrom<&taskprov::VdafType> for VdafInstance {
bits: *bits as usize,
}),
taskprov::VdafType::Prio3Histogram { buckets } => Ok(Self::Prio3Histogram {
buckets: buckets.clone(),
// taskprov does not yet deal with the VDAF-06 representation of histograms. In the
// meantime, we translate the bucket boundaries to a length that Janus understands.
// https://github.com/wangshan/draft-wang-ppm-dap-taskprov/issues/33
length: buckets.len() + 1, // +1 to account for the top bucket extending to infinity
}),
taskprov::VdafType::Poplar1 { bits } => Ok(Self::Poplar1 {
bits: *bits as usize,
Expand All @@ -81,26 +97,6 @@ impl TryFrom<&taskprov::VdafType> for VdafInstance {
}
}

fn bucket_count(buckets: &Vec<u64>, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "[{} buckets]", buckets.len() + 1)
}

impl VdafInstance {
/// Returns the expected length of a VDAF verification key for a VDAF of this type.
pub fn verify_key_length(&self) -> usize {
match self {
#[cfg(feature = "test-util")]
VdafInstance::Fake
| VdafInstance::FakeFailsPrepInit
| VdafInstance::FakeFailsPrepStep => 0,

// All "real" VDAFs use a verify key of length 16 currently. (Poplar1 may not, but it's
// not yet done being specified, so choosing 16 bytes is fine for testing.)
_ => PRIO3_VERIFY_KEY_LENGTH,
}
}
}

/// Internal implementation details of [`vdaf_dispatch`](crate::vdaf_dispatch).
#[macro_export]
macro_rules! vdaf_dispatch_impl_base {
Expand Down Expand Up @@ -174,8 +170,8 @@ macro_rules! vdaf_dispatch_impl_base {
$body
}

::janus_core::task::VdafInstance::Prio3Histogram { buckets } => {
let $vdaf = ::prio::vdaf::prio3::Prio3::new_histogram(2, buckets)?;
::janus_core::task::VdafInstance::Prio3Histogram { length } => {
let $vdaf = ::prio::vdaf::prio3::Prio3::new_histogram(2, *length)?;
type $Vdaf = ::prio::vdaf::prio3::Prio3Histogram;
const $VERIFY_KEY_LENGTH: usize = ::janus_core::task::PRIO3_VERIFY_KEY_LENGTH;
$body
Expand Down Expand Up @@ -768,22 +764,15 @@ mod tests {
],
);
assert_tokens(
&VdafInstance::Prio3Histogram {
buckets: Vec::from([0, 100, 200, 400]),
},
&VdafInstance::Prio3Histogram { length: 6 },
&[
Token::StructVariant {
name: "VdafInstance",
variant: "Prio3Histogram",
len: 1,
},
Token::Str("buckets"),
Token::Seq { len: Some(4) },
Token::U64(0),
Token::U64(100),
Token::U64(200),
Token::U64(400),
Token::SeqEnd,
Token::Str("length"),
Token::U64(6),
Token::StructVariantEnd,
],
);
Expand Down
10 changes: 2 additions & 8 deletions integration_tests/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,10 @@ fn json_encode_vdaf(vdaf: &VdafInstance) -> Value {
"bits": format!("{bits}"),
"length": format!("{length}"),
}),
VdafInstance::Prio3Histogram { buckets } => {
let buckets = Value::Array(
buckets
.iter()
.map(|value| Value::String(format!("{value}")))
.collect(),
);
VdafInstance::Prio3Histogram { length } => {
json!({
"type": "Prio3Histogram",
"buckets": buckets,
"length": format!("{length}"),
})
}
_ => panic!("VDAF {vdaf:?} is not yet supported"),
Expand Down
16 changes: 12 additions & 4 deletions integration_tests/src/divviup_api_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::anyhow;
use anyhow::{anyhow, Context};
use http::{
header::{ACCEPT, CONTENT_TYPE},
Method,
Expand Down Expand Up @@ -35,9 +35,17 @@ impl TryFrom<&VdafInstance> for ApiVdaf {
match vdaf {
VdafInstance::Prio3Count => Ok(ApiVdaf::Count),
VdafInstance::Prio3Sum { bits } => Ok(ApiVdaf::Sum { bits: *bits }),
VdafInstance::Prio3Histogram { buckets } => Ok(ApiVdaf::Histogram {
buckets: buckets.clone(),
}),
VdafInstance::Prio3Histogram { length } => {
// divviup-api does not yet support the new Prio3Histogram representation. Until it
// does, we synthesize fake bucket boundaries that will yield the number of buckets
// we want.
// https://github.com/divviup/divviup-api/issues/410
Ok(ApiVdaf::Histogram {
buckets: (0..*length - 1)
.map(|length| u64::try_from(length).context("cannot convert length to u64"))
.collect::<Result<Vec<_>, _>>()?,
})
}
_ => Err(anyhow!("unsupported VDAF: {vdaf:?}")),
}
}
Expand Down
17 changes: 5 additions & 12 deletions integration_tests/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,21 +322,14 @@ pub async fn submit_measurements_and_verify_aggregate(
)
.await;
}
VdafInstance::Prio3Histogram { buckets } => {
let vdaf = Prio3::new_histogram(2, buckets).unwrap();
VdafInstance::Prio3Histogram { length } => {
let vdaf = Prio3::new_histogram(2, *length).unwrap();

let mut aggregate_result = vec![0; buckets.len() + 1];
aggregate_result.resize(buckets.len() + 1, 0);
let mut aggregate_result = vec![0; *length];
let measurements = iter::repeat_with(|| {
let choice = thread_rng().gen_range(0..=buckets.len());
let choice = thread_rng().gen_range(0..*length);
aggregate_result[choice] += 1;
let measurement = if choice == buckets.len() {
// This goes into the counter covering the range that extends to positive infinity.
buckets[buckets.len() - 1] + 1
} else {
buckets[choice]
};
measurement as u128
choice
})
.take(total_measurements)
.collect::<Vec<_>>();
Expand Down
1 change: 1 addition & 0 deletions integration_tests/tests/daphne.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async fn daphne_janus() {

// This test places Janus in the leader role & Daphne in the helper role.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "Daphne does not currently support DAP-05 (issue #1669)"]
async fn janus_daphne() {
install_test_trace_subscriber();

Expand Down
7 changes: 4 additions & 3 deletions integration_tests/tests/divviup_ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,29 @@ async fn run_divviup_ts_integration_test(container_client: &Cli, vdaf: VdafInsta
}

#[tokio::test(flavor = "multi_thread")]
#[ignore = "divviup-ts does not currently support DAP-05 (issue #1669)"]
async fn janus_divviup_ts_count() {
install_test_trace_subscriber();

run_divviup_ts_integration_test(&container_client(), VdafInstance::Prio3Count).await;
}

#[tokio::test(flavor = "multi_thread")]
#[ignore = "divviup-ts does not currently support DAP-05 (issue #1669)"]
async fn janus_divviup_ts_sum() {
install_test_trace_subscriber();

run_divviup_ts_integration_test(&container_client(), VdafInstance::Prio3Sum { bits: 8 }).await;
}

#[tokio::test(flavor = "multi_thread")]
#[ignore = "divviup-ts does not currently support DAP-05 (issue #1669)"]
async fn janus_divviup_ts_histogram() {
install_test_trace_subscriber();

run_divviup_ts_integration_test(
&container_client(),
VdafInstance::Prio3Histogram {
buckets: Vec::from([1, 10, 100, 1000]),
},
VdafInstance::Prio3Histogram { length: 4 },
)
.await;
}
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/tests/in_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,13 @@ async fn in_cluster_sum() {
}

#[tokio::test(flavor = "multi_thread")]
#[ignore = "divviup-api does not currently support DAP-05 (https://github.com/divviup/divviup-api/issues/410)"]
async fn in_cluster_histogram() {
install_test_trace_subscriber();

// Start port forwards and set up task.
let buckets = Vec::from([3, 6, 8]);
let janus_pair = InClusterJanusPair::new(
VdafInstance::Prio3Histogram { buckets },
VdafInstance::Prio3Histogram { length: 4 },
QueryType::TimeInterval,
)
.await;
Expand Down
4 changes: 1 addition & 3 deletions integration_tests/tests/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,11 @@ async fn janus_janus_sum_16() {
async fn janus_janus_histogram_4_buckets() {
install_test_trace_subscriber();

let buckets = Vec::from([3, 6, 8]);

// Start servers.
let container_client = container_client();
let janus_pair = JanusPair::new(
&container_client,
VdafInstance::Prio3Histogram { buckets },
VdafInstance::Prio3Histogram { length: 4 },
QueryType::TimeInterval,
)
.await;
Expand Down
6 changes: 3 additions & 3 deletions interop_binaries/src/bin/janus_interop_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ async fn handle_upload(
handle_upload_generic(http_client, vdaf_client, request, measurement).await?;
}

VdafInstance::Prio3Histogram { ref buckets } => {
let measurement = parse_primitive_measurement::<u128>(request.measurement.clone())?;
let vdaf_client = Prio3::new_histogram(2, buckets)
VdafInstance::Prio3Histogram { length } => {
let measurement = parse_primitive_measurement::<usize>(request.measurement.clone())?;
let vdaf_client = Prio3::new_histogram(2, length)
.context("failed to construct Prio3Histogram VDAF")?;
handle_upload_generic(http_client, vdaf_client, request, measurement).await?;
}
Expand Down
Loading

0 comments on commit 74aa957

Please sign in to comment.