diff --git a/src/supervisor.rs b/src/supervisor.rs index 5adb37f..d6fb77a 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -9,6 +9,7 @@ use std::process::exit; use std::sync::atomic; use std::sync::atomic::AtomicBool; use std::time::Duration; +use interprocess::local_socket::tokio::Listener; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; #[cfg(unix)] use tokio::signal::unix::SignalKind; @@ -24,6 +25,7 @@ const INTERVAL: Duration = Duration::from_secs(10); enum Event { FileChange(Vec), + Conn(String), Signal, Interval, } @@ -40,8 +42,6 @@ impl Supervisor { let pid = std::process::id(); info!("Starting supervisor with pid {pid}"); - let listener = ipc::server::listen().await?; - let daemon = StateFileDaemon { pid, status: DaemonStatus::Running, @@ -52,36 +52,14 @@ impl Supervisor { let (tx, mut rx) = channel(1); self.interval_watch(tx.clone())?; self.signals(tx.clone())?; + self.conn_watch(ipc::server::listen().await?, tx.clone())?; let _file_watcher = self.file_watch(tx.clone())?; self.refresh(Event::Interval).await?; loop { - select! { - e = rx.recv() => { - if let Some(e) = e { - if let Err(err) = self.refresh(e).await { - error!("supervisor error: {:?}", err); - } - } - } - conn = listener.accept() => { - let conn = match conn { - Ok(c) => c, - Err(e) => { - error!("failed to accept connection: {:?}", e); - continue; - } - }; - - let mut recv = BufReader::new(&conn); - let mut send = &conn; - let mut buffer = String::with_capacity(1024); - let send = send.write_all(b"Hello, world!\n"); - let recv = recv.read_line(&mut buffer); - try_join!(send, recv)?; - - println!("Received: {}", buffer.trim()); - } + let e = rx.recv().await.unwrap(); + if let Err(err) = self.refresh(e).await { + error!("supervisor error: {:?}", err); } } } @@ -104,6 +82,9 @@ impl Supervisor { // self.pid_file = PidFile::read(&self.pid_file.path)?; // } } + Event::Conn(msg) => { + info!("received message: {:?}", msg); + } Event::Signal => { info!("received SIGTERM, stopping"); exit(0); @@ -196,7 +177,6 @@ impl Supervisor { } fn interval_watch(&self, tx: Sender) -> Result<()> { - let tx = tx.clone(); tokio::spawn(async move { let mut interval = time::interval(INTERVAL); loop { @@ -206,4 +186,32 @@ impl Supervisor { }); Ok(()) } + + fn conn_watch(&self, listener: Listener, tx: Sender) -> Result<()> { + tokio::spawn(async move { + loop { + let stream = match listener.accept().await { + Ok(stream) => stream, + Err(e) => { + error!("failed to accept connection: {:?}", e); + continue; + } + }; + let mut recv = BufReader::new(&stream); + let mut send = &stream; + let mut buffer = String::with_capacity(1024); + let send = send.write_all(b"Hello, world!\n"); + let recv = recv.read_line(&mut buffer); + match try_join!(send, recv) { + Ok(_) => { + tx.send(Event::Conn(buffer.trim().to_string())).await.unwrap(); + } + Err(e) => { + error!("failed to read/write: {:?}", e); + } + } + } + }); + Ok(()) + } }