From 2b2cdecc5b9230c33f997039f8650123c5aa4194 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Fri, 21 Jun 2024 17:47:35 +0200 Subject: [PATCH] chore: Improve connection handling of PLC adapters (#2951) * chore: Improve connection handling of PLC adapters * Cleanup pom * Use PlcConnectionManager API --- .../streampipes-connectors-plc/pom.xml | 1 - .../plc/PlcConnectorsModuleExport.java | 4 +++- .../adapter/generic/GenericPlc4xAdapter.java | 10 +++------- .../config/AdapterConfigurationProvider.java | 6 +++--- .../ContinuousPlcRequestReader.java | 19 +++++-------------- .../connection/OneTimePlcRequestReader.java | 3 --- .../plc/adapter/s7/Plc4xS7Adapter.java | 3 +-- 7 files changed, 15 insertions(+), 31 deletions(-) diff --git a/streampipes-extensions/streampipes-connectors-plc/pom.xml b/streampipes-extensions/streampipes-connectors-plc/pom.xml index 5c63b63023..90039ce19d 100644 --- a/streampipes-extensions/streampipes-connectors-plc/pom.xml +++ b/streampipes-extensions/streampipes-connectors-plc/pom.xml @@ -91,5 +91,4 @@ test - diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java index 1bbc1df1b2..a62d2a84c5 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java @@ -40,7 +40,9 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport { @Override public List adapters() { var driverManager = PlcDriverManager.getDefault(); - var cachedConnectionManager = CachedPlcConnectionManager.getBuilder().build(); + var cachedConnectionManager = CachedPlcConnectionManager + .getBuilder(driverManager.getConnectionManager()) + .build(); var adapters = new ArrayList<>(List.of( new Plc4xModbusAdapter(cachedConnectionManager), new Plc4xS7Adapter(cachedConnectionManager) diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java index eb5a14591f..56340f903f 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java @@ -28,12 +28,12 @@ import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor; import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig; +import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.AdapterConfigurationProvider; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.EventSchemaProvider; +import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.MetadataOptionGenerator; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.ContinuousPlcRequestReader; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.OneTimePlcRequestReader; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.PlcRequestProvider; -import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.AdapterConfigurationProvider; -import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.MetadataOptionGenerator; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionExtractor; import org.apache.streampipes.extensions.management.connect.PullAdapterScheduler; import org.apache.streampipes.model.connect.guess.GuessSchema; @@ -45,8 +45,7 @@ import org.apache.plc4x.java.api.PlcDriver; import org.apache.plc4x.java.api.metadata.Option; import org.apache.plc4x.java.api.metadata.OptionMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; import java.util.List; import java.util.function.Function; @@ -57,9 +56,6 @@ public class GenericPlc4xAdapter implements StreamPipesAdapter, SupportsRuntimeConfig { - private static final Logger LOG = LoggerFactory.getLogger(GenericPlc4xAdapter.class); - - private PullAdapterScheduler pullAdapterScheduler; private final PlcRequestProvider requestProvider; private final EventSchemaProvider schemaProvider; diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/config/AdapterConfigurationProvider.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/config/AdapterConfigurationProvider.java index c8e8f5354b..1877ce3922 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/config/AdapterConfigurationProvider.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/config/AdapterConfigurationProvider.java @@ -19,15 +19,15 @@ package org.apache.streampipes.extensions.connectors.plc.adapter.generic.config; import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration; -import org.apache.streampipes.extensions.connectors.plc.adapter.generic.assets.PlcAdapterAssetResolver; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.GenericPlc4xAdapter; +import org.apache.streampipes.extensions.connectors.plc.adapter.generic.assets.PlcAdapterAssetResolver; import org.apache.streampipes.model.AdapterType; +import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.model.staticproperty.Option; import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder; import org.apache.streampipes.sdk.helpers.CodeLanguage; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; -import org.apache.streampipes.sdk.utils.Assets; import org.apache.plc4x.java.api.PlcConnectionManager; import org.apache.plc4x.java.api.PlcDriver; @@ -55,7 +55,7 @@ public IAdapterConfiguration makeConfig(PlcDriver driver, () -> new GenericPlc4xAdapter(driver, connectionManager) ) .withLocales(Locales.EN) - .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .withAssetResolver( new PlcAdapterAssetResolver("org.apache.streampipes.connect.iiot.adapters.plc4x.generic", appId, driver) ) diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java index 0e724b6558..befc54a234 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java @@ -26,12 +26,11 @@ import org.apache.plc4x.java.api.PlcConnection; import org.apache.plc4x.java.api.PlcConnectionManager; +import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class ContinuousPlcRequestReader extends OneTimePlcRequestReader implements IPullAdapter { @@ -51,23 +50,15 @@ public ContinuousPlcRequestReader(PlcConnectionManager connectionManager, @Override public void pullData() throws RuntimeException { try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) { - if (!plcConnection.isConnected()) { - plcConnection.connect(); - } - readPlcData(plcConnection); + var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes()); + var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS); + var event = eventGenerator.makeEvent(readResponse); + collector.collect(event); } catch (Exception e) { LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e); } } - private void readPlcData(PlcConnection plcConnection) - throws ExecutionException, InterruptedException, TimeoutException { - var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes()); - var response = readRequest.execute().get(5000, TimeUnit.MILLISECONDS); - var event = eventGenerator.makeEvent(response); - collector.collect(event); - } - @Override public IPollingSettings getPollingInterval() { return PollingSettings.from(TimeUnit.MILLISECONDS, settings.pollingInterval()); diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java index 4961b1dc94..cab3a53ad6 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java @@ -50,9 +50,6 @@ public Map readPlcDataSynchronized() throws Exception { if (!plcConnection.getMetadata().isReadSupported()) { throw new AdapterException("This PLC does not support reading data"); } - if (!plcConnection.isConnected()) { - plcConnection.connect(); - } var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes()); var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS); return eventGenerator.makeEvent(readResponse); diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java index 2761689b3e..1c7657a79a 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java @@ -94,11 +94,10 @@ public class Plc4xS7Adapter implements StreamPipesAdapter { """; private final PlcConnectionManager connectionManager; + private final PlcRequestProvider requestProvider; private PullAdapterScheduler pullAdapterScheduler; - private final PlcRequestProvider requestProvider; - public Plc4xS7Adapter(PlcConnectionManager connectionManager) { this.requestProvider = new PlcRequestProvider(); this.connectionManager = connectionManager;