Skip to content

Commit

Permalink
fix: surfaced output/errors from completed triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Sep 8, 2023
1 parent 2ce019f commit 1c93902
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 27 deletions.
1 change: 1 addition & 0 deletions crates/wick/wick-host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ futures = { workspace = true }
derive_builder = { workspace = true }
option-utils = { workspace = true }
async-trait = { workspace = true }
structured-output = { workspace = true }


[dev-dependencies]
Expand Down
9 changes: 6 additions & 3 deletions crates/wick/wick-host/src/app_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use futures::future::{join_all, select};
use futures::pin_mut;
use structured_output::StructuredOutput;
use tokio::task::{JoinError, JoinHandle};
use tracing::Span;
use wick_config::config::AppConfiguration;
Expand Down Expand Up @@ -129,7 +130,7 @@ impl AppHost {
}

#[allow(clippy::unused_async)]
pub async fn wait_for_done(&mut self) -> Result<()> {
pub async fn wait_for_done(&mut self) -> Result<Vec<StructuredOutput>> {
let state = self.triggers.take().unwrap();
let (triggers, start_tasks): (Vec<_>, Vec<_>) = state
.triggers
Expand All @@ -138,6 +139,7 @@ impl AppHost {
.unzip();
join_all(start_tasks).await;
self.span.in_scope(|| debug!("all triggers started"));
let mut all_output = Vec::new();
for trigger in &triggers {
let ctrl_c = async {
let _ = tokio::signal::ctrl_c().await;
Expand All @@ -148,14 +150,15 @@ impl AppHost {
self.span.in_scope(|| debug!("ctrl-c received, stopping triggers"));
break;
}
futures::future::Either::Right(_) => {
futures::future::Either::Right((output, _)) => {
self.span.in_scope(|| debug!("trigger finished"));
all_output.push(output);
}
}
}
self.span.in_scope(|| debug!("all triggers finished"));

Ok(())
Ok(all_output)
}
}

Expand Down
35 changes: 21 additions & 14 deletions crates/wick/wick-runtime/src/triggers/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::Runtime;

#[derive(Debug)]
pub(crate) struct Cli {
done_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
done_rx: Mutex<Option<tokio::sync::oneshot::Receiver<()>>>,
done_tx: Mutex<Option<tokio::sync::oneshot::Sender<StructuredOutput>>>,
done_rx: Mutex<Option<tokio::sync::oneshot::Receiver<StructuredOutput>>>,
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
Expand All @@ -43,12 +43,7 @@ impl Cli {
})
}

async fn handle(
&self,
runtime: Runtime,
operation: Entity,
args: Vec<String>,
) -> Result<StructuredOutput, RuntimeError> {
async fn handle(&self, runtime: Runtime, operation: Entity, args: Vec<String>) -> Result<(), RuntimeError> {
let is_interactive = Interactive {
stdin: atty::is(atty::Stream::Stdin),
stdout: atty::is(atty::Stream::Stdout),
Expand All @@ -72,12 +67,24 @@ impl Cli {
Ok(p) => {
if p.port() == "code" && p.has_data() {
let code: u32 = p.decode().unwrap();
break StructuredOutput::new(format!("Exit code: {}", code), json!({ "code": code }));
let message = if code > 0 {
format!("Exit code: {}", code)
} else {
String::new()
};
break StructuredOutput::new(message, json!({ "code": code }));
}
if p.is_error() {
let err = p.unwrap_err();
break StructuredOutput::new(
format!("CLI Trigger produced error, {}", err.msg()),
json!({ "error": err.to_string() }),
);
}
}
Err(e) => {
break StructuredOutput::new(
format!("CLI Trigger produced error: {}", e),
format!("CLI Trigger produced error, {}", e),
json!({ "error": e.to_string() }),
);
}
Expand All @@ -90,9 +97,9 @@ impl Cli {
}
};

let _ = self.done_tx.lock().take().unwrap().send(());
let _ = self.done_tx.lock().take().unwrap().send(output);

Ok(output)
Ok(())
}
}

Expand Down Expand Up @@ -134,9 +141,9 @@ impl Trigger for Cli {
Ok(())
}

async fn wait_for_done(&self) {
async fn wait_for_done(&self) -> StructuredOutput {
let rx = self.done_rx.lock().take().unwrap();
let _ = rx.await;
rx.await.unwrap_or_default()
}
}

Expand Down
18 changes: 16 additions & 2 deletions crates/wick/wick-runtime/src/triggers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,30 @@ impl Trigger for Http {
Ok(())
}

async fn wait_for_done(&self) {
async fn wait_for_done(&self) -> StructuredOutput {
let rx = if let Some(instance) = self.instance.lock().as_mut() {
instance.running_rx.take()
} else {
None
};
if let Some(rx) = rx {
let _ = rx.await;
match rx.await {
Ok(_) => {
info!("http trigger finished");
StructuredOutput::new("http trigger finished", json!({"status": "http trigger finished"}))
}
Err(e) => {
error!(err=%e,"http trigger failed");
let message = format!("http trigger failed: {}", e);
StructuredOutput::new(format!("http trigger failed: {}", e), json!({"status": message}))
}
}
} else {
error!("http trigger not running");
StructuredOutput::new(
"http trigger not running",
json!({"status": "http trigger not running"}),
)
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions crates/wick/wick-runtime/src/triggers/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,21 @@ impl Trigger for Time {
Ok(())
}

async fn wait_for_done(&self) {
async fn wait_for_done(&self) -> StructuredOutput {
let Some(handler) = self.handler.lock().take() else {
return;
return StructuredOutput::new("scheduled job never ran", json!({"status": "schedule job never ran"}));
};

match handler.await {
Ok(_) => {
info!("cron done");
info!("time trigger done");
StructuredOutput::new("scheduled job complete", json!({"status": "schedule job complete"}))
}
Err(e) => {
error!("cron error: {}", e);
error!(err=%e,"time trigger error");
let message = format!("time trigger error: {}", e);
let json = json!({"error": message});
StructuredOutput::new(message, json)
}
}
}
Expand Down
22 changes: 18 additions & 4 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,28 @@ pub(crate) async fn handle(
.span(span.clone())
.build()?;

if !opts.dryrun {
let output = if !opts.dryrun {
host.start()?;
span.in_scope(|| debug!("waiting on triggers to finish..."));

host.wait_for_done().instrument(span.clone()).await?;
let output = host.wait_for_done().instrument(span.clone()).await?;
let mut lines = String::new();
let mut json = Vec::new();
for output in output {
if !output.lines.trim().is_empty() {
lines.push_str(&output.lines);
lines.push('\n');
}
json.push(output.json);
}
StructuredOutput::new(lines, json!({"output":json}))
} else {
info!("application valid but not started because --dryrun set");
}
StructuredOutput::new(
"application valid but not started because --dryrun set",
json!({"status":"valid"}),
)
};

Ok(StructuredOutput::new("", json!({})))
Ok(output)
}

0 comments on commit 1c93902

Please sign in to comment.