diff --git a/aggregator/src/binaries/aggregation_job_creator.rs b/aggregator/src/binaries/aggregation_job_creator.rs index 4de1dde1d..5b944f65b 100644 --- a/aggregator/src/binaries/aggregation_job_creator.rs +++ b/aggregator/src/binaries/aggregation_job_creator.rs @@ -9,6 +9,7 @@ use janus_core::time::RealClock; use serde::{Deserialize, Serialize}; use std::sync::Arc; use std::time::Duration; +use tracing::info; pub async fn main_callback(ctx: BinaryContext) -> Result<()> { // Start creating aggregation jobs. @@ -22,6 +23,7 @@ pub async fn main_callback(ctx: BinaryContext) -> Re ctx.config.max_aggregation_job_size, ctx.config.aggregation_job_creation_report_window, )); + info!("Running aggregation job creator"); aggregation_job_creator.run(ctx.stopper).await; Ok(()) diff --git a/aggregator/src/binaries/aggregation_job_driver.rs b/aggregator/src/binaries/aggregation_job_driver.rs index 636cf200b..e36950d02 100644 --- a/aggregator/src/binaries/aggregation_job_driver.rs +++ b/aggregator/src/binaries/aggregation_job_driver.rs @@ -8,6 +8,7 @@ use clap::Parser; use janus_core::{time::RealClock, TokioRuntime}; use serde::{Deserialize, Serialize}; use std::{fmt::Debug, sync::Arc, time::Duration}; +use tracing::info; pub async fn main_callback(ctx: BinaryContext) -> Result<()> { const CLIENT_USER_AGENT: &str = concat!( @@ -58,6 +59,8 @@ pub async fn main_callback(ctx: BinaryContext) -> Re ctx.config.job_driver_config.maximum_attempts_before_failure, ), )?); + + info!("Running aggregation job driver"); job_driver.run().await; Ok(()) diff --git a/aggregator/src/binaries/collection_job_driver.rs b/aggregator/src/binaries/collection_job_driver.rs index 62736c952..3838df37f 100644 --- a/aggregator/src/binaries/collection_job_driver.rs +++ b/aggregator/src/binaries/collection_job_driver.rs @@ -8,6 +8,7 @@ use clap::Parser; use janus_core::{time::RealClock, TokioRuntime}; use serde::{Deserialize, Serialize}; use std::{fmt::Debug, sync::Arc, time::Duration}; +use tracing::info; pub async fn main_callback(ctx: BinaryContext) -> Result<()> { const CLIENT_USER_AGENT: &str = concat!( @@ -63,6 +64,8 @@ pub async fn main_callback(ctx: BinaryContext) -> Re ctx.config.job_driver_config.maximum_attempts_before_failure, ), )?); + + info!("Running collection job driver"); job_driver.run().await; Ok(()) diff --git a/aggregator/src/binaries/garbage_collector.rs b/aggregator/src/binaries/garbage_collector.rs index 513181257..cf38afa91 100644 --- a/aggregator/src/binaries/garbage_collector.rs +++ b/aggregator/src/binaries/garbage_collector.rs @@ -7,7 +7,7 @@ use janus_core::time::RealClock; use opentelemetry::metrics::Meter; use serde::{Deserialize, Serialize}; use tokio::time::interval; -use tracing::error; +use tracing::{error, info}; use trillium_tokio::Stopper; use crate::{ @@ -49,6 +49,7 @@ pub(super) async fn run_garbage_collector( gc_config.tasks_per_tx, gc_config.concurrent_tx_limit, ); + info!("Running garbage collector"); let mut interval = interval(Duration::from_secs(gc_config.gc_frequency_s)); while stopper.stop_future(interval.tick()).await.is_some() { if let Err(err) = gc.run().await { diff --git a/aggregator/src/binaries/janus_cli.rs b/aggregator/src/binaries/janus_cli.rs index 957b4224c..a1c4b44f4 100644 --- a/aggregator/src/binaries/janus_cli.rs +++ b/aggregator/src/binaries/janus_cli.rs @@ -59,7 +59,7 @@ pub fn run(command_line_options: CommandLineOptions) -> Result<()> { version = env!("CARGO_PKG_VERSION"), git_revision = git_revision(), rust_version = env!("RUSTC_SEMVER"), - "Starting up" + "Starting janus_cli" ); if command_line_options.dry_run { diff --git a/aggregator/src/binary_utils.rs b/aggregator/src/binary_utils.rs index 3d996faf3..1fc3fdd94 100644 --- a/aggregator/src/binary_utils.rs +++ b/aggregator/src/binary_utils.rs @@ -247,6 +247,7 @@ pub struct BinaryContext } pub fn janus_main( + service_name: &str, options: Options, clock: C, uses_rayon: bool, @@ -309,7 +310,8 @@ where version = env!("CARGO_PKG_VERSION"), git_revision = git_revision(), rust_version = env!("RUSTC_SEMVER"), - "Starting up" + "Starting {}", + service_name, ); // Connect to database. diff --git a/aggregator/src/main.rs b/aggregator/src/main.rs index ffe5bfaf7..027f8e392 100644 --- a/aggregator/src/main.rs +++ b/aggregator/src/main.rs @@ -55,33 +55,54 @@ enum Nested { fn main() -> anyhow::Result<()> { let clock = RealClock::default(); match Options::parse() { - Options::Aggregator(options) | Options::Default(Nested::Aggregator(options)) => { - janus_main(options, clock, true, aggregator::main_callback) - } + Options::Aggregator(options) | Options::Default(Nested::Aggregator(options)) => janus_main( + "aggregator", + options, + clock, + true, + aggregator::main_callback, + ), Options::GarbageCollector(options) - | Options::Default(Nested::GarbageCollector(options)) => { - janus_main(options, clock, false, garbage_collector::main_callback) - } + | Options::Default(Nested::GarbageCollector(options)) => janus_main( + "garbage_collector", + options, + clock, + false, + garbage_collector::main_callback, + ), Options::AggregationJobCreator(options) | Options::Default(Nested::AggregationJobCreator(options)) => janus_main( + "aggregation_job_creator", options, clock, false, aggregation_job_creator::main_callback, ), Options::AggregationJobDriver(options) - | Options::Default(Nested::AggregationJobDriver(options)) => { - janus_main(options, clock, true, aggregation_job_driver::main_callback) - } + | Options::Default(Nested::AggregationJobDriver(options)) => janus_main( + "aggregation_job_driver", + options, + clock, + true, + aggregation_job_driver::main_callback, + ), Options::CollectionJobDriver(options) - | Options::Default(Nested::CollectionJobDriver(options)) => { - janus_main(options, clock, false, collection_job_driver::main_callback) - } + | Options::Default(Nested::CollectionJobDriver(options)) => janus_main( + "collection_job_driver", + options, + clock, + false, + collection_job_driver::main_callback, + ), Options::JanusCli(options) | Options::Default(Nested::JanusCli(options)) => { janus_cli::run(options) } - Options::KeyRotator(options) | Options::Default(Nested::KeyRotator(options)) => { - janus_main(options, clock, false, key_rotator::main_callback) - } + Options::KeyRotator(options) | Options::Default(Nested::KeyRotator(options)) => janus_main( + "key_rotator", + options, + clock, + false, + key_rotator::main_callback, + ), } } diff --git a/interop_binaries/src/commands/janus_interop_aggregator.rs b/interop_binaries/src/commands/janus_interop_aggregator.rs index 3bc2f0eb3..b5621104e 100644 --- a/interop_binaries/src/commands/janus_interop_aggregator.rs +++ b/interop_binaries/src/commands/janus_interop_aggregator.rs @@ -254,27 +254,33 @@ impl BinaryConfig for Config { impl Options { pub fn run(self) -> anyhow::Result<()> { - janus_main::<_, _, Config, _, _>(self, RealClock::default(), true, |ctx| async move { - let datastore = Arc::new(ctx.datastore); + janus_main::<_, _, Config, _, _>( + "janus_interop_aggregator", + self, + RealClock::default(), + true, + |ctx| async move { + let datastore = Arc::new(ctx.datastore); - // Run an HTTP server with both the DAP aggregator endpoints and the interoperation test - // endpoints. - let handler = make_handler( - Arc::clone(&datastore), - ctx.config.dap_serving_prefix, - ctx.config.aggregator_address, - ctx.config.health_check_peers, - ) - .await?; - trillium_tokio::config() - .with_host(&ctx.config.listen_address.ip().to_string()) - .with_port(ctx.config.listen_address.port()) - .without_signals() - .run_async(handler) - .await; + // Run an HTTP server with both the DAP aggregator endpoints and the interoperation test + // endpoints. + let handler = make_handler( + Arc::clone(&datastore), + ctx.config.dap_serving_prefix, + ctx.config.aggregator_address, + ctx.config.health_check_peers, + ) + .await?; + trillium_tokio::config() + .with_host(&ctx.config.listen_address.ip().to_string()) + .with_port(ctx.config.listen_address.port()) + .without_signals() + .run_async(handler) + .await; - Ok(()) - }) + Ok(()) + }, + ) } }