Skip to content

Commit

Permalink
Eliminated blocking call in SwitchableHvacDevice (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Sep 12, 2023
1 parent 7d89440 commit f51da16
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

/**
* A device with just one switch acting as an HVAC device that just supports one mode (either heating or cooling).
Expand Down Expand Up @@ -38,15 +39,6 @@ public class SwitchableHvacDevice extends AbstractHvacDevice {
*/
private HvacCommand requested;

/**
* Actual switch status.
*
* {@code null} value means the status is unknown.
*
* VT: FIXME: Associate a timestamp with this value.
*/
private Boolean actual;

/**
* Create a named instance.
*
Expand Down Expand Up @@ -83,7 +75,48 @@ public Set<HvacMode> getModes() {

@Override
public Flux<Signal<HvacDeviceStatus, Void>> compute(Flux<Signal<HvacCommand, Void>> in) {
return computeNonBlocking(in);
}

private Flux<Signal<HvacDeviceStatus, Void>> computeNonBlocking(Flux<Signal<HvacCommand, Void>> in) {

return in
.filter(Signal::isOK)

// Can't throw this as a payload like we did in a blocking version, need to complain
.doOnNext(signal -> logger.debug("compute signal={}", signal))

.map(Signal::getValue)
.map(this::reconcile)
.filter(Predicate.not(this::isModeOnly))
.doOnNext(command -> logger.debug("compute command={}", command))
.flatMap(command -> {

var state = getState(command);

logger.debug("state: {}", state);

// By this time, the command has been verified to be valid
requested = command;

var result = new HvacDeviceStatus(command, uptime());

return theSwitch
.setState(state != inverted)
.map(ignore -> {
updateUptime(clock.instant(), state);
return new Signal<>(clock.instant(), result);
});
});
}

/**
* Retanied for analysis. Should be removed as soon as normal operation is confirmed.
*
* @deprecated
*/
@Deprecated(forRemoval = true, since = "2023-10-01")
Flux<Signal<HvacDeviceStatus, Void>> computeBlocking(Flux<Signal<HvacCommand, Void>> in) {
return in
.filter(Signal::isOK)
.flatMap(signal -> {
Expand All @@ -108,7 +141,10 @@ public Flux<Signal<HvacDeviceStatus, Void>> compute(Flux<Signal<HvacCommand, Voi
requested = command;

theSwitch.setState(state != inverted).block();
actual = state;

// No longer relevant
//actual = state;

updateUptime(clock.instant(), state);

var complete = new HvacDeviceStatus(command, uptime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static net.sf.dz3r.model.HvacMode.COOLING;
import static net.sf.dz3r.signal.Signal.Status.FAILURE_TOTAL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

Expand All @@ -23,7 +26,7 @@ void lifecycle() {
var minDelayMillis = 50;
var maxDelayMillis = 200;
var s = new NullSwitch("a", false, minDelayMillis, maxDelayMillis, null);
var d = new SwitchableHvacDevice("d", HvacMode.COOLING, s);
var d = new SwitchableHvacDevice("d", COOLING, s);

var sequence = Flux.just(
new Signal<HvacCommand, Void>(now, new HvacCommand(null, 0.8, null)),
Expand All @@ -37,11 +40,6 @@ void lifecycle() {

StepVerifier
.create(result)
.assertNext(e -> {
// Actual is not yet set
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isNull();
})
.assertNext(e -> {
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isNull();
Expand All @@ -51,29 +49,22 @@ void lifecycle() {
assertThat(e.getValue().command.demand).isEqualTo(0.5);
assertThat(e.getValue().command.fanSpeed).isNull();
})
.assertNext(e -> {
assertThat(e.getValue().command.demand).isEqualTo(0.5);
assertThat(e.getValue().command.fanSpeed).isNull();
})
// --
.assertNext(e -> {
assertThat(e.getValue().command.demand).isEqualTo(0.0);
assertThat(e.getValue().command.fanSpeed).isNull();
})
.assertNext(e -> {
assertThat(e.getValue().command.demand).isEqualTo(0.0);
assertThat(e.getValue().command.fanSpeed).isNull();
})
.verifyComplete();
}

/**
* Make sure that the wrong mode is refused - these devices don't support more than one at a time.
*/
@Test
@Disabled("for now; need to fix blocking operation first")
void wrongMode() {

var d = new SwitchableHvacDevice("d", HvacMode.COOLING, mock(Switch.class));
var d = new SwitchableHvacDevice("d", COOLING, mock(Switch.class));
var sequence = Flux.just(
new Signal<HvacCommand, Void>(Instant.now(), new HvacCommand(HvacMode.HEATING, 0.8, null))
);
Expand Down Expand Up @@ -120,7 +111,7 @@ void noFansForHeating() {
void allowFansForCooling() {

var s = new NullSwitch("a");
var d = new SwitchableHvacDevice("d", HvacMode.COOLING, s);
var d = new SwitchableHvacDevice("d", COOLING, s);
var sequence = Flux.just(
new Signal<HvacCommand, Void>(Instant.now(), new HvacCommand(null, 0.8, 1.0))
);
Expand All @@ -132,9 +123,6 @@ void allowFansForCooling() {
.assertNext(e -> {
assertThat(e.getValue().command.fanSpeed).isEqualTo(1.0);
})
.assertNext(e -> {
assertThat(e.getValue().command.fanSpeed).isEqualTo(1.0);
})
.verifyComplete();
}

Expand All @@ -144,9 +132,9 @@ void allowFansForCooling() {
@Test
void modeOnly() {

var d = new SwitchableHvacDevice("d", HvacMode.COOLING, mock(Switch.class));
var d = new SwitchableHvacDevice("d", COOLING, mock(Switch.class));
var sequence = Flux.just(
new Signal<HvacCommand, Void>(Instant.now(), new HvacCommand(HvacMode.COOLING, null, null))
new Signal<HvacCommand, Void>(Instant.now(), new HvacCommand(COOLING, null, null))
);

var result = d.compute(sequence).log();
Expand All @@ -164,7 +152,7 @@ void interleave() {

var now = Instant.now();
var s = new NullSwitch("a");
var d = new SwitchableHvacDevice("d", HvacMode.COOLING, s);
var d = new SwitchableHvacDevice("d", COOLING, s);

var sequence = Flux.just(
// First, request cooling
Expand All @@ -186,38 +174,22 @@ void interleave() {
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isNull();
})
.assertNext(e -> {
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isNull();
})
// Device must stay on
.assertNext(e -> {
// Requested demand is the previous value
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isEqualTo(0.5);
})
.assertNext(e -> {
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isEqualTo(0.5);
})
// Device must still stay on
.assertNext(e -> {
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isZero();
})
.assertNext(e -> {
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isZero();
})
// Device must shut off
.assertNext(e -> {
assertThat(e.getValue().command.demand).isEqualTo(0.0);
assertThat(e.getValue().command.fanSpeed).isZero();
})
.assertNext(e -> {
assertThat(e.getValue().command.demand).isEqualTo(0.0);
assertThat(e.getValue().command.fanSpeed).isZero();
})
.verifyComplete();
}

Expand All @@ -226,7 +198,7 @@ void inverted() {

var now = Instant.now();
var s = new NullSwitch("a");
var d = new SwitchableHvacDevice("d", HvacMode.COOLING, s, true);
var d = new SwitchableHvacDevice("d", COOLING, s, true);

var sequence = Flux.just(
// First, request cooling
Expand All @@ -244,4 +216,59 @@ void inverted() {
// A bit simpler than full, but it'll do
assertThat(s.getState().block()).isTrue();
}

@Test
void blockingFail() {

var now = Instant.now();
var s = new NullSwitch("a");
var d = new SwitchableHvacDevice("d", COOLING, s);
var sequence = Flux
.just(new Signal<HvacCommand, Void>(now, new HvacCommand(null, 0.8, null)))
.publishOn(Schedulers.newSingle("blocking-fail"));
var result = d.computeBlocking(sequence).log();

StepVerifier
.create(result)
.assertNext(e -> {
assertThat(e.isOK()).isTrue();
assertThat(e.isError()).isFalse();
assertThat(e.getValue().command.mode).isEqualTo(COOLING);
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isNull();
})
.assertNext(e -> {
assertThat(e.isOK()).isFalse();
assertThat(e.isError()).isTrue();
assertThat(e.getValue()).isNull();
assertThat(e.status).isEqualTo(FAILURE_TOTAL);
assertThat(e.error)
.isInstanceOf(IllegalStateException.class)
.hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread");
})
.verifyComplete();
}

@Test
void nonBlockingPass() {

var now = Instant.now();
var s = new NullSwitch("a");
var d = new SwitchableHvacDevice("d", COOLING, s);
var sequence = Flux
.just(new Signal<HvacCommand, Void>(now, new HvacCommand(null, 0.8, null)))
.publishOn(Schedulers.newSingle("blocking-fail"));
var result = d.compute(sequence).log();

StepVerifier
.create(result)
.assertNext(e -> {
assertThat(e.isOK()).isTrue();
assertThat(e.isError()).isFalse();
assertThat(e.getValue().command.mode).isEqualTo(COOLING);
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isNull();
})
.verifyComplete();
}
}

0 comments on commit f51da16

Please sign in to comment.