Skip to content

Commit

Permalink
Better error reporting and cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
jaywonchung committed Nov 29, 2023
1 parent 3584f00 commit 1ce274a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "pegasus-ssh"
description = "Pegasus: A Multi-Node SSH Command Runner"
authors = ["Jae-Won Chung <[email protected]>"]
version = "1.2.0"
version = "1.2.1"
edition = "2021"
repository = "https://github.com/jaywonchung/pegasus"
license = "MIT"
Expand Down
12 changes: 11 additions & 1 deletion src/job.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::str::FromStr;
use std::sync::Arc;

use handlebars::Handlebars;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio::time;
use void::Void;

Expand Down Expand Up @@ -68,13 +70,15 @@ impl FromStr for JobSpecInner {
pub struct JobQueue {
fetched: Vec<Cmd>,
queue_file: String,
cancelled: Arc<Mutex<bool>>,
}

impl JobQueue {
pub fn new(queue_file: &str) -> Self {
pub fn new(queue_file: &str, cancelled: Arc<Mutex<bool>>) -> Self {
Self {
fetched: Vec::new(),
queue_file: queue_file.to_owned(),
cancelled,
}
}

Expand All @@ -90,6 +94,12 @@ impl JobQueue {
async fn fetch(&mut self) {
loop {
let queue_file = LockedFile::acquire(&self.queue_file).await;
// This handles the case where the user killed Pegasus while having
// the queue file open in lock mode.
if *self.cancelled.lock().await {
eprintln!("[pegasus] Ctrl-c detected. Not fetching another job.");
return;
}
let file = queue_file.read_handle();
let job_specs: Result<Vec<JobSpec>, _> = serde_yaml::from_reader(file);
if let Ok(mut job_specs) = job_specs {
Expand Down
33 changes: 30 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async fn run_broadcast(cli: &Config) -> Result<(), PegasusError> {
// them to SSH sessions. Wait 0.5s so that we don't touch the queue file
// when some sesions fail to connect.
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let mut job_queue = JobQueue::new(&cli.queue_file);
let mut job_queue = JobQueue::new(&cli.queue_file, Arc::clone(&cancelled));
loop {
// Check cancel.
if *cancelled.lock().await {
Expand Down Expand Up @@ -130,6 +130,13 @@ async fn run_broadcast(cli: &Config) -> Result<(), PegasusError> {
// Wait for all session tasks to finish cleanup.
join_all(tasks).await;

// TODO: Better reporting of which command failed on which host.
if errored.load(Ordering::SeqCst) {
eprintln!("[Pegasus] Some commands failed.");
} else {
eprintln!("[Pegasus] All commands finished successfully.");
}

Ok(())
}

Expand All @@ -149,6 +156,12 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> {
*cancelled_handler.lock().await = true;
});

// An atomic variable set whenever a session errors. Later read by
// the scheduling loop to determine whether or not to exit.
// TODO: Make this a Vec of hostnames so that we can report which hosts
// failed specifically.
let errored = Arc::new(AtomicBool::new(false));

// MPMC channel (used as MPSC) for requesting the scheduling loop a new command.
let (notify_tx, notify_rx) = flume::bounded(hosts.len());

Expand All @@ -161,6 +174,7 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> {
command_txs.push(command_tx);
let notify_tx = notify_tx.clone();
let print_period = cli.print_period;
let errored = Arc::clone(&errored);
// Open a new SSH session with the host.
let session = host.connect(color).await?;
tasks.push(tokio::spawn(async move {
Expand All @@ -179,7 +193,10 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> {
match command_rx.recv_async().await {
Ok(cmd) => {
let cmd = cmd.fill_template(&mut registry, &host);
let _ = session.run(cmd, print_period).await;
let result = session.run(cmd, print_period).await;
if result.is_err() || result.unwrap().code() != Some(0) {
errored.store(true, Ordering::Relaxed);
}
}
Err(_) => break,
};
Expand All @@ -189,7 +206,7 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> {

// The scheduling loop that fetches jobs from the job queue and distributes
// them to SSH sessions.
let mut job_queue = JobQueue::new(&cli.queue_file);
let mut job_queue = JobQueue::new(&cli.queue_file, Arc::clone(&cancelled));
let mut host_index;
loop {
// `recv_async` will allow the scheduler to react to a new free session
Expand Down Expand Up @@ -232,6 +249,13 @@ async fn run_queue(cli: &Config) -> Result<(), PegasusError> {
// Wait for all of them to finish.
join_all(tasks).await;

// TODO: Better reporting of which command failed on which host.
if errored.load(Ordering::SeqCst) {
eprintln!("[Pegasus] Some commands failed.");
} else {
eprintln!("[Pegasus] All commands finished successfully.");
}

Ok(())
}

Expand Down Expand Up @@ -263,6 +287,9 @@ async fn main() -> Result<(), PegasusError> {
}
Mode::Queue => {
eprintln!("[Pegasus] Running in queue mode!");
if cli.error_aborts {
eprintln!("[Pegasus] Queue mode does not support aborting on error (-e).");
}
run_queue(&cli).await?;
}
Mode::Lock => {
Expand Down

0 comments on commit 1ce274a

Please sign in to comment.