
facilitator: address review feedback
 - Improve ring error representation (sketched below)
 - Rename `FileTransport` to `LocalFileTransport`
 - Use `Result::expect` in tests and elsewhere
 - Simplify result handling in main()
 - Better use of `map_err` and `?` in idl.rs (sketched below)
 - Derive `Eq` where possible
 - Delete unused implementations of `Transport`
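
Two illustrative sketches follow. Neither reproduces code from this commit: the crate's `Error` enum and idl.rs are outside the files shown below, so the types and helpers here are assumptions inferred from the call sites in the diff.

First, the ring error representation. The batch.rs hunks replace `Error::CryptographyError(msg, None, Some(e))` with `Error::CryptographyUnspecifiedError(msg, e)`, which suggests variants shaped roughly like this (payload types inferred, not confirmed; assumes the `ring` crate as a dependency):

```rust
// Sketch only: ring's signing/verification errors carry no detail beyond
// `Unspecified`, so the variant can store it directly instead of two `Option`s.
use ring::error::Unspecified;

#[derive(Debug)]
pub enum Error {
    CryptographyUnspecifiedError(String, Unspecified),
    // A digest mismatch is malformed data rather than a cryptography failure.
    MalformedDataPacketError(String),
}
```

Second, the `map_err`/`?` idiom called out for idl.rs. The `parse_count` helpers and the `MalformedHeaderError` variant are hypothetical and only show the shape of the cleanup:

```rust
use std::io::{Cursor, Read};

// Hypothetical error type standing in for the crate's own error enum.
#[derive(Debug)]
enum IdlError {
    MalformedHeaderError(String),
}

// Before: an explicit match on the fallible call, converting the error by hand.
fn parse_count_verbose(mut reader: impl Read) -> Result<u32, IdlError> {
    let mut buf = [0u8; 4];
    match reader.read_exact(&mut buf) {
        Ok(()) => Ok(u32::from_be_bytes(buf)),
        Err(e) => Err(IdlError::MalformedHeaderError(format!("short read: {}", e))),
    }
}

// After: `map_err` converts the `io::Error` and `?` propagates it, leaving the
// happy path at the end of the function.
fn parse_count(mut reader: impl Read) -> Result<u32, IdlError> {
    let mut buf = [0u8; 4];
    reader
        .read_exact(&mut buf)
        .map_err(|e| IdlError::MalformedHeaderError(format!("short read: {}", e)))?;
    Ok(u32::from_be_bytes(buf))
}

fn main() {
    let bytes = 7u32.to_be_bytes();
    assert_eq!(parse_count_verbose(Cursor::new(bytes)).unwrap(), 7);
    assert_eq!(parse_count(Cursor::new(bytes)).unwrap(), 7);
}
```

The same shape appears throughout the diff below, e.g. `key.verify(...).map_err(|e| ...)?` in batch.rs.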
tgeoghegan committed Sep 28, 2020
1 parent a58064d · commit fc697d1
Showing 8 changed files with 233 additions and 461 deletions.
facilitator/src/batch.rs (92 changes: 36 additions & 56 deletions)
@@ -181,7 +181,7 @@ impl<'a, H: Header, P: Packet> BatchReader<'a, H, P> {
.map_err(|e| Error::IoError("failed to read header from transport".to_owned(), e))?;

key.verify(&header_buf, &signature).map_err(|e| {
Error::CryptographyError("invalid signature on header".to_owned(), None, Some(e))
Error::CryptographyUnspecifiedError("invalid signature on header".to_owned(), e)
})?;

H::read(Cursor::new(header_buf))
@@ -213,10 +213,8 @@ impl<'a, H: Header, P: Packet> BatchReader<'a, H, P> {

// ... then verify the digest over it ...
if header.packet_file_digest().as_slice() != sidecar_writer.sidecar.finish().as_ref() {
- return Err(Error::CryptographyError(
+ return Err(Error::MalformedDataPacketError(
"packet file digest does not match header".to_owned(),
- None,
- None,
));
}

@@ -261,7 +259,7 @@ impl<'a, H: Header, P: Packet> BatchWriter<'a, H, P> {
let header_signature = key
.sign(&SystemRandom::new(), &sidecar_writer.sidecar)
.map_err(|e| {
Error::CryptographyError("failed to sign header file".to_owned(), None, Some(e))
Error::CryptographyUnspecifiedError("failed to sign header file".to_owned(), e)
})?;
Ok(header_signature)
}
@@ -310,7 +308,7 @@ mod tests {
default_facilitator_signing_public_key, default_ingestor_private_key,
default_ingestor_public_key,
idl::{IngestionDataSharePacket, IngestionHeader},
- transport::FileTransport,
+ transport::LocalFileTransport,
};

fn roundtrip_batch<'a>(
@@ -320,7 +318,7 @@
filenames: &[String],
batch_writer: &mut BatchWriter<'a, IngestionHeader, IngestionDataSharePacket>,
batch_reader: &BatchReader<'a, IngestionHeader, IngestionDataSharePacket>,
- transport: &mut FileTransport,
+ transport: &mut LocalFileTransport,
write_key: &EcdsaKeyPair,
read_key: &UnparsedPublicKey<Vec<u8>>,
keys_match: bool,
@@ -357,17 +355,14 @@
},
];

- let packet_file_digest = batch_writer.packet_file_writer(|mut packet_writer| {
- packets[0].write(&mut packet_writer)?;
- packets[1].write(&mut packet_writer)?;
- packets[2].write(&mut packet_writer)?;
- Ok(())
- });
- assert!(
- packet_file_digest.is_ok(),
- "failed to write packets: {:?}",
- packet_file_digest.err()
- );
+ let packet_file_digest = batch_writer
+ .packet_file_writer(|mut packet_writer| {
+ packets[0].write(&mut packet_writer)?;
+ packets[1].write(&mut packet_writer)?;
+ packets[2].write(&mut packet_writer)?;
+ Ok(())
+ })
+ .expect("failed to write packets");

let header = IngestionHeader {
batch_uuid: batch_id,
@@ -379,28 +374,21 @@
hamming_weight: None,
batch_start_time: 789456123,
batch_end_time: 789456321,
- packet_file_digest: packet_file_digest.unwrap().as_ref().to_vec(),
+ packet_file_digest: packet_file_digest.as_ref().to_vec(),
};

- let header_signature = batch_writer.put_header(&header, write_key);
- assert!(
- header_signature.is_ok(),
- "failed to write header: {:?}",
- header_signature.err()
- );
+ let header_signature = batch_writer
+ .put_header(&header, write_key)
+ .expect("failed to write header");

- let res = batch_writer.put_signature(&header_signature.unwrap());
+ let res = batch_writer.put_signature(&header_signature);
assert!(res.is_ok(), "failed to put signature: {:?}", res.err());

// Verify file layout is as expected
for extension in filenames {
- let res = transport.get(&base_path.with_extension(extension));
- assert!(
- res.is_ok(),
- "could not get batch file {}: {:?}",
- extension,
- res.err()
- );
+ transport
+ .get(&base_path.with_extension(extension))
+ .expect(&format!("could not get batch file {}", extension));
}

let header_again = batch_reader.header(read_key);
@@ -420,22 +408,14 @@

assert_eq!(header, header_again, "header does not match");

- let packet_file_reader = batch_reader.packet_file_reader(&header_again);
- assert!(
- packet_file_reader.is_ok(),
- "failed to get packet file reader {:?}",
- packet_file_reader.err()
- );
- let mut packet_file_reader = packet_file_reader.unwrap();
+ let mut packet_file_reader = batch_reader
+ .packet_file_reader(&header_again)
+ .expect("failed to get packet file reader");

for packet in packets {
- let packet_again = IngestionDataSharePacket::read(&mut packet_file_reader);
- assert!(
- packet_again.is_ok(),
- "failed to read packet: {:?}",
- packet_again.is_err()
- );
- assert_eq!(packet, &packet_again.unwrap(), "packet does not match");
+ let packet_again = IngestionDataSharePacket::read(&mut packet_file_reader)
+ .expect("failed to read packet");
+ assert_eq!(packet, &packet_again, "packet does not match");
}

// One more read should get EOF
@@ -457,9 +437,9 @@

fn roundtrip_ingestion_batch(keys_match: bool) {
let tempdir = tempfile::TempDir::new().unwrap();
- let mut write_transport = FileTransport::new(tempdir.path().to_path_buf());
- let mut read_transport = FileTransport::new(tempdir.path().to_path_buf());
- let mut verify_transport = FileTransport::new(tempdir.path().to_path_buf());
+ let mut write_transport = LocalFileTransport::new(tempdir.path().to_path_buf());
+ let mut read_transport = LocalFileTransport::new(tempdir.path().to_path_buf());
+ let mut verify_transport = LocalFileTransport::new(tempdir.path().to_path_buf());

let aggregation_name = "fake-aggregation";
let batch_id = Uuid::new_v4();
@@ -520,9 +500,9 @@

fn roundtrip_validation_batch(is_first: bool, keys_match: bool) {
let tempdir = tempfile::TempDir::new().unwrap();
- let mut write_transport = FileTransport::new(tempdir.path().to_path_buf());
- let mut read_transport = FileTransport::new(tempdir.path().to_path_buf());
- let mut verify_transport = FileTransport::new(tempdir.path().to_path_buf());
+ let mut write_transport = LocalFileTransport::new(tempdir.path().to_path_buf());
+ let mut read_transport = LocalFileTransport::new(tempdir.path().to_path_buf());
+ let mut verify_transport = LocalFileTransport::new(tempdir.path().to_path_buf());

let aggregation_name = "fake-aggregation";
let batch_id = Uuid::new_v4();
@@ -605,9 +585,9 @@

fn roundtrip_sum_batch(is_first: bool, keys_match: bool) {
let tempdir = tempfile::TempDir::new().unwrap();
- let mut write_transport = FileTransport::new(tempdir.path().to_path_buf());
- let mut read_transport = FileTransport::new(tempdir.path().to_path_buf());
- let mut verify_transport = FileTransport::new(tempdir.path().to_path_buf());
+ let mut write_transport = LocalFileTransport::new(tempdir.path().to_path_buf());
+ let mut read_transport = LocalFileTransport::new(tempdir.path().to_path_buf());
+ let mut verify_transport = LocalFileTransport::new(tempdir.path().to_path_buf());

let aggregation_name = "fake-aggregation";
let batch_id = Uuid::new_v4();
facilitator/src/bin/main.rs (123 changes: 62 additions & 61 deletions)
@@ -2,7 +2,7 @@ use chrono::{prelude::Utc, NaiveDateTime};
use clap::{App, Arg, ArgMatches, SubCommand};
use facilitator::{
aggregation::BatchAggregator, intake::BatchIntaker, sample::generate_ingestion_sample,
- transport::FileTransport, Error, DATE_FORMAT, DEFAULT_FACILITATOR_ECIES_PRIVATE_KEY,
+ transport::LocalFileTransport, Error, DATE_FORMAT, DEFAULT_FACILITATOR_ECIES_PRIVATE_KEY,
DEFAULT_FACILITATOR_SIGNING_PRIVATE_KEY, DEFAULT_INGESTOR_PRIVATE_KEY,
DEFAULT_PHA_ECIES_PRIVATE_KEY, DEFAULT_PHA_SIGNING_PRIVATE_KEY,
};
@@ -36,7 +36,7 @@ fn uuid_validator(s: String) -> Result<(), String> {

fn main() -> Result<(), Error> {
let matches = App::new("facilitator")
.about("Prio facilitator server")
.about("Prio data share processor")
// Environment variables are injected via build.rs
.version(&*format!(
"{} {} {}",
@@ -463,58 +463,59 @@ fn main() -> Result<(), Error> {
// The configuration of the Args above should guarantee that the
// various parameters are present and valid, so it is safe to use
// unwrap() here.
("generate-ingestion-sample", Some(sub_matches)) => generate_ingestion_sample(
&mut FileTransport::new(
Path::new(sub_matches.value_of("pha-output").unwrap()).to_path_buf(),
),
&mut FileTransport::new(
Path::new(sub_matches.value_of("facilitator-output").unwrap()).to_path_buf(),
),
&sub_matches
.value_of("batch-id")
.map_or_else(|| Uuid::new_v4(), |v| Uuid::parse_str(v).unwrap()),
&sub_matches.value_of("aggregation-id").unwrap(),
&sub_matches.value_of("date").map_or_else(
|| Utc::now().naive_utc(),
|v| NaiveDateTime::parse_from_str(&v, DATE_FORMAT).unwrap(),
),
&PrivateKey::from_base64(sub_matches.value_of("pha-private-key").unwrap()).unwrap(),
&PrivateKey::from_base64(sub_matches.value_of("facilitator-private-key").unwrap())
.unwrap(),
&base64::decode(sub_matches.value_of("ingestor-private-key").unwrap()).unwrap(),
sub_matches
.value_of("dimension")
.unwrap()
.parse::<i32>()
.unwrap(),
sub_matches
.value_of("packet-count")
.unwrap()
.parse::<usize>()
.unwrap(),
sub_matches
.value_of("epsilon")
.unwrap()
.parse::<f64>()
.unwrap(),
sub_matches
.value_of("batch-start-time")
.unwrap()
.parse::<i64>()
.unwrap(),
sub_matches
.value_of("batch-end-time")
.unwrap()
.parse::<i64>()
.unwrap(),
)
// Drop return value
.map(|_| ()),
("generate-ingestion-sample", Some(sub_matches)) => {
generate_ingestion_sample(
&mut LocalFileTransport::new(
Path::new(sub_matches.value_of("pha-output").unwrap()).to_path_buf(),
),
&mut LocalFileTransport::new(
Path::new(sub_matches.value_of("facilitator-output").unwrap()).to_path_buf(),
),
&sub_matches
.value_of("batch-id")
.map_or_else(|| Uuid::new_v4(), |v| Uuid::parse_str(v).unwrap()),
&sub_matches.value_of("aggregation-id").unwrap(),
&sub_matches.value_of("date").map_or_else(
|| Utc::now().naive_utc(),
|v| NaiveDateTime::parse_from_str(&v, DATE_FORMAT).unwrap(),
),
&PrivateKey::from_base64(sub_matches.value_of("pha-private-key").unwrap()).unwrap(),
&PrivateKey::from_base64(sub_matches.value_of("facilitator-private-key").unwrap())
.unwrap(),
&base64::decode(sub_matches.value_of("ingestor-private-key").unwrap()).unwrap(),
sub_matches
.value_of("dimension")
.unwrap()
.parse::<i32>()
.unwrap(),
sub_matches
.value_of("packet-count")
.unwrap()
.parse::<usize>()
.unwrap(),
sub_matches
.value_of("epsilon")
.unwrap()
.parse::<f64>()
.unwrap(),
sub_matches
.value_of("batch-start-time")
.unwrap()
.parse::<i64>()
.unwrap(),
sub_matches
.value_of("batch-end-time")
.unwrap()
.parse::<i64>()
.unwrap(),
)?;
Ok(())
}
("batch-intake", Some(sub_matches)) => {
- let mut ingestion_transport = FileTransport::new(
+ let mut ingestion_transport = LocalFileTransport::new(
Path::new(sub_matches.value_of("ingestion-bucket").unwrap()).to_path_buf(),
);
- let mut validation_transport = FileTransport::new(
+ let mut validation_transport = LocalFileTransport::new(
Path::new(sub_matches.value_of("validation-bucket").unwrap()).to_path_buf(),
);

@@ -535,8 +536,7 @@ fn main() -> Result<(), Error> {
Err(e) => {
return Err(Error::CryptographyError(
"failed to parse value for share-processor-private-key".to_owned(),
- Some(e),
- None,
+ e,
))
}
};
@@ -557,19 +557,20 @@
&share_processor_key,
&ingestor_pub_key,
)?;
- batch_intaker.generate_validation_share()
+ batch_intaker.generate_validation_share()?;
+ Ok(())
}
("aggregate", Some(sub_matches)) => {
- let mut ingestion_transport = FileTransport::new(
+ let mut ingestion_transport = LocalFileTransport::new(
Path::new(sub_matches.value_of("ingestion-bucket").unwrap()).to_path_buf(),
);
- let mut own_validation_transport = FileTransport::new(
+ let mut own_validation_transport = LocalFileTransport::new(
Path::new(sub_matches.value_of("own-validation-bucket").unwrap()).to_path_buf(),
);
- let mut peer_validation_transport = FileTransport::new(
+ let mut peer_validation_transport = LocalFileTransport::new(
Path::new(sub_matches.value_of("peer-validation-bucket").unwrap()).to_path_buf(),
);
- let mut aggregation_transport = FileTransport::new(
+ let mut aggregation_transport = LocalFileTransport::new(
Path::new(sub_matches.value_of("aggregation-bucket").unwrap()).to_path_buf(),
);

@@ -587,8 +588,7 @@
Err(e) => {
return Err(Error::CryptographyError(
"failed to parse value for share-processor-private-key".to_owned(),
- Some(e),
- None,
+ e,
))
}
};
@@ -632,7 +632,8 @@ fn main() -> Result<(), Error> {
&peer_share_processor_pub_key,
&share_processor_ecies_key,
)?
- .generate_sum_part(&batch_ids.into_iter().zip(batch_dates).collect())
+ .generate_sum_part(&batch_ids.into_iter().zip(batch_dates).collect())?;
+ Ok(())
}
(_, _) => Ok(()),
}