Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #190 from abetterinternet/release/narnia
Browse files Browse the repository at this point in the history
Forward merge release/narnia into main
  • Loading branch information
tgeoghegan authored Nov 11, 2020
2 parents 3db3697 + 0f4a884 commit 68ba2cf
Show file tree
Hide file tree
Showing 15 changed files with 241 additions and 80 deletions.
4 changes: 4 additions & 0 deletions deploy-tool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type SpecificManifest struct {
// IngestionBucket is the region+name of the bucket that the data share
// processor which owns the manifest reads ingestion batches from.
IngestionBucket string `json:"ingestion-bucket"`
// IngestionIdentity is the ARN of the AWS IAM role that should be assumed
// by an ingestion server to write to this data share processor's ingestion
// bucket, if the ingestor does not have an AWS account of their own.
IngestionIdentity string `json:"ingestion-identity"`
// PeerValidationBucket is the region+name of the bucket that the data share
// processor which owns the manifest reads peer validation batches from.
PeerValidationBucket string `json:"peer-validation-bucket"`
Expand Down
7 changes: 4 additions & 3 deletions facilitator/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct BatchAggregator<'a> {
ingestion_transport: &'a mut VerifiableAndDecryptableTransport,
aggregation_batch: BatchWriter<'a, SumPart, InvalidPacket>,
share_processor_signing_key: &'a BatchSigningKey,
total_individual_clients: i64,
}

impl<'a> BatchAggregator<'a> {
Expand Down Expand Up @@ -57,6 +58,7 @@ impl<'a> BatchAggregator<'a> {
&mut *aggregation_transport.transport,
),
share_processor_signing_key: &aggregation_transport.batch_signing_key,
total_individual_clients: 0,
})
}

Expand Down Expand Up @@ -115,8 +117,6 @@ impl<'a> BatchAggregator<'a> {
.map(|f| u32::from(*f) as i64)
.collect();

let total_individual_clients = accumulator_server.total_shares().len() as i64;

let sum_signature = self.aggregation_batch.put_header(
&SumPart {
batch_uuids: batch_ids.iter().map(|pair| pair.0).collect(),
Expand All @@ -130,7 +130,7 @@ impl<'a> BatchAggregator<'a> {
aggregation_start_time: self.aggregation_start.timestamp_millis(),
aggregation_end_time: self.aggregation_end.timestamp_millis(),
packet_file_digest: invalid_packets_digest.as_ref().to_vec(),
total_individual_clients,
total_individual_clients: self.total_individual_clients,
},
&self.share_processor_signing_key.key,
)?;
Expand Down Expand Up @@ -282,6 +282,7 @@ impl<'a> BatchAggregator<'a> {
if !valid {
invalid_uuids.push(peer_validation_packet.uuid);
}
self.total_individual_clients += 1;
did_aggregate_shares = true;
break;
}
Expand Down
8 changes: 8 additions & 0 deletions facilitator/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ impl<'a, H: Header, P: Packet> BatchReader<'a, H, P> {
}
}

pub fn path(&self) -> String {
self.transport.path()
}

/// Return the parsed header from this batch, but only if its signature is
/// valid. The signature is checked by getting the key_identifier value from
/// the signature message, using that to obtain a public key from the
Expand Down Expand Up @@ -222,6 +226,10 @@ impl<'a, H: Header, P: Packet> BatchWriter<'a, H, P> {
}
}

pub fn path(&self) -> String {
self.transport.path()
}

/// Encode the provided header into Avro, sign that representation with the
/// provided key and write the header into the batch. Returns the signature
/// on success.
Expand Down
32 changes: 19 additions & 13 deletions facilitator/src/intake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use crate::{
transport::{SignableTransport, VerifiableAndDecryptableTransport},
BatchSigningKey, Error,
};
use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, ensure, Context, Result};
use chrono::NaiveDateTime;
use log::info;
use prio::{encrypt::PrivateKey, finite_field::Field, server::Server};
use ring::signature::UnparsedPublicKey;
use std::{collections::HashMap, convert::TryFrom, iter::Iterator};
Expand All @@ -15,8 +16,8 @@ use uuid::Uuid;
/// sent by the ingestion server and emitting validation shares to the other
/// share processor.
pub struct BatchIntaker<'a> {
ingestion_batch: BatchReader<'a, IngestionHeader, IngestionDataSharePacket>,
ingestor_public_keys: &'a HashMap<String, UnparsedPublicKey<Vec<u8>>>,
intake_batch: BatchReader<'a, IngestionHeader, IngestionDataSharePacket>,
intake_public_keys: &'a HashMap<String, UnparsedPublicKey<Vec<u8>>>,
packet_decryption_keys: &'a Vec<PrivateKey>,
peer_validation_batch: BatchWriter<'a, ValidationHeader, ValidationPacket>,
peer_validation_batch_signing_key: &'a BatchSigningKey,
Expand All @@ -36,11 +37,11 @@ impl<'a> BatchIntaker<'a> {
is_first: bool,
) -> Result<BatchIntaker<'a>> {
Ok(BatchIntaker {
ingestion_batch: BatchReader::new(
intake_batch: BatchReader::new(
Batch::new_ingestion(aggregation_name, batch_id, date),
&mut *ingestion_transport.transport.transport,
),
ingestor_public_keys: &ingestion_transport.transport.batch_signing_public_keys,
intake_public_keys: &ingestion_transport.transport.batch_signing_public_keys,
packet_decryption_keys: &ingestion_transport.packet_decryption_keys,
peer_validation_batch: BatchWriter::new(
Batch::new_validation(aggregation_name, batch_id, date, is_first),
Expand All @@ -60,13 +61,18 @@ impl<'a> BatchIntaker<'a> {
/// and packet file, then computes validation shares and sends them to the
/// peer share processor.
pub fn generate_validation_share(&mut self) -> Result<()> {
let ingestion_header = self.ingestion_batch.header(self.ingestor_public_keys)?;
if ingestion_header.bins <= 0 {
return Err(anyhow!(
"invalid bins/dimension value {}",
ingestion_header.bins
));
}
info!(
"processing intake from {} and saving validity to {} and {}",
self.intake_batch.path(),
self.own_validation_batch.path(),
self.peer_validation_batch.path()
);
let ingestion_header = self.intake_batch.header(self.intake_public_keys)?;
ensure!(
ingestion_header.bins > 0,
"invalid bin count {}",
ingestion_header.bins
);

// Ideally, we would use the encryption_key_id in the ingestion packet
// to figure out which private key to use for decryption, but that field
Expand All @@ -82,7 +88,7 @@ impl<'a> BatchIntaker<'a> {
// Read all the ingestion packets, generate a verification message for
// each, and write them to the validation batch.
let mut ingestion_packet_reader =
self.ingestion_batch.packet_file_reader(&ingestion_header)?;
self.intake_batch.packet_file_reader(&ingestion_header)?;

let packet_file_digest = self.peer_validation_batch.multi_packet_file_writer(
vec![&mut self.own_validation_batch],
Expand Down
54 changes: 48 additions & 6 deletions facilitator/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ pub struct SpecificManifest {
/// Region and name of the ingestion S3 bucket owned by this data share
/// processor.
ingestion_bucket: String,
// The ARN of the AWS IAM role that should be assumed by an ingestion server
// to write to this data share processor's ingestion bucket, if the ingestor
// does not have an AWS account of their own.
ingestion_identity: String,
/// Region and name of the peer validation S3 bucket owned by this data
/// share processor.
peer_validation_bucket: String,
Expand Down Expand Up @@ -171,8 +175,13 @@ struct IngestionServerIdentity {
aws_iam_entity: Option<String>,
/// The numeric identifier of the GCP service account that this ingestion
/// server uses to authenticate via OIDC identity federation to access
/// ingestion buckets.
google_service_account: Option<u64>,
/// ingestion buckets. While this field's value is a number, facilitator
/// treats it as an opaque string.
google_service_account: Option<String>,
/// The email address of the GCP service account that this ingestion server
/// uses to authenticate via OIDC identity federation to access ingestion
/// buckets.
gcp_service_account_email: Option<String>,
}

/// Represents an ingestion server's global manifest.
Expand Down Expand Up @@ -480,6 +489,7 @@ mod tests {
}}
}},
"ingestion-bucket": "us-west-1/ingestion",
"ingestion-identity": "arn:aws:iam:something:fake",
"peer-validation-bucket": "us-west-1/validation"
}}
"#,
Expand Down Expand Up @@ -509,8 +519,9 @@ mod tests {
format: 0,
batch_signing_public_keys: expected_batch_keys,
packet_encryption_certificates: expected_packet_encryption_certificates,
ingestion_bucket: "us-west-1/ingestion".to_string(),
peer_validation_bucket: "us-west-1/validation".to_string(),
ingestion_bucket: "us-west-1/ingestion".to_owned(),
ingestion_identity: "arn:aws:iam:something:fake".to_owned(),
peer_validation_bucket: "us-west-1/validation".to_owned(),
};
assert_eq!(manifest, expected_manifest);
let batch_signing_keys = manifest.batch_signing_public_keys().unwrap();
Expand Down Expand Up @@ -559,6 +570,7 @@ mod tests {
}
},
"ingestion-bucket": "us-west-1/ingestion",
"ingestion-identity": "arn:aws:iam:something:fake",
"peer-validation-bucket": "us-west-1/validation"
}
"#,
Expand All @@ -578,6 +590,7 @@ mod tests {
}
},
"ingestion-bucket": "us-west-1/ingestion",
"ingestion-identity": "arn:aws:iam:something:fake",
"peer-validation-bucket": "us-west-1/validation"
}
"#,
Expand All @@ -597,9 +610,30 @@ mod tests {
}
},
"ingestion-bucket": "us-west-1/ingestion",
"ingestion-identity": "arn:aws:iam:something:fake",
"peer-validation-bucket": "us-west-1/validation"
}
"#,
// Role ARN with wrong type
r#"
{
"format": 0,
"packet-encryption-certificates": {
"fake-key-1": {
"certificate": "who cares"
}
},
"batch-signing-public-keys": {
"fake-key-2": {
"expiration": "",
"public-key": "-----BEGIN PUBLIC KEY-----\nfoo\n-----END PUBLIC KEY-----"
}
},
"ingestion-bucket": "us-west-1/ingestion",
"ingestion-identity": 1,
"peer-validation-bucket": "us-west-1/validation"
}
"#,
];

for invalid_manifest in &invalid_manifests {
Expand Down Expand Up @@ -627,6 +661,7 @@ mod tests {
}
},
"ingestion-bucket": "us-west-1/ingestion",
"ingestion-identity": "arn:aws:iam:something:fake",
"peer-validation-bucket": "us-west-1/validation"
}
"#,
Expand All @@ -646,6 +681,7 @@ mod tests {
}
},
"ingestion-bucket": "us-west-1/ingestion",
"ingestion-identity": "arn:aws:iam:something:fake",
"peer-validation-bucket": "us-west-1/validation"
}
"#,
Expand All @@ -665,6 +701,7 @@ mod tests {
}
},
"ingestion-bucket": "us-west-1/ingestion",
"ingestion-identity": "arn:aws:iam:something:fake",
"peer-validation-bucket": "us-west-1/validation"
}
"#,
Expand Down Expand Up @@ -696,7 +733,8 @@ mod tests {
{
"format": 0,
"server-identity": {
"google-service-account": 123456789012345
"google-service-account": "112310747466759665351",
"gcp-service-account-email": "[email protected]"
},
"batch-signing-public-keys": {
"key-identifier-2": {
Expand Down Expand Up @@ -729,7 +767,11 @@ mod tests {
assert_eq!(manifest.server_identity.aws_iam_entity, None);
assert_eq!(
manifest.server_identity.google_service_account,
Some(123456789012345)
Some("112310747466759665351".to_owned())
);
assert_eq!(
manifest.server_identity.gcp_service_account_email,
Some("[email protected]".to_owned())
);
let batch_signing_public_keys = manifest.batch_signing_public_keys().unwrap();
batch_signing_public_keys.get("key-identifier-2").unwrap();
Expand Down
2 changes: 2 additions & 0 deletions facilitator/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,6 @@ pub trait Transport {
/// Returns an std::io::Write instance into which the contents of the value
/// may be written.
fn put(&mut self, key: &str) -> Result<Box<dyn TransportWriter>>;

fn path(&self) -> String;
}
Loading

0 comments on commit 68ba2cf

Please sign in to comment.