Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 9, 2024
1 parent f4c32bf commit 91ca860
Show file tree
Hide file tree
Showing 15 changed files with 206 additions and 77 deletions.
2 changes: 2 additions & 0 deletions .idea/pitchfork.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 79 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ indexmap = { version = "2", features = ["serde"] }
interprocess = { version = "2", features = ["tokio"] }
itertools = "0.13.0"
log = "0.4"
miette = { version = "7.4.0", features = ["fancy"] }
notify-debouncer-mini = "0.5.0"
once_cell = "1"
psutil = "3"
Expand Down
4 changes: 3 additions & 1 deletion src/cli/daemon/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::cli::daemon::kill_or_stop;
use crate::state_file::StateFile;
use crate::{env, Result};
use duct::cmd;
use miette::IntoDiagnostic;

/// Starts the internal pitchfork daemon in the background
#[derive(Debug, clap::Args)]
Expand All @@ -24,7 +25,8 @@ impl Start {
cmd!(&*env::BIN_PATH, "daemon", "run")
.stdout_null()
.stderr_null()
.start()?;
.start()
.into_diagnostic()?;

Ok(())
}
Expand Down
89 changes: 43 additions & 46 deletions src/cli/logs.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::watch_files::WatchFiles;
use crate::{env, Result};
use itertools::Itertools;
use miette::IntoDiagnostic;
use notify_debouncer_mini::notify::RecursiveMode;
use notify_debouncer_mini::{new_debouncer, DebounceEventResult};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::PathBuf;
use std::time::Duration;
use xx::regex;

Expand All @@ -29,36 +31,42 @@ pub struct Logs {
impl Logs {
pub async fn run(&self) -> Result<()> {
let names = self.name.iter().collect::<HashSet<_>>();
let log_files = xx::file::ls(&*env::PITCHFORK_LOGS_DIR)?
let mut log_files = xx::file::ls(&*env::PITCHFORK_LOGS_DIR)?
.into_iter()
.filter(|d| !d.starts_with("."))
.filter(|d| d.is_dir())
.filter_map(|d| d.file_name().map(|f| f.to_string_lossy().to_string()))
.filter(|n| names.is_empty() || names.contains(n))
.map(|n| {
let path = env::PITCHFORK_LOGS_DIR
.join(&n)
.join(format!("{n}.log"))
.canonicalize()
.into_diagnostic()?;
Ok((
n.clone(),
env::PITCHFORK_LOGS_DIR
.join(&n)
.join(format!("{n}.log"))
.canonicalize()?,
LogFile {
_name: n,
file: xx::file::open(&path)?,
// TODO: might be better to build the length when reading the file so we don't have gaps
cur: xx::file::metadata(&path).into_diagnostic()?.len(),
path,
},
))
})
.filter_ok(|(_, f)| f.exists())
.filter_ok(|(_, f)| f.path.exists())
.collect::<Result<BTreeMap<_, _>>>()?;
let mut log_file_sizes = log_files

let files_to_name = log_files
.iter()
.map(|(name, path)| {
let size = fs::metadata(path).unwrap().len();
(path.clone(), size)
})
.map(|(n, f)| (f.path.clone(), n.clone()))
.collect::<HashMap<_, _>>();

let log_lines = log_files.iter().flat_map(|(name, path)| {
let rev = match xx::file::open(path) {
let log_lines = log_files.iter().flat_map(|(name, lf)| {
let rev = match xx::file::open(&lf.path) {
Ok(f) => rev_lines::RevLines::new(f),
Err(e) => {
error!("{}: {}", path.display(), e);
error!("{}: {}", lf.path.display(), e);
return vec![];
}
};
Expand All @@ -85,43 +93,25 @@ impl Logs {
}

if self.tail {
let h = tokio::runtime::Handle::current();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let mut debouncer = new_debouncer(
Duration::from_millis(10),
move |res: DebounceEventResult| {
let tx = tx.clone();
h.spawn(async move {
if let Ok(ev) = res {
for path in ev.into_iter().map(|e| e.path) {
tx.send(path).await.unwrap();
}
}
});
},
)?;
let mut wf = WatchFiles::new(Duration::from_millis(10))?;

for (_name, path) in &log_files {
debouncer
.watcher()
.watch(path, RecursiveMode::NonRecursive)?;
for lf in log_files.values() {
wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
}

while let Some(path) = rx.recv().await {
let mut f = fs::File::open(&path)?;
let name = log_files.iter().find(|(_, p)| **p == path).unwrap().0;
let mut existing_size = *log_file_sizes.get(&path).unwrap();
f.seek(SeekFrom::Start(existing_size))?;
let lines = BufReader::new(f)
.lines()
.filter_map(Result::ok)
.collect_vec();
existing_size += lines.iter().fold(0, |acc, l| acc + l.len() as u64);
let lines = merge_log_lines(name, lines);
while let Some(path) = wf.rx.recv().await {
let name = files_to_name.get(&path).unwrap().to_string();
let info = log_files.get_mut(&name).unwrap();
info.file
.seek(SeekFrom::Start(info.cur))
.into_diagnostic()?;
let reader = BufReader::new(&info.file);
let lines = reader.lines().map_while(Result::ok).collect_vec();
info.cur += lines.iter().fold(0, |acc, l| acc + l.len() as u64);
let lines = merge_log_lines(&name, lines);
for (date, name, msg) in lines {
println!("{} {} {}", date, name, msg);
}
log_file_sizes.insert(path, existing_size);
}
}

Expand All @@ -147,3 +137,10 @@ fn merge_log_lines(name: &str, lines: Vec<String>) -> Vec<(String, String, Strin
}
})
}

struct LogFile {
_name: String,
path: PathBuf,
file: fs::File,
cur: u64,
}
2 changes: 1 addition & 1 deletion src/cli/run.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::ipc::client::IpcClient;
use crate::ipc::IpcMessage;
use crate::Result;
use eyre::bail;
use miette::bail;

/// Runs a one-off daemon
#[derive(Debug, clap::Args)]
Expand Down
6 changes: 3 additions & 3 deletions src/ipc/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::ipc::{deserialize, fs_name, serialize, IpcMessage};
use crate::Result;
use exponential_backoff::Backoff;
use eyre::bail;
use interprocess::local_socket::tokio::{RecvHalf, SendHalf};
use interprocess::local_socket::traits::tokio::Stream;
use miette::{bail, IntoDiagnostic};
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -68,14 +68,14 @@ impl IpcClient {
}
msg.push(0);
let mut send = self.send.lock().await;
send.write_all(&msg).await?;
send.write_all(&msg).await.into_diagnostic()?;
Ok(())
}

pub async fn read(&self) -> Result<IpcMessage> {
let mut recv = self.recv.lock().await;
let mut bytes = Vec::new();
recv.read_until(0, &mut bytes).await?;
recv.read_until(0, &mut bytes).await.into_diagnostic()?;
deserialize(&bytes)
}
}
Loading

0 comments on commit 91ca860

Please sign in to comment.