From 82fa01df03c2b6e021abdffe30d102f3629908f1 Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:49:27 -0600 Subject: [PATCH] wip --- src/procs.rs | 2 +- src/supervisor.rs | 76 ++++++++++++++++++++++++++++------------------- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/src/procs.rs b/src/procs.rs index 35ee969..756c9eb 100644 --- a/src/procs.rs +++ b/src/procs.rs @@ -42,7 +42,7 @@ impl Procs { .process(sysinfo::Pid::from_u32(pid)) { process.kill(); - process.wait(); // TODO: make this async + process.wait(); true } else { false diff --git a/src/supervisor.rs b/src/supervisor.rs index 398cc42..04d5b7d 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -19,14 +19,14 @@ use std::time::Duration; use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; #[cfg(unix)] use tokio::signal::unix::SignalKind; -use tokio::sync::broadcast; use tokio::sync::mpsc::Sender; +use tokio::sync::{broadcast, Mutex}; use tokio::{select, signal, task, time}; pub struct Supervisor { - state_file: StateFile, - last_refreshed_at: time::Instant, - active_pids: HashMap, + state_file: Mutex, + last_refreshed_at: Mutex, + active_pids: Mutex>, event_tx: broadcast::Sender, event_rx: broadcast::Receiver, pitchfork_bin_file_size: u64, @@ -50,8 +50,8 @@ impl Supervisor { pub async fn new(state_file: StateFile) -> Result { let (event_tx, event_rx) = broadcast::channel(1); Ok(Self { - state_file, - last_refreshed_at: time::Instant::now(), + state_file: Mutex::new(state_file), + last_refreshed_at: Mutex::new(time::Instant::now()), active_pids: Default::default(), event_tx, event_rx, @@ -69,12 +69,16 @@ impl Supervisor { pid: Some(pid), status: DaemonStatus::Running, }; - self.state_file.daemons.insert("pitchfork".into(), daemon); - self.state_file.write()?; + self.state_file + .lock() + .await + .daemons + .insert("pitchfork".into(), daemon); + self.state_file.lock().await.write()?; self.interval_watch()?; self.signals()?; - self.file_watch()?; + self.file_watch().await?; self.conn_watch().await?; loop { @@ -88,7 +92,7 @@ impl Supervisor { async fn handle(&mut self, event: Event) -> Result<()> { match event { Event::Interval => { - if self.last_refreshed_at.elapsed() < INTERVAL { + if self.last_refreshed_at.lock().await.elapsed() < INTERVAL { return Ok(()); } self.refresh().await @@ -99,11 +103,12 @@ impl Supervisor { let new_size = fs::metadata(&*env::BIN_PATH).into_diagnostic()?.len(); if new_size != self.pitchfork_bin_file_size { info!("pitchfork cli updated, restarting"); - self.restart(); + self.restart().await; } } - if paths.contains(&self.state_file.path) { - self.state_file = StateFile::read(&self.state_file.path)?; + let path = self.state_file.lock().await.path.clone(); + if paths.contains(&path) { + *self.state_file.lock().await = StateFile::read(&path)?; } self.refresh().await } @@ -113,28 +118,31 @@ impl Supervisor { } Event::Signal => { info!("received signal, stopping"); - self.close(); + self.close().await; exit(0) } Event::DaemonStop(daemon) => { - self.active_pids.remove(&daemon.pid.unwrap()); + self.active_pids.lock().await.remove(&daemon.pid.unwrap()); self.state_file + .lock() + .await .daemons .entry(daemon.name.clone()) .and_modify(|d| { d.pid = None; d.status = DaemonStatus::Stopped; }); - self.state_file.write()?; + self.state_file.lock().await.write()?; Ok(()) } _ => Ok(()), } } - async fn refresh(&mut self) -> Result<()> { + async fn refresh(&self) -> Result<()> { debug!("refreshing"); - self.last_refreshed_at = time::Instant::now(); + let mut last_refreshed_at = self.last_refreshed_at.lock().await; + *last_refreshed_at = time::Instant::now(); Ok(()) } @@ -157,7 +165,8 @@ impl Supervisor { let tx = self.event_tx.clone(); let mut event_rx = self.event_tx.subscribe(); info!("received run message: {name:?} cmd: {cmd:?}"); - if let Some(daemon) = self.state_file.daemons.get(name) { + let daemon = self.state_file.lock().await.daemons.get(name).cloned(); + if let Some(daemon) = daemon { if let Some(pid) = daemon.pid { warn!("daemon {name} already running with pid {}", pid); if let Err(err) = send @@ -215,7 +224,7 @@ impl Supervisor { { Ok(mut child) => { let pid = child.id().unwrap(); - self.active_pids.insert(pid, name.to_string()); + self.active_pids.lock().await.insert(pid, name.to_string()); info!("started daemon {name} with pid {pid}"); let daemon = StateFileDaemon { name: name.to_string(), @@ -223,9 +232,11 @@ impl Supervisor { status: DaemonStatus::Running, }; self.state_file + .lock() + .await .daemons .insert(name.to_string(), daemon.clone()); - self.state_file.write()?; + self.state_file.lock().await.write()?; tx.send(Event::DaemonStart(daemon.clone())).unwrap(); let name = name.to_string(); tokio::spawn(async move { @@ -276,21 +287,24 @@ impl Supervisor { async fn stop(&mut self, send: Sender, name: &str) -> Result<()> { info!("received stop message: {name}"); - if let Some(daemon) = self.state_file.daemons.get(name) { + let daemon = self.state_file.lock().await.daemons.get(name).cloned(); + if let Some(daemon) = daemon { if let Some(pid) = daemon.pid { PROCS.refresh_processes(); if PROCS.is_running(pid) { PROCS.kill_async(pid).await?; } - self.active_pids.remove(&pid); + self.active_pids.lock().await.remove(&pid); self.state_file + .lock() + .await .daemons .entry(name.to_string()) .and_modify(|d| { d.pid = None; d.status = DaemonStatus::Stopped; }); - self.state_file.write()?; + self.state_file.lock().await.write()?; if let Err(err) = send .send(IpcMessage::DaemonStop { name: name.to_string(), @@ -311,9 +325,9 @@ impl Supervisor { Ok(()) } - fn restart(&mut self) -> ! { + async fn restart(&mut self) -> ! { debug!("restarting"); - self.close(); + self.close().await; if *env::PITCHFORK_EXEC || cfg!(windows) { if let Err(err) = cmd!(&*env::BIN_PATH, "supervisor", "run", "--force").start() { panic!("failed to restart: {err:?}"); @@ -370,9 +384,9 @@ impl Supervisor { Ok(()) } - fn file_watch(&self) -> Result<()> { + async fn file_watch(&self) -> Result<()> { let bin_path = env::BIN_PATH.clone(); - let state_file = self.state_file.path.clone(); + let state_file = self.state_file.lock().await.path.clone(); let tx = self.event_tx.clone(); task::spawn(async move { let mut wf = WatchFiles::new(Duration::from_secs(2)).unwrap(); @@ -420,9 +434,9 @@ impl Supervisor { Ok(()) } - fn close(&mut self) { - self.state_file.daemons.remove("pitchfork"); - if let Err(err) = self.state_file.write() { + async fn close(&mut self) { + self.state_file.lock().await.daemons.remove("pitchfork"); + if let Err(err) = self.state_file.lock().await.write() { warn!("failed to update state file: {:?}", err); } let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);