Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Implement dora run command #703

Merged
merged 7 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions binaries/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use dora_message::{
};
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use dora_tracing::set_up_tracing_opts;
use dora_tracing::{set_up_tracing_opts, FileLogging};
use duration_str::parse;
use eyre::{bail, Context};
use formatting::FormatDataflowError;
Expand All @@ -29,6 +29,7 @@ use std::{
};
use tabwriter::TabWriter;
use tokio::runtime::Builder;
use tracing::level_filters::LevelFilter;
use uuid::Uuid;

mod attach;
Expand Down Expand Up @@ -90,6 +91,15 @@ enum Command {
#[clap(hide = true, long)]
internal_create_with_path_dependencies: bool,
},
/// Run a dataflow locally.
///
/// Directly runs the given dataflow without connecting to a dora
/// coordinator or daemon. The dataflow is executed on the local machine.
Run {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH")]
dataflow: String,
},
/// Spawn coordinator and daemon in local mode (with default config)
Up {
/// Use a custom configuration
Expand Down Expand Up @@ -274,15 +284,29 @@ fn run(args: Args) -> eyre::Result<()> {
.as_ref()
.map(|id| format!("{name}-{id}"))
.unwrap_or(name.to_string());
set_up_tracing_opts(name, !quiet, Some(&filename))
let stdout = (!quiet).then_some(LevelFilter::WARN);
let file = Some(FileLogging {
file_name: filename,
filter: LevelFilter::INFO,
});
set_up_tracing_opts(name, stdout, file)
.context("failed to set up tracing subscriber")?;
}
Command::Runtime => {
// Do not set the runtime in the cli.
}
Command::Coordinator { quiet, .. } => {
let name = "dora-coordinator";
set_up_tracing_opts(name, !quiet, Some(name))
let stdout = (!quiet).then_some(LevelFilter::WARN);
let file = Some(FileLogging {
file_name: name.to_owned(),
filter: LevelFilter::INFO,
});
set_up_tracing_opts(name, stdout, file)
.context("failed to set up tracing subscriber")?;
}
Command::Run { .. } => {
set_up_tracing_opts("run", Some(LevelFilter::INFO), None)
.context("failed to set up tracing subscriber")?;
}
_ => {
Expand Down Expand Up @@ -328,6 +352,15 @@ fn run(args: Args) -> eyre::Result<()> {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Run { dataflow } => {
let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path))?;
handle_dataflow_result(result, None)?
}
Command::Up { config } => {
up::up(config.as_deref())?;
}
Expand Down
107 changes: 80 additions & 27 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@
pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<DataflowResult> {
let working_dir = dataflow_path
.canonicalize()
.context("failed to canoncialize dataflow path")?

Check warning on line 160 in binaries/daemon/src/lib.rs

View workflow job for this annotation

GitHub Actions / Typos

"canoncialize" should be "canonicalize".
.parent()
.ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
.to_owned();
Expand All @@ -177,6 +177,8 @@

let clock = Arc::new(HLC::default());

let ctrlc_events = set_up_ctrlc_handler(clock.clone())?;

let exit_when_done = spawn_command
.nodes
.iter()
Expand All @@ -193,8 +195,9 @@
timestamp,
}
});
let events = (coordinator_events, ctrlc_events).merge();
let run_result = Self::run_general(
Box::pin(coordinator_events),
Box::pin(events),
None,
"".to_string(),
Some(exit_when_done),
Expand Down Expand Up @@ -324,12 +327,17 @@
}
}
Event::CtrlC => {
tracing::info!("received ctrlc signal -> stopping all dataflows");
for dataflow in self.running.values_mut() {
dataflow
.stop_all(&mut self.coordinator_connection, &self.clock, None)
.await?;
}
}
Event::SecondCtrlC => {
tracing::warn!("received second ctrlc signal -> exit immediately");
bail!("received second ctrl-c signal");
}
}
}

Expand Down Expand Up @@ -1085,7 +1093,9 @@
)
.await?;

dataflow.running_nodes.remove(node_id);
if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
pid.mark_as_stopped()
}
if dataflow
.running_nodes
.iter()
Expand Down Expand Up @@ -1469,12 +1479,51 @@
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
struct RunningNode {
pid: Option<u32>,
pid: Option<ProcessId>,
node_config: NodeConfig,
}

#[derive(Debug)]
struct ProcessId(Option<u32>);

impl ProcessId {
    /// Wraps the OS process id of a spawned node process.
    pub fn new(process_id: u32) -> Self {
        Self(Some(process_id))
    }

    /// Records that the process has already exited; subsequent `kill` calls
    /// (and the `Drop` impl) become no-ops.
    pub fn mark_as_stopped(&mut self) {
        self.0 = None;
    }

    /// Kills the underlying process if it is still tracked and still alive.
    ///
    /// Returns `true` when a running process was found and killed, `false`
    /// when the id was already marked as stopped or the process no longer
    /// exists in the system process table.
    pub fn kill(&mut self) -> bool {
        let Some(pid) = self.0 else {
            return false;
        };

        // Take a fresh snapshot of the process table before looking up the pid.
        let mut system = sysinfo::System::new();
        system.refresh_processes();

        match system.process(Pid::from(pid as usize)) {
            Some(process) => {
                process.kill();
                self.mark_as_stopped();
                true
            }
            None => false,
        }
    }
}

impl Drop for ProcessId {
    /// Last-resort cleanup: if the id was never marked as stopped, try to
    /// kill the process so it does not outlive the daemon.
    fn drop(&mut self) {
        let Some(pid) = self.0 else {
            return;
        };
        if self.kill() {
            warn!("process {pid} was killed on drop because it was still running")
        }
    }
}

pub struct RunningDataflow {
id: Uuid,
/// Local nodes that are not started yet
Expand Down Expand Up @@ -1610,19 +1659,20 @@
let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
}

let running_nodes = self.running_nodes.clone();
let running_processes: Vec<_> = self
.running_nodes
.iter_mut()
.map(|(id, n)| (id.clone(), n.pid.take()))
.collect();
let grace_duration_kills = self.grace_duration_kills.clone();
tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
tokio::time::sleep(duration).await;
let mut system = sysinfo::System::new();
system.refresh_processes();

for (node, node_details) in running_nodes.iter() {
if let Some(pid) = node_details.pid {
if let Some(process) = system.process(Pid::from(pid as usize)) {
for (node, pid) in running_processes {
if let Some(mut pid) = pid {
if pid.kill() {
grace_duration_kills.insert(node.clone());
process.kill();
warn!(
"{node} was killed due to not stopping within the {:#?} grace period",
duration
Expand Down Expand Up @@ -1697,6 +1747,7 @@
DynamicNode(DynamicNodeEventWrapper),
HeartbeatInterval,
CtrlC,
SecondCtrlC,
}

impl From<DoraEvent> for Event {
Expand Down Expand Up @@ -1777,25 +1828,27 @@
) -> Result<impl Stream<Item = Timestamped<Event>>, eyre::ErrReport> {
let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

let mut ctrlc_sent = false;
let mut ctrlc_sent = 0;
ctrlc::set_handler(move || {
if ctrlc_sent {
tracing::warn!("received second ctrlc signal -> aborting immediately");
std::process::abort();
} else {
tracing::info!("received ctrlc signal");
if ctrlc_tx
.blocking_send(Timestamped {
inner: Event::CtrlC,
timestamp: clock.new_timestamp(),
})
.is_err()
{
tracing::error!("failed to report ctrl-c event to dora-coordinator");
let event = match ctrlc_sent {
0 => Event::CtrlC,
1 => Event::SecondCtrlC,
_ => {
tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
std::process::abort();
}

ctrlc_sent = true;
};
if ctrlc_tx
.blocking_send(Timestamped {
inner: event,
timestamp: clock.new_timestamp(),
})
.is_err()
{
tracing::error!("failed to report ctrl-c event to dora-coordinator");
}

ctrlc_sent += 1;
})
.wrap_err("failed to set ctrl-c handler")?;

Expand Down
8 changes: 5 additions & 3 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
let resolved_path = if source_is_url(source) {
// try to download the shared library
let target_dir = Path::new("build");
download_file(source, &target_dir)

Check warning on line 104 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
.await
.wrap_err("failed to download custom node")?
} else {
Expand Down Expand Up @@ -262,6 +262,11 @@
}
};

let pid = crate::ProcessId::new(child.id().context(
"Could not get the pid for the just spawned node and indicate that there is an error",
)?);
tracing::debug!("Spawned node `{dataflow_id}/{node_id}` with pid {pid:?}");

let dataflow_dir: PathBuf = working_dir.join("out").join(dataflow_id.to_string());
if !dataflow_dir.exists() {
std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?;
Expand All @@ -272,9 +277,6 @@
.expect("Failed to create log file");
let mut child_stdout =
tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"));
let pid = child.id().context(
"Could not get the pid for the just spawned node and indicate that there is an error",
)?;
let running_node = RunningNode {
pid: Some(pid),
node_config,
Expand Down
24 changes: 17 additions & 7 deletions libraries/extensions/telemetry/tracing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,35 @@ use tracing_subscriber::Registry;
pub mod telemetry;

pub fn set_up_tracing(name: &str) -> eyre::Result<()> {
set_up_tracing_opts(name, true, None)
set_up_tracing_opts(name, Some(LevelFilter::WARN), None)
}

pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) -> eyre::Result<()> {
/// Configuration for the optional file-based log output.
pub struct FileLogging {
    /// Base name of the log file; the log is written to `out/<file_name>.txt`.
    pub file_name: String,
    /// Level filter applied to the file log layer.
    pub filter: LevelFilter,
}

pub fn set_up_tracing_opts(
name: &str,
stdout: Option<LevelFilter>,
file: Option<FileLogging>,
) -> eyre::Result<()> {
let mut layers = Vec::new();

if stdout {
if let Some(level) = stdout {
// Filter log using `RUST_LOG`. More useful for CLI.
let env_filter = EnvFilter::from_default_env().or(LevelFilter::WARN);
let env_filter = EnvFilter::from_default_env().or(level);
let layer = tracing_subscriber::fmt::layer()
.compact()
.with_filter(env_filter);
layers.push(layer.boxed());
}

if let Some(filename) = filename {
if let Some(file) = file {
let FileLogging { file_name, filter } = file;
let out_dir = Path::new("out");
std::fs::create_dir_all(out_dir).context("failed to create `out` directory")?;
let path = out_dir.join(filename).with_extension("txt");
let path = out_dir.join(file_name).with_extension("txt");
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
Expand All @@ -44,7 +54,7 @@ pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) ->
let layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(file)
.with_filter(LevelFilter::INFO);
.with_filter(filter);
layers.push(layer.boxed());
}

Expand Down
Loading