From edb347377d04aea5d6ff1fc59665c9d9a690cb46 Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Tue, 3 Oct 2023 07:01:55 +0200 Subject: [PATCH 01/15] WIP test with mqtt --- Cargo.toml | 1 + src/homeassistant/mod.rs | 67 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 134a026..c56d9cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,4 @@ url = { version = "2.4.1", features = ["serde"] } [dev-dependencies] fluent-asserter = "0.1.9" mockito = "1.2.0" +testcontainers = "0.14.0" diff --git a/src/homeassistant/mod.rs b/src/homeassistant/mod.rs index ac344db..cb2c89f 100644 --- a/src/homeassistant/mod.rs +++ b/src/homeassistant/mod.rs @@ -1,6 +1,7 @@ use crate::mitaffald::Container; use crate::settings::MQTTConfig; use rumqttc::{Client, LastWill, MqttOptions}; + const HA_AVAILABILITY_TOPIC: &str = "garbage_bin/availability"; impl From for MqttOptions { @@ -18,6 +19,7 @@ impl From for MqttOptions { config } } + pub struct HASensor { pub container_id: String, configure_topic: String, @@ -135,10 +137,63 @@ impl HASensor { } } -//can we use asref here? +//generate tests for this module +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use testcontainers::{clients, core::WaitFor}; + + #[test] + fn smoke_test() { + let docker = clients::Cli::default(); + let hive = docker.run(HiveMQContainer::default()); + let port = hive.get_host_port_ipv4(1883); + + println!("Ip address: {}", hive.get_bridge_ip_address()); + println!("HiveMQ is listening on port {}", port); + } + + const NAME: &str = "hivemq/hivemq-ce"; + const TAG: &str = "latest"; + + struct HiveMQContainer { + _env_vars: HashMap, + tag: String, + } + + impl Default for HiveMQContainer { + fn default() -> Self { + let mut env_vars = HashMap::new(); + env_vars.insert("discovery.type".to_owned(), "single-node".to_owned()); + HiveMQContainer { + _env_vars: env_vars, + tag: TAG.to_owned(), + } + } + } -// impl Into for Container { -// fn into(&self) -> HASensor { -// HASensor::new(&self) -// } -// } + impl testcontainers::Image for HiveMQContainer { + type Args = (); + + fn name(&self) -> String { + NAME.to_owned() + } + + fn tag(&self) -> String { + self.tag.to_owned() + } + + fn ready_conditions(&self) -> Vec { + vec![WaitFor::message_on_stdout("Started HiveMQ in")] + } + + fn expose_ports(&self) -> Vec { + vec![1883] + } + + // fn bla (&self){ + // self. + // } + } +} From 9b2500dcec55b5851cb84afacd4a1e1e5b4e75de Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Mon, 23 Oct 2023 05:42:11 +0200 Subject: [PATCH 02/15] Add full flow integration test --- Cargo.toml | 2 + src/homeassistant/mod.rs | 64 +------------- src/lib.rs | 43 +++++++++ src/main.rs | 36 +------- src/settings.rs | 2 +- tests/full_flow.rs | 187 +++++++++++++++++++++++++++++++++++++++ tests/hivemq/mod.rs | 41 +++++++++ tests/mqtt/mod.rs | 98 ++++++++++++++++++++ 8 files changed, 374 insertions(+), 99 deletions(-) create mode 100644 tests/full_flow.rs create mode 100644 tests/hivemq/mod.rs create mode 100644 tests/mqtt/mod.rs diff --git a/Cargo.toml b/Cargo.toml index c56d9cd..53b188b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,11 @@ easy-scraper = "0.2.0" reqwest = { version = "0.11.18", features = ["blocking"] } rumqttc = "0.22.0" serde = { version = "1.0.183", features = ["derive"] } +serde_json = "1.0.107" url = { version = "2.4.1", features = ["serde"] } [dev-dependencies] fluent-asserter = "0.1.9" mockito = "1.2.0" testcontainers = "0.14.0" +assert-json-diff = "2.0.2" diff --git a/src/homeassistant/mod.rs b/src/homeassistant/mod.rs index cb2c89f..8af7140 100644 --- a/src/homeassistant/mod.rs +++ b/src/homeassistant/mod.rs @@ -61,8 +61,7 @@ impl HASensor { client: &mut Client, ) -> Result<(), rumqttc::ClientError> { let payload = format!( - r#" - {{ + r#"{{ "object_id": "ha_affaldvarme_{id}", "unique_id": "ha_affaldvarme_{id}", "name": "{sensor_name}", @@ -136,64 +135,3 @@ impl HASensor { client.publish(&self.state_topic, rumqttc::QoS::AtLeastOnce, false, payload) } } - -//generate tests for this module -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use testcontainers::{clients, core::WaitFor}; - - #[test] - fn smoke_test() { - let docker = clients::Cli::default(); - let hive = docker.run(HiveMQContainer::default()); - let port = hive.get_host_port_ipv4(1883); - - println!("Ip address: {}", hive.get_bridge_ip_address()); - println!("HiveMQ is listening on port {}", port); - } - - const NAME: &str = "hivemq/hivemq-ce"; - const TAG: &str = "latest"; - - struct HiveMQContainer { - _env_vars: HashMap, - tag: String, - } - - impl Default for HiveMQContainer { - fn default() -> Self { - let mut env_vars = HashMap::new(); - env_vars.insert("discovery.type".to_owned(), "single-node".to_owned()); - HiveMQContainer { - _env_vars: env_vars, - tag: TAG.to_owned(), - } - } - } - - impl testcontainers::Image for HiveMQContainer { - type Args = (); - - fn name(&self) -> String { - NAME.to_owned() - } - - fn tag(&self) -> String { - self.tag.to_owned() - } - - fn ready_conditions(&self) -> Vec { - vec![WaitFor::message_on_stdout("Started HiveMQ in")] - } - - fn expose_ports(&self) -> Vec { - vec![1883] - } - - // fn bla (&self){ - // self. - // } - } -} diff --git a/src/lib.rs b/src/lib.rs index 8f253a9..cd1ae5f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,46 @@ +use std::collections::HashMap; + +use homeassistant::HASensor; +use mitaffald::get_containers; +use rumqttc::Client; +use settings::Settings; + pub mod homeassistant; pub mod mitaffald; pub mod settings; + +pub fn sync_data( + settings: Settings, + sensor_map: &mut HashMap, +) -> Result<(), String> { + let (mut client, mut connection) = Client::new(settings.mqtt.into(), 200); + let mut has_errors = false; + + get_containers(settings.affaldvarme)? + .into_iter() + .for_each(|x| { + let report_result = sensor_map + .entry(x.id.clone()) + .or_insert_with(|| HASensor::new(&x)) + .report(x, &mut client); + + has_errors = has_errors || report_result.is_err(); + }); + + //calling disconnect() causes an error in the connection iterator + if let Err(x) = client.disconnect() { + return Err(x.to_string()); + } + + //create own error and provide conversion from this? + //client.disconnect()?; + + //iterate the connection untill we hit the above generated error + connection.iter().take_while(|x| x.is_ok()).count(); + + if has_errors { + Err("Failed to report all containers".into()) + } else { + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index c16d711..bfad40c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ use ha_mitaffald::homeassistant::HASensor; -use ha_mitaffald::mitaffald::get_containers; use ha_mitaffald::settings::Settings; -use rumqttc::Client; +use ha_mitaffald::sync_data; use std::collections::HashMap; fn main() { @@ -17,36 +16,3 @@ fn main() { ); } } - -fn sync_data(settings: Settings, sensor_map: &mut HashMap) -> Result<(), String> { - let (mut client, mut connection) = Client::new(settings.mqtt.into(), 200); - let mut has_errors = false; - - get_containers(settings.affaldvarme)? - .into_iter() - .for_each(|x| { - let report_result = sensor_map - .entry(x.id.clone()) - .or_insert_with(|| HASensor::new(&x)) - .report(x, &mut client); - - has_errors = has_errors || report_result.is_err(); - }); - - //calling disconnect() causes an error in the connection iterator - if let Err(x) = client.disconnect() { - return Err(x.to_string()); - } - - //create own error and provide conversion from this? - //client.disconnect()?; - - //iterate the connection untill we hit the above generated error - connection.iter().take_while(|x| x.is_ok()).count(); - - if has_errors { - Err("Failed to report all containers".into()) - } else { - Ok(()) - } -} diff --git a/src/settings.rs b/src/settings.rs index bb553e0..a0b322c 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -20,7 +20,7 @@ impl Settings { } } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] #[allow(unused)] pub struct MQTTConfig { pub host: String, diff --git a/tests/full_flow.rs b/tests/full_flow.rs new file mode 100644 index 0000000..4dab757 --- /dev/null +++ b/tests/full_flow.rs @@ -0,0 +1,187 @@ +mod hivemq; +mod mqtt; + +use crate::mqtt::CollectingClient; +use assert_json_diff::assert_json_include; +use fluent_asserter::{assert_that, create_asserter}; +use ha_mitaffald::{ + homeassistant::HASensor, + mitaffald::settings::{Address, AddressId, AffaldVarmeConfig}, + settings::Settings, + sync_data, +}; +use hivemq::HiveMQContainer; +use rumqttc::Publish; +use serde_json::Value; +use std::collections::HashMap; +use std::time::Duration; +use testcontainers::clients; +use url::Url; + +#[test] +fn smoke_test() { + let docker = clients::Cli::default(); + let mqtt_server = docker.run(HiveMQContainer::default()); + let mqtt_server_port = mqtt_server.get_host_port_ipv4(1883); + + let mut mit_affald_server = mockito::Server::new(); + let mit_affald_server_url = Url::parse(&mit_affald_server.url()).unwrap(); + let address_id = "123".to_string(); + let mit_affald_server = mit_affald_server + .mock( + "GET", + format!("/Adresse/VisAdresseInfo?address-selected-id={}", address_id).as_str(), + ) + .with_status(200) + .with_body_from_file("src/mitaffald/remote_responses/container_information.html") + .create(); + + let settings = Settings { + affaldvarme: AffaldVarmeConfig { + address: Address::Id(AddressId { id: address_id }), + base_url: mit_affald_server_url, + }, + mqtt: ha_mitaffald::settings::MQTTConfig { + client_id: "test".to_string(), + host: "127.0.0.1".to_string(), + port: mqtt_server_port, + username: "".to_owned(), + password: "".to_owned(), + }, + }; + + let mut collecting_client = CollectingClient::new(&settings.mqtt); + collecting_client.start(); + + let mut sensor_map: HashMap = HashMap::new(); + let sync_result = sync_data(settings, &mut sensor_map); + + assert!( + sync_result.is_ok(), + "Error synchronizing: {:?}", + sync_result.err() + ); + + let collect_result = collecting_client.wait_for_messages(6, Duration::from_secs(20)); + assert!( + collect_result.is_ok(), + "Error waiting for messages: {:?}", + collect_result.err() + ); + + mit_affald_server.assert(); + + let actual = actual(collect_result.unwrap()); + let expected = expectations(); + + expected.into_iter().for_each(|(key, expected)| { + assert_that!(&actual).contains_key(&key.to_owned()); + let actual = actual.get(key).unwrap(); + assert_eq!(actual.len(), expected.len()); + + expected.iter().zip(actual).for_each(|(expected, actual)| { + let actual_json = serde_json::from_str::(actual); + let expected_json = serde_json::from_str::(expected); + + match (actual_json, expected_json) { + (Ok(actual), Ok(expected)) => { + assert_json_include!(actual: actual, expected: expected) + } + _ => assert_eq!(actual, expected), + } + }); + }); +} + +fn actual(messages: Vec) -> HashMap> { + let mut actual: HashMap> = HashMap::new(); + for message in messages { + let topic = message.topic; + let payload = String::from_utf8(message.payload.to_vec()).unwrap(); + + actual.entry(topic).or_insert_with(Vec::new).push(payload); + } + + actual +} + +fn expectations() -> HashMap<&'static str, Vec<&'static str>> { + let mut expectation: HashMap<&'static str, Vec<&'static str>> = HashMap::new(); + expectation.insert( + "homeassistant/sensor/ha_affaldvarme_11064295/config", + vec![r#"{ + "object_id": "ha_affaldvarme_11064295", + "unique_id": "ha_affaldvarme_11064295", + "name": "Restaffald", + "state_topic": "garbage_bin/11064295/status", + "json_attributes_topic": "garbage_bin/11064295/status", + "value_template": "{{ (strptime(value_json.next_empty, '%Y-%m-%d').date() - now().date()).days }}", + "availability_topic": "garbage_bin/availability", + "payload_available": "online", + "payload_not_available": "offline", + "unit_of_measurement": "days", + "device": { + "identifiers": [ + "ha_affaldvarme" + ], + "name": "Affaldvarme integration", + "sw_version": "1.0", + "model": "Standard", + "manufacturer": "Your Garbage Bin Manufacturer" + }, + "icon": "mdi:recycle" + }"#]); + + expectation.insert( + "homeassistant/sensor/ha_affaldvarme_12019493/config", + vec![r#"{ + "object_id": "ha_affaldvarme_12019493", + "unique_id": "ha_affaldvarme_12019493", + "name": "Genanvendeligt affald (Glas plast metal og papir pap)", + "state_topic": "garbage_bin/12019493/status", + "json_attributes_topic": "garbage_bin/12019493/status", + "value_template": "{{ (strptime(value_json.next_empty, '%Y-%m-%d').date() - now().date()).days }}", + "availability_topic": "garbage_bin/availability", + "payload_available": "online", + "payload_not_available": "offline", + "unit_of_measurement": "days", + "device": { + "identifiers": [ + "ha_affaldvarme" + ], + "name": "Affaldvarme integration", + "sw_version": "1.0", + "model": "Standard", + "manufacturer": "Your Garbage Bin Manufacturer" + }, + "icon": "mdi:recycle" + }"#]); + + expectation.insert( + "garbage_bin/11064295/status", + vec![ + r#" { + "id": "11064295", + "size": "240 L", + "frequency": "1 gang på 2 uger", + "name": "Restaffald", + "next_empty": "2024-08-04" + }"#, + ], + ); + expectation.insert( + "garbage_bin/12019493/status", + vec![ + r#" { + "id": "12019493", + "size": "240 L", + "frequency": "1 gang på 4 uger", + "name": "Genanvendeligt affald (Glas plast metal og papir pap)", + "next_empty": "2024-08-03" + }"#, + ], + ); + expectation.insert("garbage_bin/availability", vec!["online", "online"]); + + expectation +} diff --git a/tests/hivemq/mod.rs b/tests/hivemq/mod.rs new file mode 100644 index 0000000..50bf455 --- /dev/null +++ b/tests/hivemq/mod.rs @@ -0,0 +1,41 @@ +use std::collections::HashMap; +use testcontainers::core::WaitFor; + +const NAME: &str = "hivemq/hivemq-ce"; +const TAG: &str = "latest"; + +pub struct HiveMQContainer { + _env_vars: HashMap, + tag: String, +} + +impl Default for HiveMQContainer { + fn default() -> Self { + let mut env_vars = HashMap::new(); + env_vars.insert("discovery.type".to_owned(), "single-node".to_owned()); + HiveMQContainer { + _env_vars: env_vars, + tag: TAG.to_owned(), + } + } +} + +impl testcontainers::Image for HiveMQContainer { + type Args = (); + + fn name(&self) -> String { + NAME.to_owned() + } + + fn tag(&self) -> String { + self.tag.to_owned() + } + + fn ready_conditions(&self) -> Vec { + vec![WaitFor::message_on_stdout("Started HiveMQ in")] + } + + fn expose_ports(&self) -> Vec { + vec![1883] + } +} diff --git a/tests/mqtt/mod.rs b/tests/mqtt/mod.rs new file mode 100644 index 0000000..220fe5a --- /dev/null +++ b/tests/mqtt/mod.rs @@ -0,0 +1,98 @@ +use std::{ + cmp::Ordering, + sync::{atomic::AtomicBool, Arc, Mutex}, + time::{Duration, Instant}, +}; + +use rumqttc::Publish; + +pub struct CollectingClient { + received_messages: std::sync::Arc>>, + join_handle: Option>, + config: ha_mitaffald::settings::MQTTConfig, + terminate_flag: Arc, +} + +impl CollectingClient { + pub fn new(config: &ha_mitaffald::settings::MQTTConfig) -> Self { + let config = ha_mitaffald::settings::MQTTConfig { + client_id: "collecting-client".to_owned(), + ..config.clone() + }; + + Self { + config, + received_messages: Arc::new(Mutex::new(Vec::new())), + join_handle: None, + terminate_flag: Arc::new(AtomicBool::new(false)), + } + } + + pub fn start(&mut self) { + let config = self.config.clone(); + let received_messages = self.received_messages.clone(); + let stopping_flag = Arc::clone(&self.terminate_flag); + let (mut client, mut connection) = rumqttc::Client::new(config.into(), 100); + client.subscribe("#", rumqttc::QoS::AtLeastOnce).unwrap(); + + let handle = std::thread::spawn(move || loop { + let message = connection.recv_timeout(Duration::from_secs(1)); + + println!("Received message: {:?}", message); + if let Ok(Ok(rumqttc::Event::Incoming(rumqttc::Packet::Publish(message)))) = message { + received_messages.lock().unwrap().push(message); + } + + if stopping_flag.load(std::sync::atomic::Ordering::Relaxed) { + println!("Thread is terminating"); + break; + } + }); + + self.join_handle = Some(handle); + } + + pub fn wait_for_messages( + self, + count: usize, + timeout: Duration, + ) -> Result, WaitError> { + let start = std::time::Instant::now(); + loop { + let received_messages = self.received_messages.lock().unwrap().len(); + if received_messages >= count { + self.terminate_flag + .store(true, std::sync::atomic::Ordering::Relaxed); + + break; + } + + if Instant::now() - start > timeout { + self.terminate_flag + .store(true, std::sync::atomic::Ordering::Relaxed); + break; + } + + std::thread::sleep(std::time::Duration::from_millis(500)); + } + + println!("Joining worker thread..."); + if let Some(handle) = self.join_handle { + handle.join().unwrap(); + } + //todo: can get rid of clone here? + let received_messages = self.received_messages.lock().unwrap(); + + match received_messages.len().cmp(&count) { + Ordering::Equal => Ok(received_messages.clone()), + Ordering::Greater => Err(WaitError::TooMany(received_messages.clone())), + Ordering::Less => Err(WaitError::Timeout(received_messages.clone())), + } + } +} + +#[derive(Debug)] +pub enum WaitError { + Timeout(Vec), + TooMany(Vec), +} From 427ff43445eeb13b6655b9a383f110cb98f1ce55 Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Mon, 23 Oct 2023 06:17:48 +0200 Subject: [PATCH 03/15] Improve debuggability on test failure --- tests/full_flow.rs | 5 +++-- tests/mqtt/mod.rs | 31 +++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/tests/full_flow.rs b/tests/full_flow.rs index 4dab757..57948b9 100644 --- a/tests/full_flow.rs +++ b/tests/full_flow.rs @@ -63,10 +63,11 @@ fn smoke_test() { ); let collect_result = collecting_client.wait_for_messages(6, Duration::from_secs(20)); + assert!( collect_result.is_ok(), - "Error waiting for messages: {:?}", - collect_result.err() + "Error waiting for messages: {}", + collect_result.unwrap_err() ); mit_affald_server.assert(); diff --git a/tests/mqtt/mod.rs b/tests/mqtt/mod.rs index 220fe5a..5910dfd 100644 --- a/tests/mqtt/mod.rs +++ b/tests/mqtt/mod.rs @@ -1,5 +1,6 @@ use std::{ cmp::Ordering, + fmt::{self, Display, Formatter}, sync::{atomic::AtomicBool, Arc, Mutex}, time::{Duration, Instant}, }; @@ -96,3 +97,33 @@ pub enum WaitError { Timeout(Vec), TooMany(Vec), } + +impl Display for WaitError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let payload_print = |f: &mut Formatter<'_>, publishes: &Vec| -> fmt::Result { + for publish in publishes { + writeln!( + f, + "({} : {}), ", + publish.topic, + String::from_utf8(publish.payload.to_vec()).unwrap() + )?; + } + + Ok(()) + }; + + match self { + WaitError::Timeout(publishes) => { + write!(f, "Timeout: [")?; + payload_print(f, publishes)?; + write!(f, "]") + } + WaitError::TooMany(publishes) => { + write!(f, "TooMany: [")?; + payload_print(f, publishes)?; + write!(f, "]") + } + } + } +} From ff7c33b64a88822607db0b10fc976663efa7d98d Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Mon, 23 Oct 2023 06:34:06 +0200 Subject: [PATCH 04/15] Log local mqtt server port --- tests/full_flow.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/full_flow.rs b/tests/full_flow.rs index 57948b9..18b6096 100644 --- a/tests/full_flow.rs +++ b/tests/full_flow.rs @@ -23,6 +23,7 @@ fn smoke_test() { let docker = clients::Cli::default(); let mqtt_server = docker.run(HiveMQContainer::default()); let mqtt_server_port = mqtt_server.get_host_port_ipv4(1883); + println!("Running local MQTT server on port {}", mqtt_server_port); let mut mit_affald_server = mockito::Server::new(); let mit_affald_server_url = Url::parse(&mit_affald_server.url()).unwrap(); From 1f678355be9631e199303ffc9fc10e00084e71ce Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Tue, 24 Oct 2023 05:57:44 +0200 Subject: [PATCH 05/15] Try to give containers unique names --- tests/full_flow.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/full_flow.rs b/tests/full_flow.rs index 18b6096..aec049e 100644 --- a/tests/full_flow.rs +++ b/tests/full_flow.rs @@ -13,15 +13,24 @@ use ha_mitaffald::{ use hivemq::HiveMQContainer; use rumqttc::Publish; use serde_json::Value; -use std::collections::HashMap; use std::time::Duration; -use testcontainers::clients; +use std::{collections::HashMap, time::SystemTime}; +use testcontainers::{clients, Image, RunnableImage}; use url::Url; #[test] fn smoke_test() { let docker = clients::Cli::default(); - let mqtt_server = docker.run(HiveMQContainer::default()); + let image: RunnableImage = HiveMQContainer::default().into(); + let image = image.with_container_name(format!( + "name{:?}", + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + )); + // let mqtt_server = docker.run(HiveMQContainer::default()); + let mqtt_server = docker.run(image); + let mqtt_server_port = mqtt_server.get_host_port_ipv4(1883); println!("Running local MQTT server on port {}", mqtt_server_port); From 0e741124e57fad81b7935ad946f3ce21c2e2e5c1 Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Tue, 24 Oct 2023 06:31:32 +0200 Subject: [PATCH 06/15] Try localhost --- tests/full_flow.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/full_flow.rs b/tests/full_flow.rs index aec049e..cf9ab75 100644 --- a/tests/full_flow.rs +++ b/tests/full_flow.rs @@ -53,7 +53,7 @@ fn smoke_test() { }, mqtt: ha_mitaffald::settings::MQTTConfig { client_id: "test".to_string(), - host: "127.0.0.1".to_string(), + host: "localhost".to_string(), port: mqtt_server_port, username: "".to_owned(), password: "".to_owned(), From 5a39d7f51870cd79d1d81408072d747a6b1b0b88 Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Tue, 24 Oct 2023 06:37:25 +0200 Subject: [PATCH 07/15] Increase timeout --- tests/full_flow.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/full_flow.rs b/tests/full_flow.rs index cf9ab75..1c4996c 100644 --- a/tests/full_flow.rs +++ b/tests/full_flow.rs @@ -72,7 +72,7 @@ fn smoke_test() { sync_result.err() ); - let collect_result = collecting_client.wait_for_messages(6, Duration::from_secs(20)); + let collect_result = collecting_client.wait_for_messages(6, Duration::from_secs(60)); assert!( collect_result.is_ok(), From f85c83b399a84aecc788b5af6a4edfc442023015 Mon Sep 17 00:00:00 2001 From: Cosmin Lazar Date: Tue, 24 Oct 2023 14:50:16 +0200 Subject: [PATCH 08/15] Update build.yml --- .github/workflows/build.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ff8e3dd..29271ea 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -57,6 +57,8 @@ jobs: coverage: name: Code coverage runs-on: ubuntu-latest + # temporary wait for test to see if it fixes mqtt errors + needs: test env: CARGO_TERM_COLOR: always steps: @@ -79,4 +81,4 @@ jobs: with: coverage-files: lcov.info artifact-name: code-coverage-report - update-comment: true \ No newline at end of file + update-comment: true From 38566b70e3acb6b1cfd3649529d31178bd3c9220 Mon Sep 17 00:00:00 2001 From: Cosmin Lazar Date: Tue, 24 Oct 2023 16:02:26 +0200 Subject: [PATCH 09/15] Update build.yml --- .github/workflows/build.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 29271ea..95d0815 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -26,7 +26,13 @@ jobs: uses: taiki-e/install-action@v2 with: tool: nextest - + + - name: Run unit tests (cargo test) + env: + CARGO_TERM_COLOR: always + run: | + cargo test + - name: Run unit tests (nextest) env: CARGO_TERM_COLOR: always From 13a2b3bad412759f31b1db9bf1edf9743c5ca5b0 Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Tue, 24 Oct 2023 20:56:16 +0200 Subject: [PATCH 10/15] Try fix test --- tests/full_flow.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/full_flow.rs b/tests/full_flow.rs index 1c4996c..cf70bea 100644 --- a/tests/full_flow.rs +++ b/tests/full_flow.rs @@ -62,6 +62,9 @@ fn smoke_test() { let mut collecting_client = CollectingClient::new(&settings.mqtt); collecting_client.start(); + //sleep 4 seconds + println!("Sleeping for 4 seconds to make sure the consumer started..."); + std::thread::sleep(Duration::from_secs(4)); let mut sensor_map: HashMap = HashMap::new(); let sync_result = sync_data(settings, &mut sensor_map); From e2452b21ba946b870ff4574d48f93bf1730792d4 Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Wed, 25 Oct 2023 05:42:14 +0200 Subject: [PATCH 11/15] Wait for consummer thread --- .idea/.gitignore | 8 +++ .idea/ha-mitaffald.iml | 11 ++++ .idea/modules.xml | 8 +++ .idea/vcs.xml | 6 ++ .vscode/launch.json | 64 ++++++++++++++++++ .vscode/settings.json | 4 ++ src/homeassistant/messages.rs | 120 ++++++++++++++++++++++++++++++++++ tests/full_flow.rs | 7 +- tests/mqtt/mod.rs | 56 +++++++++------- 9 files changed, 254 insertions(+), 30 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/ha-mitaffald.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json create mode 100644 src/homeassistant/messages.rs diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/ha-mitaffald.iml b/.idea/ha-mitaffald.iml new file mode 100644 index 0000000..cf84ae4 --- /dev/null +++ b/.idea/ha-mitaffald.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..e7226c5 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..9fe7dcb --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,64 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'ha-mitaffald'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=ha-mitaffald" + ], + "filter": { + "name": "ha-mitaffald", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug executable 'ha-mitaffald'", + "cargo": { + "args": [ + "build", + "--bin=ha-mitaffald", + "--package=ha-mitaffald" + ], + "filter": { + "name": "ha-mitaffald", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in executable 'ha-mitaffald'", + "cargo": { + "args": [ + "test", + "--no-run", + "--bin=ha-mitaffald", + "--package=ha-mitaffald" + ], + "filter": { + "name": "ha-mitaffald", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..df15ce6 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "rust-analyzer.checkOnSave.command": "clippy", + "editor.formatOnSave": true, +} \ No newline at end of file diff --git a/src/homeassistant/messages.rs b/src/homeassistant/messages.rs new file mode 100644 index 0000000..e02c51f --- /dev/null +++ b/src/homeassistant/messages.rs @@ -0,0 +1,120 @@ +// use derive_builder::Builder; +// use serde::{Deserialize, Serialize}; + +// #[derive(Debug, Default, Builder, Clone)] +// pub struct Device { +// /// Webpage link to manage the configuration. +// configuration_url: Option, + +// /// List of connections of the device. +// connections: Option>, + +// /// Hardware version. +// hw_version: Option, + +// /// List of IDs that uniquely identify the device. +// identifiers: Option>, + +// /// Manufacturer of the device. +// manufacturer: Option, + +// /// Model of the device. +// model: Option, + +// /// Name of the device. +// name: Option, + +// /// Suggest an area if the device isn’t in one yet. +// suggested_area: Option, + +// /// Firmware version. +// sw_version: Option, + +// /// Identifier of a device that routes messages between this device and Home Assistant. Examples of such devices are hubs, or parent devices of a sub-device. This is used to show device topology in Home Assistant. +// via_device: Option, +// } + +// /// MQTT sensor configuration. +// #[derive(Debug, Default, Builder)] +// pub struct Sensor { +// /// The MQTT topic subscribed to receive sensor values. +// state_topic: String, + +// /// A list of MQTT topics subscribed to receive availability updates. +// availability: Option>, + +// /// Represents the available state. +// payload_available: Option, + +// /// Represents the unavailable state. +// payload_not_available: Option, + +// /// An MQTT topic subscribed to receive availability updates. +// topic: String, + +// /// Template to extract device’s availability from the topic. +// value_template: Option, + +// /// Controls the conditions to set the entity to available. +// availability_mode: Option, + +// /// Template to extract device’s availability from the availability_topic. +// availability_template: Option, + +// /// The MQTT topic subscribed to receive availability updates. +// availability_topic: Option, + +// /// Information about the device. +// device: Option, + +// /// A link to the webpage that can manage the configuration of this device. +// configuration_url: Option, + +// /// Flag which defines if the entity should be enabled when first added. +// enabled_by_default: Option, + +// /// Encoding of the payloads received. +// encoding: Option, + +// /// Category of the entity. +// entity_category: Option, + +// /// Defines the number of seconds after the sensor’s state expires. +// expire_after: Option, + +// /// Sends update events even if the value hasn’t changed. +// force_update: Option, + +// /// Icon for the entity. +// icon: Option, + +// /// Template to extract the JSON dictionary. +// json_attributes_template: Option, + +// /// Topic subscribed to receive a JSON dictionary payload. +// json_attributes_topic: Option, + +// /// Template to extract the last_reset. +// last_reset_value_template: Option, + +// /// Name of the MQTT sensor. +// name: Option, + +// /// Used for automatic generation of entity_id. +// object_id: Option, + +// /// Number of decimals used in the sensor’s state after rounding. +// suggested_display_precision: Option, + +// /// Maximum QoS level to be used. +// qos: Option, + +// /// State class of the sensor. +// state_class: Option, + +// /// Unique ID for this sensor. +// unique_id: Option, + +// /// Units of measurement of the sensor. +// unit_of_measurement: Option, +// } diff --git a/tests/full_flow.rs b/tests/full_flow.rs index cf70bea..f5182a7 100644 --- a/tests/full_flow.rs +++ b/tests/full_flow.rs @@ -60,11 +60,8 @@ fn smoke_test() { }, }; - let mut collecting_client = CollectingClient::new(&settings.mqtt); - collecting_client.start(); - //sleep 4 seconds - println!("Sleeping for 4 seconds to make sure the consumer started..."); - std::thread::sleep(Duration::from_secs(4)); + let mut collecting_client = CollectingClient::new(); + collecting_client.start(&settings.mqtt); let mut sensor_map: HashMap = HashMap::new(); let sync_result = sync_data(settings, &mut sensor_map); diff --git a/tests/mqtt/mod.rs b/tests/mqtt/mod.rs index 5910dfd..e95bae8 100644 --- a/tests/mqtt/mod.rs +++ b/tests/mqtt/mod.rs @@ -5,51 +5,57 @@ use std::{ time::{Duration, Instant}, }; -use rumqttc::Publish; +use rumqttc::{Client, Event, Packet, Publish, QoS}; pub struct CollectingClient { received_messages: std::sync::Arc>>, join_handle: Option>, - config: ha_mitaffald::settings::MQTTConfig, terminate_flag: Arc, } impl CollectingClient { - pub fn new(config: &ha_mitaffald::settings::MQTTConfig) -> Self { - let config = ha_mitaffald::settings::MQTTConfig { - client_id: "collecting-client".to_owned(), - ..config.clone() - }; - + pub fn new() -> Self { Self { - config, received_messages: Arc::new(Mutex::new(Vec::new())), join_handle: None, terminate_flag: Arc::new(AtomicBool::new(false)), } } - pub fn start(&mut self) { - let config = self.config.clone(); + pub fn start(&mut self, config: &ha_mitaffald::settings::MQTTConfig) { + let config = ha_mitaffald::settings::MQTTConfig { + client_id: "collecting-client".to_owned(), + ..config.clone() + }; let received_messages = self.received_messages.clone(); let stopping_flag = Arc::clone(&self.terminate_flag); - let (mut client, mut connection) = rumqttc::Client::new(config.into(), 100); - client.subscribe("#", rumqttc::QoS::AtLeastOnce).unwrap(); - - let handle = std::thread::spawn(move || loop { - let message = connection.recv_timeout(Duration::from_secs(1)); - - println!("Received message: {:?}", message); - if let Ok(Ok(rumqttc::Event::Incoming(rumqttc::Packet::Publish(message)))) = message { - received_messages.lock().unwrap().push(message); - } - - if stopping_flag.load(std::sync::atomic::Ordering::Relaxed) { - println!("Thread is terminating"); - break; + let (tx, rx) = std::sync::mpsc::channel::<()>(); + + let handle = std::thread::spawn(move || { + let (mut client, mut connection) = Client::new(config.into(), 100); + client.subscribe("#", QoS::AtLeastOnce).unwrap(); + + loop { + let message = connection.recv_timeout(Duration::from_secs(1)); + println!("Received message: {:?}", &message); + match message { + Ok(Ok(Event::Incoming(Packet::SubAck(_)))) => { + tx.send(()).expect("Cannot report ready to main thread") + } + Ok(Ok(Event::Incoming(Packet::Publish(message)))) => { + received_messages.lock().unwrap().push(message); + } + _ => {} + } + + if stopping_flag.load(std::sync::atomic::Ordering::Relaxed) { + println!("Thread is terminating"); + break; + } } }); + rx.recv().expect("Consumer thread did not report ready"); self.join_handle = Some(handle); } From 3866d32444873d5270f9c8a5f8c01c79958789ac Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Wed, 25 Oct 2023 05:44:46 +0200 Subject: [PATCH 12/15] Remove hacks --- .github/workflows/build.yml | 2 -- tests/full_flow.rs | 16 +++------------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 95d0815..81078e9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -63,8 +63,6 @@ jobs: coverage: name: Code coverage runs-on: ubuntu-latest - # temporary wait for test to see if it fixes mqtt errors - needs: test env: CARGO_TERM_COLOR: always steps: diff --git a/tests/full_flow.rs b/tests/full_flow.rs index f5182a7..6e43e8d 100644 --- a/tests/full_flow.rs +++ b/tests/full_flow.rs @@ -13,26 +13,16 @@ use ha_mitaffald::{ use hivemq::HiveMQContainer; use rumqttc::Publish; use serde_json::Value; +use std::collections::HashMap; use std::time::Duration; -use std::{collections::HashMap, time::SystemTime}; -use testcontainers::{clients, Image, RunnableImage}; +use testcontainers::clients; use url::Url; #[test] fn smoke_test() { let docker = clients::Cli::default(); - let image: RunnableImage = HiveMQContainer::default().into(); - let image = image.with_container_name(format!( - "name{:?}", - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - )); - // let mqtt_server = docker.run(HiveMQContainer::default()); - let mqtt_server = docker.run(image); - + let mqtt_server = docker.run(HiveMQContainer::default()); let mqtt_server_port = mqtt_server.get_host_port_ipv4(1883); - println!("Running local MQTT server on port {}", mqtt_server_port); let mut mit_affald_server = mockito::Server::new(); let mit_affald_server_url = Url::parse(&mit_affald_server.url()).unwrap(); From 835b1518a7e1261b231f6073a4dd02826f333296 Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Wed, 25 Oct 2023 05:50:29 +0200 Subject: [PATCH 13/15] Stop running tests with cargo test --- .github/workflows/build.yml | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 81078e9..0c5d64e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -26,13 +26,7 @@ jobs: uses: taiki-e/install-action@v2 with: tool: nextest - - - name: Run unit tests (cargo test) - env: - CARGO_TERM_COLOR: always - run: | - cargo test - + - name: Run unit tests (nextest) env: CARGO_TERM_COLOR: always From c45e6f229717ed0e84e5406583436da991693817 Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Wed, 25 Oct 2023 05:52:04 +0200 Subject: [PATCH 14/15] Rm and ignore .idea --- .gitignore | 2 ++ .idea/.gitignore | 8 -------- .idea/ha-mitaffald.iml | 11 ----------- .idea/modules.xml | 8 -------- .idea/vcs.xml | 6 ------ 5 files changed, 2 insertions(+), 33 deletions(-) delete mode 100644 .idea/.gitignore delete mode 100644 .idea/ha-mitaffald.iml delete mode 100644 .idea/modules.xml delete mode 100644 .idea/vcs.xml diff --git a/.gitignore b/.gitignore index f1158e7..6088e9f 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,5 @@ Cargo.lock # Secret configuration config/secrets.toml + +.idea/ \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 13566b8..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/.idea/ha-mitaffald.iml b/.idea/ha-mitaffald.iml deleted file mode 100644 index cf84ae4..0000000 --- a/.idea/ha-mitaffald.iml +++ /dev/null @@ -1,11 +0,0 @@ - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index e7226c5..0000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 35eb1dd..0000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file From c4badfed9b42d425117a56557d6fce01417b5a01 Mon Sep 17 00:00:00 2001 From: Cosmin Constantin Lazar Date: Wed, 25 Oct 2023 06:07:44 +0200 Subject: [PATCH 15/15] Get rid of clone --- tests/mqtt/mod.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/mqtt/mod.rs b/tests/mqtt/mod.rs index e95bae8..fbaf5e6 100644 --- a/tests/mqtt/mod.rs +++ b/tests/mqtt/mod.rs @@ -87,13 +87,15 @@ impl CollectingClient { if let Some(handle) = self.join_handle { handle.join().unwrap(); } - //todo: can get rid of clone here? - let received_messages = self.received_messages.lock().unwrap(); + + let inner_mutex = + Arc::try_unwrap(self.received_messages).expect("More than one reference detected"); + let received_messages = inner_mutex.into_inner().unwrap(); match received_messages.len().cmp(&count) { - Ordering::Equal => Ok(received_messages.clone()), - Ordering::Greater => Err(WaitError::TooMany(received_messages.clone())), - Ordering::Less => Err(WaitError::Timeout(received_messages.clone())), + Ordering::Equal => Ok(received_messages), + Ordering::Greater => Err(WaitError::TooMany(received_messages)), + Ordering::Less => Err(WaitError::Timeout(received_messages)), } } }