Skip to content

Commit

Permalink
fix: make data retrieval of IOLink sensor more robust (#2024)
Browse files Browse the repository at this point in the history
* fix: make data retrieval of IOLink sensor more robust

* refactor: make IOLink parsing more verbose

* refactor: make IOLink parsing more verbose

* refactor: remove flow statements
  • Loading branch information
bossenti authored Oct 16, 2023
1 parent 857edb3 commit a2d4e61
Showing 1 changed file with 57 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,20 @@ public class IfmAlMqttAdapter implements StreamPipesAdapter {

public static final String ID = "org.apache.streampipes.connect.iiot.adapters.iolink";

public static final String KEY_PORT_INFORMATION = "/iolinkmaster/port[%s]/iolinkdevice/pdin";

private static final Logger LOG = LoggerFactory.getLogger(IfmAlMqttAdapter.class);

private static final String PORTS = "ports";
private static final String SENSOR_TYPE = "sensor_type";

private final IParser parser;
private boolean missingEventDataDetected = false;
private boolean missingPortInformationDetected = false;

private MqttConsumer mqttConsumer;
private MqttConfig mqttConfig;

private final IParser parser;

private List<String> ports;

public IfmAlMqttAdapter() {
Expand Down Expand Up @@ -110,25 +114,61 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,

for (int i = 0; i < ports.size(); i++) {

var portResult = getMap(payload,
"/iolinkmaster/port[%s]/iolinkdevice/pdin".formatted(ports.get(i)));
var eventData = (String) portResult.get("data");

var parsedEvent = sensor.parseEvent(eventData);
parsedEvent.put("timestamp", System.currentTimeMillis() + i);
parsedEvent.put("port", "port" + ports.get(i));
parsedEvent.put(SensorVVB001.IO_LINK_MASTER_SN, serialnumber);

collector.collect(parsedEvent);
String keyPortInformation = KEY_PORT_INFORMATION.formatted(ports.get(i));

Map<String, Object> portResult;
if (payload.containsKey(keyPortInformation)) {

portResult = getMap(payload, keyPortInformation);

try {
String eventData;
if (portResult.containsKey("data")) {
eventData = (String) portResult.get("data");

var parsedEvent = sensor.parseEvent(eventData);
parsedEvent.put("timestamp", System.currentTimeMillis() + i);
parsedEvent.put("port", "port" + ports.get(i));
parsedEvent.put(SensorVVB001.IO_LINK_MASTER_SN, serialnumber);

collector.collect(parsedEvent);
} else {
if (!missingEventDataDetected) {
adapterRuntimeContext
.getLogger()
.warn("Payload for port %s does not contain event data".formatted(i), "");
LOG.error(
"IoLink event does not look like expected. "
+ "No port information found for port {}.", i);
missingEventDataDetected = true;
}
}
} catch (Exception e) {
adapterRuntimeContext
.getLogger()
.error(e);
LOG.error("Data from IOLink sensor could not be extracted for port {}: {}", i, e);
}

} else {
if (!missingPortInformationDetected) {
adapterRuntimeContext
.getLogger()
.warn("Event does not contain information about port " + i, "");
LOG.error("IoLink event does not look like expected. No port information found for port {}.", i);
missingPortInformationDetected = true;
}
}
}
});
} catch (Exception e) {
} catch (ParseException e) {
adapterRuntimeContext
.getLogger()
.error(e);
LOG.error("Could not parse event", e);
.getLogger()
.error(e);
LOG.error("IOLink master event could not be parsed.", e);
}
});
}
);

Thread thread = new Thread(this.mqttConsumer);
thread.start();
Expand Down

0 comments on commit a2d4e61

Please sign in to comment.