diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ff8e3dd..0c5d64e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -79,4 +79,4 @@ jobs: with: coverage-files: lcov.info artifact-name: code-coverage-report - update-comment: true \ No newline at end of file + update-comment: true diff --git a/.gitignore b/.gitignore index f1158e7..6088e9f 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,5 @@ Cargo.lock # Secret configuration config/secrets.toml + +.idea/ \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..9fe7dcb --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,64 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'ha-mitaffald'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=ha-mitaffald" + ], + "filter": { + "name": "ha-mitaffald", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug executable 'ha-mitaffald'", + "cargo": { + "args": [ + "build", + "--bin=ha-mitaffald", + "--package=ha-mitaffald" + ], + "filter": { + "name": "ha-mitaffald", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in executable 'ha-mitaffald'", + "cargo": { + "args": [ + "test", + "--no-run", + "--bin=ha-mitaffald", + "--package=ha-mitaffald" + ], + "filter": { + "name": "ha-mitaffald", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..df15ce6 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "rust-analyzer.checkOnSave.command": "clippy", + "editor.formatOnSave": true, +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 134a026..53b188b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,11 @@ easy-scraper = "0.2.0" reqwest = { version = "0.11.18", features = ["blocking"] } rumqttc = "0.22.0" serde = { version = "1.0.183", features = ["derive"] } +serde_json = "1.0.107" url = { version = "2.4.1", features = ["serde"] } [dev-dependencies] fluent-asserter = "0.1.9" mockito = "1.2.0" +testcontainers = "0.14.0" +assert-json-diff = "2.0.2" diff --git a/src/homeassistant/messages.rs b/src/homeassistant/messages.rs new file mode 100644 index 0000000..e02c51f --- /dev/null +++ b/src/homeassistant/messages.rs @@ -0,0 +1,120 @@ +// use derive_builder::Builder; +// use serde::{Deserialize, Serialize}; + +// #[derive(Debug, Default, Builder, Clone)] +// pub struct Device { +// /// Webpage link to manage the configuration. +// configuration_url: Option, + +// /// List of connections of the device. +// connections: Option>, + +// /// Hardware version. +// hw_version: Option, + +// /// List of IDs that uniquely identify the device. +// identifiers: Option>, + +// /// Manufacturer of the device. +// manufacturer: Option, + +// /// Model of the device. +// model: Option, + +// /// Name of the device. +// name: Option, + +// /// Suggest an area if the device isn’t in one yet. +// suggested_area: Option, + +// /// Firmware version. +// sw_version: Option, + +// /// Identifier of a device that routes messages between this device and Home Assistant. Examples of such devices are hubs, or parent devices of a sub-device. This is used to show device topology in Home Assistant. +// via_device: Option, +// } + +// /// MQTT sensor configuration. +// #[derive(Debug, Default, Builder)] +// pub struct Sensor { +// /// The MQTT topic subscribed to receive sensor values. +// state_topic: String, + +// /// A list of MQTT topics subscribed to receive availability updates. +// availability: Option>, + +// /// Represents the available state. +// payload_available: Option, + +// /// Represents the unavailable state. +// payload_not_available: Option, + +// /// An MQTT topic subscribed to receive availability updates. +// topic: String, + +// /// Template to extract device’s availability from the topic. +// value_template: Option, + +// /// Controls the conditions to set the entity to available. +// availability_mode: Option, + +// /// Template to extract device’s availability from the availability_topic. +// availability_template: Option, + +// /// The MQTT topic subscribed to receive availability updates. +// availability_topic: Option, + +// /// Information about the device. +// device: Option, + +// /// A link to the webpage that can manage the configuration of this device. +// configuration_url: Option, + +// /// Flag which defines if the entity should be enabled when first added. +// enabled_by_default: Option, + +// /// Encoding of the payloads received. +// encoding: Option, + +// /// Category of the entity. +// entity_category: Option, + +// /// Defines the number of seconds after the sensor’s state expires. +// expire_after: Option, + +// /// Sends update events even if the value hasn’t changed. +// force_update: Option, + +// /// Icon for the entity. +// icon: Option, + +// /// Template to extract the JSON dictionary. +// json_attributes_template: Option, + +// /// Topic subscribed to receive a JSON dictionary payload. +// json_attributes_topic: Option, + +// /// Template to extract the last_reset. +// last_reset_value_template: Option, + +// /// Name of the MQTT sensor. +// name: Option, + +// /// Used for automatic generation of entity_id. +// object_id: Option, + +// /// Number of decimals used in the sensor’s state after rounding. +// suggested_display_precision: Option, + +// /// Maximum QoS level to be used. +// qos: Option, + +// /// State class of the sensor. +// state_class: Option, + +// /// Unique ID for this sensor. +// unique_id: Option, + +// /// Units of measurement of the sensor. +// unit_of_measurement: Option, +// } diff --git a/src/homeassistant/mod.rs b/src/homeassistant/mod.rs index ac344db..8af7140 100644 --- a/src/homeassistant/mod.rs +++ b/src/homeassistant/mod.rs @@ -1,6 +1,7 @@ use crate::mitaffald::Container; use crate::settings::MQTTConfig; use rumqttc::{Client, LastWill, MqttOptions}; + const HA_AVAILABILITY_TOPIC: &str = "garbage_bin/availability"; impl From for MqttOptions { @@ -18,6 +19,7 @@ impl From for MqttOptions { config } } + pub struct HASensor { pub container_id: String, configure_topic: String, @@ -59,8 +61,7 @@ impl HASensor { client: &mut Client, ) -> Result<(), rumqttc::ClientError> { let payload = format!( - r#" - {{ + r#"{{ "object_id": "ha_affaldvarme_{id}", "unique_id": "ha_affaldvarme_{id}", "name": "{sensor_name}", @@ -134,11 +135,3 @@ impl HASensor { client.publish(&self.state_topic, rumqttc::QoS::AtLeastOnce, false, payload) } } - -//can we use asref here? - -// impl Into for Container { -// fn into(&self) -> HASensor { -// HASensor::new(&self) -// } -// } diff --git a/src/lib.rs b/src/lib.rs index 8f253a9..cd1ae5f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,46 @@ +use std::collections::HashMap; + +use homeassistant::HASensor; +use mitaffald::get_containers; +use rumqttc::Client; +use settings::Settings; + pub mod homeassistant; pub mod mitaffald; pub mod settings; + +pub fn sync_data( + settings: Settings, + sensor_map: &mut HashMap, +) -> Result<(), String> { + let (mut client, mut connection) = Client::new(settings.mqtt.into(), 200); + let mut has_errors = false; + + get_containers(settings.affaldvarme)? + .into_iter() + .for_each(|x| { + let report_result = sensor_map + .entry(x.id.clone()) + .or_insert_with(|| HASensor::new(&x)) + .report(x, &mut client); + + has_errors = has_errors || report_result.is_err(); + }); + + //calling disconnect() causes an error in the connection iterator + if let Err(x) = client.disconnect() { + return Err(x.to_string()); + } + + //create own error and provide conversion from this? + //client.disconnect()?; + + //iterate the connection untill we hit the above generated error + connection.iter().take_while(|x| x.is_ok()).count(); + + if has_errors { + Err("Failed to report all containers".into()) + } else { + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index c16d711..bfad40c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ use ha_mitaffald::homeassistant::HASensor; -use ha_mitaffald::mitaffald::get_containers; use ha_mitaffald::settings::Settings; -use rumqttc::Client; +use ha_mitaffald::sync_data; use std::collections::HashMap; fn main() { @@ -17,36 +16,3 @@ fn main() { ); } } - -fn sync_data(settings: Settings, sensor_map: &mut HashMap) -> Result<(), String> { - let (mut client, mut connection) = Client::new(settings.mqtt.into(), 200); - let mut has_errors = false; - - get_containers(settings.affaldvarme)? - .into_iter() - .for_each(|x| { - let report_result = sensor_map - .entry(x.id.clone()) - .or_insert_with(|| HASensor::new(&x)) - .report(x, &mut client); - - has_errors = has_errors || report_result.is_err(); - }); - - //calling disconnect() causes an error in the connection iterator - if let Err(x) = client.disconnect() { - return Err(x.to_string()); - } - - //create own error and provide conversion from this? - //client.disconnect()?; - - //iterate the connection untill we hit the above generated error - connection.iter().take_while(|x| x.is_ok()).count(); - - if has_errors { - Err("Failed to report all containers".into()) - } else { - Ok(()) - } -} diff --git a/src/settings.rs b/src/settings.rs index bb553e0..a0b322c 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -20,7 +20,7 @@ impl Settings { } } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] #[allow(unused)] pub struct MQTTConfig { pub host: String, diff --git a/tests/full_flow.rs b/tests/full_flow.rs new file mode 100644 index 0000000..6e43e8d --- /dev/null +++ b/tests/full_flow.rs @@ -0,0 +1,188 @@ +mod hivemq; +mod mqtt; + +use crate::mqtt::CollectingClient; +use assert_json_diff::assert_json_include; +use fluent_asserter::{assert_that, create_asserter}; +use ha_mitaffald::{ + homeassistant::HASensor, + mitaffald::settings::{Address, AddressId, AffaldVarmeConfig}, + settings::Settings, + sync_data, +}; +use hivemq::HiveMQContainer; +use rumqttc::Publish; +use serde_json::Value; +use std::collections::HashMap; +use std::time::Duration; +use testcontainers::clients; +use url::Url; + +#[test] +fn smoke_test() { + let docker = clients::Cli::default(); + let mqtt_server = docker.run(HiveMQContainer::default()); + let mqtt_server_port = mqtt_server.get_host_port_ipv4(1883); + + let mut mit_affald_server = mockito::Server::new(); + let mit_affald_server_url = Url::parse(&mit_affald_server.url()).unwrap(); + let address_id = "123".to_string(); + let mit_affald_server = mit_affald_server + .mock( + "GET", + format!("/Adresse/VisAdresseInfo?address-selected-id={}", address_id).as_str(), + ) + .with_status(200) + .with_body_from_file("src/mitaffald/remote_responses/container_information.html") + .create(); + + let settings = Settings { + affaldvarme: AffaldVarmeConfig { + address: Address::Id(AddressId { id: address_id }), + base_url: mit_affald_server_url, + }, + mqtt: ha_mitaffald::settings::MQTTConfig { + client_id: "test".to_string(), + host: "localhost".to_string(), + port: mqtt_server_port, + username: "".to_owned(), + password: "".to_owned(), + }, + }; + + let mut collecting_client = CollectingClient::new(); + collecting_client.start(&settings.mqtt); + + let mut sensor_map: HashMap = HashMap::new(); + let sync_result = sync_data(settings, &mut sensor_map); + + assert!( + sync_result.is_ok(), + "Error synchronizing: {:?}", + sync_result.err() + ); + + let collect_result = collecting_client.wait_for_messages(6, Duration::from_secs(60)); + + assert!( + collect_result.is_ok(), + "Error waiting for messages: {}", + collect_result.unwrap_err() + ); + + mit_affald_server.assert(); + + let actual = actual(collect_result.unwrap()); + let expected = expectations(); + + expected.into_iter().for_each(|(key, expected)| { + assert_that!(&actual).contains_key(&key.to_owned()); + let actual = actual.get(key).unwrap(); + assert_eq!(actual.len(), expected.len()); + + expected.iter().zip(actual).for_each(|(expected, actual)| { + let actual_json = serde_json::from_str::(actual); + let expected_json = serde_json::from_str::(expected); + + match (actual_json, expected_json) { + (Ok(actual), Ok(expected)) => { + assert_json_include!(actual: actual, expected: expected) + } + _ => assert_eq!(actual, expected), + } + }); + }); +} + +fn actual(messages: Vec) -> HashMap> { + let mut actual: HashMap> = HashMap::new(); + for message in messages { + let topic = message.topic; + let payload = String::from_utf8(message.payload.to_vec()).unwrap(); + + actual.entry(topic).or_insert_with(Vec::new).push(payload); + } + + actual +} + +fn expectations() -> HashMap<&'static str, Vec<&'static str>> { + let mut expectation: HashMap<&'static str, Vec<&'static str>> = HashMap::new(); + expectation.insert( + "homeassistant/sensor/ha_affaldvarme_11064295/config", + vec![r#"{ + "object_id": "ha_affaldvarme_11064295", + "unique_id": "ha_affaldvarme_11064295", + "name": "Restaffald", + "state_topic": "garbage_bin/11064295/status", + "json_attributes_topic": "garbage_bin/11064295/status", + "value_template": "{{ (strptime(value_json.next_empty, '%Y-%m-%d').date() - now().date()).days }}", + "availability_topic": "garbage_bin/availability", + "payload_available": "online", + "payload_not_available": "offline", + "unit_of_measurement": "days", + "device": { + "identifiers": [ + "ha_affaldvarme" + ], + "name": "Affaldvarme integration", + "sw_version": "1.0", + "model": "Standard", + "manufacturer": "Your Garbage Bin Manufacturer" + }, + "icon": "mdi:recycle" + }"#]); + + expectation.insert( + "homeassistant/sensor/ha_affaldvarme_12019493/config", + vec![r#"{ + "object_id": "ha_affaldvarme_12019493", + "unique_id": "ha_affaldvarme_12019493", + "name": "Genanvendeligt affald (Glas plast metal og papir pap)", + "state_topic": "garbage_bin/12019493/status", + "json_attributes_topic": "garbage_bin/12019493/status", + "value_template": "{{ (strptime(value_json.next_empty, '%Y-%m-%d').date() - now().date()).days }}", + "availability_topic": "garbage_bin/availability", + "payload_available": "online", + "payload_not_available": "offline", + "unit_of_measurement": "days", + "device": { + "identifiers": [ + "ha_affaldvarme" + ], + "name": "Affaldvarme integration", + "sw_version": "1.0", + "model": "Standard", + "manufacturer": "Your Garbage Bin Manufacturer" + }, + "icon": "mdi:recycle" + }"#]); + + expectation.insert( + "garbage_bin/11064295/status", + vec![ + r#" { + "id": "11064295", + "size": "240 L", + "frequency": "1 gang på 2 uger", + "name": "Restaffald", + "next_empty": "2024-08-04" + }"#, + ], + ); + expectation.insert( + "garbage_bin/12019493/status", + vec![ + r#" { + "id": "12019493", + "size": "240 L", + "frequency": "1 gang på 4 uger", + "name": "Genanvendeligt affald (Glas plast metal og papir pap)", + "next_empty": "2024-08-03" + }"#, + ], + ); + expectation.insert("garbage_bin/availability", vec!["online", "online"]); + + expectation +} diff --git a/tests/hivemq/mod.rs b/tests/hivemq/mod.rs new file mode 100644 index 0000000..50bf455 --- /dev/null +++ b/tests/hivemq/mod.rs @@ -0,0 +1,41 @@ +use std::collections::HashMap; +use testcontainers::core::WaitFor; + +const NAME: &str = "hivemq/hivemq-ce"; +const TAG: &str = "latest"; + +pub struct HiveMQContainer { + _env_vars: HashMap, + tag: String, +} + +impl Default for HiveMQContainer { + fn default() -> Self { + let mut env_vars = HashMap::new(); + env_vars.insert("discovery.type".to_owned(), "single-node".to_owned()); + HiveMQContainer { + _env_vars: env_vars, + tag: TAG.to_owned(), + } + } +} + +impl testcontainers::Image for HiveMQContainer { + type Args = (); + + fn name(&self) -> String { + NAME.to_owned() + } + + fn tag(&self) -> String { + self.tag.to_owned() + } + + fn ready_conditions(&self) -> Vec { + vec![WaitFor::message_on_stdout("Started HiveMQ in")] + } + + fn expose_ports(&self) -> Vec { + vec![1883] + } +} diff --git a/tests/mqtt/mod.rs b/tests/mqtt/mod.rs new file mode 100644 index 0000000..fbaf5e6 --- /dev/null +++ b/tests/mqtt/mod.rs @@ -0,0 +1,137 @@ +use std::{ + cmp::Ordering, + fmt::{self, Display, Formatter}, + sync::{atomic::AtomicBool, Arc, Mutex}, + time::{Duration, Instant}, +}; + +use rumqttc::{Client, Event, Packet, Publish, QoS}; + +pub struct CollectingClient { + received_messages: std::sync::Arc>>, + join_handle: Option>, + terminate_flag: Arc, +} + +impl CollectingClient { + pub fn new() -> Self { + Self { + received_messages: Arc::new(Mutex::new(Vec::new())), + join_handle: None, + terminate_flag: Arc::new(AtomicBool::new(false)), + } + } + + pub fn start(&mut self, config: &ha_mitaffald::settings::MQTTConfig) { + let config = ha_mitaffald::settings::MQTTConfig { + client_id: "collecting-client".to_owned(), + ..config.clone() + }; + let received_messages = self.received_messages.clone(); + let stopping_flag = Arc::clone(&self.terminate_flag); + let (tx, rx) = std::sync::mpsc::channel::<()>(); + + let handle = std::thread::spawn(move || { + let (mut client, mut connection) = Client::new(config.into(), 100); + client.subscribe("#", QoS::AtLeastOnce).unwrap(); + + loop { + let message = connection.recv_timeout(Duration::from_secs(1)); + println!("Received message: {:?}", &message); + match message { + Ok(Ok(Event::Incoming(Packet::SubAck(_)))) => { + tx.send(()).expect("Cannot report ready to main thread") + } + Ok(Ok(Event::Incoming(Packet::Publish(message)))) => { + received_messages.lock().unwrap().push(message); + } + _ => {} + } + + if stopping_flag.load(std::sync::atomic::Ordering::Relaxed) { + println!("Thread is terminating"); + break; + } + } + }); + + rx.recv().expect("Consumer thread did not report ready"); + self.join_handle = Some(handle); + } + + pub fn wait_for_messages( + self, + count: usize, + timeout: Duration, + ) -> Result, WaitError> { + let start = std::time::Instant::now(); + loop { + let received_messages = self.received_messages.lock().unwrap().len(); + if received_messages >= count { + self.terminate_flag + .store(true, std::sync::atomic::Ordering::Relaxed); + + break; + } + + if Instant::now() - start > timeout { + self.terminate_flag + .store(true, std::sync::atomic::Ordering::Relaxed); + break; + } + + std::thread::sleep(std::time::Duration::from_millis(500)); + } + + println!("Joining worker thread..."); + if let Some(handle) = self.join_handle { + handle.join().unwrap(); + } + + let inner_mutex = + Arc::try_unwrap(self.received_messages).expect("More than one reference detected"); + let received_messages = inner_mutex.into_inner().unwrap(); + + match received_messages.len().cmp(&count) { + Ordering::Equal => Ok(received_messages), + Ordering::Greater => Err(WaitError::TooMany(received_messages)), + Ordering::Less => Err(WaitError::Timeout(received_messages)), + } + } +} + +#[derive(Debug)] +pub enum WaitError { + Timeout(Vec), + TooMany(Vec), +} + +impl Display for WaitError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let payload_print = |f: &mut Formatter<'_>, publishes: &Vec| -> fmt::Result { + for publish in publishes { + writeln!( + f, + "({} : {}), ", + publish.topic, + String::from_utf8(publish.payload.to_vec()).unwrap() + )?; + } + + Ok(()) + }; + + match self { + WaitError::Timeout(publishes) => { + write!(f, "Timeout: [")?; + payload_print(f, publishes)?; + write!(f, "]") + } + WaitError::TooMany(publishes) => { + write!(f, "TooMany: [")?; + payload_print(f, publishes)?; + write!(f, "]") + } + } + } +}