Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 7, 2024
1 parent 1cc4484 commit 11149d8
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 32 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ console = "0.15"
dirs = "5"
duct = "0.13"
eyre = "0.6"
fork = "0.2.0"
indexmap = { version = "2", features = ["serde"] }
interprocess = { version = "2", features = ["tokio"] }
log = "0.4"
Expand Down
6 changes: 6 additions & 0 deletions mise.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,11 @@ run = [
dir = "docs"
run = "bun i && bun run docs:dev"

[tasks.lint-fix]
run = [
"cargo fmt --all",
"cargo clippy --fix --allow-dirty --allow-staged --all-targets --all-features -- -D warnings",
]

[tools]
bun = "latest"
24 changes: 23 additions & 1 deletion src/cli/daemon/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::Result;
use crate::{procs, Result};

mod run;
mod start;

#[derive(Debug, clap::Args)]
pub struct Daemon {
Expand All @@ -11,12 +12,33 @@ pub struct Daemon {
#[derive(Debug, clap::Subcommand)]
enum Commands {
Run(run::Run),
Start(start::Start),
}

impl Daemon {
pub async fn run(self) -> Result<()> {
match self.command {
Commands::Run(run) => run.run().await,
Commands::Start(start) => start.run().await,
}
}
}

/// if --force is passed, will kill existing process
/// Returns false if existing pid is running and --force was not passed (so we should cancel starting the daemon)
pub fn kill_or_stop(existing_pid: u32, force: bool) -> Result<bool> {
if let Some(process) = procs::get_process(existing_pid) {
if force {
if sysinfo::Process::kill_with(process, sysinfo::Signal::Term).is_none() {
sysinfo::Process::kill(process);
}
Ok(true)
} else {
let existing_pid = process.pid();
warn!("Pitchfork is already running with pid {existing_pid}. Kill it with `--force`");
Ok(false)
}
} else {
Ok(true)
}
}
28 changes: 5 additions & 23 deletions src/cli/daemon/run.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::cli::daemon::kill_or_stop;
use crate::env;
use crate::state_file::StateFile;
use crate::supervisor::Supervisor;
use crate::Result;
use crate::{env, procs};

/// Runs the internal pitchfork daemon in the foreground
#[derive(Debug, clap::Args)]
pub struct Run {
/// kill existing daemon
#[clap(short, long)]
force: bool,
}
Expand All @@ -13,32 +16,11 @@ impl Run {
pub async fn run(&self) -> Result<()> {
let pid_file = StateFile::read(&*env::PITCHFORK_STATE_FILE)?;
if let Some(d) = pid_file.daemons.get("pitchfork") {
if !(self.kill_or_stop(d.pid)?) {
if !(kill_or_stop(d.pid, self.force)?) {
return Ok(());
}
}

Supervisor::new(pid_file).start().await
}

/// if --force is passed, will kill existing process
/// Returns false if existing pid is running and --force was not passed (so we should cancel starting the daemon)
fn kill_or_stop(&self, existing_pid: u32) -> Result<bool> {
if let Some(process) = procs::get_process(existing_pid) {
if self.force {
if sysinfo::Process::kill_with(process, sysinfo::Signal::Term).is_none() {
sysinfo::Process::kill(process);
}
Ok(true)
} else {
let existing_pid = process.pid();
warn!(
"Pitchfork is already running with pid {existing_pid}. Kill it with `--force`"
);
Ok(false)
}
} else {
Ok(true)
}
}
}
30 changes: 30 additions & 0 deletions src/cli/daemon/start.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::cli::daemon::kill_or_stop;
use crate::state_file::StateFile;
use crate::supervisor::Supervisor;
use crate::{env, Result};

/// Starts the internal pitchfork daemon in the background
#[derive(Debug, clap::Args)]
#[clap()]
pub struct Start {
/// kill existing daemon
#[clap(short, long)]
force: bool,
}

impl Start {
pub async fn run(&self) -> Result<()> {
let pid_file = StateFile::read(&*env::PITCHFORK_STATE_FILE)?;
if let Some(d) = pid_file.daemons.get("pitchfork") {
if !(kill_or_stop(d.pid, self.force)?) {
return Ok(());
}
}

if let Ok(fork::Fork::Child) = fork::daemon(false, false) {
Supervisor::new(pid_file).start().await?;
}

Ok(())
}
}
5 changes: 4 additions & 1 deletion src/cli/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use crate::Result;
/// Starts a daemon from a pitchfork.toml file
#[derive(Debug, clap::Args)]
#[clap()]
pub struct Start {}
pub struct Start {
/// Name of the daemon(s) in pitchfork.toml to start
name: Vec<String>,
}

impl Start {
pub async fn run(&self) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/procs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use once_cell::sync::Lazy;

pub static SYSTEM: Lazy<sysinfo::System> = Lazy::new(|| sysinfo::System::new_all());
pub static SYSTEM: Lazy<sysinfo::System> = Lazy::new(sysinfo::System::new_all);

pub fn get_process(pid: u32) -> Option<&'static sysinfo::Process> {
SYSTEM.process(sysinfo::Pid::from_u32(pid))
Expand Down
10 changes: 4 additions & 6 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::io::{AsyncBufReadExt, BufReader};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::mpsc::{channel, Sender};
use tokio::{signal, time, try_join};
use tokio::{signal, time};

pub struct Supervisor {
state_file: StateFile,
Expand Down Expand Up @@ -197,11 +197,9 @@ impl Supervisor {
}
};
let mut recv = BufReader::new(&stream);
let mut send = &stream;
// let mut send = &stream;
let mut buffer = String::with_capacity(1024);
let send = send.write_all(b"Hello, world!\n");
let recv = recv.read_line(&mut buffer);
match try_join!(send, recv) {
match recv.read_line(&mut buffer).await {
Ok(_) => {
tx.send(Event::Conn(buffer.trim().to_string()))
.await
Expand Down

0 comments on commit 11149d8

Please sign in to comment.