Skip to content

Commit

Permalink
Removed obsolete blocking call (#271, #292)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Oct 18, 2023
1 parent a77cdbb commit d6a715e
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,60 +93,6 @@ private Flux<Signal<HvacDeviceStatus<Void>, Void>> computeNonBlocking(Flux<Signa
});
}

/**
* Retanied for analysis. Should be removed as soon as normal operation is confirmed.
*
* @deprecated
*/
@Deprecated(forRemoval = true, since = "2023-10-01")
Flux<Signal<HvacDeviceStatus<Void>, Void>> computeBlocking(Flux<Signal<HvacCommand, Void>> in) {
return in
.filter(Signal::isOK)
.flatMap(signal -> {
return Flux
.<Signal<HvacDeviceStatus<Void>, Void>>create(sink -> {

try {

var command = reconcile(signal.getValue());

if (isModeOnly(command)) {
return;
}

var state = getState(command);

logger.debug("State: {}", state);

sink.next(new Signal<>(clock.instant(), new HvacDeviceStatus<Void>(command, uptime(), null)));

// By this time, the command has been verified to be valid
requested = command;

theSwitch.setState(state != inverted).block();

// No longer relevant
//actual = state;

updateUptime(clock.instant(), state);

var complete = new HvacDeviceStatus<Void>(command, uptime(), null);
sink.next(new Signal<>(clock.instant(), complete));

} catch (Throwable t) { // NOSONAR Consequences have been considered

logger.error("Failed to compute {}", signal, t);
sink.next(new Signal<>(clock.instant(), null, null, Signal.Status.FAILURE_TOTAL, t));

} finally {
sink.complete();
}

});
})
.doOnNext(this::broadcast);
}

@Override
protected Flux<Signal<HvacDeviceStatus<Void>, Void>> apply(HvacCommand command) {
throw new IllegalStateException("refactoring incomplete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.time.temporal.ChronoUnit;

import static net.sf.dz3r.model.HvacMode.COOLING;
import static net.sf.dz3r.signal.Signal.Status.FAILURE_TOTAL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -218,38 +217,6 @@ void inverted() {
assertThat(s.getState().block()).isTrue();
}

@Test
void blockingFail() {

var now = Instant.now();
var s = new NullSwitch("a");
var d = new SwitchableHvacDevice(Clock.systemUTC(), "d", COOLING, s, false, null);
var sequence = Flux
.just(new Signal<HvacCommand, Void>(now, new HvacCommand(null, 0.8, null)))
.publishOn(Schedulers.newSingle("blocking-fail"));
var result = d.computeBlocking(sequence).log();

StepVerifier
.create(result)
.assertNext(e -> {
assertThat(e.isOK()).isTrue();
assertThat(e.isError()).isFalse();
assertThat(e.getValue().command.mode).isEqualTo(COOLING);
assertThat(e.getValue().command.demand).isEqualTo(0.8);
assertThat(e.getValue().command.fanSpeed).isNull();
})
.assertNext(e -> {
assertThat(e.isOK()).isFalse();
assertThat(e.isError()).isTrue();
assertThat(e.getValue()).isNull();
assertThat(e.status).isEqualTo(FAILURE_TOTAL);
assertThat(e.error)
.isInstanceOf(IllegalStateException.class)
.hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread");
})
.verifyComplete();
}

@Test
void nonBlockingPass() {

Expand Down

0 comments on commit d6a715e

Please sign in to comment.