Skip to content

Commit

Permalink
Add ctrl-c handler and kill child processes on second ctrl-c
Browse files Browse the repository at this point in the history
On first ctrl-c, send a stop command to all nodes. On second ctrl-c, exit immediately and kill all spawned nodes. On third ctrl-c, abort the process directly without waiting (child processes keep running).

This change affects both `dora run` and `dora daemon` commands.
  • Loading branch information
phil-opp committed Nov 11, 2024
1 parent cd70382 commit 4437618
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 30 deletions.
107 changes: 80 additions & 27 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ impl Daemon {

let clock = Arc::new(HLC::default());

let ctrlc_events = set_up_ctrlc_handler(clock.clone())?;

let exit_when_done = spawn_command
.nodes
.iter()
Expand All @@ -193,8 +195,9 @@ impl Daemon {
timestamp,
}
});
let events = (coordinator_events, ctrlc_events).merge();
let run_result = Self::run_general(
Box::pin(coordinator_events),
Box::pin(events),
None,
"".to_string(),
Some(exit_when_done),
Expand Down Expand Up @@ -324,12 +327,17 @@ impl Daemon {
}
}
Event::CtrlC => {
tracing::info!("received ctrlc signal -> stopping all dataflows");
for dataflow in self.running.values_mut() {
dataflow
.stop_all(&mut self.coordinator_connection, &self.clock, None)
.await?;
}
}
Event::SecondCtrlC => {
tracing::warn!("received second ctrlc signal -> exit immediately");
bail!("received second ctrl-c signal");
}
}
}

Expand Down Expand Up @@ -1085,7 +1093,9 @@ impl Daemon {
)
.await?;

dataflow.running_nodes.remove(node_id);
if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
pid.mark_as_stopped()
}
if dataflow
.running_nodes
.iter()
Expand Down Expand Up @@ -1469,12 +1479,51 @@ fn close_input(
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
struct RunningNode {
pid: Option<u32>,
pid: Option<ProcessId>,
node_config: NodeConfig,
}

#[derive(Debug)]
struct ProcessId(Option<u32>);

impl ProcessId {
pub fn new(process_id: u32) -> Self {
Self(Some(process_id))
}

pub fn mark_as_stopped(&mut self) {
self.0 = None;
}

pub fn kill(&mut self) -> bool {
if let Some(pid) = self.0 {
let mut system = sysinfo::System::new();
system.refresh_processes();

if let Some(process) = system.process(Pid::from(pid as usize)) {
process.kill();
self.mark_as_stopped();
return true;
}
}

false
}
}

impl Drop for ProcessId {
fn drop(&mut self) {
// kill the process if it's still running
if let Some(pid) = self.0 {
if self.kill() {
warn!("process {pid} was killed on drop because it was still running")
}
}
}
}

pub struct RunningDataflow {
id: Uuid,
/// Local nodes that are not started yet
Expand Down Expand Up @@ -1610,19 +1659,20 @@ impl RunningDataflow {
let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
}

let running_nodes = self.running_nodes.clone();
let running_processes: Vec<_> = self
.running_nodes
.iter_mut()
.map(|(id, n)| (id.clone(), n.pid.take()))
.collect();
let grace_duration_kills = self.grace_duration_kills.clone();
tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
tokio::time::sleep(duration).await;
let mut system = sysinfo::System::new();
system.refresh_processes();

for (node, node_details) in running_nodes.iter() {
if let Some(pid) = node_details.pid {
if let Some(process) = system.process(Pid::from(pid as usize)) {
for (node, pid) in running_processes {
if let Some(mut pid) = pid {
if pid.kill() {
grace_duration_kills.insert(node.clone());
process.kill();
warn!(
"{node} was killed due to not stopping within the {:#?} grace period",
duration
Expand Down Expand Up @@ -1697,6 +1747,7 @@ pub enum Event {
DynamicNode(DynamicNodeEventWrapper),
HeartbeatInterval,
CtrlC,
SecondCtrlC,
}

impl From<DoraEvent> for Event {
Expand Down Expand Up @@ -1777,25 +1828,27 @@ fn set_up_ctrlc_handler(
) -> Result<impl Stream<Item = Timestamped<Event>>, eyre::ErrReport> {
let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

let mut ctrlc_sent = false;
let mut ctrlc_sent = 0;
ctrlc::set_handler(move || {
if ctrlc_sent {
tracing::warn!("received second ctrlc signal -> aborting immediately");
std::process::abort();
} else {
tracing::info!("received ctrlc signal");
if ctrlc_tx
.blocking_send(Timestamped {
inner: Event::CtrlC,
timestamp: clock.new_timestamp(),
})
.is_err()
{
tracing::error!("failed to report ctrl-c event to dora-coordinator");
let event = match ctrlc_sent {
0 => Event::CtrlC,
1 => Event::SecondCtrlC,
_ => {
tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
std::process::abort();
}

ctrlc_sent = true;
};
if ctrlc_tx
.blocking_send(Timestamped {
inner: event,
timestamp: clock.new_timestamp(),
})
.is_err()
{
tracing::error!("failed to report ctrl-c event to dora-coordinator");
}

ctrlc_sent += 1;
})
.wrap_err("failed to set ctrl-c handler")?;

Expand Down
8 changes: 5 additions & 3 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ pub async fn spawn_node(
}
};

let pid = crate::ProcessId::new(child.id().context(
"Could not get the pid for the just spawned node and indicate that there is an error",
)?);
tracing::debug!("Spawned node `{dataflow_id}/{node_id}` with pid {pid:?}");

let dataflow_dir: PathBuf = working_dir.join("out").join(dataflow_id.to_string());
if !dataflow_dir.exists() {
std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?;
Expand All @@ -272,9 +277,6 @@ pub async fn spawn_node(
.expect("Failed to create log file");
let mut child_stdout =
tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"));
let pid = child.id().context(
"Could not get the pid for the just spawned node and indicate that there is an error",
)?;
let running_node = RunningNode {
pid: Some(pid),
node_config,
Expand Down

0 comments on commit 4437618

Please sign in to comment.