Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 7, 2024
1 parent 8cc4f83 commit 2aad201
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use clap::Parser;
use crate::Result;
use clap::Parser;

mod start;
mod daemon;
mod run;
mod start;

#[derive(Debug, clap::Parser)]
struct Cli {
Expand Down
8 changes: 5 additions & 3 deletions src/cli/run.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{env, Result};
use eyre::bail;
use interprocess::local_socket::{GenericFilePath, ToFsName};
use interprocess::local_socket::traits::tokio::Stream;
use interprocess::local_socket::{GenericFilePath, ToFsName};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::try_join;

Expand All @@ -25,8 +25,10 @@ impl Run {
}
dbg!(&self);

let conn =
interprocess::local_socket::tokio::Stream::connect(env::IPC_SOCK_PATH.clone().to_fs_name::<GenericFilePath>()?).await?;
let conn = interprocess::local_socket::tokio::Stream::connect(
env::IPC_SOCK_PATH.clone().to_fs_name::<GenericFilePath>()?,
)
.await?;
let (recv, mut send) = conn.split();
let mut read = tokio::io::BufReader::new(recv);
let mut buffer = String::with_capacity(1024);
Expand Down
1 change: 1 addition & 0 deletions src/ipc/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

2 changes: 1 addition & 1 deletion src/ipc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub(crate) mod client;
pub(crate) mod server;
pub(crate) mod server;
6 changes: 3 additions & 3 deletions src/ipc/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{env, Result};
use interprocess::local_socket::{GenericFilePath, ListenerOptions, Name, ToFsName};
use std::path::Path;
use tokio::fs;
use interprocess::local_socket::{GenericFilePath, ListenerOptions, Name, ToFsName};
use crate::{env, Result};

pub async fn listen() -> Result<interprocess::local_socket::tokio::Listener> {
let _ = fs::remove_file(&*env::IPC_SOCK_PATH).await;
Expand All @@ -13,4 +13,4 @@ pub async fn listen() -> Result<interprocess::local_socket::tokio::Listener> {
fn fs_name(path: &Path) -> Result<Name> {
let fs_name = path.clone().to_fs_name::<GenericFilePath>()?;
Ok(fs_name)
}
}
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ extern crate log;
mod cli;
mod daemon;
mod env;
mod ipc;
mod logger;
mod state_file;
mod pitchfork_toml;
mod procs;
mod state_file;
mod supervisor;
mod ui;
mod ipc;

pub use eyre::Result;

Expand Down
2 changes: 1 addition & 1 deletion src/state_file.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::Result;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use crate::Result;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct StateFile {
Expand Down
46 changes: 30 additions & 16 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ enum Event {

impl Supervisor {
pub fn new(pid_file: StateFile) -> Self {
Self { state_file: pid_file, last_run: time::Instant::now() }
Self {
state_file: pid_file,
last_run: time::Instant::now(),
}
}

pub async fn start(mut self) -> Result<()> {
Expand All @@ -39,7 +42,13 @@ impl Supervisor {

let listener = ipc::server::listen().await?;

self.state_file.daemons.insert("pitchfork".to_string(), StateFileDaemon { pid, status: StateFileDaemonStatus::Running });
self.state_file.daemons.insert(
"pitchfork".to_string(),
StateFileDaemon {
pid,
status: StateFileDaemonStatus::Running,
},
);
self.state_file.write()?;

let (tx, mut rx) = channel(1);
Expand All @@ -65,14 +74,14 @@ impl Supervisor {
continue;
}
};

let mut recv = BufReader::new(&conn);
let mut send = &conn;
let mut buffer = String::with_capacity(1024);
let send = send.write_all(b"Hello, world!\n");
let recv = recv.read_line(&mut buffer);
try_join!(send, recv)?;

println!("Received: {}", buffer.trim());
}
}
Expand Down Expand Up @@ -167,22 +176,27 @@ impl Supervisor {

fn file_watch(&self, tx: Sender<Event>) -> Result<Debouncer<RecommendedWatcher>> {
let h = tokio::runtime::Handle::current();
let mut debouncer = new_debouncer(Duration::from_secs(2), move |res: DebounceEventResult| {
let tx = tx.clone();
h.spawn(async move {
if let Ok(ev) = res {
let paths = ev.into_iter().map(|e| e.path).collect();
tx.send(Event::FileChange(paths)).await.unwrap();
}
});
})?;
let mut debouncer =
new_debouncer(Duration::from_secs(2), move |res: DebounceEventResult| {
let tx = tx.clone();
h.spawn(async move {
if let Ok(ev) = res {
let paths = ev.into_iter().map(|e| e.path).collect();
tx.send(Event::FileChange(paths)).await.unwrap();
}
});
})?;

debouncer.watcher().watch(&env::BIN_PATH, RecursiveMode::NonRecursive)?;
debouncer.watcher().watch(&self.state_file.path, RecursiveMode::NonRecursive)?;
debouncer
.watcher()
.watch(&env::BIN_PATH, RecursiveMode::NonRecursive)?;
debouncer
.watcher()
.watch(&self.state_file.path, RecursiveMode::NonRecursive)?;

Ok(debouncer)
}

fn interval_watch(&self, tx: Sender<Event>) -> Result<()> {
let tx = tx.clone();
tokio::spawn(async move {
Expand Down

0 comments on commit 2aad201

Please sign in to comment.