diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index 047950bf..2b56bc3a 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -24,10 +24,14 @@ pub struct Action { pub deadline: Option, } +const DEFAULT_RESPONSE_STREAM: &str = "action_status"; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ActionResponse { #[serde(alias = "id")] pub action_id: String, + #[serde(skip)] + pub action_name: Option, // sequence number pub sequence: u32, // timestamp @@ -48,6 +52,7 @@ impl ActionResponse { ActionResponse { action_id: id.to_owned(), + action_name: None, sequence: 0, timestamp, state: state.to_owned(), @@ -69,6 +74,10 @@ impl ActionResponse { self.progress == 100 } + pub fn set_action_name(&mut self, action_name: String) { + self.action_name = Some(action_name) + } + pub fn progress(id: &str, state: &str, progress: u8) -> Self { ActionResponse::new(id, state, progress, vec![]) } @@ -105,7 +114,7 @@ impl ActionResponse { impl Point for ActionResponse { fn stream_name(&self) -> &str { - "action_status" + self.action_name.as_ref().map(|n| n.as_str()).unwrap_or_else(|| DEFAULT_RESPONSE_STREAM) } fn sequence(&self) -> u32 { diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index c6468272..68fde89c 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -4,7 +4,6 @@ use serde::{Deserialize, Serialize}; use tokio::select; use tokio::time::{self, interval, Instant, Sleep}; -use std::collections::HashSet; use std::fs; use std::path::PathBuf; use std::{collections::HashMap, fmt::Debug, pin::Pin, sync::Arc, time::Duration}; @@ -57,7 +56,7 @@ pub struct ActionsBridge { action_redirections: HashMap, /// Current action that is being processed current_action: Option, - parallel_actions: HashSet, + parallel_actions: HashMap, ctrl_rx: Receiver, ctrl_tx: Sender, shutdown_handle: Sender<()>, @@ -82,8 +81,11 @@ impl ActionsBridge { } action_status.buf_size = 1; streams_config.insert("action_status".to_owned(), action_status); - let mut streams = Streams::new(config.clone(), package_tx, metrics_tx); + let topic_template = + "/tenants/{tenant_id}/devices/{device_id}/action_status/{stream_name}".to_string(); + let mut streams = Streams::new(config.clone(), package_tx, metrics_tx, topic_template); streams.config_streams(streams_config); + streams.max_buf_size = 1; Self { status_tx, @@ -94,7 +96,7 @@ impl ActionsBridge { action_routes: HashMap::with_capacity(10), action_redirections, current_action: None, - parallel_actions: HashSet::new(), + parallel_actions: HashMap::new(), shutdown_handle, ctrl_rx, ctrl_tx, @@ -266,7 +268,7 @@ impl ActionsBridge { let deadline = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?; // current action left unchanged in case of new tunshell action if action.name == TUNSHELL_ACTION { - self.parallel_actions.insert(action.action_id); + self.parallel_actions.insert(action.action_id, action.name); return Ok(()); } @@ -276,7 +278,7 @@ impl ActionsBridge { } async fn forward_action_response(&mut self, mut response: ActionResponse) { - if self.parallel_actions.contains(&response.action_id) { + if self.parallel_actions.contains_key(&response.action_id) { self.forward_parallel_action_response(response).await; return; @@ -296,7 +298,8 @@ impl ActionsBridge { } info!("Action response = {:?}", response); - self.streams.forward(response.clone()).await; + let action_name = inflight_action.action.name.to_owned(); + send_action_response(&mut self.streams, response.clone(), action_name).await; if response.is_completed() || response.is_failed() { self.clear_current_action(); @@ -316,7 +319,7 @@ impl ActionsBridge { // NOTE: send success reponse for actions that don't have redirections configured warn!("Action redirection is not configured for: {:?}", action); let response = ActionResponse::success(&action.action_id); - self.streams.forward(response).await; + send_action_response(&mut self.streams, response, action.name).await; self.clear_current_action(); } @@ -348,21 +351,32 @@ impl ActionsBridge { } async fn forward_parallel_action_response(&mut self, response: ActionResponse) { + let action_name = self.parallel_actions.get(&response.action_id).unwrap().to_owned(); info!("Action response = {:?}", response); if response.is_completed() || response.is_failed() { self.parallel_actions.remove(&response.action_id); } - self.streams.forward(response).await; + send_action_response(&mut self.streams, response, action_name).await; } async fn forward_action_error(&mut self, action: Action, error: Error) { let response = ActionResponse::failure(&action.action_id, error.to_string()); - self.streams.forward(response).await; + send_action_response(&mut self.streams, response, action.name).await; } } +async fn send_action_response( + streams: &mut Streams, + mut response: ActionResponse, + action_name: String, +) { + streams.forward(response.clone()).await; + response.set_action_name(action_name); + streams.forward(response).await; +} + #[derive(Debug, Deserialize, Serialize)] struct SaveAction { pub id: String, diff --git a/uplink/src/base/bridge/data_lane.rs b/uplink/src/base/bridge/data_lane.rs index 6e100fc1..c6edc723 100644 --- a/uplink/src/base/bridge/data_lane.rs +++ b/uplink/src/base/bridge/data_lane.rs @@ -36,7 +36,9 @@ impl DataBridge { let (data_tx, data_rx) = bounded(10); let (ctrl_tx, ctrl_rx) = bounded(1); - let mut streams = Streams::new(config.clone(), package_tx, metrics_tx); + let topic_template = + "/tenants/{tenant_id}/devices/{device_id}/events/{stream_name}/jsonarray".to_string(); + let mut streams = Streams::new(config.clone(), package_tx, metrics_tx, topic_template); streams.config_streams(config.streams.clone()); Self { data_tx, data_rx, config, streams, ctrl_rx, ctrl_tx } diff --git a/uplink/src/base/bridge/stream.rs b/uplink/src/base/bridge/stream.rs index d83c9354..10fd9b43 100644 --- a/uplink/src/base/bridge/stream.rs +++ b/uplink/src/base/bridge/stream.rs @@ -81,25 +81,23 @@ where } pub fn dynamic( - stream: impl Into, - project_id: impl Into, + stream_name: impl Into, + tenant_id: impl Into, device_id: impl Into, max_buffer_size: usize, tx: Sender>, + topic_template: &str, ) -> Stream { - let stream = stream.into(); - let project_id = project_id.into(); + let stream_name = stream_name.into(); + let tenant_id = tenant_id.into(); let device_id = device_id.into(); - let topic = String::from("/tenants/") - + &project_id - + "/devices/" - + &device_id - + "/events/" - + &stream - + "/jsonarray"; + // e.g. "/tenants/{tenant_id}/devices/{device_id}/events/{stream_name}/jsonarray" + let topic_template = topic_template.replace("{tenant_id}", &tenant_id); + let topic_template = topic_template.replace("{device_id}", &device_id); + let topic = topic_template.replace("{stream_name}", &stream_name); - Stream::new(stream, topic, max_buffer_size, tx, Compression::Disabled) + Stream::new(stream_name, topic, max_buffer_size, tx, Compression::Disabled) } fn add(&mut self, data: T) -> Result>, Error> { diff --git a/uplink/src/base/bridge/streams.rs b/uplink/src/base/bridge/streams.rs index 158f3870..aa66a85d 100644 --- a/uplink/src/base/bridge/streams.rs +++ b/uplink/src/base/bridge/streams.rs @@ -17,6 +17,8 @@ pub struct Streams { metrics_tx: Sender, map: HashMap>, pub stream_timeouts: DelayMap, + pub max_buf_size: usize, + topic_template: String, } impl Streams { @@ -24,8 +26,17 @@ impl Streams { config: Arc, data_tx: Sender>, metrics_tx: Sender, + topic_template: String, ) -> Self { - Self { config, data_tx, metrics_tx, map: HashMap::new(), stream_timeouts: DelayMap::new() } + Self { + config, + data_tx, + metrics_tx, + map: HashMap::new(), + stream_timeouts: DelayMap::new(), + topic_template, + max_buf_size: MAX_BUFFER_SIZE, + } } pub fn config_streams(&mut self, streams_config: HashMap) { @@ -50,8 +61,9 @@ impl Streams { &stream_name, &self.config.project_id, &self.config.device_id, - MAX_BUFFER_SIZE, + self.max_buf_size, self.data_tx.clone(), + &self.topic_template, ); self.map.entry(stream_name.to_owned()).or_insert(stream)