diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizer.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizer.java index fb879a663..6ffaf7a61 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizer.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/economizer/AbstractEconomizer.java @@ -138,12 +138,18 @@ protected final void initFluxes(Flux> ambientFlux) { // Let the transmission layer figure out the dupes, they have a better idea about what to do with them .flatMap(demandDevice::setState) + .doOnError(t -> logger.error("{}: errored out", getAddress(), t)) + .doOnComplete(() -> logger.debug("{}: completed", getAddress())) + // VT: NOTE: Careful when testing, this will consume everything thrown at it immediately .subscribe(); ambientFlux .doOnNext(this::recordAmbient) + .doOnError(t -> logger.error("{}: errored out", getAddress(), t)) + .doOnComplete(() -> logger.debug("{}: completed", getAddress())) + // VT: NOTE: Careful when testing, this will consume everything thrown at it immediately .subscribe(); } diff --git a/dz3r-model/src/main/java/net/sf/dz3r/signal/filter/TimeoutGuard.java b/dz3r-model/src/main/java/net/sf/dz3r/signal/filter/TimeoutGuard.java index 77a0bf3c6..40148317d 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/signal/filter/TimeoutGuard.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/signal/filter/TimeoutGuard.java @@ -101,7 +101,7 @@ private synchronized void guard() { private void generateTimeoutSignal(Instant now) { - logger.warn("{}: timeout of {} is exceeded, inTimeout={}, repeat={}", marker, timeout, inTimeout, repeat); + logger.info("{}: timeout of {} is exceeded, inTimeout={}, repeat={}", marker, timeout, inTimeout, repeat); timeoutFluxSink.next(new Signal<>( now, @@ -119,10 +119,13 @@ public Flux> compute(Flux> in) { var actual = in .doOnNext(s -> touch(s.timestamp)) .doOnNext(ignored -> inTimeout = false) + .doOnError(t -> logger.error("{}: errored out", marker, t)) .doOnComplete(this::close); return Flux.merge(actual, timeoutFlux) - .doOnNext(s -> logger.trace("{}: compute={}", marker, s)); + .doOnNext(s -> logger.trace("{}: compute={}", marker, s)) + .doOnError(t -> logger.error("{}: errored out", marker, t)) + .doOnComplete(() -> logger.debug("{}: completed", marker)); } private synchronized void touch(Instant timestamp) {