Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 10, 2024
1 parent 8d0c04b commit b6ef6b9
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 3 deletions.
26 changes: 25 additions & 1 deletion src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,37 @@ impl Run {
}

let ipc = IpcClient::connect().await?;

if self.force {
ipc.send(IpcMessage::Stop(self.name.clone())).await?;
loop {
match ipc.read().await? {
IpcMessage::DaemonStop { name } => {
info!("stopped daemon {}", name);
break;
}
msg => {
debug!("ignoring message: {:?}", msg);
}
}
}
}

ipc.send(IpcMessage::Run(self.name.clone(), self.cmd.clone()))
.await?;
loop {
match ipc.read().await? {
IpcMessage::DaemonAlreadyRunning(id) => {
if self.force {
bail!("failed to stop daemon {}", id);
} else {
info!("daemon {} already running", id);
}
break;
}
IpcMessage::DaemonStart(daemon) => {
info!(
"Started daemon {} with pid {}",
"started daemon {} with pid {}",
daemon.name,
daemon.pid.unwrap()
);
Expand Down
3 changes: 3 additions & 0 deletions src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ pub enum IpcMessage {
Connect(String),
ConnectOK,
Run(String, Vec<String>),
Stop(String),
DaemonAlreadyRunning(String),
DaemonStart(StateFileDaemon),
DaemonStop { name: String },
DaemonFailed { name: String, error: String },
Response(String),
}
Expand Down
58 changes: 56 additions & 2 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::ipc::server::IpcServer;
use crate::ipc::IpcMessage;
use crate::procs::Procs;
use crate::state_file::{DaemonStatus, StateFile, StateFileDaemon};
use crate::watch_files::WatchFiles;
use crate::{env, Result};
use duct::cmd;
use itertools::Itertools;
use miette::IntoDiagnostic;
use notify::RecursiveMode;
use std::collections::HashMap;
use std::fs;
use std::iter::once;
use std::path::PathBuf;
use std::process::exit;
use std::sync::atomic;
Expand All @@ -25,6 +29,7 @@ pub struct Supervisor {
active_pids: HashMap<u32, String>,
event_tx: broadcast::Sender<Event>,
event_rx: broadcast::Receiver<Event>,
procs: Procs,
// ipc: IpcServer,
}

Expand All @@ -50,6 +55,7 @@ impl Supervisor {
active_pids: Default::default(),
event_tx,
event_rx,
procs: Procs::new(),
// ipc: IpcServer::new().await?,
})
}
Expand Down Expand Up @@ -132,7 +138,10 @@ impl Supervisor {
async fn handle_ipc(&mut self, msg: IpcMessage, send: Sender<IpcMessage>) -> Result<()> {
match msg {
IpcMessage::Run(name, cmd) => {
self.run(send, &name, cmd)?;
self.run(send, &name, cmd).await?;
}
IpcMessage::Stop(name) => {
self.stop(send, &name).await?;
}
_ => {
debug!("received unknown message: {msg}");
Expand All @@ -141,10 +150,22 @@ impl Supervisor {
Ok(())
}

fn run(&mut self, send: Sender<IpcMessage>, name: &str, cmd: Vec<String>) -> Result<()> {
async fn run(&mut self, send: Sender<IpcMessage>, name: &str, cmd: Vec<String>) -> Result<()> {
let tx = self.event_tx.clone();
let mut event_rx = self.event_tx.subscribe();
info!("received run message: {name:?} cmd: {cmd:?}");
if let Some(daemon) = self.state_file.daemons.get(name) {
if let Some(pid) = daemon.pid {
warn!("daemon {name} already running with pid {}", pid);
if let Err(err) = send
.send(IpcMessage::DaemonAlreadyRunning(name.to_string()))
.await
{
warn!("failed to send message: {err:?}");
}
return Ok(());
}
}
task::spawn({
let name = name.to_string();
async move {
Expand Down Expand Up @@ -173,6 +194,9 @@ impl Supervisor {
}
}
});
let cmd = once("exec".to_string())
.chain(cmd.into_iter())
.collect_vec();
let args = vec!["-c".to_string(), cmd.join(" ")];
let log_path = env::PITCHFORK_LOGS_DIR
.join(name)
Expand Down Expand Up @@ -212,6 +236,7 @@ impl Supervisor {
.open(&log_path)
.await
.unwrap();
dbg!(&log_path);
loop {
select! {
Ok(Some(line)) = stdout.next_line() => {
Expand Down Expand Up @@ -246,6 +271,35 @@ impl Supervisor {
Ok(())
}

async fn stop(&mut self, send: Sender<IpcMessage>, name: &str) -> Result<()> {
info!("received stop message: {name}");
if let Some(daemon) = self.state_file.daemons.get(name) {
if let Some(pid) = daemon.pid {
cmd!("kill", "-TERM", pid.to_string())
.run()
.into_diagnostic()?;
self.active_pids.remove(&pid);
self.state_file
.daemons
.entry(name.to_string())
.and_modify(|d| {
d.pid = None;
d.status = DaemonStatus::Stopped;
});
self.state_file.write()?;
if let Err(err) = send
.send(IpcMessage::DaemonStop {
name: name.to_string(),
})
.await
{
warn!("failed to send message: {err:?}");
}
}
}
Ok(())
}

fn restart(&mut self) -> ! {
debug!("restarting");
self.close();
Expand Down

0 comments on commit b6ef6b9

Please sign in to comment.