Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 12, 2024
1 parent 82fa01d commit 327a76a
Show file tree
Hide file tree
Showing 15 changed files with 347 additions and 407 deletions.
6 changes: 3 additions & 3 deletions docs/cli/commands.json
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,12 @@
"full_cmd": [
"run"
],
"usage": "run [-f --force] <NAME> [CMD]...",
"usage": "run [-f --force] <ID> [CMD]...",
"subcommands": {},
"args": [
{
"name": "NAME",
"usage": "<NAME>",
"name": "ID",
"usage": "<ID>",
"help": "Name of the daemon to run",
"help_first_line": "Name of the daemon to run",
"required": true,
Expand Down
2 changes: 1 addition & 1 deletion docs/cli/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
- [`pitchfork enable <ID>`](/cli/enable.md)
- [`pitchfork list [--hide-header]`](/cli/list.md)
- [`pitchfork logs [-n <N>] [-t --tail] [ID]...`](/cli/logs.md)
- [`pitchfork run [-f --force] <NAME> [CMD]...`](/cli/run.md)
- [`pitchfork run [-f --force] <ID> [CMD]...`](/cli/run.md)
- [`pitchfork start [NAME]...`](/cli/start.md)
- [`pitchfork status <ID>`](/cli/status.md)
- [`pitchfork stop <ID>`](/cli/stop.md)
Expand Down
4 changes: 2 additions & 2 deletions docs/cli/run.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# `pitchfork run`

- **Usage**: `pitchfork run [-f --force] <NAME> [CMD]...`
- **Usage**: `pitchfork run [-f --force] <ID> [CMD]...`
- **Aliases**: `r`

Runs a one-off daemon

## Arguments

### `<NAME>`
### `<ID>`

Name of the daemon to run

Expand Down
2 changes: 1 addition & 1 deletion mise.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ run = [

[tasks.docs]
dir = "docs"
run = "bun i && bun run docs:dev"
run = "bun i && exec bun run docs:dev"

[tasks.lint-fix]
run = [
Expand Down
2 changes: 1 addition & 1 deletion pitchfork.usage.kdl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ cmd "logs" help="Displays logs for daemon(s)" {
cmd "run" help="Runs a one-off daemon" {
alias "r"
flag "-f --force"
arg "<NAME>" help="Name of the daemon to run"
arg "<ID>" help="Name of the daemon to run"
arg "[CMD]..." var=true
}
cmd "start" help="Starts a daemon from a pitchfork.toml file" {
Expand Down
85 changes: 39 additions & 46 deletions src/cli/run.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use crate::cli::supervisor;
use crate::ipc::client::IpcClient;
use crate::ipc::IpcMessage;
use crate::ipc::{IpcRequest, IpcResponse};
use crate::Result;
use miette::bail;
use std::time::Duration;

/// Runs a one-off daemon
#[derive(Debug, clap::Args)]
#[clap(visible_alias = "r", verbatim_doc_comment)]
pub struct Run {
/// Name of the daemon to run
name: String,
#[clap(trailing_var_arg = true)]
id: String,
#[clap(allow_hyphen_values = true, trailing_var_arg = true)]
cmd: Vec<String>,
#[clap(short, long)]
force: bool,
Expand All @@ -28,55 +27,49 @@ impl Run {
let ipc = IpcClient::connect().await?;

if self.force {
ipc.send(IpcMessage::Stop(self.name.clone())).await?;
loop {
match ipc.read().await {
Some(IpcMessage::DaemonStop { name }) => {
info!("stopped daemon {}", name);
tokio::time::sleep(Duration::from_secs(3)).await;
break;
}
Some(IpcMessage::DaemonAlreadyStopped(name)) => {
info!("daemon {} already stopped", name);
break;
}
None => {
break;
}
msg => {
debug!("ignoring message: {:?}", msg);
}
let rsp = ipc
.request(IpcRequest::Stop {
id: self.id.clone(),
})
.await?;
match rsp {
IpcResponse::Ok => {
info!("stopped daemon {}", self.id);
}
IpcResponse::DaemonAlreadyStopped => {
info!("daemon {} already stopped", self.id);
}
rsp => unreachable!("unexpected response: {rsp:?}"),
}
}

ipc.send(IpcMessage::Run(self.name.clone(), self.cmd.clone()))
let rsp = ipc
.request(IpcRequest::Run {
id: self.id.clone(),
cmd: self.cmd.clone(),
})
.await?;
loop {
match ipc.read().await {
Some(IpcMessage::DaemonAlreadyRunning(id)) => {
if self.force {
bail!("failed to stop daemon {}", id);
} else {
info!("daemon {} already running", id);
}
break;
}
Some(IpcMessage::DaemonStart(daemon)) => {
info!(
"started daemon {} with pid {}",
daemon.name,
daemon.pid.unwrap()
);
break;
}
Some(IpcMessage::DaemonFailed { name, error }) => {
bail!("Failed to start daemon {}: {}", name, error);
}
msg => {
debug!("ignoring message: {:?}", msg);
match rsp {
IpcResponse::DaemonAlreadyRunning => {
if self.force {
bail!("failed to stop daemon {}", self.id);
} else {
info!("daemon {} already running", self.id);
}
}
IpcResponse::DaemonStart { daemon } => {
info!(
"started daemon {} with pid {}",
daemon.id,
daemon.pid.unwrap()
);
}
IpcResponse::DaemonFailed { error } => {
bail!("Failed to start daemon {}: {}", self.id, error);
}
msg => {
debug!("ignoring message: {:?}", msg);
}
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/cli/supervisor/run.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::cli::supervisor::kill_or_stop;
use crate::env;
use crate::state_file::StateFile;
use crate::supervisor::Supervisor;
use crate::supervisor::SUPERVISOR;
use crate::Result;

/// Runs the internal pitchfork daemon in the foreground
Expand All @@ -23,6 +23,6 @@ impl Run {
}
}

Supervisor::new(pid_file).await?.start().await
SUPERVISOR.start().await
}
}
14 changes: 7 additions & 7 deletions src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub static PITCHFORK_LOGS_DIR: Lazy<PathBuf> =
Lazy::new(|| var_path("PITCHFORK_LOGS_DIR").unwrap_or(PITCHFORK_STATE_DIR.join("logs")));
pub static PITCHFORK_LOG_FILE: Lazy<PathBuf> =
Lazy::new(|| PITCHFORK_LOGS_DIR.join("pitchfork").join("pitchfork.log"));
pub static PITCHFORK_EXEC: Lazy<bool> = Lazy::new(|| var_true("PITCHFORK_EXEC"));
// pub static PITCHFORK_EXEC: Lazy<bool> = Lazy::new(|| var_true("PITCHFORK_EXEC"));

pub static IPC_SOCK_DIR: Lazy<PathBuf> = Lazy::new(|| PITCHFORK_STATE_DIR.join("sock"));
pub static IPC_SOCK_MAIN: Lazy<PathBuf> = Lazy::new(|| IPC_SOCK_DIR.join("main.sock"));
Expand All @@ -44,9 +44,9 @@ fn var_false(name: &str) -> bool {
.unwrap_or(false)
}

fn var_true(name: &str) -> bool {
var(name)
.map(|val| val.to_lowercase())
.map(|val| val == "true" || val == "1")
.unwrap_or(false)
}
// fn var_true(name: &str) -> bool {
// var(name)
// .map(|val| val.to_lowercase())
// .map(|val| val == "true" || val == "1")
// .unwrap_or(false)
// }
26 changes: 17 additions & 9 deletions src/ipc/client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use crate::ipc::{deserialize, fs_name, serialize, IpcMessage};
use crate::ipc::{deserialize, fs_name, serialize, IpcRequest, IpcResponse};
use crate::Result;
use exponential_backoff::Backoff;
use interprocess::local_socket::tokio::{RecvHalf, SendHalf};
use interprocess::local_socket::traits::tokio::Stream;
use miette::{bail, IntoDiagnostic};
use miette::{bail, ensure, IntoDiagnostic};
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::Mutex;
use uuid::Uuid;

pub struct IpcClient {
id: String,
_id: String,
recv: Mutex<BufReader<RecvHalf>>,
send: Mutex<SendHalf>,
}
Expand All @@ -24,9 +24,8 @@ impl IpcClient {
let id = Uuid::new_v4().to_string();
let client = Self::connect_(&id, "main").await?;
trace!("Connected to IPC socket");
client.send(IpcMessage::Connect(client.id.clone())).await?;
let msg = client.read().await.unwrap();
assert!(msg.is_connect_ok());
let rsp = client.request(IpcRequest::Connect).await?;
ensure!(rsp.is_ok(), "Failed to connect to IPC main");
debug!("Connected to IPC main");
Ok(client)
}
Expand All @@ -39,7 +38,7 @@ impl IpcClient {
let recv = BufReader::new(recv);

return Ok(Self {
id: id.to_string(),
_id: id.to_string(),
recv: Mutex::new(recv),
send: Mutex::new(send),
});
Expand All @@ -61,7 +60,7 @@ impl IpcClient {
unreachable!()
}

pub async fn send(&self, msg: IpcMessage) -> Result<()> {
pub async fn send(&self, msg: IpcRequest) -> Result<()> {
let mut msg = serialize(&msg)?;
if msg.contains(&0) {
panic!("IPC message contains null");
Expand All @@ -72,7 +71,7 @@ impl IpcClient {
Ok(())
}

pub async fn read(&self) -> Option<IpcMessage> {
pub async fn read(&self) -> Option<IpcResponse> {
let mut recv = self.recv.lock().await;
let mut bytes = Vec::new();
if let Err(err) = recv.read_until(0, &mut bytes).await.into_diagnostic() {
Expand All @@ -90,4 +89,13 @@ impl IpcClient {
}
}
}

pub async fn request(&self, msg: IpcRequest) -> Result<IpcResponse> {
self.send(msg).await?;
loop {
if let Some(msg) = self.read().await {
return Ok(msg);
}
}
}
}
25 changes: 21 additions & 4 deletions src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,30 @@ pub enum IpcMessage {
Response(String),
}

pub fn fs_name(name: &str) -> Result<Name> {
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum::Display, strum::EnumIs)]
pub enum IpcRequest {
Connect,
Stop { id: String },
Run { id: String, cmd: Vec<String> },
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum::Display, strum::EnumIs)]
pub enum IpcResponse {
Ok,
Error(String),
DaemonAlreadyStopped,
DaemonAlreadyRunning,
DaemonStart { daemon: StateFileDaemon },
DaemonFailed { error: String },
}

fn fs_name(name: &str) -> Result<Name> {
let path = env::IPC_SOCK_DIR.join(name).with_extension("sock");
let fs_name = path.to_fs_name::<GenericFilePath>().into_diagnostic()?;
Ok(fs_name)
}

pub fn serialize(msg: &IpcMessage) -> Result<Vec<u8>> {
fn serialize<T: serde::Serialize>(msg: &T) -> Result<Vec<u8>> {
let msg = if *env::IPC_JSON {
serde_json::to_vec(msg).into_diagnostic()?
} else {
Expand All @@ -36,10 +53,10 @@ pub fn serialize(msg: &IpcMessage) -> Result<Vec<u8>> {
Ok(msg)
}

pub fn deserialize(bytes: &[u8]) -> Result<IpcMessage> {
fn deserialize<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T> {
let mut bytes = bytes.to_vec();
bytes.pop();
trace!("msg: {:?}", std::str::from_utf8(&bytes));
trace!("msg: {:?}", std::str::from_utf8(&bytes).unwrap_or_default());
let msg = if *env::IPC_JSON {
serde_json::from_slice(&bytes).into_diagnostic()?
} else {
Expand Down
Loading

0 comments on commit 327a76a

Please sign in to comment.