diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/Id2Flux.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/Id2Flux.java new file mode 100644 index 000000000..a0d3dc8ba --- /dev/null +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/Id2Flux.java @@ -0,0 +1,11 @@ +package net.sf.dz3r.runtime.config; + +import net.sf.dz3r.signal.Signal; +import reactor.core.publisher.Flux; + +public record Id2Flux( + String id, + Flux> flux +) { + +} diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/SensorSwitchResolver.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/SensorSwitchResolver.java index eb3621e25..d967b86b8 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/SensorSwitchResolver.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/SensorSwitchResolver.java @@ -1,11 +1,9 @@ package net.sf.dz3r.runtime.config; -import net.sf.dz3r.signal.Signal; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; -import java.util.Map; import java.util.Optional; import java.util.Set; @@ -24,5 +22,5 @@ protected SensorSwitchResolver(Set source) { this.source = Optional.ofNullable(source).orElse(Set.of()); } - public abstract Flux>>> getSensorFluxes(); + public abstract Flux getSensorFluxes(); } diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttConfigurationParser.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttConfigurationParser.java index b06f04221..a444e9b76 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttConfigurationParser.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttConfigurationParser.java @@ -7,11 +7,11 @@ import net.sf.dz3r.runtime.config.ConfigurationContext; import net.sf.dz3r.runtime.config.ConfigurationContextAware; import net.sf.dz3r.runtime.config.ConfigurationMapper; +import net.sf.dz3r.runtime.config.Id2Flux; import net.sf.dz3r.runtime.config.connector.HomeAssistantConfig; import net.sf.dz3r.runtime.config.connector.HomeAssistantConfigParser; import net.sf.dz3r.runtime.config.protocol.mqtt.MqttDeviceConfig; import net.sf.dz3r.runtime.config.protocol.mqtt.MqttEndpointSpec; -import net.sf.dz3r.signal.Signal; import org.apache.commons.lang3.tuple.ImmutablePair; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -49,7 +49,7 @@ public MqttConfigurationParser(ConfigurationContext context) { * is gathered. */ public Mono>>>, + List, List>>> parse( Set esphome, Set zigbee2mqtt, @@ -117,7 +117,7 @@ List>>> parse( var sensors = mqttConfigs .publishOn(Schedulers.boundedElastic()) .flatMap(MqttSensorSwitchResolver::getSensorFluxes) - .doOnNext(kv -> context.sensors.register(kv.getKey(), kv.getValue())) + .doOnNext(kv -> context.sensors.register(kv.id(), kv.flux())) .collectList(); var switches = mqttConfigs diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttSensorSwitchResolver.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttSensorSwitchResolver.java index f34d233a2..400ce50a0 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttSensorSwitchResolver.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/mqtt/MqttSensorSwitchResolver.java @@ -3,6 +3,7 @@ import net.sf.dz3r.device.mqtt.v1.AbstractMqttSwitch; import net.sf.dz3r.device.mqtt.v1.MqttAdapter; import net.sf.dz3r.runtime.config.ConfigurationMapper; +import net.sf.dz3r.runtime.config.Id2Flux; import net.sf.dz3r.runtime.config.SensorSwitchResolver; import net.sf.dz3r.runtime.config.hardware.SensorConfig; import net.sf.dz3r.runtime.config.hardware.SwitchConfig; @@ -46,11 +47,11 @@ protected MqttSensorSwitchResolver(Set source, Map>>> getSensorFluxes() { + public final Flux getSensorFluxes() { return getSensorFluxes(endpoint2adapter, sensorConfigs); } - private Flux>>> getSensorFluxes(Map endpoint2adapter, Set source) { + private Flux getSensorFluxes(Map endpoint2adapter, Set source) { return Flux .fromIterable(source) @@ -61,21 +62,24 @@ private Flux>>> getSensorFluxes(Map< var adapter = endpoint2adapter.get(ConfigurationMapper.INSTANCE.parseEndpoint(c.mqttBrokerSpec())); var listener = resolveListener(c.mqttBrokerSpec(), adapter); - return new ImmutablePair<>(c, listener); + return new Config2Listener<>(c.sensorConfig, listener); }) // This is where things get hairy .parallel() .runOn(Schedulers.boundedElastic()) - .map(kv -> { - var id = kv.getKey().sensorConfig().id(); - var address = kv.getKey().sensorConfig().address(); - var flux = kv.getValue().getFlux(address); + .map(c2l -> { + var id = c2l.config.id(); + var address = c2l.config.address(); + var flux = c2l.listener.getFlux(address); // ID takes precedence over address var key = id == null ? address : id; - return (Map.Entry>>) new ImmutablePair<>(key, guarded(flux, kv.getKey().sensorConfig)); + return new Id2Flux( + key, + guarded(flux, c2l.config) + ); }) .sequential(); } @@ -211,4 +215,11 @@ public record MqttSwitchConfig( MqttBrokerSpec mqttBrokerSpec, SwitchConfig switchConfig) { } + + private record Config2Listener>( + SensorConfig config, + L listener + ) { + + } } diff --git a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/onewire/OnewireSensorSwitchResolver.java b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/onewire/OnewireSensorSwitchResolver.java index e0bdf193d..e5afdf5af 100644 --- a/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/onewire/OnewireSensorSwitchResolver.java +++ b/dz3r-bootstrap/src/main/java/net/sf/dz3r/runtime/config/onewire/OnewireSensorSwitchResolver.java @@ -1,11 +1,10 @@ package net.sf.dz3r.runtime.config.onewire; +import net.sf.dz3r.runtime.config.Id2Flux; import net.sf.dz3r.runtime.config.SensorSwitchResolver; import net.sf.dz3r.runtime.config.protocol.onewire.OnewireBusConfig; -import net.sf.dz3r.signal.Signal; import reactor.core.publisher.Flux; -import java.util.Map; import java.util.Set; public class OnewireSensorSwitchResolver extends SensorSwitchResolver { @@ -15,7 +14,7 @@ private OnewireSensorSwitchResolver(Set source) { } @Override - public Flux>>> getSensorFluxes() { + public Flux getSensorFluxes() { logger.error("NOT IMPLEMENTED: {}#getSensorFluxes()", getClass().getName()); return Flux.empty(); }