From e5d7ee0f15823d06edb4c179e697beeb6546f314 Mon Sep 17 00:00:00 2001 From: Vadim Tkachenko Date: Sat, 19 Aug 2023 20:40:17 -0700 Subject: [PATCH] Band aid for #272 (works so far) --- .../device/actuator/AbstractHvacDevice.java | 49 +++++++++++++------ 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractHvacDevice.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractHvacDevice.java index cb1d2e89e..ff46fea04 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractHvacDevice.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/AbstractHvacDevice.java @@ -4,6 +4,7 @@ import net.sf.dz3r.signal.hvac.HvacDeviceStatus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.ThreadContext; import reactor.core.publisher.Flux; import java.io.IOException; @@ -56,26 +57,46 @@ protected void check(Switch s, String purpose) { @Override public final synchronized Flux> getFlux() { - logger.info("getFlux(): name={} waiting...", getAddress()); + // VT: NOTE: This whole synchronized thing must be eliminated. + bucket list. - while (statusFlux == null) { - try { - wait(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("This shouldn't have happened", ex); + ThreadContext.push("getFlux#" + Integer.toHexString(hashCode())); + + try { + logger.debug("getFlux(): name={} waiting...", getAddress()); + + while (statusFlux == null) { + try { + wait(); + logger.debug("getFlux(): name={} flux={}", getAddress(), statusFlux); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("This shouldn't have happened", ex); + } } - } - // VT: NOTE: Be careful when refactoring this, need correct sharing option here - return statusFlux.publish().autoConnect(); + logger.debug("getFlux(): name={} DONE", getAddress()); + + // VT: NOTE: Be careful when refactoring this, need correct sharing option here + return statusFlux; + + } finally { + ThreadContext.pop(); + } } protected final synchronized Flux> setFlux(Flux> source) { - statusFlux = source; - notifyAll(); - logger.info("setFlux(): name={} notified", getAddress()); - return source; + + // VT: NOTE: This whole synchronized thing must be eliminated. + bucket list. + + ThreadContext.push("getFlux#" + Integer.toHexString(hashCode())); + try { + statusFlux = source; + notifyAll(); + logger.debug("setFlux(): name={} notified", getAddress()); + return source; + } finally { + ThreadContext.pop(); + } } /**