Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 11, 2024
1 parent 775067a commit 82fa01d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/procs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Procs {
.process(sysinfo::Pid::from_u32(pid))
{
process.kill();
process.wait(); // TODO: make this async
process.wait();
true
} else {
false
Expand Down
76 changes: 45 additions & 31 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Sender;
use tokio::sync::{broadcast, Mutex};
use tokio::{select, signal, task, time};

pub struct Supervisor {
state_file: StateFile,
last_refreshed_at: time::Instant,
active_pids: HashMap<u32, String>,
state_file: Mutex<StateFile>,
last_refreshed_at: Mutex<time::Instant>,
active_pids: Mutex<HashMap<u32, String>>,
event_tx: broadcast::Sender<Event>,
event_rx: broadcast::Receiver<Event>,
pitchfork_bin_file_size: u64,
Expand All @@ -50,8 +50,8 @@ impl Supervisor {
pub async fn new(state_file: StateFile) -> Result<Self> {
let (event_tx, event_rx) = broadcast::channel(1);
Ok(Self {
state_file,
last_refreshed_at: time::Instant::now(),
state_file: Mutex::new(state_file),
last_refreshed_at: Mutex::new(time::Instant::now()),
active_pids: Default::default(),
event_tx,
event_rx,
Expand All @@ -69,12 +69,16 @@ impl Supervisor {
pid: Some(pid),
status: DaemonStatus::Running,
};
self.state_file.daemons.insert("pitchfork".into(), daemon);
self.state_file.write()?;
self.state_file
.lock()
.await
.daemons
.insert("pitchfork".into(), daemon);
self.state_file.lock().await.write()?;

self.interval_watch()?;
self.signals()?;
self.file_watch()?;
self.file_watch().await?;
self.conn_watch().await?;

loop {
Expand All @@ -88,7 +92,7 @@ impl Supervisor {
async fn handle(&mut self, event: Event) -> Result<()> {
match event {
Event::Interval => {
if self.last_refreshed_at.elapsed() < INTERVAL {
if self.last_refreshed_at.lock().await.elapsed() < INTERVAL {
return Ok(());
}
self.refresh().await
Expand All @@ -99,11 +103,12 @@ impl Supervisor {
let new_size = fs::metadata(&*env::BIN_PATH).into_diagnostic()?.len();
if new_size != self.pitchfork_bin_file_size {
info!("pitchfork cli updated, restarting");
self.restart();
self.restart().await;
}
}
if paths.contains(&self.state_file.path) {
self.state_file = StateFile::read(&self.state_file.path)?;
let path = self.state_file.lock().await.path.clone();
if paths.contains(&path) {
*self.state_file.lock().await = StateFile::read(&path)?;
}
self.refresh().await
}
Expand All @@ -113,28 +118,31 @@ impl Supervisor {
}
Event::Signal => {
info!("received signal, stopping");
self.close();
self.close().await;
exit(0)
}
Event::DaemonStop(daemon) => {
self.active_pids.remove(&daemon.pid.unwrap());
self.active_pids.lock().await.remove(&daemon.pid.unwrap());
self.state_file
.lock()
.await
.daemons
.entry(daemon.name.clone())
.and_modify(|d| {
d.pid = None;
d.status = DaemonStatus::Stopped;
});
self.state_file.write()?;
self.state_file.lock().await.write()?;
Ok(())
}
_ => Ok(()),
}
}

async fn refresh(&mut self) -> Result<()> {
async fn refresh(&self) -> Result<()> {
debug!("refreshing");
self.last_refreshed_at = time::Instant::now();
let mut last_refreshed_at = self.last_refreshed_at.lock().await;
*last_refreshed_at = time::Instant::now();
Ok(())
}

Expand All @@ -157,7 +165,8 @@ impl Supervisor {
let tx = self.event_tx.clone();
let mut event_rx = self.event_tx.subscribe();
info!("received run message: {name:?} cmd: {cmd:?}");
if let Some(daemon) = self.state_file.daemons.get(name) {
let daemon = self.state_file.lock().await.daemons.get(name).cloned();
if let Some(daemon) = daemon {
if let Some(pid) = daemon.pid {
warn!("daemon {name} already running with pid {}", pid);
if let Err(err) = send
Expand Down Expand Up @@ -215,17 +224,19 @@ impl Supervisor {
{
Ok(mut child) => {
let pid = child.id().unwrap();
self.active_pids.insert(pid, name.to_string());
self.active_pids.lock().await.insert(pid, name.to_string());
info!("started daemon {name} with pid {pid}");
let daemon = StateFileDaemon {
name: name.to_string(),
pid: Some(pid),
status: DaemonStatus::Running,
};
self.state_file
.lock()
.await
.daemons
.insert(name.to_string(), daemon.clone());
self.state_file.write()?;
self.state_file.lock().await.write()?;
tx.send(Event::DaemonStart(daemon.clone())).unwrap();
let name = name.to_string();
tokio::spawn(async move {
Expand Down Expand Up @@ -276,21 +287,24 @@ impl Supervisor {

async fn stop(&mut self, send: Sender<IpcMessage>, name: &str) -> Result<()> {
info!("received stop message: {name}");
if let Some(daemon) = self.state_file.daemons.get(name) {
let daemon = self.state_file.lock().await.daemons.get(name).cloned();
if let Some(daemon) = daemon {
if let Some(pid) = daemon.pid {
PROCS.refresh_processes();
if PROCS.is_running(pid) {
PROCS.kill_async(pid).await?;
}
self.active_pids.remove(&pid);
self.active_pids.lock().await.remove(&pid);
self.state_file
.lock()
.await
.daemons
.entry(name.to_string())
.and_modify(|d| {
d.pid = None;
d.status = DaemonStatus::Stopped;
});
self.state_file.write()?;
self.state_file.lock().await.write()?;
if let Err(err) = send
.send(IpcMessage::DaemonStop {
name: name.to_string(),
Expand All @@ -311,9 +325,9 @@ impl Supervisor {
Ok(())
}

fn restart(&mut self) -> ! {
async fn restart(&mut self) -> ! {
debug!("restarting");
self.close();
self.close().await;
if *env::PITCHFORK_EXEC || cfg!(windows) {
if let Err(err) = cmd!(&*env::BIN_PATH, "supervisor", "run", "--force").start() {
panic!("failed to restart: {err:?}");
Expand Down Expand Up @@ -370,9 +384,9 @@ impl Supervisor {
Ok(())
}

fn file_watch(&self) -> Result<()> {
async fn file_watch(&self) -> Result<()> {
let bin_path = env::BIN_PATH.clone();
let state_file = self.state_file.path.clone();
let state_file = self.state_file.lock().await.path.clone();
let tx = self.event_tx.clone();
task::spawn(async move {
let mut wf = WatchFiles::new(Duration::from_secs(2)).unwrap();
Expand Down Expand Up @@ -420,9 +434,9 @@ impl Supervisor {
Ok(())
}

fn close(&mut self) {
self.state_file.daemons.remove("pitchfork");
if let Err(err) = self.state_file.write() {
async fn close(&mut self) {
self.state_file.lock().await.daemons.remove("pitchfork");
if let Err(err) = self.state_file.lock().await.write() {
warn!("failed to update state file: {:?}", err);
}
let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
Expand Down

0 comments on commit 82fa01d

Please sign in to comment.