Skip to content

Commit

Permalink
clean up messages
Browse files Browse the repository at this point in the history
  • Loading branch information
tgeoghegan committed Aug 26, 2023
1 parent fe362d8 commit 1301dad
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 51 deletions.
2 changes: 1 addition & 1 deletion aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1777,7 +1777,7 @@ impl VdafOps {
prepare_init.report_share().metadata().id().as_ref(),
&public_share,
&input_share,
prepare_init.payload(),
prepare_init.message(),
)
.and_then(|transition| {
transition
Expand Down
5 changes: 2 additions & 3 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Implements portions of aggregation job continuation for the helper.
use super::error::handle_ping_pong_error;
use crate::aggregator::{accumulator::Accumulator, Error, VdafOps};
use futures::future::try_join_all;
use janus_aggregator_core::{
Expand All @@ -26,8 +27,6 @@ use std::sync::Arc;
use tokio::try_join;
use tracing::trace_span;

use super::error::handle_ping_pong_error;

impl VdafOps {
/// Step the helper's aggregation job to the next round of VDAF preparation using the round `n`
/// prepare state in `report_aggregations` with the round `n+1` broadcast prepare messages in
Expand Down Expand Up @@ -125,7 +124,7 @@ impl VdafOps {
.evaluate(vdaf.as_ref())
.and_then(|(state, _)| {
// Then continue with the incoming message.
vdaf.continued(ping_pong::Role::Helper, state, prep_step.payload())
vdaf.continued(ping_pong::Role::Helper, state, prep_step.message())
})
.and_then(|continued_value| match continued_value {
ping_pong::ContinuedValue::WithMessage {
Expand Down
8 changes: 4 additions & 4 deletions aggregator/src/aggregator/http_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1540,7 +1540,7 @@ mod tests {
transcript_1.public_share.get_encoded(),
corrupted_input_share,
),
prepare_init_1.payload().clone(),
prepare_init_1.message().clone(),
);

// prepare_init_2 fails decoding due to an issue with the input share.
Expand All @@ -1561,7 +1561,7 @@ mod tests {
.get_encoded(),
);

let prepare_init_2 = PrepareInit::new(report_share_2, prepare_init_2.payload().clone());
let prepare_init_2 = PrepareInit::new(report_share_2, prepare_init_2.message().clone());

// prepare_init_3 has an unknown HPKE config ID.
let (prepare_init_3, transcript_3) = prep_init_generator.next(&measurement);
Expand All @@ -1583,7 +1583,7 @@ mod tests {
&transcript_3.helper_input_share,
);

let prepare_init_3 = PrepareInit::new(report_share_3, prepare_init_3.payload().clone());
let prepare_init_3 = PrepareInit::new(report_share_3, prepare_init_3.message().clone());

// prepare_init_4 has already been aggregated in another aggregation job, with the same
// aggregation parameter.
Expand Down Expand Up @@ -2027,7 +2027,7 @@ mod tests {
transcript_same_id_corrupted.public_share.get_encoded(),
corrupted_input_share,
),
prepare_init_same_id_corrupted.payload().clone(),
prepare_init_same_id_corrupted.message().clone(),
);

// This report was encrypted with a global HPKE config that doesn't collide
Expand Down
4 changes: 1 addition & 3 deletions aggregator_core/src/datastore/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,9 +705,7 @@ where
pub enum ReportAggregationState<const SEED_SIZE: usize, A: vdaf::Aggregator<SEED_SIZE, 16>> {
Start,
Waiting(
/// Current state of this report aggregation, which could either be A::PrepareState (we're
/// awaiting another prepare message from the peer aggregator) or A::OutputShare (we have
/// finished, but are waiting for the peer to finish before we commit the output share).
/// Most recent transition for this report aggregation.
ping_pong::Transition<SEED_SIZE, 16, A>,
),
Finished,
Expand Down
81 changes: 41 additions & 40 deletions messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use prio::{
decode_u16_items, decode_u32_items, encode_u16_items, encode_u32_items, CodecError, Decode,
Encode,
},
topology::ping_pong::Message,
topology::ping_pong,
};
use rand::{distributions::Standard, prelude::Distribution, Rng};
use serde::{
Expand Down Expand Up @@ -2119,15 +2119,15 @@ impl Decode for ReportShare {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PrepareInit {
report_share: ReportShare,
ping_pong_message: Message,
message: ping_pong::Message,
}

impl PrepareInit {
/// Constructs a new preparation initialization message from its components.
pub fn new(report_share: ReportShare, payload: Message) -> Self {
pub fn new(report_share: ReportShare, message: ping_pong::Message) -> Self {
Self {
report_share,
ping_pong_message: payload,
message,
}
}

Expand All @@ -2136,31 +2136,31 @@ impl PrepareInit {
&self.report_share
}

/// Gets the opaque payload associated with this prep init.
pub fn payload(&self) -> &Message {
&self.ping_pong_message
/// Gets the message associated with this prep init.
pub fn message(&self) -> &ping_pong::Message {
&self.message
}
}

impl Encode for PrepareInit {
fn encode(&self, bytes: &mut Vec<u8>) {
self.report_share.encode(bytes);
self.ping_pong_message.encode(bytes);
self.message.encode(bytes);
}

fn encoded_len(&self) -> Option<usize> {
Some(self.report_share.encoded_len()? + self.ping_pong_message.encoded_len()?)
Some(self.report_share.encoded_len()? + self.message.encoded_len()?)
}
}

impl Decode for PrepareInit {
fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
let report_share = ReportShare::decode(bytes)?;
let payload = Message::decode(bytes)?;
let message = ping_pong::Message::decode(bytes)?;

Ok(Self {
report_share,
ping_pong_message: payload,
message,
})
}
}
Expand Down Expand Up @@ -2216,7 +2216,7 @@ impl Decode for PrepareResp {
pub enum PrepareStepResult {
Continue {
#[derivative(Debug = "ignore")]
message: Message,
message: ping_pong::Message,
},
Finished,
Reject(PrepareError),
Expand Down Expand Up @@ -2253,7 +2253,7 @@ impl Decode for PrepareStepResult {
let val = u8::decode(bytes)?;
Ok(match val {
0 => {
let prep_msg = Message::decode(bytes)?;
let prep_msg = ping_pong::Message::decode(bytes)?;
Self::Continue { message: prep_msg }
}
1 => Self::Finished,
Expand Down Expand Up @@ -2297,48 +2297,48 @@ impl Decode for PrepareError {
}
}

/// DAP protocol message representing a request to contine preparation of a report share for
/// DAP protocol message representing a request to continue preparation of a report share for
/// aggregation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PrepareContinue {
report_id: ReportId,
payload: Message,
message: ping_pong::Message,
}

impl PrepareContinue {
/// Constructs a new prepare continue from its components.
pub fn new(report_id: ReportId, payload: Message) -> Self {
Self { report_id, payload }
pub fn new(report_id: ReportId, message: ping_pong::Message) -> Self {
Self { report_id, message }
}

/// Gets the report ID associated with this prepare continue.
pub fn report_id(&self) -> &ReportId {
&self.report_id
}

/// Gets the payload associated with this prepare continue.
pub fn payload(&self) -> &Message {
&self.payload
/// Gets the message associated with this prepare continue.
pub fn message(&self) -> &ping_pong::Message {
&self.message
}
}

impl Encode for PrepareContinue {
fn encode(&self, bytes: &mut Vec<u8>) {
self.report_id.encode(bytes);
self.payload.encode(bytes);
self.message.encode(bytes);
}

fn encoded_len(&self) -> Option<usize> {
Some(self.report_id.encoded_len()? + self.payload.encoded_len()?)
Some(self.report_id.encoded_len()? + self.message.encoded_len()?)
}
}

impl Decode for PrepareContinue {
fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
let report_id = ReportId::decode(bytes)?;
let payload = Message::decode(bytes)?;
let message = ping_pong::Message::decode(bytes)?;

Ok(Self { report_id, payload })
Ok(Self { report_id, message })
}
}

Expand Down Expand Up @@ -2879,7 +2879,7 @@ mod tests {
use assert_matches::assert_matches;
use prio::{
codec::{CodecError, Decode, Encode},
topology::ping_pong::Message,
topology::ping_pong,
};
use serde_test::{assert_de_tokens_error, assert_tokens, Token};

Expand Down Expand Up @@ -4088,7 +4088,7 @@ mod tests {
Vec::from("543210"),
),
},
ping_pong_message: Message::Initialize {
message: ping_pong::Message::Initialize {
prep_share: Vec::from("012345"),
},
},
Expand Down Expand Up @@ -4121,7 +4121,7 @@ mod tests {
),
),
concat!(
// payload
// message
"00", // Message type
concat!(
"00000006", // length
Expand All @@ -4144,7 +4144,7 @@ mod tests {
Vec::from("abfd"),
),
},
ping_pong_message: Message::Finish {
message: ping_pong::Message::Finish {
prep_msg: Vec::new(),
},
},
Expand Down Expand Up @@ -4177,7 +4177,7 @@ mod tests {
),
),
concat!(
// payload
// message
"02", // Message type
concat!(
"00000000", // length
Expand All @@ -4198,7 +4198,7 @@ mod tests {
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
]),
result: PrepareStepResult::Continue {
message: Message::Continue {
message: ping_pong::Message::Continue {
prep_msg: Vec::from("012345"),
prep_share: Vec::from("6789"),
},
Expand All @@ -4208,7 +4208,8 @@ mod tests {
"0102030405060708090A0B0C0D0E0F10", // report_id
"00", // prepare_step_result
concat!(
"01", // prep_msg
// message
"01", // message type
concat!(
"00000006", // length
"303132333435", // opaque data
Expand Down Expand Up @@ -4281,7 +4282,7 @@ mod tests {
Vec::from("543210"),
),
},
ping_pong_message: Message::Initialize {
message: ping_pong::Message::Initialize {
prep_share: Vec::from("012345"),
},
},
Expand All @@ -4300,7 +4301,7 @@ mod tests {
Vec::from("abfd"),
),
},
ping_pong_message: Message::Finish {
message: ping_pong::Message::Finish {
prep_msg: Vec::new(),
},
},
Expand Down Expand Up @@ -4348,7 +4349,7 @@ mod tests {
),
),
concat!(
// Payload
// message
"00", // Message type
concat!(
"00000006", // length
Expand Down Expand Up @@ -4384,7 +4385,7 @@ mod tests {
),
),
concat!(
// payload
// message
"02", // Message type
concat!(
"00000000", // length
Expand Down Expand Up @@ -4419,7 +4420,7 @@ mod tests {
Vec::from("543210"),
),
},
ping_pong_message: Message::Initialize {
message: ping_pong::Message::Initialize {
prep_share: Vec::from("012345"),
},
},
Expand All @@ -4438,7 +4439,7 @@ mod tests {
Vec::from("abfd"),
),
},
ping_pong_message: Message::Finish {
message: ping_pong::Message::Finish {
prep_msg: Vec::new(),
},
},
Expand Down Expand Up @@ -4546,15 +4547,15 @@ mod tests {
report_id: ReportId::from([
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
]),
payload: Message::Initialize {
message: ping_pong::Message::Initialize {
prep_share: Vec::from("012345"),
},
},
PrepareContinue {
report_id: ReportId::from([
16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1,
]),
payload: Message::Initialize {
message: ping_pong::Message::Initialize {
prep_share: Vec::from("012345"),
},
},
Expand Down Expand Up @@ -4602,7 +4603,7 @@ mod tests {
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
]),
result: PrepareStepResult::Continue {
message: Message::Continue {
message: ping_pong::Message::Continue {
prep_msg: Vec::from("01234"),
prep_share: Vec::from("56789"),
},
Expand Down

0 comments on commit 1301dad

Please sign in to comment.