From 3e3d9c684d5c83d25ef44bb9c1fbb6e65ed10af8 Mon Sep 17 00:00:00 2001 From: Jacob Garby Date: Tue, 21 May 2024 00:52:09 +0200 Subject: [PATCH] nice progress --- rsman/Cargo.lock | 144 +++++++++++++++++++++++++++++++ rsman/Cargo.toml | 6 +- rsman/lib/resman_common.rs | 50 +++++++++++ rsman/src/client/main.rs | 85 +++++++++++++++++- rsman/src/daemon/main.rs | 133 +++++++++++++++++++++++++--- rsman/src/daemon/shared_queue.rs | 41 +++++++++ 6 files changed, 441 insertions(+), 18 deletions(-) create mode 100644 rsman/src/daemon/shared_queue.rs diff --git a/rsman/Cargo.lock b/rsman/Cargo.lock index c5c5210..3f52f2f 100644 --- a/rsman/Cargo.lock +++ b/rsman/Cargo.lock @@ -17,6 +17,55 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "anstream" +version = "0.6.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" + +[[package]] +name = "anstyle-parse" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "autocfg" version = "1.3.0" @@ -68,18 +117,82 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" +[[package]] +name = "clap" +version = "4.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" + +[[package]] +name = "colorchoice" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" + [[package]] name = "gimli" version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" + +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + [[package]] name = "libc" version = "0.2.154" @@ -213,8 +326,10 @@ dependencies = [ name = "rsman" version = "0.1.0" dependencies = [ + "clap", "nix", "serde", + "serde_json", "tokio", ] @@ -224,6 +339,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + [[package]] name = "scopeguard" version = "1.2.0" @@ -250,6 +371,17 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -275,6 +407,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "2.0.63" @@ -322,6 +460,12 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/rsman/Cargo.toml b/rsman/Cargo.toml index 8e003cb..7738785 100644 --- a/rsman/Cargo.toml +++ b/rsman/Cargo.toml @@ -16,6 +16,8 @@ name = "resman" path = "src/client/main.rs" [dependencies] -nix = { version = "0.28.0", features = ["signal"] } -serde = "1.0.201" +clap = { version = "4.5.4", features = ["derive"] } +nix = { version = "0.28.0", features = ["signal", "user"] } +serde = { version = "1.0.201", features = ["derive"] } +serde_json = "1.0.117" tokio = { version = "1", features = ["full"] } diff --git a/rsman/lib/resman_common.rs b/rsman/lib/resman_common.rs index 764028b..02ebefb 100644 --- a/rsman/lib/resman_common.rs +++ b/rsman/lib/resman_common.rs @@ -1 +1,51 @@ +use serde::{Deserialize, Serialize}; + pub const SOCKET_NAME: &str = "/tmp/resman_socket"; + +#[derive(Debug, Serialize, Deserialize)] +pub enum IPCMessage { + QueueReq(QueueRequest), + SkipReq(SkipRequest), + StatReq(StatusRequest), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct QueueRequest { + pub pid: i32, + pub uid: u32, + pub msg: String, + pub cmd: String, // it's not run by the server, but sent out of interest +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct QueueResponse { + pub success: bool, + pub place_in_queue: i32, + pub job_id: i32, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SkipRequest { + pub job_id: i32, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SkipResponse { + pub success: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct StatusRequest {} + +#[derive(Debug, Serialize, Deserialize)] +pub struct StatusResponse { + pub jobs: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct JobStatus { + pub job_id: i32, + pub uid: u32, + pub msg: String, + pub elapsed_seconds: i32, +} diff --git a/rsman/src/client/main.rs b/rsman/src/client/main.rs index 7575b90..28c5809 100644 --- a/rsman/src/client/main.rs +++ b/rsman/src/client/main.rs @@ -1,5 +1,84 @@ -use resman_common::test; +use clap::{Parser, Subcommand}; +use resman_common::*; +use std::io; +use tokio::net::UnixStream; -pub fn main() { - test(); +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + #[command(subcommand)] + subcommand: Subcommands, +} + +#[derive(Subcommand, Debug)] +enum Subcommands { + #[command(aliases = &["c", "ch", "chk"], about = "Get reservation status.")] + Check { + #[arg(short, long, help = "Confirm that user has read notice.")] + interactive: bool, + + #[arg(short, long, help = "Silence output; only set exit status.")] + silent: bool, + }, + + #[command(aliases = &["x", "e", "ex", "exe"], about = "Queue a command.")] + Exec { + #[arg(short, long, help = "Short message to show to others.")] + msg: Option, + + #[arg(trailing_var_arg = true, help = "The command to queue.")] + args: Vec, + }, + + #[command(aliases = &["s", "sk"], about = "Skip the current job.")] + Skip { + #[arg(short, long, help = "Skip even other users' jobs.")] + force: bool, + }, +} + +pub fn main() -> io::Result<()> { + let args = Args::parse(); + + // let stream = UnixStream::connect(SOCKET_NAME); + + match &args.subcommand { + Subcommands::Exec { msg, args } => { + println!("Executing command: {:?}", args); + + let req: QueueRequest = QueueRequest { + pid: 1337, + uid: 2, + msg: match msg { + Some(msg) => msg.to_owned(), + None => String::from(""), + }, + cmd: args.join(" "), + }; + + let req = IPCMessage::QueueReq(req); + let req = serde_json::to_string(&req).unwrap(); + + println!("Serialised = {}", req); + + let stream = UnixStream::connect(SOCKET_NAME); + } + + Subcommands::Check { + interactive, + silent, + } => { + println!("Server is reserved?: "); + + let req: StatusRequest = StatusRequest {}; + } + + Subcommands::Skip { force } => { + println!("Skipping job..."); + + let req: SkipRequest = SkipRequest { job_id: 10 }; + } + } + + Ok(()) } diff --git a/rsman/src/daemon/main.rs b/rsman/src/daemon/main.rs index 087084d..80e9db7 100644 --- a/rsman/src/daemon/main.rs +++ b/rsman/src/daemon/main.rs @@ -1,38 +1,145 @@ -use std::io; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use std::sync::Arc; +use std::{fs, io, str}; + +use nix::sys::signal::Signal::*; +use tokio::io::AsyncReadExt; use tokio::net::{UnixListener, UnixStream}; use tokio::task; +use tokio::time::{sleep, Duration}; -async fn handle_client(mut stream: UnixStream) -> io::Result<()> { - let mut buffer = [0u8; 1024]; +use resman_common::{QueueRequest, SOCKET_NAME}; - loop { - let bytes_read = stream.read(&mut buffer).await?; +mod shared_queue; +use shared_queue::JobQueue; + +use nix::sys::signal::kill; +use nix::sys::wait::{waitpid, WaitStatus}; +use nix::unistd::Pid; + +async fn handle_client(mut stream: UnixStream, queue: Arc) -> io::Result<()> { + let mut buff = [0u8; 512]; + let mut full_str: String = "".to_string(); - if bytes_read == 0 { + loop { + let n_bytes = stream.read(&mut buff).await?; + if n_bytes == 0 { break; } - println!("Received: {:?}", &buffer[..bytes_read]); + let buff = match str::from_utf8(&buff[..n_bytes]) { + Ok(v) => v, + Err(e) => { + eprintln!("Invalid UTF-8 from client: {}", e); + break; + } + }; - // Echo the data back to the client - stream.write_all(&buffer[..bytes_read]).await?; + full_str.push_str(buff); } + let request: QueueRequest = serde_json::from_str(&full_str)?; + println!("Got data: {:?}", request); + + queue.enqueue(request).await; + Ok(()) } +/* Ensure that a given Pid actually exists */ +fn verify_process(pid: Pid) -> bool { + match kill(pid, None) { + Ok(_) => true, + Err(e) => { + eprintln!("Process with PID {} failed to verify: {}", pid, e); + return false; + } + } +} + +async fn dispatch_jobs(queue: Arc) { + loop { + sleep(Duration::from_millis(1000)).await; + + let next_job = queue.dequeue().await; + println!("[dispatcher] Got {:#?}", next_job); + + let pid = Pid::from_raw(next_job.pid); + + if verify_process(pid) { + println!("[dispatcher] Running job: {}", next_job.pid); + + match kill(pid, SIGINT) { + Ok(_) => { + println!("[dispatcher] Successfully sent SIGCONT."); + } + Err(e) => { + eprintln!("[dispatcher] Failed to SIGCONT: {}", e); + continue; + } + } + + // Loop until the given process has finished. + // In some cases, waitpid returns when the process has merely been stopped, in which + // case we need to call it again. + loop { + match waitpid(pid, None) { + Ok(stat) => { + println!("[dispatcher] Process {} has finished.", pid); + match stat { + WaitStatus::Exited(_, exit_code) => { + println!( + "[dispatcher] Process {} exited with code {}.", + pid, exit_code + ); + break; + } + WaitStatus::Signaled(_, sig, _) => { + println!( + "[dispatcher] Process {} exited due to signal {}.", + pid, sig + ); + break; + } + _ => { + println!( + "[dispatcher] Process {} has not exited, but is stopped.", + pid + ); + } + } + } + Err(e) => { + eprintln!("[dispatcher] waitpid failed: {}", e); + continue; + } + } + } + } + } +} + #[tokio::main] async fn main() -> io::Result<()> { - let listener = UnixListener::bind(resman_common::SOCKET_NAME)?; + let job_queue = Arc::new(JobQueue::new()); + + match fs::remove_file(SOCKET_NAME) { + _ => {} + } + let listener = UnixListener::bind(SOCKET_NAME)?; + + println!("Server listening at {}", SOCKET_NAME); - println!("Server listening at {}", resman_common::SOCKET_NAME); + let queue_clone = job_queue.clone(); + task::spawn(async move { + dispatch_jobs(queue_clone).await; + }); loop { match listener.accept().await { Ok((stream, _addr)) => { + let queue_clone = job_queue.clone(); task::spawn(async move { - if let Err(e) = handle_client(stream).await { + if let Err(e) = handle_client(stream, queue_clone).await { eprintln!("Error handling client: {}", e); } }); diff --git a/rsman/src/daemon/shared_queue.rs b/rsman/src/daemon/shared_queue.rs new file mode 100644 index 0000000..d982d1e --- /dev/null +++ b/rsman/src/daemon/shared_queue.rs @@ -0,0 +1,41 @@ +use resman_common::QueueRequest; + +use std::collections::VecDeque; + +use tokio::sync::{Mutex, Notify}; + +pub struct JobQueue { + queue: Mutex>, + notify: Notify, +} + +impl JobQueue { + pub fn new() -> Self { + Self { + queue: Mutex::new(VecDeque::new()), + notify: Notify::new(), + } + } + + pub async fn enqueue(&self, job: QueueRequest) { + let mut queue = self.queue.lock().await; + queue.push_front(job); + self.notify.notify_one(); + } + + pub async fn dequeue(&self) -> QueueRequest { + loop { + let mut queue = self.queue.lock().await; + if let Some(job) = queue.pop_front() { + return job; + } + drop(queue); + self.notify.notified().await; + } + } + + pub async fn len(&self) -> usize { + let queue = self.queue.lock().await; + queue.len() + } +}