From dd990ed2541ad865a69cc286391e8238f1cd8fd7 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Sat, 9 Nov 2024 17:57:23 -0500 Subject: [PATCH] misc untested improvements --- src/audible.rs | 22 ++++++++++++++++ src/lib.rs | 4 +++ src/lockdown.rs | 62 +++++++++++++++++++++++++++++++++++++++++++-- src/main.rs | 33 ++++++++++++------------ src/mqtt_handler.rs | 36 +++++++++++--------------- src/visual.rs | 39 ++++++++++++++++++---------- 6 files changed, 144 insertions(+), 52 deletions(-) diff --git a/src/audible.rs b/src/audible.rs index e69de29..0d125d0 100644 --- a/src/audible.rs +++ b/src/audible.rs @@ -0,0 +1,22 @@ +use std::error::Error; + +use tokio::sync::watch::Receiver; +use tokio_util::sync::CancellationToken; + +/// runs the mute/unmute functionality +pub async fn audible_manager( + cancel_token: CancellationToken, + mut mute_stat_recv: Receiver, +) -> Result<(), Box> { + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + + }, + new = mute_stat_recv.changed() => { + new?; + + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 49c8d21..0e2348c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod audible; pub mod lockdown; pub mod mqtt_handler; pub mod numerical; @@ -13,3 +14,6 @@ pub struct PublishableMessage { /// the topic to listen for for HV enable, 1 is on 0 is off pub const HV_EN_TOPIC: &str = "MPU/State/TSMS"; + +/// the topic to listen for mute enable, 1 is on 0 is off +pub const MUTE_EN_TOPIC: &str = "WHEEL/Buttons/Mute"; diff --git a/src/lockdown.rs b/src/lockdown.rs index 3a0d239..d6fc0a0 100644 --- a/src/lockdown.rs +++ b/src/lockdown.rs @@ -1,17 +1,75 @@ use std::error::Error; -use tokio::sync::watch::Receiver; +use tokio::{process::Command, sync::watch::Receiver}; +use tokio_util::sync::CancellationToken; use tracing::info; +/// Run various HV on/off lockdowns pub async fn lockdown_runner( + cancel_token: CancellationToken, mut hv_stat_recv: Receiver, ) -> Result<(), Box> { + let mut prev_state = false; + loop { tokio::select! { + _ = cancel_token.cancelled() => { + hv_transition_disabled().await; + break Ok(()); + }, new = hv_stat_recv.changed() => { new?; - info!("New HV state:{}", *hv_stat_recv.borrow_and_update()); + let curr_state = *hv_stat_recv.borrow_and_update(); + if prev_state == curr_state { continue } else{ + prev_state = curr_state; + } + + info!("New HV state: {}", curr_state); + if curr_state { + hv_transition_enabled().await; + } else { + hv_transition_disabled().await; + } + } } } } + +/// Transition to HV on +pub async fn hv_transition_enabled() { + let mut cmd_cerb_dis = Command::new("usbip") + .args(["unbind", "--busid", "1-1.3"]) + .spawn() + .unwrap(); + let mut cmd_shep_dis = Command::new("usbip") + .args(["unbind", "--busid", "1-1.4"]) + .spawn() + .unwrap(); + + cmd_cerb_dis.wait().await.unwrap(); + cmd_shep_dis.wait().await.unwrap(); + + // if !cmd_cerb_dis.wait().await.unwrap().success() && !cmd_shep_dis.wait().await.unwrap().success() { + // info!("Failed to run USBIP command(s) to unbind"); + // } +} + +/// Transition to HV off +pub async fn hv_transition_disabled() { + let mut cmd_cerb_rec = Command::new("usbip") + .args(["bind", "--busid", "1-1.3"]) + .spawn() + .unwrap(); + let mut cmd_shep_rec = Command::new("usbip") + .args(["bind", "--busid", "1-1.4"]) + .spawn() + .unwrap(); + + cmd_cerb_rec.wait().await.unwrap(); + cmd_shep_rec.wait().await.unwrap(); + + // if !cmd_cerb_rec.wait().await.unwrap().success() && !cmd_shep_rec.wait().await.unwrap().success() { + // println!("Failed to run USBIP command(s) to unbind"); + // } +} diff --git a/src/main.rs b/src/main.rs index 7112177..8362f33 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ use std::{ use clap::Parser; use odysseus_daemon::{ + audible::audible_manager, lockdown::lockdown_runner, mqtt_handler::{MqttProcessor, MqttProcessorOptions}, numerical::collect_data, @@ -24,15 +25,19 @@ use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter}; #[derive(Parser, Debug)] #[command(version)] struct VisualArgs { - /// Enable lockdown mode + /// Enable lockdown module #[arg(short = 's', long, env = "TPU_TELEMETRY_LOCKDOWN_ENABLE")] lockdown: bool, - /// Enable data mode + /// Enable audio module + #[arg(short = 'a', long, env = "TPU_TELEMETRY_AUDIBLE_ENABLE")] + audible: bool, + + /// Enable data module #[arg(short = 'd', long, env = "TPU_TELEMETRY_DATA_ENABLE")] data: bool, - /// Enable video mode + /// Enable video module #[arg(short = 'v', long, env = "TPU_TELEMETRY_VIDEO_ENABLE")] video: bool, @@ -85,6 +90,7 @@ async fn main() { let (mqtt_sender_tx, mqtt_sender_rx) = mpsc::channel::(1000); let (hv_stat_send, hv_stat_recv) = watch::channel(false); + let (mute_stat_send, mute_stat_recv) = watch::channel(false); let task_tracker = TaskTracker::new(); let token = CancellationToken::new(); @@ -98,23 +104,13 @@ async fn main() { tokio::time::sleep(Duration::from_secs(1)).await; } - // use the passed in folder - let save_location = format!( - "{}/frontcam-{}-ner24.avi", - cli.output_folder, - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() - ); - let video_token = token.clone(); if cli.video { task_tracker.spawn(run_save_pipeline( video_token, SavePipelineOpts { video: cli.video_uri.clone(), - save_location, + save_location: cli.output_folder, }, )); } @@ -124,9 +120,9 @@ async fn main() { token.clone(), mqtt_sender_rx, hv_stat_send, + mute_stat_send, MqttProcessorOptions { mqtt_path: cli.mqtt_url, - mqtt_recv: None, }, ); let (client, eventloop) = AsyncClient::new(opts, 600); @@ -140,7 +136,12 @@ async fn main() { if cli.lockdown { info!("Running lockdown module"); - task_tracker.spawn(lockdown_runner(hv_stat_recv)); + task_tracker.spawn(lockdown_runner(token.clone(), hv_stat_recv)); + } + + if cli.audible { + info!("Running audio module"); + task_tracker.spawn(audible_manager(token.clone(), mute_stat_recv)); } task_tracker.close(); diff --git a/src/mqtt_handler.rs b/src/mqtt_handler.rs index db14a47..06ff9bd 100644 --- a/src/mqtt_handler.rs +++ b/src/mqtt_handler.rs @@ -8,11 +8,11 @@ use rumqttc::v5::{ mqttbytes::{v5::Packet, QoS}, AsyncClient, Event, EventLoop, MqttOptions, }; -use tokio::sync::{mpsc::Receiver, watch::Sender, RwLock}; +use tokio::sync::{mpsc::Receiver, watch::Sender}; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; -use crate::{serverdata, PublishableMessage, HV_EN_TOPIC}; +use crate::{serverdata, PublishableMessage, HV_EN_TOPIC, MUTE_EN_TOPIC}; /// The chief processor of incoming mqtt data, this handles /// - mqtt state @@ -21,16 +21,14 @@ use crate::{serverdata, PublishableMessage, HV_EN_TOPIC}; pub struct MqttProcessor { cancel_token: CancellationToken, mqtt_sender_rx: Receiver, - mqtt_recv: Option<(String, Arc>)>, hv_stat_send: Sender, + mute_stat_send: Sender, } /// processor options, these are static immutable settings pub struct MqttProcessorOptions { /// URI of the mqtt server pub mqtt_path: String, - /// MQTT topic and place to put data, or none - pub mqtt_recv: Option<(String, Arc>)>, } impl MqttProcessor { @@ -39,6 +37,7 @@ impl MqttProcessor { cancel_token: CancellationToken, mqtt_sender_rx: Receiver, hv_stat_send: Sender, + mute_stat_send: Sender, opts: MqttProcessorOptions, ) -> (MqttProcessor, MqttOptions) { // create the mqtt client and configure it @@ -69,8 +68,8 @@ impl MqttProcessor { MqttProcessor { cancel_token, mqtt_sender_rx, - mqtt_recv: opts.mqtt_recv, hv_stat_send, + mute_stat_send, }, mqtt_opts, ) @@ -81,15 +80,6 @@ impl MqttProcessor { /// * `client` - The async mqttt v5 client to use for subscriptions pub async fn process_mqtt(mut self, client: Arc, mut eventloop: EventLoop) { debug!("Subscribing to siren with inputted topic"); - if self.mqtt_recv.as_ref().is_some() { - client - .subscribe( - self.mqtt_recv.as_ref().unwrap().0.clone(), - rumqttc::v5::mqttbytes::QoS::ExactlyOnce, - ) - .await - .expect("Could not subscribe to Siren"); - } client .subscribe(HV_EN_TOPIC, rumqttc::v5::mqttbytes::QoS::ExactlyOnce) .await @@ -111,14 +101,9 @@ impl MqttProcessor { warn!("Could not parse topic, topic: {:?}", msg.topic); continue; }; - if let Some(ref mqtt_recv) = self.mqtt_recv { - let mut d = mqtt_recv.1.write().await; - *d = *res.values.first().unwrap_or(&0f32); - } - + let val = *res.values.first().unwrap_or(&-1f32) as u8; match topic { HV_EN_TOPIC => { - let val = *res.values.first().unwrap_or(&-1f32) as u8; if val == 1 { self.hv_stat_send.send(true).expect("HV Stat Channel Closed"); } else if val == 0 { @@ -127,6 +112,15 @@ impl MqttProcessor { warn!("Received bad HV message!"); } }, + MUTE_EN_TOPIC => { + if val == 1 { + self.mute_stat_send.send(true).expect("Mute Stat Channel Closed"); + } else if val == 0 { + self.mute_stat_send.send(false).expect("Mute Stat Channel Closed"); + } else { + warn!("Received bad mute message!"); + } + }, _ => { warn!("Unknown topic received: {}", topic); } diff --git a/src/visual.rs b/src/visual.rs index 7dd102c..27c4bb3 100644 --- a/src/visual.rs +++ b/src/visual.rs @@ -1,4 +1,8 @@ -use std::{error::Error, process::Stdio}; +use std::{ + error::Error, + process::Stdio, + time::{SystemTime, UNIX_EPOCH}, +}; use tokio::process::Command; use tokio_util::sync::CancellationToken; @@ -16,6 +20,17 @@ pub async fn run_save_pipeline( ) -> Result<(), Box> { // ffmpeg -f video4linux2 -input_format mjpeg -s 1280x720 -i /dev/video0 -vf "drawtext=fontfile=FreeSerif.tff: \ //text='%{localtime\:%T}': fontcolor=white@0.8: x=7: y=700" -vcodec libx264 -preset veryfast -f mp4 -pix_fmt yuv420p -y output.mp4 + + // use the passed in folder + let save_location = format!( + "{}/frontcam-{}-ner24.avi", + vid_opts.save_location, + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + ); + info!("Creating and launching ffmpeg..."); let mut res = Command::new("ffmpeg").args([ "-f", @@ -37,20 +52,18 @@ pub async fn run_save_pipeline( "-pix_fmt", "yuv420p", "-y", - &vid_opts.save_location + &save_location ]).stdin(Stdio::null()).spawn()?; - loop { - tokio::select! { - _ = cancel_token.cancelled() => { - info!("Ffmpeg canceled"); - res.wait().await.unwrap(); - return Ok(()) - }, - _ = res.wait() => { - warn!("Ffmpeg ended early!"); - return Ok(()) - } + tokio::select! { + _ = cancel_token.cancelled() => { + info!("Ffmpeg canceled"); + res.wait().await.unwrap(); + Ok(()) + }, + _ = res.wait() => { + warn!("Ffmpeg ended early!"); + Ok(()) } } }