Skip to content

Commit

Permalink
Adjust opc ua protocol logic.
Browse files Browse the repository at this point in the history
Signed-off-by: Łukasz Dywicki <[email protected]>
  • Loading branch information
splatch committed Oct 30, 2024
1 parent 516cf64 commit 3751517
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.spi.metadata.DefaultMetadata;
import org.apache.plc4x.java.api.metadata.time.TimeSource;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
Expand All @@ -51,6 +50,7 @@
import org.apache.plc4x.java.spi.configuration.HasConfiguration;
import org.apache.plc4x.java.spi.connection.PlcTagHandler;
import org.apache.plc4x.java.spi.context.DriverContext;
import org.apache.plc4x.java.spi.generation.Message;
import org.apache.plc4x.java.spi.messages.*;
import org.apache.plc4x.java.spi.messages.utils.DefaultPlcResponseItem;
import org.apache.plc4x.java.spi.messages.utils.PlcResponseItem;
Expand All @@ -61,6 +61,7 @@
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction;
import org.apache.plc4x.java.spi.values.LegacyPlcValueHandler;
import org.apache.plc4x.java.spi.values.PlcList;
import org.apache.plc4x.java.spi.values.PlcSTRING;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -248,8 +249,8 @@ public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
Metadata responseMetadata = new Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, System.currentTimeMillis())
.build();
Map<String, Metadata> metadata = new LinkedHashMap<>();
return new DefaultPlcReadResponse(request, readResponse(tagMap, response.getResults(), metadata, responseMetadata), metadata);
Entry<Map<String, Metadata>, Map<String, PlcResponseItem<PlcValue>>> mappedResponse = readResponse(tagMap, response.getResults(), responseMetadata);
return new DefaultPlcReadResponse(request, mappedResponse.getValue(), mappedResponse.getKey());
});
}

Expand All @@ -275,44 +276,41 @@ static NodeId generateNodeId(OpcuaTag tag) {
return nodeId;
}

public Map<String, PlcResponseItem<PlcValue>> readResponse(Map<String, PlcTag> tagMap, List<DataValue> results, Map<String, Metadata> metadata, Metadata responseMetadata) {
PlcResponseCode responseCode = null; // initialize variable
Entry<Map<String, Metadata>, Map<String, PlcResponseItem<PlcValue>>> readResponse(Map<String, PlcTag> tagMap, List<DataValue> results, Metadata responseMetadata) {
Map<String, PlcResponseItem<PlcValue>> response = new HashMap<>();
Map<String, Metadata> metadata = new HashMap<>();
int index = 0;
for (String tagName : tagMap.keySet()) {
PlcTag tag = tagMap.get(tagName);
PlcValue value = null;
DataValue dataValue = results.get(index++);
PlcResponseCode responseCode = PlcResponseCode.OK;
if (dataValue.getValueSpecified()) {
value = decodePlcValue(tag, dataValue.getValue());
value = variantToPlcValue(tag, dataValue.getValue());
if (value == null) {
LOGGER.error("Variant type {} is not supported.", dataValue.getValue().getClass());
responseCode = PlcResponseCode.UNSUPPORTED;
LOGGER.error("Data type - " + dataValue.getValue().getClass() + " is not supported ");
}
// response code might be null in first iteration
if (PlcResponseCode.UNSUPPORTED != responseCode) {
responseCode = PlcResponseCode.OK;
}
} else {
StatusCode statusCode = dataValue.getStatusCode();
responseCode = mapOpcStatusCode(statusCode.getStatusCode(), PlcResponseCode.UNSUPPORTED);
LOGGER.error("Error while reading value from OPC UA server error code:- " + dataValue.getStatusCode().toString());
LOGGER.error("Error while reading value from OPC UA server error code: {}", statusCode.toString());
}

Metadata tagMetadata = new Builder(responseMetadata)
.put(OpcMetadataKeys.QUALITY, new OpcuaQualityStatus(dataValue.getStatusCode()))
.put(OpcMetadataKeys.SERVER_TIMESTAMP, dataValue.getServerTimestamp())
.put(OpcMetadataKeys.SOURCE_TIMESTAMP, dataValue.getSourceTimestamp())
.put(PlcMetadataKeys.TIMESTAMP, dataValue.getServerTimestamp())
.put(PlcMetadataKeys.TIMESTAMP, dataValue.getSourceTimestamp())
.put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.SOFTWARE)
.build();
response.put(tagName, new DefaultPlcResponseItem<>(responseCode, value));
metadata.put(tagName, tagMetadata);
}
return response;
return Map.entry(metadata, response);
}

static PlcValue decodePlcValue(PlcTag tag, Variant variant) {
static PlcValue variantToPlcValue(PlcTag tag, Variant variant) {
PlcValue value = null;
if (variant instanceof VariantBoolean) {
byte[] array = ((VariantBoolean) variant).getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;

import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcMetadataKeys;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
Expand All @@ -46,6 +46,7 @@
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction;
import org.apache.plc4x.java.spi.values.PlcNull;
import org.apache.plc4x.java.spi.values.PlcStruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,10 +60,9 @@

public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle {

private static final Logger LOGGER = LoggerFactory.getLogger(OpcuaSubscriptionHandle.class);

private final static ScheduledExecutorService EXECUTOR = newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "plc4x-opcua-subscription-scheduler"));

private final Logger logger = LoggerFactory.getLogger(OpcuaSubscriptionHandle.class);
private final Set<Consumer<PlcSubscriptionEvent>> consumers;
private final List<String> tagNames;
private final Conversation conversation;
Expand Down Expand Up @@ -166,22 +166,22 @@ public CompletableFuture<OpcuaSubscriptionHandle> onSubscribeCreateMonitoredItem
return conversation.submit(createMonitoredItemsRequest, CreateMonitoredItemsResponse.class)
.whenComplete((response, error) -> {
if (error instanceof TimeoutException) {
LOGGER.info("Timeout while sending the Create Monitored Item Subscription Message", error);
logger.info("Timeout while sending the Create Monitored Item Subscription Message", error);
} else if (error != null) {
LOGGER.info("Error while sending the Create Monitored Item Subscription Message", error);
logger.info("Error while sending the Create Monitored Item Subscription Message", error);
}
}).thenApply(responseMessage -> {
MonitoredItemCreateResult[] array = responseMessage.getResults().stream().toArray(MonitoredItemCreateResult[]::new);
for (int index = 0, arrayLength = array.length; index < arrayLength; index++) {
MonitoredItemCreateResult result = array[index];
if (OpcuaStatusCode.enumForValue(result.getStatusCode().getStatusCode()) != OpcuaStatusCode.Good) {
LOGGER.error("Invalid Tag {}, subscription created without this tag", tagNames.get(index));
logger.error("Invalid Tag {}, subscription created without this tag", tagNames.get(index));
} else {
LOGGER.debug("Tag {} was added to the subscription", tagNames.get(index));
logger.debug("Tag {} was added to the subscription", tagNames.get(index));
}
}

LOGGER.trace("Scheduling publish event for subscription {}", subscriptionId);
logger.trace("Scheduling publish event for subscription {}", subscriptionId);
publishTask = EXECUTOR.scheduleAtFixedRate(this::sendPublishRequest, revisedCycleTime / 2, revisedCycleTime, TimeUnit.MILLISECONDS);
return this;
});
Expand Down Expand Up @@ -210,7 +210,7 @@ private void sendPublishRequest() {
// we work in external thread - we need to coordinate access to conversation pipeline
RequestTransaction transaction = tm.startRequest();
transaction.submit(() -> {
LOGGER.trace("Sent publish request with {} acks", ackLength);
logger.trace("Sent publish request with {} acks", ackLength);
// Create Consumer for the response message, error and timeout to be sent to the Secure Channel
conversation.submit(publishRequest, PublishResponse.class).thenAccept(responseMessage -> {
outstandingRequests.remove(responseMessage.getResponseHeader().getRequestHandle());
Expand All @@ -222,29 +222,27 @@ private void sendPublishRequest() {
for (ExtensionObject notificationMessage : responseMessage.getNotificationMessage().getNotificationData()) {
ExtensionObjectDefinition notification = notificationMessage.getBody();
if (notification instanceof DataChangeNotification) {
LOGGER.trace("Found a Data Change Notification");
logger.trace("Found a Data Change Notification");
DataChangeNotification data = (DataChangeNotification) notification;
if (!data.getMonitoredItems().isEmpty()) {
onSubscriptionValue(data.getMonitoredItems());
onMonitoredValue(data.getMonitoredItems());
}
// } else if (notification instanceof StatusChangeNotification) {
// StatusChangeNotification data = (StatusChangeNotification) notification;
} else if (notification instanceof EventNotificationList) {
LOGGER.trace("Found a Event Notification");
logger.trace("Found a Event Notification");
EventNotificationList data = (EventNotificationList) notification;
if (!data.getEvents().isEmpty()) {
onEventSubscription(data.getEvents());
onEventNotification(data.getEvents());
}
} else {
LOGGER.warn("Unsupported Notification type {}", notification.getClass().getName());
logger.warn("Unsupported Notification type {}", notification.getClass().getName());
}
}
}).whenComplete((result, error) -> {
if (error != null) {
LOGGER.warn("Publish request of subscription {} resulted in error reported by server", subscriptionId, error);
logger.warn("Publish request of subscription {} resulted in error reported by server", subscriptionId, error);
transaction.failRequest(error);
} else {
LOGGER.trace("Completed publish request for subscription {}", subscriptionId);
logger.trace("Completed publish request for subscription {}", subscriptionId);
transaction.endRequest();
}
});
Expand All @@ -270,7 +268,7 @@ public void stopSubscriber() {
.thenAccept(responseMessage -> publishTask.cancel(true))
.whenComplete((result, error) -> {
if (error != null) {
LOGGER.error("Deletion of subscription resulted in error", error);
logger.error("Deletion of subscription resulted in error", error);
transaction.failRequest(error);
} else {
transaction.endRequest();
Expand All @@ -285,7 +283,7 @@ public void stopSubscriber() {
*
* @param values - array of data values to be sent to the client.
*/
private void onSubscriptionValue(List<MonitoredItemNotification> values) {
private void onMonitoredValue(List<MonitoredItemNotification> values) {
long receiveTs = System.currentTimeMillis();
Metadata responseMetadata = new DefaultMetadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, receiveTs)
Expand All @@ -299,44 +297,40 @@ private void onSubscriptionValue(List<MonitoredItemNotification> values) {
dataValues.add(value.getValue());
}

Map<String, Metadata> metadata = new HashMap<>();
Map<String, PlcResponseItem<PlcValue>> tags = plcSubscriber.readResponse(tagMap, dataValues, metadata, responseMetadata);
final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(), tags, metadata);

Entry<Map<String, Metadata>, Map<String, PlcResponseItem<PlcValue>>> mappedResponse = plcSubscriber.readResponse(tagMap, dataValues, responseMetadata);
PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(receiveTs), mappedResponse.getValue(), mappedResponse.getKey());
consumers.forEach(plcSubscriptionEventConsumer -> plcSubscriptionEventConsumer.accept(event));
}

private void onEventSubscription(List<EventFieldList> events) {
private void onEventNotification(List<EventFieldList> events) {
long receiveTs = System.currentTimeMillis();
Metadata responseMetadata = new DefaultMetadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, receiveTs)
.build();

Map<String, Metadata> metadata = new HashMap<>();
Map<String, PlcTag> tagMap = new LinkedHashMap<>();
Map<String, PlcResponseItem<PlcValue>> tagValueMapping = new LinkedHashMap<>();
Map<String, PlcResponseItem<PlcValue>> tagValues = new LinkedHashMap<>();
for (EventFieldList event : events) {
String tagName = tagNames.get((int) event.getClientHandle() - 1);
OpcuaTag tag = (OpcuaTag) subscriptionRequest.getTag(tagName).getTag();
tagMap.put(tagName, tag);

Iterator<String> fieldNames = tag.getConfig().keySet().iterator();
Map<String, PlcValue> mapping = new LinkedHashMap<>();
metadata.put(tagName, responseMetadata);
for (Variant variant : event.getEventFields()) {
if (fieldNames.hasNext()) {
String fieldName = fieldNames.next();
PlcValue plcValue = OpcuaProtocolLogic.decodePlcValue(tag, variant);
PlcValue plcValue = OpcuaProtocolLogic.variantToPlcValue(tag, variant);
mapping.put(fieldName, plcValue);
tagValues.put(tagName, new DefaultPlcResponseItem<>(PlcResponseCode.OK, new PlcStruct(mapping)));
} else {
throw new PlcRuntimeException("OPC UA subscription handle received more event fields than it subscribed");
logger.error("Could not map event notification response, subscription received more data than expected");
tagValues.put(tagName, new DefaultPlcResponseItem<>(PlcResponseCode.INTERNAL_ERROR, new PlcNull()));
}
}
tagValueMapping.put(tagName, new DefaultPlcResponseItem<>(PlcResponseCode.OK, new PlcStruct(mapping)));
}

//Map<String, PlcResponseItem<PlcValue>> tags = plcSubscriber.readResponse(tagMap, dataValues, metadata, responseMetadata);
final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(receiveTs), tagValueMapping, metadata);

PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(receiveTs), tagValues, metadata);
consumers.forEach(plcSubscriptionEventConsumer -> plcSubscriptionEventConsumer.accept(event));
}

Expand All @@ -348,7 +342,7 @@ private void onEventSubscription(List<EventFieldList> events) {
*/
@Override
public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer) {
LOGGER.info("Registering a new OPCUA subscription consumer");
logger.info("Registering a new OPCUA subscription consumer");
consumers.add(consumer);
return new DefaultPlcConsumerRegistration(plcSubscriber, consumer, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,20 @@ public void testConfigOptions() {
parse("aaa:123{unit-id: true}", "unit-id", "true");
parse("aaa:123{unit-id: false}", "unit-id", "false");
parse("aaa:123{val1: 1, val2: 2}", "val1", "1", "val2", "2");
parse("aaa:123{x: '', y: '', z: ''}", "x", "", "y", "", "z", "");
}

private void parse(String address, String key, String value) {
Map<String, String> config = TagConfigParser.parse(address);
verify(config, key, value);
}
private void parse(String address, String ... pairs) {
if ((pairs.length % 2) != 0) {
throw new IllegalArgumentException("Invalid number of pairs: " + pairs.length);
}

private void parse(String address, String key1, String value1, String key2, String value2) {
Map<String, String> config = TagConfigParser.parse(address);
verify(config, key1, value1);
verify(config, key2, value2);
for (int i = 0; i < pairs.length; i += 2) {
String key = pairs[i];
String value = pairs[i + 1];
verify(config, key, value);
}
}

private void verify(Map<String, String> config, String key, String value) {
Expand Down

0 comments on commit 3751517

Please sign in to comment.