Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jack test 1 #9

Merged
merged 8 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
859 changes: 856 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,16 @@ name = "odysseus-daemon"
version = "0.1.0"
edition = "2021"

[dependencies]
[workspace]
members = ["uploader"]

[workspace.dependencies]
reqwest = { version = "0.12.9", features = ["blocking", "multipart"] }
clap = { version = "4.5.23", features = ["derive", "env"] }


[dependencies]
clap.workspace = true
protobuf = "3.7.1"
rumqttc = "0.24.0"
serde = { version = "1.0.216", features = ["derive"] }
Expand Down
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ Core principles:

Modules
- `visual`: Camera process manager and writer. Status: Beta
- `lockdown`: Feature disabler and modifier upon HV enablement. Status: Alpha
- `audible`: Call feature trigger and monitor. Status: Alpha
- `lockdown`: Feature disabler and modifier upon HV enablement. Status: Beta
- `audible`: Call feature trigger and monitor. Status: Beta
- `numerical`: Telemetry scraper and sender (tpu-telemetry python replacement). Status: Incomplete
- `logger`: MQTT receiver and disk logger. Status: Beta
- `logger`: MQTT receiver and disk logger. Status: Alpha

Upload modules:
- `logger`: Upload from the logger module to scylla. Status: Beta
- `visual`: Camera video uploader to cloud platform. Status: Incomplete
- `serial`: (from `lockdown` module) Serial output uploader to cloud platform. Status Incomplete
-


**This program will only run on Odysseus**
11 changes: 8 additions & 3 deletions src/audible.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::error::Error;

use tokio::sync::watch::Receiver;
use tokio::{process::Command, sync::watch::Receiver};
use tokio_util::sync::CancellationToken;

/// runs the mute/unmute functionality
Expand All @@ -11,11 +11,16 @@ pub async fn audible_manager(
loop {
tokio::select! {
_ = cancel_token.cancelled() => {

Command::new("linphonecsh").args(["generic", "unmute"]).spawn()?.wait().await?;
},
new = mute_stat_recv.changed() => {
new?;

// to mute or not
if *mute_stat_recv.borrow_and_update() {
Command::new("linphonecsh").args(["generic", "mute"]).spawn()?.wait().await?;
} else {
Command::new("linphonecsh").args(["generic", "unmute"]).spawn()?.wait().await?;
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ pub const HV_EN_TOPIC: &str = "MPU/State/TSMS";

/// the topic to listen for mute enable, 1 is on 0 is off
pub const MUTE_EN_TOPIC: &str = "WHEEL/Buttons/Mute";

/// The save location for all files
pub static SAVE_LOCATION: std::sync::OnceLock<String> = std::sync::OnceLock::new();
103 changes: 84 additions & 19 deletions src/lockdown.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,52 @@
use std::error::Error;
use std::{error::Error, process::Stdio, time::Duration};

use tokio::{io, process::Command, sync::watch::Receiver};
use tokio::{
io,
process::{Child, Command},
sync::watch::Receiver,
};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

use crate::HVTransition;
use crate::{HVTransition, SAVE_LOCATION};

/// Run various HV on/off lockdowns
/// Takes in a receiver of HV state
pub async fn lockdown_runner(
cancel_token: CancellationToken,
mut hv_stat_recv: Receiver<HVTransition>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut cmds: Option<(Child, Child)> = None;
if let Err(err) = hv_transition_disabled(&mut cmds).await {
warn!("Could not unlock!!! {}", err);
}

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
if let Err(err) = hv_transition_disabled().await {
if let Err(err) = hv_transition_disabled(&mut cmds).await {
warn!("Could not unlock!!! {}", err);
}
break Ok(());
},
new = hv_stat_recv.changed() => {
new?;
let curr_data = *hv_stat_recv.borrow_and_update();
let curr_state = match curr_data {
HVTransition::TransitionOn(_) => true,
HVTransition::TransitionOff => false,
};
if curr_state {
info!("Locking down!");
if let Err(err) = hv_transition_enabled().await {
warn!("Could not lock down!!! {}", err);
}
} else {
info!("Unlocking!");
if let Err(err) = hv_transition_disabled().await {
match curr_data {
HVTransition::TransitionOn(hvon_data) => {
info!("Locking down!");
let Ok(children) = hv_transition_enabled(hvon_data.time_ms).await else {
warn!("Could not lock down!!!");
continue;
};
cmds = Some(children);
},
HVTransition::TransitionOff => {
info!("Unlocking!");
if let Err(err) = hv_transition_disabled(&mut cmds).await {
warn!("Could not unlock!!! {}", err);
}
},
}

}
Expand All @@ -45,7 +55,9 @@ pub async fn lockdown_runner(
}

/// Transition to HV on
pub async fn hv_transition_enabled() -> io::Result<()> {
pub async fn hv_transition_enabled(time_ms: u64) -> io::Result<(Child, Child)> {
// unbind from the usbipd server
// this automatically brings back
let mut cmd_cerb_dis = Command::new("usbip")
.args(["unbind", "--busid", "1-1.3"])
.spawn()?;
Expand All @@ -56,14 +68,62 @@ pub async fn hv_transition_enabled() -> io::Result<()> {
cmd_cerb_dis.wait().await?;
cmd_shep_dis.wait().await?;

tokio::time::sleep(Duration::from_secs(2)).await;

let mut cmd_cerb_conf = Command::new("stty")
.args(["-F", "/dev/ttyCerberus", "115200"])
.spawn()?;

let mut cmd_shep_conf = Command::new("stty")
.args(["-F", "/dev/ttyShepherd", "115200"])
.spawn()?;

cmd_cerb_conf.wait().await?;
cmd_shep_conf.wait().await?;

// TODO actually write the tty read from cat into a file, and

// if !cmd_cerb_dis.wait().await.unwrap().success() && !cmd_shep_dis.wait().await.unwrap().success() {
// info!("Failed to run USBIP command(s) to unbind");
// }
Ok(())
let cerb_save_loc = format!(
"{}/event-{}/cerberus-dump.cap",
SAVE_LOCATION.get().unwrap(),
time_ms
);
let shep_save_loc = format!(
"{}/event-{}/shepherd-dump.cap",
SAVE_LOCATION.get().unwrap(),
time_ms
);
Ok((
Command::new("minicom")
.args([
"-D",
"/dev/ttyCerberus",
"-O",
"timestamp=extended",
"-C",
&cerb_save_loc,
])
.stdout(Stdio::null())
.spawn()?,
Command::new("minicom")
.args([
"-D",
"/dev/ttyCerberus",
"-O",
"timestamp=extended",
"-C",
&shep_save_loc,
])
.stdout(Stdio::null())
.spawn()?,
))
}

/// Transition to HV off
pub async fn hv_transition_disabled() -> io::Result<()> {
pub async fn hv_transition_disabled(child_writers: &mut Option<(Child, Child)>) -> io::Result<()> {
let mut cmd_cerb_rec = Command::new("usbip")
.args(["bind", "--busid", "1-1.3"])
.spawn()?;
Expand All @@ -74,6 +134,11 @@ pub async fn hv_transition_disabled() -> io::Result<()> {
cmd_cerb_rec.wait().await?;
cmd_shep_rec.wait().await?;

if let Some(child_writers) = child_writers {
child_writers.0.kill().await?;
child_writers.1.kill().await?;
}

// if !cmd_cerb_rec.wait().await.unwrap().success() && !cmd_shep_rec.wait().await.unwrap().success() {
// println!("Failed to run USBIP command(s) to unbind");
// }
Expand Down
49 changes: 43 additions & 6 deletions src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,70 @@
use std::error::Error;

use protobuf::Message;
use tokio::sync::mpsc::Receiver;
use tokio::{
fs::File,
io::{AsyncWriteExt, BufWriter},
};
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::playback_data;
use crate::{playback_data, HVTransition, SAVE_LOCATION};

/// runs the mute/unmute functionality
/// Takes in a receiver of all MQTT messages
pub async fn logger_manager(
cancel_token: CancellationToken,
mut mqtt_recv_rx: Receiver<playback_data::PlaybackData>,
mut mqtt_recv_rx: tokio::sync::mpsc::Receiver<playback_data::PlaybackData>,
mut hv_stat_recv: tokio::sync::watch::Receiver<HVTransition>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut writer: Option<BufWriter<File>> = None;

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
if let Some(writer) = writer.as_mut() {
return Ok(writer.flush().await?)
}
return Ok(())
},
new = hv_stat_recv.changed() => {
new?;
let val = *hv_stat_recv.borrow_and_update();
match val {
HVTransition::TransitionOn(hvon_data) => {
let filename = format!("{}/event-{}/data_dump.log", SAVE_LOCATION.get().unwrap(), hvon_data.time_ms);
writer = Some(BufWriter::new(File::create_new(filename).await.expect("Could not create log file!")));
},
HVTransition::TransitionOff => {
if let Some(writ) = writer.as_mut() {
writ.flush().await?;
writer = None;

} else {
warn!("Logger - Transition off was unexpected");
}
},
}

},
msg = mqtt_recv_rx.recv() => {
let parsed_msg = match msg {
if writer.is_none() {
continue;
}

match msg {
Some(msg) => {
msg
if let Some(writ) = writer.as_mut() {
if let Err(err) = writ.write(&msg.write_length_delimited_to_bytes().unwrap()).await {
warn!("Could not write to log! {}", err);
}
}
},
None => {
warn!("Could not receive message!");
continue;
}
};
}
//println!("{:?}", parsed_msg.write_to_bytes());
}
}
Expand Down
15 changes: 10 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use odysseus_daemon::{
numerical::collect_data,
playback_data,
visual::{run_save_pipeline, SavePipelineOpts},
HVTransition, PublishableMessage,
HVTransition, PublishableMessage, SAVE_LOCATION,
};
use rumqttc::v5::AsyncClient;
use tokio::{
Expand Down Expand Up @@ -99,6 +99,9 @@ async fn main() {
// use that subscriber to process traces emitted after this point
tracing::subscriber::set_global_default(subscriber).expect("Could not init tracing");

// set save location
SAVE_LOCATION.get_or_init(|| cli.output_folder);

// channel to pass the mqtt data
// TODO tune buffer size
let (mqtt_sender_tx, mqtt_sender_rx) = mpsc::channel::<PublishableMessage>(1000);
Expand Down Expand Up @@ -134,7 +137,6 @@ async fn main() {
cli.mock,
mute_stat_send,
mqtt_recv_tx,
cli.output_folder.clone(),
MqttProcessorOptions {
mqtt_path: cli.mqtt_url,
},
Expand All @@ -154,7 +156,6 @@ async fn main() {
video: cli
.video_uri
.expect("Must provide video URI if video is enabled!"),
save_location: cli.output_folder,
},
));
}
Expand All @@ -165,7 +166,7 @@ async fn main() {

if cli.lockdown {
info!("Running lockdown module");
task_tracker.spawn(lockdown_runner(token.clone(), hv_stat_recv));
task_tracker.spawn(lockdown_runner(token.clone(), hv_stat_recv.clone()));
}

if cli.audible {
Expand All @@ -174,7 +175,11 @@ async fn main() {
}
if cli.logger {
info!("Running logger module");
task_tracker.spawn(logger_manager(token.clone(), mqtt_recv_rx.unwrap()));
task_tracker.spawn(logger_manager(
token.clone(),
mqtt_recv_rx.unwrap(),
hv_stat_recv.clone(),
));
}

task_tracker.close();
Expand Down
Loading
Loading