diff --git a/README.md b/README.md index 8ee19fa..3dd18b9 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,10 @@ Core principles: Modules - `visual`: Camera process manager and writer. Status: Beta -- `lockdown`: Feature disabler and modifier upon HV enablement. Status: Alpha -- `audible`: Call feature trigger and monitor. Status: Alpha +- `lockdown`: Feature disabler and modifier upon HV enablement. Status: Beta +- `audible`: Call feature trigger and monitor. Status: Beta - `numerical`: Telemetry scraper and sender (tpu-telemetry python replacement). Status: Incomplete -- `logger`: MQTT receiver and disk logger. Status: Beta +- `logger`: MQTT receiver and disk logger. Status: Alpha **This program will only run on Odysseus** diff --git a/src/audible.rs b/src/audible.rs index 0d125d0..abf290c 100644 --- a/src/audible.rs +++ b/src/audible.rs @@ -1,6 +1,6 @@ use std::error::Error; -use tokio::sync::watch::Receiver; +use tokio::{process::Command, sync::watch::Receiver}; use tokio_util::sync::CancellationToken; /// runs the mute/unmute functionality @@ -11,11 +11,16 @@ pub async fn audible_manager( loop { tokio::select! { _ = cancel_token.cancelled() => { - + Command::new("linphonecsh").args(["generic", "unmute"]).spawn()?.wait().await?; }, new = mute_stat_recv.changed() => { new?; - + // to mute or not + if *mute_stat_recv.borrow_and_update() { + Command::new("linphonecsh").args(["generic", "mute"]).spawn()?.wait().await?; + } else { + Command::new("linphonecsh").args(["generic", "unmute"]).spawn()?.wait().await?; + } } } } diff --git a/src/lib.rs b/src/lib.rs index d42674b..6959f27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,4 +39,3 @@ pub const MUTE_EN_TOPIC: &str = "WHEEL/Buttons/Mute"; /// The save location for all files pub static SAVE_LOCATION: std::sync::OnceLock = std::sync::OnceLock::new(); - diff --git a/src/lockdown.rs b/src/lockdown.rs index 9ced330..09aaa7b 100644 --- a/src/lockdown.rs +++ b/src/lockdown.rs @@ -1,9 +1,9 @@ -use std::{error::Error, process::Stdio, sync::Arc, time::Duration}; +use std::{error::Error, process::Stdio, time::Duration}; use tokio::{ io, process::{Child, Command}, - sync::{watch::Receiver, RwLock}, + sync::watch::Receiver, }; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; @@ -16,24 +16,15 @@ pub async fn lockdown_runner( cancel_token: CancellationToken, mut hv_stat_recv: Receiver, ) -> Result<(), Box> { - let cmds: Arc> = Arc::new(RwLock::new(( - Command::new("sleep") - .args(["2147483647"]) - .stdin(Stdio::null()) - .spawn()?, - Command::new("sleep") - .args(["2147483647"]) - .stdin(Stdio::null()) - .spawn()?, - ))); - if let Err(err) = hv_transition_disabled(&mut (*cmds.write().await)).await { + let mut cmds: Option<(Child, Child)> = None; + if let Err(err) = hv_transition_disabled(&mut cmds).await { warn!("Could not unlock!!! {}", err); } loop { tokio::select! { _ = cancel_token.cancelled() => { - if let Err(err) = hv_transition_disabled(&mut (*cmds.write().await)).await { + if let Err(err) = hv_transition_disabled(&mut cmds).await { warn!("Could not unlock!!! {}", err); } break Ok(()); @@ -48,11 +39,11 @@ pub async fn lockdown_runner( warn!("Could not lock down!!!"); continue; }; - *cmds.write().await = children; + cmds = Some(children); }, HVTransition::TransitionOff => { info!("Unlocking!"); - if let Err(err) = hv_transition_disabled(&mut (*cmds.write().await)).await { + if let Err(err) = hv_transition_disabled(&mut cmds).await { warn!("Could not unlock!!! {}", err); } }, @@ -115,6 +106,7 @@ pub async fn hv_transition_enabled(time_ms: u64) -> io::Result<(Child, Child)> { "-C", &cerb_save_loc, ]) + .stdout(Stdio::null()) .spawn()?, Command::new("minicom") .args([ @@ -125,12 +117,13 @@ pub async fn hv_transition_enabled(time_ms: u64) -> io::Result<(Child, Child)> { "-C", &shep_save_loc, ]) + .stdout(Stdio::null()) .spawn()?, )) } /// Transition to HV off -pub async fn hv_transition_disabled(child_writers: &mut (Child, Child)) -> io::Result<()> { +pub async fn hv_transition_disabled(child_writers: &mut Option<(Child, Child)>) -> io::Result<()> { let mut cmd_cerb_rec = Command::new("usbip") .args(["bind", "--busid", "1-1.3"]) .spawn()?; @@ -141,8 +134,10 @@ pub async fn hv_transition_disabled(child_writers: &mut (Child, Child)) -> io::R cmd_cerb_rec.wait().await?; cmd_shep_rec.wait().await?; - child_writers.0.kill().await?; - child_writers.1.kill().await?; + if let Some(child_writers) = child_writers { + child_writers.0.kill().await?; + child_writers.1.kill().await?; + } // if !cmd_cerb_rec.wait().await.unwrap().success() && !cmd_shep_rec.wait().await.unwrap().success() { // println!("Failed to run USBIP command(s) to unbind"); diff --git a/src/logger.rs b/src/logger.rs index b422a70..6661c5a 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -1,33 +1,70 @@ use std::error::Error; use protobuf::Message; -use tokio::sync::mpsc::Receiver; +use tokio::{ + fs::File, + io::{AsyncWriteExt, BufWriter}, +}; use tokio_util::sync::CancellationToken; use tracing::warn; -use crate::playback_data; +use crate::{playback_data, HVTransition, SAVE_LOCATION}; /// runs the mute/unmute functionality /// Takes in a receiver of all MQTT messages pub async fn logger_manager( cancel_token: CancellationToken, - mut mqtt_recv_rx: Receiver, + mut mqtt_recv_rx: tokio::sync::mpsc::Receiver, + mut hv_stat_recv: tokio::sync::watch::Receiver, ) -> Result<(), Box> { + let mut writer: Option> = None; + loop { tokio::select! { _ = cancel_token.cancelled() => { + if let Some(writer) = writer.as_mut() { + return Ok(writer.flush().await?) + } return Ok(()) }, + new = hv_stat_recv.changed() => { + new?; + let val = *hv_stat_recv.borrow_and_update(); + match val { + HVTransition::TransitionOn(hvon_data) => { + let filename = format!("{}/event-{}/data_dump.log", SAVE_LOCATION.get().unwrap(), hvon_data.time_ms); + writer = Some(BufWriter::new(File::create_new(filename).await.expect("Could not create log file!"))); + }, + HVTransition::TransitionOff => { + if let Some(writ) = writer.as_mut() { + writ.flush().await?; + writer = None; + + } else { + warn!("Logger - Transition off was unexpected"); + } + }, + } + + }, msg = mqtt_recv_rx.recv() => { - let parsed_msg = match msg { + if writer.is_none() { + continue; + } + + match msg { Some(msg) => { - msg + if let Some(writ) = writer.as_mut() { + if let Err(err) = writ.write(&msg.write_length_delimited_to_bytes().unwrap()).await { + warn!("Could not write to log! {}", err); + } + } }, None => { warn!("Could not receive message!"); continue; } - }; + } //println!("{:?}", parsed_msg.write_to_bytes()); } } diff --git a/src/main.rs b/src/main.rs index 7f4db96..674439b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -166,7 +166,7 @@ async fn main() { if cli.lockdown { info!("Running lockdown module"); - task_tracker.spawn(lockdown_runner(token.clone(), hv_stat_recv)); + task_tracker.spawn(lockdown_runner(token.clone(), hv_stat_recv.clone())); } if cli.audible { @@ -175,7 +175,11 @@ async fn main() { } if cli.logger { info!("Running logger module"); - task_tracker.spawn(logger_manager(token.clone(), mqtt_recv_rx.unwrap())); + task_tracker.spawn(logger_manager( + token.clone(), + mqtt_recv_rx.unwrap(), + hv_stat_recv.clone(), + )); } task_tracker.close(); diff --git a/src/numerical.rs b/src/numerical.rs index fca55de..7db8b5f 100644 --- a/src/numerical.rs +++ b/src/numerical.rs @@ -1,7 +1,7 @@ use std::{fs, time::Duration}; use sysinfo::{Components, MemoryRefreshKind, Pid, ProcessesToUpdate, System}; -use tokio::sync::{mpsc::Sender, Mutex}; +use tokio::sync::mpsc::Sender; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; @@ -13,9 +13,8 @@ pub async fn collect_data( mqtt_sender_tx: Sender, ) { // create requisites - let sys = Mutex::new(System::new_all()); - let mut sys_l = sys.lock().await; - sys_l.refresh_all(); + let mut sys = System::new_all(); + sys.refresh_all(); // for CPU temp let mut components = Components::new_with_refreshed_list(); @@ -38,7 +37,6 @@ pub async fn collect_data( warn!("Could not parse mosquitto pid, using 1, error: {}", a); 1 }); - drop(sys_l); // STEP 1: add a refresh rates for the message @@ -79,30 +77,28 @@ pub async fn collect_data( const TOPIC_B: &str = "TPU/OnBoard/BrokerCpuUsage"; const UNIT_B: &str = "%"; - let mut sys_l = sys.lock().await; - sys_l.refresh_cpu_usage(); - sys_l.refresh_processes(ProcessesToUpdate::Some(&[Pid::from_u32(pid_clean)]), + sys.refresh_cpu_usage(); + sys.refresh_processes(ProcessesToUpdate::Some(&[Pid::from_u32(pid_clean)]), true); - let process = sys_l.process(Pid::from_u32(pid_clean)).unwrap_or_else(|| { + let process = sys.process(Pid::from_u32(pid_clean)).unwrap_or_else(|| { warn!("Could not find mosquitto from PID, using 1"); - sys_l.process(Pid::from(1)).unwrap() + sys.process(Pid::from(1)).unwrap() }); trace!("Using process: {:?}", process.name()); vec![ - PublishableMessage{ topic: TOPIC_C, data: vec![sys_l.global_cpu_usage()], unit: UNIT_C }, + PublishableMessage{ topic: TOPIC_C, data: vec![sys.global_cpu_usage()], unit: UNIT_C }, PublishableMessage{ topic: TOPIC_B, data: vec![process.cpu_usage()], unit: UNIT_B }] }, _ = mem_avail_int.tick() => { const TOPIC: &str = "TPU/OnBoard/MemAvailable"; const UNIT: &str = "MB"; - let mut sys_l = sys.lock().await; - sys_l.refresh_memory_specifics(MemoryRefreshKind::new().with_ram()); + sys.refresh_memory_specifics(MemoryRefreshKind::new().with_ram()); - vec![PublishableMessage { topic: TOPIC, data: vec![sys_l.free_memory() as f32 / 1e6], unit: UNIT}] + vec![PublishableMessage { topic: TOPIC, data: vec![sys.free_memory() as f32 / 1e6], unit: UNIT}] } }; for msg in msgs { diff --git a/src/visual.rs b/src/visual.rs index b7e11eb..e28b948 100644 --- a/src/visual.rs +++ b/src/visual.rs @@ -1,8 +1,8 @@ -use std::{error::Error, process::Stdio, sync::Arc, time::Duration}; +use std::{error::Error, process::Stdio, time::Duration}; use tokio::{ process::{Child, Command}, - sync::{watch::Receiver, RwLock}, + sync::watch::Receiver, }; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; @@ -23,26 +23,20 @@ pub async fn run_save_pipeline( // ffmpeg -f video4linux2 -input_format mjpeg -s 1280x720 -i /dev/video0 -vf "drawtext=fontfile=FreeSerif.tff: \ //text='%{localtime\:%T}': fontcolor=white@0.8: x=7: y=700" -vcodec libx264 -preset veryfast -f mp4 -pix_fmt yuv420p -y output.mp4 - //let mut res: Option = None; - - let cmd: Arc> = Arc::new(RwLock::new( - Command::new("sleep") - .args(["2147483647"]) - .stdin(Stdio::null()) - .spawn()?, - )); - - //let mut first_run = false; + let mut cmd: Option = None; loop { tokio::select! { _ = cancel_token.cancelled() => { info!("Ffmpeg canceled"); - (*cmd.write().await).wait().await.unwrap(); + if cmd.as_ref().is_some() { + cmd.unwrap().wait().await.unwrap(); + } return Ok(()) }, new = hv_stat_recv.changed() => { new?; + let curr_data = *hv_stat_recv.borrow_and_update(); match curr_data { HVTransition::TransitionOn(hvon_data) => { @@ -74,11 +68,15 @@ pub async fn run_save_pipeline( "-y", &save_location ]).stdin(Stdio::null()).spawn()?; - *cmd.write().await = cmd_new; + cmd = Some(cmd_new); }, HVTransition::TransitionOff => { - let proc_id = cmd.read().await.id().unwrap_or_default(); + if cmd.is_none() { + continue; + } + if let Some(val) = cmd.as_mut() { + let proc_id = val.id().unwrap_or_default(); // logic to safely shutdown since ffmpeg doesnt capture our fake ctrl+c from mqtt // first try and run a SIGTERM kill if let Ok(mut child) = Command::new("kill").args(["-SIGTERM".to_string(), proc_id.to_string(), ]).spawn() { @@ -92,13 +90,15 @@ pub async fn run_save_pipeline( let mut tics_cnt = 0; while tics_cnt < 12 { tokio::time::sleep(Duration::from_secs(1)).await; - if (*cmd.write().await).try_wait().is_ok_and(|f| f.is_some()){ + if val.try_wait().is_ok_and(|f| f.is_some()){ break; } tics_cnt += 1; } // finally as a last sanity check kill the ffmpeg process, as worst case is leaving it dangling - let _ = (*cmd.write().await).kill().await; + let _ = val.kill().await; + } + }, } },