diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index c86c3664a..9968cc8d8 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -17,7 +17,7 @@ use dora_message::{ }; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; -use dora_tracing::set_up_tracing_opts; +use dora_tracing::{set_up_tracing_opts, FileLogging}; use duration_str::parse; use eyre::{bail, Context}; use formatting::FormatDataflowError; @@ -29,6 +29,7 @@ use std::{ }; use tabwriter::TabWriter; use tokio::runtime::Builder; +use tracing::level_filters::LevelFilter; use uuid::Uuid; mod attach; @@ -90,6 +91,15 @@ enum Command { #[clap(hide = true, long)] internal_create_with_path_dependencies: bool, }, + /// Run a dataflow locally. + /// + /// Directly runs the given dataflow without connecting to a dora + /// coordinator or daemon. The dataflow is executed on the local machine. + Run { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH")] + dataflow: String, + }, /// Spawn coordinator and daemon in local mode (with default config) Up { /// Use a custom configuration @@ -274,7 +284,12 @@ fn run(args: Args) -> eyre::Result<()> { .as_ref() .map(|id| format!("{name}-{id}")) .unwrap_or(name.to_string()); - set_up_tracing_opts(name, !quiet, Some(&filename)) + let stdout = (!quiet).then_some(LevelFilter::WARN); + let file = Some(FileLogging { + file_name: filename, + filter: LevelFilter::INFO, + }); + set_up_tracing_opts(name, stdout, file) .context("failed to set up tracing subscriber")?; } Command::Runtime => { @@ -282,7 +297,16 @@ fn run(args: Args) -> eyre::Result<()> { } Command::Coordinator { quiet, .. } => { let name = "dora-coordinator"; - set_up_tracing_opts(name, !quiet, Some(name)) + let stdout = (!quiet).then_some(LevelFilter::WARN); + let file = Some(FileLogging { + file_name: name.to_owned(), + filter: LevelFilter::INFO, + }); + set_up_tracing_opts(name, stdout, file) + .context("failed to set up tracing subscriber")?; + } + Command::Run { .. } => { + set_up_tracing_opts("run", Some(LevelFilter::INFO), None) .context("failed to set up tracing subscriber")?; } _ => { @@ -328,6 +352,15 @@ fn run(args: Args) -> eyre::Result<()> { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, + Command::Run { dataflow } => { + let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; + let rt = Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + let result = rt.block_on(Daemon::run_dataflow(&dataflow_path))?; + handle_dataflow_result(result, None)? + } Command::Up { config } => { up::up(config.as_deref())?; } diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 166d31642..75d358a4e 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -177,6 +177,8 @@ impl Daemon { let clock = Arc::new(HLC::default()); + let ctrlc_events = set_up_ctrlc_handler(clock.clone())?; + let exit_when_done = spawn_command .nodes .iter() @@ -193,8 +195,9 @@ impl Daemon { timestamp, } }); + let events = (coordinator_events, ctrlc_events).merge(); let run_result = Self::run_general( - Box::pin(coordinator_events), + Box::pin(events), None, "".to_string(), Some(exit_when_done), @@ -324,12 +327,17 @@ impl Daemon { } } Event::CtrlC => { + tracing::info!("received ctrlc signal -> stopping all dataflows"); for dataflow in self.running.values_mut() { dataflow .stop_all(&mut self.coordinator_connection, &self.clock, None) .await?; } } + Event::SecondCtrlC => { + tracing::warn!("received second ctrlc signal -> exit immediately"); + bail!("received second ctrl-c signal"); + } } } @@ -1085,7 +1093,9 @@ impl Daemon { ) .await?; - dataflow.running_nodes.remove(node_id); + if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) { + pid.mark_as_stopped() + } if dataflow .running_nodes .iter() @@ -1469,12 +1479,51 @@ fn close_input( } } -#[derive(Debug, Clone)] +#[derive(Debug)] struct RunningNode { - pid: Option, + pid: Option, node_config: NodeConfig, } +#[derive(Debug)] +struct ProcessId(Option); + +impl ProcessId { + pub fn new(process_id: u32) -> Self { + Self(Some(process_id)) + } + + pub fn mark_as_stopped(&mut self) { + self.0 = None; + } + + pub fn kill(&mut self) -> bool { + if let Some(pid) = self.0 { + let mut system = sysinfo::System::new(); + system.refresh_processes(); + + if let Some(process) = system.process(Pid::from(pid as usize)) { + process.kill(); + self.mark_as_stopped(); + return true; + } + } + + false + } +} + +impl Drop for ProcessId { + fn drop(&mut self) { + // kill the process if it's still running + if let Some(pid) = self.0 { + if self.kill() { + warn!("process {pid} was killed on drop because it was still running") + } + } + } +} + pub struct RunningDataflow { id: Uuid, /// Local nodes that are not started yet @@ -1610,19 +1659,20 @@ impl RunningDataflow { let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock); } - let running_nodes = self.running_nodes.clone(); + let running_processes: Vec<_> = self + .running_nodes + .iter_mut() + .map(|(id, n)| (id.clone(), n.pid.take())) + .collect(); let grace_duration_kills = self.grace_duration_kills.clone(); tokio::spawn(async move { let duration = grace_duration.unwrap_or(Duration::from_millis(15000)); tokio::time::sleep(duration).await; - let mut system = sysinfo::System::new(); - system.refresh_processes(); - for (node, node_details) in running_nodes.iter() { - if let Some(pid) = node_details.pid { - if let Some(process) = system.process(Pid::from(pid as usize)) { + for (node, pid) in running_processes { + if let Some(mut pid) = pid { + if pid.kill() { grace_duration_kills.insert(node.clone()); - process.kill(); warn!( "{node} was killed due to not stopping within the {:#?} grace period", duration @@ -1697,6 +1747,7 @@ pub enum Event { DynamicNode(DynamicNodeEventWrapper), HeartbeatInterval, CtrlC, + SecondCtrlC, } impl From for Event { @@ -1777,25 +1828,27 @@ fn set_up_ctrlc_handler( ) -> Result>, eyre::ErrReport> { let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1); - let mut ctrlc_sent = false; + let mut ctrlc_sent = 0; ctrlc::set_handler(move || { - if ctrlc_sent { - tracing::warn!("received second ctrlc signal -> aborting immediately"); - std::process::abort(); - } else { - tracing::info!("received ctrlc signal"); - if ctrlc_tx - .blocking_send(Timestamped { - inner: Event::CtrlC, - timestamp: clock.new_timestamp(), - }) - .is_err() - { - tracing::error!("failed to report ctrl-c event to dora-coordinator"); + let event = match ctrlc_sent { + 0 => Event::CtrlC, + 1 => Event::SecondCtrlC, + _ => { + tracing::warn!("received 3rd ctrlc signal -> aborting immediately"); + std::process::abort(); } - - ctrlc_sent = true; + }; + if ctrlc_tx + .blocking_send(Timestamped { + inner: event, + timestamp: clock.new_timestamp(), + }) + .is_err() + { + tracing::error!("failed to report ctrl-c event to dora-coordinator"); } + + ctrlc_sent += 1; }) .wrap_err("failed to set ctrl-c handler")?; diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 87eca5a3b..d8b438845 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -153,6 +153,11 @@ pub async fn spawn_node( command.env(key, value.to_string()); } } + + // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C + #[cfg(unix)] + command.process_group(0); + command .stdin(Stdio::null()) .stdout(Stdio::piped()) @@ -249,6 +254,9 @@ pub async fn spawn_node( command.env(key, value.to_string()); } } + // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C + #[cfg(unix)] + command.process_group(0); command .stdin(Stdio::null()) @@ -262,6 +270,11 @@ pub async fn spawn_node( } }; + let pid = crate::ProcessId::new(child.id().context( + "Could not get the pid for the just spawned node and indicate that there is an error", + )?); + tracing::debug!("Spawned node `{dataflow_id}/{node_id}` with pid {pid:?}"); + let dataflow_dir: PathBuf = working_dir.join("out").join(dataflow_id.to_string()); if !dataflow_dir.exists() { std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?; @@ -272,9 +285,6 @@ pub async fn spawn_node( .expect("Failed to create log file"); let mut child_stdout = tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout")); - let pid = child.id().context( - "Could not get the pid for the just spawned node and indicate that there is an error", - )?; let running_node = RunningNode { pid: Some(pid), node_config, diff --git a/libraries/extensions/telemetry/tracing/src/lib.rs b/libraries/extensions/telemetry/tracing/src/lib.rs index 10c277237..48fa6820f 100644 --- a/libraries/extensions/telemetry/tracing/src/lib.rs +++ b/libraries/extensions/telemetry/tracing/src/lib.rs @@ -16,25 +16,35 @@ use tracing_subscriber::Registry; pub mod telemetry; pub fn set_up_tracing(name: &str) -> eyre::Result<()> { - set_up_tracing_opts(name, true, None) + set_up_tracing_opts(name, Some(LevelFilter::WARN), None) } -pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) -> eyre::Result<()> { +pub struct FileLogging { + pub file_name: String, + pub filter: LevelFilter, +} + +pub fn set_up_tracing_opts( + name: &str, + stdout: Option, + file: Option, +) -> eyre::Result<()> { let mut layers = Vec::new(); - if stdout { + if let Some(level) = stdout { // Filter log using `RUST_LOG`. More useful for CLI. - let env_filter = EnvFilter::from_default_env().or(LevelFilter::WARN); + let env_filter = EnvFilter::from_default_env().or(level); let layer = tracing_subscriber::fmt::layer() .compact() .with_filter(env_filter); layers.push(layer.boxed()); } - if let Some(filename) = filename { + if let Some(file) = file { + let FileLogging { file_name, filter } = file; let out_dir = Path::new("out"); std::fs::create_dir_all(out_dir).context("failed to create `out` directory")?; - let path = out_dir.join(filename).with_extension("txt"); + let path = out_dir.join(file_name).with_extension("txt"); let file = std::fs::OpenOptions::new() .create(true) .append(true) @@ -44,7 +54,7 @@ pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) -> let layer = tracing_subscriber::fmt::layer() .with_ansi(false) .with_writer(file) - .with_filter(LevelFilter::INFO); + .with_filter(filter); layers.push(layer.boxed()); }