Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 7, 2024
1 parent 0b899f7 commit 0fd2483
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 53 deletions.
135 changes: 135 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ tokio = { version = "1", features = ["full"] }
toml = { version = "0.8", features = ["indexmap", "preserve_order"] }
xx = { version = "2", features = ["fslock", "hash"] }
chrono = "0.4.38"
uuid = { version = "1.11.0", features = ["v4", "fast-rng"] }
rmp-serde = "1.3.0"
strum = { version = "0.26.3", features = ["derive"] }

[target.'cfg(unix)'.dependencies]
exec = "0.3"
1 change: 1 addition & 0 deletions src/cli/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl Daemon {
pub fn kill_or_stop(existing_pid: u32, force: bool) -> Result<bool> {
if let Some(process) = procs::get_process(existing_pid) {
if force {
debug!("Killing existing pitchfork daemon with pid {existing_pid}");
if sysinfo::Process::kill_with(process, sysinfo::Signal::Term).is_none() {
sysinfo::Process::kill(process);
}
Expand Down
2 changes: 1 addition & 1 deletion src/cli/daemon/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ impl Run {
}
}

Supervisor::new(pid_file).start().await
Supervisor::new(pid_file).await?.start().await
}
}
9 changes: 5 additions & 4 deletions src/cli/daemon/start.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use duct::cmd;
use crate::cli::daemon::kill_or_stop;
use crate::state_file::StateFile;
use crate::supervisor::Supervisor;
use crate::{env, Result};

/// Starts the internal pitchfork daemon in the background
Expand All @@ -21,9 +21,10 @@ impl Start {
}
}

if let Ok(fork::Fork::Child) = fork::daemon(false, false) {
Supervisor::new(pid_file).start().await?;
}
cmd!(&*env::BIN_PATH, "daemon", "run")
.stdout_null()
.stderr_null()
.start()?;

Ok(())
}
Expand Down
21 changes: 5 additions & 16 deletions src/cli/run.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::{env, Result};
use crate::{ipc, Result};
use eyre::bail;
use interprocess::local_socket::traits::tokio::Stream;
use interprocess::local_socket::{GenericFilePath, ToFsName};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::try_join;
use tokio::io::AsyncWriteExt;
use crate::ipc::client::IpcClient;

/// Runs a one-off daemon
#[derive(Debug, clap::Args)]
Expand All @@ -25,17 +23,8 @@ impl Run {
}
dbg!(&self);

let conn = interprocess::local_socket::tokio::Stream::connect(
env::IPC_SOCK_PATH.clone().to_fs_name::<GenericFilePath>()?,
)
.await?;
let (recv, mut send) = conn.split();
let mut read = tokio::io::BufReader::new(recv);
let mut buffer = String::with_capacity(1024);
let send = send.write_all(b"Hello from client!\n");
let recv = read.read_line(&mut buffer);
try_join!(recv, send)?;
println!("Received: {}", buffer.trim());
let _ipc = IpcClient::connect().await?;
// ipc.send.write_all(b"Hello from client!\n").await?;
Ok(())
}
}
6 changes: 4 additions & 2 deletions src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use once_cell::sync::Lazy;
pub use std::env::*;
use std::path::PathBuf;

pub static BIN_PATH: Lazy<PathBuf> = Lazy::new(|| current_exe().unwrap());
pub static BIN_PATH: Lazy<PathBuf> = Lazy::new(|| current_exe().unwrap().canonicalize().unwrap());

pub static HOME_DIR: Lazy<PathBuf> = Lazy::new(|| dirs::home_dir().unwrap_or_default());
pub static PITCHFORK_STATE_DIR: Lazy<PathBuf> = Lazy::new(|| {
Expand All @@ -28,7 +28,9 @@ pub static PITCHFORK_LOG_FILE: Lazy<PathBuf> = Lazy::new(|| {
});
pub static PITCHFORK_EXEC: Lazy<bool> = Lazy::new(|| var_true("PITCHFORK_EXEC"));

pub static IPC_SOCK_PATH: Lazy<PathBuf> = Lazy::new(|| PITCHFORK_STATE_DIR.join("pitchfork.sock"));
pub static IPC_SOCK_DIR: Lazy<PathBuf> = Lazy::new(|| PITCHFORK_STATE_DIR.join("sock"));
pub static IPC_SOCK_MAIN: Lazy<PathBuf> = Lazy::new(|| IPC_SOCK_DIR.join("main.sock"));
pub static IPC_JSON: Lazy<bool> = Lazy::new(|| var_true("IPC_JSON"));

fn var_path(name: &str) -> Option<PathBuf> {
var(name).map(PathBuf::from).ok()
Expand Down
45 changes: 45 additions & 0 deletions src/ipc/client.rs
Original file line number Diff line number Diff line change
@@ -1 +1,46 @@
use tokio::sync::Mutex;
use crate::ipc::{fs_name, IpcMessage};
use crate::{env, ipc, Result};
use interprocess::local_socket::tokio::{RecvHalf, SendHalf};
use interprocess::local_socket::traits::tokio::Stream;
use interprocess::local_socket::{GenericFilePath, ToFsName};
use tokio::io::{AsyncWriteExt, BufReader};
use uuid::Uuid;

pub struct IpcClient {
id: String,
recv: BufReader<RecvHalf>,
send: Mutex<SendHalf>,
}

impl IpcClient {
pub async fn connect() -> Result<Self> {
// ensure nobody else can connect to the IPC server at the same time
let _fslock = xx::fslock::get(&*env::IPC_SOCK_MAIN, false)?;
let conn =
interprocess::local_socket::tokio::Stream::connect(fs_name(&env::IPC_SOCK_MAIN)?)
.await?;
debug!("Connected to IPC main");
let (recv, send) = conn.split();
let recv = BufReader::new(recv);
let id = Uuid::new_v4().to_string();
let client = IpcClient { id, recv, send: Mutex::new(send) };
client.send(IpcMessage::Connect(client.id.clone())).await?;
Ok(client)
}

pub async fn send(&self, msg: IpcMessage) -> Result<()> {
let mut msg = if *env::IPC_JSON {
serde_json::to_vec(&msg)?
} else {
rmp_serde::to_vec(&msg)?
};
// if msg.contains(&b'\n') {
// panic!("IPC message contains newline");
// }
msg.push(0);
let mut send = self.send.lock().await;
send.write_all(&msg).await?;
Ok(())
}
}
14 changes: 14 additions & 0 deletions src/ipc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,16 @@
use std::path::Path;
use interprocess::local_socket::{GenericFilePath, Name, ToFsName};

pub(crate) mod client;
pub(crate) mod server;

#[derive(Debug, serde::Serialize, serde::Deserialize, strum::Display)]
pub enum IpcMessage {
Connect(String),
Response(String),
}

pub fn fs_name(path: &Path) -> eyre::Result<Name> {
let fs_name = path.to_fs_name::<GenericFilePath>()?;
Ok(fs_name)
}
Loading

0 comments on commit 0fd2483

Please sign in to comment.