This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Merge pull request #609 from abetterinternet/timg/slogger-remainder
Adopt `slog::Logger` in remainder of crate and eliminate `slog_scope`
tgeoghegan authored Apr 29, 2021
2 parents 7bab2e0 + 5fc44e5 commit 443fac8
Showing 10 changed files with 153 additions and 153 deletions.
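
Before the file-by-file diff, here is a minimal standalone sketch (not code from this repository) of the two logging styles involved: the `slog_scope` global logger this commit removes, and the explicitly passed `slog::Logger` with child loggers that replaces it. The `Discard` drain and the key-value names are placeholders for the real drains that `setup_logging` builds.

```rust
use slog::{info, o, Discard, Logger};

fn main() {
    // Placeholder drain; facilitator's setup_logging builds slog-term/slog-json drains.
    let root = Logger::root(Discard, o!("version" => "sketch"));

    // Before this commit: a process-global logger installed via slog_scope. The
    // returned guard must stay alive or the global logger is torn down.
    let _guard = slog_scope::set_global_logger(root.new(o!()));
    slog_scope::info!("logged through the slog_scope global logger");

    // After this commit: callers receive an explicit &Logger and derive child
    // loggers that carry extra key-value pairs.
    let child = root.new(o!("component" => "worker"));
    info!(child, "logged through an explicitly passed slog::Logger");
}
```
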
18 changes: 0 additions & 18 deletions facilitator/Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion facilitator/Cargo.toml
@@ -37,7 +37,6 @@ rusoto_sts = { version = "^0.46", default_features = false, features = ["rustls"
slog = { version = "2.7.0", features = ["max_level_trace"] }
slog-async = "2.6.0"
slog-json = "2.3.0"
slog-scope = "4.4.0"
slog-term = "2.8.0"
serde = { version = "^1.0", features = ["derive"] }
serde_json = "1.0"
4 changes: 2 additions & 2 deletions facilitator/src/aws_credentials.rs
@@ -242,12 +242,12 @@ impl ProvideAwsCredentials for Provider {
/// Calls the provided closure, retrying with exponential backoff on failure if
/// the error is retryable (see retryable() in this module and its comments for
/// details).
pub(crate) fn retry_request<F, T, E>(action: &str, f: F) -> RusotoResult<T, E>
pub(crate) fn retry_request<F, T, E>(logger: &Logger, f: F) -> RusotoResult<T, E>
where
F: FnMut() -> RusotoResult<T, E>,
E: Debug,
{
retries::retry_request(action, f, |rusoto_error| retryable(rusoto_error))
retries::retry_request(logger, f, |rusoto_error| retryable(rusoto_error))
}

/// Returns true if the error is transient and should be retried, false
9 changes: 3 additions & 6 deletions facilitator/src/bin/facilitator.rs
@@ -868,10 +868,7 @@ fn main() -> Result<(), anyhow::Error> {

let force_json_log_output = value_t!(matches.value_of("force-json-log-output"), bool)?;

// We must keep _scope_logger_guard live or the global logger will be
// dropped and messages from modules that don't use their own slog::Logger
// will be discarded.
let (root_logger, _scope_logger_guard) = setup_logging(&LoggingConfiguration {
let root_logger = setup_logging(&LoggingConfiguration {
force_json_output: force_json_log_output,
version_string: option_env!("BUILD_INFO").unwrap_or("(BUILD_INFO unavailable)"),
log_level: option_env!("RUST_LOG").unwrap_or("INFO"),
@@ -1305,7 +1302,7 @@ fn intake_batch_worker(
) -> Result<(), anyhow::Error> {
let metrics_collector = IntakeMetricsCollector::new()?;
let scrape_port = value_t!(sub_matches.value_of("metrics-scrape-port"), u16)?;
let _runtime = start_metrics_scrape_endpoint(scrape_port)?;
let _runtime = start_metrics_scrape_endpoint(scrape_port, parent_logger)?;
let mut queue = intake_task_queue_from_args(sub_matches, parent_logger)?;

loop {
@@ -1570,7 +1567,7 @@ fn aggregate_worker(sub_matches: &ArgMatches, parent_logger: &Logger) -> Result<
let mut queue = aggregation_task_queue_from_args(sub_matches, parent_logger)?;
let metrics_collector = AggregateMetricsCollector::new()?;
let scrape_port = value_t!(sub_matches.value_of("metrics-scrape-port"), u16)?;
let _runtime = start_metrics_scrape_endpoint(scrape_port)?;
let _runtime = start_metrics_scrape_endpoint(scrape_port, parent_logger)?;

loop {
if let Some(task_handle) = queue.dequeue()? {
18 changes: 12 additions & 6 deletions facilitator/src/http.rs
@@ -1,10 +1,10 @@
use anyhow::{Context, Result};
use slog::Logger;
use slog::{o, Logger};
use std::{convert::From, default::Default, fmt::Debug, time::Duration};
use ureq::{Agent, AgentBuilder, Request, Response, SerdeValue};
use url::Url;

use crate::retries::retry_request;
use crate::{logging::event, retries::retry_request};

/// Method contains the HTTP methods supported by this crate.
#[derive(Debug)]
@@ -103,8 +103,9 @@ impl RetryingAgent {
request: &Request,
body: &SerdeValue,
) -> Result<Response> {
let request_logger = self.logger.new(o!(event::ACTION => "send json request"));
retry_request(
"send json request",
&request_logger,
|| request.clone().send_json(body.clone()),
|ureq_error| self.is_error_retryable(ureq_error),
)
@@ -113,8 +114,9 @@

/// Send the provided request with the provided bytes as the body.
pub(crate) fn send_bytes(&self, request: &Request, data: &[u8]) -> Result<Response> {
let request_logger = self.logger.new(o!(event::ACTION => "send bytes"));
retry_request(
"send bytes",
&request_logger,
|| request.clone().send_bytes(data),
|ureq_error| self.is_error_retryable(ureq_error),
)
@@ -123,8 +125,9 @@

/// Send the provided request with the provided string as the body.
pub(crate) fn send_string(&self, request: &Request, data: &str) -> Result<Response> {
let request_logger = self.logger.new(o!(event::ACTION => "send string"));
retry_request(
"send string",
&request_logger,
|| request.clone().send_string(data),
|ureq_error| self.is_error_retryable(ureq_error),
)
@@ -133,8 +136,11 @@

/// Send the provided request with no body.
pub(crate) fn call(&self, request: &Request) -> Result<Response> {
let request_logger = self
.logger
.new(o!(event::ACTION => "send request without body"));
retry_request(
"send request without body",
&request_logger,
|| request.clone().call(),
|ureq_error| self.is_error_retryable(ureq_error),
)
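
The http.rs changes replace the action string formerly passed to `retry_request` with a request-scoped child logger tagged via the new `event::ACTION` key (literal value "action", added in logging.rs below). A self-contained sketch of that tagging pattern, with a `Discard` drain standing in for the crate's real drains and a plain log call standing in for the retried request:

```rust
use slog::{info, o, Discard, Logger};

fn send_json_request(parent: &Logger) {
    // Child logger tagged with the action being attempted; "action" is the literal
    // value of the event::ACTION key this commit adds in logging.rs.
    let request_logger = parent.new(o!("action" => "send json request"));
    // Stand-in for the retried closure; in the crate, retry_request(&request_logger, ...)
    // runs here and its retry logging inherits the "action" tag.
    info!(request_logger, "attempting request"; "attempt" => 1);
}

fn main() {
    send_json_request(&Logger::root(Discard, o!()));
}
```
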
29 changes: 8 additions & 21 deletions facilitator/src/logging.rs
@@ -3,7 +3,6 @@ use atty::{self, Stream};
use serde::Serialize;
use slog::{o, Drain, FnValue, Level, LevelFilter, Logger, PushFnValue};
use slog_json::Json;
use slog_scope::{self, GlobalLoggerGuard};
use slog_term::{FullFormat, PlainSyncDecorator, TermDecorator, TestStdoutWriter};
use std::{
convert::From,
@@ -48,6 +47,8 @@ pub mod event {
pub(crate) const TASK_ACKNOWLEDGEMENT_ID: EventKey = "task_ack_id";
/// Unique identifier for a task queue
pub(crate) const TASK_QUEUE_ID: EventKey = "task_queue-id";
/// Description of an action being retried
pub(crate) const ACTION: EventKey = "action";
}

/// Severity maps `log::Level` to Google Cloud Platform's notion of Severity.
@@ -112,22 +113,13 @@ trait IoErrorDrain: Drain<Ok = (), Err = std::io::Error> + Send {}
impl IoErrorDrain for Json<Stderr> {}
impl IoErrorDrain for FullFormat<TermDecorator> {}

/// Initialize logging resources. On success, returns a tuple consisting of:
///
/// - a root [`slog::Logger`][1]
/// - a `GlobalLoggerGuard` wrapping a [`slog_scope` global logger][2].
///
/// Child loggers should be created from the root `Logger` so that modules can
/// add more key-value pairs to the events they log. The `GlobalLoggerGuard`
/// must be kept live by the caller to enable the `slog_scope` global logger to
/// function in modules that haven't yet opted into managing their own
/// `slog::Logger`.
///
/// Returns an error if `LoggingConfiguration` is invalid.
/// Initialize logging resources. On success, returns a root [`slog::Logger`][1]
/// from which modules should create child loggers to add more key-value pairs
/// to the events they log. Returns an error if `LoggingConfiguration` is
/// invalid.
///
/// [1]: https://docs.rs/slog/2.7.0/slog/struct.Logger.html
/// [2]: https://docs.rs/slog-scope/4.4.0/slog_scope/
pub fn setup_logging(config: &LoggingConfiguration) -> Result<(Logger, GlobalLoggerGuard)> {
pub fn setup_logging(config: &LoggingConfiguration) -> Result<Logger> {
// We have to box the Drain so that both branches return the same type
let drain: Box<dyn IoErrorDrain> = if atty::isnt(Stream::Stderr) || config.force_json_output {
// If stderr is not a tty, output logs as JSON structures on the
@@ -184,12 +176,7 @@ pub fn setup_logging(config: &LoggingConfiguration) -> Result<(Logger, GlobalLog
),
);

// Over time, all modules in `facilitator` should begin using custom
// `slog::Loggers` decorated with appropriate k-v pairs, and we can stop
// creating a global `slog_scope` logger.
let slog_scope_guard = slog_scope::set_global_logger(root_logger.new(o!()));

Ok((root_logger, slog_scope_guard))
Ok(root_logger)
}

/// Initialize logging for unit or integration tests. Must be public for
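
`setup_logging` now returns only the root `Logger`. For context, a hedged sketch of the conventional slog-term/slog-async root-logger construction documented by those crates (both remain in Cargo.toml); the facilitator's own drain setup is more elaborate (JSON output, level filter, GCP-style severity), but the root/child usage it enables is the same:

```rust
use slog::{info, o, Drain, Logger};

// Conventional slog-term + slog-async drain chain; fuse() converts drain errors
// into panics so the types compose, per the upstream examples.
fn build_root_logger() -> Logger {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let drain = slog_async::Async::new(drain).build().fuse();
    Logger::root(drain, o!("version" => "sketch"))
}

fn main() {
    let root = build_root_logger();
    info!(root, "root logger ready");
    let child = root.new(o!("module" => "example"));
    info!(child, "child logger carries extra key-value pairs");
}
```
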
19 changes: 14 additions & 5 deletions facilitator/src/metrics.rs
@@ -3,7 +3,7 @@ use http::Response;
use prometheus::{
register_int_counter, register_int_counter_vec, Encoder, IntCounter, IntCounterVec, TextEncoder,
};
use slog_scope::{error, info};
use slog::{error, info, o, Logger};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tokio::runtime::Runtime;
use warp::Filter;
@@ -12,13 +12,19 @@
/// from this instance. On success, returns a Runtime value that the caller must
/// keep live, or the task that handles Prometheus scrapes will not run. Returns
/// an error if something goes wrong setting up the endpoint.
pub fn start_metrics_scrape_endpoint(port: u16) -> Result<Runtime> {
pub fn start_metrics_scrape_endpoint(port: u16, parent_logger: &Logger) -> Result<Runtime> {
// The default, multi-threaded runtime should suffice for our needs
let runtime = Runtime::new().context("failed to create runtime for metrics endpoint")?;

// scrape_logger will be moved into the closure passed to `runtime.spawn()`
let scrape_logger = parent_logger.new(o!());

// This task will run forever, so we intentionally drop the returned handle
runtime.spawn(async move {
let endpoint = warp::get().and(warp::path("metrics")).map(|| {
// Clone scrape_logger so it can safely be moved into the closure that
// handles metrics scrapes.
let scrape_logger_clone = scrape_logger.clone();
let endpoint = warp::get().and(warp::path("metrics")).map(move || {
match handle_scrape() {
Ok(body) => {
Response::builder()
@@ -27,13 +33,16 @@ pub fn start_metrics_scrape_endpoint(port: u16) -> Result<Runtime> {
.body(body)
}
Err(err) => {
error!("unable to scrape Prometheus metrics: {}", err);
error!(
scrape_logger_clone,
"unable to scrape Prometheus metrics: {}", err
);
Response::builder().status(500).body(vec![])
}
}
});

info!("serving metrics scrapes on 0.0.0.0:{}", port);
info!(scrape_logger, "serving metrics scrapes on 0.0.0.0:{}", port);
warp::serve(endpoint)
.run(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port))
.await;
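
The metrics change moves a child logger into the spawned warp task and clones it into the scrape closure; the doc comment's requirement that the returned `Runtime` stay alive is the key ownership detail. A simplified sketch of that pattern without warp or prometheus (assumes tokio's multi-threaded runtime feature, which `Runtime::new` requires):

```rust
use slog::{info, o, Discard, Logger};
use std::time::Duration;
use tokio::runtime::Runtime;

// A child logger is moved into the spawned task; the Runtime must stay bound in
// the caller (the `_runtime` bindings above) or the task is shut down on drop.
fn start_background_task(parent_logger: &Logger) -> std::io::Result<Runtime> {
    let task_logger = parent_logger.new(o!("component" => "scrape endpoint"));
    let runtime = Runtime::new()?;
    // The task runs for the life of the Runtime; the JoinHandle is intentionally dropped.
    runtime.spawn(async move {
        info!(task_logger, "background task running");
    });
    Ok(runtime)
}

fn main() -> std::io::Result<()> {
    let root = Logger::root(Discard, o!());
    let _runtime = start_background_task(&root)?;
    // Give the spawned task a moment to run before the process exits.
    std::thread::sleep(Duration::from_millis(100));
    Ok(())
}
```
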
33 changes: 16 additions & 17 deletions facilitator/src/retries.rs
@@ -1,13 +1,13 @@
use backoff::{retry, ExponentialBackoff};
use slog_scope::{debug, info};
use slog::{debug, info, Logger};
use std::{fmt::Debug, time::Duration};

/// Executes the provided action `f`, retrying with exponential backoff if the
/// error returned by `f` is deemed retryable by `is_retryable`. On success,
/// returns the value returned by `f`. On failure, returns the error returned by
/// the last attempt to call `f`. Retryable failures will be logged using the
/// provided action string.
pub(crate) fn retry_request<F, T, E, R>(action: &str, f: F, is_retryable: R) -> Result<T, E>
/// provided logger.
pub(crate) fn retry_request<F, T, E, R>(logger: &Logger, f: F, is_retryable: R) -> Result<T, E>
where
F: FnMut() -> Result<T, E>,
R: FnMut(&E) -> bool,
@@ -19,7 +19,7 @@ where
// same parameters are probably fine for both.
// [1] https://github.com/googleapis/gax-go/blob/fbaf9882acf3297573f3a7cb832e54c7d8f40635/v2/call_option.go#L120
retry_request_with_params(
action,
logger,
Duration::from_secs(1),
Duration::from_secs(30),
// We don't have explicit guidance from Google on how long to retry
@@ -35,7 +35,7 @@ where
/// Private version of retry_request that exposes parameters for backoff. Should
/// only be used for testing. Otherwise behaves identically to `retry_request`.
fn retry_request_with_params<F, T, E, R>(
action: &str,
logger: &Logger,
backoff_initial_interval: Duration,
backoff_max_interval: Duration,
backoff_max_elapsed: Duration,
@@ -59,16 +59,10 @@ where
// Invoke the function and wrap its E into backoff::Error
f().map_err(|error| {
if is_retryable(&error) {
info!(
"encountered retryable error while trying to {}: {:?}",
action, error
);
info!(logger, "encountered retryable error");
backoff::Error::Transient(error)
} else {
debug!(
"encountered non-retryable error while trying to {}: {:?}",
action, error
);
debug!(logger, "encountered non-retryable error");
backoff::Error::Permanent(error)
}
})
@@ -84,17 +78,19 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::logging::setup_test_logging;

#[test]
fn success() {
let logger = setup_test_logging();
let mut counter = 0;
let f = || -> Result<(), bool> {
counter += 1;
Ok(())
};

retry_request_with_params(
"test",
&logger,
Duration::from_millis(10),
Duration::from_millis(10),
Duration::from_millis(10),
@@ -107,6 +103,7 @@

#[test]
fn retryable_failure() {
let logger = setup_test_logging();
let mut counter = 0;
let f = || -> Result<(), bool> {
counter += 1;
@@ -118,7 +115,7 @@
};

retry_request_with_params(
"test",
&logger,
Duration::from_millis(10),
Duration::from_millis(10),
Duration::from_millis(30),
@@ -131,14 +128,15 @@

#[test]
fn retryable_failure_exhaust_max_elapsed() {
let logger = setup_test_logging();
let mut counter = 0;
let f = || -> std::result::Result<(), bool> {
counter += 1;
Err(false)
};

retry_request_with_params(
"test",
&logger,
Duration::from_millis(10),
Duration::from_millis(10),
Duration::from_millis(30),
@@ -151,14 +149,15 @@

#[test]
fn unretryable_failure() {
let logger = setup_test_logging();
let mut counter = 0;
let f = || -> std::result::Result<(), bool> {
counter += 1;
Err(false)
};

retry_request_with_params(
"test",
&logger,
Duration::from_millis(10),
Duration::from_millis(10),
Duration::from_millis(30),
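
To summarize the mechanism in retries.rs: each failure is classified as `backoff::Error::Transient` (retry) or `backoff::Error::Permanent` (give up), and `backoff::retry` applies the exponential backoff while log lines go through the passed-in `Logger`. A standalone sketch using the same backoff-0.3-style API imported at the top of retries.rs (illustrative, not the crate's code):

```rust
use backoff::{retry, ExponentialBackoff};
use slog::{info, o, Discard, Logger};
use std::time::Duration;

fn main() {
    let logger = Logger::root(Discard, o!());
    let mut attempts: u32 = 0;

    let result: Result<u32, backoff::Error<&str>> = retry(
        ExponentialBackoff {
            initial_interval: Duration::from_millis(10),
            max_interval: Duration::from_millis(50),
            max_elapsed_time: Some(Duration::from_millis(500)),
            ..Default::default()
        },
        || {
            attempts += 1;
            if attempts < 3 {
                // Transient errors are retried after a backoff delay.
                info!(logger, "encountered retryable error"; "attempt" => attempts);
                Err(backoff::Error::Transient("try again"))
            } else {
                Ok(attempts)
            }
        },
    );

    assert_eq!(result.unwrap(), 3);
}
```
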