Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 7, 2024
1 parent 02c38b4 commit 970d8b1
Showing 1 changed file with 37 additions and 29 deletions.
66 changes: 37 additions & 29 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use interprocess::local_socket::tokio::Listener;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
Expand All @@ -24,6 +25,7 @@ const INTERVAL: Duration = Duration::from_secs(10);

enum Event {
FileChange(Vec<PathBuf>),
Conn(String),
Signal,
Interval,
}
Expand All @@ -40,8 +42,6 @@ impl Supervisor {
let pid = std::process::id();
info!("Starting supervisor with pid {pid}");

let listener = ipc::server::listen().await?;

let daemon = StateFileDaemon {
pid,
status: DaemonStatus::Running,
Expand All @@ -52,36 +52,14 @@ impl Supervisor {
let (tx, mut rx) = channel(1);
self.interval_watch(tx.clone())?;
self.signals(tx.clone())?;
self.conn_watch(ipc::server::listen().await?, tx.clone())?;
let _file_watcher = self.file_watch(tx.clone())?;
self.refresh(Event::Interval).await?;

loop {
select! {
e = rx.recv() => {
if let Some(e) = e {
if let Err(err) = self.refresh(e).await {
error!("supervisor error: {:?}", err);
}
}
}
conn = listener.accept() => {
let conn = match conn {
Ok(c) => c,
Err(e) => {
error!("failed to accept connection: {:?}", e);
continue;
}
};

let mut recv = BufReader::new(&conn);
let mut send = &conn;
let mut buffer = String::with_capacity(1024);
let send = send.write_all(b"Hello, world!\n");
let recv = recv.read_line(&mut buffer);
try_join!(send, recv)?;

println!("Received: {}", buffer.trim());
}
let e = rx.recv().await.unwrap();
if let Err(err) = self.refresh(e).await {
error!("supervisor error: {:?}", err);
}
}
}
Expand All @@ -104,6 +82,9 @@ impl Supervisor {
// self.pid_file = PidFile::read(&self.pid_file.path)?;
// }
}
Event::Conn(msg) => {
info!("received message: {:?}", msg);
}
Event::Signal => {
info!("received SIGTERM, stopping");
exit(0);
Expand Down Expand Up @@ -196,7 +177,6 @@ impl Supervisor {
}

fn interval_watch(&self, tx: Sender<Event>) -> Result<()> {
let tx = tx.clone();
tokio::spawn(async move {
let mut interval = time::interval(INTERVAL);
loop {
Expand All @@ -206,4 +186,32 @@ impl Supervisor {
});
Ok(())
}

fn conn_watch(&self, listener: Listener, tx: Sender<Event>) -> Result<()> {
tokio::spawn(async move {
loop {
let stream = match listener.accept().await {
Ok(stream) => stream,
Err(e) => {
error!("failed to accept connection: {:?}", e);
continue;
}
};
let mut recv = BufReader::new(&stream);
let mut send = &stream;
let mut buffer = String::with_capacity(1024);
let send = send.write_all(b"Hello, world!\n");
let recv = recv.read_line(&mut buffer);
match try_join!(send, recv) {
Ok(_) => {
tx.send(Event::Conn(buffer.trim().to_string())).await.unwrap();
}
Err(e) => {
error!("failed to read/write: {:?}", e);
}
}
}
});
Ok(())
}
}

0 comments on commit 970d8b1

Please sign in to comment.