Skip to content

Commit

Permalink
Consistently announce service name in startup logs (#3339)
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga authored Jul 29, 2024
1 parent 54b264a commit e249402
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 37 deletions.
2 changes: 2 additions & 0 deletions aggregator/src/binaries/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use janus_core::time::RealClock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tracing::info;

pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Result<()> {
// Start creating aggregation jobs.
Expand All @@ -22,6 +23,7 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
ctx.config.max_aggregation_job_size,
ctx.config.aggregation_job_creation_report_window,
));
info!("Running aggregation job creator");
aggregation_job_creator.run(ctx.stopper).await;

Ok(())
Expand Down
3 changes: 3 additions & 0 deletions aggregator/src/binaries/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use clap::Parser;
use janus_core::{time::RealClock, TokioRuntime};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc, time::Duration};
use tracing::info;

pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Result<()> {
const CLIENT_USER_AGENT: &str = concat!(
Expand Down Expand Up @@ -58,6 +59,8 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
ctx.config.job_driver_config.maximum_attempts_before_failure,
),
)?);

info!("Running aggregation job driver");
job_driver.run().await;

Ok(())
Expand Down
3 changes: 3 additions & 0 deletions aggregator/src/binaries/collection_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use clap::Parser;
use janus_core::{time::RealClock, TokioRuntime};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc, time::Duration};
use tracing::info;

pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Result<()> {
const CLIENT_USER_AGENT: &str = concat!(
Expand Down Expand Up @@ -63,6 +64,8 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
ctx.config.job_driver_config.maximum_attempts_before_failure,
),
)?);

info!("Running collection job driver");
job_driver.run().await;

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion aggregator/src/binaries/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use janus_core::time::RealClock;
use opentelemetry::metrics::Meter;
use serde::{Deserialize, Serialize};
use tokio::time::interval;
use tracing::error;
use tracing::{error, info};
use trillium_tokio::Stopper;

use crate::{
Expand Down Expand Up @@ -49,6 +49,7 @@ pub(super) async fn run_garbage_collector(
gc_config.tasks_per_tx,
gc_config.concurrent_tx_limit,
);
info!("Running garbage collector");
let mut interval = interval(Duration::from_secs(gc_config.gc_frequency_s));
while stopper.stop_future(interval.tick()).await.is_some() {
if let Err(err) = gc.run().await {
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/janus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub fn run(command_line_options: CommandLineOptions) -> Result<()> {
version = env!("CARGO_PKG_VERSION"),
git_revision = git_revision(),
rust_version = env!("RUSTC_SEMVER"),
"Starting up"
"Starting janus_cli"
);

if command_line_options.dry_run {
Expand Down
4 changes: 3 additions & 1 deletion aggregator/src/binary_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ pub struct BinaryContext<C: Clock, Options: BinaryOptions, Config: BinaryConfig>
}

pub fn janus_main<C, Options, Config, F, Fut>(
service_name: &str,
options: Options,
clock: C,
uses_rayon: bool,
Expand Down Expand Up @@ -309,7 +310,8 @@ where
version = env!("CARGO_PKG_VERSION"),
git_revision = git_revision(),
rust_version = env!("RUSTC_SEMVER"),
"Starting up"
"Starting {}",
service_name,
);

// Connect to database.
Expand Down
51 changes: 36 additions & 15 deletions aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,33 +55,54 @@ enum Nested {
fn main() -> anyhow::Result<()> {
let clock = RealClock::default();
match Options::parse() {
Options::Aggregator(options) | Options::Default(Nested::Aggregator(options)) => {
janus_main(options, clock, true, aggregator::main_callback)
}
Options::Aggregator(options) | Options::Default(Nested::Aggregator(options)) => janus_main(
"aggregator",
options,
clock,
true,
aggregator::main_callback,
),
Options::GarbageCollector(options)
| Options::Default(Nested::GarbageCollector(options)) => {
janus_main(options, clock, false, garbage_collector::main_callback)
}
| Options::Default(Nested::GarbageCollector(options)) => janus_main(
"garbage_collector",
options,
clock,
false,
garbage_collector::main_callback,
),
Options::AggregationJobCreator(options)
| Options::Default(Nested::AggregationJobCreator(options)) => janus_main(
"aggregation_job_creator",
options,
clock,
false,
aggregation_job_creator::main_callback,
),
Options::AggregationJobDriver(options)
| Options::Default(Nested::AggregationJobDriver(options)) => {
janus_main(options, clock, true, aggregation_job_driver::main_callback)
}
| Options::Default(Nested::AggregationJobDriver(options)) => janus_main(
"aggregation_job_driver",
options,
clock,
true,
aggregation_job_driver::main_callback,
),
Options::CollectionJobDriver(options)
| Options::Default(Nested::CollectionJobDriver(options)) => {
janus_main(options, clock, false, collection_job_driver::main_callback)
}
| Options::Default(Nested::CollectionJobDriver(options)) => janus_main(
"collection_job_driver",
options,
clock,
false,
collection_job_driver::main_callback,
),
Options::JanusCli(options) | Options::Default(Nested::JanusCli(options)) => {
janus_cli::run(options)
}
Options::KeyRotator(options) | Options::Default(Nested::KeyRotator(options)) => {
janus_main(options, clock, false, key_rotator::main_callback)
}
Options::KeyRotator(options) | Options::Default(Nested::KeyRotator(options)) => janus_main(
"key_rotator",
options,
clock,
false,
key_rotator::main_callback,
),
}
}
44 changes: 25 additions & 19 deletions interop_binaries/src/commands/janus_interop_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,27 +254,33 @@ impl BinaryConfig for Config {

impl Options {
pub fn run(self) -> anyhow::Result<()> {
janus_main::<_, _, Config, _, _>(self, RealClock::default(), true, |ctx| async move {
let datastore = Arc::new(ctx.datastore);
janus_main::<_, _, Config, _, _>(
"janus_interop_aggregator",
self,
RealClock::default(),
true,
|ctx| async move {
let datastore = Arc::new(ctx.datastore);

// Run an HTTP server with both the DAP aggregator endpoints and the interoperation test
// endpoints.
let handler = make_handler(
Arc::clone(&datastore),
ctx.config.dap_serving_prefix,
ctx.config.aggregator_address,
ctx.config.health_check_peers,
)
.await?;
trillium_tokio::config()
.with_host(&ctx.config.listen_address.ip().to_string())
.with_port(ctx.config.listen_address.port())
.without_signals()
.run_async(handler)
.await;
// Run an HTTP server with both the DAP aggregator endpoints and the interoperation test
// endpoints.
let handler = make_handler(
Arc::clone(&datastore),
ctx.config.dap_serving_prefix,
ctx.config.aggregator_address,
ctx.config.health_check_peers,
)
.await?;
trillium_tokio::config()
.with_host(&ctx.config.listen_address.ip().to_string())
.with_port(ctx.config.listen_address.port())
.without_signals()
.run_async(handler)
.await;

Ok(())
})
Ok(())
},
)
}
}

Expand Down

0 comments on commit e249402

Please sign in to comment.