Skip to content

Commit

Permalink
feat(#3353): Support sign and encrypt security modes in OPC-UA adapte… (
Browse files Browse the repository at this point in the history
#3354)

* feat(#3353): Support sign and encrypt security modes in OPC-UA adapter and sink

* Fix test

* Fix dependency convergence

* Modify OPC e2e tests

* Improve completed static property validation

* Minor bug fixes

* Fix test

* Add additional environment variables
  • Loading branch information
dominikriemer authored Dec 1, 2024
1 parent c616fb3 commit 97aecd0
Show file tree
Hide file tree
Showing 65 changed files with 1,408 additions and 546 deletions.
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<amqp-client.version>5.21.0</amqp-client.version>
<apache-sis-referencing.version>1.2</apache-sis-referencing.version>
<boofcv.version>1.1.0</boofcv.version>
<bcprov-jdk18on.version>1.78.1</bcprov-jdk18on.version>
<classindex.version>3.9</classindex.version>
<checker-qual.version>3.43.0</checker-qual.version>
<commons-codec.version>1.17.0</commons-codec.version>
Expand All @@ -54,7 +55,7 @@
<commons-pool2.version>2.12.0</commons-pool2.version>
<commons-text.version>1.12.0</commons-text.version>
<ditto-client.version>1.0.0</ditto-client.version>
<eclipse.milo.version>0.6.9</eclipse.milo.version>
<eclipse.milo.version>0.6.14</eclipse.milo.version>
<file-management.version>3.1.0</file-management.version>
<flink.version>1.13.5</flink.version>
<fogsy-qudt.version>1.0</fogsy-qudt.version>
Expand Down Expand Up @@ -398,6 +399,16 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>${bcprov-jdk18on.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcutil-jdk18on</artifactId>
<version>${bcprov-jdk18on.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-rio-turtle</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,18 @@ public enum Envs {

// expects a comma separated string of service names
SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""),
SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", "");
SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", ""),

// OPC-UA security
SP_OPCUA_SECURITY_DIR("SP_OPCUA_SECURITY_DIR", "/streampipes-security/opcua"),
SP_OPCUA_KEYSTORE_FILE("SP_OPCUA_KEYSTORE_FILE", "keystore.pfx"),
SP_OPCUA_KEYSTORE_PASSWORD("SP_OPCUA_KEYSTORE_PASSWORD", "password"),
SP_OPCUA_KEYSTORE_TYPE("SP_OPCUA_KEYSTORE_TYPE", "PKCS12"),
SP_OPCUA_KEYSTORE_ALIAS("SP_OPCUA_KEYSTORE_ALIAS", "apache-streampipes"),
SP_OPCUA_APPLICATION_URI(
"SP_OPCUA_APPLICATION_URI",
"urn:org:apache:streampipes:opcua:client"
);

private final String envVariableName;
private String defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,33 @@ public StringEnvironmentVariable getAllowedUploadFiletypes() {
return new StringEnvironmentVariable(Envs.SP_ALLOWED_UPLOAD_FILETYPES);
}

@Override
public StringEnvironmentVariable getOpcUaSecurityDir() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_SECURITY_DIR);
}

@Override
public StringEnvironmentVariable getOpcUaKeystoreFile() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_FILE);
}

@Override
public StringEnvironmentVariable getOpcUaKeystorePassword() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_PASSWORD);
}

@Override
public StringEnvironmentVariable getOpcUaApplicationUri() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_APPLICATION_URI);
}

@Override
public StringEnvironmentVariable getOPcUaKeystoreType() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE);
}

@Override
public StringEnvironmentVariable getOpcUaKeystoreAlias() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public interface Environment {
IntEnvironmentVariable getServicePort();

StringEnvironmentVariable getSpCoreScheme();

StringEnvironmentVariable getSpCoreHost();

IntEnvironmentVariable getSpCorePort();

// Time series storage env variables
Expand Down Expand Up @@ -144,12 +146,15 @@ public interface Environment {

// Broker defaults
StringEnvironmentVariable getKafkaHost();

IntEnvironmentVariable getKafkaPort();

StringEnvironmentVariable getMqttHost();

IntEnvironmentVariable getMqttPort();

StringEnvironmentVariable getNatsHost();

IntEnvironmentVariable getNatsPort();

StringEnvironmentVariable getPulsarUrl();
Expand All @@ -158,4 +163,15 @@ public interface Environment {

StringEnvironmentVariable getAllowedUploadFiletypes();

StringEnvironmentVariable getOpcUaSecurityDir();

StringEnvironmentVariable getOpcUaKeystoreFile();

StringEnvironmentVariable getOpcUaKeystorePassword();

StringEnvironmentVariable getOpcUaApplicationUri();

StringEnvironmentVariable getOPcUaKeystoreType();

StringEnvironmentVariable getOpcUaKeystoreAlias();
}
4 changes: 2 additions & 2 deletions streampipes-extensions/streampipes-connectors-opcua/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-parent</artifactId>
<artifactId>streampipes-extensions</artifactId>
<version>0.97.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>streampipes-connectors-opcua</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,35 @@
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV1;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV2;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV3;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV4;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaSinkMigrationV1;
import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;

import java.util.List;

public class OpcUaConnectorsModuleExport implements IExtensionModuleExport {

private final OpcUaClientProvider clientProvider;

public OpcUaConnectorsModuleExport() {
this.clientProvider = new OpcUaClientProvider();
}

@Override
public List<StreamPipesAdapter> adapters() {
return List.of(
new OpcUaAdapter()
new OpcUaAdapter(clientProvider)
);
}

@Override
public List<IStreamPipesPipelineElement<?>> pipelineElements() {
return List.of(
new OpcUaSink()
new OpcUaSink(clientProvider)
);
}

Expand All @@ -50,7 +60,9 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {
return List.of(
new OpcUaAdapterMigrationV1(),
new OpcUaAdapterMigrationV2(),
new OpcUaAdapterMigrationV3()
new OpcUaAdapterMigrationV3(),
new OpcUaAdapterMigrationV4(),
new OpcUaSinkMigrationV1()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
import org.apache.streampipes.extensions.connectors.opcua.client.ConnectedOpcUaClient;
import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaAdapterConfig;
import org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
Expand All @@ -42,7 +43,6 @@
import org.apache.streampipes.model.connect.rules.schema.DeleteRuleDescription;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.Labels;
Expand All @@ -66,7 +66,6 @@
import java.util.stream.Collectors;

import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.SUBSCRIPTION_MODE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil.getSchema;
Expand All @@ -78,7 +77,9 @@ public class OpcUaAdapter implements StreamPipesAdapter, IPullAdapter, SupportsR
private static final Logger LOG = LoggerFactory.getLogger(OpcUaAdapter.class);

private int pullingIntervalMilliSeconds;
private SpOpcUaClient<OpcUaAdapterConfig> spOpcUaClient;
private final OpcUaClientProvider clientProvider;
private ConnectedOpcUaClient connectedClient;
private OpcUaAdapterConfig opcUaAdapterConfig;
private List<OpcNode> allNodes;
private List<NodeId> allNodeIds;
private int numberProperties;
Expand All @@ -92,15 +93,14 @@ public class OpcUaAdapter implements StreamPipesAdapter, IPullAdapter, SupportsR
*/
private final Map<String, String> nodeIdToLabelMapping;

public OpcUaAdapter() {
super();
public OpcUaAdapter(OpcUaClientProvider clientProvider) {
this.clientProvider = clientProvider;
this.numberProperties = 0;
this.event = new HashMap<>();
this.nodeIdToLabelMapping = new HashMap<>();
}

private void prepareAdapter(IAdapterParameterExtractor extractor) throws AdapterException {

this.allNodeIds = new ArrayList<>();
List<String> deleteKeys = extractor
.getAdapterDescription()
Expand All @@ -111,21 +111,21 @@ private void prepareAdapter(IAdapterParameterExtractor extractor) throws Adapter
.collect(Collectors.toList());

try {
this.spOpcUaClient.connect();
this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
OpcUaNodeBrowser browserClient =
new OpcUaNodeBrowser(this.spOpcUaClient.getClient(), this.spOpcUaClient.getSpOpcConfig());
new OpcUaNodeBrowser(this.connectedClient.getClient(), this.opcUaAdapterConfig);
this.allNodes = browserClient.findNodes(deleteKeys);


for (OpcNode node : this.allNodes) {
this.allNodeIds.add(node.getNodeId());
}

if (spOpcUaClient.getSpOpcConfig().inPullMode()) {
this.pullingIntervalMilliSeconds = spOpcUaClient.getSpOpcConfig().getPullIntervalMilliSeconds();
if (opcUaAdapterConfig.inPullMode()) {
this.pullingIntervalMilliSeconds = opcUaAdapterConfig.getPullIntervalMilliSeconds();
} else {
this.numberProperties = this.allNodeIds.size();
this.spOpcUaClient.createListSubscription(this.allNodeIds, this);
this.connectedClient.createListSubscription(this.allNodeIds, this);
}

this.allNodes.forEach(node -> this.nodeIdToLabelMapping.put(node.getNodeId().toString(), node.getLabel()));
Expand All @@ -139,7 +139,7 @@ private void prepareAdapter(IAdapterParameterExtractor extractor) throws Adapter
@Override
public void pullData() throws ExecutionException, RuntimeException, InterruptedException, TimeoutException {
var response =
this.spOpcUaClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
this.connectedClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
boolean badStatusCodeReceived = false;
boolean emptyValueReceived = false;
List<DataValue> returnValues =
Expand Down Expand Up @@ -168,7 +168,7 @@ public void pullData() throws ExecutionException, RuntimeException, InterruptedE

private boolean shouldSkipEvent(boolean badStatusCodeReceived) {
return badStatusCodeReceived
&& this.spOpcUaClient.getSpOpcConfig().getIncompleteEventStrategy()
&& this.opcUaAdapterConfig.getIncompleteEventStrategy()
.equalsIgnoreCase(SharedUserConfiguration.INCOMPLETE_OPTION_IGNORE);
}

Expand Down Expand Up @@ -208,13 +208,13 @@ public PollingSettings getPollingInterval() {
public void onAdapterStarted(IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
this.spOpcUaClient = new SpOpcUaClient<>(
SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor())
);
this.opcUaAdapterConfig =
SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor());
//this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
this.collector = collector;
this.prepareAdapter(extractor);

if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
if (this.opcUaAdapterConfig.inPullMode()) {
this.pullAdapterScheduler = new PullAdapterScheduler();
this.pullAdapterScheduler.schedule(this, extractor.getAdapterDescription().getElementId());
}
Expand All @@ -223,22 +223,22 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
this.spOpcUaClient.disconnect();
clientProvider.releaseClient(this.opcUaAdapterConfig);

if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
if (this.opcUaAdapterConfig.inPullMode()) {
this.pullAdapterScheduler.shutdown();
}
}

@Override
public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor extractor) throws SpConfigurationException {
return OpcUaUtil.resolveConfig(staticPropertyInternalName, extractor);
return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName, extractor);
}

@Override
public IAdapterConfiguration declareConfig() {
var builder = AdapterConfigurationBuilder.create(ID, 3, OpcUaAdapter::new)
var builder = AdapterConfigurationBuilder.create(ID, 4, () -> new OpcUaAdapter(clientProvider))
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)
Expand All @@ -255,6 +255,6 @@ public IAdapterConfiguration declareConfig() {
@Override
public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor,
IAdapterGuessSchemaContext adapterGuessSchemaContext) throws AdapterException {
return getSchema(extractor);
return getSchema(clientProvider, extractor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.eclipse.milo.opcua.sdk.client.AddressSpace;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
import org.eclipse.milo.opcua.stack.core.Identifiers;
Expand All @@ -45,13 +46,13 @@

public class OpcUaNodeBrowser {

private final OpcUaClient client;
private final UaClient client;
private final OpcUaConfig spOpcConfig;

private static final Logger LOG = LoggerFactory.getLogger(OpcUaNodeBrowser.class);

public OpcUaNodeBrowser(
OpcUaClient client,
UaClient client,
OpcUaConfig spOpcUaClientConfig
) {
this.client = client;
Expand Down Expand Up @@ -127,7 +128,7 @@ private OpcNode toOpcNode(String nodeName) throws UaException {
}

private List<TreeInputNode> findChildren(
OpcUaClient client,
UaClient client,
NodeId nodeId
) throws UaException {
return client
Expand Down
Loading

0 comments on commit 97aecd0

Please sign in to comment.