Skip to content

Commit

Permalink
Move logic from binary targets to library crate (#2116)
Browse files Browse the repository at this point in the history
* Move logic from bin entrypoints into library crate

* Use Config structs in graceful shutdown test

* Move setup_signal_handler into janus_main
  • Loading branch information
divergentdave authored Oct 11, 2023
1 parent d66f887 commit 5683bc7
Show file tree
Hide file tree
Showing 12 changed files with 1,440 additions and 1,388 deletions.
139 changes: 2 additions & 137 deletions aggregator/src/bin/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
@@ -1,144 +1,9 @@
use anyhow::Context;
use clap::Parser;
use janus_aggregator::{
aggregator::aggregation_job_creator::AggregationJobCreator,
binary_utils::{janus_main, setup_signal_handler, BinaryOptions, CommonBinaryOptions},
config::{BinaryConfig, CommonConfig},
binaries::aggregation_job_creator::main_callback, binary_utils::janus_main,
};
use janus_core::time::RealClock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use trillium_tokio::Stopper;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move {
let stopper = Stopper::new();
setup_signal_handler(stopper.clone())
.context("failed to register SIGTERM signal handler")?;

// Start creating aggregation jobs.
let aggregation_job_creator = Arc::new(AggregationJobCreator::new(
ctx.datastore,
ctx.meter,
Duration::from_secs(ctx.config.tasks_update_frequency_secs),
Duration::from_secs(ctx.config.aggregation_job_creation_interval_secs),
ctx.config.min_aggregation_job_size,
ctx.config.max_aggregation_job_size,
));
aggregation_job_creator.run(stopper).await;

Ok(())
})
.await
}

#[derive(Debug, Parser)]
#[clap(
name = "janus-aggregation-job-creator",
about = "Janus aggregation job creator",
rename_all = "kebab-case",
version = env!("CARGO_PKG_VERSION"),
)]
struct Options {
#[clap(flatten)]
common: CommonBinaryOptions,
}

impl BinaryOptions for Options {
fn common_options(&self) -> &CommonBinaryOptions {
&self.common
}
}

/// Non-secret configuration options for the Janus Aggregation Job Creator job.
///
/// # Examples
///
/// ```
/// let yaml_config = r#"
/// ---
/// database:
/// url: "postgres://postgres:postgres@localhost:5432/postgres"
/// logging_config: # logging_config is optional
/// force_json_output: true
/// tasks_update_frequency_secs: 3600
/// aggregation_job_creation_interval_secs: 60
/// min_aggregation_job_size: 100
/// max_aggregation_job_size: 500
/// "#;
///
/// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct Config {
#[serde(flatten)]
common_config: CommonConfig,

/// How frequently we look for new tasks to start creating aggregation jobs for, in seconds.
tasks_update_frequency_secs: u64,
/// How frequently we attempt to create new aggregation jobs for each task, in seconds.
aggregation_job_creation_interval_secs: u64,
/// The minimum number of client reports to include in an aggregation job. Applies to the
/// "current" batch only; historical batches will create aggregation jobs of any size, on the
/// theory that almost all reports will have be received for these batches already.
min_aggregation_job_size: usize,
/// The maximum number of client reports to include in an aggregation job.
max_aggregation_job_size: usize,
}

impl BinaryConfig for Config {
fn common_config(&self) -> &CommonConfig {
&self.common_config
}

fn common_config_mut(&mut self) -> &mut CommonConfig {
&mut self.common_config
}
}

#[cfg(test)]
mod tests {
use super::{Config, Options};
use clap::CommandFactory;
use janus_aggregator::config::{
test_util::{generate_db_config, generate_metrics_config, generate_trace_config},
CommonConfig,
};
use janus_core::test_util::roundtrip_encoding;
use std::net::{Ipv4Addr, SocketAddr};

#[test]
fn verify_app() {
Options::command().debug_assert()
}

#[test]
fn roundtrip_config() {
roundtrip_encoding(Config {
common_config: CommonConfig {
database: generate_db_config(),
logging_config: generate_trace_config(),
metrics_config: generate_metrics_config(),
health_check_listen_address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080)),
},
tasks_update_frequency_secs: 3600,
aggregation_job_creation_interval_secs: 60,
min_aggregation_job_size: 100,
max_aggregation_job_size: 500,
})
}

#[test]
fn documentation_config_examples() {
serde_yaml::from_str::<Config>(include_str!(
"../../../docs/samples/basic_config/aggregation_job_creator.yaml"
))
.unwrap();
serde_yaml::from_str::<Config>(include_str!(
"../../../docs/samples/advanced_config/aggregation_job_creator.yaml"
))
.unwrap();
}
janus_main(RealClock::default(), main_callback).await
}
183 changes: 3 additions & 180 deletions aggregator/src/bin/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
@@ -1,184 +1,7 @@
use anyhow::Context;
use clap::Parser;
use janus_aggregator::{
aggregator::aggregation_job_driver::AggregationJobDriver,
binary_utils::{
janus_main, job_driver::JobDriver, setup_signal_handler, BinaryOptions, CommonBinaryOptions,
},
config::{BinaryConfig, CommonConfig, JobDriverConfig, TaskprovConfig},
};
use janus_core::{time::RealClock, TokioRuntime};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc, time::Duration};
use trillium_tokio::Stopper;
use janus_aggregator::{binaries::aggregation_job_driver::main_callback, binary_utils::janus_main};
use janus_core::time::RealClock;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
const CLIENT_USER_AGENT: &str = concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
"/aggregation_job_driver",
);

janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move {
let datastore = Arc::new(ctx.datastore);
let aggregation_job_driver = Arc::new(AggregationJobDriver::new(
reqwest::Client::builder()
.user_agent(CLIENT_USER_AGENT)
.build()
.context("couldn't create HTTP client")?,
&ctx.meter,
ctx.config.batch_aggregation_shard_count,
));
let lease_duration =
Duration::from_secs(ctx.config.job_driver_config.worker_lease_duration_secs);
let stopper = Stopper::new();
setup_signal_handler(stopper.clone())
.context("failed to register SIGTERM signal handler")?;

// Start running.
let job_driver = Arc::new(JobDriver::new(
ctx.clock,
TokioRuntime,
ctx.meter,
stopper,
Duration::from_secs(ctx.config.job_driver_config.min_job_discovery_delay_secs),
Duration::from_secs(ctx.config.job_driver_config.max_job_discovery_delay_secs),
ctx.config.job_driver_config.max_concurrent_job_workers,
Duration::from_secs(
ctx.config
.job_driver_config
.worker_lease_clock_skew_allowance_secs,
),
aggregation_job_driver
.make_incomplete_job_acquirer_callback(Arc::clone(&datastore), lease_duration),
aggregation_job_driver.make_job_stepper_callback(
Arc::clone(&datastore),
ctx.config.job_driver_config.maximum_attempts_before_failure,
),
)?);
job_driver.run().await;

Ok(())
})
.await
}

#[derive(Debug, Parser)]
#[clap(
name = "janus-aggregation-job-driver",
about = "Janus aggregation job driver",
rename_all = "kebab-case",
version = env!("CARGO_PKG_VERSION"),
)]
struct Options {
#[clap(flatten)]
common: CommonBinaryOptions,
}

impl BinaryOptions for Options {
fn common_options(&self) -> &CommonBinaryOptions {
&self.common
}
}

/// Non-secret configuration options for Janus Aggregation Job Driver jobs.
///
/// # Examples
///
/// ```
/// let yaml_config = r#"
/// ---
/// database:
/// url: "postgres://postgres:postgres@localhost:5432/postgres"
/// logging_config: # logging_config is optional
/// force_json_output: true
/// min_job_discovery_delay_secs: 10
/// max_job_discovery_delay_secs: 60
/// max_concurrent_job_workers: 10
/// worker_lease_duration_secs: 600
/// worker_lease_clock_skew_allowance_secs: 60
/// maximum_attempts_before_failure: 5
/// batch_aggregation_shard_count: 32
/// taskprov_config:
/// enabled: false
/// "#;
///
/// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct Config {
#[serde(flatten)]
common_config: CommonConfig,
#[serde(flatten)]
job_driver_config: JobDriverConfig,
#[serde(default)]
taskprov_config: TaskprovConfig,

/// Defines the number of shards to break each batch aggregation into. Increasing this value
/// will reduce the amount of database contention during leader aggregation, while increasing
/// the cost of collection.
batch_aggregation_shard_count: u64,
}

impl BinaryConfig for Config {
fn common_config(&self) -> &CommonConfig {
&self.common_config
}

fn common_config_mut(&mut self) -> &mut CommonConfig {
&mut self.common_config
}
}

#[cfg(test)]
mod tests {
use super::{Config, Options};
use clap::CommandFactory;
use janus_aggregator::config::{
test_util::{generate_db_config, generate_metrics_config, generate_trace_config},
CommonConfig, JobDriverConfig, TaskprovConfig,
};
use janus_core::test_util::roundtrip_encoding;
use std::net::{Ipv4Addr, SocketAddr};

#[test]
fn verify_app() {
Options::command().debug_assert()
}

#[test]
fn roundtrip_config() {
roundtrip_encoding(Config {
common_config: CommonConfig {
database: generate_db_config(),
logging_config: generate_trace_config(),
metrics_config: generate_metrics_config(),
health_check_listen_address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080)),
},
job_driver_config: JobDriverConfig {
min_job_discovery_delay_secs: 10,
max_job_discovery_delay_secs: 60,
max_concurrent_job_workers: 10,
worker_lease_duration_secs: 600,
worker_lease_clock_skew_allowance_secs: 60,
maximum_attempts_before_failure: 5,
},
batch_aggregation_shard_count: 32,
taskprov_config: TaskprovConfig::default(),
})
}

#[test]
fn documentation_config_examples() {
serde_yaml::from_str::<Config>(include_str!(
"../../../docs/samples/basic_config/aggregation_job_driver.yaml"
))
.unwrap();
serde_yaml::from_str::<Config>(include_str!(
"../../../docs/samples/advanced_config/aggregation_job_driver.yaml"
))
.unwrap();
}
janus_main(RealClock::default(), main_callback).await
}
Loading

0 comments on commit 5683bc7

Please sign in to comment.