Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 10, 2024
1 parent fa8a1f5 commit 474524f
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
1 change: 0 additions & 1 deletion src/state_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub struct StateFile {

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StateFileDaemon {
#[serde(skip)]
pub name: String,
pub pid: u32,
pub status: DaemonStatus,
Expand Down
57 changes: 46 additions & 11 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::state_file::{DaemonStatus, StateFile, StateFileDaemon};
use crate::watch_files::WatchFiles;
use crate::{env, Result};
use duct::cmd;
use miette::IntoDiagnostic;
use notify::RecursiveMode;
use std::collections::HashMap;
use std::fs;
Expand All @@ -12,11 +13,12 @@ use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Sender;
use tokio::{signal, task, time};
use tokio::{select, signal, task, time};

pub struct Supervisor {
state_file: StateFile,
Expand Down Expand Up @@ -118,7 +120,7 @@ impl Supervisor {
async fn handle_ipc(&mut self, msg: IpcMessage, send: Sender<IpcMessage>) -> Result<()> {
let mut event_rx = self.event_tx.subscribe();
match msg {
IpcMessage::Run(name, cmd) => {
IpcMessage::Run(name, mut cmd) => {
info!("received run message: {name:?} cmd: {cmd:?}");
task::spawn({
let name = name.clone();
Expand Down Expand Up @@ -151,27 +153,60 @@ impl Supervisor {
}
}
});
let program = cmd[0].clone();
let args = cmd[1..].to_vec();
// cmd.push("2>&1".to_string());
let args = vec!["-c".to_string(), cmd.join(" ")];
let log_path = env::PITCHFORK_LOGS_DIR
.join(&name)
.join(format!("{name}.log"));
match duct::cmd(&program, &args)
.stderr_to_stdout()
.stdout_path(log_path)
.reader()
xx::file::mkdirp(&log_path.parent().unwrap())?;
debug!("starting daemon: {name} with args: {args:?}");
match tokio::process::Command::new("sh")
.args(&args)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(child) => {
let pid = *child.pids().first().unwrap();
Ok(mut child) => {
let pid = child.id().unwrap();
info!("started daemon {name} with pid {pid}");
let daemon = StateFileDaemon {
name: name.clone(),
pid,
status: DaemonStatus::Running,
};
self.state_file.daemons.insert(name, daemon.clone());
self.state_file.daemons.insert(name.clone(), daemon.clone());
self.state_file.write()?;
self.event_tx.send(Event::DaemonStart(daemon)).unwrap();
tokio::spawn(async move {
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let mut stdout = tokio::io::BufReader::new(stdout).lines();
let mut stderr = tokio::io::BufReader::new(stderr).lines();
let mut log_appender = tokio::fs::File::options()
.append(true)
.create(true)
.open(&log_path)
.await
.unwrap();
tokio::spawn(async move {
let status = child.wait().await.unwrap();
info!("daemon {name} exited with status {status}");
});
loop {
select! {
Ok(Some(line)) = stdout.next_line() => {
log_appender.write_all(line.as_bytes()).await.unwrap();
log_appender.write_all(b"\n").await.unwrap();
}
Ok(Some(line)) = stderr.next_line() => {
log_appender.write_all(line.as_bytes()).await.unwrap();
log_appender.write_all(b"\n").await.unwrap();
},
else => break,
}
}
});
}
Err(err) => {
info!("failed to start daemon: {err:?}");
Expand Down

0 comments on commit 474524f

Please sign in to comment.