From 1301dadd777afd5b7b0d0b5b49eccd4c3711ec03 Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Sat, 26 Aug 2023 11:00:51 -0700 Subject: [PATCH] clean up messages --- aggregator/src/aggregator.rs | 2 +- .../aggregator/aggregation_job_continue.rs | 5 +- aggregator/src/aggregator/http_handlers.rs | 8 +- aggregator_core/src/datastore/models.rs | 4 +- messages/src/lib.rs | 81 ++++++++++--------- 5 files changed, 49 insertions(+), 51 deletions(-) diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index ae68c5286..e86a0a053 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -1777,7 +1777,7 @@ impl VdafOps { prepare_init.report_share().metadata().id().as_ref(), &public_share, &input_share, - prepare_init.payload(), + prepare_init.message(), ) .and_then(|transition| { transition diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs index 035208247..f4d4b5800 100644 --- a/aggregator/src/aggregator/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/aggregation_job_continue.rs @@ -1,5 +1,6 @@ //! Implements portions of aggregation job continuation for the helper. +use super::error::handle_ping_pong_error; use crate::aggregator::{accumulator::Accumulator, Error, VdafOps}; use futures::future::try_join_all; use janus_aggregator_core::{ @@ -26,8 +27,6 @@ use std::sync::Arc; use tokio::try_join; use tracing::trace_span; -use super::error::handle_ping_pong_error; - impl VdafOps { /// Step the helper's aggregation job to the next round of VDAF preparation using the round `n` /// prepare state in `report_aggregations` with the round `n+1` broadcast prepare messages in @@ -125,7 +124,7 @@ impl VdafOps { .evaluate(vdaf.as_ref()) .and_then(|(state, _)| { // Then continue with the incoming message. - vdaf.continued(ping_pong::Role::Helper, state, prep_step.payload()) + vdaf.continued(ping_pong::Role::Helper, state, prep_step.message()) }) .and_then(|continued_value| match continued_value { ping_pong::ContinuedValue::WithMessage { diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index 24644e291..c28390be7 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -1540,7 +1540,7 @@ mod tests { transcript_1.public_share.get_encoded(), corrupted_input_share, ), - prepare_init_1.payload().clone(), + prepare_init_1.message().clone(), ); // prepare_init_2 fails decoding due to an issue with the input share. @@ -1561,7 +1561,7 @@ mod tests { .get_encoded(), ); - let prepare_init_2 = PrepareInit::new(report_share_2, prepare_init_2.payload().clone()); + let prepare_init_2 = PrepareInit::new(report_share_2, prepare_init_2.message().clone()); // prepare_init_3 has an unknown HPKE config ID. let (prepare_init_3, transcript_3) = prep_init_generator.next(&measurement); @@ -1583,7 +1583,7 @@ mod tests { &transcript_3.helper_input_share, ); - let prepare_init_3 = PrepareInit::new(report_share_3, prepare_init_3.payload().clone()); + let prepare_init_3 = PrepareInit::new(report_share_3, prepare_init_3.message().clone()); // prepare_init_4 has already been aggregated in another aggregation job, with the same // aggregation parameter. @@ -2027,7 +2027,7 @@ mod tests { transcript_same_id_corrupted.public_share.get_encoded(), corrupted_input_share, ), - prepare_init_same_id_corrupted.payload().clone(), + prepare_init_same_id_corrupted.message().clone(), ); // This report was encrypted with a global HPKE config that doesn't collide diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index 8845446b3..f58e7ec73 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -705,9 +705,7 @@ where pub enum ReportAggregationState> { Start, Waiting( - /// Current state of this report aggregation, which could either be A::PrepareState (we're - /// awaiting another prepare message from the peer aggregator) or A::OutputShare (we have - /// finished, but are waiting for the peer to finish before we commit the output share). + /// Most recent transition for this report aggregation. ping_pong::Transition, ), Finished, diff --git a/messages/src/lib.rs b/messages/src/lib.rs index 89fff1d68..34adf35c3 100644 --- a/messages/src/lib.rs +++ b/messages/src/lib.rs @@ -13,7 +13,7 @@ use prio::{ decode_u16_items, decode_u32_items, encode_u16_items, encode_u32_items, CodecError, Decode, Encode, }, - topology::ping_pong::Message, + topology::ping_pong, }; use rand::{distributions::Standard, prelude::Distribution, Rng}; use serde::{ @@ -2119,15 +2119,15 @@ impl Decode for ReportShare { #[derive(Clone, Debug, PartialEq, Eq)] pub struct PrepareInit { report_share: ReportShare, - ping_pong_message: Message, + message: ping_pong::Message, } impl PrepareInit { /// Constructs a new preparation initialization message from its components. - pub fn new(report_share: ReportShare, payload: Message) -> Self { + pub fn new(report_share: ReportShare, message: ping_pong::Message) -> Self { Self { report_share, - ping_pong_message: payload, + message, } } @@ -2136,31 +2136,31 @@ impl PrepareInit { &self.report_share } - /// Gets the opaque payload associated with this prep init. - pub fn payload(&self) -> &Message { - &self.ping_pong_message + /// Gets the message associated with this prep init. + pub fn message(&self) -> &ping_pong::Message { + &self.message } } impl Encode for PrepareInit { fn encode(&self, bytes: &mut Vec) { self.report_share.encode(bytes); - self.ping_pong_message.encode(bytes); + self.message.encode(bytes); } fn encoded_len(&self) -> Option { - Some(self.report_share.encoded_len()? + self.ping_pong_message.encoded_len()?) + Some(self.report_share.encoded_len()? + self.message.encoded_len()?) } } impl Decode for PrepareInit { fn decode(bytes: &mut Cursor<&[u8]>) -> Result { let report_share = ReportShare::decode(bytes)?; - let payload = Message::decode(bytes)?; + let message = ping_pong::Message::decode(bytes)?; Ok(Self { report_share, - ping_pong_message: payload, + message, }) } } @@ -2216,7 +2216,7 @@ impl Decode for PrepareResp { pub enum PrepareStepResult { Continue { #[derivative(Debug = "ignore")] - message: Message, + message: ping_pong::Message, }, Finished, Reject(PrepareError), @@ -2253,7 +2253,7 @@ impl Decode for PrepareStepResult { let val = u8::decode(bytes)?; Ok(match val { 0 => { - let prep_msg = Message::decode(bytes)?; + let prep_msg = ping_pong::Message::decode(bytes)?; Self::Continue { message: prep_msg } } 1 => Self::Finished, @@ -2297,18 +2297,18 @@ impl Decode for PrepareError { } } -/// DAP protocol message representing a request to contine preparation of a report share for +/// DAP protocol message representing a request to continue preparation of a report share for /// aggregation. #[derive(Clone, Debug, PartialEq, Eq)] pub struct PrepareContinue { report_id: ReportId, - payload: Message, + message: ping_pong::Message, } impl PrepareContinue { /// Constructs a new prepare continue from its components. - pub fn new(report_id: ReportId, payload: Message) -> Self { - Self { report_id, payload } + pub fn new(report_id: ReportId, message: ping_pong::Message) -> Self { + Self { report_id, message } } /// Gets the report ID associated with this prepare continue. @@ -2316,29 +2316,29 @@ impl PrepareContinue { &self.report_id } - /// Gets the payload associated with this prepare continue. - pub fn payload(&self) -> &Message { - &self.payload + /// Gets the message associated with this prepare continue. + pub fn message(&self) -> &ping_pong::Message { + &self.message } } impl Encode for PrepareContinue { fn encode(&self, bytes: &mut Vec) { self.report_id.encode(bytes); - self.payload.encode(bytes); + self.message.encode(bytes); } fn encoded_len(&self) -> Option { - Some(self.report_id.encoded_len()? + self.payload.encoded_len()?) + Some(self.report_id.encoded_len()? + self.message.encoded_len()?) } } impl Decode for PrepareContinue { fn decode(bytes: &mut Cursor<&[u8]>) -> Result { let report_id = ReportId::decode(bytes)?; - let payload = Message::decode(bytes)?; + let message = ping_pong::Message::decode(bytes)?; - Ok(Self { report_id, payload }) + Ok(Self { report_id, message }) } } @@ -2879,7 +2879,7 @@ mod tests { use assert_matches::assert_matches; use prio::{ codec::{CodecError, Decode, Encode}, - topology::ping_pong::Message, + topology::ping_pong, }; use serde_test::{assert_de_tokens_error, assert_tokens, Token}; @@ -4088,7 +4088,7 @@ mod tests { Vec::from("543210"), ), }, - ping_pong_message: Message::Initialize { + message: ping_pong::Message::Initialize { prep_share: Vec::from("012345"), }, }, @@ -4121,7 +4121,7 @@ mod tests { ), ), concat!( - // payload + // message "00", // Message type concat!( "00000006", // length @@ -4144,7 +4144,7 @@ mod tests { Vec::from("abfd"), ), }, - ping_pong_message: Message::Finish { + message: ping_pong::Message::Finish { prep_msg: Vec::new(), }, }, @@ -4177,7 +4177,7 @@ mod tests { ), ), concat!( - // payload + // message "02", // Message type concat!( "00000000", // length @@ -4198,7 +4198,7 @@ mod tests { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, ]), result: PrepareStepResult::Continue { - message: Message::Continue { + message: ping_pong::Message::Continue { prep_msg: Vec::from("012345"), prep_share: Vec::from("6789"), }, @@ -4208,7 +4208,8 @@ mod tests { "0102030405060708090A0B0C0D0E0F10", // report_id "00", // prepare_step_result concat!( - "01", // prep_msg + // message + "01", // message type concat!( "00000006", // length "303132333435", // opaque data @@ -4281,7 +4282,7 @@ mod tests { Vec::from("543210"), ), }, - ping_pong_message: Message::Initialize { + message: ping_pong::Message::Initialize { prep_share: Vec::from("012345"), }, }, @@ -4300,7 +4301,7 @@ mod tests { Vec::from("abfd"), ), }, - ping_pong_message: Message::Finish { + message: ping_pong::Message::Finish { prep_msg: Vec::new(), }, }, @@ -4348,7 +4349,7 @@ mod tests { ), ), concat!( - // Payload + // message "00", // Message type concat!( "00000006", // length @@ -4384,7 +4385,7 @@ mod tests { ), ), concat!( - // payload + // message "02", // Message type concat!( "00000000", // length @@ -4419,7 +4420,7 @@ mod tests { Vec::from("543210"), ), }, - ping_pong_message: Message::Initialize { + message: ping_pong::Message::Initialize { prep_share: Vec::from("012345"), }, }, @@ -4438,7 +4439,7 @@ mod tests { Vec::from("abfd"), ), }, - ping_pong_message: Message::Finish { + message: ping_pong::Message::Finish { prep_msg: Vec::new(), }, }, @@ -4546,7 +4547,7 @@ mod tests { report_id: ReportId::from([ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, ]), - payload: Message::Initialize { + message: ping_pong::Message::Initialize { prep_share: Vec::from("012345"), }, }, @@ -4554,7 +4555,7 @@ mod tests { report_id: ReportId::from([ 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, ]), - payload: Message::Initialize { + message: ping_pong::Message::Initialize { prep_share: Vec::from("012345"), }, }, @@ -4602,7 +4603,7 @@ mod tests { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, ]), result: PrepareStepResult::Continue { - message: Message::Continue { + message: ping_pong::Message::Continue { prep_msg: Vec::from("01234"), prep_share: Vec::from("56789"), },