From 5837d0231782b07be11d91a1d6400f5ce4f2df62 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 17 Nov 2023 17:42:10 +0530 Subject: [PATCH] feat: support actions on new topic --- uplink/src/base/mqtt/mod.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/uplink/src/base/mqtt/mod.rs b/uplink/src/base/mqtt/mod.rs index 07026aad..48cda503 100644 --- a/uplink/src/base/mqtt/mod.rs +++ b/uplink/src/base/mqtt/mod.rs @@ -81,7 +81,13 @@ impl Mqtt { match self.eventloop.poll().await { Ok(Event::Incoming(Incoming::ConnAck(connack))) => { info!("Connected to broker. Session present = {}", connack.session_present); - let subscription = self.config.actions_subscription.clone(); + // Subscribe to both old and new action topics: + // - /tenants/{tenant_id}/devices/{device_id}/actions + // - /tenants/{tenant_id}/devices/{device_id}/actions/+ + let base_topic = self.config.actions_subscription.clone(); + let new_topic = format!("{base_topic}/+"); + let subscriptions = vec![base_topic, new_topic]; + let client = self.client(); self.metrics.add_connection(); @@ -89,9 +95,11 @@ impl Mqtt { // This can potentially block when client from other threads // have already filled the channel due to bad network. So we spawn task::spawn(async move { - match client.subscribe(&subscription, QoS::AtLeastOnce).await { - Ok(..) => info!("Subscribe -> {:?}", subscription), - Err(e) => error!("Failed to send subscription. Error = {:?}", e), + for subscription in subscriptions { + match client.subscribe(&subscription, QoS::AtLeastOnce).await { + Ok(..) => info!("Subscribe -> {:?}", subscription), + Err(e) => error!("Failed to send subscription. Error = {:?}", e), + } } }); } @@ -137,11 +145,15 @@ impl Mqtt { } fn handle_incoming_publish(&mut self, publish: Publish) -> Result<(), Error> { - if self.config.actions_subscription != publish.topic { + if !publish.topic.starts_with(&self.config.actions_subscription) { error!("Unsolicited publish on {}", publish.topic); return Ok(()); } + if self.config.actions_subscription != publish.topic { + // Set a marker in recevied action for later use + } + let action: Action = serde_json::from_slice(&publish.payload)?; info!("Action = {:?}", action); self.native_actions_tx.try_send(action)?;