Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 10, 2024
1 parent b127bd3 commit bcb57ee
Showing 1 changed file with 101 additions and 93 deletions.
194 changes: 101 additions & 93 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,111 +124,119 @@ impl Supervisor {
}

async fn handle_ipc(&mut self, msg: IpcMessage, send: Sender<IpcMessage>) -> Result<()> {
let tx = self.event_tx.clone();
let mut event_rx = self.event_tx.subscribe();
match msg {
IpcMessage::Run(name, cmd) => {
info!("received run message: {name:?} cmd: {cmd:?}");
task::spawn({
let name = name.clone();
async move {
while let Ok(ev) = event_rx.recv().await {
match ev {
Event::DaemonStart(daemon) => {
if daemon.name == name {
if let Err(err) =
send.send(IpcMessage::DaemonStart(daemon)).await
{
error!("failed to send message: {err:?}");
}
return;
}
}
Event::DaemonFailed { name: n, error } => {
if n == name {
if let Err(err) = send
.send(IpcMessage::DaemonFailed { name, error })
.await
{
error!("failed to send message: {err:?}");
}
return;
}
self.run(send, &name, cmd)?;
}
_ => {
debug!("received unknown message: {msg}");
}
}
Ok(())
}

fn run(&mut self, send: Sender<IpcMessage>, name: &str, cmd: Vec<String>) -> Result<()> {
let tx = self.event_tx.clone();
let mut event_rx = self.event_tx.subscribe();
info!("received run message: {name:?} cmd: {cmd:?}");
task::spawn({
let name = name.to_string();
async move {
while let Ok(ev) = event_rx.recv().await {
match ev {
Event::DaemonStart(daemon) => {
if daemon.name == name {
if let Err(err) = send.send(IpcMessage::DaemonStart(daemon)).await {
error!("failed to send message: {err:?}");
}
_ => {}
return;
}
}
}
});
let args = vec!["-c".to_string(), cmd.join(" ")];
let log_path = env::PITCHFORK_LOGS_DIR
.join(&name)
.join(format!("{name}.log"));
xx::file::mkdirp(log_path.parent().unwrap())?;
debug!("starting daemon: {name} with args: {args:?}");
match tokio::process::Command::new("sh")
.args(&args)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(mut child) => {
let pid = child.id().unwrap();
self.active_pids.insert(pid, name.clone());
info!("started daemon {name} with pid {pid}");
let daemon = StateFileDaemon {
name: name.clone(),
pid,
status: DaemonStatus::Running,
};
self.state_file.daemons.insert(name.clone(), daemon.clone());
self.state_file.write()?;
tx.send(Event::DaemonStart(daemon.clone())).unwrap();
tokio::spawn(async move {
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let mut stdout = tokio::io::BufReader::new(stdout).lines();
let mut stderr = tokio::io::BufReader::new(stderr).lines();
let mut log_appender = tokio::fs::File::options()
.append(true)
.create(true)
.open(&log_path)
.await
.unwrap();
loop {
select! {
Ok(Some(line)) = stdout.next_line() => {
log_appender.write_all(line.as_bytes()).await.unwrap();
log_appender.write_all(b"\n").await.unwrap();
}
Ok(Some(line)) = stderr.next_line() => {
log_appender.write_all(line.as_bytes()).await.unwrap();
log_appender.write_all(b"\n").await.unwrap();
},
else => break,
Event::DaemonFailed { name: n, error } => {
if n == name {
if let Err(err) =
send.send(IpcMessage::DaemonFailed { name, error }).await
{
error!("failed to send message: {err:?}");
}
return;
}
let status = child.wait().await.unwrap();
info!("daemon {name} exited with status {status}");
tx.send(Event::DaemonStop(daemon)).unwrap();
});
}
Err(err) => {
info!("failed to start daemon: {err:?}");
self.event_tx
.send(Event::DaemonFailed {
name,
error: format!("{err:?}"),
})
.unwrap();
}
_ => {}
}
}
}
_ => {
debug!("received unknown message: {msg}");
});
let args = vec!["-c".to_string(), cmd.join(" ")];
let log_path = env::PITCHFORK_LOGS_DIR
.join(name)
.join(format!("{name}.log"));
xx::file::mkdirp(log_path.parent().unwrap())?;
debug!("starting daemon: {name} with args: {args:?}");
match tokio::process::Command::new("sh")
.args(&args)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(mut child) => {
let pid = child.id().unwrap();
self.active_pids.insert(pid, name.to_string());
info!("started daemon {name} with pid {pid}");
let daemon = StateFileDaemon {
name: name.to_string(),
pid,
status: DaemonStatus::Running,
};
self.state_file
.daemons
.insert(name.to_string(), daemon.clone());
self.state_file.write()?;
tx.send(Event::DaemonStart(daemon.clone())).unwrap();
let name = name.to_string();
tokio::spawn(async move {
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let mut stdout = tokio::io::BufReader::new(stdout).lines();
let mut stderr = tokio::io::BufReader::new(stderr).lines();
let mut log_appender = tokio::fs::File::options()
.append(true)
.create(true)
.open(&log_path)
.await
.unwrap();
loop {
select! {
Ok(Some(line)) = stdout.next_line() => {
debug!("stdout: {name} {line}");
log_appender.write_all(line.as_bytes()).await.unwrap();
log_appender.write_all(b"\n").await.unwrap();
}
Ok(Some(line)) = stderr.next_line() => {
debug!("stderr: {name} {line}");
log_appender.write_all(line.as_bytes()).await.unwrap();
log_appender.write_all(b"\n").await.unwrap();
},
else => break,
}
}
let status = child.wait().await.unwrap();
info!("daemon {name} exited with status {status}");
tx.send(Event::DaemonStop(daemon)).unwrap();
});
}
Err(err) => {
info!("failed to start daemon: {err:?}");
self.event_tx
.send(Event::DaemonFailed {
name: name.to_string(),
error: format!("{err:?}"),
})
.unwrap();
}
}

Ok(())
}

Expand Down

0 comments on commit bcb57ee

Please sign in to comment.