Skip to content

Commit

Permalink
1/2: improved feedback from control inputs to console (#271)
Browse files Browse the repository at this point in the history
2/2: improved feedback from connectors coming in next commit[s]

Note: chances of stalling on start (noticed in rev.
cb446b6) increased, need to fix this for
good soon
  • Loading branch information
climategadgets committed Oct 4, 2023
1 parent f297a13 commit 363625f
Showing 1 changed file with 37 additions and 1 deletion.
38 changes: 37 additions & 1 deletion dz3r-model/src/main/java/net/sf/dz3r/model/Zone.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.logging.log4j.ThreadContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.time.Instant;
import java.util.Optional;
Expand Down Expand Up @@ -58,6 +59,11 @@ public enum State {

private final AbstractEconomizer<?> economizer;

private final Sinks.Many<Signal<Double, String>> feedbackSink = Sinks.many().unicast().onBackpressureBuffer();
private final Flux<Signal<Double, String>> feedbackFlux = feedbackSink.asFlux();

private Signal<Double, String> lastKnownSignal;

/**
* Create an instance without an economizer.
*
Expand Down Expand Up @@ -111,9 +117,25 @@ public ZoneSettings setSettingsSync(ZoneSettings settings) {

this.settings = newSettings;

bump();

return this.settings;
}

/**
* Force the {@link #lastKnownSignal} through {@link #compute(Flux)}.
*/
private void bump() {

if (lastKnownSignal == null) {
logger.warn("{}: no lastKnownSignal yet, settings will get to consumers on next sensor signal arrival", getAddress());
return;
}

logger.info("{}: replaying signal: {}", getAddress(), lastKnownSignal);
feedbackSink.tryEmitNext(lastKnownSignal);
}

/**
* Set zone settings in a reactive way.
*
Expand Down Expand Up @@ -169,8 +191,18 @@ public PeriodSettings getPeriodSettings() {
@Override
public Flux<Signal<ZoneStatus, String>> compute(Flux<Signal<Double, String>> in) {

// There are two input streams:
//
// - the argument, brings in sensor readings
// - the feedback, repeats the last sensor reading upon changing zone settings and forces a new signal to be emitted here

// Record the signal so the feedback flux can pick it up
var recorded = in.doOnNext(this::recordSignal);

var combined = Flux.merge(recorded, feedbackFlux);

var source = Optional.ofNullable(economizer)
.map(eco -> eco.compute(in))
.map(eco -> eco.compute(combined))
.orElse(in);

// Since the zone doesn't need the payload, but the thermostat does, need to translate the input
Expand All @@ -196,6 +228,10 @@ public Flux<Signal<ZoneStatus, String>> compute(Flux<Signal<Double, String>> in)
return stage3.map(this::suppressEconomizer);
}

private void recordSignal(Signal<Double, String> signal) {
this.lastKnownSignal = signal;
}

private Signal<ZoneStatus, String> translate(Signal<ProcessController.Status<CallingStatus>, Void> source) {

return new Signal<>(
Expand Down

0 comments on commit 363625f

Please sign in to comment.