diff --git a/streampipes-extensions/streampipes-connectors-plc/pom.xml b/streampipes-extensions/streampipes-connectors-plc/pom.xml
index a1d3857ff9..5c63b63023 100644
--- a/streampipes-extensions/streampipes-connectors-plc/pom.xml
+++ b/streampipes-extensions/streampipes-connectors-plc/pom.xml
@@ -55,7 +55,7 @@
org.apache.plc4x
plc4j-driver-all
- 0.12.0
+ ${plc4x.version}
pom
@@ -72,6 +72,11 @@
+
+ org.apache.plc4x
+ plc4j-connection-cache
+ ${plc4x.version}
+
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
index c6468f3fca..1bbc1df1b2 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
@@ -29,6 +29,7 @@
import org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapter;
import org.apache.plc4x.java.api.PlcDriverManager;
+import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
import java.util.ArrayList;
import java.util.Collections;
@@ -39,12 +40,12 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport {
@Override
public List adapters() {
var driverManager = PlcDriverManager.getDefault();
- var connectionManager = driverManager.getConnectionManager();
- var adapters = new ArrayList(List.of(
- new Plc4xModbusAdapter(),
- new Plc4xS7Adapter(driverManager, connectionManager)
+ var cachedConnectionManager = CachedPlcConnectionManager.getBuilder().build();
+ var adapters = new ArrayList<>(List.of(
+ new Plc4xModbusAdapter(cachedConnectionManager),
+ new Plc4xS7Adapter(cachedConnectionManager)
));
- adapters.addAll(new GenericAdapterGenerator().makeAvailableAdapters(driverManager, connectionManager));
+ adapters.addAll(new GenericAdapterGenerator().makeAvailableAdapters(driverManager, cachedConnectionManager));
return adapters;
}
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java
index 9f8857c713..eb5a14591f 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java
@@ -63,7 +63,6 @@ public class GenericPlc4xAdapter implements StreamPipesAdapter, SupportsRuntimeC
private PullAdapterScheduler pullAdapterScheduler;
private final PlcRequestProvider requestProvider;
private final EventSchemaProvider schemaProvider;
- private ContinuousPlcRequestReader plcRequestReader;
private final PlcDriver driver;
private final PlcConnectionManager connectionManager;
@@ -88,7 +87,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
var settings = new Plc4xConnectionExtractor(
extractor.getStaticPropertyExtractor(), driver.getProtocolCode()
).makeSettings();
- this.plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector);
+ var plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector);
this.pullAdapterScheduler = new PullAdapterScheduler();
this.pullAdapterScheduler.schedule(plcRequestReader, extractor.getAdapterDescription().getElementId());
}
@@ -96,11 +95,6 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext) {
- try {
- this.plcRequestReader.closeConnection();
- } catch (Exception e) {
- LOG.error("Error when closing connection", e);
- }
this.pullAdapterScheduler.shutdown();
}
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
index f10a93bc12..0e724b6558 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
@@ -26,7 +26,6 @@
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcConnectionManager;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +39,6 @@ public class ContinuousPlcRequestReader
private static final Logger LOG = LoggerFactory.getLogger(ContinuousPlcRequestReader.class);
private final IEventCollector collector;
- private PlcConnection plcConnection;
public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,
Plc4xConnectionSettings settings,
@@ -52,21 +50,16 @@ public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,
@Override
public void pullData() throws RuntimeException {
- try {
- var connection = getConnection();
- readPlcData(connection);
+ try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) {
+ if (!plcConnection.isConnected()) {
+ plcConnection.connect();
+ }
+ readPlcData(plcConnection);
} catch (Exception e) {
LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e);
}
}
- private PlcConnection getConnection() throws PlcConnectionException {
- if (plcConnection == null || !plcConnection.isConnected()) {
- this.plcConnection = connectionManager.getConnection(settings.connectionString());
- }
- return this.plcConnection;
- }
-
private void readPlcData(PlcConnection plcConnection)
throws ExecutionException, InterruptedException, TimeoutException {
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
@@ -79,10 +72,4 @@ private void readPlcData(PlcConnection plcConnection)
public IPollingSettings getPollingInterval() {
return PollingSettings.from(TimeUnit.MILLISECONDS, settings.pollingInterval());
}
-
- public void closeConnection() throws Exception {
- if (this.plcConnection != null && this.plcConnection.isConnected()) {
- this.plcConnection.close();
- }
- }
}
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java
index cab3a53ad6..4961b1dc94 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java
@@ -50,6 +50,9 @@ public Map readPlcDataSynchronized() throws Exception {
if (!plcConnection.getMetadata().isReadSupported()) {
throw new AdapterException("This PLC does not support reading data");
}
+ if (!plcConnection.isConnected()) {
+ plcConnection.connect();
+ }
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
return eventGenerator.makeEvent(readResponse);
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java
index 97af4a211b..6c0673797c 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java
@@ -32,6 +32,7 @@
import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
@@ -44,11 +45,10 @@
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
@@ -101,11 +101,9 @@ public class Plc4xModbusAdapter implements StreamPipesAdapter, IPullAdapter {
* Connection to the PLC
*/
private PlcConnection plcConnection;
+ private PlcConnectionManager connectionManager;
- /**
- * Empty constructor and a constructor with SpecificAdapterStreamDescription are mandatory
- */
- public Plc4xModbusAdapter() {
+ public Plc4xModbusAdapter(PlcConnectionManager connectionManager) {
}
/**
@@ -179,7 +177,7 @@ private void before(IStaticPropertyExtractor extractor) throws AdapterException
getConfigurations(extractor);
try {
- this.plcConnection = PlcDriverManager.getDefault().getConnectionManager().getConnection(
+ this.plcConnection = connectionManager.getConnection(
"modbus-tcp:tcp://" + this.ip + ":" + this.port + "?unit-identifier=" + this.slaveID);
if (!this.plcConnection.getMetadata().isReadSupported()) {
@@ -213,16 +211,13 @@ public void pullData() {
}
PlcReadRequest readRequest = builder.build();
-
// Execute the request
PlcReadResponse response = null;
try {
response = readRequest.execute().get();
- } catch (InterruptedException ie) {
+ } catch (InterruptedException | ExecutionException ie) {
ie.printStackTrace();
- } catch (ExecutionException ee) {
- ee.printStackTrace();
}
// Create an event containing the value of the PLC
@@ -273,9 +268,9 @@ public PollingSettings getPollingInterval() {
*/
@Override
public IAdapterConfiguration declareConfig() {
- return AdapterConfigurationBuilder.create(ID, 1, Plc4xModbusAdapter::new)
+ return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xModbusAdapter(connectionManager))
.withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withCategory(AdapterType.Manufacturing)
.requiredTextParameter(Labels.withId(PLC_IP)).requiredIntegerParameter(Labels.withId(PLC_PORT))
.requiredTextParameter(Labels.withId(PLC_NODE_ID)).requiredCollection(Labels.withId(PLC_NODES),
@@ -298,8 +293,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
}
@Override
- public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext)
- throws AdapterException {
+ public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) {
this.pullAdapterScheduler.shutdown();
}
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
index 5349a19563..2761689b3e 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
@@ -52,9 +52,6 @@
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.plc4x.java.api.PlcConnectionManager;
-import org.apache.plc4x.java.api.PlcDriverManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
@@ -68,8 +65,6 @@ public class Plc4xS7Adapter implements StreamPipesAdapter {
*/
public static final String ID = "org.apache.streampipes.connect.iiot.adapters.plc4x.s7";
- private static final Logger LOG = LoggerFactory.getLogger(Plc4xS7Adapter.class);
-
private static final String S7_URL = "s7://";
/**
@@ -98,23 +93,14 @@ public class Plc4xS7Adapter implements StreamPipesAdapter {
// [1] https://plc4x.apache.org/users/protocols/s7.html
""";
- /**
- * Values of user configuration parameters
- */
- private Map nodes;
-
- private final PlcDriverManager driverManager;
private final PlcConnectionManager connectionManager;
private PullAdapterScheduler pullAdapterScheduler;
- private ContinuousPlcRequestReader plcRequestReader;
private final PlcRequestProvider requestProvider;
- public Plc4xS7Adapter(PlcDriverManager driverManager,
- PlcConnectionManager connectionManager) {
+ public Plc4xS7Adapter(PlcConnectionManager connectionManager) {
this.requestProvider = new PlcRequestProvider();
- this.driverManager = driverManager;
this.connectionManager = connectionManager;
}
@@ -126,7 +112,7 @@ public Plc4xS7Adapter(PlcDriverManager driverManager,
*/
@Override
public IAdapterConfiguration declareConfig() {
- return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xS7Adapter(driverManager, connectionManager))
+ return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xS7Adapter(connectionManager))
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withCategory(AdapterType.Manufacturing)
@@ -152,7 +138,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext) {
var settings = getConfigurations(extractor.getStaticPropertyExtractor());
- this.plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector);
+ var plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector);
this.pullAdapterScheduler = new PullAdapterScheduler();
this.pullAdapterScheduler.schedule(plcRequestReader, extractor.getAdapterDescription().getElementId());
}
@@ -160,11 +146,6 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext) {
- try {
- this.plcRequestReader.closeConnection();
- } catch (Exception e) {
- LOG.error("Error when closing connection", e);
- }
this.pullAdapterScheduler.shutdown();
}
@@ -203,7 +184,7 @@ private Plc4xConnectionSettings getConfigurations(IStaticPropertyExtractor extra
var ip = extractor.singleValueParameter(PLC_IP, String.class);
var pollingInterval = extractor.singleValueParameter(PLC_POLLING_INTERVAL, Integer.class);
- Map nodes = Map.of();
+ Map nodes;
var selectedAlternative = extractor.selectedAlternativeInternalId(PLC_NODE_INPUT_ALTERNATIVES);
@@ -215,7 +196,7 @@ private Plc4xConnectionSettings getConfigurations(IStaticPropertyExtractor extra
} else {
// Alternative Advanced
var codePropertyInput = extractor.codeblockValue(PLC_CODE_BLOCK);
- this.nodes = new ConfigurationParser().getNodeInformationFromCodeProperty(codePropertyInput);
+ nodes = new ConfigurationParser().getNodeInformationFromCodeProperty(codePropertyInput);
}
return new Plc4xConnectionSettings(S7_URL + ip, pollingInterval, nodes);