Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #348 from abetterinternet/timg/aggregation-with-invalid-batches
Browse files Browse the repository at this point in the history

facilitator: don't abort integration for bad batch
  • Loading branch information
tgeoghegan authored Feb 4, 2021
2 parents b6f77be + af63ea3 commit 0730cb4
Show file tree
Hide file tree
Showing 5 changed files with 518 additions and 18 deletions.
89 changes: 77 additions & 12 deletions facilitator/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl<'a> BatchAggregator<'a> {
self.aggregation_batch.path(),
);
let mut invalid_uuids = Vec::new();
let mut included_batch_uuids = Vec::new();

let ingestion_header = self.ingestion_header(&batch_ids[0].0, &batch_ids[0].1)?;

Expand All @@ -107,7 +108,18 @@ impl<'a> BatchAggregator<'a> {
.collect::<Vec<Server>>();

for batch_id in batch_ids {
self.aggregate_share(&batch_id.0, &batch_id.1, &mut servers, &mut invalid_uuids)?;
if let Err(error) =
self.aggregate_share(&batch_id.0, &batch_id.1, &mut servers, &mut invalid_uuids)
{
log::info!(
"ignoring batch {}/{} due to error {:?}",
batch_id.0,
batch_id.1,
error
);
} else {
included_batch_uuids.push(batch_id.0);
}
callback();
}

Expand Down Expand Up @@ -145,7 +157,7 @@ impl<'a> BatchAggregator<'a> {

let sum_signature = self.aggregation_batch.put_header(
&SumPart {
batch_uuids: batch_ids.iter().map(|pair| pair.0).collect(),
batch_uuids: included_batch_uuids,
name: ingestion_header.name,
bins: ingestion_header.bins,
epsilon: ingestion_header.epsilon,
Expand Down Expand Up @@ -207,10 +219,37 @@ impl<'a> BatchAggregator<'a> {
Batch::new_validation(self.aggregation_name, batch_id, batch_date, !self.is_first),
&mut *self.peer_validation_transport.transport,
);
let peer_validation_header = peer_validation_batch
.header(&self.peer_validation_transport.batch_signing_public_keys)?;
let own_validation_header = own_validation_batch
.header(&self.own_validation_transport.batch_signing_public_keys)?;

let peer_validation_header = match peer_validation_batch
.header(&self.peer_validation_transport.batch_signing_public_keys)
{
Ok(header) => header,
Err(error) => {
if let Some(collector) = self.metrics_collector {
collector
.invalid_peer_validation_batches
.with_label_values(&["header"])
.inc();
}
return Err(error);
}
};

let own_validation_header = match own_validation_batch
.header(&self.own_validation_transport.batch_signing_public_keys)
{
Ok(header) => header,
Err(error) => {
if let Some(collector) = self.metrics_collector {
collector
.invalid_own_validation_batches
.with_label_values(&["header"])
.inc();
}
return Err(error);
}
};

let ingestion_header = ingestion_batch
.header(&self.ingestion_transport.transport.batch_signing_public_keys)?;

Expand Down Expand Up @@ -239,12 +278,38 @@ impl<'a> BatchAggregator<'a> {
// iterate over the ingestion packets. For each ingestion packet, if we
// have the corresponding validation packets and the proofs are good, we
// accumulate. Otherwise we drop the packet and move on.
let peer_validation_packets: HashMap<Uuid, ValidationPacket> = validation_packet_map(
&mut peer_validation_batch.packet_file_reader(&peer_validation_header)?,
)?;
let own_validation_packets: HashMap<Uuid, ValidationPacket> = validation_packet_map(
&mut own_validation_batch.packet_file_reader(&own_validation_header)?,
)?;
let mut peer_validation_packet_file_reader =
match peer_validation_batch.packet_file_reader(&peer_validation_header) {
Ok(reader) => reader,
Err(error) => {
if let Some(collector) = self.metrics_collector {
collector
.invalid_peer_validation_batches
.with_label_values(&["packet_file"])
.inc();
}
return Err(error);
}
};
let peer_validation_packets: HashMap<Uuid, ValidationPacket> =
validation_packet_map(&mut peer_validation_packet_file_reader)?;

let mut own_validation_packet_file_reader =
match own_validation_batch.packet_file_reader(&own_validation_header) {
Ok(reader) => reader,
Err(error) => {
if let Some(collector) = self.metrics_collector {
collector
.invalid_own_validation_batches
.with_label_values(&["packet_file"])
.inc();
}
return Err(error);
}
};

let own_validation_packets: HashMap<Uuid, ValidationPacket> =
validation_packet_map(&mut own_validation_packet_file_reader)?;

// Keep track of the ingestion packets we have seen so we can reject
// duplicates.
Expand Down
30 changes: 29 additions & 1 deletion facilitator/src/bin/facilitator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ trait AppArgumentAdder {
fn add_task_queue_arguments(self: Self) -> Self;

fn add_metrics_scrape_port_argument(self: Self) -> Self;

fn add_use_bogus_packet_file_digest_argument(self: Self) -> Self;
}

const SHARED_HELP: &str = "Storage arguments: Any flag ending in -input or -output can take an \
Expand Down Expand Up @@ -380,6 +382,26 @@ impl<'a, 'b> AppArgumentAdder for App<'a, 'b> {
.validator(num_validator::<u16>),
)
}

/// Adds the `--use-bogus-packet-file-digest` flag, a test-only switch that
/// makes intake tasks write a fixed, invalid packet file digest into the
/// validation batch headers they emit (see `BatchIntaker::set_use_bogus_packet_file_digest`).
fn add_use_bogus_packet_file_digest_argument(self: App<'a, 'b>) -> App<'a, 'b> {
    self.arg(
        Arg::with_name("use-bogus-packet-file-digest")
            .long("use-bogus-packet-file-digest")
            .env("USE_BOGUS_PACKET_FILE_DIGEST")
            .help("whether to tamper with validation batch headers")
            .long_help(
                "If set, then instead of the computed digest of the packet \
                file, a fixed, bogus digest will be inserted into the own \
                and peer validation batch headers written out during \
                intake tasks. This should only be used in test scenarios \
                to simulate buggy data share processors.",
            )
            .value_name("BOOL")
            .possible_value("true")
            .possible_value("false")
            .default_value("false"),
    )
}
}

fn main() -> Result<(), anyhow::Error> {
Expand Down Expand Up @@ -543,7 +565,8 @@ fn main() -> Result<(), anyhow::Error> {
.add_storage_arguments(Entity::Ingestor, InOut::Input)
.add_manifest_base_url_argument(Entity::Peer)
.add_storage_arguments(Entity::Peer, InOut::Output)
.add_storage_arguments(Entity::Own, InOut::Output),
.add_storage_arguments(Entity::Own, InOut::Output)
.add_use_bogus_packet_file_digest_argument()
)
.subcommand(
SubCommand::with_name("aggregate")
Expand Down Expand Up @@ -680,6 +703,7 @@ fn main() -> Result<(), anyhow::Error> {
.add_storage_arguments(Entity::Own, InOut::Output)
.add_task_queue_arguments()
.add_metrics_scrape_port_argument()
.add_use_bogus_packet_file_digest_argument()
)
.subcommand(
SubCommand::with_name("aggregate-worker")
Expand Down Expand Up @@ -822,6 +846,10 @@ fn intake_batch<F: FnMut()>(
is_first_from_arg(sub_matches),
)?;

if let Some("true") = sub_matches.value_of("use-bogus-packet-file-digest") {
batch_intaker.set_use_bogus_packet_file_digest(true);
}

if let Some(collector) = metrics_collector {
batch_intaker.set_metrics_collector(collector);
collector.intake_tasks_started.inc();
Expand Down
22 changes: 21 additions & 1 deletion facilitator/src/intake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct BatchIntaker<'a> {
is_first: bool,
callback_cadence: u32,
metrics_collector: Option<&'a IntakeMetricsCollector>,
use_bogus_packet_file_digest: bool,
}

impl<'a> BatchIntaker<'a> {
Expand Down Expand Up @@ -59,6 +60,7 @@ impl<'a> BatchIntaker<'a> {
is_first,
callback_cadence: 1000,
metrics_collector: None,
use_bogus_packet_file_digest: false,
})
}

Expand All @@ -76,6 +78,13 @@ impl<'a> BatchIntaker<'a> {
self.metrics_collector = Some(collector);
}

/// Configures whether this BatchIntaker should write a bogus value in place
/// of the computed packet file digest when it constructs validation batch
/// headers. Intended solely for testing.
pub fn set_use_bogus_packet_file_digest(&mut self, use_bogus: bool) {
    self.use_bogus_packet_file_digest = use_bogus;
}

/// Fetches the ingestion batch, validates the signatures over its header
/// and packet file, then computes validation shares and sends them to the
/// peer share processor. The provided callback is invoked once for every
Expand Down Expand Up @@ -162,6 +171,17 @@ impl<'a> BatchIntaker<'a> {
},
)?;

// If the caller requested it, we insert a bogus packet file digest into
// the own and peer validation batch headers instead of the real computed
// digest. This is meant to simulate a buggy peer data share processor,
// so that we can test how the aggregation step behaves.
let packet_file_digest = if self.use_bogus_packet_file_digest {
info!("using bogus packet file digest");
vec![0u8, 1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8]
} else {
packet_file_digest.as_ref().to_vec()
};

// Construct validation header and write it out
let header = ValidationHeader {
batch_uuid: ingestion_header.batch_uuid,
Expand All @@ -171,7 +191,7 @@ impl<'a> BatchIntaker<'a> {
prime: ingestion_header.prime,
number_of_servers: ingestion_header.number_of_servers,
hamming_weight: ingestion_header.hamming_weight,
packet_file_digest: packet_file_digest.as_ref().to_vec(),
packet_file_digest,
};
let peer_header_signature = self
.peer_validation_batch
Expand Down
18 changes: 18 additions & 0 deletions facilitator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ impl IntakeMetricsCollector {
/// Prometheus metrics recorded while running aggregation tasks.
pub struct AggregateMetricsCollector {
    // Number of aggregate tasks that have started.
    pub aggregate_tasks_started: IntCounter,
    // Number of aggregate tasks that have finished; labeled vector —
    // presumably labeled by outcome, but the label set is registered in
    // `new()`, not visible here — TODO confirm.
    pub aggregate_tasks_finished: IntCounterVec,
    // Own validation batches found invalid during aggregation, labeled by
    // "reason" (e.g. "header", "packet_file").
    pub invalid_own_validation_batches: IntCounterVec,
    // Peer validation batches found invalid during aggregation, labeled by
    // "reason" (e.g. "header", "packet_file").
    pub invalid_peer_validation_batches: IntCounterVec,
}

impl AggregateMetricsCollector {
Expand All @@ -100,9 +102,25 @@ impl AggregateMetricsCollector {
)
.context("failed to register metrics counter for finished aggregations")?;

let invalid_own_validation_batches = register_int_counter_vec!(
"facilitator_invalid_own_validation_batches",
"Number of invalid own validation batches encountered during aggregation",
&["reason"]
)
.context("failed to register metrics counter for invalid own validation batches")?;

let invalid_peer_validation_batches = register_int_counter_vec!(
"facilitator_invalid_peer_validation_batches",
"Number of invalid peer validation batches encountered during aggregation",
&["reason"]
)
.context("failed to register metrics counter for invalid peer validation batches")?;

Ok(AggregateMetricsCollector {
aggregate_tasks_started,
aggregate_tasks_finished,
invalid_own_validation_batches,
invalid_peer_validation_batches,
})
}
}
Loading

0 comments on commit 0730cb4

Please sign in to comment.