From a2d4e617fff81538a73aec6507352e05098994a7 Mon Sep 17 00:00:00 2001 From: Tim <50115603+bossenti@users.noreply.github.com> Date: Mon, 16 Oct 2023 14:56:42 +0200 Subject: [PATCH] fix: make data retrieval of IOLink sensor more robust (#2024) * fix: make data retrieval of IOLink sensor more robust * refactor: make IOLink parsing more verbose * refactor: make IOLink parsing more verbose * refactor: remove flow statements --- .../adapters/iolink/IfmAlMqttAdapter.java | 74 ++++++++++++++----- 1 file changed, 57 insertions(+), 17 deletions(-) diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/iolink/IfmAlMqttAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/iolink/IfmAlMqttAdapter.java index e2606996d9..79b3dfe677 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/iolink/IfmAlMqttAdapter.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/iolink/IfmAlMqttAdapter.java @@ -54,16 +54,20 @@ public class IfmAlMqttAdapter implements StreamPipesAdapter { public static final String ID = "org.apache.streampipes.connect.iiot.adapters.iolink"; + public static final String KEY_PORT_INFORMATION = "/iolinkmaster/port[%s]/iolinkdevice/pdin"; + private static final Logger LOG = LoggerFactory.getLogger(IfmAlMqttAdapter.class); private static final String PORTS = "ports"; private static final String SENSOR_TYPE = "sensor_type"; + private final IParser parser; + private boolean missingEventDataDetected = false; + private boolean missingPortInformationDetected = false; + private MqttConsumer mqttConsumer; private MqttConfig mqttConfig; - private final IParser parser; - private List ports; public IfmAlMqttAdapter() { @@ -110,25 +114,61 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, for (int i = 0; i < ports.size(); i++) { - var portResult = getMap(payload, - "/iolinkmaster/port[%s]/iolinkdevice/pdin".formatted(ports.get(i))); - var eventData = (String) portResult.get("data"); - - var parsedEvent = sensor.parseEvent(eventData); - parsedEvent.put("timestamp", System.currentTimeMillis() + i); - parsedEvent.put("port", "port" + ports.get(i)); - parsedEvent.put(SensorVVB001.IO_LINK_MASTER_SN, serialnumber); - - collector.collect(parsedEvent); + String keyPortInformation = KEY_PORT_INFORMATION.formatted(ports.get(i)); + + Map portResult; + if (payload.containsKey(keyPortInformation)) { + + portResult = getMap(payload, keyPortInformation); + + try { + String eventData; + if (portResult.containsKey("data")) { + eventData = (String) portResult.get("data"); + + var parsedEvent = sensor.parseEvent(eventData); + parsedEvent.put("timestamp", System.currentTimeMillis() + i); + parsedEvent.put("port", "port" + ports.get(i)); + parsedEvent.put(SensorVVB001.IO_LINK_MASTER_SN, serialnumber); + + collector.collect(parsedEvent); + } else { + if (!missingEventDataDetected) { + adapterRuntimeContext + .getLogger() + .warn("Payload for port %s does not contain event data".formatted(i), ""); + LOG.error( + "IoLink event does not look like expected. " + + "No port information found for port {}.", i); + missingEventDataDetected = true; + } + } + } catch (Exception e) { + adapterRuntimeContext + .getLogger() + .error(e); + LOG.error("Data from IOLink sensor could not be extracted for port {}: {}", i, e); + } + + } else { + if (!missingPortInformationDetected) { + adapterRuntimeContext + .getLogger() + .warn("Event does not contain information about port " + i, ""); + LOG.error("IoLink event does not look like expected. No port information found for port {}.", i); + missingPortInformationDetected = true; + } + } } }); - } catch (Exception e) { + } catch (ParseException e) { adapterRuntimeContext - .getLogger() - .error(e); - LOG.error("Could not parse event", e); + .getLogger() + .error(e); + LOG.error("IOLink master event could not be parsed.", e); } - }); + } + ); Thread thread = new Thread(this.mqttConsumer); thread.start();