diff --git a/crates/core/src/bc_protocol/connection/udpsource.rs b/crates/core/src/bc_protocol/connection/udpsource.rs index cb35b4bb..d22c8844 100644 --- a/crates/core/src/bc_protocol/connection/udpsource.rs +++ b/crates/core/src/bc_protocol/connection/udpsource.rs @@ -296,7 +296,7 @@ impl Stream for UdpPayloadSource { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let camera_addr = self.inner.addr; - let mut this = self.get_mut(); + let this = self.get_mut(); match this.state { State::Normal => { // this.state = State::YieldNow; @@ -417,7 +417,7 @@ impl Sink> for UdpPayloadSource { } } fn start_send(self: Pin<&mut Self>, item: Vec) -> std::result::Result<(), Self::Error> { - let mut this = self.get_mut(); + let this = self.get_mut(); for chunk in item.chunks(MTU - UDPDATA_HEADER_SIZE) { let udp_data = UdpData { connection_id: this.camera_id, @@ -487,7 +487,7 @@ impl Sink> for UdpPayloadSource { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let mut this = self.get_mut(); + let this = self.get_mut(); if let State::Closed = this.state { return Poll::Ready(Ok(())); } diff --git a/crates/core/src/bc_protocol/ptz.rs b/crates/core/src/bc_protocol/ptz.rs index c06fee06..3c925d33 100644 --- a/crates/core/src/bc_protocol/ptz.rs +++ b/crates/core/src/bc_protocol/ptz.rs @@ -15,6 +15,8 @@ pub enum Direction { In, /// To zoom the camera Out (may be done with cropping depending on camera model) Out, + /// To stop currently active PTZ command + Stop, } impl BcCamera { @@ -36,6 +38,7 @@ impl BcCamera { Direction::Out => { todo!() } + Direction::Stop => "stop", } .to_string(); let send = Bc { diff --git a/src/mqtt/event_cam.rs b/src/mqtt/event_cam.rs index fe0ae263..91fde9fa 100644 --- a/src/mqtt/event_cam.rs +++ b/src/mqtt/event_cam.rs @@ -9,8 +9,11 @@ use std::sync::Arc; use tokio::{ sync::mpsc::{channel, Receiver, Sender}, task::JoinSet, + time::sleep, }; +use core::time::Duration; + #[derive(Debug, Copy, Clone)] pub(crate) enum Messages { Login, @@ -31,12 +34,12 @@ pub(crate) enum Messages { #[derive(Debug, Copy, Clone)] pub(crate) enum Direction { - Up(f32), - Down(f32), - Left(f32), - Right(f32), - In(f32), - Out(f32), + Up(f32, f32), + Down(f32, f32), + Left(f32, f32), + Right(f32, f32), + In(f32, f32), + Out(f32, f32), } #[derive(Debug)] @@ -408,19 +411,42 @@ impl<'a> MessageHandler<'a> { } } Messages::Ptz(direction) => { - let (bc_direction, amount) = match direction { - Direction::Up(amount) => (BcDirection::Up, amount), - Direction::Down(amount) => (BcDirection::Down, amount), - Direction::Left(amount) => (BcDirection::Left, amount), - Direction::Right(amount) => (BcDirection::Right, amount), - Direction::In(amount) => (BcDirection::In, amount), - Direction::Out(amount) => (BcDirection::Out, amount), + let (bc_direction, amount, seconds) = match direction { + Direction::Up(amount, seconds) => { + (BcDirection::Up, amount, seconds) + } + Direction::Down(amount, seconds) => { + (BcDirection::Down, amount, seconds) + } + Direction::Left(amount, seconds) => { + (BcDirection::Left, amount, seconds) + } + Direction::Right(amount, seconds) => { + (BcDirection::Right, amount, seconds) + } + Direction::In(amount, seconds) => { + (BcDirection::In, amount, seconds) + } + Direction::Out(amount, seconds) => { + (BcDirection::Out, amount, seconds) + } }; if let Err(e) = self.camera.send_ptz(bc_direction, amount).await { error = Some(format!("Failed to send PTZ: {:?}", e)); "FAIL".to_string() } else { - "OK".to_string() + // sleep for the designated seconds + sleep(Duration::from_secs_f32(seconds)).await; + + // note that amount is not used in the stop command + if let Err(e) = + self.camera.send_ptz(BcDirection::Stop, amount).await + { + error = Some(format!("Failed to send PTZ: {:?}", e)); + "FAIL".to_string() + } else { + "OK".to_string() + } } } _ => "UNKNOWN COMMAND".to_string(), diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index dcdb2ab2..15e4ab05 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -67,6 +67,11 @@ pub(crate) use event_cam::{Direction, Messages}; use log::*; use mqttc::{Mqtt, MqttReplyRef}; +use self::{ + event_cam::EventCamSender, + mqttc::{MqttReply, MqttSender}, +}; + /// Entry point for the mqtt subcommand /// /// Opt is the command line options @@ -118,175 +123,36 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi // Listen on mqtt messages and post on camera let camera_name = cam_config.name.clone(); + // When one error is recieved we pass it up async + let (error_send, mut error_recv) = tokio::sync::mpsc::channel::(1); let mqtt_to_cam = async { - while let Ok(msg) = mqtt.poll().await { - tokio::task::yield_now().await; - let mut reply = None; - let mut reply_topic = None; - match msg.as_ref() { - MqttReplyRef { - topic: "control/led", - message: "on", - } => { - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::StatusLedOn) - .await - .with_context(|| "Failed to set camera status light on")?, - ); - } - MqttReplyRef { - topic: "control/led", - message: "off", - } => { - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::StatusLedOff) - .await - .with_context(|| "Failed to set camera status light off")?, - ); - } - MqttReplyRef { - topic: "control/ir", - message: "on", - } => { - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::IRLedOn) - .await - .with_context(|| "Failed to set camera status light on")?, - ); - } - MqttReplyRef { - topic: "control/ir", - message: "off", - } => { - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::IRLedOff) - .await - .with_context(|| "Failed to set camera status light off")?, - ); - } - MqttReplyRef { - topic: "control/ir", - message: "auto", - } => { - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::IRLedAuto) - .await - .with_context(|| "Failed to set camera status light off")?, - ); - } - MqttReplyRef { - topic: "control/reboot", - .. - } => { - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::Reboot) - .await - .with_context(|| "Failed to set camera status light off")?, - ); - } - MqttReplyRef { - topic: "control/ptz", - message, - } => { - let lowercase_message = message.to_lowercase(); - let mut words = lowercase_message.split_whitespace(); - if let Some(direction_txt) = words.next() { - let amount = words.next().unwrap_or("32.0"); - if let Ok(amount) = amount.parse::() { - let direction = match direction_txt { - "up" => Direction::Up(amount), - "down" => Direction::Down(amount), - "left" => Direction::Left(amount), - "right" => Direction::Right(amount), - "in" => Direction::In(amount), - "out" => Direction::Out(amount), - _ => { - error!("Unrecongnized PTZ direction"); - continue; - } - }; - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::Ptz(direction)) - .await - .with_context(|| "Failed to send PTZ")?, - ); - } else { - error!("No PTZ direction speed was not a valid number"); + // Select to wait on an error (via the channel) or normal poll ops + tokio::select! { + v = async { + // Normal poll operations + while let Ok(msg) = mqtt.poll().await { + tokio::task::yield_now().await; + // Put the reply on it's own async thread so we can safely sleep + // and wait for it to reply in it's own time + let event_cam_sender = event_cam_sender.clone(); + let mqtt_sender_mqtt = mqtt_sender_mqtt.clone(); + let error_send = error_send.clone(); + tokio::task::spawn(async move { + // Handle the message and wait for ok/error on this thread + let result: Result<()> = handle_mqtt_message(&msg, event_cam_sender, mqtt_sender_mqtt).await; + // If there is an error we pass it to the channel + // this allows for async error handelling + if let Err(e) = result { + let _ = error_send.try_send(e); } - } else { - error!("No PTZ Direction given. Please add up/down/left/right/in/out"); - } - } - MqttReplyRef { - topic: "control/pir", - message: "on", - } => { - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::PIROn) - .await - .with_context(|| "Failed to set pir on")?, - ); - } - MqttReplyRef { - topic: "control/pir", - message: "off", - } => { - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::PIROff) - .await - .with_context(|| "Failed to set pir off")?, - ); - } - MqttReplyRef { - topic: "query/battery", - .. - } => { - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::Battery) - .await - .with_context(|| "Failed to get battery status")?, - ); - reply_topic = Some("status/battery"); - } - MqttReplyRef { - topic: "query/pir", .. - } => { - reply = Some( - event_cam_sender - .send_message_with_reply(Messages::PIRQuery) - .await - .with_context(|| "Failed to get pir status")?, - ); - reply_topic = Some("status/pir"); + }); } - _ => {} - } - if let Some(reply) = reply { - if let Some(topic) = reply_topic { - mqtt_sender_mqtt - .send_message(topic, &reply, false) - .await - .with_context(|| "Failed to send Camera reply to Mqtt")?; - } else { - mqtt_sender_mqtt - .send_message(&msg.topic, &reply, false) - .await - .with_context(|| "Failed to send Camera reply to Mqtt")?; - } - } + Ok(()) + } => v, + // Wait on any error from any of the error channels and if we get it we abort + v = error_recv.recv() => v.map(Err).unwrap_or_else(|| Err(anyhow!("Listen on camera error channel closed"))), } - Result::<(), Error>::Ok(()) }; let cam_to_mqtt = async { @@ -329,3 +195,195 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi Ok(()) } + +async fn handle_mqtt_message( + msg: &MqttReply, + event_cam_sender: EventCamSender, + mqtt_sender_mqtt: MqttSender, +) -> Result<()> { + let mut reply = None; + let mut reply_topic = None; + match msg.as_ref() { + MqttReplyRef { + topic: _, + message: "OK", + } + | MqttReplyRef { + topic: _, + message: "FAIL", + } => { + // Do nothing for the success/fail replies + } + MqttReplyRef { + topic: "control/led", + message: "on", + } => { + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::StatusLedOn) + .await + .with_context(|| "Failed to set camera status light on")?, + ); + } + MqttReplyRef { + topic: "control/led", + message: "off", + } => { + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::StatusLedOff) + .await + .with_context(|| "Failed to set camera status light off")?, + ); + } + MqttReplyRef { + topic: "control/ir", + message: "on", + } => { + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::IRLedOn) + .await + .with_context(|| "Failed to set camera status light on")?, + ); + } + MqttReplyRef { + topic: "control/ir", + message: "off", + } => { + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::IRLedOff) + .await + .with_context(|| "Failed to set camera status light off")?, + ); + } + MqttReplyRef { + topic: "control/ir", + message: "auto", + } => { + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::IRLedAuto) + .await + .with_context(|| "Failed to set camera status light off")?, + ); + } + MqttReplyRef { + topic: "control/reboot", + .. + } => { + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::Reboot) + .await + .with_context(|| "Failed to set camera status light off")?, + ); + } + MqttReplyRef { + topic: "control/ptz", + message, + } => { + let lowercase_message = message.to_lowercase(); + let mut words = lowercase_message.split_whitespace(); + if let Some(direction_txt) = words.next() { + // Target amount to move + let amount = words.next().unwrap_or("32.0"); + if let Ok(amount) = amount.parse::() { + let seconds = amount / 32f32; + // range checking on seconds so that you can't sleep for 3.4E+38 seconds + match seconds { + x if (0.0..10.0).contains(&x) => seconds, + _ => { + error!("seconds was not a valid number (out of range)"); + return Ok(()); + } + }; + + let direction = match direction_txt { + "up" => Direction::Up(amount, seconds), + "down" => Direction::Down(amount, seconds), + "left" => Direction::Left(amount, seconds), + "right" => Direction::Right(amount, seconds), + "in" => Direction::In(amount, seconds), + "out" => Direction::Out(amount, seconds), + _ => { + error!("Unrecognized PTZ direction \"{}\"", direction_txt); + return Ok(()); + } + }; + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::Ptz(direction)) + .await + .with_context(|| "Failed to send PTZ")?, + ); + } else { + error!("No PTZ direction speed was not a valid number"); + } + } else { + error!("No PTZ Direction given. Please add up/down/left/right/in/out"); + } + } + MqttReplyRef { + topic: "control/pir", + message: "on", + } => { + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::PIROn) + .await + .with_context(|| "Failed to set pir on")?, + ); + } + MqttReplyRef { + topic: "control/pir", + message: "off", + } => { + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::PIROff) + .await + .with_context(|| "Failed to set pir off")?, + ); + } + MqttReplyRef { + topic: "query/battery", + .. + } => { + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::Battery) + .await + .with_context(|| "Failed to get battery status")?, + ); + reply_topic = Some("status/battery"); + } + MqttReplyRef { + topic: "query/pir", .. + } => { + reply = Some( + event_cam_sender + .send_message_with_reply(Messages::PIRQuery) + .await + .with_context(|| "Failed to get pir status")?, + ); + reply_topic = Some("status/pir"); + } + _ => {} + } + if let Some(reply) = reply { + if let Some(topic) = reply_topic { + mqtt_sender_mqtt + .send_message(topic, &reply, false) + .await + .with_context(|| "Failed to send Camera reply to Mqtt")?; + } else { + mqtt_sender_mqtt + .send_message(&msg.topic, &reply, false) + .await + .with_context(|| "Failed to send Camera reply to Mqtt")?; + } + } + Ok(()) +} diff --git a/src/mqtt/mqttc.rs b/src/mqtt/mqttc.rs index 87e85b1a..1cb2317b 100644 --- a/src/mqtt/mqttc.rs +++ b/src/mqtt/mqttc.rs @@ -43,6 +43,7 @@ struct MqttReciever { name: String, } +#[derive(Clone)] pub(crate) struct MqttSender { client: Arc, name: String,