Skip to content

Commit

Permalink
Refactor entities into pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmay committed Sep 8, 2023
1 parent 2812828 commit 974dfe5
Show file tree
Hide file tree
Showing 34 changed files with 1,559 additions and 1,223 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 29 additions & 12 deletions brian-backend/src/amber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ use tokio::time::{interval, sleep_until, Instant, MissedTickBehavior};
use tracing::{debug, error, info};

use robotica_backend::{
entities::{self, StatelessReceiver},
get_env, is_debug_mode,
services::persistent_state::PersistentStateRow,
spawn, EnvironmentError,
get_env, is_debug_mode, pipes::stateful, services::persistent_state::PersistentStateRow, spawn,
EnvironmentError,
};
use robotica_common::datetime::{
convert_date_time_to_utc_or_default, duration_from_hms, utc_now, Date, DateTime, Duration, Time,
Expand Down Expand Up @@ -75,7 +73,7 @@ const fn hours(num: u16) -> u16 {
///
/// Returns an `AmberError` if the required environment variables are not set.
///
pub fn run(state: &State) -> Result<StatelessReceiver<PriceSummary>, Error> {
pub fn run(state: &State) -> Result<stateful::Receiver<PriceSummary>, Error> {
let token = get_env("AMBER_TOKEN")?;
let site_id = get_env("AMBER_SITE_ID")?;
let influx_url = get_env("INFLUXDB_URL")?;
Expand All @@ -87,7 +85,7 @@ pub fn run(state: &State) -> Result<StatelessReceiver<PriceSummary>, Error> {
influx_database,
};

let (tx, rx) = entities::create_stateless_entity("amber_summary");
let (tx, rx) = stateful::create_pipe("amber_summary");

let psr = state
.persistent_state_database
Expand Down Expand Up @@ -354,10 +352,23 @@ pub enum PriceCategory {
pub struct PriceSummary {
pub category: PriceCategory,
pub is_cheap_2hr: bool,
pub per_kwh: f32,
pub c_per_kwh: f32,
pub dc_per_kwh: i32,
pub next_update: DateTime<Utc>,
}

// Ignore the c_per_kwh when comparing.
impl PartialEq for PriceSummary {
fn eq(&self, other: &Self) -> bool {
self.category == other.category
&& self.is_cheap_2hr == other.is_cheap_2hr
&& self.dc_per_kwh == other.dc_per_kwh
&& self.next_update == other.next_update
}
}

impl Eq for PriceSummary {}

// fn prices_to_category(prices: &[PriceResponse], category: Option<PriceCategory>) -> PriceCategory {
// let new_category = prices
// .iter()
Expand Down Expand Up @@ -438,7 +449,7 @@ async fn prices_to_influxdb(config: &Config, prices: &[PriceResponse], summary:

let reading = PriceSummaryReading {
is_cheap_2hr: summary.is_cheap_2hr,
per_kwh: summary.per_kwh,
per_kwh: summary.c_per_kwh,
time: Utc::now(),
}
.into_query("amber/price_summary");
Expand Down Expand Up @@ -536,9 +547,11 @@ impl PriceProcessor {
.find(|p| p.interval_type == IntervalType::CurrentInterval)
else {
error!("No current price found in prices: {prices:?}");
let default_c_per_kwh: u16 = 100;
return PriceSummary {
is_cheap_2hr: false,
per_kwh: 100.0,
c_per_kwh: f32::from(default_c_per_kwh),
dc_per_kwh: i32::from(default_c_per_kwh) * 10,
next_update: *now + Duration::seconds(30),
category: PriceCategory::Expensive,
};
Expand Down Expand Up @@ -602,10 +615,12 @@ impl PriceProcessor {
let category = get_price_category(old_category, current_price.per_kwh);
self.category = Some(category);

#[allow(clippy::cast_possible_truncation)]
let ps = PriceSummary {
category,
is_cheap_2hr: is_cheap,
per_kwh: current_price.per_kwh,
c_per_kwh: current_price.per_kwh,
dc_per_kwh: (current_price.per_kwh * 10.0).round() as i32,
next_update: current_price.end_time,
};
info!("Price summary: {old_category:?} --> {ps:?}");
Expand Down Expand Up @@ -841,7 +856,8 @@ mod tests {
let summary = pp.prices_to_summary(&now, &prices);
assert_eq!(summary.category, PriceCategory::SuperCheap);
assert_eq!(summary.is_cheap_2hr, true);
assert_approx_eq!(f32, summary.per_kwh, 0.0);
assert_approx_eq!(f32, summary.c_per_kwh, 0.0);
assert_eq!(summary.dc_per_kwh, 0);
assert_eq!(summary.next_update, dt("2020-01-01T01:00:00Z"));
let ds = &pp.day;
assert_eq!(ds.cheap_power_for_day, Duration::minutes(0));
Expand All @@ -866,7 +882,8 @@ mod tests {
let summary = pp.prices_to_summary(&now, &prices);
assert_eq!(summary.category, PriceCategory::SuperCheap);
assert_eq!(summary.is_cheap_2hr, false);
assert_approx_eq!(f32, summary.per_kwh, 0.0);
assert_approx_eq!(f32, summary.c_per_kwh, 0.0);
assert_eq!(summary.dc_per_kwh, 0);
assert_eq!(summary.next_update, dt("2020-01-01T01:30:00Z"));
let ds = pp.day;
assert_eq!(ds.cheap_power_for_day, Duration::minutes(45));
Expand Down
38 changes: 18 additions & 20 deletions brian-backend/src/delays.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::fmt::Debug;
use std::time::Duration;

// use tracing::debug;
use robotica_backend::{entities::Data, spawn};
use robotica_backend::{
pipes::{stateful, Subscriber},
spawn,
};
use tokio::{
select,
time::{self, sleep_until, Instant, Interval},
Expand Down Expand Up @@ -39,26 +41,24 @@ pub struct DelayInputOptions {
pub fn delay_input<T>(
name: &str,
duration: Duration,
rx: robotica_backend::entities::Receiver<T>,
is_active: impl Fn(&T::Received) -> bool + Send + 'static,
rx: stateful::Receiver<T>,
is_active: impl Fn(&stateful::OldNewType<T>) -> bool + Send + 'static,
options: DelayInputOptions,
) -> robotica_backend::entities::Receiver<T>
) -> stateful::Receiver<T>
where
T: Data + Send + 'static,
T::Sent: Send + Sync,
T::Received: Clone + Debug + Send + Sync + Eq + 'static,
T: Clone + Eq + Send + Sync + 'static,
{
let (tx_out, rx_out) = T::new_entity(name);
let (tx_out, rx_out) = stateful::create_pipe(name);
spawn(async move {
let mut state = DelayInputState::Idle;
let mut s = rx.subscribe().await;

loop {
select! {
Ok(v) = s.recv_value() => {
Ok(v) = s.recv_old_new() => {
// debug!("delay received: {:?}", v);
let active_value = is_active(&v);
let v = T::received_to_sent(v);
let (_, v) = v;
match (active_value, &state) {
(false, _) => {
state = DelayInputState::Idle;
Expand Down Expand Up @@ -112,25 +112,23 @@ where
pub fn delay_repeat<T>(
name: &str,
duration: Duration,
rx: robotica_backend::entities::Receiver<T>,
is_active: impl Fn(&T::Received) -> bool + Send + 'static,
) -> robotica_backend::entities::Receiver<T>
rx: stateful::Receiver<T>,
is_active: impl Fn(&stateful::OldNewType<T>) -> bool + Send + 'static,
) -> stateful::Receiver<T>
where
T: Data + Send + 'static,
T::Sent: Send + Clone,
T::Received: Clone + Debug + Send + Sync + Eq + 'static,
T: Clone + Eq + Send + 'static,
{
let (tx_out, rx_out) = T::new_entity(name);
let (tx_out, rx_out) = stateful::create_pipe(name);
spawn(async move {
let mut state = DelayRepeatState::Idle;
let mut s = rx.subscribe().await;

loop {
select! {
Ok(v) = s.recv_value() => {
Ok(v) = s.recv_old_new() => {
// debug!("delay received: {:?}", v);
let active_value = is_active(&v);
let v = T::received_to_sent(v);
let (_, v)= v;

match (active_value, state) {
(false, _) => {
Expand Down
7 changes: 5 additions & 2 deletions brian-backend/src/environment_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use chrono::{DateTime, Utc};
use influxdb::{Client, InfluxDbWriteable};

use robotica_backend::pipes::{Subscriber, Subscription};
use robotica_backend::{get_env, is_debug_mode, spawn, EnvironmentError};
use robotica_common::anavi_thermometer::{self as anavi};
use robotica_common::mqtt::{Json, MqttMessage};
Expand Down Expand Up @@ -81,7 +82,9 @@ where
Json<T>: TryFrom<MqttMessage>,
<Json<T> as TryFrom<MqttMessage>>::Error: Send + std::error::Error,
{
let rx = state.subscriptions.subscribe_into::<Json<T>>(topic);
let rx = state
.subscriptions
.subscribe_into_stateless::<Json<T>>(topic);
let topic = topic.to_string();
let influx_url = get_env("INFLUXDB_URL")?;
let influx_database = get_env("INFLUXDB_DATABASE")?;
Expand All @@ -108,7 +111,7 @@ where
pub fn monitor_fishtank(state: &mut State, topic: &str) -> Result<(), EnvironmentError> {
let rx = state
.subscriptions
.subscribe_into::<Json<FishTankData>>(topic);
.subscribe_into_stateless::<Json<FishTankData>>(topic);
let topic = topic.to_string();
let influx_url = get_env("INFLUXDB_URL")?;
let influx_database = get_env("INFLUXDB_DATABASE")?;
Expand Down
9 changes: 5 additions & 4 deletions brian-backend/src/ha.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use monostate::MustBe;
use robotica_backend::entities::create_stateless_entity;
use robotica_backend::entities::StatelessSender;
use robotica_backend::pipes::stateless;
use robotica_backend::pipes::Subscriber;
use robotica_backend::pipes::Subscription;
use robotica_backend::services::mqtt::MqttTx;
use robotica_common::mqtt::MqttMessage;
use robotica_common::mqtt::QoS;
Expand Down Expand Up @@ -90,8 +91,8 @@ pub fn string_to_message(msg: &MessageCommand) -> MqttMessage {
MqttMessage::new(topic, payload, false, QoS::ExactlyOnce)
}

pub fn create_message_sink(mqtt: MqttTx) -> StatelessSender<MessageCommand> {
let (tx, rx) = create_stateless_entity::<MessageCommand>("messages");
pub fn create_message_sink(mqtt: MqttTx) -> stateless::Sender<MessageCommand> {
let (tx, rx) = stateless::create_pipe::<MessageCommand>("messages");
tokio::spawn(async move {
let mut rx = rx.subscribe().await;
while let Ok(msg) = rx.recv().await {
Expand Down
7 changes: 4 additions & 3 deletions brian-backend/src/hdmi.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use robotica_backend::pipes::{Subscriber, Subscription};
use thiserror::Error;
use tokio::select;
use tracing::debug;

use robotica_backend::{devices::hdmi::Command, entities, spawn};
use robotica_backend::{devices::hdmi::Command, pipes::stateless, spawn};
use robotica_common::mqtt::{Json, MqttMessage, QoS};
use robotica_common::robotica::commands;

Expand All @@ -25,10 +26,10 @@ pub fn run(state: &mut State, location: &str, device: &str, addr: &str) {

let command_rx = state
.subscriptions
.subscribe_into::<Json<commands::Command>>(&topic);
.subscribe_into_stateless::<Json<commands::Command>>(&topic);

let name = id.get_name("hdmi");
let (tx, rx) = entities::create_stateless_entity(name);
let (tx, rx) = stateless::create_pipe(name);

spawn(async move {
let mut rx_s = command_rx.subscribe().await;
Expand Down
Loading

0 comments on commit 974dfe5

Please sign in to comment.