Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 10, 2024
1 parent b6ef6b9 commit 5fc37c6
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 22 deletions.
1 change: 0 additions & 1 deletion src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ enum Commands {
Wait(wait::Wait),
}

#[tokio::main]
pub async fn run() -> Result<()> {
let args = Cli::parse();
match args.command {
Expand Down
15 changes: 9 additions & 6 deletions src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ impl Run {
if self.force {
ipc.send(IpcMessage::Stop(self.name.clone())).await?;
loop {
match ipc.read().await? {
IpcMessage::DaemonStop { name } => {
match ipc.read().await {
Some(IpcMessage::DaemonStop { name }) => {
info!("stopped daemon {}", name);
break;
}
None => {
break;
}
msg => {
debug!("ignoring message: {:?}", msg);
}
Expand All @@ -44,24 +47,24 @@ impl Run {
ipc.send(IpcMessage::Run(self.name.clone(), self.cmd.clone()))
.await?;
loop {
match ipc.read().await? {
IpcMessage::DaemonAlreadyRunning(id) => {
match ipc.read().await {
Some(IpcMessage::DaemonAlreadyRunning(id)) => {
if self.force {
bail!("failed to stop daemon {}", id);
} else {
info!("daemon {} already running", id);
}
break;
}
IpcMessage::DaemonStart(daemon) => {
Some(IpcMessage::DaemonStart(daemon)) => {
info!(
"started daemon {} with pid {}",
daemon.name,
daemon.pid.unwrap()
);
break;
}
IpcMessage::DaemonFailed { name, error } => {
Some(IpcMessage::DaemonFailed { name, error }) => {
bail!("Failed to start daemon {}: {}", name, error);
}
msg => {
Expand Down
20 changes: 16 additions & 4 deletions src/ipc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl IpcClient {
let client = Self::connect_(&id, "main").await?;
trace!("Connected to IPC socket");
client.send(IpcMessage::Connect(client.id.clone())).await?;
let msg = client.read().await?;
let msg = client.read().await.unwrap();
assert!(msg.is_connect_ok());
debug!("Connected to IPC main");
Ok(client)
Expand Down Expand Up @@ -72,10 +72,22 @@ impl IpcClient {
Ok(())
}

pub async fn read(&self) -> Result<IpcMessage> {
pub async fn read(&self) -> Option<IpcMessage> {
let mut recv = self.recv.lock().await;
let mut bytes = Vec::new();
recv.read_until(0, &mut bytes).await.into_diagnostic()?;
deserialize(&bytes)
if let Err(err) = recv.read_until(0, &mut bytes).await.into_diagnostic() {
warn!("Failed to read IPC message: {}", err);
}
if bytes.is_empty() {
None
} else {
match deserialize(&bytes) {
Ok(msg) => Some(msg),
Err(err) => {
warn!("Failed to deserialize IPC message: {}", err);
None
}
}
}
}
}
16 changes: 14 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,20 @@ mod ui;
mod watch_files;

pub use miette::Result;
use tokio::signal;
use tokio::signal::unix::SignalKind;

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
logger::init();
cli::run()
handle_epipe();
cli::run().await
}

fn handle_epipe() {
let mut pipe_stream = signal::unix::signal(SignalKind::pipe()).unwrap();
tokio::spawn(async move {
pipe_stream.recv().await;
debug!("received SIGPIPE");
});
}
11 changes: 2 additions & 9 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::watch_files::WatchFiles;
use crate::{env, Result};
use duct::cmd;
use itertools::Itertools;
use miette::IntoDiagnostic;
use notify::RecursiveMode;
use std::collections::HashMap;
use std::fs;
Expand Down Expand Up @@ -275,9 +274,8 @@ impl Supervisor {
info!("received stop message: {name}");
if let Some(daemon) = self.state_file.daemons.get(name) {
if let Some(pid) = daemon.pid {
cmd!("kill", "-TERM", pid.to_string())
.run()
.into_diagnostic()?;
self.procs.refresh_processes();
self.procs.get_process(pid).map(|p| p.kill());
self.active_pids.remove(&pid);
self.state_file
.daemons
Expand Down Expand Up @@ -330,11 +328,6 @@ impl Supervisor {
SignalKind::user_defined2(),
];
static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
let mut pipe_stream = signal::unix::signal(SignalKind::pipe()).unwrap();
tokio::spawn(async move {
pipe_stream.recv().await;
debug!("received SIGPIPE");
});
for signal in signals {
let tx = tx.clone();
tokio::spawn(async move {
Expand Down

0 comments on commit 5fc37c6

Please sign in to comment.