Skip to content

Commit

Permalink
refactor: action registration
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Feb 5, 2024
1 parent c23eab6 commit c3cf4fe
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 36 deletions.
21 changes: 11 additions & 10 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use flume::{Receiver, Sender};
use flume::{bounded, Receiver, Sender};
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand Down Expand Up @@ -101,20 +101,21 @@ impl Bridge {
(self.actions.ctrl_tx(), self.data.ctrl_tx())
}

pub fn register_action_route(
&mut self,
route: ActionRoute,
actions_tx: Sender<Action>,
) -> Result<(), Error> {
self.actions.register_action_route(route, actions_tx)
pub fn register_action_route(&mut self, route: ActionRoute) -> Result<Receiver<Action>, Error> {
let (actions_tx, actions_rx) = bounded(1);
self.actions.register_action_route(route, actions_tx)?;

Ok(actions_rx)
}

pub fn register_action_routes<R: Into<ActionRoute>, V: IntoIterator<Item = R>>(
&mut self,
routes: V,
actions_tx: Sender<Action>,
) -> Result<(), Error> {
self.actions.register_action_routes(routes, actions_tx)
) -> Result<Receiver<Action>, Error> {
let (actions_tx, actions_rx) = bounded(1);
self.actions.register_action_routes(routes, actions_tx)?;

Ok(actions_rx)
}
}

Expand Down
21 changes: 7 additions & 14 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,14 +427,12 @@ impl Uplink {

let route =
ActionRoute { name: "launch_shell".to_owned(), timeout: Duration::from_secs(10) };
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_route(route, actions_tx)?;
let actions_rx = bridge.register_action_route(route)?;
let tunshell_client = TunshellClient::new(actions_rx, bridge_tx.clone());
spawn_named_thread("Tunshell Client", move || tunshell_client.start());

if !self.config.downloader.actions.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.downloader.actions, actions_tx)?;
let actions_rx = bridge.register_action_routes(&self.config.downloader.actions)?;
let file_downloader =
FileDownloader::new(self.config.clone(), actions_rx, bridge_tx.clone())?;
spawn_named_thread("File Downloader", || file_downloader.start());
Expand All @@ -444,8 +442,7 @@ impl Uplink {
spawn_named_thread("Device Shadow Generator", move || device_shadow.start());

if !self.config.ota_installer.actions.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.ota_installer.actions, actions_tx)?;
let actions_rx = bridge.register_action_routes(&self.config.ota_installer.actions)?;
let ota_installer =
OTAInstaller::new(self.config.ota_installer.clone(), actions_rx, bridge_tx.clone());
spawn_named_thread("OTA Installer", move || ota_installer.start());
Expand All @@ -457,8 +454,7 @@ impl Uplink {
name: "journalctl_config".to_string(),
timeout: Duration::from_secs(10),
};
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_route(route, actions_tx)?;
let actions_rx = bridge.register_action_route(route)?;
let logger = JournalCtl::new(config, actions_rx, bridge_tx.clone());
spawn_named_thread("Logger", || {
if let Err(e) = logger.start() {
Expand All @@ -473,8 +469,7 @@ impl Uplink {
name: "journalctl_config".to_string(),
timeout: Duration::from_secs(10),
};
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_route(route, actions_tx)?;
let actions_rx = bridge.register_action_route(route)?;
let logger = Logcat::new(config, actions_rx, bridge_tx.clone());
spawn_named_thread("Logger", || {
if let Err(e) = logger.start() {
Expand All @@ -489,8 +484,7 @@ impl Uplink {
};

if !self.config.processes.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.processes, actions_tx)?;
let actions_rx = bridge.register_action_routes(&self.config.processes)?;
let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone());
spawn_named_thread("Process Handler", || {
if let Err(e) = process_handler.start() {
Expand All @@ -500,8 +494,7 @@ impl Uplink {
}

if !self.config.script_runner.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.script_runner, actions_tx)?;
let actions_rx = bridge.register_action_routes(&self.config.script_runner)?;
let script_runner = ScriptRunner::new(actions_rx, bridge_tx);
spawn_named_thread("Script Runner", || {
if let Err(e) = script_runner.start() {
Expand Down
19 changes: 7 additions & 12 deletions uplink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Error;
use flume::bounded;
use log::info;
use structopt::StructOpt;
use tokio::time::sleep;
Expand Down Expand Up @@ -133,23 +132,19 @@ fn main() -> Result<(), Error> {
for (app, cfg) in config.tcpapps.clone() {
let mut route_rx = None;
if !cfg.actions.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&cfg.actions, actions_tx)?;
let actions_rx = bridge.register_action_routes(&cfg.actions)?;
route_rx = Some(actions_rx)
}
tcpapps.push(TcpJson::new(app, cfg, route_rx, bridge.bridge_tx()));
}

let simulator_actions = config.simulator.as_ref().and_then(|cfg| {
let mut route_rx = None;
if !cfg.actions.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&cfg.actions, actions_tx).unwrap();
route_rx = Some(actions_rx)
let simulator_actions = match &config.simulator {
Some(cfg) if !cfg.actions.is_empty() => {
let actions_rx = bridge.register_action_routes(&cfg.actions)?;
Some(actions_rx)
}

route_rx
});
_ => None,
};

let ctrl_tx = uplink.spawn(bridge)?;

Expand Down

0 comments on commit c3cf4fe

Please sign in to comment.