From 2927d7e02b7756b73e9d2f844982d430913e12bd Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Thu, 20 Jun 2024 21:20:02 +0200 Subject: [PATCH] chore: Improve connection handling of PLC adapters --- .../streampipes-connectors-plc/pom.xml | 7 ++++++ .../plc/PlcConnectorsModuleExport.java | 4 +++- .../plc/adapter/GenericAdapterGenerator.java | 4 ++-- .../adapter/generic/GenericPlc4xAdapter.java | 15 +++++-------- .../config/AdapterConfigurationProvider.java | 10 ++++----- .../ContinuousPlcRequestReader.java | 22 +++++-------------- .../connection/OneTimePlcRequestReader.java | 9 +++----- .../plc/adapter/s7/Plc4xS7Adapter.java | 6 ++--- 8 files changed, 34 insertions(+), 43 deletions(-) diff --git a/streampipes-extensions/streampipes-connectors-plc/pom.xml b/streampipes-extensions/streampipes-connectors-plc/pom.xml index 5c63b63023..5b4349c776 100644 --- a/streampipes-extensions/streampipes-connectors-plc/pom.xml +++ b/streampipes-extensions/streampipes-connectors-plc/pom.xml @@ -91,5 +91,12 @@ test + + + maven_central + Maven Central + https://repo.maven.apache.org/maven2/ + + diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java index 1bbc1df1b2..a62d2a84c5 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java @@ -40,7 +40,9 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport { @Override public List adapters() { var driverManager = PlcDriverManager.getDefault(); - var cachedConnectionManager = CachedPlcConnectionManager.getBuilder().build(); + var cachedConnectionManager = CachedPlcConnectionManager + .getBuilder(driverManager.getConnectionManager()) + .build(); var adapters = new ArrayList<>(List.of( new Plc4xModbusAdapter(cachedConnectionManager), new Plc4xS7Adapter(cachedConnectionManager) diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/GenericAdapterGenerator.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/GenericAdapterGenerator.java index e8a0ad2150..284062ff4d 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/GenericAdapterGenerator.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/GenericAdapterGenerator.java @@ -21,9 +21,9 @@ import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.GenericPlc4xAdapter; -import org.apache.plc4x.java.api.PlcConnectionManager; import org.apache.plc4x.java.api.PlcDriverManager; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; +import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +43,7 @@ public class GenericAdapterGenerator { public List makeAvailableAdapters(PlcDriverManager driverManager, - PlcConnectionManager connectionManager) { + CachedPlcConnectionManager connectionManager) { var adapters = new ArrayList(); var protocolCodes = getDrivers(driverManager); protocolCodes diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java index eb5a14591f..b75b8937e8 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java @@ -28,12 +28,12 @@ import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor; import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig; +import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.AdapterConfigurationProvider; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.EventSchemaProvider; +import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.MetadataOptionGenerator; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.ContinuousPlcRequestReader; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.OneTimePlcRequestReader; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.PlcRequestProvider; -import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.AdapterConfigurationProvider; -import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.MetadataOptionGenerator; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionExtractor; import org.apache.streampipes.extensions.management.connect.PullAdapterScheduler; import org.apache.streampipes.model.connect.guess.GuessSchema; @@ -41,12 +41,10 @@ import org.apache.streampipes.model.staticproperty.StaticProperty; import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder; -import org.apache.plc4x.java.api.PlcConnectionManager; import org.apache.plc4x.java.api.PlcDriver; import org.apache.plc4x.java.api.metadata.Option; import org.apache.plc4x.java.api.metadata.OptionMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; import java.util.List; import java.util.function.Function; @@ -57,18 +55,15 @@ public class GenericPlc4xAdapter implements StreamPipesAdapter, SupportsRuntimeConfig { - private static final Logger LOG = LoggerFactory.getLogger(GenericPlc4xAdapter.class); - - private PullAdapterScheduler pullAdapterScheduler; private final PlcRequestProvider requestProvider; private final EventSchemaProvider schemaProvider; private final PlcDriver driver; - private final PlcConnectionManager connectionManager; + private final CachedPlcConnectionManager connectionManager; public GenericPlc4xAdapter(PlcDriver driver, - PlcConnectionManager connectionManager) { + CachedPlcConnectionManager connectionManager) { this.requestProvider = new PlcRequestProvider(); this.schemaProvider = new EventSchemaProvider(); this.driver = driver; diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/config/AdapterConfigurationProvider.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/config/AdapterConfigurationProvider.java index c8e8f5354b..74ab520033 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/config/AdapterConfigurationProvider.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/config/AdapterConfigurationProvider.java @@ -19,18 +19,18 @@ package org.apache.streampipes.extensions.connectors.plc.adapter.generic.config; import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration; -import org.apache.streampipes.extensions.connectors.plc.adapter.generic.assets.PlcAdapterAssetResolver; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.GenericPlc4xAdapter; +import org.apache.streampipes.extensions.connectors.plc.adapter.generic.assets.PlcAdapterAssetResolver; import org.apache.streampipes.model.AdapterType; +import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.model.staticproperty.Option; import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder; import org.apache.streampipes.sdk.helpers.CodeLanguage; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; -import org.apache.streampipes.sdk.utils.Assets; -import org.apache.plc4x.java.api.PlcConnectionManager; import org.apache.plc4x.java.api.PlcDriver; +import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; import static org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xLabels.PLC_CODE_BLOCK; import static org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xLabels.PLC_IP; @@ -46,7 +46,7 @@ public class AdapterConfigurationProvider { public IAdapterConfiguration makeConfig(PlcDriver driver, - PlcConnectionManager connectionManager) { + CachedPlcConnectionManager connectionManager) { var driverMetadata = driver.getMetadata(); var appId = getAdapterAppId(driver); var adapterBuilder = AdapterConfigurationBuilder.create( @@ -55,7 +55,7 @@ public IAdapterConfiguration makeConfig(PlcDriver driver, () -> new GenericPlc4xAdapter(driver, connectionManager) ) .withLocales(Locales.EN) - .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .withAssetResolver( new PlcAdapterAssetResolver("org.apache.streampipes.connect.iiot.adapters.plc4x.generic", appId, driver) ) diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java index 0e724b6558..4540df808e 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java @@ -25,13 +25,11 @@ import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings; import org.apache.plc4x.java.api.PlcConnection; -import org.apache.plc4x.java.api.PlcConnectionManager; +import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class ContinuousPlcRequestReader extends OneTimePlcRequestReader implements IPullAdapter { @@ -40,7 +38,7 @@ public class ContinuousPlcRequestReader private final IEventCollector collector; - public ContinuousPlcRequestReader(PlcConnectionManager connectionManager, + public ContinuousPlcRequestReader(CachedPlcConnectionManager connectionManager, Plc4xConnectionSettings settings, PlcRequestProvider requestProvider, IEventCollector collector) { @@ -51,23 +49,15 @@ public ContinuousPlcRequestReader(PlcConnectionManager connectionManager, @Override public void pullData() throws RuntimeException { try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) { - if (!plcConnection.isConnected()) { - plcConnection.connect(); - } - readPlcData(plcConnection); + var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes()); + var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS); + var event = eventGenerator.makeEvent(readResponse); + collector.collect(event); } catch (Exception e) { LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e); } } - private void readPlcData(PlcConnection plcConnection) - throws ExecutionException, InterruptedException, TimeoutException { - var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes()); - var response = readRequest.execute().get(5000, TimeUnit.MILLISECONDS); - var event = eventGenerator.makeEvent(response); - collector.collect(event); - } - @Override public IPollingSettings getPollingInterval() { return PollingSettings.from(TimeUnit.MILLISECONDS, settings.pollingInterval()); diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java index 4961b1dc94..bfbe756ad5 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java @@ -22,7 +22,7 @@ import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionSettings; import org.apache.plc4x.java.api.PlcConnection; -import org.apache.plc4x.java.api.PlcConnectionManager; +import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -33,9 +33,9 @@ public class OneTimePlcRequestReader { protected final PlcRequestProvider requestProvider; protected final PlcEventGenerator eventGenerator; - protected final PlcConnectionManager connectionManager; + protected final CachedPlcConnectionManager connectionManager; - public OneTimePlcRequestReader(PlcConnectionManager connectionManager, + public OneTimePlcRequestReader(CachedPlcConnectionManager connectionManager, Plc4xConnectionSettings settings, PlcRequestProvider requestProvider) { this.connectionManager = connectionManager; @@ -50,9 +50,6 @@ public Map readPlcDataSynchronized() throws Exception { if (!plcConnection.getMetadata().isReadSupported()) { throw new AdapterException("This PLC does not support reading data"); } - if (!plcConnection.isConnected()) { - plcConnection.connect(); - } var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes()); var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS); return eventGenerator.makeEvent(readResponse); diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java index 2761689b3e..57944f84c7 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java @@ -51,7 +51,7 @@ import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.Options; -import org.apache.plc4x.java.api.PlcConnectionManager; +import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; import java.util.ArrayList; import java.util.HashMap; @@ -93,13 +93,13 @@ public class Plc4xS7Adapter implements StreamPipesAdapter { // [1] https://plc4x.apache.org/users/protocols/s7.html """; - private final PlcConnectionManager connectionManager; + private final CachedPlcConnectionManager connectionManager; private PullAdapterScheduler pullAdapterScheduler; private final PlcRequestProvider requestProvider; - public Plc4xS7Adapter(PlcConnectionManager connectionManager) { + public Plc4xS7Adapter(CachedPlcConnectionManager connectionManager) { this.requestProvider = new PlcRequestProvider(); this.connectionManager = connectionManager; }