Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 13, 2024
1 parent 3108800 commit aec7af1
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 83 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ exponential-backoff = "2"
indexmap = { version = "2", features = ["serde"] }
interprocess = { version = "2", features = ["tokio"] }
itertools = "0.13"
log = "0.4"
log = {version="0.4", features=["serde"]}
miette = { version = "7", features = ["fancy"] }
notify = { version = "7", features = ["macos_fsevent"] }
notify-debouncer-full = "0.4"
Expand Down
7 changes: 7 additions & 0 deletions mise.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ run = [
"git add pitchfork.usage.kdl docs",
]

[tasks.install-dev]
env = { "PITCHFORK_LOG" = "debug" }
run = [
"cargo install --path . --debug",
"pitchfork sup start -f",
]

[tools]
bun = "latest"

Expand Down
13 changes: 12 additions & 1 deletion src/cli/cd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::pitchfork_toml::{PitchforkToml, PitchforkTomlAuto};
use crate::{env, Result};
use duct::cmd;
use itertools::Itertools;
use log::LevelFilter;
use miette::IntoDiagnostic;
use std::collections::HashSet;

Expand All @@ -15,7 +16,7 @@ pub struct Cd {

impl Cd {
pub async fn run(&self) -> Result<()> {
if let Ok(ipc) = IpcClient::connect(false).await {
if let Ok(ipc) = IpcClient::connect(true).await {
ipc.update_shell_dir(self.shell_pid, env::CWD.clone())
.await?;

Expand Down Expand Up @@ -50,6 +51,16 @@ impl Cd {
if args.len() > 3 {
cmd(&*env::PITCHFORK_BIN, args).run().into_diagnostic()?;
}
for (level, msg) in ipc.get_notifications().await? {
match level {
LevelFilter::Trace => trace!("{}", msg),
LevelFilter::Debug => debug!("{}", msg),
LevelFilter::Info => info!("{}", msg),
LevelFilter::Warn => warn!("{}", msg),
LevelFilter::Error => error!("{}", msg),
_ => {}
}
}
} else {
debug!("No daemon running");
}
Expand Down
42 changes: 21 additions & 21 deletions src/cli/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct Logs {
/// Show N lines of logs
///
/// Set to 0 to show all logs
#[clap(short, default_value = "10")]
#[clap(short, default_value = "100")]
n: usize,

/// Show logs in real-time
Expand Down Expand Up @@ -54,31 +54,31 @@ impl Logs {

fn print_existing_logs(&self) -> Result<()> {
let log_files = get_log_file_infos(&self.id)?;
let log_lines = log_files.iter().flat_map(|(name, lf)| {
let rev = match xx::file::open(&lf.path) {
Ok(f) => rev_lines::RevLines::new(f),
Err(e) => {
error!("{}: {}", lf.path.display(), e);
return vec![];
}
};
let lines = rev.into_iter().filter_map(Result::ok);
let lines = if self.n == 0 {
lines.collect_vec()
} else {
lines.take(self.n).collect_vec()
};
merge_log_lines(name, lines)
});
let log_lines = log_files
.iter()
.flat_map(|(name, lf)| {
let rev = match xx::file::open(&lf.path) {
Ok(f) => rev_lines::RevLines::new(f),
Err(e) => {
error!("{}: {}", lf.path.display(), e);
return vec![];
}
};
let lines = rev.into_iter().filter_map(Result::ok);
let lines = if self.n == 0 {
lines.collect_vec()
} else {
lines.take(self.n).collect_vec()
};
merge_log_lines(name, lines)
})
.sorted_by_cached_key(|l| l.0.to_string());

let log_lines = if self.n == 0 {
log_lines.collect_vec()
} else {
log_lines.take(self.n).collect_vec()
};
let log_lines = log_lines
.into_iter()
.sorted_by_cached_key(|l| l.0.to_string())
.collect_vec();

for (date, id, msg) in log_lines {
if self.id.len() == 1 {
Expand Down
23 changes: 14 additions & 9 deletions src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@ impl Run {

let ipc = IpcClient::connect(true).await?;

ipc.run(RunOptions {
id: self.id.clone(),
cmd: self.run.clone(),
shell_pid: None,
force: self.force,
dir: env::CWD.clone(),
autostop: false,
})
.await?;
let started = ipc
.run(RunOptions {
id: self.id.clone(),
cmd: self.run.clone(),
shell_pid: None,
force: self.force,
dir: env::CWD.clone(),
autostop: false,
})
.await?;

if !started.is_empty() {
info!("started {}", started.join(", "));
}
Ok(())
}
}
38 changes: 21 additions & 17 deletions src/cli/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,27 @@ impl Start {
let daemon = pt.daemons.get(id);
if let Some(daemon) = daemon {
let cmd = shell_words::split(&daemon.run).into_diagnostic()?;
ipc.run(RunOptions {
id: id.clone(),
cmd,
shell_pid: self.shell_pid,
force: self.force,
autostop: daemon
.auto
.contains(&crate::pitchfork_toml::PitchforkTomlAuto::Stop),
dir: daemon
.path
.as_ref()
.unwrap()
.parent()
.map(|p| p.to_path_buf())
.unwrap_or_default(),
})
.await?;
let started = ipc
.run(RunOptions {
id: id.clone(),
cmd,
shell_pid: self.shell_pid,
force: self.force,
autostop: daemon
.auto
.contains(&crate::pitchfork_toml::PitchforkTomlAuto::Stop),
dir: daemon
.path
.as_ref()
.unwrap()
.parent()
.map(|p| p.to_path_buf())
.unwrap_or_default(),
})
.await?;
if !started.is_empty() {
info!("started {}", started.join(", "));
}
} else {
warn!("Daemon {} not found", id);
}
Expand Down
17 changes: 13 additions & 4 deletions src/ipc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,13 @@ impl IpcClient {
}
}

pub async fn run(&self, opts: RunOptions) -> Result<()> {
info!("starting daemon {}", opts.id);
pub async fn run(&self, opts: RunOptions) -> Result<Vec<String>> {
debug!("starting daemon {}", opts.id);
let rsp = self.request(IpcRequest::Run(opts.clone())).await?;
let mut started_daemons = vec![];
match rsp {
IpcResponse::DaemonStart { daemon } => {
info!("started daemon {}", daemon);
started_daemons.push(daemon.id);
}
IpcResponse::DaemonAlreadyRunning => {
warn!("daemon {} already running", opts.id);
Expand All @@ -149,7 +150,7 @@ impl IpcClient {
}
rsp => unreachable!("unexpected response: {rsp:?}"),
}
Ok(())
Ok(started_daemons)
}

pub async fn active_daemons(&self) -> Result<Vec<Daemon>> {
Expand Down Expand Up @@ -194,4 +195,12 @@ impl IpcClient {
rsp => unreachable!("unexpected response: {rsp:?}"),
}
}

pub async fn get_notifications(&self) -> Result<Vec<(log::LevelFilter, String)>> {
let rsp = self.request(IpcRequest::GetNotifications).await?;
match rsp {
IpcResponse::Notifications(notifications) => Ok(notifications),
rsp => unreachable!("unexpected response: {rsp:?}"),
}
}
}
2 changes: 2 additions & 0 deletions src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub enum IpcRequest {
Enable { id: String },
Disable { id: String },
UpdateShellDir { shell_pid: u32, dir: PathBuf },
GetNotifications,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum::Display, strum::EnumIs)]
Expand All @@ -41,6 +42,7 @@ pub enum IpcResponse {
Yes,
No,
Error(String),
Notifications(Vec<(log::LevelFilter, String)>),
ActiveDaemons(Vec<Daemon>),
DisabledDaemons(Vec<String>),
DaemonAlreadyStopped,
Expand Down
2 changes: 1 addition & 1 deletion src/pitchfork_toml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl PitchforkToml {
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct PitchforkTomlDaemon {
pub run: String,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub auto: Vec<PitchforkTomlAuto>,
#[serde(skip)]
pub path: Option<PathBuf>,
Expand Down
Loading

0 comments on commit aec7af1

Please sign in to comment.