This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Merge pull request #329 from abetterinternet/timg/facilitator-metrics
Scrape basic facilitator metrics
tgeoghegan authored Jan 25, 2021
2 parents 74f82c2 + 2a8c2fa commit 05185b9
Showing 10 changed files with 778 additions and 640 deletions.
1,081 changes: 499 additions & 582 deletions facilitator/Cargo.lock

Large diffs are not rendered by default.

20 changes: 11 additions & 9 deletions facilitator/Cargo.toml
@@ -12,34 +12,36 @@ base64 = "0.12.3"
 chrono = { version ="0.4", features = ["serde"] }
 clap = "2.33.3"
 derivative = "2.1.1"
-hyper = "0.13.8"
-hyper-rustls = "0.21.0"
+http = "^0.2"
+hyper = "^0.14"
+hyper-rustls = "^0.22"
 jsonwebtoken = "7"
 log = "0.4.11"
 once_cell = "1.4"
 pem = "0.8"
 prio = "0.2"
-prometheus = { version = "0.10", features = [ "push" ] }
+prometheus = "0.11"
 rand = "0.7"
 regex = "1.4"
 ring = { version = "0.16.15", features = ["std"] }
-rusoto_core = { version = "0.45.0", default_features = false, features = ["rustls"] }
-rusoto_s3 = { version = "0.45.0", default_features = false, features = ["rustls"] }
-rusoto_sqs = { version = "0.45.0", default_features = false, features = ["rustls"] }
-rusoto_sts = { version = "0.45.0", default_features = false, features = ["rustls"] }
+rusoto_core = { version = "^0.46", default_features = false, features = ["rustls"] }
+rusoto_s3 = { version = "^0.46", default_features = false, features = ["rustls"] }
+rusoto_sqs = { version = "^0.46", default_features = false, features = ["rustls"] }
+rusoto_sts = { version = "^0.46", default_features = false, features = ["rustls"] }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 env_logger = "0.8.1"
 structopt = "0.3"
 tempfile = "3.1.0"
 thiserror = "1.0"
-tokio = { version = "0.2", features = ["rt-core", "io-util"] }
+tokio = { version = "^1.0", features = ["full"] }
 ureq = { version = "1.5.2", features = ["json"] }
 urlencoding = "1.1.1"
 uuid = { version = "0.8", features = ["serde", "v4"] }
+warp = "^0.3"
 
 [dev-dependencies]
 assert_matches = "1.4.0"
 mockito = "0.27.0"
-rusoto_mock = { version = "0.45.0", default_features = false, features = ["rustls"] }
+rusoto_mock = { version = "^0.46", default_features = false, features = ["rustls"] }
 serde_test = "1.0"
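These version bumps travel together: rusoto 0.46, hyper 0.14, and hyper-rustls 0.22 are all built on tokio 1.x, so tokio moves from 0.2 to ^1.0 in the same change, and warp 0.3 (also tokio-based) arrives to serve the new Prometheus scrape endpoint. The prometheus crate loses its "push" feature, presumably because metrics are now pulled from a /metrics endpoint rather than pushed to a gateway. Below is a minimal sketch of the kind of scrape server these dependencies enable; it is illustrative, not the PR's actual metrics.rs (which is not rendered on this page).

```rust
// Illustrative only: a warp 0.3 handler that serves whatever has been
// registered with prometheus's default registry, in the text exposition
// format Prometheus scrapes. Assumes tokio 1.x with the "full" feature.
use prometheus::{Encoder, TextEncoder};
use warp::Filter;

#[tokio::main]
async fn main() {
    let metrics = warp::path!("metrics").map(|| {
        let mut buffer = Vec::new();
        // gather() snapshots every metric in the default registry.
        TextEncoder::new()
            .encode(&prometheus::gather(), &mut buffer)
            .expect("failed to encode metrics");
        String::from_utf8(buffer).expect("metrics are valid UTF-8")
    });

    warp::serve(metrics).run(([0, 0, 0, 0], 8080)).await;
}
```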
7 changes: 7 additions & 0 deletions facilitator/src/aggregation.rs
@@ -4,6 +4,7 @@ use crate::{
         IngestionDataSharePacket, IngestionHeader, InvalidPacket, Packet, SumPart,
         ValidationHeader, ValidationPacket,
     },
+    metrics::AggregateMetricsCollector,
     transport::{SignableTransport, VerifiableAndDecryptableTransport, VerifiableTransport},
     BatchSigningKey, Error,
 };
@@ -30,6 +31,7 @@ pub struct BatchAggregator<'a> {
     aggregation_batch: BatchWriter<'a, SumPart, InvalidPacket>,
     share_processor_signing_key: &'a BatchSigningKey,
     total_individual_clients: i64,
+    metrics_collector: Option<&'a AggregateMetricsCollector>,
 }
 
 impl<'a> BatchAggregator<'a> {
@@ -65,9 +67,14 @@ impl<'a> BatchAggregator<'a> {
             ),
             share_processor_signing_key: &aggregation_transport.batch_signing_key,
             total_individual_clients: 0,
+            metrics_collector: None,
         })
     }
 
+    pub fn set_metrics_collector(&mut self, collector: &'a AggregateMetricsCollector) {
+        self.metrics_collector = Some(collector);
+    }
+
     /// Compute the sum part for all the provided batch IDs and write it out to
     /// the aggregation transport. The provided callback is invoked after each
     /// batch is aggregated.
2 changes: 1 addition & 1 deletion facilitator/src/aws_credentials.rs
@@ -12,7 +12,7 @@ use tokio::runtime::{Builder, Runtime};
 
 /// Constructs a basic runtime suitable for use in our single threaded context
 pub(crate) fn basic_runtime() -> Result<Runtime> {
-    Ok(Builder::new().basic_scheduler().enable_all().build()?)
+    Ok(Builder::new_current_thread().enable_all().build()?)
 }
 
 // ------------- Everything below here was copied from rusoto/credential/src/lib.rs in the rusoto repo ---------------------------
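This one-line change is the tokio 0.2 to 1.0 rename: Builder::new().basic_scheduler() became Builder::new_current_thread(), and threaded_scheduler() became new_multi_thread(). Usage is unchanged, as in this sketch:

```rust
// Sketch of how the current-thread runtime above gets used: block_on
// drives a future to completion on the calling thread, with no worker
// thread pool behind it.
use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    let runtime = Builder::new_current_thread().enable_all().build()?;
    let sum = runtime.block_on(async { 40 + 2 });
    assert_eq!(sum, 42);
    Ok(())
}
```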
112 changes: 72 additions & 40 deletions facilitator/src/bin/facilitator.rs
@@ -3,7 +3,6 @@ use chrono::{prelude::Utc, NaiveDateTime};
 use clap::{value_t, App, Arg, ArgMatches, SubCommand};
 use log::{error, info};
 use prio::encrypt::PrivateKey;
-use prometheus::{register_counter, register_counter_vec, Counter};
 use ring::signature::{
     EcdsaKeyPair, KeyPair, UnparsedPublicKey, ECDSA_P256_SHA256_ASN1,
     ECDSA_P256_SHA256_ASN1_SIGNING,
@@ -19,6 +18,7 @@ use facilitator::{
         DataShareProcessorGlobalManifest, IngestionServerManifest, PortalServerGlobalManifest,
         SpecificManifest,
     },
+    metrics::{start_metrics_scrape_endpoint, AggregateMetricsCollector, IntakeMetricsCollector},
     sample::{generate_ingestion_sample, SampleOutput},
     task::{AggregationTask, AwsSqsTaskQueue, GcpPubSubTaskQueue, IntakeBatchTask, TaskQueue},
     transport::{
@@ -70,6 +70,8 @@ trait AppArgumentAdder {
     fn add_gcp_service_account_key_file_argument(self: Self) -> Self;
 
     fn add_task_queue_arguments(self: Self) -> Self;
+
+    fn add_metrics_scrape_port_argument(self: Self) -> Self;
 }
 
 const SHARED_HELP: &str = "Storage arguments: Any flag ending in -input or -output can take an \
@@ -367,6 +369,17 @@ impl<'a, 'b> AppArgumentAdder for App<'a, 'b> {
                 .help("AWS region in which to use SQS"),
         )
     }
+
+    fn add_metrics_scrape_port_argument(self: App<'a, 'b>) -> App<'a, 'b> {
+        self.arg(
+            Arg::with_name("metrics-scrape-port")
+                .long("metrics-scrape-port")
+                .env("METRICS_SCRAPE_PORT")
+                .help("TCP port on which to expose Prometheus /metrics endpoint")
+                .default_value("8080")
+                .validator(num_validator::<u16>),
+        )
+    }
 }
 
 fn main() -> Result<(), anyhow::Error> {
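num_validator predates this PR and is not shown in the diff. Under clap 2's validator contract it must be a Fn(String) -> Result<(), String>; a plausible shape, assuming it is generic over FromStr:

```rust
use std::str::FromStr;

// Hypothetical reconstruction of the existing helper: rejects any value
// that does not parse as F, so num_validator::<u16> accepts exactly the
// valid TCP port numbers.
fn num_validator<F: FromStr>(s: String) -> Result<(), String> {
    F::from_str(&s)
        .map(|_| ())
        .map_err(|_| format!("{} is not a valid number", s))
}
```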
@@ -666,6 +679,7 @@ fn main() -> Result<(), anyhow::Error> {
                 .add_storage_arguments(Entity::Peer, InOut::Output)
                 .add_storage_arguments(Entity::Own, InOut::Output)
                 .add_task_queue_arguments()
+                .add_metrics_scrape_port_argument()
         )
         .subcommand(
             SubCommand::with_name("aggregate-worker")
@@ -686,6 +700,7 @@ fn main() -> Result<(), anyhow::Error> {
                 .add_packet_decryption_key_argument()
                 .add_batch_signing_key_arguments()
                 .add_task_queue_arguments()
+                .add_metrics_scrape_port_argument()
         )
         .get_matches();

@@ -760,6 +775,7 @@ fn intake_batch<F: FnMut()>(
     batch_id: &str,
     date: &str,
     sub_matches: &ArgMatches,
+    metrics_collector: Option<&IntakeMetricsCollector>,
     callback: F,
 ) -> Result<(), anyhow::Error> {
     let mut intake_transport = intake_transport_from_args(sub_matches)?;
@@ -806,7 +822,27 @@ fn intake_batch<F: FnMut()>(
         is_first_from_arg(sub_matches),
     )?;
 
-    batch_intaker.generate_validation_share(callback)
+    if let Some(collector) = metrics_collector {
+        batch_intaker.set_metrics_collector(collector);
+        collector.intake_tasks_started.inc();
+    }
+
+    let result = batch_intaker.generate_validation_share(callback);
+
+    if let Some(collector) = metrics_collector {
+        match result {
+            Ok(()) => collector
+                .intake_tasks_finished
+                .with_label_values(&["success"])
+                .inc(),
+            Err(_) => collector
+                .intake_tasks_finished
+                .with_label_values(&["error"])
+                .inc(),
+        }
+    }
+
+    result
 }
 
 fn intake_batch_subcommand(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
@@ -815,37 +851,28 @@ fn intake_batch_subcommand(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
         sub_matches.value_of("batch-id").unwrap(),
         sub_matches.value_of("date").unwrap(),
         sub_matches,
+        None,
         || {}, // no-op callback
     )
 }
 
 fn intake_batch_worker(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
-    let intake_started: Counter = register_counter!(
-        "intake_jobs_begun",
-        "Number of intake-batch jobs that started (on the facilitator side)"
-    )
-    .context("failed to register metrics counter for started intakes")?;
-
-    let intake_finished = register_counter_vec!(
-        "intake_jobs_finished",
-        "Number of intake-batch jobs that finished (on the facilitator side)",
-        &["status"]
-    )
-    .context("failed to register metrics counter for finished intakes")?;
-
+    let metrics_collector = IntakeMetricsCollector::new()?;
+    let scrape_port = value_t!(sub_matches.value_of("metrics-scrape-port"), u16)?;
+    let _runtime = start_metrics_scrape_endpoint(scrape_port)?;
     let mut queue = intake_task_queue_from_args(sub_matches)?;
 
     loop {
         if let Some(task_handle) = queue.dequeue()? {
             info!("dequeued task: {}", task_handle);
-            intake_started.inc();
             let task_start = Instant::now();
 
             let result = intake_batch(
                 &task_handle.task.aggregation_id,
                 &task_handle.task.batch_id,
                 &task_handle.task.date,
                 sub_matches,
+                Some(&metrics_collector),
                 || {
                     if let Err(e) =
                         queue.maybe_extend_task_deadline(&task_handle, &task_start.elapsed())
@@ -856,13 +883,9 @@ fn intake_batch_worker(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
             );
 
             match result {
-                Ok(_) => {
-                    intake_finished.with_label_values(&["success"]).inc();
-                    queue.acknowledge_task(task_handle)?;
-                }
+                Ok(_) => queue.acknowledge_task(task_handle)?,
                 Err(err) => {
                     error!("error while processing task {}: {:?}", task_handle, err);
-                    intake_finished.with_label_values(&["error"]).inc();
                     queue.nacknowledge_task(task_handle)?;
                 }
             }
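The new facilitator/src/metrics.rs is one of the files not rendered on this page, but its surface can be inferred from the call sites above: a constructor returning Result, an intake_tasks_started counter, and an intake_tasks_finished counter vector labeled by status. A plausible reconstruction, in the style of the register_counter! code this PR removes; the field and metric names here are guesses, not the PR's actual code:

```rust
// Hypothetical sketch of IntakeMetricsCollector, inferred from its call
// sites; the real metric names in the PR may differ. The register_*!
// macros add each metric to prometheus's default registry, which is what
// the scrape endpoint serves.
use anyhow::{Context, Result};
use prometheus::{register_counter, register_counter_vec, Counter, CounterVec};

pub struct IntakeMetricsCollector {
    pub intake_tasks_started: Counter,
    pub intake_tasks_finished: CounterVec,
}

impl IntakeMetricsCollector {
    pub fn new() -> Result<Self> {
        Ok(IntakeMetricsCollector {
            intake_tasks_started: register_counter!(
                "intake_tasks_started",
                "Number of intake-batch tasks that started"
            )
            .context("failed to register intake_tasks_started")?,
            intake_tasks_finished: register_counter_vec!(
                "intake_tasks_finished",
                "Number of intake-batch tasks that finished",
                &["status"]
            )
            .context("failed to register intake_tasks_finished")?,
        })
    }
}
```

AggregateMetricsCollector evidently mirrors this with aggregate_tasks_started and aggregate_tasks_finished, and start_metrics_scrape_endpoint returns some handle (bound to `_runtime` above) that must stay alive for the endpoint to keep serving, consistent with it owning a tokio runtime for the warp server.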
@@ -878,6 +901,7 @@ fn aggregate<F: FnMut()>(
     end: &str,
     batches: Vec<(&str, &str)>,
     sub_matches: &ArgMatches,
+    metrics_collector: Option<&AggregateMetricsCollector>,
     callback: F,
 ) -> Result<()> {
     let instance_name = sub_matches.value_of("instance-name").unwrap();
@@ -1006,7 +1030,27 @@ fn aggregate<F: FnMut()>(
         &mut aggregation_transport,
     )?;
 
-    aggregator.generate_sum_part(&parsed_batches, callback)
+    if let Some(collector) = metrics_collector {
+        aggregator.set_metrics_collector(collector);
+        collector.aggregate_tasks_started.inc();
+    }
+
+    let result = aggregator.generate_sum_part(&parsed_batches, callback);
+
+    if let Some(collector) = metrics_collector {
+        match result {
+            Ok(()) => collector
+                .aggregate_tasks_finished
+                .with_label_values(&["success"])
+                .inc(),
+            Err(_) => collector
+                .aggregate_tasks_finished
+                .with_label_values(&["error"])
+                .inc(),
+        }
+    }
+
+    result
 }
 
 fn aggregate_subcommand(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
@@ -1032,29 +1076,20 @@ fn aggregate_subcommand(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
         sub_matches.value_of("aggregation-end").unwrap(),
         batch_info,
         sub_matches,
+        None,
         || {}, // no-op callback
     )
 }
 
 fn aggregate_worker(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
-    let aggregation_started: Counter = register_counter!(
-        "aggregation_jobs_begun",
-        "Number of aggregation jobs that started (on the facilitator side)"
-    )
-    .context("failed to register counter for started aggregations")?;
-    let aggregation_finished = register_counter_vec!(
-        "aggregation_jobs_finished",
-        "Number of aggregation jobs that finished (on the facilitator side)",
-        &["status"]
-    )
-    .context("failed to register counter for finished aggregations")?;
-
     let mut queue = aggregation_task_queue_from_args(sub_matches)?;
+    let metrics_collector = AggregateMetricsCollector::new()?;
+    let scrape_port = value_t!(sub_matches.value_of("metrics-scrape-port"), u16)?;
+    let _runtime = start_metrics_scrape_endpoint(scrape_port)?;
 
     loop {
         if let Some(task_handle) = queue.dequeue()? {
             info!("dequeued task: {}", task_handle);
-            aggregation_started.inc();
             let task_start = Instant::now();
 
             let batches: Vec<(&str, &str)> = task_handle
@@ -1069,6 +1104,7 @@ fn aggregate_worker(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
                 &task_handle.task.aggregation_end,
                 batches,
                 sub_matches,
+                Some(&metrics_collector),
                 || {
                     if let Err(e) =
                         queue.maybe_extend_task_deadline(&task_handle, &task_start.elapsed())
@@ -1079,13 +1115,9 @@ fn aggregate_worker(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
             );
 
             match result {
-                Ok(_) => {
-                    aggregation_finished.with_label_values(&["success"]).inc();
-                    queue.acknowledge_task(task_handle)?;
-                }
+                Ok(_) => queue.acknowledge_task(task_handle)?,
                 Err(err) => {
                     error!("error while processing task {}: {:?}", task_handle, err);
-                    aggregation_finished.with_label_values(&["error"]).inc();
                     queue.nacknowledge_task(task_handle)?;
                 }
             }
10 changes: 10 additions & 0 deletions facilitator/src/intake.rs
@@ -1,6 +1,7 @@
 use crate::{
     batch::{Batch, BatchReader, BatchWriter},
     idl::{IngestionDataSharePacket, IngestionHeader, Packet, ValidationHeader, ValidationPacket},
+    metrics::IntakeMetricsCollector,
     transport::{SignableTransport, VerifiableAndDecryptableTransport},
     BatchSigningKey, Error,
 };
@@ -25,6 +26,7 @@ pub struct BatchIntaker<'a> {
     own_validation_batch_signing_key: &'a BatchSigningKey,
     is_first: bool,
     callback_cadence: u32,
+    metrics_collector: Option<&'a IntakeMetricsCollector>,
 }
 
 impl<'a> BatchIntaker<'a> {
@@ -56,6 +58,7 @@ impl<'a> BatchIntaker<'a> {
             own_validation_batch_signing_key: &own_validation_transport.batch_signing_key,
             is_first,
             callback_cadence: 1000,
+            metrics_collector: None,
         })
     }
 
@@ -67,6 +70,12 @@ impl<'a> BatchIntaker<'a> {
         self.callback_cadence = cadence;
     }
 
+    /// Provide a collector in which metrics about this intake task will be
+    /// recorded.
+    pub fn set_metrics_collector(&mut self, collector: &'a IntakeMetricsCollector) {
+        self.metrics_collector = Some(collector);
+    }
+
     /// Fetches the ingestion batch, validates the signatures over its header
     /// and packet file, then computes validation shares and sends them to the
     /// peer share processor. The provided callback is invoked once for every
@@ -78,6 +87,7 @@ impl<'a> BatchIntaker<'a> {
             self.own_validation_batch.path(),
             self.peer_validation_batch.path()
         );
+
         let ingestion_header = self.intake_batch.header(self.intake_public_keys)?;
         ensure!(
             ingestion_header.bins > 0,
1 change: 1 addition & 0 deletions facilitator/src/lib.rs
@@ -11,6 +11,7 @@ pub mod http;
 pub mod idl;
 pub mod intake;
 pub mod manifest;
+pub mod metrics;
 pub mod sample;
 pub mod task;
 pub mod test_utils;
