diff --git a/Cargo.toml b/Cargo.toml index 0020ce2ef..8d1dc95df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ itertools = "0.11.0" modinverse = "0.1.0" num-bigint = "0.4.4" once_cell = "1.18.0" -prio = { path = ".", features = ["crypto-dependencies"] } +prio = { path = ".", features = ["crypto-dependencies", "test-util"] } rand = "0.8" serde_json = "1.0" statrs = "0.16.0" @@ -58,6 +58,7 @@ experimental = ["bitvec", "fiat-crypto", "fixed", "num-bigint", "num-rational", multithreaded = ["rayon"] prio2 = ["crypto-dependencies", "hmac", "sha2"] crypto-dependencies = ["aes", "ctr"] +test-util = ["rand"] [workspace] members = [".", "binaries"] diff --git a/src/lib.rs b/src/lib.rs index a7ac4ddbb..c9d4e22c4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,4 +30,5 @@ mod fp; pub mod idpf; mod polynomial; mod prng; +pub mod topology; pub mod vdaf; diff --git a/src/topology/mod.rs b/src/topology/mod.rs new file mode 100644 index 000000000..fdce6d722 --- /dev/null +++ b/src/topology/mod.rs @@ -0,0 +1,7 @@ +// SPDX-License-Identifier: MPL-2.0 + +//! Implementations of some aggregator communication topologies specified in [VDAF]. +//! +//! [VDAF]: https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-vdaf-06#section-5.7 + +pub mod ping_pong; diff --git a/src/topology/ping_pong.rs b/src/topology/ping_pong.rs new file mode 100644 index 000000000..c55d4f638 --- /dev/null +++ b/src/topology/ping_pong.rs @@ -0,0 +1,968 @@ +// SPDX-License-Identifier: MPL-2.0 + +//! Implements the Ping-Pong Topology described in [VDAF]. This topology assumes there are exactly +//! two aggregators, designated "Leader" and "Helper". This topology is required for implementing +//! the [Distributed Aggregation Protocol][DAP]. +//! +//! [VDAF]: https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-vdaf-07#section-5.8 +//! 
[DAP]: https://datatracker.ietf.org/doc/html/draft-ietf-ppm-dap + +use crate::{ + codec::{decode_u32_items, encode_u32_items, CodecError, Decode, Encode, ParameterizedDecode}, + vdaf::{Aggregator, PrepareTransition, VdafError}, +}; +use std::fmt::Debug; + +/// Errors emitted by this module. +#[derive(Debug, thiserror::Error)] +pub enum PingPongError { + /// Error running prepare_init + #[error("vdaf.prepare_init: {0}")] + VdafPrepareInit(VdafError), + + /// Error running prepare_shares_to_prepare_message + #[error("vdaf.prepare_shares_to_prepare_message {0}")] + VdafPrepareSharesToPrepareMessage(VdafError), + + /// Error running prepare_next + #[error("vdaf.prepare_next {0}")] + VdafPrepareNext(VdafError), + + /// Error decoding a prepare share + #[error("decode prep share {0}")] + CodecPrepShare(CodecError), + + /// Error decoding a prepare message + #[error("decode prep message {0}")] + CodecPrepMessage(CodecError), + + /// Host is in an unexpected state + #[error("host state mismatch: in {found} expected {expected}")] + HostStateMismatch { + /// The state the host is in. + found: &'static str, + /// The state the host expected to be in. + expected: &'static str, + }, + + /// Message from peer indicates it is in an unexpected state + #[error("peer message mismatch: message is {found} expected {expected}")] + PeerMessageMismatch { + /// The state in the message from the peer. + found: &'static str, + /// The message expected from the peer. + expected: &'static str, + }, + + /// Internal error + #[error("internal error: {0}")] + InternalError(&'static str), +} + +/// Corresponds to `struct Message` in [VDAF's Ping-Pong Topology][VDAF]. All of the fields of the +/// variants are opaque byte buffers. This is because the ping-pong routines take responsibility for +/// decoding preparation shares and messages, which usually requires having the preparation state. 
+/// +/// [VDAF]: https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-vdaf-07#section-5.8 +#[derive(Clone, PartialEq, Eq)] +pub enum PingPongMessage { + /// Corresponds to MessageType.initialize. + Initialize { + /// The leader's initial preparation share. + prep_share: Vec, + }, + /// Corresponds to MessageType.continue. + Continue { + /// The current round's preparation message. + prep_msg: Vec, + /// The next round's preparation share. + prep_share: Vec, + }, + /// Corresponds to MessageType.finish. + Finish { + /// The current round's preparation message. + prep_msg: Vec, + }, +} + +impl PingPongMessage { + fn variant(&self) -> &'static str { + match self { + Self::Initialize { .. } => "Initialize", + Self::Continue { .. } => "Continue", + Self::Finish { .. } => "Finish", + } + } +} + +impl Debug for PingPongMessage { + // We want `PingPongMessage` to implement `Debug`, but we don't want that impl to print out + // prepare shares or messages, because (1) their contents are sensitive and (2) their contents + // are long and not intelligible to humans. For both reasons they generally shouldn't get + // logged. Normally, we'd use the `derivative` crate to customize a derived `Debug`, but that + // crate has not been audited (in the `cargo vet` sense) so we can't use it here unless we audit + // 8,000+ lines of proc macros. + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple(self.variant()).finish() + } +} + +impl Encode for PingPongMessage { + fn encode(&self, bytes: &mut Vec) { + // The encoding includes an implicit discriminator byte, called MessageType in the VDAF + // spec. 
+ match self { + Self::Initialize { prep_share } => { + 0u8.encode(bytes); + encode_u32_items(bytes, &(), prep_share); + } + Self::Continue { + prep_msg, + prep_share, + } => { + 1u8.encode(bytes); + encode_u32_items(bytes, &(), prep_msg); + encode_u32_items(bytes, &(), prep_share); + } + Self::Finish { prep_msg } => { + 2u8.encode(bytes); + encode_u32_items(bytes, &(), prep_msg); + } + } + } + + fn encoded_len(&self) -> Option { + match self { + Self::Initialize { prep_share } => Some(1 + 4 + prep_share.len()), + Self::Continue { + prep_msg, + prep_share, + } => Some(1 + 4 + prep_msg.len() + 4 + prep_share.len()), + Self::Finish { prep_msg } => Some(1 + 4 + prep_msg.len()), + } + } +} + +impl Decode for PingPongMessage { + fn decode(bytes: &mut std::io::Cursor<&[u8]>) -> Result { + let message_type = u8::decode(bytes)?; + Ok(match message_type { + 0 => { + let prep_share = decode_u32_items(&(), bytes)?; + Self::Initialize { prep_share } + } + 1 => { + let prep_msg = decode_u32_items(&(), bytes)?; + let prep_share = decode_u32_items(&(), bytes)?; + Self::Continue { + prep_msg, + prep_share, + } + } + 2 => { + let prep_msg = decode_u32_items(&(), bytes)?; + Self::Finish { prep_msg } + } + _ => return Err(CodecError::UnexpectedValue), + }) + } +} + +/// A transition in the ping-pong topology. This represents the `ping_pong_transition` function +/// defined in [VDAF]. +/// +/// # Discussion +/// +/// The obvious implementation of `ping_pong_transition` would be a method on trait +/// [`PingPongTopology`] that returns `(State, Message)`, and then `ContinuedValue::WithMessage` +/// would contain those values. But then DAP implementations would have to store relatively large +/// VDAF prepare shares between rounds of input preparation. +/// +/// Instead, this structure stores just the previous round's prepare state and the current round's +/// preprocessed prepare message. 
Their encoding is much smaller than the `(State, Message)` tuple, +/// which can always be recomputed with [`Self::evaluate`]. +/// +/// [VDAF]: https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-vdaf-07#section-5.8 +#[derive(Clone, Debug, Eq)] +pub struct PingPongTransition< + const VERIFY_KEY_SIZE: usize, + const NONCE_SIZE: usize, + A: Aggregator, +> { + previous_prepare_state: A::PrepareState, + current_prepare_message: A::PrepareMessage, +} + +impl< + const VERIFY_KEY_SIZE: usize, + const NONCE_SIZE: usize, + A: Aggregator, + > PingPongTransition +{ + /// Evaluate this transition to obtain a new [`PingPongState`] and a [`PingPongMessage`] which + /// should be transmitted to the peer. + #[allow(clippy::type_complexity)] + pub fn evaluate( + &self, + vdaf: &A, + ) -> Result< + ( + PingPongState, + PingPongMessage, + ), + PingPongError, + > { + let prep_msg = self.current_prepare_message.get_encoded(); + + vdaf.prepare_next( + self.previous_prepare_state.clone(), + self.current_prepare_message.clone(), + ) + .map(|transition| match transition { + PrepareTransition::Continue(prep_state, prep_share) => ( + PingPongState::Continued(prep_state), + PingPongMessage::Continue { + prep_msg, + prep_share: prep_share.get_encoded(), + }, + ), + PrepareTransition::Finish(output_share) => ( + PingPongState::Finished(output_share), + PingPongMessage::Finish { prep_msg }, + ), + }) + .map_err(PingPongError::VdafPrepareNext) + } +} + +impl< + const VERIFY_KEY_SIZE: usize, + const NONCE_SIZE: usize, + A: Aggregator, + > PartialEq for PingPongTransition +{ + fn eq(&self, other: &Self) -> bool { + self.previous_prepare_state == other.previous_prepare_state + && self.current_prepare_message == other.current_prepare_message + } +} + +impl Encode + for PingPongTransition +where + A: Aggregator, + A::PrepareState: Encode, +{ + fn encode(&self, bytes: &mut Vec) { + self.previous_prepare_state.encode(bytes); + self.current_prepare_message.encode(bytes); + } + + fn encoded_len(&self) 
-> Option { + Some( + self.previous_prepare_state.encoded_len()? + + self.current_prepare_message.encoded_len()?, + ) + } +} + +impl + ParameterizedDecode for PingPongTransition +where + A: Aggregator, + A::PrepareState: ParameterizedDecode + PartialEq, + A::PrepareMessage: PartialEq, +{ + fn decode_with_param( + decoding_param: &PrepareStateDecode, + bytes: &mut std::io::Cursor<&[u8]>, + ) -> Result { + let previous_prepare_state = A::PrepareState::decode_with_param(decoding_param, bytes)?; + let current_prepare_message = + A::PrepareMessage::decode_with_param(&previous_prepare_state, bytes)?; + + Ok(Self { + previous_prepare_state, + current_prepare_message, + }) + } +} + +/// Corresponds to the `State` enumeration implicitly defined in [VDAF's Ping-Pong Topology][VDAF]. +/// VDAF describes `Start` and `Rejected` states, but the `Start` state is never instantiated in +/// code, and the `Rejected` state is represented as `std::result::Result::Err`, so this enum does +/// not include those variants. +/// +/// [VDAF]: https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-vdaf-07#section-5.8 +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum PingPongState< + const VERIFY_KEY_SIZE: usize, + const NONCE_SIZE: usize, + A: Aggregator, +> { + /// Preparation of the report will continue with the enclosed state. + Continued(A::PrepareState), + /// Preparation of the report is finished and has yielded the enclosed output share. + Finished(A::OutputShare), +} + +/// Values returned by [`PingPongTopology::leader_continued`] or +/// [`PingPongTopology::helper_continued`]. +#[derive(Clone, Debug)] +pub enum PingPongContinuedValue< + const VERIFY_KEY_SIZE: usize, + const NONCE_SIZE: usize, + A: Aggregator, +> { + /// The operation resulted in a new state and a message to transmit to the peer. + WithMessage { + /// The transition that will be executed. 
Call `PingPongTransition::evaluate` to obtain the + /// next + /// [`PingPongState`] and a [`PingPongMessage`] to transmit to the peer. + transition: PingPongTransition, + }, + /// The operation caused the host to finish preparation of the input share, yielding an output + /// share and no message for the peer. + FinishedNoMessage { + /// The output share which may now be accumulated. + output_share: A::OutputShare, + }, +} + +/// Extension trait on [`crate::vdaf::Aggregator`] which adds the [VDAF Ping-Pong Topology][VDAF]. +/// +/// [VDAF]: https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-vdaf-07#section-5.8 +pub trait PingPongTopology: + Aggregator +{ + /// Specialization of [`PingPongState`] for this VDAF. + type State; + /// Specialization of [`PingPongContinuedValue`] for this VDAF. + type ContinuedValue; + /// Specialization of [`PingPongTransition`] for this VDAF. + type Transition; + + /// Initialize leader state using the leader's input share. Corresponds to + /// `ping_pong_leader_init` in [VDAF]. + /// + /// If successful, the returned [`PingPongMessage`] (which will always be + /// `PingPongMessage::Initialize`) should be transmitted to the helper. The returned + /// [`PingPongState`] (which will always be `PingPongState::Continued`) should be used by the + /// leader along with the next [`PingPongMessage`] received from the helper as input to + /// [`Self::leader_continued`] to advance to the next round. + /// + /// [VDAF]: https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-vdaf-07#section-5.8 + fn leader_initialized( + &self, + verify_key: &[u8; VERIFY_KEY_SIZE], + agg_param: &Self::AggregationParam, + nonce: &[u8; NONCE_SIZE], + public_share: &Self::PublicShare, + input_share: &Self::InputShare, + ) -> Result<(Self::State, PingPongMessage), PingPongError>; + + /// Initialize helper state using the helper's input share and the leader's first prepare share. 
+ /// Corresponds to `ping_pong_helper_init` in the forthcoming `draft-irtf-cfrg-vdaf-07`. + /// + /// If successful, the returned [`PingPongTransition`] should be evaluated, yielding a + /// [`PingPongMessage`], which should be transmitted to the leader, and a [`PingPongState`]. + /// + /// If the state is `PingPongState::Continued`, then it should be used by the helper along with + /// the next `PingPongMessage` received from the leader as input to [`Self::helper_continued`] + /// to advance to the next round. The helper may store the `PingPongTransition` between rounds + /// of preparation instead of the `PingPongState` and `PingPongMessage`. + /// + /// If the state is `PingPongState::Finished`, then preparation is finished and the output share + /// may be accumulated. + /// + /// # Errors + /// + /// `inbound` must be `PingPongMessage::Initialize` or the function will fail. + fn helper_initialized( + &self, + verify_key: &[u8; VERIFY_KEY_SIZE], + agg_param: &Self::AggregationParam, + nonce: &[u8; NONCE_SIZE], + public_share: &Self::PublicShare, + input_share: &Self::InputShare, + inbound: &PingPongMessage, + ) -> Result, PingPongError>; + + /// Continue preparation based on the leader's current state and an incoming [`PingPongMessage`] + /// from the helper. Corresponds to `ping_pong_leader_continued` in [VDAF]. + /// + /// If successful, the returned [`PingPongContinuedValue`] will either be: + /// + /// - `PingPongContinuedValue::WithMessage { transition }`: `transition` should be evaluated, + /// yielding a [`PingPongMessage`], which should be transmitted to the helper, and a + /// [`PingPongState`]. + /// + /// If the state is `PingPongState::Continued`, then it should be used by the leader along + /// with the next `PingPongMessage` received from the helper as input to + /// [`Self::leader_continued`] to advance to the next round. 
The leader may store the + /// `PingPongTransition` between rounds of preparation instead of the `PingPongState` and + /// `PingPongMessage`. + /// + /// If the state is `PingPongState::Finished`, then preparation is finished and the output + /// share may be accumulated. + /// + /// - `PingPongContinuedValue::FinishedNoMessage`: preparation is finished and the output share + /// may be accumulated. No message needs to be sent to the helper. + /// + /// # Errors + /// + /// `leader_state` must be `PingPongState::Continued` or the function will fail. + /// + /// `inbound` must not be `PingPongMessage::Initialize` or the function will fail. + /// + /// # Notes + /// + /// The specification of this function in [VDAF] takes the aggregation parameter. This version + /// does not, because [`crate::vdaf::Aggregator::prepare_preprocess`] does not take the + /// aggregation parameter. This may change in the future if/when [#670][issue] is addressed. + /// + /// + /// [VDAF]: https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-vdaf-07#section-5.8 + /// [issue]: https://github.com/divviup/libprio-rs/issues/670 + fn leader_continued( + &self, + leader_state: Self::State, + agg_param: &Self::AggregationParam, + inbound: &PingPongMessage, + ) -> Result; + + /// Continue preparation based on the helper's current state and an incoming + /// [`PingPongMessage`] from the leader. Corresponds to `ping_pong_helper_continued` in [VDAF]. + /// + /// If successful, the returned [`PingPongContinuedValue`] will either be: + /// + /// - `PingPongContinuedValue::WithMessage { transition }`: `transition` should be evaluated, + /// yielding a [`PingPongMessage`], which should be transmitted to the leader, and a + /// [`PingPongState`]. + /// + /// If the state is `PingPongState::Continued`, then it should be used by the helper along + /// with the next `PingPongMessage` received from the leader as input to + /// [`Self::helper_continued`] to advance to the next round. 
The helper may store the + /// `PingPongTransition` between rounds of preparation instead of the `PingPongState` and + /// `PingPongMessage`. + /// + /// If the state is `PingPongState::Finished`, then preparation is finished and the output + /// share may be accumulated. + /// + /// - `PingPongContinuedValue::FinishedNoMessage`: preparation is finished and the output share + /// may be accumulated. No message needs to be sent to the leader. + /// + /// # Errors + /// + /// `helper_state` must be `PingPongState::Continued` or the function will fail. + /// + /// `inbound` must not be `PingPongMessage::Initialize` or the function will fail. + /// + /// # Notes + /// + /// The specification of this function in [VDAF] takes the aggregation parameter. This version + /// does not, because [`crate::vdaf::Aggregator::prepare_preprocess`] does not take the + /// aggregation parameter. This may change in the future if/when [#670][issue] is addressed. + /// + /// + /// [VDAF]: https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-vdaf-07#section-5.8 + /// [issue]: https://github.com/divviup/libprio-rs/issues/670 + fn helper_continued( + &self, + helper_state: Self::State, + agg_param: &Self::AggregationParam, + inbound: &PingPongMessage, + ) -> Result; +} + +/// Private interfaces for implementing ping-pong +trait PingPongTopologyPrivate: + PingPongTopology +{ + fn continued( + &self, + is_leader: bool, + host_state: Self::State, + agg_param: &Self::AggregationParam, + inbound: &PingPongMessage, + ) -> Result; +} + +impl + PingPongTopology for A +where + A: Aggregator, +{ + type State = PingPongState; + type ContinuedValue = PingPongContinuedValue; + type Transition = PingPongTransition; + + fn leader_initialized( + &self, + verify_key: &[u8; VERIFY_KEY_SIZE], + agg_param: &Self::AggregationParam, + nonce: &[u8; NONCE_SIZE], + public_share: &Self::PublicShare, + input_share: &Self::InputShare, + ) -> Result<(Self::State, PingPongMessage), PingPongError> { + self.prepare_init( 
+ verify_key, + /* Leader */ 0, + agg_param, + nonce, + public_share, + input_share, + ) + .map(|(prep_state, prep_share)| { + ( + PingPongState::Continued(prep_state), + PingPongMessage::Initialize { + prep_share: prep_share.get_encoded(), + }, + ) + }) + .map_err(PingPongError::VdafPrepareInit) + } + + fn helper_initialized( + &self, + verify_key: &[u8; VERIFY_KEY_SIZE], + agg_param: &Self::AggregationParam, + nonce: &[u8; NONCE_SIZE], + public_share: &Self::PublicShare, + input_share: &Self::InputShare, + inbound: &PingPongMessage, + ) -> Result { + let (prep_state, prep_share) = self + .prepare_init( + verify_key, + /* Helper */ 1, + agg_param, + nonce, + public_share, + input_share, + ) + .map_err(PingPongError::VdafPrepareInit)?; + + let inbound_prep_share = if let PingPongMessage::Initialize { prep_share } = inbound { + Self::PrepareShare::get_decoded_with_param(&prep_state, prep_share) + .map_err(PingPongError::CodecPrepShare)? + } else { + return Err(PingPongError::PeerMessageMismatch { + found: inbound.variant(), + expected: "initialize", + }); + }; + + let current_prepare_message = self + .prepare_shares_to_prepare_message(agg_param, [inbound_prep_share, prep_share]) + .map_err(PingPongError::VdafPrepareSharesToPrepareMessage)?; + + Ok(PingPongTransition { + previous_prepare_state: prep_state, + current_prepare_message, + }) + } + + fn leader_continued( + &self, + leader_state: Self::State, + agg_param: &Self::AggregationParam, + inbound: &PingPongMessage, + ) -> Result { + self.continued(true, leader_state, agg_param, inbound) + } + + fn helper_continued( + &self, + helper_state: Self::State, + agg_param: &Self::AggregationParam, + inbound: &PingPongMessage, + ) -> Result { + self.continued(false, helper_state, agg_param, inbound) + } +} + +impl + PingPongTopologyPrivate for A +where + A: Aggregator, +{ + fn continued( + &self, + is_leader: bool, + host_state: Self::State, + agg_param: &Self::AggregationParam, + inbound: &PingPongMessage, + ) -> Result 
{ + let host_prep_state = if let PingPongState::Continued(state) = host_state { + state + } else { + return Err(PingPongError::HostStateMismatch { + found: "finished", + expected: "continue", + }); + }; + + let (prep_msg, next_peer_prep_share) = match inbound { + PingPongMessage::Initialize { .. } => { + return Err(PingPongError::PeerMessageMismatch { + found: inbound.variant(), + expected: "continue", + }); + } + PingPongMessage::Continue { + prep_msg, + prep_share, + } => (prep_msg, Some(prep_share)), + PingPongMessage::Finish { prep_msg } => (prep_msg, None), + }; + + let prep_msg = Self::PrepareMessage::get_decoded_with_param(&host_prep_state, prep_msg) + .map_err(PingPongError::CodecPrepMessage)?; + let host_prep_transition = self + .prepare_next(host_prep_state, prep_msg) + .map_err(PingPongError::VdafPrepareNext)?; + + match (host_prep_transition, next_peer_prep_share) { + ( + PrepareTransition::Continue(next_prep_state, next_host_prep_share), + Some(next_peer_prep_share), + ) => { + let next_peer_prep_share = Self::PrepareShare::get_decoded_with_param( + &next_prep_state, + next_peer_prep_share, + ) + .map_err(PingPongError::CodecPrepShare)?; + let mut prep_shares = [next_peer_prep_share, next_host_prep_share]; + if is_leader { + prep_shares.reverse(); + } + let current_prepare_message = self + .prepare_shares_to_prepare_message(agg_param, prep_shares) + .map_err(PingPongError::VdafPrepareSharesToPrepareMessage)?; + + Ok(PingPongContinuedValue::WithMessage { + transition: PingPongTransition { + previous_prepare_state: next_prep_state, + current_prepare_message, + }, + }) + } + (PrepareTransition::Finish(output_share), None) => { + Ok(PingPongContinuedValue::FinishedNoMessage { output_share }) + } + (PrepareTransition::Continue(_, _), None) => { + return Err(PingPongError::PeerMessageMismatch { + found: inbound.variant(), + expected: "continue", + }) + } + (PrepareTransition::Finish(_), Some(_)) => { + return Err(PingPongError::PeerMessageMismatch { + found: 
inbound.variant(), + expected: "finish", + }) + } + } + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use super::*; + use crate::vdaf::dummy; + use assert_matches::assert_matches; + + #[test] + fn ping_pong_one_round() { + let verify_key = []; + let aggregation_param = dummy::AggregationParam(0); + let nonce = [0; 16]; + #[allow(clippy::let_unit_value)] + let public_share = (); + let input_share = dummy::InputShare(0); + + let leader = dummy::Vdaf::new(1); + let helper = dummy::Vdaf::new(1); + + // Leader inits into round 0 + let (leader_state, leader_message) = leader + .leader_initialized( + &verify_key, + &aggregation_param, + &nonce, + &public_share, + &input_share, + ) + .unwrap(); + + // Helper inits into round 1 + let (helper_state, helper_message) = helper + .helper_initialized( + &verify_key, + &aggregation_param, + &nonce, + &public_share, + &input_share, + &leader_message, + ) + .unwrap() + .evaluate(&helper) + .unwrap(); + + // 1 round VDAF: helper should finish immediately. + assert_matches!(helper_state, PingPongState::Finished(_)); + + let leader_state = leader + .leader_continued(leader_state, &aggregation_param, &helper_message) + .unwrap(); + // 1 round VDAF: leader should finish when it gets helper message and emit no message. + assert_matches!( + leader_state, + PingPongContinuedValue::FinishedNoMessage { .. 
} + ); + } + + #[test] + fn ping_pong_two_rounds() { + let verify_key = []; + let aggregation_param = dummy::AggregationParam(0); + let nonce = [0; 16]; + #[allow(clippy::let_unit_value)] + let public_share = (); + let input_share = dummy::InputShare(0); + + let leader = dummy::Vdaf::new(2); + let helper = dummy::Vdaf::new(2); + + // Leader inits into round 0 + let (leader_state, leader_message) = leader + .leader_initialized( + &verify_key, + &aggregation_param, + &nonce, + &public_share, + &input_share, + ) + .unwrap(); + + // Helper inits into round 1 + let (helper_state, helper_message) = helper + .helper_initialized( + &verify_key, + &aggregation_param, + &nonce, + &public_share, + &input_share, + &leader_message, + ) + .unwrap() + .evaluate(&helper) + .unwrap(); + + // 2 round VDAF, round 1: helper should continue. + assert_matches!(helper_state, PingPongState::Continued(_)); + + let leader_state = leader + .leader_continued(leader_state, &aggregation_param, &helper_message) + .unwrap(); + // 2 round VDAF, round 1: leader should finish and emit a finish message. + let leader_message = assert_matches!( + leader_state, PingPongContinuedValue::WithMessage { transition } => { + let (state, message) = transition.evaluate(&leader).unwrap(); + assert_matches!(state, PingPongState::Finished(_)); + message + } + ); + + let helper_state = helper + .helper_continued(helper_state, &aggregation_param, &leader_message) + .unwrap(); + // 2 round vdaf, round 1: helper should finish and emit no message. + assert_matches!( + helper_state, + PingPongContinuedValue::FinishedNoMessage { .. 
} + ); + } + + #[test] + fn ping_pong_three_rounds() { + let verify_key = []; + let aggregation_param = dummy::AggregationParam(0); + let nonce = [0; 16]; + #[allow(clippy::let_unit_value)] + let public_share = (); + let input_share = dummy::InputShare(0); + + let leader = dummy::Vdaf::new(3); + let helper = dummy::Vdaf::new(3); + + // Leader inits into round 0 + let (leader_state, leader_message) = leader + .leader_initialized( + &verify_key, + &aggregation_param, + &nonce, + &public_share, + &input_share, + ) + .unwrap(); + + // Helper inits into round 1 + let (helper_state, helper_message) = helper + .helper_initialized( + &verify_key, + &aggregation_param, + &nonce, + &public_share, + &input_share, + &leader_message, + ) + .unwrap() + .evaluate(&helper) + .unwrap(); + + // 3 round VDAF, round 1: helper should continue. + assert_matches!(helper_state, PingPongState::Continued(_)); + + let leader_state = leader + .leader_continued(leader_state, &aggregation_param, &helper_message) + .unwrap(); + // 3 round VDAF, round 1: leader should continue and emit a continue message. + let (leader_state, leader_message) = assert_matches!( + leader_state, PingPongContinuedValue::WithMessage { transition } => { + let (state, message) = transition.evaluate(&leader).unwrap(); + assert_matches!(state, PingPongState::Continued(_)); + (state, message) + } + ); + + let helper_state = helper + .helper_continued(helper_state, &aggregation_param, &leader_message) + .unwrap(); + // 3 round vdaf, round 2: helper should finish and emit a finish message. + let helper_message = assert_matches!( + helper_state, PingPongContinuedValue::WithMessage { transition } => { + let (state, message) = transition.evaluate(&helper).unwrap(); + assert_matches!(state, PingPongState::Finished(_)); + message + } + ); + + let leader_state = leader + .leader_continued(leader_state, &aggregation_param, &helper_message) + .unwrap(); + // 3 round VDAF, round 2: leader should finish and emit no message. 
+ assert_matches!( + leader_state, + PingPongContinuedValue::FinishedNoMessage { .. } + ); + } + + #[test] + fn roundtrip_message() { + let messages = [ + ( + PingPongMessage::Initialize { + prep_share: Vec::from("prepare share"), + }, + concat!( + "00", // enum discriminant + concat!( + // prep_share + "0000000d", // length + "70726570617265207368617265", // contents + ), + ), + ), + ( + PingPongMessage::Continue { + prep_msg: Vec::from("prepare message"), + prep_share: Vec::from("prepare share"), + }, + concat!( + "01", // enum discriminant + concat!( + // prep_msg + "0000000f", // length + "70726570617265206d657373616765", // contents + ), + concat!( + // prep_share + "0000000d", // length + "70726570617265207368617265", // contents + ), + ), + ), + ( + PingPongMessage::Finish { + prep_msg: Vec::from("prepare message"), + }, + concat!( + "02", // enum discriminant + concat!( + // prep_msg + "0000000f", // length + "70726570617265206d657373616765", // contents + ), + ), + ), + ]; + + for (message, expected_hex) in messages { + let mut encoded_val = Vec::new(); + message.encode(&mut encoded_val); + let got_hex = hex::encode(&encoded_val); + assert_eq!( + &got_hex, expected_hex, + "Couldn't roundtrip (encoded value differs): {message:?}", + ); + let decoded_val = PingPongMessage::decode(&mut Cursor::new(&encoded_val)).unwrap(); + assert_eq!( + decoded_val, message, + "Couldn't roundtrip (decoded value differs): {message:?}" + ); + assert_eq!( + encoded_val.len(), + message.encoded_len().expect("No encoded length hint"), + "Encoded length hint is incorrect: {message:?}" + ) + } + } + + #[test] + fn roundtrip_transition() { + // VDAF implementations have tests for encoding/decoding their respective PrepareShare and + // PrepareMessage types, so we test here using the dummy VDAF. 
+ let transition = PingPongTransition::<0, 16, dummy::Vdaf> { + previous_prepare_state: dummy::PrepareState::default(), + current_prepare_message: (), + }; + + let encoded = transition.get_encoded(); + let hex_encoded = hex::encode(&encoded); + + assert_eq!( + hex_encoded, + concat!( + concat!( + // previous_prepare_state + "00", // input_share + "00000000", // current_round + ), + // current_prepare_message (0 length encoding) + ) + ); + + let decoded = PingPongTransition::get_decoded_with_param(&(), &encoded).unwrap(); + assert_eq!(transition, decoded); + + assert_eq!( + encoded.len(), + transition.encoded_len().expect("No encoded length hint"), + ); + } +} diff --git a/src/vdaf.rs b/src/vdaf.rs index d7c1499f8..1a6c5f031 100644 --- a/src/vdaf.rs +++ b/src/vdaf.rs @@ -222,7 +222,7 @@ pub trait Client: Vdaf { /// The Aggregator's role in the execution of a VDAF. pub trait Aggregator: Vdaf { /// State of the Aggregator during the Prepare process. - type PrepareState: Clone + Debug; + type PrepareState: Clone + Debug + PartialEq + Eq; /// The type of messages sent by each aggregator at each round of the Prepare Process. /// @@ -235,7 +235,12 @@ pub trait Aggregator: Vda /// /// Decoding takes a [`Self::PrepareState`] as a parameter; this [`Self::PrepareState`] may be /// associated with any aggregator involved in the execution of the VDAF. - type PrepareMessage: Clone + Debug + ParameterizedDecode + Encode; + type PrepareMessage: Clone + + Debug + + PartialEq + + Eq + + ParameterizedDecode + + Encode; /// Begins the Prepare process with the other Aggregators. The [`Self::PrepareState`] returned /// is passed to [`Self::prepare_next`] to get this aggregator's first-round prepare message. @@ -393,7 +398,7 @@ pub trait Aggregatable: Clone + Debug + From { } /// An output share comprised of a vector of field elements. 
-#[derive(Clone, Debug)] +#[derive(Clone)] pub struct OutputShare(Vec); impl PartialEq for OutputShare { @@ -432,6 +437,12 @@ impl Encode for OutputShare { } } +impl Debug for OutputShare { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("OutputShare").finish() + } +} + /// An aggregate share comprised of a vector of field elements. /// /// This is suitable for VDAFs where both output shares and aggregate shares are vectors of field @@ -729,6 +740,8 @@ mod tests { } } +#[cfg(feature = "test-util")] +pub mod dummy; #[cfg(all(feature = "crypto-dependencies", feature = "experimental"))] #[cfg_attr( docsrs, diff --git a/src/vdaf/dummy.rs b/src/vdaf/dummy.rs new file mode 100644 index 000000000..507e7916b --- /dev/null +++ b/src/vdaf/dummy.rs @@ -0,0 +1,316 @@ +// SPDX-License-Identifier: MPL-2.0 + +//! Implementation of a dummy VDAF which conforms to the specification in [draft-irtf-cfrg-vdaf-06] +//! but does nothing. Useful for testing. +//! +//! [draft-irtf-cfrg-vdaf-06]: https://datatracker.ietf.org/doc/draft-irtf-cfrg-vdaf/06/ + +use crate::{ + codec::{CodecError, Decode, Encode}, + vdaf::{self, Aggregatable, PrepareTransition, VdafError}, +}; +use rand::random; +use std::{fmt::Debug, io::Cursor, sync::Arc}; + +type ArcPrepInitFn = + Arc Result<(), VdafError> + 'static + Send + Sync>; +type ArcPrepStepFn = Arc< + dyn Fn(&PrepareState) -> Result, VdafError> + + 'static + + Send + + Sync, +>; + +/// Dummy VDAF that does nothing. +#[derive(Clone)] +pub struct Vdaf { + prep_init_fn: ArcPrepInitFn, + prep_step_fn: ArcPrepStepFn, +} + +impl Debug for Vdaf { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Vdaf") + .field("prep_init_fn", &"[redacted]") + .field("prep_step_fn", &"[redacted]") + .finish() + } +} + +impl Vdaf { + /// The length of the verify key parameter for fake VDAF instantiations. + pub const VERIFY_KEY_LEN: usize = 0; + + /// Construct a new instance of the dummy VDAF. 
+ pub fn new(rounds: u32) -> Self { + Self { + prep_init_fn: Arc::new(|_| -> Result<(), VdafError> { Ok(()) }), + prep_step_fn: Arc::new( + move |state| -> Result, VdafError> { + let new_round = state.current_round + 1; + if new_round == rounds { + Ok(PrepareTransition::Finish(OutputShare(state.input_share))) + } else { + Ok(PrepareTransition::Continue( + PrepareState { + current_round: new_round, + ..*state + }, + (), + )) + } + }, + ), + } + } + + /// Provide an alternate implementation of [`vdaf::Aggregator::prepare_init`]. + pub fn with_prep_init_fn Result<(), VdafError>>( + mut self, + f: F, + ) -> Self + where + F: 'static + Send + Sync, + { + self.prep_init_fn = Arc::new(f); + self + } + + /// Provide an alternate implementation of [`vdaf::Aggregator::prepare_step`]. + pub fn with_prep_step_fn< + F: Fn(&PrepareState) -> Result, VdafError>, + >( + mut self, + f: F, + ) -> Self + where + F: 'static + Send + Sync, + { + self.prep_step_fn = Arc::new(f); + self + } +} + +impl Default for Vdaf { + fn default() -> Self { + Self::new(1) + } +} + +impl vdaf::Vdaf for Vdaf { + const ID: u32 = 0xFFFF0000; + + type Measurement = u8; + type AggregateResult = u8; + type AggregationParam = AggregationParam; + type PublicShare = (); + type InputShare = InputShare; + type OutputShare = OutputShare; + type AggregateShare = AggregateShare; + + fn num_aggregators(&self) -> usize { + 2 + } +} + +impl vdaf::Aggregator<0, 16> for Vdaf { + type PrepareState = PrepareState; + type PrepareShare = (); + type PrepareMessage = (); + + fn prepare_init( + &self, + _verify_key: &[u8; 0], + _: usize, + aggregation_param: &Self::AggregationParam, + _nonce: &[u8; 16], + _: &Self::PublicShare, + input_share: &Self::InputShare, + ) -> Result<(Self::PrepareState, Self::PrepareShare), VdafError> { + (self.prep_init_fn)(aggregation_param)?; + Ok(( + PrepareState { + input_share: input_share.0, + current_round: 0, + }, + (), + )) + } + + fn prepare_shares_to_prepare_message>( + &self, + _: 
&Self::AggregationParam, + _: M, + ) -> Result { + Ok(()) + } + + fn prepare_next( + &self, + state: Self::PrepareState, + _: Self::PrepareMessage, + ) -> Result, VdafError> { + (self.prep_step_fn)(&state) + } + + fn aggregate>( + &self, + _: &Self::AggregationParam, + output_shares: M, + ) -> Result { + let mut aggregate_share = AggregateShare(0); + for output_share in output_shares { + aggregate_share.accumulate(&output_share)?; + } + Ok(aggregate_share) + } +} + +impl vdaf::Client<16> for Vdaf { + fn shard( + &self, + measurement: &Self::Measurement, + _nonce: &[u8; 16], + ) -> Result<(Self::PublicShare, Vec), VdafError> { + let first_input_share = random(); + let (second_input_share, _) = measurement.overflowing_sub(first_input_share); + Ok(( + (), + Vec::from([ + InputShare(first_input_share), + InputShare(second_input_share), + ]), + )) + } +} + +/// A dummy input share. +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +pub struct InputShare(pub u8); + +impl Encode for InputShare { + fn encode(&self, bytes: &mut Vec) { + self.0.encode(bytes) + } + + fn encoded_len(&self) -> Option { + self.0.encoded_len() + } +} + +impl Decode for InputShare { + fn decode(bytes: &mut Cursor<&[u8]>) -> Result { + Ok(Self(u8::decode(bytes)?)) + } +} + +/// Dummy aggregation parameter. +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct AggregationParam(pub u8); + +impl Encode for AggregationParam { + fn encode(&self, bytes: &mut Vec) { + self.0.encode(bytes) + } + + fn encoded_len(&self) -> Option { + self.0.encoded_len() + } +} + +impl Decode for AggregationParam { + fn decode(bytes: &mut Cursor<&[u8]>) -> Result { + Ok(Self(u8::decode(bytes)?)) + } +} + +/// Dummy output share. 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct OutputShare(pub u8); + +impl Decode for OutputShare { + fn decode(bytes: &mut Cursor<&[u8]>) -> Result { + Ok(Self(u8::decode(bytes)?)) + } +} + +impl Encode for OutputShare { + fn encode(&self, bytes: &mut Vec) { + self.0.encode(bytes); + } + + fn encoded_len(&self) -> Option { + self.0.encoded_len() + } +} + +/// Dummy prepare state. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct PrepareState { + input_share: u8, + current_round: u32, +} + +impl Encode for PrepareState { + fn encode(&self, bytes: &mut Vec) { + self.input_share.encode(bytes); + self.current_round.encode(bytes); + } + + fn encoded_len(&self) -> Option { + Some(self.input_share.encoded_len()? + self.current_round.encoded_len()?) + } +} + +impl Decode for PrepareState { + fn decode(bytes: &mut Cursor<&[u8]>) -> Result { + let input_share = u8::decode(bytes)?; + let current_round = u32::decode(bytes)?; + + Ok(Self { + input_share, + current_round, + }) + } +} + +/// Dummy aggregate share. 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct AggregateShare(pub u64); + +impl Aggregatable for AggregateShare { + type OutputShare = OutputShare; + + fn merge(&mut self, other: &Self) -> Result<(), VdafError> { + self.0 += other.0; + Ok(()) + } + + fn accumulate(&mut self, out_share: &Self::OutputShare) -> Result<(), VdafError> { + self.0 += u64::from(out_share.0); + Ok(()) + } +} + +impl From for AggregateShare { + fn from(out_share: OutputShare) -> Self { + Self(u64::from(out_share.0)) + } +} + +impl Decode for AggregateShare { + fn decode(bytes: &mut Cursor<&[u8]>) -> Result { + let val = u64::decode(bytes)?; + Ok(Self(val)) + } +} + +impl Encode for AggregateShare { + fn encode(&self, bytes: &mut Vec) { + self.0.encode(bytes) + } + + fn encoded_len(&self) -> Option { + self.0.encoded_len() + } +} diff --git a/src/vdaf/poplar1.rs b/src/vdaf/poplar1.rs index 727e6f739..57868fb71 100644 --- a/src/vdaf/poplar1.rs +++ b/src/vdaf/poplar1.rs @@ -312,7 +312,7 @@ impl<'a, P, const SEED_SIZE: usize> ParameterizedDecode<(&'a Poplar1 { sketch: SketchState, output_share: Vec, @@ -332,6 +332,15 @@ impl ConstantTimeEq for PrepareState { } } +impl Debug for PrepareState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PrepareState") + .field("sketch", &"[redacted]") + .field("output_share", &"[redacted]") + .finish() + } +} + impl Encode for PrepareState { fn encode(&self, bytes: &mut Vec) { self.sketch.encode(bytes); diff --git a/src/vdaf/prio3.rs b/src/vdaf/prio3.rs index 5dd224380..9c57231a0 100644 --- a/src/vdaf/prio3.rs +++ b/src/vdaf/prio3.rs @@ -942,7 +942,7 @@ where } /// State of each [`Aggregator`] during the Preparation phase. 
-#[derive(Clone, Debug)] +#[derive(Clone)] pub struct Prio3PrepareState { measurement_share: Share, joint_rand_seed: Option>, @@ -973,6 +973,23 @@ impl ConstantTimeEq for Prio3PrepareS } } +impl Debug for Prio3PrepareState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Prio3PrepareState") + .field("measurement_share", &"[redacted]") + .field( + "joint_rand_seed", + match self.joint_rand_seed { + Some(_) => &"Some([redacted])", + None => &"None", + }, + ) + .field("agg_id", &self.agg_id) + .field("verifier_len", &self.verifier_len) + .finish() + } +} + impl Encode for Prio3PrepareState {