Skip to content

Commit

Permalink
fixups, add audible
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Nov 23, 2024
1 parent a83c769 commit 5e071a3
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 65 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ Core principles:

Modules
- `visual`: Camera process manager and writer. Status: Beta
- `lockdown`: Feature disabler and modifier upon HV enablement. Status: Alpha
- `audible`: Call feature trigger and monitor. Status: Alpha
- `lockdown`: Feature disabler and modifier upon HV enablement. Status: Beta
- `audible`: Call feature trigger and monitor. Status: Beta
- `numerical`: Telemetry scraper and sender (tpu-telemetry python replacement). Status: Incomplete
- `logger`: MQTT receiver and disk logger. Status: Beta
- `logger`: MQTT receiver and disk logger. Status: Alpha


**This program will only run on Odysseus**
11 changes: 8 additions & 3 deletions src/audible.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::error::Error;

use tokio::sync::watch::Receiver;
use tokio::{process::Command, sync::watch::Receiver};
use tokio_util::sync::CancellationToken;

/// runs the mute/unmute functionality
Expand All @@ -11,11 +11,16 @@ pub async fn audible_manager(
loop {
tokio::select! {
_ = cancel_token.cancelled() => {

Command::new("linphonecsh").args(["generic", "unmute"]).spawn()?.wait().await?;
},
new = mute_stat_recv.changed() => {
new?;

// to mute or not
if *mute_stat_recv.borrow_and_update() {
Command::new("linphonecsh").args(["generic", "mute"]).spawn()?.wait().await?;
} else {
Command::new("linphonecsh").args(["generic", "unmute"]).spawn()?.wait().await?;
}
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,3 @@ pub const MUTE_EN_TOPIC: &str = "WHEEL/Buttons/Mute";

/// The save location for all files
pub static SAVE_LOCATION: std::sync::OnceLock<String> = std::sync::OnceLock::new();

33 changes: 14 additions & 19 deletions src/lockdown.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{error::Error, process::Stdio, sync::Arc, time::Duration};
use std::{error::Error, process::Stdio, time::Duration};

use tokio::{
io,
process::{Child, Command},
sync::{watch::Receiver, RwLock},
sync::watch::Receiver,
};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
Expand All @@ -16,24 +16,15 @@ pub async fn lockdown_runner(
cancel_token: CancellationToken,
mut hv_stat_recv: Receiver<HVTransition>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let cmds: Arc<RwLock<(Child, Child)>> = Arc::new(RwLock::new((
Command::new("sleep")
.args(["2147483647"])
.stdin(Stdio::null())
.spawn()?,
Command::new("sleep")
.args(["2147483647"])
.stdin(Stdio::null())
.spawn()?,
)));
if let Err(err) = hv_transition_disabled(&mut (*cmds.write().await)).await {
let mut cmds: Option<(Child, Child)> = None;
if let Err(err) = hv_transition_disabled(&mut cmds).await {
warn!("Could not unlock!!! {}", err);
}

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
if let Err(err) = hv_transition_disabled(&mut (*cmds.write().await)).await {
if let Err(err) = hv_transition_disabled(&mut cmds).await {
warn!("Could not unlock!!! {}", err);
}
break Ok(());
Expand All @@ -48,11 +39,11 @@ pub async fn lockdown_runner(
warn!("Could not lock down!!!");
continue;
};
*cmds.write().await = children;
cmds = Some(children);
},
HVTransition::TransitionOff => {
info!("Unlocking!");
if let Err(err) = hv_transition_disabled(&mut (*cmds.write().await)).await {
if let Err(err) = hv_transition_disabled(&mut cmds).await {
warn!("Could not unlock!!! {}", err);
}
},
Expand Down Expand Up @@ -115,6 +106,7 @@ pub async fn hv_transition_enabled(time_ms: u64) -> io::Result<(Child, Child)> {
"-C",
&cerb_save_loc,
])
.stdout(Stdio::null())
.spawn()?,
Command::new("minicom")
.args([
Expand All @@ -125,12 +117,13 @@ pub async fn hv_transition_enabled(time_ms: u64) -> io::Result<(Child, Child)> {
"-C",
&shep_save_loc,
])
.stdout(Stdio::null())
.spawn()?,
))
}

/// Transition to HV off
pub async fn hv_transition_disabled(child_writers: &mut (Child, Child)) -> io::Result<()> {
pub async fn hv_transition_disabled(child_writers: &mut Option<(Child, Child)>) -> io::Result<()> {
let mut cmd_cerb_rec = Command::new("usbip")
.args(["bind", "--busid", "1-1.3"])
.spawn()?;
Expand All @@ -141,8 +134,10 @@ pub async fn hv_transition_disabled(child_writers: &mut (Child, Child)) -> io::R
cmd_cerb_rec.wait().await?;
cmd_shep_rec.wait().await?;

child_writers.0.kill().await?;
child_writers.1.kill().await?;
if let Some(child_writers) = child_writers {
child_writers.0.kill().await?;
child_writers.1.kill().await?;
}

// if !cmd_cerb_rec.wait().await.unwrap().success() && !cmd_shep_rec.wait().await.unwrap().success() {
// println!("Failed to run USBIP command(s) to unbind");
Expand Down
49 changes: 43 additions & 6 deletions src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,70 @@
use std::error::Error;

use protobuf::Message;
use tokio::sync::mpsc::Receiver;
use tokio::{
fs::File,
io::{AsyncWriteExt, BufWriter},
};
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::playback_data;
use crate::{playback_data, HVTransition, SAVE_LOCATION};

/// runs the mute/unmute functionality
/// Takes in a receiver of all MQTT messages
pub async fn logger_manager(
cancel_token: CancellationToken,
mut mqtt_recv_rx: Receiver<playback_data::PlaybackData>,
mut mqtt_recv_rx: tokio::sync::mpsc::Receiver<playback_data::PlaybackData>,
mut hv_stat_recv: tokio::sync::watch::Receiver<HVTransition>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut writer: Option<BufWriter<File>> = None;

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
if let Some(writer) = writer.as_mut() {
return Ok(writer.flush().await?)
}
return Ok(())
},
new = hv_stat_recv.changed() => {
new?;
let val = *hv_stat_recv.borrow_and_update();
match val {
HVTransition::TransitionOn(hvon_data) => {
let filename = format!("{}/event-{}/data_dump.log", SAVE_LOCATION.get().unwrap(), hvon_data.time_ms);
writer = Some(BufWriter::new(File::create_new(filename).await.expect("Could not create log file!")));
},
HVTransition::TransitionOff => {
if let Some(writ) = writer.as_mut() {
writ.flush().await?;
writer = None;

} else {
warn!("Logger - Transition off was unexpected");
}
},
}

},
msg = mqtt_recv_rx.recv() => {
let parsed_msg = match msg {
if writer.is_none() {
continue;
}

match msg {
Some(msg) => {
msg
if let Some(writ) = writer.as_mut() {
if let Err(err) = writ.write(&msg.write_length_delimited_to_bytes().unwrap()).await {
warn!("Could not write to log! {}", err);
}
}
},
None => {
warn!("Could not receive message!");
continue;
}
};
}
//println!("{:?}", parsed_msg.write_to_bytes());
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async fn main() {

if cli.lockdown {
info!("Running lockdown module");
task_tracker.spawn(lockdown_runner(token.clone(), hv_stat_recv));
task_tracker.spawn(lockdown_runner(token.clone(), hv_stat_recv.clone()));
}

if cli.audible {
Expand All @@ -175,7 +175,11 @@ async fn main() {
}
if cli.logger {
info!("Running logger module");
task_tracker.spawn(logger_manager(token.clone(), mqtt_recv_rx.unwrap()));
task_tracker.spawn(logger_manager(
token.clone(),
mqtt_recv_rx.unwrap(),
hv_stat_recv.clone(),
));
}

task_tracker.close();
Expand Down
24 changes: 10 additions & 14 deletions src/numerical.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{fs, time::Duration};

use sysinfo::{Components, MemoryRefreshKind, Pid, ProcessesToUpdate, System};
use tokio::sync::{mpsc::Sender, Mutex};
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};

Expand All @@ -13,9 +13,8 @@ pub async fn collect_data(
mqtt_sender_tx: Sender<PublishableMessage>,
) {
// create requisites
let sys = Mutex::new(System::new_all());
let mut sys_l = sys.lock().await;
sys_l.refresh_all();
let mut sys = System::new_all();
sys.refresh_all();

// for CPU temp
let mut components = Components::new_with_refreshed_list();
Expand All @@ -38,7 +37,6 @@ pub async fn collect_data(
warn!("Could not parse mosquitto pid, using 1, error: {}", a);
1
});
drop(sys_l);

// STEP 1: add a refresh rates for the message

Expand Down Expand Up @@ -79,30 +77,28 @@ pub async fn collect_data(
const TOPIC_B: &str = "TPU/OnBoard/BrokerCpuUsage";
const UNIT_B: &str = "%";

let mut sys_l = sys.lock().await;

sys_l.refresh_cpu_usage();
sys_l.refresh_processes(ProcessesToUpdate::Some(&[Pid::from_u32(pid_clean)]),
sys.refresh_cpu_usage();
sys.refresh_processes(ProcessesToUpdate::Some(&[Pid::from_u32(pid_clean)]),
true);

let process = sys_l.process(Pid::from_u32(pid_clean)).unwrap_or_else(|| {
let process = sys.process(Pid::from_u32(pid_clean)).unwrap_or_else(|| {
warn!("Could not find mosquitto from PID, using 1");
sys_l.process(Pid::from(1)).unwrap()
sys.process(Pid::from(1)).unwrap()
});
trace!("Using process: {:?}", process.name());

vec![
PublishableMessage{ topic: TOPIC_C, data: vec![sys_l.global_cpu_usage()], unit: UNIT_C },
PublishableMessage{ topic: TOPIC_C, data: vec![sys.global_cpu_usage()], unit: UNIT_C },
PublishableMessage{ topic: TOPIC_B, data: vec![process.cpu_usage()], unit: UNIT_B }]
},
_ = mem_avail_int.tick() => {
const TOPIC: &str = "TPU/OnBoard/MemAvailable";
const UNIT: &str = "MB";

let mut sys_l = sys.lock().await;
sys_l.refresh_memory_specifics(MemoryRefreshKind::new().with_ram());
sys.refresh_memory_specifics(MemoryRefreshKind::new().with_ram());

vec![PublishableMessage { topic: TOPIC, data: vec![sys_l.free_memory() as f32 / 1e6], unit: UNIT}]
vec![PublishableMessage { topic: TOPIC, data: vec![sys.free_memory() as f32 / 1e6], unit: UNIT}]
}
};
for msg in msgs {
Expand Down
34 changes: 17 additions & 17 deletions src/visual.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{error::Error, process::Stdio, sync::Arc, time::Duration};
use std::{error::Error, process::Stdio, time::Duration};

use tokio::{
process::{Child, Command},
sync::{watch::Receiver, RwLock},
sync::watch::Receiver,
};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
Expand All @@ -23,26 +23,20 @@ pub async fn run_save_pipeline(
// ffmpeg -f video4linux2 -input_format mjpeg -s 1280x720 -i /dev/video0 -vf "drawtext=fontfile=FreeSerif.tff: \
//text='%{localtime\:%T}': [email protected]: x=7: y=700" -vcodec libx264 -preset veryfast -f mp4 -pix_fmt yuv420p -y output.mp4

//let mut res: Option<Child> = None;

let cmd: Arc<RwLock<Child>> = Arc::new(RwLock::new(
Command::new("sleep")
.args(["2147483647"])
.stdin(Stdio::null())
.spawn()?,
));

//let mut first_run = false;
let mut cmd: Option<Child> = None;

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Ffmpeg canceled");
(*cmd.write().await).wait().await.unwrap();
if cmd.as_ref().is_some() {
cmd.unwrap().wait().await.unwrap();
}
return Ok(())
},
new = hv_stat_recv.changed() => {
new?;

let curr_data = *hv_stat_recv.borrow_and_update();
match curr_data {
HVTransition::TransitionOn(hvon_data) => {
Expand Down Expand Up @@ -74,11 +68,15 @@ pub async fn run_save_pipeline(
"-y",
&save_location
]).stdin(Stdio::null()).spawn()?;
*cmd.write().await = cmd_new;
cmd = Some(cmd_new);

},
HVTransition::TransitionOff => {
let proc_id = cmd.read().await.id().unwrap_or_default();
if cmd.is_none() {
continue;
}
if let Some(val) = cmd.as_mut() {
let proc_id = val.id().unwrap_or_default();
// logic to safely shutdown since ffmpeg doesnt capture our fake ctrl+c from mqtt
// first try and run a SIGTERM kill
if let Ok(mut child) = Command::new("kill").args(["-SIGTERM".to_string(), proc_id.to_string(), ]).spawn() {
Expand All @@ -92,13 +90,15 @@ pub async fn run_save_pipeline(
let mut tics_cnt = 0;
while tics_cnt < 12 {
tokio::time::sleep(Duration::from_secs(1)).await;
if (*cmd.write().await).try_wait().is_ok_and(|f| f.is_some()){
if val.try_wait().is_ok_and(|f| f.is_some()){
break;
}
tics_cnt += 1;
}
// finally as a last sanity check kill the ffmpeg process, as worst case is leaving it dangling
let _ = (*cmd.write().await).kill().await;
let _ = val.kill().await;
}

},
}
},
Expand Down

0 comments on commit 5e071a3

Please sign in to comment.