Skip to content

Commit

Permalink
Merge branch 'main' into preconditions
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi authored Feb 7, 2024
2 parents a504c03 + f601a9b commit cf3c440
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 44 deletions.
2 changes: 1 addition & 1 deletion tools/utils/src/push_to_uplink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main() {

fn argv_to_payload(pairs: &[String]) -> Value {
// nlici
let stream = pairs.get(0).unwrap();
let stream = pairs.first().unwrap();
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
let kv_count = pairs.len() - 1;
assert_eq!(kv_count % 2, 0);
Expand Down
21 changes: 11 additions & 10 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use flume::{Receiver, Sender};
use flume::{bounded, Receiver, Sender};
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand Down Expand Up @@ -101,20 +101,21 @@ impl Bridge {
(self.actions.ctrl_tx(), self.data.ctrl_tx())
}

pub fn register_action_route(
&mut self,
route: ActionRoute,
actions_tx: Sender<Action>,
) -> Result<(), Error> {
self.actions.register_action_route(route, actions_tx)
pub fn register_action_route(&mut self, route: ActionRoute) -> Result<Receiver<Action>, Error> {
let (actions_tx, actions_rx) = bounded(1);
self.actions.register_action_route(route, actions_tx)?;

Ok(actions_rx)
}

pub fn register_action_routes<R: Into<ActionRoute>, V: IntoIterator<Item = R>>(
&mut self,
routes: V,
actions_tx: Sender<Action>,
) -> Result<(), Error> {
self.actions.register_action_routes(routes, actions_tx)
) -> Result<Receiver<Action>, Error> {
let (actions_tx, actions_rx) = bounded(1);
self.actions.register_action_routes(routes, actions_tx)?;

Ok(actions_rx)
}
}

Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ fn check_and_flush_metrics(
metrics.set_disk_utilized(disk_utilized);

// Send pending metrics. This signifies state change
while let Some(metrics) = pending.get(0) {
while let Some(metrics) = pending.front() {
match metrics {
SerializerMetrics::Main(metrics) => {
// Always send pending metrics. They represent state changes
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/collector/systemstats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ impl StatCollector {
self.sys.refresh_processes();
let timestamp = clock() as u64;
for (&id, p) in self.sys.processes() {
let name = p.cmd().get(0).map(|s| s.to_string()).unwrap_or(p.name().to_string());
let name = p.cmd().first().map(|s| s.to_string()).unwrap_or(p.name().to_string());

if self.config.system_stats.process_names.contains(&name) {
let payload = self.processes.push(id.as_u32(), p, name, timestamp);
Expand Down
23 changes: 8 additions & 15 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,14 +428,12 @@ impl Uplink {

let route =
ActionRoute { name: "launch_shell".to_owned(), timeout: Duration::from_secs(10) };
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_route(route, actions_tx)?;
let actions_rx = bridge.register_action_route(route)?;
let tunshell_client = TunshellClient::new(actions_rx, bridge_tx.clone());
spawn_named_thread("Tunshell Client", move || tunshell_client.start());

if !self.config.downloader.actions.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.downloader.actions, actions_tx)?;
let actions_rx = bridge.register_action_routes(&self.config.downloader.actions)?;
let file_downloader =
FileDownloader::new(self.config.clone(), actions_rx, bridge_tx.clone())?;
spawn_named_thread("File Downloader", || file_downloader.start());
Expand All @@ -445,8 +443,7 @@ impl Uplink {
spawn_named_thread("Device Shadow Generator", move || device_shadow.start());

if !self.config.ota_installer.actions.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.ota_installer.actions, actions_tx)?;
let actions_rx = bridge.register_action_routes(&self.config.ota_installer.actions)?;
let ota_installer =
OTAInstaller::new(self.config.ota_installer.clone(), actions_rx, bridge_tx.clone());
spawn_named_thread("OTA Installer", move || ota_installer.start());
Expand All @@ -458,8 +455,7 @@ impl Uplink {
name: "journalctl_config".to_string(),
timeout: Duration::from_secs(10),
};
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_route(route, actions_tx)?;
let actions_rx = bridge.register_action_route(route)?;
let logger = JournalCtl::new(config, actions_rx, bridge_tx.clone());
spawn_named_thread("Logger", || {
if let Err(e) = logger.start() {
Expand All @@ -474,8 +470,7 @@ impl Uplink {
name: "journalctl_config".to_string(),
timeout: Duration::from_secs(10),
};
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_route(route, actions_tx)?;
let actions_rx = bridge.register_action_route(route)?;
let logger = Logcat::new(config, actions_rx, bridge_tx.clone());
spawn_named_thread("Logger", || {
if let Err(e) = logger.start() {
Expand All @@ -490,8 +485,7 @@ impl Uplink {
};

if !self.config.processes.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.processes, actions_tx)?;
let actions_rx = bridge.register_action_routes(&self.config.processes)?;
let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone());
spawn_named_thread("Process Handler", || {
if let Err(e) = process_handler.start() {
Expand All @@ -501,9 +495,8 @@ impl Uplink {
}

if !self.config.script_runner.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.script_runner, actions_tx)?;
let script_runner = ScriptRunner::new(actions_rx, bridge_tx.clone());
let actions_rx = bridge.register_action_routes(&self.config.script_runner)?;
let script_runner = ScriptRunner::new(actions_rx, bridge_tx);
spawn_named_thread("Script Runner", || {
if let Err(e) = script_runner.start() {
error!("Script runner stopped!! Error = {:?}", e);
Expand Down
28 changes: 12 additions & 16 deletions uplink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Error;
use flume::bounded;
use log::info;
use structopt::StructOpt;
use tokio::time::sleep;
Expand Down Expand Up @@ -131,25 +130,22 @@ fn main() -> Result<(), Error> {

let mut tcpapps = vec![];
for (app, cfg) in config.tcpapps.clone() {
let mut route_rx = None;
if !cfg.actions.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&cfg.actions, actions_tx)?;
route_rx = Some(actions_rx)
}
let route_rx = if !cfg.actions.is_empty() {
let actions_rx = bridge.register_action_routes(&cfg.actions)?;
Some(actions_rx)
} else {
None
};
tcpapps.push(TcpJson::new(app, cfg, route_rx, bridge.bridge_tx()));
}

let simulator_actions = config.simulator.as_ref().and_then(|cfg| {
let mut route_rx = None;
if !cfg.actions.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&cfg.actions, actions_tx).unwrap();
route_rx = Some(actions_rx)
let simulator_actions = match &config.simulator {
Some(cfg) if !cfg.actions.is_empty() => {
let actions_rx = bridge.register_action_routes(&cfg.actions)?;
Some(actions_rx)
}

route_rx
});
_ => None,
};

let ctrl_tx = uplink.spawn(bridge)?;

Expand Down

0 comments on commit cf3c440

Please sign in to comment.