Skip to content

Commit

Permalink
Eliminated anonymous data structures to improve readability (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Oct 8, 2023
1 parent ae533b9 commit 13cc043
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package net.sf.dz3r.runtime.config;

import net.sf.dz3r.signal.Signal;
import reactor.core.publisher.Flux;

public record Id2Flux(
String id,
Flux<Signal<Double, Void>> flux
) {

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package net.sf.dz3r.runtime.config;

import net.sf.dz3r.signal.Signal;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand All @@ -24,5 +22,5 @@ protected SensorSwitchResolver(Set<T> source) {
this.source = Optional.ofNullable(source).orElse(Set.of());
}

public abstract Flux<Map.Entry<String, Flux<Signal<Double, Void>>>> getSensorFluxes();
public abstract Flux<Id2Flux> getSensorFluxes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import net.sf.dz3r.runtime.config.ConfigurationContext;
import net.sf.dz3r.runtime.config.ConfigurationContextAware;
import net.sf.dz3r.runtime.config.ConfigurationMapper;
import net.sf.dz3r.runtime.config.Id2Flux;
import net.sf.dz3r.runtime.config.connector.HomeAssistantConfig;
import net.sf.dz3r.runtime.config.connector.HomeAssistantConfigParser;
import net.sf.dz3r.runtime.config.protocol.mqtt.MqttDeviceConfig;
import net.sf.dz3r.runtime.config.protocol.mqtt.MqttEndpointSpec;
import net.sf.dz3r.signal.Signal;
import org.apache.commons.lang3.tuple.ImmutablePair;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -49,7 +49,7 @@ public MqttConfigurationParser(ConfigurationContext context) {
* is gathered.
*/
public Mono<Tuple2<
List<Map.Entry<String, Flux<Signal<Double, Void>>>>,
List<Id2Flux>,
List<Map.Entry<String, AbstractMqttSwitch>>>> parse(
Set<MqttDeviceConfig> esphome,
Set<MqttDeviceConfig> zigbee2mqtt,
Expand Down Expand Up @@ -117,7 +117,7 @@ List<Map.Entry<String, AbstractMqttSwitch>>>> parse(
var sensors = mqttConfigs
.publishOn(Schedulers.boundedElastic())
.flatMap(MqttSensorSwitchResolver::getSensorFluxes)
.doOnNext(kv -> context.sensors.register(kv.getKey(), kv.getValue()))
.doOnNext(kv -> context.sensors.register(kv.id(), kv.flux()))
.collectList();

var switches = mqttConfigs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import net.sf.dz3r.device.mqtt.v1.AbstractMqttSwitch;
import net.sf.dz3r.device.mqtt.v1.MqttAdapter;
import net.sf.dz3r.runtime.config.ConfigurationMapper;
import net.sf.dz3r.runtime.config.Id2Flux;
import net.sf.dz3r.runtime.config.SensorSwitchResolver;
import net.sf.dz3r.runtime.config.hardware.SensorConfig;
import net.sf.dz3r.runtime.config.hardware.SwitchConfig;
Expand Down Expand Up @@ -46,11 +47,11 @@ protected MqttSensorSwitchResolver(Set<A> source, Map<MqttEndpointSpec, MqttAdap
}

@Override
public final Flux<Map.Entry<String, Flux<Signal<Double, Void>>>> getSensorFluxes() {
public final Flux<Id2Flux> getSensorFluxes() {
return getSensorFluxes(endpoint2adapter, sensorConfigs);
}

private Flux<Map.Entry<String, Flux<Signal<Double, Void>>>> getSensorFluxes(Map<MqttEndpointSpec, MqttAdapter> endpoint2adapter, Set<MqttSensorConfig> source) {
private Flux<Id2Flux> getSensorFluxes(Map<MqttEndpointSpec, MqttAdapter> endpoint2adapter, Set<MqttSensorConfig> source) {

return Flux
.fromIterable(source)
Expand All @@ -61,21 +62,24 @@ private Flux<Map.Entry<String, Flux<Signal<Double, Void>>>> getSensorFluxes(Map<
var adapter = endpoint2adapter.get(ConfigurationMapper.INSTANCE.parseEndpoint(c.mqttBrokerSpec()));
var listener = resolveListener(c.mqttBrokerSpec(), adapter);

return new ImmutablePair<>(c, listener);
return new Config2Listener<>(c.sensorConfig, listener);
})

// This is where things get hairy
.parallel()
.runOn(Schedulers.boundedElastic())
.map(kv -> {
var id = kv.getKey().sensorConfig().id();
var address = kv.getKey().sensorConfig().address();
var flux = kv.getValue().getFlux(address);
.map(c2l -> {
var id = c2l.config.id();
var address = c2l.config.address();
var flux = c2l.listener.getFlux(address);

// ID takes precedence over address
var key = id == null ? address : id;

return (Map.Entry<String, Flux<Signal<Double, Void>>>) new ImmutablePair<>(key, guarded(flux, kv.getKey().sensorConfig));
return new Id2Flux(
key,
guarded(flux, c2l.config)
);
})
.sequential();
}
Expand Down Expand Up @@ -211,4 +215,11 @@ public record MqttSwitchConfig(
MqttBrokerSpec mqttBrokerSpec,
SwitchConfig switchConfig) {
}

private record Config2Listener<L extends SignalSource<String, Double, Void>>(
SensorConfig config,
L listener
) {

}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package net.sf.dz3r.runtime.config.onewire;

import net.sf.dz3r.runtime.config.Id2Flux;
import net.sf.dz3r.runtime.config.SensorSwitchResolver;
import net.sf.dz3r.runtime.config.protocol.onewire.OnewireBusConfig;
import net.sf.dz3r.signal.Signal;
import reactor.core.publisher.Flux;

import java.util.Map;
import java.util.Set;

public class OnewireSensorSwitchResolver extends SensorSwitchResolver<OnewireBusConfig> {
Expand All @@ -15,7 +14,7 @@ private OnewireSensorSwitchResolver(Set<OnewireBusConfig> source) {
}

@Override
public Flux<Map.Entry<String, Flux<Signal<Double, Void>>>> getSensorFluxes() {
public Flux<Id2Flux> getSensorFluxes() {
logger.error("NOT IMPLEMENTED: {}#getSensorFluxes()", getClass().getName());
return Flux.empty();
}
Expand Down

0 comments on commit 13cc043

Please sign in to comment.