diff --git a/src/supervisor.rs b/src/supervisor.rs index 34d6a4e..c76a043 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -124,111 +124,119 @@ impl Supervisor { } async fn handle_ipc(&mut self, msg: IpcMessage, send: Sender) -> Result<()> { - let tx = self.event_tx.clone(); - let mut event_rx = self.event_tx.subscribe(); match msg { IpcMessage::Run(name, cmd) => { - info!("received run message: {name:?} cmd: {cmd:?}"); - task::spawn({ - let name = name.clone(); - async move { - while let Ok(ev) = event_rx.recv().await { - match ev { - Event::DaemonStart(daemon) => { - if daemon.name == name { - if let Err(err) = - send.send(IpcMessage::DaemonStart(daemon)).await - { - error!("failed to send message: {err:?}"); - } - return; - } - } - Event::DaemonFailed { name: n, error } => { - if n == name { - if let Err(err) = send - .send(IpcMessage::DaemonFailed { name, error }) - .await - { - error!("failed to send message: {err:?}"); - } - return; - } + self.run(send, &name, cmd)?; + } + _ => { + debug!("received unknown message: {msg}"); + } + } + Ok(()) + } + + fn run(&mut self, send: Sender, name: &str, cmd: Vec) -> Result<()> { + let tx = self.event_tx.clone(); + let mut event_rx = self.event_tx.subscribe(); + info!("received run message: {name:?} cmd: {cmd:?}"); + task::spawn({ + let name = name.to_string(); + async move { + while let Ok(ev) = event_rx.recv().await { + match ev { + Event::DaemonStart(daemon) => { + if daemon.name == name { + if let Err(err) = send.send(IpcMessage::DaemonStart(daemon)).await { + error!("failed to send message: {err:?}"); } - _ => {} + return; } } - } - }); - let args = vec!["-c".to_string(), cmd.join(" ")]; - let log_path = env::PITCHFORK_LOGS_DIR - .join(&name) - .join(format!("{name}.log")); - xx::file::mkdirp(log_path.parent().unwrap())?; - debug!("starting daemon: {name} with args: {args:?}"); - match tokio::process::Command::new("sh") - .args(&args) - .stdin(std::process::Stdio::null()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn() - { - Ok(mut child) => { - let pid = child.id().unwrap(); - self.active_pids.insert(pid, name.clone()); - info!("started daemon {name} with pid {pid}"); - let daemon = StateFileDaemon { - name: name.clone(), - pid, - status: DaemonStatus::Running, - }; - self.state_file.daemons.insert(name.clone(), daemon.clone()); - self.state_file.write()?; - tx.send(Event::DaemonStart(daemon.clone())).unwrap(); - tokio::spawn(async move { - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); - let mut stdout = tokio::io::BufReader::new(stdout).lines(); - let mut stderr = tokio::io::BufReader::new(stderr).lines(); - let mut log_appender = tokio::fs::File::options() - .append(true) - .create(true) - .open(&log_path) - .await - .unwrap(); - loop { - select! { - Ok(Some(line)) = stdout.next_line() => { - log_appender.write_all(line.as_bytes()).await.unwrap(); - log_appender.write_all(b"\n").await.unwrap(); - } - Ok(Some(line)) = stderr.next_line() => { - log_appender.write_all(line.as_bytes()).await.unwrap(); - log_appender.write_all(b"\n").await.unwrap(); - }, - else => break, + Event::DaemonFailed { name: n, error } => { + if n == name { + if let Err(err) = + send.send(IpcMessage::DaemonFailed { name, error }).await + { + error!("failed to send message: {err:?}"); } + return; } - let status = child.wait().await.unwrap(); - info!("daemon {name} exited with status {status}"); - tx.send(Event::DaemonStop(daemon)).unwrap(); - }); - } - Err(err) => { - info!("failed to start daemon: {err:?}"); - self.event_tx - .send(Event::DaemonFailed { - name, - error: format!("{err:?}"), - }) - .unwrap(); + } + _ => {} } } } - _ => { - debug!("received unknown message: {msg}"); + }); + let args = vec!["-c".to_string(), cmd.join(" ")]; + let log_path = env::PITCHFORK_LOGS_DIR + .join(name) + .join(format!("{name}.log")); + xx::file::mkdirp(log_path.parent().unwrap())?; + debug!("starting daemon: {name} with args: {args:?}"); + match tokio::process::Command::new("sh") + .args(&args) + .stdin(std::process::Stdio::null()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + { + Ok(mut child) => { + let pid = child.id().unwrap(); + self.active_pids.insert(pid, name.to_string()); + info!("started daemon {name} with pid {pid}"); + let daemon = StateFileDaemon { + name: name.to_string(), + pid, + status: DaemonStatus::Running, + }; + self.state_file + .daemons + .insert(name.to_string(), daemon.clone()); + self.state_file.write()?; + tx.send(Event::DaemonStart(daemon.clone())).unwrap(); + let name = name.to_string(); + tokio::spawn(async move { + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + let mut stdout = tokio::io::BufReader::new(stdout).lines(); + let mut stderr = tokio::io::BufReader::new(stderr).lines(); + let mut log_appender = tokio::fs::File::options() + .append(true) + .create(true) + .open(&log_path) + .await + .unwrap(); + loop { + select! { + Ok(Some(line)) = stdout.next_line() => { + debug!("stdout: {name} {line}"); + log_appender.write_all(line.as_bytes()).await.unwrap(); + log_appender.write_all(b"\n").await.unwrap(); + } + Ok(Some(line)) = stderr.next_line() => { + debug!("stderr: {name} {line}"); + log_appender.write_all(line.as_bytes()).await.unwrap(); + log_appender.write_all(b"\n").await.unwrap(); + }, + else => break, + } + } + let status = child.wait().await.unwrap(); + info!("daemon {name} exited with status {status}"); + tx.send(Event::DaemonStop(daemon)).unwrap(); + }); + } + Err(err) => { + info!("failed to start daemon: {err:?}"); + self.event_tx + .send(Event::DaemonFailed { + name: name.to_string(), + error: format!("{err:?}"), + }) + .unwrap(); } } + Ok(()) }