From 52b713fa5773c52055333a811a62336d4bd4afc8 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Wed, 19 Jun 2024 11:45:45 +0200 Subject: [PATCH] Use PLC4X connection cache --- .../streampipes-connectors-plc/pom.xml | 7 ++++- .../plc/PlcConnectorsModuleExport.java | 11 +++---- .../adapter/generic/GenericPlc4xAdapter.java | 8 +---- .../ContinuousPlcRequestReader.java | 23 ++++----------- .../connection/OneTimePlcRequestReader.java | 3 ++ .../adapter/modbus/Plc4xModbusAdapter.java | 24 ++++++--------- .../plc/adapter/s7/Plc4xS7Adapter.java | 29 ++++--------------- 7 files changed, 35 insertions(+), 70 deletions(-) diff --git a/streampipes-extensions/streampipes-connectors-plc/pom.xml b/streampipes-extensions/streampipes-connectors-plc/pom.xml index a1d3857ff9..5c63b63023 100644 --- a/streampipes-extensions/streampipes-connectors-plc/pom.xml +++ b/streampipes-extensions/streampipes-connectors-plc/pom.xml @@ -55,7 +55,7 @@ org.apache.plc4x plc4j-driver-all - 0.12.0 + ${plc4x.version} pom @@ -72,6 +72,11 @@ + + org.apache.plc4x + plc4j-connection-cache + ${plc4x.version} + diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java index c6468f3fca..1bbc1df1b2 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java @@ -29,6 +29,7 @@ import org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapter; import org.apache.plc4x.java.api.PlcDriverManager; +import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; import java.util.ArrayList; import java.util.Collections; @@ -39,12 +40,12 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport { @Override public List adapters() { var driverManager = PlcDriverManager.getDefault(); - var connectionManager = driverManager.getConnectionManager(); - var adapters = new ArrayList(List.of( - new Plc4xModbusAdapter(), - new Plc4xS7Adapter(driverManager, connectionManager) + var cachedConnectionManager = CachedPlcConnectionManager.getBuilder().build(); + var adapters = new ArrayList<>(List.of( + new Plc4xModbusAdapter(cachedConnectionManager), + new Plc4xS7Adapter(cachedConnectionManager) )); - adapters.addAll(new GenericAdapterGenerator().makeAvailableAdapters(driverManager, connectionManager)); + adapters.addAll(new GenericAdapterGenerator().makeAvailableAdapters(driverManager, cachedConnectionManager)); return adapters; } diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java index 9f8857c713..eb5a14591f 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java @@ -63,7 +63,6 @@ public class GenericPlc4xAdapter implements StreamPipesAdapter, SupportsRuntimeC private PullAdapterScheduler pullAdapterScheduler; private final PlcRequestProvider requestProvider; private final EventSchemaProvider schemaProvider; - private ContinuousPlcRequestReader plcRequestReader; private final PlcDriver driver; private final PlcConnectionManager connectionManager; @@ -88,7 +87,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, var settings = new Plc4xConnectionExtractor( extractor.getStaticPropertyExtractor(), driver.getProtocolCode() ).makeSettings(); - this.plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector); + var plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector); this.pullAdapterScheduler = new PullAdapterScheduler(); this.pullAdapterScheduler.schedule(plcRequestReader, extractor.getAdapterDescription().getElementId()); } @@ -96,11 +95,6 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, @Override public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) { - try { - this.plcRequestReader.closeConnection(); - } catch (Exception e) { - LOG.error("Error when closing connection", e); - } this.pullAdapterScheduler.shutdown(); } diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java index f10a93bc12..0e724b6558 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java @@ -26,7 +26,6 @@ import org.apache.plc4x.java.api.PlcConnection; import org.apache.plc4x.java.api.PlcConnectionManager; -import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +39,6 @@ public class ContinuousPlcRequestReader private static final Logger LOG = LoggerFactory.getLogger(ContinuousPlcRequestReader.class); private final IEventCollector collector; - private PlcConnection plcConnection; public ContinuousPlcRequestReader(PlcConnectionManager connectionManager, Plc4xConnectionSettings settings, @@ -52,21 +50,16 @@ public ContinuousPlcRequestReader(PlcConnectionManager connectionManager, @Override public void pullData() throws RuntimeException { - try { - var connection = getConnection(); - readPlcData(connection); + try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) { + if (!plcConnection.isConnected()) { + plcConnection.connect(); + } + readPlcData(plcConnection); } catch (Exception e) { LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e); } } - private PlcConnection getConnection() throws PlcConnectionException { - if (plcConnection == null || !plcConnection.isConnected()) { - this.plcConnection = connectionManager.getConnection(settings.connectionString()); - } - return this.plcConnection; - } - private void readPlcData(PlcConnection plcConnection) throws ExecutionException, InterruptedException, TimeoutException { var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes()); @@ -79,10 +72,4 @@ private void readPlcData(PlcConnection plcConnection) public IPollingSettings getPollingInterval() { return PollingSettings.from(TimeUnit.MILLISECONDS, settings.pollingInterval()); } - - public void closeConnection() throws Exception { - if (this.plcConnection != null && this.plcConnection.isConnected()) { - this.plcConnection.close(); - } - } } diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java index cab3a53ad6..4961b1dc94 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java @@ -50,6 +50,9 @@ public Map readPlcDataSynchronized() throws Exception { if (!plcConnection.getMetadata().isReadSupported()) { throw new AdapterException("This PLC does not support reading data"); } + if (!plcConnection.isConnected()) { + plcConnection.connect(); + } var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes()); var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS); return eventGenerator.makeEvent(readResponse); diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java index 97af4a211b..6c0673797c 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java @@ -32,6 +32,7 @@ import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings; import org.apache.streampipes.model.AdapterType; import org.apache.streampipes.model.connect.guess.GuessSchema; +import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.model.staticproperty.CollectionStaticProperty; @@ -44,11 +45,10 @@ import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.Options; -import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.sdk.utils.Datatypes; import org.apache.plc4x.java.api.PlcConnection; -import org.apache.plc4x.java.api.PlcDriverManager; +import org.apache.plc4x.java.api.PlcConnectionManager; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcReadResponse; @@ -101,11 +101,9 @@ public class Plc4xModbusAdapter implements StreamPipesAdapter, IPullAdapter { * Connection to the PLC */ private PlcConnection plcConnection; + private PlcConnectionManager connectionManager; - /** - * Empty constructor and a constructor with SpecificAdapterStreamDescription are mandatory - */ - public Plc4xModbusAdapter() { + public Plc4xModbusAdapter(PlcConnectionManager connectionManager) { } /** @@ -179,7 +177,7 @@ private void before(IStaticPropertyExtractor extractor) throws AdapterException getConfigurations(extractor); try { - this.plcConnection = PlcDriverManager.getDefault().getConnectionManager().getConnection( + this.plcConnection = connectionManager.getConnection( "modbus-tcp:tcp://" + this.ip + ":" + this.port + "?unit-identifier=" + this.slaveID); if (!this.plcConnection.getMetadata().isReadSupported()) { @@ -213,16 +211,13 @@ public void pullData() { } PlcReadRequest readRequest = builder.build(); - // Execute the request PlcReadResponse response = null; try { response = readRequest.execute().get(); - } catch (InterruptedException ie) { + } catch (InterruptedException | ExecutionException ie) { ie.printStackTrace(); - } catch (ExecutionException ee) { - ee.printStackTrace(); } // Create an event containing the value of the PLC @@ -273,9 +268,9 @@ public PollingSettings getPollingInterval() { */ @Override public IAdapterConfiguration declareConfig() { - return AdapterConfigurationBuilder.create(ID, 1, Plc4xModbusAdapter::new) + return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xModbusAdapter(connectionManager)) .withLocales(Locales.EN) - .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .withCategory(AdapterType.Manufacturing) .requiredTextParameter(Labels.withId(PLC_IP)).requiredIntegerParameter(Labels.withId(PLC_PORT)) .requiredTextParameter(Labels.withId(PLC_NODE_ID)).requiredCollection(Labels.withId(PLC_NODES), @@ -298,8 +293,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, } @Override - public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) - throws AdapterException { + public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) { this.pullAdapterScheduler.shutdown(); } diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java index 5349a19563..2761689b3e 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java @@ -52,9 +52,6 @@ import org.apache.streampipes.sdk.helpers.Options; import org.apache.plc4x.java.api.PlcConnectionManager; -import org.apache.plc4x.java.api.PlcDriverManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -68,8 +65,6 @@ public class Plc4xS7Adapter implements StreamPipesAdapter { */ public static final String ID = "org.apache.streampipes.connect.iiot.adapters.plc4x.s7"; - private static final Logger LOG = LoggerFactory.getLogger(Plc4xS7Adapter.class); - private static final String S7_URL = "s7://"; /** @@ -98,23 +93,14 @@ public class Plc4xS7Adapter implements StreamPipesAdapter { // [1] https://plc4x.apache.org/users/protocols/s7.html """; - /** - * Values of user configuration parameters - */ - private Map nodes; - - private final PlcDriverManager driverManager; private final PlcConnectionManager connectionManager; private PullAdapterScheduler pullAdapterScheduler; - private ContinuousPlcRequestReader plcRequestReader; private final PlcRequestProvider requestProvider; - public Plc4xS7Adapter(PlcDriverManager driverManager, - PlcConnectionManager connectionManager) { + public Plc4xS7Adapter(PlcConnectionManager connectionManager) { this.requestProvider = new PlcRequestProvider(); - this.driverManager = driverManager; this.connectionManager = connectionManager; } @@ -126,7 +112,7 @@ public Plc4xS7Adapter(PlcDriverManager driverManager, */ @Override public IAdapterConfiguration declareConfig() { - return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xS7Adapter(driverManager, connectionManager)) + return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xS7Adapter(connectionManager)) .withLocales(Locales.EN) .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .withCategory(AdapterType.Manufacturing) @@ -152,7 +138,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, IEventCollector collector, IAdapterRuntimeContext adapterRuntimeContext) { var settings = getConfigurations(extractor.getStaticPropertyExtractor()); - this.plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector); + var plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector); this.pullAdapterScheduler = new PullAdapterScheduler(); this.pullAdapterScheduler.schedule(plcRequestReader, extractor.getAdapterDescription().getElementId()); } @@ -160,11 +146,6 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, @Override public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) { - try { - this.plcRequestReader.closeConnection(); - } catch (Exception e) { - LOG.error("Error when closing connection", e); - } this.pullAdapterScheduler.shutdown(); } @@ -203,7 +184,7 @@ private Plc4xConnectionSettings getConfigurations(IStaticPropertyExtractor extra var ip = extractor.singleValueParameter(PLC_IP, String.class); var pollingInterval = extractor.singleValueParameter(PLC_POLLING_INTERVAL, Integer.class); - Map nodes = Map.of(); + Map nodes; var selectedAlternative = extractor.selectedAlternativeInternalId(PLC_NODE_INPUT_ALTERNATIVES); @@ -215,7 +196,7 @@ private Plc4xConnectionSettings getConfigurations(IStaticPropertyExtractor extra } else { // Alternative Advanced var codePropertyInput = extractor.codeblockValue(PLC_CODE_BLOCK); - this.nodes = new ConfigurationParser().getNodeInformationFromCodeProperty(codePropertyInput); + nodes = new ConfigurationParser().getNodeInformationFromCodeProperty(codePropertyInput); } return new Plc4xConnectionSettings(S7_URL + ip, pollingInterval, nodes);