From 327a76a5b6fd0beb63dafc766eedda1c5022b6a2 Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Thu, 12 Dec 2024 09:37:04 -0600 Subject: [PATCH] wip --- docs/cli/commands.json | 6 +- docs/cli/index.md | 2 +- docs/cli/run.md | 4 +- mise.toml | 2 +- pitchfork.usage.kdl | 2 +- src/cli/run.rs | 85 +++---- src/cli/supervisor/run.rs | 4 +- src/env.rs | 14 +- src/ipc/client.rs | 26 +- src/ipc/mod.rs | 25 +- src/ipc/server.rs | 42 ++-- src/logger.rs | 11 +- src/procs.rs | 13 +- src/state_file.rs | 17 +- src/supervisor.rs | 501 ++++++++++++++++---------------------- 15 files changed, 347 insertions(+), 407 deletions(-) diff --git a/docs/cli/commands.json b/docs/cli/commands.json index 6f607cd..b6ac4cc 100644 --- a/docs/cli/commands.json +++ b/docs/cli/commands.json @@ -316,12 +316,12 @@ "full_cmd": [ "run" ], - "usage": "run [-f --force] [CMD]...", + "usage": "run [-f --force] [CMD]...", "subcommands": {}, "args": [ { - "name": "NAME", - "usage": "", + "name": "ID", + "usage": "", "help": "Name of the daemon to run", "help_first_line": "Name of the daemon to run", "required": true, diff --git a/docs/cli/index.md b/docs/cli/index.md index 8f51481..af4db09 100644 --- a/docs/cli/index.md +++ b/docs/cli/index.md @@ -16,7 +16,7 @@ - [`pitchfork enable `](/cli/enable.md) - [`pitchfork list [--hide-header]`](/cli/list.md) - [`pitchfork logs [-n ] [-t --tail] [ID]...`](/cli/logs.md) -- [`pitchfork run [-f --force] [CMD]...`](/cli/run.md) +- [`pitchfork run [-f --force] [CMD]...`](/cli/run.md) - [`pitchfork start [NAME]...`](/cli/start.md) - [`pitchfork status `](/cli/status.md) - [`pitchfork stop `](/cli/stop.md) diff --git a/docs/cli/run.md b/docs/cli/run.md index 493cc2d..6db4f01 100644 --- a/docs/cli/run.md +++ b/docs/cli/run.md @@ -1,13 +1,13 @@ # `pitchfork run` -- **Usage**: `pitchfork run [-f --force] [CMD]...` +- **Usage**: `pitchfork run [-f --force] [CMD]...` - **Aliases**: `r` Runs a one-off daemon ## Arguments -### `` +### `` Name of the daemon to run diff --git a/mise.toml b/mise.toml index b2a3017..71e41df 100644 --- a/mise.toml +++ b/mise.toml @@ -9,7 +9,7 @@ run = [ [tasks.docs] dir = "docs" -run = "bun i && bun run docs:dev" +run = "bun i && exec bun run docs:dev" [tasks.lint-fix] run = [ diff --git a/pitchfork.usage.kdl b/pitchfork.usage.kdl index adfda23..2b5bdeb 100644 --- a/pitchfork.usage.kdl +++ b/pitchfork.usage.kdl @@ -54,7 +54,7 @@ cmd "logs" help="Displays logs for daemon(s)" { cmd "run" help="Runs a one-off daemon" { alias "r" flag "-f --force" - arg "" help="Name of the daemon to run" + arg "" help="Name of the daemon to run" arg "[CMD]..." var=true } cmd "start" help="Starts a daemon from a pitchfork.toml file" { diff --git a/src/cli/run.rs b/src/cli/run.rs index 8936899..9520754 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -1,17 +1,16 @@ use crate::cli::supervisor; use crate::ipc::client::IpcClient; -use crate::ipc::IpcMessage; +use crate::ipc::{IpcRequest, IpcResponse}; use crate::Result; use miette::bail; -use std::time::Duration; /// Runs a one-off daemon #[derive(Debug, clap::Args)] #[clap(visible_alias = "r", verbatim_doc_comment)] pub struct Run { /// Name of the daemon to run - name: String, - #[clap(trailing_var_arg = true)] + id: String, + #[clap(allow_hyphen_values = true, trailing_var_arg = true)] cmd: Vec, #[clap(short, long)] force: bool, @@ -28,55 +27,49 @@ impl Run { let ipc = IpcClient::connect().await?; if self.force { - ipc.send(IpcMessage::Stop(self.name.clone())).await?; - loop { - match ipc.read().await { - Some(IpcMessage::DaemonStop { name }) => { - info!("stopped daemon {}", name); - tokio::time::sleep(Duration::from_secs(3)).await; - break; - } - Some(IpcMessage::DaemonAlreadyStopped(name)) => { - info!("daemon {} already stopped", name); - break; - } - None => { - break; - } - msg => { - debug!("ignoring message: {:?}", msg); - } + let rsp = ipc + .request(IpcRequest::Stop { + id: self.id.clone(), + }) + .await?; + match rsp { + IpcResponse::Ok => { + info!("stopped daemon {}", self.id); } + IpcResponse::DaemonAlreadyStopped => { + info!("daemon {} already stopped", self.id); + } + rsp => unreachable!("unexpected response: {rsp:?}"), } } - ipc.send(IpcMessage::Run(self.name.clone(), self.cmd.clone())) + let rsp = ipc + .request(IpcRequest::Run { + id: self.id.clone(), + cmd: self.cmd.clone(), + }) .await?; - loop { - match ipc.read().await { - Some(IpcMessage::DaemonAlreadyRunning(id)) => { - if self.force { - bail!("failed to stop daemon {}", id); - } else { - info!("daemon {} already running", id); - } - break; - } - Some(IpcMessage::DaemonStart(daemon)) => { - info!( - "started daemon {} with pid {}", - daemon.name, - daemon.pid.unwrap() - ); - break; - } - Some(IpcMessage::DaemonFailed { name, error }) => { - bail!("Failed to start daemon {}: {}", name, error); - } - msg => { - debug!("ignoring message: {:?}", msg); + match rsp { + IpcResponse::DaemonAlreadyRunning => { + if self.force { + bail!("failed to stop daemon {}", self.id); + } else { + info!("daemon {} already running", self.id); } } + IpcResponse::DaemonStart { daemon } => { + info!( + "started daemon {} with pid {}", + daemon.id, + daemon.pid.unwrap() + ); + } + IpcResponse::DaemonFailed { error } => { + bail!("Failed to start daemon {}: {}", self.id, error); + } + msg => { + debug!("ignoring message: {:?}", msg); + } } Ok(()) } diff --git a/src/cli/supervisor/run.rs b/src/cli/supervisor/run.rs index 0f6624d..e2c35bb 100644 --- a/src/cli/supervisor/run.rs +++ b/src/cli/supervisor/run.rs @@ -1,7 +1,7 @@ use crate::cli::supervisor::kill_or_stop; use crate::env; use crate::state_file::StateFile; -use crate::supervisor::Supervisor; +use crate::supervisor::SUPERVISOR; use crate::Result; /// Runs the internal pitchfork daemon in the foreground @@ -23,6 +23,6 @@ impl Run { } } - Supervisor::new(pid_file).await?.start().await + SUPERVISOR.start().await } } diff --git a/src/env.rs b/src/env.rs index f761bc9..473778d 100644 --- a/src/env.rs +++ b/src/env.rs @@ -23,7 +23,7 @@ pub static PITCHFORK_LOGS_DIR: Lazy = Lazy::new(|| var_path("PITCHFORK_LOGS_DIR").unwrap_or(PITCHFORK_STATE_DIR.join("logs"))); pub static PITCHFORK_LOG_FILE: Lazy = Lazy::new(|| PITCHFORK_LOGS_DIR.join("pitchfork").join("pitchfork.log")); -pub static PITCHFORK_EXEC: Lazy = Lazy::new(|| var_true("PITCHFORK_EXEC")); +// pub static PITCHFORK_EXEC: Lazy = Lazy::new(|| var_true("PITCHFORK_EXEC")); pub static IPC_SOCK_DIR: Lazy = Lazy::new(|| PITCHFORK_STATE_DIR.join("sock")); pub static IPC_SOCK_MAIN: Lazy = Lazy::new(|| IPC_SOCK_DIR.join("main.sock")); @@ -44,9 +44,9 @@ fn var_false(name: &str) -> bool { .unwrap_or(false) } -fn var_true(name: &str) -> bool { - var(name) - .map(|val| val.to_lowercase()) - .map(|val| val == "true" || val == "1") - .unwrap_or(false) -} +// fn var_true(name: &str) -> bool { +// var(name) +// .map(|val| val.to_lowercase()) +// .map(|val| val == "true" || val == "1") +// .unwrap_or(false) +// } diff --git a/src/ipc/client.rs b/src/ipc/client.rs index 0d9da05..08452a8 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -1,16 +1,16 @@ -use crate::ipc::{deserialize, fs_name, serialize, IpcMessage}; +use crate::ipc::{deserialize, fs_name, serialize, IpcRequest, IpcResponse}; use crate::Result; use exponential_backoff::Backoff; use interprocess::local_socket::tokio::{RecvHalf, SendHalf}; use interprocess::local_socket::traits::tokio::Stream; -use miette::{bail, IntoDiagnostic}; +use miette::{bail, ensure, IntoDiagnostic}; use std::time::Duration; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::Mutex; use uuid::Uuid; pub struct IpcClient { - id: String, + _id: String, recv: Mutex>, send: Mutex, } @@ -24,9 +24,8 @@ impl IpcClient { let id = Uuid::new_v4().to_string(); let client = Self::connect_(&id, "main").await?; trace!("Connected to IPC socket"); - client.send(IpcMessage::Connect(client.id.clone())).await?; - let msg = client.read().await.unwrap(); - assert!(msg.is_connect_ok()); + let rsp = client.request(IpcRequest::Connect).await?; + ensure!(rsp.is_ok(), "Failed to connect to IPC main"); debug!("Connected to IPC main"); Ok(client) } @@ -39,7 +38,7 @@ impl IpcClient { let recv = BufReader::new(recv); return Ok(Self { - id: id.to_string(), + _id: id.to_string(), recv: Mutex::new(recv), send: Mutex::new(send), }); @@ -61,7 +60,7 @@ impl IpcClient { unreachable!() } - pub async fn send(&self, msg: IpcMessage) -> Result<()> { + pub async fn send(&self, msg: IpcRequest) -> Result<()> { let mut msg = serialize(&msg)?; if msg.contains(&0) { panic!("IPC message contains null"); @@ -72,7 +71,7 @@ impl IpcClient { Ok(()) } - pub async fn read(&self) -> Option { + pub async fn read(&self) -> Option { let mut recv = self.recv.lock().await; let mut bytes = Vec::new(); if let Err(err) = recv.read_until(0, &mut bytes).await.into_diagnostic() { @@ -90,4 +89,13 @@ impl IpcClient { } } } + + pub async fn request(&self, msg: IpcRequest) -> Result { + self.send(msg).await?; + loop { + if let Some(msg) = self.read().await { + return Ok(msg); + } + } + } } diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index eef97e6..9d9519d 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -21,13 +21,30 @@ pub enum IpcMessage { Response(String), } -pub fn fs_name(name: &str) -> Result { +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum::Display, strum::EnumIs)] +pub enum IpcRequest { + Connect, + Stop { id: String }, + Run { id: String, cmd: Vec }, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum::Display, strum::EnumIs)] +pub enum IpcResponse { + Ok, + Error(String), + DaemonAlreadyStopped, + DaemonAlreadyRunning, + DaemonStart { daemon: StateFileDaemon }, + DaemonFailed { error: String }, +} + +fn fs_name(name: &str) -> Result { let path = env::IPC_SOCK_DIR.join(name).with_extension("sock"); let fs_name = path.to_fs_name::().into_diagnostic()?; Ok(fs_name) } -pub fn serialize(msg: &IpcMessage) -> Result> { +fn serialize(msg: &T) -> Result> { let msg = if *env::IPC_JSON { serde_json::to_vec(msg).into_diagnostic()? } else { @@ -36,10 +53,10 @@ pub fn serialize(msg: &IpcMessage) -> Result> { Ok(msg) } -pub fn deserialize(bytes: &[u8]) -> Result { +fn deserialize(bytes: &[u8]) -> Result { let mut bytes = bytes.to_vec(); bytes.pop(); - trace!("msg: {:?}", std::str::from_utf8(&bytes)); + trace!("msg: {:?}", std::str::from_utf8(&bytes).unwrap_or_default()); let msg = if *env::IPC_JSON { serde_json::from_slice(&bytes).into_diagnostic()? } else { diff --git a/src/ipc/server.rs b/src/ipc/server.rs index 8738c50..7e47893 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -1,23 +1,22 @@ -use crate::ipc::{deserialize, fs_name, serialize, IpcMessage}; +use crate::ipc::{deserialize, fs_name, serialize, IpcRequest, IpcResponse}; use crate::{env, Result}; use interprocess::local_socket::tokio::{RecvHalf, SendHalf}; use interprocess::local_socket::traits::tokio::Listener; use interprocess::local_socket::traits::tokio::Stream; use interprocess::local_socket::ListenerOptions; use miette::{miette, IntoDiagnostic}; -use tokio::fs; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::mpsc::{Receiver, Sender}; pub struct IpcServer { // clients: Mutex>, - rx: Receiver<(IpcMessage, Sender)>, + rx: Receiver<(IpcRequest, Sender)>, } impl IpcServer { - pub async fn new() -> Result { + pub fn new() -> Result { xx::file::mkdirp(&*env::IPC_SOCK_DIR)?; - let _ = fs::remove_file(&*env::IPC_SOCK_MAIN).await; + let _ = xx::file::remove_file(&*env::IPC_SOCK_MAIN); let opts = ListenerOptions::new().name(fs_name("main")?); debug!("Listening on {}", env::IPC_SOCK_MAIN.display()); let (tx, rx) = tokio::sync::mpsc::channel(1); @@ -30,14 +29,11 @@ impl IpcServer { } } }); - let server = Self { - // clients: Default::default(), - rx, - }; + let server = Self { rx }; Ok(server) } - async fn send(send: &mut SendHalf, msg: IpcMessage) -> Result<()> { + async fn send(send: &mut SendHalf, msg: IpcResponse) -> Result<()> { let mut msg = serialize(&msg)?; if msg.contains(&0) { panic!("IPC message contains null"); @@ -49,7 +45,7 @@ impl IpcServer { Ok(()) } - async fn read_message(recv: &mut BufReader) -> Result> { + async fn read_message(recv: &mut BufReader) -> Result> { let mut bytes = Vec::new(); recv.read_until(0, &mut bytes).await.into_diagnostic()?; if bytes.is_empty() { @@ -58,7 +54,7 @@ impl IpcServer { Ok(Some(deserialize(&bytes)?)) } - fn read_messages_chan(recv: RecvHalf) -> Receiver { + fn read_messages_chan(recv: RecvHalf) -> Receiver { let mut recv = BufReader::new(recv); let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { @@ -85,7 +81,7 @@ impl IpcServer { rx } - fn send_messages_chan(mut send: SendHalf) -> Sender { + fn send_messages_chan(mut send: SendHalf) -> Sender { let (tx, mut rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { loop { @@ -107,7 +103,7 @@ impl IpcServer { tx } - pub async fn read(&mut self) -> Result<(IpcMessage, Sender)> { + pub async fn read(&mut self) -> Result<(IpcRequest, Sender)> { self.rx .recv() .await @@ -116,7 +112,7 @@ impl IpcServer { async fn listen( listener: &interprocess::local_socket::tokio::Listener, - tx: Sender<(IpcMessage, Sender)>, + tx: Sender<(IpcRequest, Sender)>, ) -> Result<()> { let stream = listener.accept().await.into_diagnostic()?; trace!("Client accepted"); @@ -124,19 +120,9 @@ impl IpcServer { let mut incoming_chan = Self::read_messages_chan(recv); let outgoing_chan = Self::send_messages_chan(send); tokio::spawn(async move { - while let Some(msg) = incoming_chan.recv().await { - match msg { - IpcMessage::Connect(id) => { - debug!("Client connected: {}", id); - if let Err(err) = outgoing_chan.send(IpcMessage::ConnectOK).await { - debug!("Failed to send message: {:?}", err); - } - } - _ => { - if let Err(err) = tx.send((msg, outgoing_chan.clone())).await { - debug!("Failed to send message: {:?}", err); - } - } + while let Some(req) = incoming_chan.recv().await { + if let Err(err) = tx.send((req, outgoing_chan.clone())).await { + debug!("Failed to send message: {:?}", err); } } }); diff --git a/src/logger.rs b/src/logger.rs index 0b087ca..9266278 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -38,7 +38,9 @@ impl log::Log for Logger { } if record.level() <= self.term_level { let out = self.render(record, self.term_level); - eprintln!("{}", out); + if !out.is_empty() { + eprintln!("{}", out); + } } } @@ -73,10 +75,15 @@ impl Logger { match level { LevelFilter::Off => "".to_string(), LevelFilter::Trace => { + let file = record.file().unwrap_or(""); + let ignore_crates = ["/notify-debouncer-full-", "/notify-"]; + if record.level() == Level::Trace && ignore_crates.iter().any(|c| file.contains(c)) + { + return "".to_string(); + } let meta = ui::style::edim(format!( "{thread_id:>2} [{file}:{line}]", thread_id = thread_id(), - file = record.file().unwrap_or(""), line = record.line().unwrap_or(0), )); format!( diff --git a/src/procs.rs b/src/procs.rs index 756c9eb..bf076b2 100644 --- a/src/procs.rs +++ b/src/procs.rs @@ -2,7 +2,7 @@ use crate::Result; use miette::IntoDiagnostic; use once_cell::sync::Lazy; use std::sync::Mutex; -use sysinfo::ProcessesToUpdate; +use sysinfo::{ProcessesToUpdate, Signal}; pub struct Procs { system: Mutex, @@ -19,6 +19,14 @@ impl Procs { procs } + pub fn title(&self, pid: u32) -> Option { + self.system + .lock() + .unwrap() + .process(sysinfo::Pid::from_u32(pid)) + .map(|p| p.name().to_string_lossy().to_string()) + } + pub fn is_running(&self, pid: u32) -> bool { self.system .lock() @@ -41,7 +49,10 @@ impl Procs { .unwrap() .process(sysinfo::Pid::from_u32(pid)) { + #[cfg(windows)] process.kill(); + #[cfg(unix)] + process.kill_with(Signal::Term); process.wait(); true } else { diff --git a/src/state_file.rs b/src/state_file.rs index e37efe9..4e14616 100644 --- a/src/state_file.rs +++ b/src/state_file.rs @@ -2,7 +2,7 @@ use crate::{env, Result}; use miette::IntoDiagnostic; use once_cell::sync::Lazy; use std::collections::{BTreeMap, BTreeSet}; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use std::path::{Path, PathBuf}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -15,18 +15,26 @@ pub struct StateFile { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct StateFileDaemon { - pub name: String, + pub id: String, + pub title: Option, pub pid: Option, pub status: DaemonStatus, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum::Display)] +impl Display for StateFileDaemon { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.id) + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum::Display, strum::EnumIs)] #[strum(serialize_all = "snake_case")] #[serde(rename_all = "snake_case")] pub enum DaemonStatus { Failed(String), Waiting, Running, + Errored(Option), Stopped, } @@ -38,6 +46,7 @@ impl DaemonStatus { DaemonStatus::Waiting => console::style(s).yellow().to_string(), DaemonStatus::Running => console::style(s).green().to_string(), DaemonStatus::Stopped => console::style(s).dim().to_string(), + DaemonStatus::Errored(_) => console::style(s).red().to_string(), } } } @@ -75,7 +84,7 @@ impl StateFile { }); state_file.path = path.to_path_buf(); for (name, daemon) in state_file.daemons.iter_mut() { - daemon.name = name.clone(); + daemon.id = name.clone(); } Ok(state_file) } diff --git a/src/supervisor.rs b/src/supervisor.rs index 04d5b7d..44d2294 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -1,14 +1,13 @@ use crate::ipc::server::IpcServer; -use crate::ipc::IpcMessage; +use crate::ipc::{IpcRequest, IpcResponse}; use crate::procs::PROCS; use crate::state_file::{DaemonStatus, StateFile, StateFileDaemon}; use crate::watch_files::WatchFiles; use crate::{env, Result}; -use duct::cmd; use itertools::Itertools; use miette::IntoDiagnostic; use notify::RecursiveMode; -use std::collections::HashMap; +use once_cell::sync::Lazy; use std::fs; use std::iter::once; use std::path::PathBuf; @@ -19,332 +18,154 @@ use std::time::Duration; use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; #[cfg(unix)] use tokio::signal::unix::SignalKind; -use tokio::sync::mpsc::Sender; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::Mutex; use tokio::{select, signal, task, time}; pub struct Supervisor { state_file: Mutex, last_refreshed_at: Mutex, - active_pids: Mutex>, - event_tx: broadcast::Sender, - event_rx: broadcast::Receiver, - pitchfork_bin_file_size: u64, - // ipc: IpcServer, } const INTERVAL: Duration = Duration::from_secs(10); -#[derive(Debug, Clone)] -enum Event { - FileChange(Vec), - Ipc(IpcMessage, Sender), - Signal, - Interval, - DaemonStart(StateFileDaemon), - DaemonStop(StateFileDaemon), - DaemonFailed { name: String, error: String }, -} +pub static SUPERVISOR: Lazy = + Lazy::new(|| Supervisor::new().expect("Error creating supervisor")); impl Supervisor { - pub async fn new(state_file: StateFile) -> Result { - let (event_tx, event_rx) = broadcast::channel(1); + pub fn new() -> Result { Ok(Self { - state_file: Mutex::new(state_file), + state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())), last_refreshed_at: Mutex::new(time::Instant::now()), - active_pids: Default::default(), - event_tx, - event_rx, - pitchfork_bin_file_size: fs::metadata(&*env::BIN_PATH).into_diagnostic()?.len(), - // ipc: IpcServer::new().await?, }) } - pub async fn start(mut self) -> Result<()> { + pub async fn start(&self) -> Result<()> { let pid = std::process::id(); info!("Starting supervisor with pid {pid}"); - let daemon = StateFileDaemon { - name: "pitchfork".into(), - pid: Some(pid), - status: DaemonStatus::Running, - }; - self.state_file - .lock() - .await - .daemons - .insert("pitchfork".into(), daemon); - self.state_file.lock().await.write()?; + self.upsert_daemon("pitchfork", Some(pid), DaemonStatus::Running) + .await?; self.interval_watch()?; self.signals()?; self.file_watch().await?; - self.conn_watch().await?; - loop { - let e = self.event_rx.recv().await.unwrap(); - if let Err(err) = self.handle(e).await { - error!("supervisor error: {:?}", err); - } - } - } - - async fn handle(&mut self, event: Event) -> Result<()> { - match event { - Event::Interval => { - if self.last_refreshed_at.lock().await.elapsed() < INTERVAL { - return Ok(()); - } - self.refresh().await - } - Event::FileChange(paths) => { - debug!("file change: {:?}", paths); - if paths.contains(&*env::BIN_PATH) && env::BIN_PATH.exists() { - let new_size = fs::metadata(&*env::BIN_PATH).into_diagnostic()?.len(); - if new_size != self.pitchfork_bin_file_size { - info!("pitchfork cli updated, restarting"); - self.restart().await; - } - } - let path = self.state_file.lock().await.path.clone(); - if paths.contains(&path) { - *self.state_file.lock().await = StateFile::read(&path)?; - } - self.refresh().await - } - Event::Ipc(msg, send) => { - info!("received ipc message: {msg}"); - self.handle_ipc(msg, send).await - } - Event::Signal => { - info!("received signal, stopping"); - self.close().await; - exit(0) - } - Event::DaemonStop(daemon) => { - self.active_pids.lock().await.remove(&daemon.pid.unwrap()); - self.state_file - .lock() - .await - .daemons - .entry(daemon.name.clone()) - .and_modify(|d| { - d.pid = None; - d.status = DaemonStatus::Stopped; - }); - self.state_file.lock().await.write()?; - Ok(()) - } - _ => Ok(()), - } + let ipc = IpcServer::new()?; + self.conn_watch(ipc).await } async fn refresh(&self) -> Result<()> { - debug!("refreshing"); + trace!("refreshing"); let mut last_refreshed_at = self.last_refreshed_at.lock().await; *last_refreshed_at = time::Instant::now(); Ok(()) } - async fn handle_ipc(&mut self, msg: IpcMessage, send: Sender) -> Result<()> { - match msg { - IpcMessage::Run(name, cmd) => { - self.run(send, &name, cmd).await?; - } - IpcMessage::Stop(name) => { - self.stop(send, &name).await?; - } - _ => { - debug!("received unknown message: {msg}"); - } - } - Ok(()) - } - - async fn run(&mut self, send: Sender, name: &str, cmd: Vec) -> Result<()> { - let tx = self.event_tx.clone(); - let mut event_rx = self.event_tx.subscribe(); - info!("received run message: {name:?} cmd: {cmd:?}"); - let daemon = self.state_file.lock().await.daemons.get(name).cloned(); + async fn run(&self, id: &str, cmd: Vec) -> Result { + info!("received run message: {id:?} cmd: {cmd:?}"); + let daemon = self.get_daemon(id).await; if let Some(daemon) = daemon { if let Some(pid) = daemon.pid { - warn!("daemon {name} already running with pid {}", pid); - if let Err(err) = send - .send(IpcMessage::DaemonAlreadyRunning(name.to_string())) - .await - { - warn!("failed to send message: {err:?}"); - } - return Ok(()); + warn!("daemon {id} already running with pid {}", pid); + return Ok(IpcResponse::DaemonAlreadyRunning); } } - task::spawn({ - let name = name.to_string(); - async move { - while let Ok(ev) = event_rx.recv().await { - match ev { - Event::DaemonStart(daemon) => { - if daemon.name == name { - if let Err(err) = send.send(IpcMessage::DaemonStart(daemon)).await { - error!("failed to send message: {err:?}"); - } - return; - } - } - Event::DaemonFailed { name: n, error } => { - if n == name { - if let Err(err) = - send.send(IpcMessage::DaemonFailed { name, error }).await - { - error!("failed to send message: {err:?}"); - } - return; - } - } - _ => {} - } - } - } - }); let cmd = once("exec".to_string()) .chain(cmd.into_iter()) .collect_vec(); let args = vec!["-c".to_string(), cmd.join(" ")]; - let log_path = env::PITCHFORK_LOGS_DIR - .join(name) - .join(format!("{name}.log")); + let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log")); xx::file::mkdirp(log_path.parent().unwrap())?; - debug!("starting daemon: {name} with args: {args:?}"); - match tokio::process::Command::new("sh") + debug!("starting daemon: {id} with args: {args:?}"); + let mut child = tokio::process::Command::new("sh") .args(&args) .stdin(std::process::Stdio::null()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .spawn() - { - Ok(mut child) => { - let pid = child.id().unwrap(); - self.active_pids.lock().await.insert(pid, name.to_string()); - info!("started daemon {name} with pid {pid}"); - let daemon = StateFileDaemon { - name: name.to_string(), - pid: Some(pid), - status: DaemonStatus::Running, - }; - self.state_file - .lock() - .await - .daemons - .insert(name.to_string(), daemon.clone()); - self.state_file.lock().await.write()?; - tx.send(Event::DaemonStart(daemon.clone())).unwrap(); - let name = name.to_string(); - tokio::spawn(async move { - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); - let mut stdout = tokio::io::BufReader::new(stdout).lines(); - let mut stderr = tokio::io::BufReader::new(stderr).lines(); - let mut log_appender = tokio::fs::File::options() - .append(true) - .create(true) - .open(&log_path) - .await - .unwrap(); - dbg!(&log_path); - loop { - select! { - Ok(Some(line)) = stdout.next_line() => { - debug!("stdout: {name} {line}"); - log_appender.write_all(line.as_bytes()).await.unwrap(); - log_appender.write_all(b"\n").await.unwrap(); - } - Ok(Some(line)) = stderr.next_line() => { - debug!("stderr: {name} {line}"); - log_appender.write_all(line.as_bytes()).await.unwrap(); - log_appender.write_all(b"\n").await.unwrap(); - }, - else => break, - } + .into_diagnostic()?; + let pid = child.id().unwrap(); + info!("started daemon {id} with pid {pid}"); + let daemon = self + .upsert_daemon(id, Some(pid), DaemonStatus::Running) + .await?; + let name = id.to_string(); + tokio::spawn(async move { + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + let mut stdout = tokio::io::BufReader::new(stdout).lines(); + let mut stderr = tokio::io::BufReader::new(stderr).lines(); + let mut log_appender = tokio::fs::File::options() + .append(true) + .create(true) + .open(&log_path) + .await + .unwrap(); + loop { + select! { + Ok(Some(line)) = stdout.next_line() => { + debug!("stdout: {name} {line}"); + log_appender.write_all(line.as_bytes()).await.unwrap(); + log_appender.write_all(b"\n").await.unwrap(); } - let status = child.wait().await.unwrap(); - info!("daemon {name} exited with status {status}"); - tx.send(Event::DaemonStop(daemon)).unwrap(); - }); + Ok(Some(line)) = stderr.next_line() => { + debug!("stderr: {name} {line}"); + log_appender.write_all(line.as_bytes()).await.unwrap(); + log_appender.write_all(b"\n").await.unwrap(); + }, + else => break, + } + } + let exit_status = child.wait().await; + if SUPERVISOR + .get_daemon(&name) + .await + .is_some_and(|d| d.status.is_stopped()) + { + // was stopped by this supervisor so don't update status + return; } - Err(err) => { - info!("failed to start daemon: {err:?}"); - self.event_tx - .send(Event::DaemonFailed { - name: name.to_string(), - error: format!("{err:?}"), - }) + if let Ok(status) = exit_status { + info!("daemon {name} exited with status {status}"); + if status.success() { + SUPERVISOR + .upsert_daemon(&name, None, DaemonStatus::Stopped) + .await + .unwrap(); + } else { + SUPERVISOR + .upsert_daemon(&name, None, DaemonStatus::Errored(status.code())) + .await + .unwrap(); + } + } else { + SUPERVISOR + .upsert_daemon(&name, None, DaemonStatus::Errored(None)) + .await .unwrap(); } - } + }); - Ok(()) + Ok(IpcResponse::DaemonStart { daemon }) } - async fn stop(&mut self, send: Sender, name: &str) -> Result<()> { - info!("received stop message: {name}"); - let daemon = self.state_file.lock().await.daemons.get(name).cloned(); - if let Some(daemon) = daemon { + async fn stop(&self, id: &str) -> Result { + info!("stopping daemon: {id}"); + if let Some(daemon) = self.get_daemon(id).await { if let Some(pid) = daemon.pid { PROCS.refresh_processes(); if PROCS.is_running(pid) { + self.upsert_daemon(id, None, DaemonStatus::Stopped).await?; PROCS.kill_async(pid).await?; } - self.active_pids.lock().await.remove(&pid); - self.state_file - .lock() - .await - .daemons - .entry(name.to_string()) - .and_modify(|d| { - d.pid = None; - d.status = DaemonStatus::Stopped; - }); - self.state_file.lock().await.write()?; - if let Err(err) = send - .send(IpcMessage::DaemonStop { - name: name.to_string(), - }) - .await - { - warn!("failed to send message: {err:?}"); - } - return Ok(()); + return Ok(IpcResponse::Ok); } } - if let Err(err) = send - .send(IpcMessage::DaemonAlreadyStopped(name.to_string())) - .await - { - warn!("failed to send message: {err:?}"); - } - Ok(()) - } - - async fn restart(&mut self) -> ! { - debug!("restarting"); - self.close().await; - if *env::PITCHFORK_EXEC || cfg!(windows) { - if let Err(err) = cmd!(&*env::BIN_PATH, "supervisor", "run", "--force").start() { - panic!("failed to restart: {err:?}"); - } - } else { - let x = exec::execvp( - &*env::BIN_PATH, - &["pitchfork", "supervisor", "run", "--force"], - ); - panic!("execvp returned unexpectedly: {x:?}"); - } - exit(0); + Ok(IpcResponse::DaemonAlreadyStopped) } #[cfg(unix)] fn signals(&self) -> Result<()> { - let tx = self.event_tx.clone(); let signals = [ SignalKind::terminate(), SignalKind::alarm(), @@ -356,7 +177,6 @@ impl Supervisor { ]; static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false); for signal in signals { - let tx = tx.clone(); tokio::spawn(async move { let mut stream = signal::unix::signal(signal).unwrap(); loop { @@ -364,7 +184,7 @@ impl Supervisor { if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) { exit(1); } else { - tx.send(Event::Signal).unwrap(); + SUPERVISOR.handle_signal().await; } } }); @@ -378,69 +198,158 @@ impl Supervisor { let mut stream = signal::ctrl_c().unwrap(); loop { stream.recv().await; - tx.send(Event::Signal).await.unwrap(); + if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) { + exit(1); + } else { + SUPERVISOR.handle_signal().await; + } } }); Ok(()) } + async fn handle_signal(&self) { + info!("received signal, stopping"); + self.close().await; + exit(0) + } + async fn file_watch(&self) -> Result<()> { - let bin_path = env::BIN_PATH.clone(); let state_file = self.state_file.lock().await.path.clone(); - let tx = self.event_tx.clone(); task::spawn(async move { let mut wf = WatchFiles::new(Duration::from_secs(2)).unwrap(); - wf.watch(&bin_path, RecursiveMode::NonRecursive).unwrap(); wf.watch(&state_file, RecursiveMode::NonRecursive).unwrap(); while let Some(paths) = wf.rx.recv().await { - tx.send(Event::FileChange(paths)).unwrap(); + if let Err(err) = SUPERVISOR.handle_file_change(paths).await { + error!("failed to handle file change: {err}"); + } } }); Ok(()) } + async fn handle_file_change(&self, paths: Vec) -> Result<()> { + debug!("file change: {:?}", paths); + // let path = self.state_file.lock().await.path.clone(); + // if paths.contains(&path) { + // *self.state_file.lock().await = StateFile::read(&path)?; + // } + self.refresh().await + } + fn interval_watch(&self) -> Result<()> { - let event_tx = self.event_tx.clone(); tokio::spawn(async move { let mut interval = time::interval(INTERVAL); loop { interval.tick().await; - event_tx.send(Event::Interval).unwrap(); + if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > INTERVAL { + if let Err(err) = SUPERVISOR.refresh().await { + error!("failed to refresh: {err}"); + } + } } }); Ok(()) } - async fn conn_watch(&self) -> Result<()> { - let tx = self.event_tx.clone(); - // TODO: reuse self.ipc - let mut ipc = IpcServer::new().await?; - tokio::spawn(async move { - loop { - let (msg, send) = match ipc.read().await { - Ok(msg) => msg, - Err(e) => { - error!("failed to accept connection: {:?}", e); - continue; - } - }; - debug!("received message: {:?}", msg); - tx.send(Event::Ipc(msg, send)).unwrap(); + async fn conn_watch(&self, mut ipc: IpcServer) -> ! { + loop { + let (msg, send) = match ipc.read().await { + Ok(msg) => msg, + Err(e) => { + error!("failed to accept connection: {:?}", e); + continue; + } + }; + debug!("received message: {:?}", msg); + tokio::spawn(async move { + let rsp = SUPERVISOR + .handle_ipc(msg) + .await + .unwrap_or_else(|err| IpcResponse::Error(err.to_string())); + if let Err(err) = send.send(rsp).await { + debug!("failed to send message: {:?}", err); + } + }); + } + } + + async fn handle_ipc(&self, req: IpcRequest) -> Result { + let rsp = match req { + IpcRequest::Connect => { + debug!("received connect message"); + IpcResponse::Ok } - }); + IpcRequest::Stop { id } => self.stop(&id).await?, + IpcRequest::Run { id, cmd } => { + self.run(&id, cmd).await?; + IpcResponse::Ok + } + }; + Ok(rsp) + } + + async fn close(&self) { + for daemon in self.active_daemons().await { + if daemon.id == "pitchfork" { + continue; + } + if let Err(err) = self.stop(&daemon.id).await { + error!("failed to stop daemon {daemon}: {err}"); + } + } + let _ = self.remove_daemon("pitchfork").await; + let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR); + // TODO: cleanly stop ipc server + } + + async fn active_daemons(&self) -> Vec { + self.state_file + .lock() + .await + .daemons + .values() + .filter(|d| d.pid.is_some()) + .cloned() + .collect() + } + + async fn remove_daemon(&self, id: &str) -> Result<()> { + self.state_file.lock().await.daemons.remove(id); + if let Err(err) = self.state_file.lock().await.write() { + warn!("failed to update state file: {err:#}"); + } Ok(()) } - async fn close(&mut self) { - self.state_file.lock().await.daemons.remove("pitchfork"); + async fn upsert_daemon( + &self, + id: &str, + pid: Option, + status: DaemonStatus, + ) -> Result { + info!("upserting daemon: {id} pid: {pid:?} status: {status}"); + let daemon = StateFileDaemon { + id: id.to_string(), + title: pid.and_then(|pid| PROCS.title(pid)), + pid, + status, + }; + self.state_file + .lock() + .await + .daemons + .insert(id.to_string(), daemon.clone()); if let Err(err) = self.state_file.lock().await.write() { - warn!("failed to update state file: {:?}", err); + warn!("failed to update state file: {err:#}"); } - let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR); - // TODO: move this to self.ipc - // self.ipc.close(); + Ok(daemon) + } + + async fn get_daemon(&self, id: &str) -> Option { + self.state_file.lock().await.daemons.get(id).cloned() } }