Skip to content

Commit

Permalink
feat: support actions on new topic
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Nov 17, 2023
1 parent 228a40b commit 5837d02
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions uplink/src/base/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,25 @@ impl Mqtt {
match self.eventloop.poll().await {
Ok(Event::Incoming(Incoming::ConnAck(connack))) => {
info!("Connected to broker. Session present = {}", connack.session_present);
let subscription = self.config.actions_subscription.clone();
// Subscribe to both old and new action topics:
// - /tenants/{tenant_id}/devices/{device_id}/actions
// - /tenants/{tenant_id}/devices/{device_id}/actions/+
let base_topic = self.config.actions_subscription.clone();
let new_topic = format!("{base_topic}/+");
let subscriptions = vec![base_topic, new_topic];

let client = self.client();

self.metrics.add_connection();

// This can potentially block when client from other threads
// have already filled the channel due to bad network. So we spawn
task::spawn(async move {
match client.subscribe(&subscription, QoS::AtLeastOnce).await {
Ok(..) => info!("Subscribe -> {:?}", subscription),
Err(e) => error!("Failed to send subscription. Error = {:?}", e),
for subscription in subscriptions {
match client.subscribe(&subscription, QoS::AtLeastOnce).await {
Ok(..) => info!("Subscribe -> {:?}", subscription),
Err(e) => error!("Failed to send subscription. Error = {:?}", e),
}
}
});
}
Expand Down Expand Up @@ -137,11 +145,15 @@ impl Mqtt {
}

fn handle_incoming_publish(&mut self, publish: Publish) -> Result<(), Error> {
if self.config.actions_subscription != publish.topic {
if !publish.topic.starts_with(&self.config.actions_subscription) {
error!("Unsolicited publish on {}", publish.topic);
return Ok(());
}

if self.config.actions_subscription != publish.topic {
// Set a marker in recevied action for later use
}

let action: Action = serde_json::from_slice(&publish.payload)?;
info!("Action = {:?}", action);
self.native_actions_tx.try_send(action)?;
Expand Down

0 comments on commit 5837d02

Please sign in to comment.