Skip to content

Commit

Permalink
interop: Wait for subprocesses to be ready before reporting ready (#2954
Browse files Browse the repository at this point in the history
)

* interop: Wait for subprocesses to be ready before reporting ready

* Review feedback
  • Loading branch information
inahga authored Apr 2, 2024
1 parent 1f68ee9 commit 6eb247e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
3 changes: 1 addition & 2 deletions interop_binaries/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ version.workspace = true
fpvec_bounded_l2 = ["dep:fixed", "janus_core/fpvec_bounded_l2", "janus_aggregator/fpvec_bounded_l2", "prio/experimental"]
test-util = [
"dep:hex",
"dep:futures",
"dep:regex",
"dep:zstd",
]
Expand All @@ -25,7 +24,7 @@ base64.workspace = true
clap.workspace = true
derivative.workspace = true
fixed = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
futures = { workspace = true }
hex = { workspace = true, optional = true }
janus_aggregator.workspace = true
janus_aggregator_core = { workspace = true, features = ["test-util"] }
Expand Down
5 changes: 5 additions & 0 deletions interop_binaries/config/janus_interop_aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ database:
url: postgres://[email protected]:5432/postgres
check_schema_version: false
health_check_listen_address: 0.0.0.0:8000
health_check_peers:
- http://127.0.0.1:8001/healthz
- http://127.0.0.1:8002/healthz
- http://127.0.0.1:8003/healthz
- http://127.0.0.1:8004/healthz
logging_config:
force_json_output: true
listen_address: 0.0.0.0:8080
Expand Down
34 changes: 31 additions & 3 deletions interop_binaries/src/commands/janus_interop_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
use anyhow::Context;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use clap::Parser;
use futures::future::try_join_all;
use janus_aggregator::{
binary_utils::{janus_main, BinaryOptions, CommonBinaryOptions},
config::{BinaryConfig, CommonConfig},
Expand All @@ -21,13 +22,15 @@ use janus_core::{
use janus_messages::{Duration, HpkeConfig, Time};
use prio::codec::Decode;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::Mutex;
use trillium::{Conn, Handler};
use trillium_api::{api, Json};
use trillium::{Conn, Handler, Status};
use trillium_api::{api, ApiConnExt, Json};
use trillium_proxy::{upstream::IntoUpstreamSelector, Client, Proxy};
use trillium_router::Router;
use trillium_tokio::ClientConfig;
use url::Url;

#[derive(Debug, Serialize)]
struct EndpointResponse {
Expand Down Expand Up @@ -130,6 +133,7 @@ async fn make_handler(
datastore: Arc<Datastore<RealClock>>,
dap_serving_prefix: String,
aggregator_address: SocketAddr,
health_check_peers: Vec<Url>,
) -> anyhow::Result<impl Handler> {
let keyring = Keyring::new();

Expand All @@ -138,9 +142,28 @@ async fn make_handler(
Client::new(ClientConfig::default()).with_default_pool(),
upstream,
);
let health_check_client = Client::new(ClientConfig::default()).with_default_pool();

let handler = Router::new()
.post("internal/test/ready", Json(serde_json::Map::new()))
.post("internal/test/ready", move |conn: Conn| {
let health_check_peers = health_check_peers.clone();
let health_check_client = health_check_client.clone();
async move {
let result: Result<_, anyhow::Error> =
try_join_all(health_check_peers.iter().map(|peer| {
let client = health_check_client.clone();
async move {
let _ = client.get(peer.as_str()).await?.success()?;
Ok(())
}
}))
.await;
match result {
Ok(_) => conn.with_json(&json!({})),
Err(_) => conn.with_status(Status::ServiceUnavailable),
}
}
})
.post(
"internal/test/endpoint_for_task",
Json(EndpointResponse {
Expand Down Expand Up @@ -213,6 +236,10 @@ struct Config {
/// Address on which the aggregator's HTTP server is listening. DAP requests will be proxied to
/// this.
aggregator_address: SocketAddr,

/// List of URLs to health check endpoints of aggregator subprocesses. The interop aggregator
/// will check these endpoints for readiness before considering itself ready.
health_check_peers: Vec<Url>,
}

impl BinaryConfig for Config {
Expand All @@ -236,6 +263,7 @@ impl Options {
Arc::clone(&datastore),
ctx.config.dap_serving_prefix,
ctx.config.aggregator_address,
ctx.config.health_check_peers,
)
.await?;
trillium_tokio::config()
Expand Down

0 comments on commit 6eb247e

Please sign in to comment.