Skip to content

Commit

Permalink
Add GC subcommand (#3027)
Browse files Browse the repository at this point in the history
* Enable GC in aggregator graceful shutdown test

* Add garbage collector subcommand and Docker image
  • Loading branch information
divergentdave authored Apr 18, 2024
1 parent 117a807 commit c43819c
Show file tree
Hide file tree
Showing 13 changed files with 390 additions and 48 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ jobs:
targets: janus
load: true
- run: docker run --rm janus_aggregator --help
- run: docker run --rm janus_garbage_collector --help
- run: docker run --rm janus_aggregation_job_creator --help
- run: docker run --rm janus_aggregation_job_driver --help
- run: docker run --rm janus_collection_job_driver --help
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/push-docker-images-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:
password: ${{ steps.gcp-auth-private.outputs.access_token }}

- run: docker push us-west2-docker.pkg.dev/janus-artifacts/janus/janus_aggregator:${{ steps.get_version.outputs.VERSION }}
- run: docker push us-west2-docker.pkg.dev/janus-artifacts/janus/janus_garbage_collector:${{ steps.get_version.outputs.VERSION }}
- run: docker push us-west2-docker.pkg.dev/janus-artifacts/janus/janus_aggregation_job_creator:${{ steps.get_version.outputs.VERSION }}
- run: docker push us-west2-docker.pkg.dev/janus-artifacts/janus/janus_aggregation_job_driver:${{ steps.get_version.outputs.VERSION }}
- run: docker push us-west2-docker.pkg.dev/janus-artifacts/janus/janus_collection_job_driver:${{ steps.get_version.outputs.VERSION }}
Expand All @@ -80,6 +81,7 @@ jobs:
password: ${{ steps.gcp-auth-public.outputs.access_token }}

- run: docker push us-west2-docker.pkg.dev/divviup-artifacts-public/janus/janus_aggregator:${{ steps.get_version.outputs.VERSION }}
- run: docker push us-west2-docker.pkg.dev/divviup-artifacts-public/janus/janus_garbage_collector:${{ steps.get_version.outputs.VERSION }}
- run: docker push us-west2-docker.pkg.dev/divviup-artifacts-public/janus/janus_aggregation_job_creator:${{ steps.get_version.outputs.VERSION }}
- run: docker push us-west2-docker.pkg.dev/divviup-artifacts-public/janus/janus_aggregation_job_driver:${{ steps.get_version.outputs.VERSION }}
- run: docker push us-west2-docker.pkg.dev/divviup-artifacts-public/janus/janus_collection_job_driver:${{ steps.get_version.outputs.VERSION }}
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ ARG GIT_REVISION=unknown
LABEL revision ${GIT_REVISION}
COPY --from=builder /src/target/release/janus_aggregator /janus_aggregator
RUN ln -s /janus_aggregator /aggregator && \
ln -s /janus_aggregator /garbage_collector && \
ln -s /janus_aggregator /aggregation_job_creator && \
ln -s /janus_aggregator /aggregation_job_driver && \
ln -s /janus_aggregator /collection_job_driver && \
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ should not be depended on directly. If you find yourself needing to depend
on it directly while using any other Janus crates, open a bug report.

The following crates are stable on their external configuration, CLI arguments,
and HTTP API. Their Rust API may arbitrarily change and should not be depended
and HTTP API. Their Rust API may arbitrarily change and should not be depended
on. They are not published to crates.io.
- `janus_aggregator`
- `janus_tools`
Expand All @@ -105,9 +105,10 @@ subtle incompatibilities between the two that will cause tests to fail.
### Container image

To build container images, run `docker buildx bake --load`. This will produce
images tagged `janus_aggregator`, `janus_aggregation_job_creator`,
`janus_aggregation_job_driver`, `janus_collection_job_driver`, `janus_cli`,
`janus_db_migrator`, `janus_interop_client`, `janus_interop_aggregator`, and
images tagged `janus_aggregator`, `janus_garbage_collector`,
`janus_aggregation_job_creator`, `janus_aggregation_job_driver`,
`janus_collection_job_driver`, `janus_cli`, `janus_db_migrator`,
`janus_interop_client`, `janus_interop_aggregator`, and
`janus_interop_collector` by default.

Pre-built container images are available at
Expand Down
1 change: 1 addition & 0 deletions aggregator/src/binaries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pub mod aggregation_job_creator;
pub mod aggregation_job_driver;
pub mod aggregator;
pub mod collection_job_driver;
pub mod garbage_collector;
pub mod janus_cli;
25 changes: 6 additions & 19 deletions aggregator/src/binaries/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
aggregator::{self, garbage_collector::GarbageCollector, http_handlers::aggregator_handler},
aggregator::{self, http_handlers::aggregator_handler},
binaries::garbage_collector::run_garbage_collector,
binary_utils::{setup_server, BinaryContext, BinaryOptions, CommonBinaryOptions},
cache::GlobalHpkeKeypairCache,
config::{BinaryConfig, CommonConfig, TaskprovConfig},
Expand All @@ -22,8 +23,8 @@ use std::{
pin::Pin,
};
use std::{iter::Iterator, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{join, sync::watch, time::interval};
use tracing::{error, info};
use tokio::{join, sync::watch};
use tracing::info;
use trillium::Handler;
use trillium_router::router;
use url::Url;
Expand Down Expand Up @@ -77,24 +78,10 @@ async fn run_aggregator(
let datastore = Arc::clone(&datastore);
let gc_config = config.garbage_collection.take();
let meter = meter.clone();
let stopper = stopper.clone();
async move {
if let Some(gc_config) = gc_config {
let gc = GarbageCollector::new(
datastore,
&meter,
gc_config.report_limit,
gc_config.aggregation_limit,
gc_config.collection_limit,
gc_config.tasks_per_tx,
gc_config.concurrent_tx_limit,
);
let mut interval = interval(Duration::from_secs(gc_config.gc_frequency_s));
loop {
interval.tick().await;
if let Err(err) = gc.run().await {
error!(?err, "GC error");
}
}
run_garbage_collector(datastore, gc_config, meter, stopper).await;
}
}
};
Expand Down
172 changes: 172 additions & 0 deletions aggregator/src/binaries/garbage_collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use clap::Parser;
use janus_aggregator_core::datastore::Datastore;
use janus_core::time::RealClock;
use opentelemetry::metrics::Meter;
use serde::{Deserialize, Serialize};
use tokio::time::interval;
use tracing::error;
use trillium_tokio::Stopper;

use crate::{
aggregator::garbage_collector::GarbageCollector,
binary_utils::{BinaryContext, BinaryOptions, CommonBinaryOptions},
config::{BinaryConfig, CommonConfig},
};

use super::aggregator::GarbageCollectorConfig;

pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Result<()> {
let BinaryContext {
config,
datastore,
meter,
stopper,
..
} = ctx;

let datastore = Arc::new(datastore);

run_garbage_collector(datastore, config.garbage_collection, meter, stopper).await;

Ok(())
}

pub(super) async fn run_garbage_collector(
datastore: Arc<Datastore<RealClock>>,
gc_config: GarbageCollectorConfig,
meter: Meter,
stopper: Stopper,
) {
let gc = GarbageCollector::new(
datastore,
&meter,
gc_config.report_limit,
gc_config.aggregation_limit,
gc_config.collection_limit,
gc_config.tasks_per_tx,
gc_config.concurrent_tx_limit,
);
let mut interval = interval(Duration::from_secs(gc_config.gc_frequency_s));
while stopper.stop_future(interval.tick()).await.is_some() {
if let Err(err) = gc.run().await {
error!(?err, "GC error");
}
}
}

#[derive(Debug, Default, Parser)]
#[clap(
name = "garbage-collector",
about = "Janus garbage collector",
rename_all = "kebab-case",
version = env!("CARGO_PKG_VERSION"),
)]
pub struct Options {
#[clap(flatten)]
pub common: CommonBinaryOptions,
}

impl BinaryOptions for Options {
fn common_options(&self) -> &CommonBinaryOptions {
&self.common
}
}

/// Non-secret configuration options for a Janus garbage collector, deserialized from YAML.
///
/// # Examples
///
/// ```
/// # use janus_aggregator::binaries::garbage_collector::Config;
/// let yaml_config = r#"
/// ---
/// database:
/// url: "postgres://postgres:postgres@localhost:5432/postgres"
/// logging_config: # logging_config is optional
/// force_json_output: true
/// garbage_collection:
/// gc_frequency_s: 60
/// report_limit: 5000
/// aggregation_limit: 500
/// collection_limit: 50
/// "#;
///
/// let _decoded: Config = serde_yaml::from_str(yaml_config).unwrap();
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Config {
#[serde(flatten)]
pub common_config: CommonConfig,

pub garbage_collection: GarbageCollectorConfig,
}

impl BinaryConfig for Config {
fn common_config(&self) -> &CommonConfig {
&self.common_config
}

fn common_config_mut(&mut self) -> &mut CommonConfig {
&mut self.common_config
}
}

#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, SocketAddr};

use clap::CommandFactory;
use janus_core::test_util::roundtrip_encoding;

use crate::{
binaries::aggregator::GarbageCollectorConfig,
config::{
default_max_transaction_retries,
test_util::{generate_db_config, generate_metrics_config, generate_trace_config},
CommonConfig,
},
};

use super::{Config, Options};

#[test]
fn verify_app() {
Options::command().debug_assert();
}

#[test]
fn roundtrip_config() {
roundtrip_encoding(Config {
common_config: CommonConfig {
database: generate_db_config(),
logging_config: generate_trace_config(),
metrics_config: generate_metrics_config(),
health_check_listen_address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080)),
max_transaction_retries: default_max_transaction_retries(),
},
garbage_collection: GarbageCollectorConfig {
gc_frequency_s: 60,
report_limit: 5000,
aggregation_limit: 500,
collection_limit: 50,
tasks_per_tx: 1,
concurrent_tx_limit: None,
},
});
}

#[test]
fn documentation_config_examples() {
serde_yaml::from_str::<Config>(include_str!(
"../../../docs/samples/basic_config/garbage_collector.yaml"
))
.unwrap();
serde_yaml::from_str::<Config>(include_str!(
"../../../docs/samples/advanced_config/garbage_collector.yaml"
))
.unwrap();
}
}
37 changes: 14 additions & 23 deletions aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::{Parser, Subcommand};
use janus_aggregator::{
binaries::{
aggregation_job_creator, aggregation_job_driver, aggregator, collection_job_driver,
janus_cli,
garbage_collector, janus_cli,
},
binary_utils::janus_main,
};
Expand All @@ -13,6 +13,8 @@ use janus_core::time::RealClock;
enum Options {
#[clap(name = "aggregator")]
Aggregator(aggregator::Options),
#[clap(name = "garbage_collector")]
GarbageCollector(garbage_collector::Options),
#[clap(name = "aggregation_job_creator")]
AggregationJobCreator(aggregation_job_creator::Options),
#[clap(name = "aggregation_job_driver")]
Expand All @@ -34,6 +36,8 @@ enum Options {
enum Nested {
#[clap(name = "aggregator")]
Aggregator(aggregator::Options),
#[clap(name = "garbage_collector")]
GarbageCollector(garbage_collector::Options),
#[clap(name = "aggregation_job_creator")]
AggregationJobCreator(aggregation_job_creator::Options),
#[clap(name = "aggregation_job_driver")]
Expand All @@ -46,45 +50,32 @@ enum Nested {

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let clock = RealClock::default();
match Options::parse() {
Options::Aggregator(options) | Options::Default(Nested::Aggregator(options)) => {
janus_main(
options,
RealClock::default(),
true,
aggregator::main_callback,
)
.await
janus_main(options, clock, true, aggregator::main_callback).await
}
Options::GarbageCollector(options)
| Options::Default(Nested::GarbageCollector(options)) => {
janus_main(options, clock, false, garbage_collector::main_callback).await
}
Options::AggregationJobCreator(options)
| Options::Default(Nested::AggregationJobCreator(options)) => {
janus_main(
options,
RealClock::default(),
clock,
false,
aggregation_job_creator::main_callback,
)
.await
}
Options::AggregationJobDriver(options)
| Options::Default(Nested::AggregationJobDriver(options)) => {
janus_main(
options,
RealClock::default(),
true,
aggregation_job_driver::main_callback,
)
.await
janus_main(options, clock, true, aggregation_job_driver::main_callback).await
}
Options::CollectionJobDriver(options)
| Options::Default(Nested::CollectionJobDriver(options)) => {
janus_main(
options,
RealClock::default(),
false,
collection_job_driver::main_callback,
)
.await
janus_main(options, clock, false, collection_job_driver::main_callback).await
}
Options::JanusCli(options) | Options::Default(Nested::JanusCli(options)) => {
janus_cli::run(options).await
Expand Down
Loading

0 comments on commit c43819c

Please sign in to comment.