Skip to content

Commit

Permalink
implement logging of completed jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-karpenko committed May 9, 2024
1 parent 7053f2e commit 3b9159e
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 6 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ futures = "0.3.30"
git2 = "0.18.3"
humantime = "2.1.0"
k8s-openapi = { version = "0.22.0", features = ["v1_27"] }
kube = { version = "0.91.0", features = ["runtime", "client", "derive"] }
kube = { version = "0.91.0", features = [
"runtime",
"client",
"derive",
"unstable-runtime",
] }
kubert = { version = "0.21.2", features = ["lease"], default-features = false }
kubert-k8s-openapi = { package = "k8s-openapi", version = "0.20.0", features = [
"v1_27",
Expand Down
2 changes: 1 addition & 1 deletion charts/git-events-runner/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ version: 0.1.0
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "v0.0.13"
appVersion: "v0.0.14"
8 changes: 6 additions & 2 deletions src/bin/git-events-runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use git_events_runner::{
cli::{Cli, CliConfig},
config::RuntimeConfig,
controller::{run_leader_controllers, State},
leader_lock,
jobs, leader_lock,
resources::{
action::{Action, ClusterAction},
git_repo::{ClusterGitRepo, GitRepo},
Expand Down Expand Up @@ -34,16 +34,20 @@ async fn main() -> anyhow::Result<()> {

async fn run(cli_config: CliConfig) -> anyhow::Result<()> {
let client = Client::try_default().await?;
let identity = Uuid::new_v4().to_string();

tokio::spawn(RuntimeConfig::init_and_watch(
client.clone(),
cli_config.config_map_name.clone(),
));
tokio::spawn(CacheStore::watch(client.clone()));
tokio::spawn(jobs::watch(client.clone(), identity.clone()));

let secrets_cache = ExpiringSecretCache::new(
Duration::from_secs(cli_config.secrets_cache_time),
client.clone(),
);
let identity = Uuid::new_v4().to_string();

let state = State::new(
Arc::new(cli_config.clone()),
secrets_cache.clone(),
Expand Down
3 changes: 2 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::{future::ready, StreamExt};
use k8s_openapi::{api::core::v1::ConfigMap, Metadata};
use kube::{
runtime::{watcher, WatchStreamExt},
runtime::{predicates, watcher, WatchStreamExt},
Api, Client,
};
use serde::Deserialize;
Expand Down Expand Up @@ -68,6 +68,7 @@ impl RuntimeConfig {
let cm_stream = watcher(cm_api, watcher_config);
cm_stream
.applied_objects()
.predicate_filter(predicates::generation)
.for_each(|cm| {
if let Ok(cm) = cm {
let cm_key = format!(
Expand Down
34 changes: 34 additions & 0 deletions src/jobs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use crate::resources::action::ACTION_JOB_IDENTITY_LABEL;
use futures::{future::ready, StreamExt};
use k8s_openapi::{api::batch::v1::Job, Metadata};
use kube::{
runtime::{predicates, watcher, WatchStreamExt},
Api, Client,
};
use tracing::{debug, info, warn};

pub async fn watch(client: Client, identity: String) {
let jobs_api: Api<Job> = Api::all(client.clone());
let watcher_config = watcher::Config::default()
.labels(format!("{ACTION_JOB_IDENTITY_LABEL}={identity}").as_str());
let jobs_stream = watcher(jobs_api, watcher_config);
jobs_stream
.applied_objects()
.predicate_filter(predicates::generation)
.for_each(|job| {
if let Ok(job) = job {
let name = job.metadata().name.clone().unwrap();
let ns = job.metadata().namespace.clone().unwrap();
let status = job.status.unwrap();

debug!("{status:?}");
if status.succeeded.is_some() {
info!("Job {ns}/{name} completed successfully.");
} else if status.failed.is_some() {
warn!("Job {ns}/{name} failed.");
}
}
ready(())
})
.await
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod cache;
pub mod cli;
pub mod config;
pub mod controller;
pub mod jobs;
pub mod leader_lock;
pub mod resources;
pub mod secrets_cache;
Expand Down
2 changes: 1 addition & 1 deletion src/resources/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl ActionInternals for ClusterAction {

impl ActionInternals for Action {
fn kind(&self) -> String {
String::from("ClusterAction")
String::from("Action")
}

fn action_job_spec(&self) -> ActionJob {
Expand Down

0 comments on commit 3b9159e

Please sign in to comment.