Skip to content

Commit

Permalink
Band aid for #272 (works so far)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Aug 20, 2023
1 parent 95ea57c commit e5d7ee0
Showing 1 changed file with 35 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import net.sf.dz3r.signal.hvac.HvacDeviceStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import reactor.core.publisher.Flux;

import java.io.IOException;
Expand Down Expand Up @@ -56,26 +57,46 @@ protected void check(Switch<?> s, String purpose) {
@Override
public final synchronized Flux<Signal<HvacDeviceStatus, Void>> getFlux() {

logger.info("getFlux(): name={} waiting...", getAddress());
// VT: NOTE: This whole synchronized thing must be eliminated. + bucket list.

while (statusFlux == null) {
try {
wait();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("This shouldn't have happened", ex);
ThreadContext.push("getFlux#" + Integer.toHexString(hashCode()));

try {
logger.debug("getFlux(): name={} waiting...", getAddress());

while (statusFlux == null) {
try {
wait();
logger.debug("getFlux(): name={} flux={}", getAddress(), statusFlux);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("This shouldn't have happened", ex);
}
}
}

// VT: NOTE: Be careful when refactoring this, need correct sharing option here
return statusFlux.publish().autoConnect();
logger.debug("getFlux(): name={} DONE", getAddress());

// VT: NOTE: Be careful when refactoring this, need correct sharing option here
return statusFlux;

} finally {
ThreadContext.pop();
}
}

protected final synchronized Flux<Signal<HvacDeviceStatus, Void>> setFlux(Flux<Signal<HvacDeviceStatus, Void>> source) {
statusFlux = source;
notifyAll();
logger.info("setFlux(): name={} notified", getAddress());
return source;

// VT: NOTE: This whole synchronized thing must be eliminated. + bucket list.

ThreadContext.push("getFlux#" + Integer.toHexString(hashCode()));
try {
statusFlux = source;
notifyAll();
logger.debug("setFlux(): name={} notified", getAddress());
return source;
} finally {
ThreadContext.pop();
}
}

/**
Expand Down

0 comments on commit e5d7ee0

Please sign in to comment.