From 974dfe51b5eb9471f3580c0c728c9bef1c2ffbf3 Mon Sep 17 00:00:00 2001 From: Brian May Date: Thu, 7 Sep 2023 17:15:41 +1000 Subject: [PATCH] Refactor entities into pipes --- Cargo.lock | 1 + brian-backend/src/amber.rs | 41 +- brian-backend/src/delays.rs | 38 +- brian-backend/src/environment_monitor.rs | 7 +- brian-backend/src/ha.rs | 9 +- brian-backend/src/hdmi.rs | 7 +- brian-backend/src/lights.rs | 109 +- brian-backend/src/main.rs | 6 +- brian-backend/src/tesla.rs | 30 +- robotica-backend/Cargo.toml | 1 + robotica-backend/src/devices/fake_switch.rs | 3 +- robotica-backend/src/devices/hdmi/mod.rs | 8 +- robotica-backend/src/devices/lifx.rs | 13 +- robotica-backend/src/entities.rs | 1046 ----------------- robotica-backend/src/lib.rs | 5 +- robotica-backend/src/pipes/generic/mod.rs | 106 ++ .../src/pipes/generic/receiver.rs | 194 +++ robotica-backend/src/pipes/generic/sender.rs | 39 + robotica-backend/src/pipes/mod.rs | 61 + robotica-backend/src/pipes/stateful/mod.rs | 106 ++ .../src/pipes/stateful/receiver.rs | 369 ++++++ robotica-backend/src/pipes/stateful/sender.rs | 39 + robotica-backend/src/pipes/stateless/mod.rs | 93 ++ .../src/pipes/stateless/receiver.rs | 293 +++++ .../src/pipes/stateless/sender.rs | 39 + robotica-backend/src/scheduling/executor.rs | 3 +- .../src/services/http/websocket/mod.rs | 7 +- robotica-backend/src/services/life360.rs | 8 +- robotica-backend/src/services/mqtt/mod.rs | 51 +- robotica-backend/src/sinks/mod.rs | 19 +- robotica-backend/src/sources/timer.rs | 8 +- robotica-backend/tests/hdmi.rs | 6 +- robotica-slint/src/audio.rs | 7 +- robotica-slint/src/ui.rs | 10 +- 34 files changed, 1559 insertions(+), 1223 deletions(-) delete mode 100644 robotica-backend/src/entities.rs create mode 100644 robotica-backend/src/pipes/generic/mod.rs create mode 100644 robotica-backend/src/pipes/generic/receiver.rs create mode 100644 robotica-backend/src/pipes/generic/sender.rs create mode 100644 robotica-backend/src/pipes/mod.rs create mode 100644 robotica-backend/src/pipes/stateful/mod.rs create mode 100644 robotica-backend/src/pipes/stateful/receiver.rs create mode 100644 robotica-backend/src/pipes/stateful/sender.rs create mode 100644 robotica-backend/src/pipes/stateless/mod.rs create mode 100644 robotica-backend/src/pipes/stateless/receiver.rs create mode 100644 robotica-backend/src/pipes/stateless/sender.rs diff --git a/Cargo.lock b/Cargo.lock index 053406ef..96509063 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3992,6 +3992,7 @@ version = "0.1.0" dependencies = [ "anyhow", "arc-swap", + "async-trait", "axum", "axum-core", "axum-sessions", diff --git a/brian-backend/src/amber.rs b/brian-backend/src/amber.rs index a52c5da8..c3373cd7 100644 --- a/brian-backend/src/amber.rs +++ b/brian-backend/src/amber.rs @@ -8,10 +8,8 @@ use tokio::time::{interval, sleep_until, Instant, MissedTickBehavior}; use tracing::{debug, error, info}; use robotica_backend::{ - entities::{self, StatelessReceiver}, - get_env, is_debug_mode, - services::persistent_state::PersistentStateRow, - spawn, EnvironmentError, + get_env, is_debug_mode, pipes::stateful, services::persistent_state::PersistentStateRow, spawn, + EnvironmentError, }; use robotica_common::datetime::{ convert_date_time_to_utc_or_default, duration_from_hms, utc_now, Date, DateTime, Duration, Time, @@ -75,7 +73,7 @@ const fn hours(num: u16) -> u16 { /// /// Returns an `AmberError` if the required environment variables are not set. /// -pub fn run(state: &State) -> Result, Error> { +pub fn run(state: &State) -> Result, Error> { let token = get_env("AMBER_TOKEN")?; let site_id = get_env("AMBER_SITE_ID")?; let influx_url = get_env("INFLUXDB_URL")?; @@ -87,7 +85,7 @@ pub fn run(state: &State) -> Result, Error> { influx_database, }; - let (tx, rx) = entities::create_stateless_entity("amber_summary"); + let (tx, rx) = stateful::create_pipe("amber_summary"); let psr = state .persistent_state_database @@ -354,10 +352,23 @@ pub enum PriceCategory { pub struct PriceSummary { pub category: PriceCategory, pub is_cheap_2hr: bool, - pub per_kwh: f32, + pub c_per_kwh: f32, + pub dc_per_kwh: i32, pub next_update: DateTime, } +// Ignore the c_per_kwh when comparing. +impl PartialEq for PriceSummary { + fn eq(&self, other: &Self) -> bool { + self.category == other.category + && self.is_cheap_2hr == other.is_cheap_2hr + && self.dc_per_kwh == other.dc_per_kwh + && self.next_update == other.next_update + } +} + +impl Eq for PriceSummary {} + // fn prices_to_category(prices: &[PriceResponse], category: Option) -> PriceCategory { // let new_category = prices // .iter() @@ -438,7 +449,7 @@ async fn prices_to_influxdb(config: &Config, prices: &[PriceResponse], summary: let reading = PriceSummaryReading { is_cheap_2hr: summary.is_cheap_2hr, - per_kwh: summary.per_kwh, + per_kwh: summary.c_per_kwh, time: Utc::now(), } .into_query("amber/price_summary"); @@ -536,9 +547,11 @@ impl PriceProcessor { .find(|p| p.interval_type == IntervalType::CurrentInterval) else { error!("No current price found in prices: {prices:?}"); + let default_c_per_kwh: u16 = 100; return PriceSummary { is_cheap_2hr: false, - per_kwh: 100.0, + c_per_kwh: f32::from(default_c_per_kwh), + dc_per_kwh: i32::from(default_c_per_kwh) * 10, next_update: *now + Duration::seconds(30), category: PriceCategory::Expensive, }; @@ -602,10 +615,12 @@ impl PriceProcessor { let category = get_price_category(old_category, current_price.per_kwh); self.category = Some(category); + #[allow(clippy::cast_possible_truncation)] let ps = PriceSummary { category, is_cheap_2hr: is_cheap, - per_kwh: current_price.per_kwh, + c_per_kwh: current_price.per_kwh, + dc_per_kwh: (current_price.per_kwh * 10.0).round() as i32, next_update: current_price.end_time, }; info!("Price summary: {old_category:?} --> {ps:?}"); @@ -841,7 +856,8 @@ mod tests { let summary = pp.prices_to_summary(&now, &prices); assert_eq!(summary.category, PriceCategory::SuperCheap); assert_eq!(summary.is_cheap_2hr, true); - assert_approx_eq!(f32, summary.per_kwh, 0.0); + assert_approx_eq!(f32, summary.c_per_kwh, 0.0); + assert_eq!(summary.dc_per_kwh, 0); assert_eq!(summary.next_update, dt("2020-01-01T01:00:00Z")); let ds = &pp.day; assert_eq!(ds.cheap_power_for_day, Duration::minutes(0)); @@ -866,7 +882,8 @@ mod tests { let summary = pp.prices_to_summary(&now, &prices); assert_eq!(summary.category, PriceCategory::SuperCheap); assert_eq!(summary.is_cheap_2hr, false); - assert_approx_eq!(f32, summary.per_kwh, 0.0); + assert_approx_eq!(f32, summary.c_per_kwh, 0.0); + assert_eq!(summary.dc_per_kwh, 0); assert_eq!(summary.next_update, dt("2020-01-01T01:30:00Z")); let ds = pp.day; assert_eq!(ds.cheap_power_for_day, Duration::minutes(45)); diff --git a/brian-backend/src/delays.rs b/brian-backend/src/delays.rs index feecd4e3..7b993943 100644 --- a/brian-backend/src/delays.rs +++ b/brian-backend/src/delays.rs @@ -1,8 +1,10 @@ -use std::fmt::Debug; use std::time::Duration; // use tracing::debug; -use robotica_backend::{entities::Data, spawn}; +use robotica_backend::{ + pipes::{stateful, Subscriber}, + spawn, +}; use tokio::{ select, time::{self, sleep_until, Instant, Interval}, @@ -39,26 +41,24 @@ pub struct DelayInputOptions { pub fn delay_input( name: &str, duration: Duration, - rx: robotica_backend::entities::Receiver, - is_active: impl Fn(&T::Received) -> bool + Send + 'static, + rx: stateful::Receiver, + is_active: impl Fn(&stateful::OldNewType) -> bool + Send + 'static, options: DelayInputOptions, -) -> robotica_backend::entities::Receiver +) -> stateful::Receiver where - T: Data + Send + 'static, - T::Sent: Send + Sync, - T::Received: Clone + Debug + Send + Sync + Eq + 'static, + T: Clone + Eq + Send + Sync + 'static, { - let (tx_out, rx_out) = T::new_entity(name); + let (tx_out, rx_out) = stateful::create_pipe(name); spawn(async move { let mut state = DelayInputState::Idle; let mut s = rx.subscribe().await; loop { select! { - Ok(v) = s.recv_value() => { + Ok(v) = s.recv_old_new() => { // debug!("delay received: {:?}", v); let active_value = is_active(&v); - let v = T::received_to_sent(v); + let (_, v) = v; match (active_value, &state) { (false, _) => { state = DelayInputState::Idle; @@ -112,25 +112,23 @@ where pub fn delay_repeat( name: &str, duration: Duration, - rx: robotica_backend::entities::Receiver, - is_active: impl Fn(&T::Received) -> bool + Send + 'static, -) -> robotica_backend::entities::Receiver + rx: stateful::Receiver, + is_active: impl Fn(&stateful::OldNewType) -> bool + Send + 'static, +) -> stateful::Receiver where - T: Data + Send + 'static, - T::Sent: Send + Clone, - T::Received: Clone + Debug + Send + Sync + Eq + 'static, + T: Clone + Eq + Send + 'static, { - let (tx_out, rx_out) = T::new_entity(name); + let (tx_out, rx_out) = stateful::create_pipe(name); spawn(async move { let mut state = DelayRepeatState::Idle; let mut s = rx.subscribe().await; loop { select! { - Ok(v) = s.recv_value() => { + Ok(v) = s.recv_old_new() => { // debug!("delay received: {:?}", v); let active_value = is_active(&v); - let v = T::received_to_sent(v); + let (_, v)= v; match (active_value, state) { (false, _) => { diff --git a/brian-backend/src/environment_monitor.rs b/brian-backend/src/environment_monitor.rs index 61ece7f9..95b1a0a3 100644 --- a/brian-backend/src/environment_monitor.rs +++ b/brian-backend/src/environment_monitor.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, Utc}; use influxdb::{Client, InfluxDbWriteable}; +use robotica_backend::pipes::{Subscriber, Subscription}; use robotica_backend::{get_env, is_debug_mode, spawn, EnvironmentError}; use robotica_common::anavi_thermometer::{self as anavi}; use robotica_common::mqtt::{Json, MqttMessage}; @@ -81,7 +82,9 @@ where Json: TryFrom, as TryFrom>::Error: Send + std::error::Error, { - let rx = state.subscriptions.subscribe_into::>(topic); + let rx = state + .subscriptions + .subscribe_into_stateless::>(topic); let topic = topic.to_string(); let influx_url = get_env("INFLUXDB_URL")?; let influx_database = get_env("INFLUXDB_DATABASE")?; @@ -108,7 +111,7 @@ where pub fn monitor_fishtank(state: &mut State, topic: &str) -> Result<(), EnvironmentError> { let rx = state .subscriptions - .subscribe_into::>(topic); + .subscribe_into_stateless::>(topic); let topic = topic.to_string(); let influx_url = get_env("INFLUXDB_URL")?; let influx_database = get_env("INFLUXDB_DATABASE")?; diff --git a/brian-backend/src/ha.rs b/brian-backend/src/ha.rs index f89390eb..26fac503 100644 --- a/brian-backend/src/ha.rs +++ b/brian-backend/src/ha.rs @@ -1,6 +1,7 @@ use monostate::MustBe; -use robotica_backend::entities::create_stateless_entity; -use robotica_backend::entities::StatelessSender; +use robotica_backend::pipes::stateless; +use robotica_backend::pipes::Subscriber; +use robotica_backend::pipes::Subscription; use robotica_backend::services::mqtt::MqttTx; use robotica_common::mqtt::MqttMessage; use robotica_common::mqtt::QoS; @@ -90,8 +91,8 @@ pub fn string_to_message(msg: &MessageCommand) -> MqttMessage { MqttMessage::new(topic, payload, false, QoS::ExactlyOnce) } -pub fn create_message_sink(mqtt: MqttTx) -> StatelessSender { - let (tx, rx) = create_stateless_entity::("messages"); +pub fn create_message_sink(mqtt: MqttTx) -> stateless::Sender { + let (tx, rx) = stateless::create_pipe::("messages"); tokio::spawn(async move { let mut rx = rx.subscribe().await; while let Ok(msg) = rx.recv().await { diff --git a/brian-backend/src/hdmi.rs b/brian-backend/src/hdmi.rs index 2242bab8..05e37b15 100644 --- a/brian-backend/src/hdmi.rs +++ b/brian-backend/src/hdmi.rs @@ -1,8 +1,9 @@ +use robotica_backend::pipes::{Subscriber, Subscription}; use thiserror::Error; use tokio::select; use tracing::debug; -use robotica_backend::{devices::hdmi::Command, entities, spawn}; +use robotica_backend::{devices::hdmi::Command, pipes::stateless, spawn}; use robotica_common::mqtt::{Json, MqttMessage, QoS}; use robotica_common::robotica::commands; @@ -25,10 +26,10 @@ pub fn run(state: &mut State, location: &str, device: &str, addr: &str) { let command_rx = state .subscriptions - .subscribe_into::>(&topic); + .subscribe_into_stateless::>(&topic); let name = id.get_name("hdmi"); - let (tx, rx) = entities::create_stateless_entity(name); + let (tx, rx) = stateless::create_pipe(name); spawn(async move { let mut rx_s = command_rx.subscribe().await; diff --git a/brian-backend/src/lights.rs b/brian-backend/src/lights.rs index 815dfc46..f09fca20 100644 --- a/brian-backend/src/lights.rs +++ b/brian-backend/src/lights.rs @@ -6,10 +6,7 @@ use std::{ use robotica_backend::{ devices::lifx::{device_entity, Device, DeviceConfig}, - entities::{ - self, create_stateful_entity, StatefulReceiver, StatefulSender, StatefulSubscription, - StatelessReceiver, - }, + pipes::{stateful, stateless, Subscriber, Subscription}, services::{mqtt::MqttTx, persistent_state::PersistentStateRow}, spawn, }; @@ -25,7 +22,7 @@ use tracing::{debug, error}; trait GetSceneEntity { type Scenes; - fn get_scene_entity(&self, scene: Self::Scenes) -> StatefulReceiver; + fn get_scene_entity(&self, scene: Self::Scenes) -> stateful::Receiver; } trait ScenesTrait: FromStr + ToString + Default {} @@ -81,13 +78,13 @@ impl ScenesTrait for StandardScenes {} #[derive(Clone)] pub struct SharedEntities { - on: StatefulReceiver, - rainbow: StatefulReceiver, - busy: StatefulReceiver, - akira_night: StatefulReceiver, - declan_night: StatefulReceiver, - nikolai_night: StatefulReceiver, - off: StatefulReceiver, + on: stateful::Receiver, + rainbow: stateful::Receiver, + busy: stateful::Receiver, + akira_night: stateful::Receiver, + declan_night: stateful::Receiver, + nikolai_night: stateful::Receiver, + off: stateful::Receiver, } impl Default for SharedEntities { @@ -134,14 +131,14 @@ impl Default for SharedEntities { #[derive(Clone)] struct StandardSceneEntities { - on: StatefulReceiver, - auto: StatefulReceiver, - rainbow: StatefulReceiver, - busy: StatefulReceiver, - akira_night: StatefulReceiver, - declan_night: StatefulReceiver, - nikolai_night: StatefulReceiver, - off: StatefulReceiver, + on: stateful::Receiver, + auto: stateful::Receiver, + rainbow: stateful::Receiver, + busy: stateful::Receiver, + akira_night: stateful::Receiver, + declan_night: stateful::Receiver, + nikolai_night: stateful::Receiver, + off: stateful::Receiver, } impl StandardSceneEntities { @@ -171,7 +168,7 @@ const fn flash_color() -> PowerColor { impl GetSceneEntity for StandardSceneEntities { type Scenes = StandardScenes; - fn get_scene_entity(&self, scene: Self::Scenes) -> StatefulReceiver { + fn get_scene_entity(&self, scene: Self::Scenes) -> stateful::Receiver { match scene { StandardScenes::On => self.on.clone(), StandardScenes::Auto => self.auto.clone(), @@ -185,14 +182,14 @@ impl GetSceneEntity for StandardSceneEntities { } } -fn static_entity(pc: PowerColor, name: impl Into) -> StatefulReceiver { - let (tx, rx) = entities::create_stateful_entity(name); +fn static_entity(pc: PowerColor, name: impl Into) -> stateful::Receiver { + let (tx, rx) = stateful::create_pipe(name); tx.try_send(pc); rx } -fn busy_entity(name: impl Into) -> StatefulReceiver { - let (tx, rx) = entities::create_stateful_entity(name); +fn busy_entity(name: impl Into) -> stateful::Receiver { + let (tx, rx) = stateful::create_pipe(name); spawn(async move { loop { let on_color = HSBK { @@ -221,8 +218,8 @@ fn busy_entity(name: impl Into) -> StatefulReceiver { rx } -fn rainbow_entity(name: impl Into) -> StatefulReceiver { - let (tx, rx) = entities::create_stateful_entity(name); +fn rainbow_entity(name: impl Into) -> stateful::Receiver { + let (tx, rx) = stateful::create_pipe(name); spawn(async move { let mut i = 0u16; let num_per_cycle = 10u16; @@ -256,7 +253,7 @@ fn mqtt_entity( state: &mut crate::State, topic_substr: &str, name: impl Into, -) -> StatefulReceiver { +) -> stateful::Receiver { let name = name.into(); let topic: String = format!("command/{topic_substr}/{name}"); @@ -265,7 +262,7 @@ fn mqtt_entity( .subscribe_into_stateful::>(&topic) .map(|(_, Json(c))| c); - let (tx, rx) = entities::create_stateful_entity(name); + let (tx, rx) = stateful::create_pipe(name); spawn(async move { let mut pc_s = pc_rx.subscribe().await; while let Ok(pc) = pc_s.recv().await { @@ -277,13 +274,13 @@ fn mqtt_entity( pub fn run_auto_light( state: &mut crate::State, - discover: StatelessReceiver, + discover: stateless::Receiver, shared: SharedEntities, topic_substr: &str, id: u64, ) { let entities = StandardSceneEntities::default(state, shared, topic_substr); - let (tx_state, rx_state) = entities::create_stateful_entity(format!("{id}-state")); + let (tx_state, rx_state) = stateful::create_pipe(format!("{id}-state")); let rx = switch_entity( state, entities, @@ -299,7 +296,7 @@ pub fn run_auto_light( fn run_state_sender( state: &mut crate::State, topic_substr: impl Into, - rx_state: StatefulReceiver, + rx_state: stateful::Receiver, ) { let topic_substr = topic_substr.into(); @@ -334,12 +331,12 @@ fn run_state_sender( pub fn run_passage_light( state: &mut crate::State, - discover: StatelessReceiver, + discover: stateless::Receiver, shared: SharedEntities, topic_substr: &str, id: u64, ) { - let (tx_state, rx_state) = entities::create_stateful_entity(format!("{id}-state")); + let (tx_state, rx_state) = stateful::create_pipe(format!("{id}-state")); let all_topic_substr = topic_substr.to_string(); let cupboard_topic_substr = format!("{topic_substr}/split/cupboard"); @@ -401,12 +398,12 @@ where { scene: Entities::Scenes, entities: Entities, - entity: StatefulReceiver, - entity_s: StatefulSubscription, + entity: stateful::Receiver, + entity_s: stateful::Subscription, psr: PersistentStateRow, mqtt: MqttTx, topic_substr: String, - tx: StatefulSender, + tx: stateful::Sender, flash_color: PowerColor, } @@ -416,17 +413,19 @@ fn switch_entity( topic_substr: impl Into, flash_color: PowerColor, name: impl Into, -) -> StatefulReceiver +) -> stateful::Receiver where Entities: GetSceneEntity + Send + Sync + 'static, Entities::Scenes: ScenesTrait + Copy + Send + Sync + 'static, ::Err: Send + Sync + 'static, { - let (tx, rx) = entities::create_stateful_entity(name); + let (tx, rx) = stateful::create_pipe(name); let topic_substr: String = topic_substr.into(); let topic = &format!("command/{topic_substr}"); - let rx_command = state.subscriptions.subscribe_into::>(topic); + let rx_command = state + .subscriptions + .subscribe_into_stateless::>(topic); { let psr = state.persistent_state_database.for_name(&topic_substr); @@ -570,30 +569,30 @@ fn send_scene(mqtt: &MqttTx, scene: &Scene, topic_substr: &s } struct PassageEntities { - all: StatefulReceiver, - cupboard: StatefulReceiver, - bathroom: StatefulReceiver, - bedroom: StatefulReceiver, + all: stateful::Receiver, + cupboard: stateful::Receiver, + bathroom: stateful::Receiver, + bedroom: stateful::Receiver, } struct PassageStateEntities { - all: StatefulReceiver, - cupboard: StatefulReceiver, - bathroom: StatefulReceiver, - bedroom: StatefulReceiver, + all: stateful::Receiver, + cupboard: stateful::Receiver, + bathroom: stateful::Receiver, + bedroom: stateful::Receiver, } fn run_passage_multiplexer( entities: PassageEntities, name: impl Into, - state_in: StatefulReceiver, -) -> (StatefulReceiver, PassageStateEntities) { + state_in: stateful::Receiver, +) -> (stateful::Receiver, PassageStateEntities) { let name = name.into(); - let (tx, rx) = create_stateful_entity(name.clone()); - let (tx_all_state, rx_all_state) = create_stateful_entity(format!("{name}-all")); - let (tx_cupboard_state, rx_cupboard_state) = create_stateful_entity(format!("{name}-cupboard")); - let (tx_bathroom_state, rx_bathroom_state) = create_stateful_entity(format!("{name}-bathroom")); - let (tx_bedroom_state, rx_bedroom_state) = create_stateful_entity(format!("{name}-bathroom")); + let (tx, rx) = stateful::create_pipe(name.clone()); + let (tx_all_state, rx_all_state) = stateful::create_pipe(format!("{name}-all")); + let (tx_cupboard_state, rx_cupboard_state) = stateful::create_pipe(format!("{name}-cupboard")); + let (tx_bathroom_state, rx_bathroom_state) = stateful::create_pipe(format!("{name}-bathroom")); + let (tx_bedroom_state, rx_bedroom_state) = stateful::create_pipe(format!("{name}-bathroom")); spawn(async move { let mut all = entities.all.subscribe().await; diff --git a/brian-backend/src/main.rs b/brian-backend/src/main.rs index 642e967b..5c11a3ac 100644 --- a/brian-backend/src/main.rs +++ b/brian-backend/src/main.rs @@ -20,7 +20,7 @@ use ha::MessageCommand; use lights::{run_auto_light, run_passage_light, SharedEntities}; use robotica_backend::devices::lifx::DiscoverConfig; use robotica_backend::devices::{fake_switch, lifx}; -use robotica_backend::entities::StatelessSender; +use robotica_backend::pipes::stateless; use robotica_backend::scheduling::executor::executor; use robotica_backend::services::persistent_state::PersistentStateDatabase; use tracing::{debug, info}; @@ -66,7 +66,7 @@ pub struct State { subscriptions: Subscriptions, #[allow(dead_code)] mqtt: MqttTx, - message_sink: StatelessSender, + message_sink: stateless::Sender, persistent_state_database: PersistentStateDatabase, } @@ -77,7 +77,7 @@ async fn setup_pipes(state: &mut State) { price_summary_rx .clone() - .map_stateful(|current| current.is_cheap_2hr) + .map(|(_, current)| current.is_cheap_2hr) .for_each(move |(old, current)| { if old.is_some() { let message = if current { diff --git a/brian-backend/src/tesla.rs b/brian-backend/src/tesla.rs index cb0527bd..d3e52d48 100644 --- a/brian-backend/src/tesla.rs +++ b/brian-backend/src/tesla.rs @@ -16,7 +16,7 @@ use tokio::select; use tokio::time::Interval; use tracing::{debug, error, info}; -use robotica_backend::entities::{create_stateless_entity, StatelessReceiver, StatelessSender}; +use robotica_backend::pipes::{stateful, stateless, Subscriber, Subscription}; use robotica_backend::spawn; use robotica_common::mqtt::{BoolError, Json, MqttMessage, Parsed, QoS}; @@ -244,7 +244,7 @@ pub fn monitor_tesla_doors(state: &mut State, car_number: usize) { let message_sink = state.message_sink.clone(); - let (tx, rx) = create_stateless_entity("tesla_doors"); + let (tx, rx) = stateful::create_pipe("tesla_doors"); spawn(async move { let mut frunk_s = frunk_rx.subscribe().await; @@ -308,16 +308,14 @@ pub fn monitor_tesla_doors(state: &mut State, car_number: usize) { "tesla_doors (delayed)", duration, rx, - |c| !c.is_empty(), + |(_, c)| !c.is_empty(), DelayInputOptions { skip_subsequent_delay: true, }, ); // Discard initial [] value and duplicate events. - let rx = rx - .map_stateful(|f| f) - .filter(|(p, c)| p.is_some() || !c.is_empty()); + let rx = rx.filter(|(p, c)| p.is_some() || !c.is_empty()); // Repeat the last value every 5 minutes. let duration = Duration::from_secs(300); @@ -381,7 +379,7 @@ pub enum MonitorChargingError { fn announce_charging_state( charging_state: ChargingStateEnum, old_tesla_state: &TeslaState, - message_sink: &StatelessSender, + message_sink: &stateless::Sender, ) { let was_plugged_in = old_tesla_state .charging_state @@ -441,7 +439,7 @@ impl TeslaState { pub fn monitor_charging( state: &mut State, car_number: usize, - price_summary_rx: StatelessReceiver, + price_summary_rx: stateful::Receiver, ) -> Result<(), MonitorChargingError> { let tesla_secret = state.persistent_state_database.for_name("tesla_token"); @@ -453,13 +451,15 @@ pub fn monitor_charging( let mqtt = state.mqtt.clone(); let message_sink = state.message_sink.clone(); - let price_category_rx = price_summary_rx.map_stateful(|ps| ps.category); + let price_category_rx = price_summary_rx.map(|(_, ps)| ps.category); let auto_charge_rx = { let mqtt = mqtt.clone(); state .subscriptions - .subscribe_into::>(&format!("command/Tesla/{car_number}/AutoCharge")) + .subscribe_into_stateless::>(&format!( + "command/Tesla/{car_number}/AutoCharge" + )) .map(move |Json(cmd)| { if let Command::Device(cmd) = &cmd { let status = match cmd.action { @@ -476,7 +476,9 @@ pub fn monitor_charging( let mqtt = mqtt.clone(); state .subscriptions - .subscribe_into::>(&format!("command/Tesla/{car_number}/ForceCharge")) + .subscribe_into_stateless::>(&format!( + "command/Tesla/{car_number}/ForceCharge" + )) .map(move |Json(cmd)| { if let Command::Device(cmd) = &cmd { let status = match cmd.action { @@ -492,8 +494,8 @@ pub fn monitor_charging( let is_home_rx = { state .subscriptions - .subscribe_into::(&format!("state/Tesla/{car_number}/Location")) - .map_stateful(move |location| location == "home") + .subscribe_into_stateful::(&format!("state/Tesla/{car_number}/Location")) + .map(move |(_, location)| location == "home") }; let charging_state_rx = state @@ -608,7 +610,7 @@ pub fn monitor_charging( tesla_state.is_at_home = Some(new_is_at_home); } - Ok((old, charging_state)) = charging_state_s.recv_value() => { + Ok((old, charging_state)) = charging_state_s.recv_old_new() => { info!("Charging state: {charging_state:?}"); if old.is_some() { announce_charging_state(charging_state, &tesla_state, &message_sink); diff --git a/robotica-backend/Cargo.toml b/robotica-backend/Cargo.toml index acc354a8..7baf3a43 100644 --- a/robotica-backend/Cargo.toml +++ b/robotica-backend/Cargo.toml @@ -41,6 +41,7 @@ tower = "0.4.13" tower-http = { version = "0.4.0", features = ["fs", "trace"] } arc-swap = "1.4.0" hyper = "0.14.25" +async-trait = "0.1.51" [build-dependencies] lalrpop = "0.20.0" diff --git a/robotica-backend/src/devices/fake_switch.rs b/robotica-backend/src/devices/fake_switch.rs index ed038f0d..dd0f772c 100644 --- a/robotica-backend/src/devices/fake_switch.rs +++ b/robotica-backend/src/devices/fake_switch.rs @@ -9,6 +9,7 @@ use robotica_common::{ use tokio::select; use crate::{ + pipes::{Subscriber, Subscription}, services::mqtt::{MqttTx, Subscriptions}, spawn, }; @@ -17,7 +18,7 @@ use crate::{ pub fn run(subscription: &mut Subscriptions, mqtt: MqttTx, topic_substr: impl Into) { let topic_substr: String = topic_substr.into(); let topic = format!("command/{topic_substr}"); - let rx = subscription.subscribe_into::>(&topic); + let rx = subscription.subscribe_into_stateless::>(&topic); spawn(async move { let mut rx = rx.subscribe().await; diff --git a/robotica-backend/src/devices/hdmi/mod.rs b/robotica-backend/src/devices/hdmi/mod.rs index 1e8534d0..73595ad0 100644 --- a/robotica-backend/src/devices/hdmi/mod.rs +++ b/robotica-backend/src/devices/hdmi/mod.rs @@ -11,8 +11,8 @@ use tokio::{ use tracing::{debug, info}; use crate::{ - entities::{self, StatefulReceiver, StatelessReceiver}, is_debug_mode, + pipes::{stateful, stateless, Subscriber, Subscription}, }; /// A command to send to the HDMI matrix. @@ -48,15 +48,15 @@ pub enum Error { /// This function will return an error if the connection to the HDMI matrix fails. pub fn run( addr: A, - rx_cmd: StatelessReceiver, + rx_cmd: stateless::Receiver, options: &Options, -) -> (StatefulReceiver>, JoinHandle<()>) +) -> (stateful::Receiver>, JoinHandle<()>) where A: ToSocketAddrs + Clone + Send + Sync + Debug + 'static, { let options = options.clone(); let name = format!("{addr:?}"); - let (tx, rx) = entities::create_stateful_entity(name); + let (tx, rx) = stateful::create_pipe(name); let handle = spawn(async move { debug!("hdmi: Starting with addr {addr:?}"); diff --git a/robotica-backend/src/devices/lifx.rs b/robotica-backend/src/devices/lifx.rs index 3eddb1af..08aa1472 100644 --- a/robotica-backend/src/devices/lifx.rs +++ b/robotica-backend/src/devices/lifx.rs @@ -13,7 +13,8 @@ use tokio::{ use tracing::{debug, error, info}; use crate::{ - entities::{self, StatefulReceiver, StatefulSender, StatelessReceiver}, + pipes::stateful, + pipes::{stateless, Subscriber, Subscription}, spawn, }; @@ -50,8 +51,8 @@ pub enum DiscoverError { /// Returns an error if the UDP socket cannot be created. pub async fn discover( config: DiscoverConfig, -) -> Result, DiscoverError> { - let (tx, rx) = entities::create_stateless_entity("lifx"); +) -> Result, DiscoverError> { + let (tx, rx) = stateless::create_pipe("lifx"); let socket = UdpSocket::bind("0.0.0.0:0").await?; socket.set_broadcast(true)?; @@ -451,10 +452,10 @@ pub struct DeviceConfig { /// /// This function will panic if something goes wrong. pub fn device_entity( - rx_pc: StatefulReceiver, - tx_state: StatefulSender, + rx_pc: stateful::Receiver, + tx_state: stateful::Sender, id: u64, - discover: StatelessReceiver, + discover: stateless::Receiver, config: DeviceConfig, ) { let discover = discover.filter(move |d| d.target == id); diff --git a/robotica-backend/src/entities.rs b/robotica-backend/src/entities.rs deleted file mode 100644 index 0c525ffb..00000000 --- a/robotica-backend/src/entities.rs +++ /dev/null @@ -1,1046 +0,0 @@ -//! Stream of data from a source. - -use std::fmt::Formatter; -use std::ops::Deref; - -use thiserror::Error; -use tokio::select; -use tokio::sync::broadcast; -use tokio::sync::mpsc; -use tokio::sync::oneshot; -use tracing::debug; -use tracing::error; - -use crate::spawn; -use crate::PIPE_SIZE; - -/// A stateless receiver -pub type StatelessReceiver = Receiver>; - -/// A stateless weak receiver that won't hold the sender alive. -pub type StatelessWeakReceiver = WeakReceiver>; - -/// A stateless sender -pub type StatelessSender = Sender>; - -/// A subscription to a stateless receiver -pub type StatelessSubscription = Subscription>; - -/// A stateful receiver -pub type StatefulReceiver = Receiver>; - -/// A stateful weak receiver that won't hold the sender alive. -pub type StatefulWeakReceiver = WeakReceiver>; - -/// A subscription to a stateful receiver -pub type StatefulSender = Sender>; - -/// A subscription to a stateful receiver -pub type StatefulSubscription = Subscription>; - -enum SendMessage { - Set(T), -} - -/// Represents the type of the data, whether stateless or stateful -pub trait Data { - /// The type of data that is sent to the receiver - type Sent; - - /// The type of data that is received from the sender - type Received: Clone; - - /// Create a new entity with the given name - fn new_entity(name: impl Into) -> (Sender, Receiver) - where - Self: Sized; - - /// Convert a received value to a sent value - fn received_to_sent(data: Self::Received) -> Self::Sent; -} - -/// A stateless data type -/// -/// A stateless connection doesn't care about previous values, it just sends the latest value. -#[derive(Clone)] -pub struct StatelessData(T); -impl Data for StatelessData { - type Sent = T; - type Received = T; - - fn new_entity(name: impl Into) -> (Sender, Receiver) { - create_stateless_entity(name) - } - - fn received_to_sent(data: Self::Received) -> Self::Sent { - data - } -} - -impl std::fmt::Debug for StatelessData { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_tuple("StatelessData").field(&self.0).finish() - } -} - -impl Deref for StatelessData { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -type StreamableData = (Option, T); - -/// A stateful data type -/// -/// A stateful connection will try to keep track of the current state and sent it to new subscribers. -#[derive(Clone)] -pub struct StatefulData(T); -impl Data for StatefulData { - type Sent = T; - type Received = StreamableData; - - fn new_entity(name: impl Into) -> (Sender, Receiver) { - create_stateful_entity(name) - } - - fn received_to_sent(data: Self::Received) -> Self::Sent { - data.1 - } -} - -impl std::fmt::Debug for StatefulData { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_tuple("StatefulData").field(&self.0).finish() - } -} - -#[allow(type_alias_bounds)] -type SubscribeMessage = (broadcast::Receiver, Option); - -enum ReceiveMessage { - Get(oneshot::Sender>), - Subscribe(oneshot::Sender>), -} - -/// Send a value to an entity. -#[derive(Clone)] -pub struct Sender { - #[allow(dead_code)] - name: String, - tx: mpsc::Sender>, -} - -impl Sender { - /// Send data to the entity. - // pub async fn send(&self, data: T) { - // let msg = SendMessage::Set(data); - // if let Err(err) = self.tx.send(msg).await { - // error!("send failed: {}", err); - // } - // } - - /// Send data to the entity or fail if buffer is full. - pub fn try_send(&self, data: T::Sent) { - let msg = SendMessage::Set(data); - if let Err(err) = self.tx.try_send(msg) { - error!("send failed: {}", err); - } - } - - /// Is the remote end of the channel closed? - #[must_use] - pub fn is_closed(&self) -> bool { - self.tx.is_closed() - } - - /// Completes when the entity is closed. - pub async fn closed(&self) - where - T: Send, - T::Sent: Send, - { - self.tx.closed().await; - } -} - -/// A `Receiver` that doesn't count as a reference to the entity. -pub struct WeakReceiver { - name: String, - tx: mpsc::WeakSender>, -} - -impl WeakReceiver { - /// Try to convert to a `Receiver`. - #[must_use] - pub fn upgrade(&self) -> Option> { - self.tx.upgrade().map(|tx| Receiver { - name: self.name.clone(), - tx, - }) - } -} - -/// Receive a value from an entity. -#[derive(Debug, Clone)] -pub struct Receiver { - name: String, - tx: mpsc::Sender>, -} - -impl Receiver -where - T::Received: Send, -{ - /// Subscribe to this entity. - /// - /// Returns an already closed subscription if the entity is closed. - pub async fn subscribe(&self) -> Subscription { - let (tx, rx) = oneshot::channel(); - let msg = ReceiveMessage::Subscribe(tx); - if let Err(err) = self.tx.send(msg).await { - error!("{}: subscribe/send failed: {}", self.name, err); - return Subscription::null(self.tx.clone()); - }; - if let Ok((rx, initial)) = rx.await { - Subscription { - rx, - _tx: self.tx.clone(), - initial, - } - } else { - error!("{}: subscribe/await failed", self.name); - Subscription::null(self.tx.clone()) - } - } - - /// Translate this receiver into a another type using a stateless receiver. - #[must_use] - fn translate_anything(self) -> Receiver - where - T: 'static, - T::Sent: Send + 'static, - T::Received: Send + 'static, - U: Data + Send + 'static, - U::Sent: TryFrom + Clone + Send + 'static, - >::Error: Send + std::error::Error, - { - let name = format!("{} (translate_anything)", self.name); - let (tx, rx) = U::new_entity(&name); - - spawn(async move { - let mut sub = self.subscribe().await; - - loop { - select! { - data = sub.recv_value() => { - let data = match data { - Ok(data) => T::received_to_sent(data), - Err(err) => { - debug!("{name}: recv failed, exiting: {err}"); - break; - } - }; - - match U::Sent::try_from(data) { - Ok(data) => { - tx.try_send(data); - } - Err(err) => { - error!("{name}: parse failed: {err}"); - } - } - } - - _ = tx.closed() => { - debug!("{name}: source closed"); - break; - } - } - } - }); - - rx - } - - /// Translate this receiver into a another type using a stateful receiver. - // #[must_use] - // pub fn translate_into_stateful(self) -> Receiver> - // where - // T: Send + 'static, - // U: TryFrom + Clone + Eq + Send + 'static, - // >::Error: Send + std::error::Error, - // { - // let name = format!("{} (translate_into_stateful)", self.name); - // let (tx, rx) = create_stateful_entity(&name); - - // spawn(async move { - // let mut sub = self.subscribe().await; - - // loop { - // select! { - // data = sub.recv() => { - // let data = match data { - // Ok(data) => data, - // Err(err) => { - // debug!("{}: translate_into_stateful({}): recv failed, exiting", name, err); - // break; - // } - // }; - - // match U::try_from(data) { - // Ok(data) => { - // tx.try_send(data); - // } - // Err(err) => { - // error!("translate_into_stateful({}): parse failed: {}", name, err); - // } - // } - // } - - // _ = tx.closed() => { - // debug!("translate_into_stateful({}): source closed", name); - // break; - // } - // } - // } - // }); - - // rx - // } - - /// Map this receiver into a another type using a stateless receiver. - // #[must_use] - // pub fn map_into_stateless(self, f: impl Fn(T) -> U + Send + 'static) -> Receiver - // where - // T: Send + 'static, - // U: Clone + Send + 'static, - // { - // let name = format!("{} (map_into_stateless)", self.name); - // let (tx, rx) = create_stateless_entity(name); - // self.map(tx, f); - // rx - // } - - /// Map this receiver into a another type using a stateful receiver. - // #[must_use] - // pub fn map_into_stateful( - // self, - // f: impl Fn(T) -> U + Send + 'static, - // ) -> Receiver> - // where - // T: Send + 'static, - // U: Clone + Eq + Send + 'static, - // { - // let name = format!("{} (map_into_stateful)", self.name); - // let (tx, rx) = create_stateful_entity(name); - // self.map(tx, f); - // rx - // } - - /// Map this receiver into a another type using a any type of receiver. - fn map_into_anything( - self, - f: impl Fn(T::Received) -> U::Sent + Send + 'static, - ) -> Receiver - where - T: 'static, - T::Sent: Send + 'static, - T::Received: Send + 'static, - U: Data + Send + 'static, - U::Sent: Send + 'static, - U::Received: Send + 'static, - { - let name = format!("{} (map_into_anything)", self.name); - let (tx, rx) = U::new_entity(&name); - - spawn(async move { - let mut sub = self.subscribe().await; - - loop { - select! { - data = sub.recv_value() => { - let data = match data { - Ok(data) => data, - Err(err) => { - debug!("{name}: recv failed, exiting: {err}"); - break; - } - }; - - tx.try_send(f(data)); - } - - _ = tx.closed() => { - debug!("{name}: dest closed"); - break; - } - } - } - }); - - rx - } - - // fn filter(self, f: impl Fn(&T) -> bool + Send + 'static) -> Receiver - // where - // T: Data, - // { - // let name = format!("{} (map_into_stateless)", self.name); - // // let (tx, rx) = create_stateless_entity(name); - // // rx - // todo!(); - // } - - /// Map this receiver into a another type using a stateless receiver. - // #[must_use] - // pub fn filter_into_stateless(self, f: impl Fn(&T) -> bool + Send + 'static) -> Receiver - // where - // T: Send + 'static, - // { - // let name = format!("{} (map_into_stateless)", self.name); - // let (tx, rx) = create_stateless_entity(name); - // self.filter(tx, f); - // rx - // } - - /// Map this receiver into a another type using a stateful receiver. - // #[must_use] - // pub fn filter_into_stateful( - // self, - // f: impl Fn(&T) -> bool + Send + 'static, - // ) -> Receiver> - // where - // T: Send + Eq + 'static, - // { - // let name = format!("{} (map_into_stateful)", self.name); - // let (tx, rx) = create_stateful_entity(name); - // self.filter(tx, f); - // rx - // } - - /// Filter this receiver based on function result - #[must_use] - pub fn filter(self, f: impl Fn(&T::Received) -> bool + Send + 'static) -> Receiver - where - T: 'static, - T::Received: Send + 'static, - T::Sent: Send + 'static, - { - let name = format!("{} (filter_into_anything)", self.name); - let (tx, rx) = T::new_entity(&name); - - spawn(async move { - let mut sub = self.subscribe().await; - - loop { - select! { - data = sub.recv_value() => { - let data = match data { - Ok(data) => data, - Err(err) => { - debug!("{name}: recv failed, exiting: {err}"); - break; - } - }; - - if f(&data) { - tx.try_send(T::received_to_sent(data)); - } - } - - _ = tx.closed() => { - debug!("{name}: dest closed"); - break; - } - } - } - }); - rx - } - - /// Run a function for every value received. - pub fn for_each_value(self, f: F) - where - F: Fn(T::Received) + Send + 'static, - T: 'static, - T::Sent: Send + 'static, - T::Received: Send + 'static, - { - spawn(async move { - let mut sub = self.subscribe().await; - - loop { - while let Ok(data) = sub.recv_value().await { - f(data); - } - } - }); - } - - /// Completes when the entity is closed. - pub async fn closed(&self) { - self.tx.closed().await; - } - - /// Create a new `WeakReceiver` from this Receiver. - #[must_use] - pub fn downgrade(&self) -> WeakReceiver { - WeakReceiver { - tx: self.tx.downgrade(), - name: self.name.clone(), - } - } - - /// Run a function for every value received. - pub fn for_each(self, f: F) - where - F: Fn(T::Received) + Send + 'static, - T: Send + 'static, - { - spawn(async move { - let mut sub = self.subscribe().await; - - loop { - while let Ok(data) = sub.recv_value().await { - f(data); - } - } - }); - } -} - -impl Receiver> { - // /// Get the most recent value from the entity. - // pub async fn get(&self) -> Option { - // self.get_value().await - // } - - // pub fn for_each(self, f: F) - // where - // F: Fn(T) + Send + 'static, - // T: Send + 'static, - // { - // self.for_each_value(f) - // } - - // pub fn new(name: impl Into) -> (Sender>, Receiver>) - // where - // T: 'static, - // { - // create_stateless_entity(name) - // } - - /// Subscribe to this entity and translate the data. - // pub async fn subscribe_into(&self) -> Subscription> - // where - // T: Send + 'static, - // U: TryFrom + Clone + Send + 'static, - // >::Error: Send + std::error::Error, - // { - // let s = self.translate_into::(); - // s.subscribe().await - // } - - /// Translate this receiver into a another stateless receiver using a stateless receiver. - #[must_use] - pub fn translate(self) -> Receiver> - where - T: Send + 'static, - U: TryFrom + Clone + Send + 'static, - >::Error: Send + std::error::Error, - { - self.translate_anything() - } - - /// Translate this receiver into a stateful receiver using a stateless receiver. - #[must_use] - pub fn translate_stateful(self) -> Receiver> - where - T: Send + 'static, - U: TryFrom + Clone + Send + Eq + 'static, - >::Error: Send + std::error::Error, - { - self.translate_anything() - } - - /// Translate this receiver into a another type using a stateless receiver. - #[must_use] - pub fn into_stateful(self) -> Receiver> - where - T: Eq + 'static, - { - self.translate_anything() - } - - /// Translate this receiver into a another type using a stateless receiver. - #[must_use] - pub fn map(self, f: impl Fn(T) -> U + Send + 'static) -> Receiver> - where - T: 'static, - U: Clone + Send + 'static, - // T::Sent: Send + 'static, - // T::Received: Send + 'static, - // U::Sent: Send + 'static, - // U::Received: Send + 'static, - { - self.map_into_anything(f) - } - - /// Translate this receiver into a another type using a stateful receiver. - #[must_use] - pub fn map_stateful(self, f: impl Fn(T) -> U + Send + 'static) -> Receiver> - where - T: 'static, - U: Clone + Send + 'static + Eq, - // T::Sent: Send + 'static, - // T::Received: Send + 'static, - // U::Sent: Send + 'static, - // U::Received: Send + 'static, - { - self.map_into_anything(f) - } - - // #[must_use] - // pub fn filter(self, f: impl Fn(&T) -> bool + Send + 'static) -> Receiver> - // where - // T: Eq, // T::Received: Send + 'static, - // // T::Sent: Send + 'static, - // { - // self.filter_into_anything(f); - // } -} - -// impl Receiver> { -// /// Translate this receiver into a another type using a stateful receiver. -// #[must_use] -// pub fn into_stateful(self) -> Receiver> { -// self.translate_into_anything() -// } -// } - -impl Receiver> { - // pub fn new(name: impl Into) -> (Sender>, Receiver>) { - // create_stateful_entity(name) - // } - - /// Retrieve the most recent value from the entity. - /// - /// Returns `None` if the entity is closed. - pub async fn get_value(&self) -> Option> { - let (tx, rx) = oneshot::channel(); - let msg = ReceiveMessage::Get(tx); - if let Err(err) = self.tx.send(msg).await { - error!("{}: get/send failed: {}", self.name, err); - return None; - }; - (rx.await).map_or_else( - |_| { - error!("{}: get/await failed", self.name); - None - }, - |v| v, - ) - } - - /// Get the most recent value from the entity. - pub async fn get(&self) -> Option { - self.get_value().await.map(|(_prev, current)| current) - } - - /// Subscribe to this entity and translate the data. - // pub async fn subscribe_into(&self) -> Subscription> - // where - // T: Send + 'static, - // U: TryFrom + Clone + Send + Eq + 'static, - // >::Error: Send + std::error::Error, - // { - // let s = self.translate_into::(); - // s.subscribe().await - // } - - /// Translate this receiver into a stateful receiver using a stateful receiver. - #[must_use] - pub fn translate(self) -> Receiver> - where - T: Send + 'static, - U: TryFrom + Clone + Send + Eq + 'static, - >::Error: Send + std::error::Error, - { - self.translate_anything() - } - - /// Map this receiver into another type using a stateful receiver. - #[must_use] - pub fn map( - self, - f: impl Fn(StreamableData) -> U + Send + 'static, - ) -> Receiver> - where - T: 'static, - U: Clone + Send + 'static + Eq, - // T::Sent: Send + 'static, - // T::Received: Send + 'static, - // U::Sent: Send + 'static, - // U::Received: Send + 'static, - { - self.map_into_anything(f) - } -} - -/// Something went wrong in Receiver. -#[derive(Error, Debug)] -pub enum RecvError { - /// The Pipe was closed. - #[error("The pipe was closed")] - Closed, -} - -/// A subscription to receive data from an entity. -pub struct Subscription { - rx: broadcast::Receiver, - // We need to keep this to ensure connection stays alive. - _tx: mpsc::Sender>, - initial: Option, -} - -impl Subscription { - /// Create a null subscription that is already closed. - fn null(tx: mpsc::Sender>) -> Self { - let (_tx, rx) = broadcast::channel(0); - Self { - rx, - _tx: tx, - initial: None, - } - } -} - -impl Subscription -where - T: Data, - T::Received: Send, -{ - /// Wait for the next value from the entity. - /// - /// # Errors - /// - /// Returns `RecvError::Closed` if the entity is closed. - pub async fn recv_value(&mut self) -> Result { - let initial = self.initial.take(); - if let Some(initial) = initial { - return Ok(initial); - } - loop { - match self.rx.recv().await { - Ok(v) => return Ok(v), - Err(err) => match err { - broadcast::error::RecvError::Closed => return Err(RecvError::Closed), - broadcast::error::RecvError::Lagged(_) => { - error!("recv failed: The pipe was lagged"); - } - }, - } - } - } - - /// Get the next value but don't wait for it. Returns `None` if there is no value. - /// - /// # Errors - /// - /// Returns `RecvError::Closed` if the entity is closed. - pub fn try_recv_value(&mut self) -> Result, RecvError> { - let initial = self.initial.take(); - if let Some(initial) = initial { - return Ok(Some(initial)); - } - loop { - match self.rx.try_recv() { - Ok(v) => return Ok(Some(v)), - Err(err) => match err { - broadcast::error::TryRecvError::Closed => { - return Err(RecvError::Closed); - } - broadcast::error::TryRecvError::Empty => return Ok(None), - broadcast::error::TryRecvError::Lagged(_) => { - error!("try_recv failed: The pipe was lagged"); - } - }, - } - } - } -} - -impl Subscription> { - /// Wait for the next value from the entity. - /// - /// # Errors - /// - /// Returns `RecvError::Closed` if the entity is closed. - pub async fn recv(&mut self) -> Result { - self.recv_value().await - } - - /// Get the next value but don't wait for it. Returns `None` if there is no value. - /// - /// # Errors - /// - /// Returns `RecvError::Closed` if the entity is closed. - pub fn try_recv(&mut self) -> Result, RecvError> { - self.try_recv_value() - } -} - -impl Subscription> { - /// Wait for the next value from the entity. - /// - /// # Errors - /// - /// Returns `RecvError::Closed` if the entity is closed. - pub async fn recv(&mut self) -> Result { - self.recv_value().await.map(|(_prev, current)| current) - } - - /// Get the next value but don't wait for it. Returns `None` if there is no value. - /// - /// # Errors - /// - /// Returns `RecvError::Closed` if the entity is closed. - pub fn try_recv(&mut self) -> Result, RecvError> { - self.try_recv_value() - .map(|opt| opt.map(|(_prev, current)| current)) - } -} - -/// Create a stateless entity that sends every message. -#[must_use] -pub fn create_stateless_entity( - name: impl Into, -) -> (Sender>, Receiver>) { - let (send_tx, mut send_rx) = mpsc::channel::>(PIPE_SIZE); - let (receive_tx, receive_rx) = mpsc::channel::>>(PIPE_SIZE); - let (out_tx, out_rx) = broadcast::channel::(PIPE_SIZE); - - drop(out_rx); - - let name = name.into(); - - let sender: Sender> = Sender { - tx: send_tx, - name: name.clone(), - }; - let receiver: Receiver> = Receiver { - tx: receive_tx, - name: name.clone(), - }; - - spawn(async move { - let name = name; - let mut receive_rx = Some(receive_rx); - - loop { - select! { - Some(msg) = send_rx.recv() => { - match msg { - SendMessage::Set(data) => { - if let Err(_err) = out_tx.send(data) { - // It is not an error if there are no subscribers. - // debug!("create_stateless_entity({name}): send to broadcast failed: {err} (not an error)"); - } - } - } - } - Some(msg) = try_receive(&mut receive_rx) => { - match msg { - Some(ReceiveMessage::Get(tx)) => { - if tx.send(None).is_err() { - error!("create_stateless_entity{name}): get send failed"); - }; - } - Some(ReceiveMessage::Subscribe(tx)) => { - let rx = out_tx.subscribe(); - if tx.send((rx, None)).is_err() { - error!("create_stateless_entity({name}): subscribe send failed"); - }; - } - None => { - debug!("create_stateless_entity({name}): command channel closed"); - receive_rx = None; - } - } - } - else => { - debug!("create_stateless_entity({name}): all inputs closed"); - break; - } - } - - if matches!((&receive_rx, out_tx.receiver_count()), (None, 0)) { - debug!( - "create_stateless_entity({name}): receiver closed and all subscriptions closed" - ); - break; - } - } - }); - - (sender, receiver) -} - -/// Create a stateful entity that only produces messages when there is a change. -#[must_use] -pub fn create_stateful_entity( - name: impl Into, -) -> (Sender>, Receiver>) { - let (send_tx, mut send_rx) = mpsc::channel::>(PIPE_SIZE); - let (receive_tx, receive_rx) = mpsc::channel::>>(PIPE_SIZE); - let (out_tx, out_rx) = broadcast::channel::>(PIPE_SIZE); - - drop(out_rx); - - let name = name.into(); - - let sender = Sender { - tx: send_tx, - name: name.clone(), - }; - let receiver = Receiver { - tx: receive_tx, - name: name.clone(), - }; - - spawn(async move { - let name = name; - - let mut prev_data: Option = None; - let mut saved_data: Option = None; - let mut receive_rx = Some(receive_rx); - - loop { - select! { - Some(msg) = send_rx.recv() => { - match msg { - SendMessage::Set(data) => { - let changed = saved_data.as_ref().map_or(true, |saved_data| data != *saved_data); - if changed { - prev_data = saved_data.clone(); - saved_data = Some(data.clone()); - if let Err(_err) = out_tx.send((prev_data.clone(), data)) { - // It is not an error if there are no subscribers. - // debug!("create_stateful_entity({name}): send to broadcast failed: {err} (not an error)"); - } - }; - } - } - } - Some(msg) = try_receive(&mut receive_rx) => { - match msg { - Some(ReceiveMessage::Get(tx)) => { - let data = saved_data.clone().map(|data| (prev_data.clone(), data)); - if tx.send(data).is_err() { - error!("create_stateful_entity({name}): get send failed"); - }; - } - Some(ReceiveMessage::Subscribe(tx)) => { - let data = saved_data.clone().map(|data| (prev_data.clone(), data)); - let rx = out_tx.subscribe(); - if tx.send((rx, data)).is_err() { - error!("create_stateful_entity{name}): subscribe send failed"); - }; - } - None => { - debug!("create_stateful_entity({name}): command channel closed"); - receive_rx = None; - } - } - } - else => { - debug!("create_stateful_entity({name}): all inputs closed"); - break; - } - } - - if matches!((&receive_rx, out_tx.receiver_count()), (None, 0)) { - debug!( - "create_stateful_entity({name}): receiver closed and all subscriptions closed" - ); - break; - } - } - }); - - (sender, receiver) -} - -async fn try_receive(rx: &mut Option>) -> Option> { - match rx { - Some(rx) => Some(rx.recv().await), - None => None, - } -} - -#[cfg(test)] -mod tests { - #![allow(clippy::unwrap_used)] - use super::*; - - #[tokio::test] - async fn test_stateless_entity() { - let (tx, rx) = create_stateless_entity::("test"); - let mut s = rx.subscribe().await; - tx.try_send("hello".to_string()); - tx.try_send("goodbye".to_string()); - - let current = s.recv().await.unwrap(); - assert_eq!("hello", current); - - let current = s.recv().await.unwrap(); - assert_eq!("goodbye", current); - - // let result = rx.get().await; - // assert!(result.is_none()); - - // let result = s.try_recv().unwrap(); - // assert!(result.is_none()); - - // let result = rx.get().await; - // assert!(result.is_none()); - - drop(s); - drop(rx); - tx.closed().await; - } - - #[tokio::test] - async fn test_stateful_entity() { - let (tx, rx) = create_stateful_entity::("test"); - tx.try_send("hello".to_string()); - let mut s = rx.subscribe().await; - tx.try_send("goodbye".to_string()); - - let (prev, current) = s.recv_value().await.unwrap(); - assert_eq!(None, prev); - assert_eq!("hello", current); - - let (prev, current) = s.recv_value().await.unwrap(); - assert_eq!("hello", prev.unwrap()); - assert_eq!("goodbye", current); - - let (prev, current) = rx.get_value().await.unwrap(); - assert_eq!("hello", prev.unwrap()); - assert_eq!("goodbye", current); - - let result = s.try_recv().unwrap(); - assert!(result.is_none()); - - let (prev, current) = rx.get_value().await.unwrap(); - assert_eq!("hello", prev.unwrap()); - assert_eq!("goodbye", current); - - drop(s); - drop(rx); - tx.closed().await; - } -} diff --git a/robotica-backend/src/lib.rs b/robotica-backend/src/lib.rs index 3c4a3c94..41b743dd 100644 --- a/robotica-backend/src/lib.rs +++ b/robotica-backend/src/lib.rs @@ -13,7 +13,7 @@ extern crate lalrpop_util; // extern crate robotica_backend_macros; pub mod devices; -pub mod entities; +pub mod pipes; pub mod scheduling; pub mod services; pub mod sinks; @@ -25,9 +25,6 @@ use thiserror::Error; use tokio::task::JoinHandle; use tracing::{debug, error}; -/// Size of all pipes. -pub const PIPE_SIZE: usize = 10; - /// Spawn a task and automatically monitor its execution. pub fn spawn(future: T) -> JoinHandle<()> where diff --git a/robotica-backend/src/pipes/generic/mod.rs b/robotica-backend/src/pipes/generic/mod.rs new file mode 100644 index 00000000..44f20959 --- /dev/null +++ b/robotica-backend/src/pipes/generic/mod.rs @@ -0,0 +1,106 @@ +//! A Generic Pipe can be turned into a stateful or stateless receiver +pub mod receiver; +pub mod sender; + +pub use receiver::Receiver; +pub use receiver::WeakReceiver; +pub use sender::Sender; +use tokio::select; +use tracing::debug; +use tracing::error; + +use crate::pipes::try_receive; +use crate::spawn; + +use self::receiver::ReceiveMessage; + +use super::PIPE_SIZE; +use sender::SendMessage; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +/// Create a generic entity that sends every message. +#[must_use] +pub fn create_pipe(name: impl Into) -> (Sender, Receiver) +where + T: Clone + Eq + Send + 'static, +{ + let (send_tx, mut send_rx) = mpsc::channel::>(PIPE_SIZE); + let (receive_tx, receive_rx) = mpsc::channel::>(PIPE_SIZE); + let (out_tx, out_rx) = broadcast::channel::(PIPE_SIZE); + + drop(out_rx); + + let name = name.into(); + + let sender: Sender = Sender { + tx: send_tx, + name: name.clone(), + }; + let receiver: Receiver = Receiver { + tx: receive_tx, + name: name.clone(), + }; + + spawn(async move { + let name = name; + + // let mut prev_data: Option = None; + let mut current_data: Option = None; + let mut receive_rx = Some(receive_rx); + + loop { + select! { + Some(msg) = send_rx.recv() => { + match msg { + SendMessage::Set(data) => { + let changed = current_data.as_ref().map_or(true, |saved_data| data != *saved_data); + if changed { + // let prev_data = current_data; + current_data = Some(data.clone()); + if let Err(_err) = out_tx.send(data) { + // It is not an error if there are no subscribers. + // debug!("stateful::create_pipe({name}): send to broadcast failed: {err} (not an error)"); + } + }; + } + } + } + Some(msg) = try_receive(&mut receive_rx) => { + #[allow(clippy::single_match_else)] + match msg { + // Some(ReceiveMessage::Get(tx)) => { + // let data = current_data.clone(); + // if tx.send(data).is_err() { + // error!("stateful::create_pipe({name}): get send failed"); + // }; + // } + Some(ReceiveMessage::Subscribe(tx)) => { + let rx = out_tx.subscribe(); + if tx.send((rx, current_data.clone())).is_err() { + error!("stateful::create_pipe{name}): subscribe send failed"); + }; + } + None => { + debug!("stateful::create_pipe({name}): command channel closed"); + receive_rx = None; + } + } + } + else => { + debug!("stateful::create_pipe({name}): all inputs closed"); + break; + } + } + + if matches!((&receive_rx, out_tx.receiver_count()), (None, 0)) { + debug!( + "stateful::create_pipe({name}): receiver closed and all subscriptions closed" + ); + break; + } + } + }); + + (sender, receiver) +} diff --git a/robotica-backend/src/pipes/generic/receiver.rs b/robotica-backend/src/pipes/generic/receiver.rs new file mode 100644 index 00000000..bf96d0b7 --- /dev/null +++ b/robotica-backend/src/pipes/generic/receiver.rs @@ -0,0 +1,194 @@ +//! Stateless receiver code. + +use thiserror::Error; +use tokio::{ + select, + sync::{broadcast, mpsc, oneshot}, +}; +use tracing::{debug, error}; + +use crate::{ + pipes::{stateful, stateless}, + spawn, +}; + +/// Something went wrong in Receiver. +#[derive(Error, Debug)] +pub enum RecvError { + /// The Pipe was closed. + #[error("The pipe was closed")] + Closed, +} + +type SubscribeMessage = (broadcast::Receiver, Option); +pub(in crate::pipes) enum ReceiveMessage { + // Get(oneshot::Sender>), + Subscribe(oneshot::Sender>), +} + +/// A `Receiver` that doesn't count as a reference to the entity. +pub struct WeakReceiver { + name: String, + pub(in crate::pipes) tx: mpsc::WeakSender>, +} + +impl WeakReceiver { + /// Try to convert to a `Receiver` + #[must_use] + pub fn upgrade(&self) -> Option> { + self.tx.upgrade().map(|tx| Receiver { + name: self.name.clone(), + tx, + }) + } +} + +// impl WeakReceiver +// where +// T: Send + Clone, +// { +// /// Get a stateless receiver +// #[must_use] +// pub fn into_stateless(&self) -> stateless::WeakReceiver { +// stateless::WeakReceiver { +// name: self.name.clone(), +// tx: self.stateless_tx.clone(), +// } +// } + +// /// Turn this into a stateful receiver. +// #[must_use] +// pub fn into_stateful(&self) -> stateful::WeakReceiver { +// stateful::WeakReceiver { +// name: self.name.clone(), +// tx: self.stateful_tx.clone(), +// } +// } +// } + +/// Receive a value from an entity +#[derive(Debug, Clone)] +pub struct Receiver { + pub(super) name: String, + pub(in crate::pipes) tx: mpsc::Sender>, +} + +impl Receiver { + async fn subscribe(&self) -> Option<(broadcast::Receiver, Option)> + where + T: Send, + { + let (tx, rx) = oneshot::channel(); + let msg = ReceiveMessage::Subscribe(tx); + if let Err(err) = self.tx.send(msg).await { + error!("{}: subscribe/send failed: {}", self.name, err); + return None; + }; + rx.await.map_or_else( + |_| { + error!("{}: subscribe/await failed", self.name); + None + }, + |(rx, initial)| Some((rx, initial)), + ) + } + + /// Try to convert to a `WeakReceiver` + #[must_use] + pub fn downgrade(&self) -> WeakReceiver { + WeakReceiver { + name: self.name.clone(), + tx: self.tx.downgrade(), + } + } +} + +impl Receiver +where + T: Send + Clone, +{ + /// Get a stateless receiver + #[must_use] + pub fn into_stateless(&self) -> stateless::Receiver + where + T: 'static, + { + let name = format!("{} (into_stateless)", self.name); + let (tx, rx) = stateless::create_pipe(&name); + + let clone_self = self.clone(); + + spawn(async move { + let Some((mut sub, _)) = clone_self.subscribe().await else { + return; + }; + + loop { + select! { + data = sub.recv() => { + let data = match data { + Ok(data) => data, + Err(err) => { + debug!("{name}: recv failed, exiting: {err}"); + break; + } + }; + + tx.try_send(data); + } + + _ = tx.closed() => { + debug!("{name}: dest closed"); + break; + } + } + } + }); + rx + } + + /// Turn this into a stateful receiver. + #[must_use] + pub fn into_stateful(&self) -> stateful::Receiver + where + T: Eq + 'static, + { + let name = format!("{} (into_stateful)", self.name); + let (tx, rx) = stateful::create_pipe(&name); + + let clone_self = self.clone(); + + spawn(async move { + // We need to keep tx to ensure connection is not closed if self is dropped. + let Some((mut sub, initial)) = clone_self.subscribe().await else { + return; + }; + + if let Some(initial) = initial { + tx.try_send(initial); + } + + loop { + select! { + data = sub.recv() => { + let data = match data { + Ok(data) => data, + Err(err) => { + debug!("{name}: recv failed, exiting: {err}"); + break; + } + }; + + tx.try_send(data); + } + + _ = tx.closed() => { + debug!("{name}: dest closed"); + break; + } + } + } + }); + rx + } +} diff --git a/robotica-backend/src/pipes/generic/sender.rs b/robotica-backend/src/pipes/generic/sender.rs new file mode 100644 index 00000000..d2e8393a --- /dev/null +++ b/robotica-backend/src/pipes/generic/sender.rs @@ -0,0 +1,39 @@ +//! Stateless sender code. +use tokio::sync::mpsc; +use tracing::error; + +pub(super) enum SendMessage { + Set(T), +} + +/// Send a value to an entity. +#[derive(Clone)] +pub struct Sender { + #[allow(dead_code)] + pub(super) name: String, + pub(super) tx: mpsc::Sender>, +} + +impl Sender { + /// Send data to the entity or fail if buffer is full. + pub fn try_send(&self, data: T) { + let msg = SendMessage::Set(data); + if let Err(err) = self.tx.try_send(msg) { + error!("send failed: {}", err); + } + } + + /// Is the remote end of the channel closed? + #[must_use] + pub fn is_closed(&self) -> bool { + self.tx.is_closed() + } + + /// Completes when the entity is closed. + pub async fn closed(&self) + where + T: Send, + { + self.tx.closed().await; + } +} diff --git a/robotica-backend/src/pipes/mod.rs b/robotica-backend/src/pipes/mod.rs new file mode 100644 index 00000000..0a8c697e --- /dev/null +++ b/robotica-backend/src/pipes/mod.rs @@ -0,0 +1,61 @@ +//! Enhanced message queues + +use async_trait::async_trait; +use thiserror::Error; +use tokio::sync::mpsc; + +pub mod generic; +pub mod stateful; +pub mod stateless; + +/// Size of all pipes. +pub const PIPE_SIZE: usize = 10; + +async fn try_receive(rx: &mut Option>) -> Option> { + match rx { + Some(rx) => Some(rx.recv().await), + None => None, + } +} + +/// Something went wrong in Receiver. +#[derive(Error, Debug)] +pub enum RecvError { + /// The Pipe was closed. + #[error("The pipe was closed")] + Closed, +} + +/// Allow subscribing to an pipe +#[async_trait] +pub trait Subscriber { + /// The type of the subscription + type SubscriptionType: Subscription + Send + 'static; + + /// Subscribe to a pipe + /// + /// Returns an already closed subscription if the entity is closed. + async fn subscribe(&self) -> Self::SubscriptionType; +} + +/// A subscription to a pipe +#[async_trait] +pub trait Subscription { + /// Wait for the next value from the entity + /// + /// This will return the new value. + /// + /// # Errors + /// + /// Returns `RecvError::Closed` if the entity is closed. + async fn recv(&mut self) -> Result; + + /// Get the next value but don't wait for it. Returns `None` if there is no value + /// + /// This will return the new value. + /// + /// # Errors + /// + /// Returns `RecvError::Closed` if the entity is closed. + fn try_recv(&mut self) -> Result, RecvError>; +} diff --git a/robotica-backend/src/pipes/stateful/mod.rs b/robotica-backend/src/pipes/stateful/mod.rs new file mode 100644 index 00000000..1be288fb --- /dev/null +++ b/robotica-backend/src/pipes/stateful/mod.rs @@ -0,0 +1,106 @@ +//! Stateless pipes do not track the current state of the entity. +pub mod receiver; +pub mod sender; + +pub use receiver::OldNewType; +pub use receiver::Receiver; +pub use receiver::Subscription; +pub use receiver::WeakReceiver; +pub use sender::Sender; +use tokio::select; +use tracing::debug; +use tracing::error; + +use crate::pipes::try_receive; +use crate::spawn; + +use super::PIPE_SIZE; +pub(in crate::pipes) use receiver::ReceiveMessage; +use sender::SendMessage; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +/// Create a stateful entity that sends every message. +#[must_use] +pub fn create_pipe(name: impl Into) -> (Sender, Receiver) +where + T: Clone + Eq + Send + 'static, +{ + let (send_tx, mut send_rx) = mpsc::channel::>(PIPE_SIZE); + let (receive_tx, receive_rx) = mpsc::channel::>(PIPE_SIZE); + let (out_tx, out_rx) = broadcast::channel::>(PIPE_SIZE); + + drop(out_rx); + + let name = name.into(); + + let sender: Sender = Sender { + tx: send_tx, + name: name.clone(), + }; + let receiver: Receiver = Receiver { + tx: receive_tx, + name: name.clone(), + }; + + spawn(async move { + let name = name; + + // let mut prev_data: Option = None; + let mut current_data: Option = None; + let mut receive_rx = Some(receive_rx); + + loop { + select! { + Some(msg) = send_rx.recv() => { + match msg { + SendMessage::Set(data) => { + let changed = current_data.as_ref().map_or(true, |saved_data| data != *saved_data); + if changed { + let prev_data = current_data; + current_data = Some(data.clone()); + if let Err(_err) = out_tx.send((prev_data.clone(), data)) { + // It is not an error if there are no subscribers. + // debug!("stateful::create_pipe({name}): send to broadcast failed: {err} (not an error)"); + } + }; + } + } + } + Some(msg) = try_receive(&mut receive_rx) => { + match msg { + Some(ReceiveMessage::Get(tx)) => { + let data = current_data.clone(); + if tx.send(data).is_err() { + error!("stateful::create_pipe({name}): get send failed"); + }; + } + Some(ReceiveMessage::Subscribe(tx)) => { + let rx = out_tx.subscribe(); + if tx.send((rx, current_data.clone())).is_err() { + error!("stateful::create_pipe{name}): subscribe send failed"); + }; + } + None => { + debug!("stateful::create_pipe({name}): command channel closed"); + receive_rx = None; + } + } + } + else => { + debug!("stateful::create_pipe({name}): all inputs closed"); + break; + } + } + + if matches!((&receive_rx, out_tx.receiver_count()), (None, 0)) { + debug!( + "stateful::create_pipe({name}): receiver closed and all subscriptions closed" + ); + break; + } + } + }); + + (sender, receiver) +} diff --git a/robotica-backend/src/pipes/stateful/receiver.rs b/robotica-backend/src/pipes/stateful/receiver.rs new file mode 100644 index 00000000..9fb10fc1 --- /dev/null +++ b/robotica-backend/src/pipes/stateful/receiver.rs @@ -0,0 +1,369 @@ +//! Stateless receiver code. + +use super::create_pipe; +use crate::pipes::{Subscriber, Subscription as SubscriptionTrait}; +use crate::{pipes::RecvError, spawn}; +use async_trait::async_trait; +use tokio::{ + select, + sync::{broadcast, mpsc, oneshot}, +}; +use tracing::{debug, error}; + +/// Old and new value. +pub type OldNewType = (Option, T); + +type SubscribeMessage = (broadcast::Receiver>, Option); + +pub(in crate::pipes) enum ReceiveMessage { + Get(oneshot::Sender>), + Subscribe(oneshot::Sender>), +} + +/// A `Receiver` that doesn't count as a reference to the entity. +pub struct WeakReceiver { + pub(in crate::pipes) name: String, + pub(in crate::pipes) tx: mpsc::WeakSender>, +} + +impl WeakReceiver { + /// Try to convert to a `Receiver`. + #[must_use] + pub fn upgrade(&self) -> Option> { + self.tx.upgrade().map(|tx| Receiver { + name: self.name.clone(), + tx, + }) + } +} + +/// Receive a value from an entity. +#[derive(Debug, Clone)] +pub struct Receiver { + pub(in crate::pipes) name: String, + pub(in crate::pipes) tx: mpsc::Sender>, +} + +#[async_trait] +impl Subscriber for Receiver +where + T: Send + Clone + 'static, +{ + type SubscriptionType = Subscription; + + /// Subscribe to this entity. + /// + /// Returns an already closed subscription if the entity is closed. + async fn subscribe(&self) -> Subscription { + let (tx, rx) = oneshot::channel(); + let msg = ReceiveMessage::Subscribe(tx); + if let Err(err) = self.tx.send(msg).await { + error!("{}: subscribe/send failed: {}", self.name, err); + return Subscription::null(self.tx.clone()); + }; + rx.await.map_or_else( + |_| { + error!("{}: subscribe/await failed", self.name); + Subscription::null(self.tx.clone()) + }, + |(rx, initial)| Subscription { + rx, + _tx: self.tx.clone(), + initial, + }, + ) + } +} + +impl Receiver +where + T: Send + Clone, +{ + /// Retrieve the most recent value from the entity. + /// + /// Returns `None` if the entity is closed. + pub async fn get(&self) -> Option { + let (tx, rx) = oneshot::channel(); + let msg = ReceiveMessage::Get(tx); + if let Err(err) = self.tx.send(msg).await { + error!("{}: get/send failed: {}", self.name, err); + return None; + }; + rx.await.map_or_else( + |_| { + error!("{}: get/await failed", self.name); + None + }, + |v| v, + ) + } + + /// Translate this receiver into a another type using a stateless receiver. + #[must_use] + pub fn translate(self) -> Receiver + where + T: 'static, + U: Eq + Clone + Send + TryFrom + 'static, + >::Error: std::error::Error, + { + let name = format!("{} (translate)", self.name); + let (tx, rx) = create_pipe(&name); + + spawn(async move { + let mut sub = self.subscribe().await; + + loop { + select! { + data = sub.recv() => { + let data = match data { + Ok(data) => data, + Err(err) => { + debug!("{name}: recv failed, exiting: {err}"); + break; + } + }; + + match U::try_from(data) { + Ok(data) => { + tx.try_send(data); + } + Err(err) => { + error!("{name}: parse failed: {err}"); + } + } + } + + _ = tx.closed() => { + debug!("{name}: source closed"); + break; + } + } + } + }); + + rx + } + + /// Map this receiver into a another type using a any type of receiver. + pub fn map(self, f: impl Fn(OldNewType) -> U + Send + 'static) -> Receiver + where + T: 'static, + U: Eq + Clone + Send + 'static, + { + let name = format!("{} (map_into)", self.name); + let (tx, rx) = create_pipe(&name); + + spawn(async move { + let mut sub = self.subscribe().await; + + loop { + select! { + data = sub.recv_old_new() => { + let data = match data { + Ok(data) => data, + Err(err) => { + debug!("{name}: recv failed, exiting: {err}"); + break; + } + }; + + tx.try_send(f(data)); + } + + _ = tx.closed() => { + debug!("{name}: dest closed"); + break; + } + } + } + }); + + rx + } + + /// Filter this receiver based on function result + #[must_use] + pub fn filter(self, f: impl Fn(&OldNewType) -> bool + Send + 'static) -> Receiver + where + T: Eq + 'static, + { + let name = format!("{} (filter_into_anything)", self.name); + let (tx, rx) = create_pipe(&name); + + spawn(async move { + let mut sub = self.subscribe().await; + + loop { + select! { + data = sub.recv_old_new() => { + let data = match data { + Ok(data) => data, + Err(err) => { + debug!("{name}: recv failed, exiting: {err}"); + break; + } + }; + + if f(&data) { + let (_, new) = data; + tx.try_send(new); + } + } + + _ = tx.closed() => { + debug!("{name}: dest closed"); + break; + } + } + } + }); + rx + } + + /// Run a function for every value received. + pub fn for_each(self, f: F) + where + F: Fn(OldNewType) + Send + 'static, + T: 'static, + { + spawn(async move { + let mut sub = self.subscribe().await; + + loop { + while let Ok(data) = sub.recv_old_new().await { + f(data); + } + } + }); + } + + /// Completes when the entity is closed. + pub async fn closed(&self) { + self.tx.closed().await; + } + + /// Create a new `WeakReceiver` from this Receiver. + #[must_use] + pub fn downgrade(&self) -> WeakReceiver { + WeakReceiver { + name: self.name.clone(), + tx: self.tx.downgrade(), + } + } +} + +/// A subscription to receive data from an entity. +pub struct Subscription { + rx: broadcast::Receiver>, + // We need to keep this to ensure connection stays alive. + _tx: mpsc::Sender>, + initial: Option, +} + +impl Subscription +where + T: Clone, +{ + /// Create a null subscription that is already closed. + fn null(tx: mpsc::Sender>) -> Self { + let (_tx, rx) = broadcast::channel(0); + Self { + rx, + _tx: tx, + initial: None, + } + } +} + +impl Subscription +where + T: Send + Clone, +{ + /// Wait for the next value from the entity. + /// + /// This will return (old value, new value) + /// + /// # Errors + /// + /// Returns `RecvError::Closed` if the entity is closed. + pub async fn recv_old_new(&mut self) -> Result, RecvError> { + let initial = self.initial.take(); + if let Some(initial) = initial { + return Ok((None, initial)); + } + loop { + match self.rx.recv().await { + Ok(v) => return Ok(v), + Err(err) => match err { + broadcast::error::RecvError::Closed => return Err(RecvError::Closed), + broadcast::error::RecvError::Lagged(_) => { + error!("recv failed: The pipe was lagged"); + } + }, + } + } + } + + /// Get the next value but don't wait for it. Returns `None` if there is no value. + /// + /// This will return (old value, new value) + /// + /// # Errors + /// + /// Returns `RecvError::Closed` if the entity is closed. + pub fn try_recv_old_new(&mut self) -> Result>, RecvError> { + let initial = self.initial.take(); + if let Some(initial) = initial { + return Ok(Some((None, initial))); + } + loop { + match self.rx.try_recv() { + Ok(v) => return Ok(Some(v)), + Err(err) => match err { + broadcast::error::TryRecvError::Closed => { + return Err(RecvError::Closed); + } + broadcast::error::TryRecvError::Empty => return Ok(None), + broadcast::error::TryRecvError::Lagged(_) => { + error!("try_recv failed: The pipe was lagged"); + } + }, + } + } + } +} + +#[async_trait] +impl SubscriptionTrait for Subscription +where + T: Send + Clone, +{ + /// Wait for the next value from the entity. + /// + /// This will return the new value. + /// + /// # Errors + /// + /// Returns `RecvError::Closed` if the entity is closed. + async fn recv(&mut self) -> Result { + match self.recv_old_new().await { + Ok((_, v)) => Ok(v), + Err(err) => Err(err), + } + } + + /// Get the next value but don't wait for it. Returns `None` if there is no value. + /// + /// This will return the new value. + /// + /// # Errors + /// + /// Returns `RecvError::Closed` if the entity is closed. + fn try_recv(&mut self) -> Result, RecvError> { + match self.try_recv_old_new() { + Ok(Some((_, v))) => Ok(Some(v)), + Ok(None) => Ok(None), + Err(err) => Err(err), + } + } +} diff --git a/robotica-backend/src/pipes/stateful/sender.rs b/robotica-backend/src/pipes/stateful/sender.rs new file mode 100644 index 00000000..d2e8393a --- /dev/null +++ b/robotica-backend/src/pipes/stateful/sender.rs @@ -0,0 +1,39 @@ +//! Stateless sender code. +use tokio::sync::mpsc; +use tracing::error; + +pub(super) enum SendMessage { + Set(T), +} + +/// Send a value to an entity. +#[derive(Clone)] +pub struct Sender { + #[allow(dead_code)] + pub(super) name: String, + pub(super) tx: mpsc::Sender>, +} + +impl Sender { + /// Send data to the entity or fail if buffer is full. + pub fn try_send(&self, data: T) { + let msg = SendMessage::Set(data); + if let Err(err) = self.tx.try_send(msg) { + error!("send failed: {}", err); + } + } + + /// Is the remote end of the channel closed? + #[must_use] + pub fn is_closed(&self) -> bool { + self.tx.is_closed() + } + + /// Completes when the entity is closed. + pub async fn closed(&self) + where + T: Send, + { + self.tx.closed().await; + } +} diff --git a/robotica-backend/src/pipes/stateless/mod.rs b/robotica-backend/src/pipes/stateless/mod.rs new file mode 100644 index 00000000..b5ed58bc --- /dev/null +++ b/robotica-backend/src/pipes/stateless/mod.rs @@ -0,0 +1,93 @@ +//! Stateless pipes do not track the current state of the entity. +pub mod receiver; +pub mod sender; + +pub use receiver::Receiver; +pub use receiver::Subscription; +pub use receiver::WeakReceiver; +pub use sender::Sender; +use tokio::select; +use tracing::debug; +use tracing::error; + +use crate::pipes::try_receive; +use crate::spawn; + +use super::PIPE_SIZE; +pub(super) use receiver::ReceiveMessage; +use sender::SendMessage; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +/// Create a stateless entity that sends every message. +#[must_use] +pub fn create_pipe(name: impl Into) -> (Sender, Receiver) +where + T: Clone + Send + 'static, +{ + let (send_tx, mut send_rx) = mpsc::channel::>(PIPE_SIZE); + let (receive_tx, receive_rx) = mpsc::channel::>(PIPE_SIZE); + let (out_tx, out_rx) = broadcast::channel::(PIPE_SIZE); + + drop(out_rx); + + let name = name.into(); + + let sender: Sender = Sender { + tx: send_tx, + name: name.clone(), + }; + let receiver: Receiver = Receiver { + tx: receive_tx, + name: name.clone(), + }; + + spawn(async move { + let name = name; + let mut receive_rx = Some(receive_rx); + + loop { + select! { + Some(msg) = send_rx.recv() => { + match msg { + SendMessage::Set(data) => { + if let Err(_err) = out_tx.send(data) { + // It is not an error if there are no subscribers. + // debug!("stateless::create_pipe({name}): send to broadcast failed: {err} (not an error)"); + } + } + } + } + Some(msg) = try_receive(&mut receive_rx) => { + // This should create an error if we add messages in the future. + #[allow(clippy::single_match_else)] + match msg { + Some(ReceiveMessage::Subscribe(tx)) => { + let rx = out_tx.subscribe(); + if tx.send(rx).is_err() { + error!("stateless::create_pipe({name}): subscribe send failed"); + }; + } + None => { + debug!("stateless::create_pipe({name}): command channel closed"); + receive_rx = None; + } + } + } + else => { + debug!("stateless::create_pipe({name}): all inputs closed"); + break; + } + } + + if matches!((&receive_rx, out_tx.receiver_count()), (None, 0)) { + debug!( + "stateless::create_pipe({name}): receiver closed and all subscriptions closed" + ); + break; + } + } + }); + + (sender, receiver) +} diff --git a/robotica-backend/src/pipes/stateless/receiver.rs b/robotica-backend/src/pipes/stateless/receiver.rs new file mode 100644 index 00000000..8daf80a4 --- /dev/null +++ b/robotica-backend/src/pipes/stateless/receiver.rs @@ -0,0 +1,293 @@ +//! Stateless receiver code. + +use super::create_pipe; +use crate::pipes::{Subscriber, Subscription as SubscriptionTrait}; +use crate::{pipes::RecvError, spawn}; +use async_trait::async_trait; +use tokio::{ + select, + sync::{broadcast, mpsc, oneshot}, +}; +use tracing::{debug, error}; + +type SubscribeMessage = broadcast::Receiver; + +pub(in crate::pipes) enum ReceiveMessage { + Subscribe(oneshot::Sender>), +} + +/// A `Receiver` that doesn't count as a reference to the entity. +pub struct WeakReceiver { + pub(in crate::pipes) name: String, + pub(in crate::pipes) tx: mpsc::WeakSender>, +} + +impl WeakReceiver { + /// Try to convert to a `Receiver`. + #[must_use] + pub fn upgrade(&self) -> Option> { + self.tx.upgrade().map(|tx| Receiver { + name: self.name.clone(), + tx, + }) + } +} + +/// Receive a value from an entity. +#[derive(Debug, Clone)] +pub struct Receiver { + pub(in crate::pipes) name: String, + pub(in crate::pipes) tx: mpsc::Sender>, +} + +#[async_trait] +impl Subscriber for Receiver +where + T: Send + Clone + 'static, +{ + type SubscriptionType = Subscription; + + /// Subscribe to this entity. + /// + /// Returns an already closed subscription if the entity is closed. + async fn subscribe(&self) -> Subscription { + let (tx, rx) = oneshot::channel(); + let msg = ReceiveMessage::Subscribe(tx); + if let Err(err) = self.tx.send(msg).await { + error!("{}: subscribe/send failed: {}", self.name, err); + return Subscription::null(self.tx.clone()); + }; + rx.await.map_or_else( + |_| { + error!("{}: subscribe/await failed", self.name); + Subscription::null(self.tx.clone()) + }, + |rx| Subscription { + rx, + _tx: self.tx.clone(), + }, + ) + } +} + +impl Receiver +where + T: Send + Clone, +{ + /// Translate this receiver into a another type using a stateless receiver. + #[must_use] + pub fn translate(self) -> Receiver + where + T: 'static, + U: Clone + Send + TryFrom + 'static, + >::Error: std::error::Error, + { + let name = format!("{} (translate)", self.name); + let (tx, rx) = create_pipe(&name); + + spawn(async move { + let mut sub = self.subscribe().await; + + loop { + select! { + data = sub.recv() => { + let data = match data { + Ok(data) => data, + Err(err) => { + debug!("{name}: recv failed, exiting: {err}"); + break; + } + }; + + match U::try_from(data) { + Ok(data) => { + tx.try_send(data); + } + Err(err) => { + error!("{name}: parse failed: {err}"); + } + } + } + + _ = tx.closed() => { + debug!("{name}: source closed"); + break; + } + } + } + }); + + rx + } + + /// Map this receiver into a another type using a any type of receiver. + pub fn map(self, f: impl Fn(T) -> U + Send + 'static) -> Receiver + where + T: 'static, + U: Clone + Send + 'static, + { + let name = format!("{} (map_into)", self.name); + let (tx, rx) = create_pipe(&name); + + spawn(async move { + let mut sub = self.subscribe().await; + + loop { + select! { + data = sub.recv() => { + let data = match data { + Ok(data) => data, + Err(err) => { + debug!("{name}: recv failed, exiting: {err}"); + break; + } + }; + + tx.try_send(f(data)); + } + + _ = tx.closed() => { + debug!("{name}: dest closed"); + break; + } + } + } + }); + + rx + } + + /// Filter this receiver based on function result + #[must_use] + pub fn filter(self, f: impl Fn(&T) -> bool + Send + 'static) -> Receiver + where + T: 'static, + { + let name = format!("{} (filter_into_anything)", self.name); + let (tx, rx) = create_pipe(&name); + + spawn(async move { + let mut sub = self.subscribe().await; + + loop { + select! { + data = sub.recv() => { + let data = match data { + Ok(data) => data, + Err(err) => { + debug!("{name}: recv failed, exiting: {err}"); + break; + } + }; + + if f(&data) { + tx.try_send(data); + } + } + + _ = tx.closed() => { + debug!("{name}: dest closed"); + break; + } + } + } + }); + rx + } + + /// Run a function for every value received. + pub fn for_each(self, f: F) + where + F: Fn(T) + Send + 'static, + T: 'static, + { + spawn(async move { + let mut sub = self.subscribe().await; + + loop { + while let Ok(data) = sub.recv().await { + f(data); + } + } + }); + } + + /// Completes when the entity is closed. + pub async fn closed(&self) { + self.tx.closed().await; + } + + /// Create a new `WeakReceiver` from this Receiver. + #[must_use] + pub fn downgrade(&self) -> WeakReceiver { + WeakReceiver { + tx: self.tx.downgrade(), + name: self.name.clone(), + } + } +} + +/// A subscription to receive data from an entity. +pub struct Subscription { + rx: broadcast::Receiver, + // We need to keep this to ensure connection stays alive. + _tx: mpsc::Sender>, +} + +impl Subscription +where + T: Clone, +{ + /// Create a null subscription that is already closed. + fn null(tx: mpsc::Sender>) -> Self { + let (_tx, rx) = broadcast::channel(0); + Self { rx, _tx: tx } + } +} + +#[async_trait] +impl SubscriptionTrait for Subscription +where + T: Send + Clone, +{ + /// Wait for the next value from the entity. + /// + /// # Errors + /// + /// Returns `RecvError::Closed` if the entity is closed. + async fn recv(&mut self) -> Result { + loop { + match self.rx.recv().await { + Ok(v) => return Ok(v), + Err(err) => match err { + broadcast::error::RecvError::Closed => return Err(RecvError::Closed), + broadcast::error::RecvError::Lagged(_) => { + error!("recv failed: The pipe was lagged"); + } + }, + } + } + } + + /// Get the next value but don't wait for it. Returns `None` if there is no value. + /// + /// # Errors + /// + /// Returns `RecvError::Closed` if the entity is closed. + fn try_recv(&mut self) -> Result, RecvError> { + loop { + match self.rx.try_recv() { + Ok(v) => return Ok(Some(v)), + Err(err) => match err { + broadcast::error::TryRecvError::Closed => { + return Err(RecvError::Closed); + } + broadcast::error::TryRecvError::Empty => return Ok(None), + broadcast::error::TryRecvError::Lagged(_) => { + error!("try_recv failed: The pipe was lagged"); + } + }, + } + } + } +} diff --git a/robotica-backend/src/pipes/stateless/sender.rs b/robotica-backend/src/pipes/stateless/sender.rs new file mode 100644 index 00000000..d2e8393a --- /dev/null +++ b/robotica-backend/src/pipes/stateless/sender.rs @@ -0,0 +1,39 @@ +//! Stateless sender code. +use tokio::sync::mpsc; +use tracing::error; + +pub(super) enum SendMessage { + Set(T), +} + +/// Send a value to an entity. +#[derive(Clone)] +pub struct Sender { + #[allow(dead_code)] + pub(super) name: String, + pub(super) tx: mpsc::Sender>, +} + +impl Sender { + /// Send data to the entity or fail if buffer is full. + pub fn try_send(&self, data: T) { + let msg = SendMessage::Set(data); + if let Err(err) = self.tx.try_send(msg) { + error!("send failed: {}", err); + } + } + + /// Is the remote end of the channel closed? + #[must_use] + pub fn is_closed(&self) -> bool { + self.tx.is_closed() + } + + /// Completes when the entity is closed. + pub async fn closed(&self) + where + T: Send, + { + self.tx.closed().await; + } +} diff --git a/robotica-backend/src/scheduling/executor.rs b/robotica-backend/src/scheduling/executor.rs index dd15c1f7..dca520c8 100644 --- a/robotica-backend/src/scheduling/executor.rs +++ b/robotica-backend/src/scheduling/executor.rs @@ -15,6 +15,7 @@ use robotica_common::datetime::{utc_now, Date, DateTime, Duration}; use robotica_common::mqtt::MqttMessage; use robotica_common::scheduler::Mark; +use crate::pipes::{Subscriber, Subscription}; use crate::scheduling::sequencer::check_schedule; use crate::services::mqtt::{MqttTx, Subscriptions}; use crate::spawn; @@ -272,7 +273,7 @@ pub enum ExecutorError { /// This function will return an error if the `config` is invalid. pub fn executor(subscriptions: &mut Subscriptions, mqtt: MqttTx) -> Result<(), ExecutorError> { let mut state = get_initial_state(mqtt)?; - let mark_rx = subscriptions.subscribe_into::>("mark"); + let mark_rx = subscriptions.subscribe_into_stateless::>("mark"); spawn(async move { let mut mark_s = mark_rx.subscribe().await; diff --git a/robotica-backend/src/services/http/websocket/mod.rs b/robotica-backend/src/services/http/websocket/mod.rs index 3ba715e2..e2027031 100644 --- a/robotica-backend/src/services/http/websocket/mod.rs +++ b/robotica-backend/src/services/http/websocket/mod.rs @@ -19,7 +19,10 @@ use robotica_common::{ websocket::{WsCommand, WsError, WsStatus}, }; -use crate::{entities::StatefulSubscription, services::mqtt::topics::topic_matches_any}; +use crate::{ + pipes::{stateful, Subscriber, Subscription}, + services::mqtt::topics::topic_matches_any, +}; use super::{get_user, HttpConfig, User}; @@ -79,7 +82,7 @@ async fn websocket(mut stream: WebSocket, config: HttpConfig, user: User) { let mut add_subscriptions: Vec = Vec::new(); let mut remove_subscriptions: Vec = Vec::new(); - let mut subscriptions: HashMap> = HashMap::new(); + let mut subscriptions: HashMap> = HashMap::new(); loop { for topic in &add_subscriptions { diff --git a/robotica-backend/src/services/life360.rs b/robotica-backend/src/services/life360.rs index 240a3011..16ef482a 100644 --- a/robotica-backend/src/services/life360.rs +++ b/robotica-backend/src/services/life360.rs @@ -11,8 +11,8 @@ use tracing::error; use tokio::time; -use crate::entities; use crate::get_env; +use crate::pipes::stateless; use crate::spawn; use crate::EnvironmentError; @@ -150,8 +150,8 @@ struct Circle { /// # Errors /// /// Will return an error if the environment variables `LIFE360_USERNAME` and `LIFE360_PASSWORD` are not set. -pub fn circles(name: &str) -> Result>, EnvironmentError> { - let (tx, rx) = entities::create_stateless_entity(name); +pub fn circles(name: &str) -> Result>, EnvironmentError> { + let (tx, rx) = stateless::create_pipe(name); let username = get_env("LIFE360_USERNAME")?; let password = get_env("LIFE360_PASSWORD")?; @@ -225,7 +225,7 @@ async fn get_circles_or_none(login: &Login) -> Option { async fn dispatch_circle_details( login: &Login, circles: &List, - tx: &entities::StatelessSender>, + tx: &stateless::Sender>, ) { for circle in &circles.circles { match get_circle_details(login, circle).await { diff --git a/robotica-backend/src/services/mqtt/mod.rs b/robotica-backend/src/services/mqtt/mod.rs index 08f811cf..6406bc0c 100644 --- a/robotica-backend/src/services/mqtt/mod.rs +++ b/robotica-backend/src/services/mqtt/mod.rs @@ -18,9 +18,7 @@ use tracing::{debug, error}; use robotica_common::mqtt::{MqttMessage, QoS}; -use crate::entities::{ - self, StatefulReceiver, StatelessReceiver, StatelessSender, StatelessWeakReceiver, -}; +use crate::pipes::{generic, stateful, stateless}; use crate::{get_env, is_debug_mode, spawn, EnvironmentError}; const fn qos_to_rumqttc(qos: QoS) -> rumqttc::v5::mqttbytes::QoS { @@ -71,7 +69,7 @@ enum MqttCommand { MqttOut(MqttMessage), Subscribe( String, - oneshot::Sender, SubscribeError>>, + oneshot::Sender, SubscribeError>>, ), Unsubscribe(String), } @@ -98,7 +96,7 @@ impl MqttTx { pub async fn subscribe( &self, topic: impl Into + Send, - ) -> Result, SubscribeError> { + ) -> Result, SubscribeError> { let (tx, rx) = oneshot::channel(); self.0 .send(MqttCommand::Subscribe(topic.into(), tx)) @@ -112,10 +110,10 @@ impl MqttTx { /// # Errors /// /// Returns an error if the subscribe request could not be sent. - pub async fn subscribe_into( + pub async fn subscribe_into_stateless( &self, topic: impl Into + Send, - ) -> Result, SubscribeError> + ) -> Result, SubscribeError> where // U: Data, // T::Sent: Send + 'static, @@ -123,7 +121,11 @@ impl MqttTx { >::Error: Send + std::error::Error, // T::Received: Send + 'static, { - Ok(self.subscribe(topic).await?.translate::()) + Ok(self + .subscribe(topic) + .await? + .into_stateless() + .translate::()) } /// Add new subscription and parse incoming data as type T @@ -134,7 +136,7 @@ impl MqttTx { pub async fn subscribe_into_stateful( &self, topic: impl Into + Send, - ) -> Result, SubscribeError> + ) -> Result, SubscribeError> where // U: Data, // T::Sent: Send + 'static, @@ -142,7 +144,11 @@ impl MqttTx { >::Error: Send + std::error::Error, // T::Received: Send + 'static, { - Ok(self.subscribe(topic).await?.translate_stateful::()) + Ok(self + .subscribe(topic) + .await? + .into_stateful() + .translate::()) } } @@ -295,7 +301,7 @@ fn process_subscribe( client: &AsyncClient, subscriptions: &mut Subscriptions, topic: impl Into, - tx: oneshot::Sender, SubscribeError>>, + tx: oneshot::Sender, SubscribeError>>, channel_tx: mpsc::Sender, ) { let topic: String = topic.into(); @@ -307,7 +313,7 @@ fn process_subscribe( let response = if let Some(rx) = maybe_rx { Ok(rx) } else { - let (tx, rx) = entities::create_stateless_entity(&topic); + let (tx, rx) = generic::create_pipe(&topic); let subscription = Subscription { topic: topic.to_string(), @@ -337,7 +343,7 @@ fn process_subscribe( } fn watch_tx_closed( - tx: StatelessSender, + tx: generic::Sender, channel_tx: mpsc::Sender, topic: String, ) { @@ -377,8 +383,8 @@ fn incoming_event(client: &AsyncClient, pkt: Packet, subscriptions: &Subscriptio struct Subscription { #[allow(dead_code)] topic: String, - tx: StatelessSender, - rx: StatelessWeakReceiver, + tx: generic::Sender, + rx: generic::WeakReceiver, } /// List of all required subscriptions. @@ -396,7 +402,7 @@ impl Subscriptions { } /// Add a new subscription. - pub fn subscribe(&mut self, topic: impl Into) -> StatelessReceiver { + pub fn subscribe(&mut self, topic: impl Into) -> generic::Receiver { // Per subscription incoming MQTT queue. let topic = topic.into(); let subscription = self.0.get(&topic); @@ -405,7 +411,7 @@ impl Subscriptions { if let Some(rx) = maybe_rx { rx } else { - let (tx, rx) = entities::create_stateless_entity(topic.clone()); + let (tx, rx) = generic::create_pipe(topic.clone()); let subscription = Subscription { topic: topic.clone(), @@ -419,21 +425,24 @@ impl Subscriptions { } /// Add new subscription and parse incoming data as type T - pub fn subscribe_into(&mut self, topic: impl Into) -> StatelessReceiver + pub fn subscribe_into_stateless( + &mut self, + topic: impl Into, + ) -> stateless::Receiver where T: TryFrom + Clone + Send + 'static, >::Error: Send + std::error::Error, { - self.subscribe(topic).translate() + self.subscribe(topic).into_stateless().translate() } /// Add new subscription and parse incoming data as type T - pub fn subscribe_into_stateful(&mut self, topic: impl Into) -> StatefulReceiver + pub fn subscribe_into_stateful(&mut self, topic: impl Into) -> stateful::Receiver where T: TryFrom + Clone + Eq + Send + 'static, >::Error: Send + std::error::Error, { - self.subscribe(topic).translate_stateful() + self.subscribe(topic).into_stateful().translate() } /// Iterate over all subscriptions. diff --git a/robotica-backend/src/sinks/mod.rs b/robotica-backend/src/sinks/mod.rs index 5d141523..9a1ee417 100644 --- a/robotica-backend/src/sinks/mod.rs +++ b/robotica-backend/src/sinks/mod.rs @@ -1,16 +1,17 @@ //! Sinks take one/more inputs and produce now outputs. -use crate::{entities, spawn}; +use crate::pipes::{Subscriber, Subscription}; +use crate::spawn; /// Create a sink that consumes incoming messages and discards them. -pub async fn null(rx: entities::Receiver) +pub async fn null(rx: impl Subscriber + Send) where - T: entities::Data + Send + 'static, - T::Received: Send + 'static, - T::Sent: Send + 'static, + T: Send + 'static, { - let mut s = rx.subscribe().await; + let s = rx.subscribe(); + let mut s = s.await; + spawn(async move { - while s.recv_value().await.is_ok() { + while s.recv().await.is_ok() { // do nothing } }); @@ -18,11 +19,13 @@ where #[cfg(test)] mod tests { + use crate::pipes::stateless; + use super::*; #[tokio::test] async fn test_null() { - let (tx, rx) = entities::create_stateless_entity("test"); + let (tx, rx) = stateless::create_pipe("test"); null(rx).await; tx.try_send(10); tx.try_send(10); diff --git a/robotica-backend/src/sources/timer.rs b/robotica-backend/src/sources/timer.rs index 2acbbbfc..81c046f2 100644 --- a/robotica-backend/src/sources/timer.rs +++ b/robotica-backend/src/sources/timer.rs @@ -3,13 +3,13 @@ use std::time::Duration; use tokio::time; use tokio::time::Instant; -use crate::entities; +use crate::pipes::stateless; use crate::spawn; /// Create a timer that sends outgoing messages at regularly spaced intervals. #[must_use] -pub fn timer(duration: Duration, name: &str) -> entities::StatelessReceiver { - let (tx, rx) = entities::create_stateless_entity(name); +pub fn timer(duration: Duration, name: &str) -> stateless::Receiver { + let (tx, rx) = stateless::create_pipe(name); spawn(async move { let mut interval = time::interval(duration); @@ -29,6 +29,8 @@ mod tests { #![allow(clippy::unwrap_used)] use tokio::time::sleep; + use crate::pipes::{Subscriber, Subscription}; + use super::*; #[tokio::test] diff --git a/robotica-backend/tests/hdmi.rs b/robotica-backend/tests/hdmi.rs index 1843fd55..7a5dba4a 100644 --- a/robotica-backend/tests/hdmi.rs +++ b/robotica-backend/tests/hdmi.rs @@ -5,7 +5,7 @@ use std::net::SocketAddr; use robotica_backend::{ devices::hdmi::{Command, Options}, - entities, + pipes::{stateless, Subscriber, Subscription}, }; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -121,7 +121,7 @@ async fn test_client_once() { let _ = started.await; println!("test: starting client"); - let (client, rx) = entities::create_stateless_entity("test"); + let (client, rx) = stateless::create_pipe("test"); let (rx, client_handle) = robotica_backend::devices::hdmi::run(addr, rx, &options); let mut rx_s = rx.subscribe().await; @@ -157,7 +157,7 @@ async fn test_client_reconnect() { let _ = started.await; println!("test: starting client"); - let (client, rx) = entities::create_stateless_entity("test"); + let (client, rx) = stateless::create_pipe("test"); let (rx, client_handle) = robotica_backend::devices::hdmi::run(addr, rx, &options); let mut rx_s = rx.subscribe().await; diff --git a/robotica-slint/src/audio.rs b/robotica-slint/src/audio.rs index 40be2ae2..8262f9ee 100644 --- a/robotica-slint/src/audio.rs +++ b/robotica-slint/src/audio.rs @@ -7,8 +7,8 @@ use std::{ }; use robotica_backend::{ - entities::{StatefulReceiver, StatelessReceiver}, get_env_os, + pipes::{stateful, stateless, Subscriber, Subscription}, services::{ mqtt::{MqttTx, Subscriptions}, persistent_state::PersistentStateDatabase, @@ -99,8 +99,9 @@ pub fn run( ) { let topic_substr = &config.topic_substr; let topic = format!("command/{topic_substr}"); - let command_rx: StatelessReceiver> = subscriptions.subscribe_into(topic); - let messages_enabled_rx: StatefulReceiver = + let command_rx: stateless::Receiver> = + subscriptions.subscribe_into_stateless(topic); + let messages_enabled_rx: stateful::Receiver = subscriptions.subscribe_into_stateful(&config.messages_enabled_topic); let psr = database.for_name::(topic_substr); let mut state = psr.load().unwrap_or_default(); diff --git a/robotica-slint/src/ui.rs b/robotica-slint/src/ui.rs index 389f4b7b..0c37e390 100644 --- a/robotica-slint/src/ui.rs +++ b/robotica-slint/src/ui.rs @@ -43,7 +43,7 @@ use futures::{stream::FuturesUnordered, Future, StreamExt}; use serde::Deserialize; use robotica_backend::{ - entities::{self, RecvError}, + pipes::{stateful, RecvError, Subscriber, Subscription}, services::mqtt::MqttTx, }; use robotica_common::controllers::{ @@ -197,7 +197,7 @@ where async fn receive( label: Label, - subscription: &mut entities::StatefulSubscription, + subscription: &mut stateful::Subscription, ) -> Result<(Label, MqttMessage), RecvError> { let msg = subscription.recv().await?; Ok((label, msg)) @@ -380,7 +380,7 @@ fn monitor_tags(mqtt: &MqttTx, ui: &slint::AppWindow) { let handle_weak = ui.as_weak(); tokio::spawn(async move { let rx = mqtt - .subscribe_into::>>("robotica/robotica.linuxpenguins.xyz/tags") + .subscribe_into_stateless::>>("robotica/robotica.linuxpenguins.xyz/tags") .await .unwrap(); let mut rx = rx.subscribe().await; @@ -403,7 +403,9 @@ fn monitor_schedule(mqtt: &MqttTx, ui: &slint::AppWindow) { let handle_weak = ui.as_weak(); tokio::spawn(async move { let rx = mqtt - .subscribe_into::>>>("schedule/robotica.linuxpenguins.xyz") + .subscribe_into_stateless::>>>( + "schedule/robotica.linuxpenguins.xyz", + ) .await .unwrap(); let mut rx = rx.subscribe().await;