Skip to content

Commit

Permalink
feat: action_status on action specific stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Nov 16, 2023
1 parent 3a2374c commit d2e6fd3
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 26 deletions.
11 changes: 10 additions & 1 deletion uplink/src/base/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ pub struct Action {
pub deadline: Option<Instant>,
}

const DEFAULT_RESPONSE_STREAM: &str = "action_status";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActionResponse {
#[serde(alias = "id")]
pub action_id: String,
#[serde(skip)]
pub action_name: Option<String>,
// sequence number
pub sequence: u32,
// timestamp
Expand All @@ -48,6 +52,7 @@ impl ActionResponse {

ActionResponse {
action_id: id.to_owned(),
action_name: None,
sequence: 0,
timestamp,
state: state.to_owned(),
Expand All @@ -69,6 +74,10 @@ impl ActionResponse {
self.progress == 100
}

pub fn set_action_name(&mut self, action_name: String) {
self.action_name = Some(action_name)
}

pub fn progress(id: &str, state: &str, progress: u8) -> Self {
ActionResponse::new(id, state, progress, vec![])
}
Expand Down Expand Up @@ -105,7 +114,7 @@ impl ActionResponse {

impl Point for ActionResponse {
fn stream_name(&self) -> &str {
"action_status"
self.action_name.as_ref().map(|n| n.as_str()).unwrap_or_else(|| DEFAULT_RESPONSE_STREAM)
}

fn sequence(&self) -> u32 {
Expand Down
34 changes: 24 additions & 10 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::time::{self, interval, Instant, Sleep};

use std::collections::HashSet;
use std::fs;
use std::path::PathBuf;
use std::{collections::HashMap, fmt::Debug, pin::Pin, sync::Arc, time::Duration};
Expand Down Expand Up @@ -57,7 +56,7 @@ pub struct ActionsBridge {
action_redirections: HashMap<String, String>,
/// Current action that is being processed
current_action: Option<CurrentAction>,
parallel_actions: HashSet<String>,
parallel_actions: HashMap<String, String>,
ctrl_rx: Receiver<ActionBridgeShutdown>,
ctrl_tx: Sender<ActionBridgeShutdown>,
shutdown_handle: Sender<()>,
Expand All @@ -82,8 +81,11 @@ impl ActionsBridge {
}
action_status.buf_size = 1;
streams_config.insert("action_status".to_owned(), action_status);
let mut streams = Streams::new(config.clone(), package_tx, metrics_tx);
let topic_template =
"/tenants/{tenant_id}/devices/{device_id}/action_status/{stream_name}".to_string();
let mut streams = Streams::new(config.clone(), package_tx, metrics_tx, topic_template);
streams.config_streams(streams_config);
streams.max_buf_size = 1;

Self {
status_tx,
Expand All @@ -94,7 +96,7 @@ impl ActionsBridge {
action_routes: HashMap::with_capacity(10),
action_redirections,
current_action: None,
parallel_actions: HashSet::new(),
parallel_actions: HashMap::new(),
shutdown_handle,
ctrl_rx,
ctrl_tx,
Expand Down Expand Up @@ -266,7 +268,7 @@ impl ActionsBridge {
let deadline = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?;
// current action left unchanged in case of new tunshell action
if action.name == TUNSHELL_ACTION {
self.parallel_actions.insert(action.action_id);
self.parallel_actions.insert(action.action_id, action.name);
return Ok(());
}

Expand All @@ -276,7 +278,7 @@ impl ActionsBridge {
}

async fn forward_action_response(&mut self, mut response: ActionResponse) {
if self.parallel_actions.contains(&response.action_id) {
if self.parallel_actions.contains_key(&response.action_id) {
self.forward_parallel_action_response(response).await;

return;
Expand All @@ -296,7 +298,8 @@ impl ActionsBridge {
}

info!("Action response = {:?}", response);
self.streams.forward(response.clone()).await;
let action_name = inflight_action.action.name.to_owned();
send_action_response(&mut self.streams, response.clone(), action_name).await;

if response.is_completed() || response.is_failed() {
self.clear_current_action();
Expand All @@ -316,7 +319,7 @@ impl ActionsBridge {
// NOTE: send success reponse for actions that don't have redirections configured
warn!("Action redirection is not configured for: {:?}", action);
let response = ActionResponse::success(&action.action_id);
self.streams.forward(response).await;
send_action_response(&mut self.streams, response, action.name).await;

self.clear_current_action();
}
Expand Down Expand Up @@ -348,21 +351,32 @@ impl ActionsBridge {
}

async fn forward_parallel_action_response(&mut self, response: ActionResponse) {
let action_name = self.parallel_actions.get(&response.action_id).unwrap().to_owned();
info!("Action response = {:?}", response);
if response.is_completed() || response.is_failed() {
self.parallel_actions.remove(&response.action_id);
}

self.streams.forward(response).await;
send_action_response(&mut self.streams, response, action_name).await;
}

async fn forward_action_error(&mut self, action: Action, error: Error) {
let response = ActionResponse::failure(&action.action_id, error.to_string());

self.streams.forward(response).await;
send_action_response(&mut self.streams, response, action.name).await;
}
}

async fn send_action_response(
streams: &mut Streams<ActionResponse>,
mut response: ActionResponse,
action_name: String,
) {
streams.forward(response.clone()).await;
response.set_action_name(action_name);
streams.forward(response).await;
}

#[derive(Debug, Deserialize, Serialize)]
struct SaveAction {
pub id: String,
Expand Down
4 changes: 3 additions & 1 deletion uplink/src/base/bridge/data_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ impl DataBridge {
let (data_tx, data_rx) = bounded(10);
let (ctrl_tx, ctrl_rx) = bounded(1);

let mut streams = Streams::new(config.clone(), package_tx, metrics_tx);
let topic_template =
"/tenants/{tenant_id}/devices/{device_id}/events/{stream_name}/jsonarray".to_string();
let mut streams = Streams::new(config.clone(), package_tx, metrics_tx, topic_template);
streams.config_streams(config.streams.clone());

Self { data_tx, data_rx, config, streams, ctrl_rx, ctrl_tx }
Expand Down
22 changes: 10 additions & 12 deletions uplink/src/base/bridge/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,25 +81,23 @@ where
}

pub fn dynamic(
stream: impl Into<String>,
project_id: impl Into<String>,
stream_name: impl Into<String>,
tenant_id: impl Into<String>,
device_id: impl Into<String>,
max_buffer_size: usize,
tx: Sender<Box<dyn Package>>,
topic_template: &str,
) -> Stream<T> {
let stream = stream.into();
let project_id = project_id.into();
let stream_name = stream_name.into();
let tenant_id = tenant_id.into();
let device_id = device_id.into();

let topic = String::from("/tenants/")
+ &project_id
+ "/devices/"
+ &device_id
+ "/events/"
+ &stream
+ "/jsonarray";
// e.g. "/tenants/{tenant_id}/devices/{device_id}/events/{stream_name}/jsonarray"
let topic_template = topic_template.replace("{tenant_id}", &tenant_id);
let topic_template = topic_template.replace("{device_id}", &device_id);
let topic = topic_template.replace("{stream_name}", &stream_name);

Stream::new(stream, topic, max_buffer_size, tx, Compression::Disabled)
Stream::new(stream_name, topic, max_buffer_size, tx, Compression::Disabled)
}

fn add(&mut self, data: T) -> Result<Option<Buffer<T>>, Error> {
Expand Down
16 changes: 14 additions & 2 deletions uplink/src/base/bridge/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,26 @@ pub struct Streams<T> {
metrics_tx: Sender<StreamMetrics>,
map: HashMap<String, Stream<T>>,
pub stream_timeouts: DelayMap<String>,
pub max_buf_size: usize,
topic_template: String,
}

impl<T: Point> Streams<T> {
pub fn new(
config: Arc<Config>,
data_tx: Sender<Box<dyn Package>>,
metrics_tx: Sender<StreamMetrics>,
topic_template: String,
) -> Self {
Self { config, data_tx, metrics_tx, map: HashMap::new(), stream_timeouts: DelayMap::new() }
Self {
config,
data_tx,
metrics_tx,
map: HashMap::new(),
stream_timeouts: DelayMap::new(),
topic_template,
max_buf_size: MAX_BUFFER_SIZE,
}
}

pub fn config_streams(&mut self, streams_config: HashMap<String, StreamConfig>) {
Expand All @@ -50,8 +61,9 @@ impl<T: Point> Streams<T> {
&stream_name,
&self.config.project_id,
&self.config.device_id,
MAX_BUFFER_SIZE,
self.max_buf_size,
self.data_tx.clone(),
&self.topic_template,
);

self.map.entry(stream_name.to_owned()).or_insert(stream)
Expand Down

0 comments on commit d2e6fd3

Please sign in to comment.