From 37515178044e9cac9ccb35c25fb640e0ba62e9f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Dywicki?= Date: Wed, 30 Oct 2024 13:28:51 +0100 Subject: [PATCH] Adjust opc ua protocol logic. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ɓukasz Dywicki --- .../opcua/protocol/OpcuaProtocolLogic.java | 28 ++++---- .../protocol/OpcuaSubscriptionHandle.java | 64 +++++++++---------- .../java/spi/tag/TagTagConfigParserTest.java | 17 +++-- 3 files changed, 52 insertions(+), 57 deletions(-) diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java index 2209bd4dfdc..06f3d07b01b 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java @@ -29,7 +29,6 @@ import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.messages.*; import org.apache.plc4x.java.api.metadata.Metadata; -import org.apache.plc4x.java.spi.metadata.DefaultMetadata; import org.apache.plc4x.java.api.metadata.time.TimeSource; import org.apache.plc4x.java.api.model.PlcConsumerRegistration; import org.apache.plc4x.java.api.model.PlcSubscriptionHandle; @@ -51,6 +50,7 @@ import org.apache.plc4x.java.spi.configuration.HasConfiguration; import org.apache.plc4x.java.spi.connection.PlcTagHandler; import org.apache.plc4x.java.spi.context.DriverContext; +import org.apache.plc4x.java.spi.generation.Message; import org.apache.plc4x.java.spi.messages.*; import org.apache.plc4x.java.spi.messages.utils.DefaultPlcResponseItem; import org.apache.plc4x.java.spi.messages.utils.PlcResponseItem; @@ -61,6 +61,7 @@ import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction; import org.apache.plc4x.java.spi.values.LegacyPlcValueHandler; import org.apache.plc4x.java.spi.values.PlcList; +import org.apache.plc4x.java.spi.values.PlcSTRING; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -248,8 +249,8 @@ public CompletableFuture read(PlcReadRequest readRequest) { Metadata responseMetadata = new Builder() .put(PlcMetadataKeys.RECEIVE_TIMESTAMP, System.currentTimeMillis()) .build(); - Map metadata = new LinkedHashMap<>(); - return new DefaultPlcReadResponse(request, readResponse(tagMap, response.getResults(), metadata, responseMetadata), metadata); + Entry, Map>> mappedResponse = readResponse(tagMap, response.getResults(), responseMetadata); + return new DefaultPlcReadResponse(request, mappedResponse.getValue(), mappedResponse.getKey()); }); } @@ -275,44 +276,41 @@ static NodeId generateNodeId(OpcuaTag tag) { return nodeId; } - public Map> readResponse(Map tagMap, List results, Map metadata, Metadata responseMetadata) { - PlcResponseCode responseCode = null; // initialize variable + Entry, Map>> readResponse(Map tagMap, List results, Metadata responseMetadata) { Map> response = new HashMap<>(); + Map metadata = new HashMap<>(); int index = 0; for (String tagName : tagMap.keySet()) { PlcTag tag = tagMap.get(tagName); PlcValue value = null; DataValue dataValue = results.get(index++); + PlcResponseCode responseCode = PlcResponseCode.OK; if (dataValue.getValueSpecified()) { - value = decodePlcValue(tag, dataValue.getValue()); + value = variantToPlcValue(tag, dataValue.getValue()); if (value == null) { + LOGGER.error("Variant type {} is not supported.", dataValue.getValue().getClass()); responseCode = PlcResponseCode.UNSUPPORTED; - LOGGER.error("Data type - " + dataValue.getValue().getClass() + " is not supported "); - } - // response code might be null in first iteration - if (PlcResponseCode.UNSUPPORTED != responseCode) { - responseCode = PlcResponseCode.OK; } } else { StatusCode statusCode = dataValue.getStatusCode(); responseCode = mapOpcStatusCode(statusCode.getStatusCode(), PlcResponseCode.UNSUPPORTED); - LOGGER.error("Error while reading value from OPC UA server error code:- " + dataValue.getStatusCode().toString()); + LOGGER.error("Error while reading value from OPC UA server error code: {}", statusCode.toString()); } Metadata tagMetadata = new Builder(responseMetadata) .put(OpcMetadataKeys.QUALITY, new OpcuaQualityStatus(dataValue.getStatusCode())) .put(OpcMetadataKeys.SERVER_TIMESTAMP, dataValue.getServerTimestamp()) .put(OpcMetadataKeys.SOURCE_TIMESTAMP, dataValue.getSourceTimestamp()) - .put(PlcMetadataKeys.TIMESTAMP, dataValue.getServerTimestamp()) + .put(PlcMetadataKeys.TIMESTAMP, dataValue.getSourceTimestamp()) .put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.SOFTWARE) .build(); response.put(tagName, new DefaultPlcResponseItem<>(responseCode, value)); metadata.put(tagName, tagMetadata); } - return response; + return Map.entry(metadata, response); } - static PlcValue decodePlcValue(PlcTag tag, Variant variant) { + static PlcValue variantToPlcValue(PlcTag tag, Variant variant) { PlcValue value = null; if (variant instanceof VariantBoolean) { byte[] array = ((VariantBoolean) variant).getValue(); diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java index 01dc491a51b..e2034da253b 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java @@ -20,11 +20,11 @@ import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import java.util.Map.Entry; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.messages.PlcMetadataKeys; import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent; import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; @@ -46,6 +46,7 @@ import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle; import org.apache.plc4x.java.spi.transaction.RequestTransactionManager; import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction; +import org.apache.plc4x.java.spi.values.PlcNull; import org.apache.plc4x.java.spi.values.PlcStruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +60,9 @@ public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle { - private static final Logger LOGGER = LoggerFactory.getLogger(OpcuaSubscriptionHandle.class); - private final static ScheduledExecutorService EXECUTOR = newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "plc4x-opcua-subscription-scheduler")); + private final Logger logger = LoggerFactory.getLogger(OpcuaSubscriptionHandle.class); private final Set> consumers; private final List tagNames; private final Conversation conversation; @@ -166,22 +166,22 @@ public CompletableFuture onSubscribeCreateMonitoredItem return conversation.submit(createMonitoredItemsRequest, CreateMonitoredItemsResponse.class) .whenComplete((response, error) -> { if (error instanceof TimeoutException) { - LOGGER.info("Timeout while sending the Create Monitored Item Subscription Message", error); + logger.info("Timeout while sending the Create Monitored Item Subscription Message", error); } else if (error != null) { - LOGGER.info("Error while sending the Create Monitored Item Subscription Message", error); + logger.info("Error while sending the Create Monitored Item Subscription Message", error); } }).thenApply(responseMessage -> { MonitoredItemCreateResult[] array = responseMessage.getResults().stream().toArray(MonitoredItemCreateResult[]::new); for (int index = 0, arrayLength = array.length; index < arrayLength; index++) { MonitoredItemCreateResult result = array[index]; if (OpcuaStatusCode.enumForValue(result.getStatusCode().getStatusCode()) != OpcuaStatusCode.Good) { - LOGGER.error("Invalid Tag {}, subscription created without this tag", tagNames.get(index)); + logger.error("Invalid Tag {}, subscription created without this tag", tagNames.get(index)); } else { - LOGGER.debug("Tag {} was added to the subscription", tagNames.get(index)); + logger.debug("Tag {} was added to the subscription", tagNames.get(index)); } } - LOGGER.trace("Scheduling publish event for subscription {}", subscriptionId); + logger.trace("Scheduling publish event for subscription {}", subscriptionId); publishTask = EXECUTOR.scheduleAtFixedRate(this::sendPublishRequest, revisedCycleTime / 2, revisedCycleTime, TimeUnit.MILLISECONDS); return this; }); @@ -210,7 +210,7 @@ private void sendPublishRequest() { // we work in external thread - we need to coordinate access to conversation pipeline RequestTransaction transaction = tm.startRequest(); transaction.submit(() -> { - LOGGER.trace("Sent publish request with {} acks", ackLength); + logger.trace("Sent publish request with {} acks", ackLength); // Create Consumer for the response message, error and timeout to be sent to the Secure Channel conversation.submit(publishRequest, PublishResponse.class).thenAccept(responseMessage -> { outstandingRequests.remove(responseMessage.getResponseHeader().getRequestHandle()); @@ -222,29 +222,27 @@ private void sendPublishRequest() { for (ExtensionObject notificationMessage : responseMessage.getNotificationMessage().getNotificationData()) { ExtensionObjectDefinition notification = notificationMessage.getBody(); if (notification instanceof DataChangeNotification) { - LOGGER.trace("Found a Data Change Notification"); + logger.trace("Found a Data Change Notification"); DataChangeNotification data = (DataChangeNotification) notification; if (!data.getMonitoredItems().isEmpty()) { - onSubscriptionValue(data.getMonitoredItems()); + onMonitoredValue(data.getMonitoredItems()); } -// } else if (notification instanceof StatusChangeNotification) { -// StatusChangeNotification data = (StatusChangeNotification) notification; } else if (notification instanceof EventNotificationList) { - LOGGER.trace("Found a Event Notification"); + logger.trace("Found a Event Notification"); EventNotificationList data = (EventNotificationList) notification; if (!data.getEvents().isEmpty()) { - onEventSubscription(data.getEvents()); + onEventNotification(data.getEvents()); } } else { - LOGGER.warn("Unsupported Notification type {}", notification.getClass().getName()); + logger.warn("Unsupported Notification type {}", notification.getClass().getName()); } } }).whenComplete((result, error) -> { if (error != null) { - LOGGER.warn("Publish request of subscription {} resulted in error reported by server", subscriptionId, error); + logger.warn("Publish request of subscription {} resulted in error reported by server", subscriptionId, error); transaction.failRequest(error); } else { - LOGGER.trace("Completed publish request for subscription {}", subscriptionId); + logger.trace("Completed publish request for subscription {}", subscriptionId); transaction.endRequest(); } }); @@ -270,7 +268,7 @@ public void stopSubscriber() { .thenAccept(responseMessage -> publishTask.cancel(true)) .whenComplete((result, error) -> { if (error != null) { - LOGGER.error("Deletion of subscription resulted in error", error); + logger.error("Deletion of subscription resulted in error", error); transaction.failRequest(error); } else { transaction.endRequest(); @@ -285,7 +283,7 @@ public void stopSubscriber() { * * @param values - array of data values to be sent to the client. */ - private void onSubscriptionValue(List values) { + private void onMonitoredValue(List values) { long receiveTs = System.currentTimeMillis(); Metadata responseMetadata = new DefaultMetadata.Builder() .put(PlcMetadataKeys.RECEIVE_TIMESTAMP, receiveTs) @@ -299,44 +297,40 @@ private void onSubscriptionValue(List values) { dataValues.add(value.getValue()); } - Map metadata = new HashMap<>(); - Map> tags = plcSubscriber.readResponse(tagMap, dataValues, metadata, responseMetadata); - final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(), tags, metadata); - + Entry, Map>> mappedResponse = plcSubscriber.readResponse(tagMap, dataValues, responseMetadata); + PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(receiveTs), mappedResponse.getValue(), mappedResponse.getKey()); consumers.forEach(plcSubscriptionEventConsumer -> plcSubscriptionEventConsumer.accept(event)); } - private void onEventSubscription(List events) { + private void onEventNotification(List events) { long receiveTs = System.currentTimeMillis(); Metadata responseMetadata = new DefaultMetadata.Builder() .put(PlcMetadataKeys.RECEIVE_TIMESTAMP, receiveTs) .build(); Map metadata = new HashMap<>(); - Map tagMap = new LinkedHashMap<>(); - Map> tagValueMapping = new LinkedHashMap<>(); + Map> tagValues = new LinkedHashMap<>(); for (EventFieldList event : events) { String tagName = tagNames.get((int) event.getClientHandle() - 1); OpcuaTag tag = (OpcuaTag) subscriptionRequest.getTag(tagName).getTag(); - tagMap.put(tagName, tag); Iterator fieldNames = tag.getConfig().keySet().iterator(); Map mapping = new LinkedHashMap<>(); + metadata.put(tagName, responseMetadata); for (Variant variant : event.getEventFields()) { if (fieldNames.hasNext()) { String fieldName = fieldNames.next(); - PlcValue plcValue = OpcuaProtocolLogic.decodePlcValue(tag, variant); + PlcValue plcValue = OpcuaProtocolLogic.variantToPlcValue(tag, variant); mapping.put(fieldName, plcValue); + tagValues.put(tagName, new DefaultPlcResponseItem<>(PlcResponseCode.OK, new PlcStruct(mapping))); } else { - throw new PlcRuntimeException("OPC UA subscription handle received more event fields than it subscribed"); + logger.error("Could not map event notification response, subscription received more data than expected"); + tagValues.put(tagName, new DefaultPlcResponseItem<>(PlcResponseCode.INTERNAL_ERROR, new PlcNull())); } } - tagValueMapping.put(tagName, new DefaultPlcResponseItem<>(PlcResponseCode.OK, new PlcStruct(mapping))); } - //Map> tags = plcSubscriber.readResponse(tagMap, dataValues, metadata, responseMetadata); - final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(receiveTs), tagValueMapping, metadata); - + PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(receiveTs), tagValues, metadata); consumers.forEach(plcSubscriptionEventConsumer -> plcSubscriptionEventConsumer.accept(event)); } @@ -348,7 +342,7 @@ private void onEventSubscription(List events) { */ @Override public PlcConsumerRegistration register(Consumer consumer) { - LOGGER.info("Registering a new OPCUA subscription consumer"); + logger.info("Registering a new OPCUA subscription consumer"); consumers.add(consumer); return new DefaultPlcConsumerRegistration(plcSubscriber, consumer, this); } diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/tag/TagTagConfigParserTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/tag/TagTagConfigParserTest.java index 7279dffb85d..829dd4a7baf 100644 --- a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/tag/TagTagConfigParserTest.java +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/tag/TagTagConfigParserTest.java @@ -38,17 +38,20 @@ public void testConfigOptions() { parse("aaa:123{unit-id: true}", "unit-id", "true"); parse("aaa:123{unit-id: false}", "unit-id", "false"); parse("aaa:123{val1: 1, val2: 2}", "val1", "1", "val2", "2"); + parse("aaa:123{x: '', y: '', z: ''}", "x", "", "y", "", "z", ""); } - private void parse(String address, String key, String value) { - Map config = TagConfigParser.parse(address); - verify(config, key, value); - } + private void parse(String address, String ... pairs) { + if ((pairs.length % 2) != 0) { + throw new IllegalArgumentException("Invalid number of pairs: " + pairs.length); + } - private void parse(String address, String key1, String value1, String key2, String value2) { Map config = TagConfigParser.parse(address); - verify(config, key1, value1); - verify(config, key2, value2); + for (int i = 0; i < pairs.length; i += 2) { + String key = pairs[i]; + String value = pairs[i + 1]; + verify(config, key, value); + } } private void verify(Map config, String key, String value) {