From 1c93902b7ee9693eca9479cf07f9f5c3e8f620e9 Mon Sep 17 00:00:00 2001 From: Jarrod Overson Date: Fri, 8 Sep 2023 12:37:03 -0400 Subject: [PATCH] fix: surfaced output/errors from completed triggers --- crates/wick/wick-host/Cargo.toml | 1 + crates/wick/wick-host/src/app_host.rs | 9 +++-- crates/wick/wick-runtime/src/triggers/cli.rs | 35 +++++++++++-------- crates/wick/wick-runtime/src/triggers/http.rs | 18 ++++++++-- crates/wick/wick-runtime/src/triggers/time.rs | 12 ++++--- src/commands/run.rs | 22 +++++++++--- 6 files changed, 70 insertions(+), 27 deletions(-) diff --git a/crates/wick/wick-host/Cargo.toml b/crates/wick/wick-host/Cargo.toml index ed0f9ccc..56054af6 100644 --- a/crates/wick/wick-host/Cargo.toml +++ b/crates/wick/wick-host/Cargo.toml @@ -33,6 +33,7 @@ futures = { workspace = true } derive_builder = { workspace = true } option-utils = { workspace = true } async-trait = { workspace = true } +structured-output = { workspace = true } [dev-dependencies] diff --git a/crates/wick/wick-host/src/app_host.rs b/crates/wick/wick-host/src/app_host.rs index 9e458c9a..1b750f96 100644 --- a/crates/wick/wick-host/src/app_host.rs +++ b/crates/wick/wick-host/src/app_host.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use futures::future::{join_all, select}; use futures::pin_mut; +use structured_output::StructuredOutput; use tokio::task::{JoinError, JoinHandle}; use tracing::Span; use wick_config::config::AppConfiguration; @@ -129,7 +130,7 @@ impl AppHost { } #[allow(clippy::unused_async)] - pub async fn wait_for_done(&mut self) -> Result<()> { + pub async fn wait_for_done(&mut self) -> Result> { let state = self.triggers.take().unwrap(); let (triggers, start_tasks): (Vec<_>, Vec<_>) = state .triggers @@ -138,6 +139,7 @@ impl AppHost { .unzip(); join_all(start_tasks).await; self.span.in_scope(|| debug!("all triggers started")); + let mut all_output = Vec::new(); for trigger in &triggers { let ctrl_c = async { let _ = tokio::signal::ctrl_c().await; @@ -148,14 +150,15 @@ impl AppHost { self.span.in_scope(|| debug!("ctrl-c received, stopping triggers")); break; } - futures::future::Either::Right(_) => { + futures::future::Either::Right((output, _)) => { self.span.in_scope(|| debug!("trigger finished")); + all_output.push(output); } } } self.span.in_scope(|| debug!("all triggers finished")); - Ok(()) + Ok(all_output) } } diff --git a/crates/wick/wick-runtime/src/triggers/cli.rs b/crates/wick/wick-runtime/src/triggers/cli.rs index b9851c2f..b5c84a67 100644 --- a/crates/wick/wick-runtime/src/triggers/cli.rs +++ b/crates/wick/wick-runtime/src/triggers/cli.rs @@ -19,8 +19,8 @@ use crate::Runtime; #[derive(Debug)] pub(crate) struct Cli { - done_tx: Mutex>>, - done_rx: Mutex>>, + done_tx: Mutex>>, + done_rx: Mutex>>, } #[derive(Debug, PartialEq, Serialize, Deserialize, Default)] @@ -43,12 +43,7 @@ impl Cli { }) } - async fn handle( - &self, - runtime: Runtime, - operation: Entity, - args: Vec, - ) -> Result { + async fn handle(&self, runtime: Runtime, operation: Entity, args: Vec) -> Result<(), RuntimeError> { let is_interactive = Interactive { stdin: atty::is(atty::Stream::Stdin), stdout: atty::is(atty::Stream::Stdout), @@ -72,12 +67,24 @@ impl Cli { Ok(p) => { if p.port() == "code" && p.has_data() { let code: u32 = p.decode().unwrap(); - break StructuredOutput::new(format!("Exit code: {}", code), json!({ "code": code })); + let message = if code > 0 { + format!("Exit code: {}", code) + } else { + String::new() + }; + break StructuredOutput::new(message, json!({ "code": code })); + } + if p.is_error() { + let err = p.unwrap_err(); + break StructuredOutput::new( + format!("CLI Trigger produced error, {}", err.msg()), + json!({ "error": err.to_string() }), + ); } } Err(e) => { break StructuredOutput::new( - format!("CLI Trigger produced error: {}", e), + format!("CLI Trigger produced error, {}", e), json!({ "error": e.to_string() }), ); } @@ -90,9 +97,9 @@ impl Cli { } }; - let _ = self.done_tx.lock().take().unwrap().send(()); + let _ = self.done_tx.lock().take().unwrap().send(output); - Ok(output) + Ok(()) } } @@ -134,9 +141,9 @@ impl Trigger for Cli { Ok(()) } - async fn wait_for_done(&self) { + async fn wait_for_done(&self) -> StructuredOutput { let rx = self.done_rx.lock().take().unwrap(); - let _ = rx.await; + rx.await.unwrap_or_default() } } diff --git a/crates/wick/wick-runtime/src/triggers/http.rs b/crates/wick/wick-runtime/src/triggers/http.rs index 71724e27..45f62dee 100644 --- a/crates/wick/wick-runtime/src/triggers/http.rs +++ b/crates/wick/wick-runtime/src/triggers/http.rs @@ -193,16 +193,30 @@ impl Trigger for Http { Ok(()) } - async fn wait_for_done(&self) { + async fn wait_for_done(&self) -> StructuredOutput { let rx = if let Some(instance) = self.instance.lock().as_mut() { instance.running_rx.take() } else { None }; if let Some(rx) = rx { - let _ = rx.await; + match rx.await { + Ok(_) => { + info!("http trigger finished"); + StructuredOutput::new("http trigger finished", json!({"status": "http trigger finished"})) + } + Err(e) => { + error!(err=%e,"http trigger failed"); + let message = format!("http trigger failed: {}", e); + StructuredOutput::new(format!("http trigger failed: {}", e), json!({"status": message})) + } + } } else { error!("http trigger not running"); + StructuredOutput::new( + "http trigger not running", + json!({"status": "http trigger not running"}), + ) } } } diff --git a/crates/wick/wick-runtime/src/triggers/time.rs b/crates/wick/wick-runtime/src/triggers/time.rs index 04568a07..0576888b 100644 --- a/crates/wick/wick-runtime/src/triggers/time.rs +++ b/crates/wick/wick-runtime/src/triggers/time.rs @@ -171,17 +171,21 @@ impl Trigger for Time { Ok(()) } - async fn wait_for_done(&self) { + async fn wait_for_done(&self) -> StructuredOutput { let Some(handler) = self.handler.lock().take() else { - return; + return StructuredOutput::new("scheduled job never ran", json!({"status": "schedule job never ran"})); }; match handler.await { Ok(_) => { - info!("cron done"); + info!("time trigger done"); + StructuredOutput::new("scheduled job complete", json!({"status": "schedule job complete"})) } Err(e) => { - error!("cron error: {}", e); + error!(err=%e,"time trigger error"); + let message = format!("time trigger error: {}", e); + let json = json!({"error": message}); + StructuredOutput::new(message, json) } } } diff --git a/src/commands/run.rs b/src/commands/run.rs index be4420f8..1836de9b 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -70,14 +70,28 @@ pub(crate) async fn handle( .span(span.clone()) .build()?; - if !opts.dryrun { + let output = if !opts.dryrun { host.start()?; span.in_scope(|| debug!("waiting on triggers to finish...")); - host.wait_for_done().instrument(span.clone()).await?; + let output = host.wait_for_done().instrument(span.clone()).await?; + let mut lines = String::new(); + let mut json = Vec::new(); + for output in output { + if !output.lines.trim().is_empty() { + lines.push_str(&output.lines); + lines.push('\n'); + } + json.push(output.json); + } + StructuredOutput::new(lines, json!({"output":json})) } else { info!("application valid but not started because --dryrun set"); - } + StructuredOutput::new( + "application valid but not started because --dryrun set", + json!({"status":"valid"}), + ) + }; - Ok(StructuredOutput::new("", json!({}))) + Ok(output) }