From 8711c772653e53b653c55705ac390696816d0c80 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Thu, 20 Jun 2024 07:54:06 +0200 Subject: [PATCH] fix(#2944): Fix connection error in PLC S7 connector (#2946) * fix(#2944): Fix connection error in PLC S7 connector * Use PLC4X connection cache * Remove PlcResponseHandler * Fix test --- .../streampipes-connectors-plc/pom.xml | 7 +- .../plc/PlcConnectorsModuleExport.java | 14 +- .../plc/adapter/GenericAdapterGenerator.java | 13 +- .../adapter/generic/GenericPlc4xAdapter.java | 8 +- .../ContinuousPlcRequestReader.java | 48 ++--- .../connection/OneTimePlcRequestReader.java | 7 + .../adapter/modbus/Plc4xModbusAdapter.java | 24 +-- .../plc/adapter/s7/Plc4xS7Adapter.java | 189 +++++------------- .../adapter/s7/PlcReadResponseHandler.java | 27 --- .../Plc4xModbusAdapterVersionedConfig.java | 6 +- 10 files changed, 104 insertions(+), 239 deletions(-) delete mode 100644 streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/PlcReadResponseHandler.java diff --git a/streampipes-extensions/streampipes-connectors-plc/pom.xml b/streampipes-extensions/streampipes-connectors-plc/pom.xml index 04f3affbdc..d9e0a6404e 100644 --- a/streampipes-extensions/streampipes-connectors-plc/pom.xml +++ b/streampipes-extensions/streampipes-connectors-plc/pom.xml @@ -55,7 +55,7 @@ org.apache.plc4x plc4j-driver-all - 0.12.0 + ${plc4x.version} pom @@ -72,6 +72,11 @@ + + org.apache.plc4x + plc4j-connection-cache + ${plc4x.version} + diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java index e5ecfe14b4..1bbc1df1b2 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java @@ -28,18 +28,24 @@ import org.apache.streampipes.extensions.connectors.plc.adapter.modbus.Plc4xModbusAdapter; import org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapter; +import org.apache.plc4x.java.api.PlcDriverManager; +import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; + import java.util.ArrayList; import java.util.Collections; import java.util.List; public class PlcConnectorsModuleExport implements IExtensionModuleExport { + @Override public List adapters() { - var adapters = new ArrayList(List.of( - new Plc4xModbusAdapter(), - new Plc4xS7Adapter() + var driverManager = PlcDriverManager.getDefault(); + var cachedConnectionManager = CachedPlcConnectionManager.getBuilder().build(); + var adapters = new ArrayList<>(List.of( + new Plc4xModbusAdapter(cachedConnectionManager), + new Plc4xS7Adapter(cachedConnectionManager) )); - adapters.addAll(new GenericAdapterGenerator().makeAvailableAdapters()); + adapters.addAll(new GenericAdapterGenerator().makeAvailableAdapters(driverManager, cachedConnectionManager)); return adapters; } diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/GenericAdapterGenerator.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/GenericAdapterGenerator.java index 56f3d83095..e8a0ad2150 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/GenericAdapterGenerator.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/GenericAdapterGenerator.java @@ -21,6 +21,7 @@ import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.GenericPlc4xAdapter; +import org.apache.plc4x.java.api.PlcConnectionManager; import org.apache.plc4x.java.api.PlcDriverManager; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.slf4j.Logger; @@ -41,11 +42,10 @@ public class GenericAdapterGenerator { ); - public List makeAvailableAdapters() { + public List makeAvailableAdapters(PlcDriverManager driverManager, + PlcConnectionManager connectionManager) { var adapters = new ArrayList(); - var protocolCodes = getDrivers(); - var driverManager = PlcDriverManager.getDefault(); - var connectionManager = driverManager.getConnectionManager(); + var protocolCodes = getDrivers(driverManager); protocolCodes .stream() .filter(pc -> !excludedDrivers.contains(pc)) @@ -60,9 +60,8 @@ public List makeAvailableAdapters() { return adapters; } - private Set getDrivers() { - return PlcDriverManager - .getDefault() + private Set getDrivers(PlcDriverManager driverManager) { + return driverManager .getProtocolCodes(); } } diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java index 9f8857c713..eb5a14591f 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/GenericPlc4xAdapter.java @@ -63,7 +63,6 @@ public class GenericPlc4xAdapter implements StreamPipesAdapter, SupportsRuntimeC private PullAdapterScheduler pullAdapterScheduler; private final PlcRequestProvider requestProvider; private final EventSchemaProvider schemaProvider; - private ContinuousPlcRequestReader plcRequestReader; private final PlcDriver driver; private final PlcConnectionManager connectionManager; @@ -88,7 +87,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, var settings = new Plc4xConnectionExtractor( extractor.getStaticPropertyExtractor(), driver.getProtocolCode() ).makeSettings(); - this.plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector); + var plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector); this.pullAdapterScheduler = new PullAdapterScheduler(); this.pullAdapterScheduler.schedule(plcRequestReader, extractor.getAdapterDescription().getElementId()); } @@ -96,11 +95,6 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, @Override public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) { - try { - this.plcRequestReader.closeConnection(); - } catch (Exception e) { - LOG.error("Error when closing connection", e); - } this.pullAdapterScheduler.shutdown(); } diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java index bb23816a16..0e724b6558 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java @@ -22,26 +22,23 @@ import org.apache.streampipes.extensions.api.connect.IPollingSettings; import org.apache.streampipes.extensions.api.connect.IPullAdapter; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionSettings; -import org.apache.streampipes.extensions.connectors.plc.adapter.s7.PlcReadResponseHandler; import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings; import org.apache.plc4x.java.api.PlcConnection; import org.apache.plc4x.java.api.PlcConnectionManager; -import org.apache.plc4x.java.api.exceptions.PlcConnectionException; -import org.apache.plc4x.java.api.messages.PlcReadResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class ContinuousPlcRequestReader - extends OneTimePlcRequestReader implements IPullAdapter, PlcReadResponseHandler { + extends OneTimePlcRequestReader implements IPullAdapter { private static final Logger LOG = LoggerFactory.getLogger(ContinuousPlcRequestReader.class); private final IEventCollector collector; - private PlcConnection plcConnection; public ContinuousPlcRequestReader(PlcConnectionManager connectionManager, Plc4xConnectionSettings settings, @@ -53,45 +50,26 @@ public ContinuousPlcRequestReader(PlcConnectionManager connectionManager, @Override public void pullData() throws RuntimeException { - try { - var connection = getConnection(); - readPlcData(connection, this); + try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) { + if (!plcConnection.isConnected()) { + plcConnection.connect(); + } + readPlcData(plcConnection); } catch (Exception e) { LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e); } } - private PlcConnection getConnection() throws PlcConnectionException { - if (plcConnection == null || !plcConnection.isConnected()) { - this.plcConnection = connectionManager.getConnection(settings.connectionString()); - } - return this.plcConnection; - } - - private void readPlcData(PlcConnection plcConnection, PlcReadResponseHandler handler) { + private void readPlcData(PlcConnection plcConnection) + throws ExecutionException, InterruptedException, TimeoutException { var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes()); - CompletableFuture asyncResponse = readRequest.execute(); - asyncResponse.whenComplete(handler::onReadResult); + var response = readRequest.execute().get(5000, TimeUnit.MILLISECONDS); + var event = eventGenerator.makeEvent(response); + collector.collect(event); } @Override public IPollingSettings getPollingInterval() { return PollingSettings.from(TimeUnit.MILLISECONDS, settings.pollingInterval()); } - - @Override - public void onReadResult(PlcReadResponse response, Throwable throwable) { - if (throwable != null) { - LOG.error(throwable.getMessage()); - } else { - var event = eventGenerator.makeEvent(response); - collector.collect(event); - } - } - - public void closeConnection() throws Exception { - if (this.plcConnection != null && this.plcConnection.isConnected()) { - this.plcConnection.close(); - } - } } diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java index e03566d99f..4961b1dc94 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/OneTimePlcRequestReader.java @@ -18,6 +18,7 @@ package org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection; +import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionSettings; import org.apache.plc4x.java.api.PlcConnection; @@ -46,6 +47,12 @@ public OneTimePlcRequestReader(PlcConnectionManager connectionManager, public Map readPlcDataSynchronized() throws Exception { var connectionString = settings.connectionString(); try (PlcConnection plcConnection = connectionManager.getConnection(connectionString)) { + if (!plcConnection.getMetadata().isReadSupported()) { + throw new AdapterException("This PLC does not support reading data"); + } + if (!plcConnection.isConnected()) { + plcConnection.connect(); + } var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes()); var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS); return eventGenerator.makeEvent(readResponse); diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java index 97af4a211b..6c0673797c 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/modbus/Plc4xModbusAdapter.java @@ -32,6 +32,7 @@ import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings; import org.apache.streampipes.model.AdapterType; import org.apache.streampipes.model.connect.guess.GuessSchema; +import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.model.staticproperty.CollectionStaticProperty; @@ -44,11 +45,10 @@ import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.Options; -import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.sdk.utils.Datatypes; import org.apache.plc4x.java.api.PlcConnection; -import org.apache.plc4x.java.api.PlcDriverManager; +import org.apache.plc4x.java.api.PlcConnectionManager; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcReadResponse; @@ -101,11 +101,9 @@ public class Plc4xModbusAdapter implements StreamPipesAdapter, IPullAdapter { * Connection to the PLC */ private PlcConnection plcConnection; + private PlcConnectionManager connectionManager; - /** - * Empty constructor and a constructor with SpecificAdapterStreamDescription are mandatory - */ - public Plc4xModbusAdapter() { + public Plc4xModbusAdapter(PlcConnectionManager connectionManager) { } /** @@ -179,7 +177,7 @@ private void before(IStaticPropertyExtractor extractor) throws AdapterException getConfigurations(extractor); try { - this.plcConnection = PlcDriverManager.getDefault().getConnectionManager().getConnection( + this.plcConnection = connectionManager.getConnection( "modbus-tcp:tcp://" + this.ip + ":" + this.port + "?unit-identifier=" + this.slaveID); if (!this.plcConnection.getMetadata().isReadSupported()) { @@ -213,16 +211,13 @@ public void pullData() { } PlcReadRequest readRequest = builder.build(); - // Execute the request PlcReadResponse response = null; try { response = readRequest.execute().get(); - } catch (InterruptedException ie) { + } catch (InterruptedException | ExecutionException ie) { ie.printStackTrace(); - } catch (ExecutionException ee) { - ee.printStackTrace(); } // Create an event containing the value of the PLC @@ -273,9 +268,9 @@ public PollingSettings getPollingInterval() { */ @Override public IAdapterConfiguration declareConfig() { - return AdapterConfigurationBuilder.create(ID, 1, Plc4xModbusAdapter::new) + return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xModbusAdapter(connectionManager)) .withLocales(Locales.EN) - .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .withCategory(AdapterType.Manufacturing) .requiredTextParameter(Labels.withId(PLC_IP)).requiredIntegerParameter(Labels.withId(PLC_PORT)) .requiredTextParameter(Labels.withId(PLC_NODE_ID)).requiredCollection(Labels.withId(PLC_NODES), @@ -298,8 +293,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, } @Override - public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) - throws AdapterException { + public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) { this.pullAdapterScheduler.shutdown(); } diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java index 1e98d4e224..2761689b3e 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java @@ -22,20 +22,21 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration; import org.apache.streampipes.extensions.api.connect.IEventCollector; -import org.apache.streampipes.extensions.api.connect.IPullAdapter; import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext; import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext; import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor; import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.EventSchemaProvider; -import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.PlcEventGenerator; +import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.ContinuousPlcRequestReader; +import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.OneTimePlcRequestReader; import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.PlcRequestProvider; +import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionSettings; import org.apache.streampipes.extensions.connectors.plc.adapter.s7.config.ConfigurationParser; import org.apache.streampipes.extensions.management.connect.PullAdapterScheduler; -import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings; import org.apache.streampipes.model.AdapterType; import org.apache.streampipes.model.connect.guess.GuessSchema; +import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.staticproperty.CollectionStaticProperty; import org.apache.streampipes.model.staticproperty.StaticProperty; @@ -49,31 +50,21 @@ import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.Options; -import org.apache.streampipes.sdk.utils.Assets; -import org.apache.plc4x.java.api.PlcConnection; -import org.apache.plc4x.java.api.PlcDriverManager; -import org.apache.plc4x.java.api.exceptions.PlcConnectionException; -import org.apache.plc4x.java.api.messages.PlcReadResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.plc4x.java.api.PlcConnectionManager; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -public class Plc4xS7Adapter implements StreamPipesAdapter, IPullAdapter, PlcReadResponseHandler { +public class Plc4xS7Adapter implements StreamPipesAdapter { /** * A unique id to identify the Plc4xS7Adapter */ public static final String ID = "org.apache.streampipes.connect.iiot.adapters.plc4x.s7"; - private static final Logger LOG = LoggerFactory.getLogger(Plc4xS7Adapter.class); - private static final String S7_URL = "s7://"; /** @@ -102,124 +93,15 @@ public class Plc4xS7Adapter implements StreamPipesAdapter, IPullAdapter, PlcRead // [1] https://plc4x.apache.org/users/protocols/s7.html """; - /** - * Values of user configuration parameters - */ - private String ip; - private int pollingInterval; - private Map nodes; - - private PlcDriverManager driverManager; + private final PlcConnectionManager connectionManager; private PullAdapterScheduler pullAdapterScheduler; - private IEventCollector collector; - - private PlcEventGenerator eventGenerator; - private final PlcRequestProvider requestProvider; - public Plc4xS7Adapter() { + public Plc4xS7Adapter(PlcConnectionManager connectionManager) { this.requestProvider = new PlcRequestProvider(); - } - - /** - * This method is executed when the adapter is started. A connection to the PLC is initialized - */ - private void before(IStaticPropertyExtractor extractor) { - // Extract user input - getConfigurations(extractor); - - this.driverManager = PlcDriverManager.getDefault(); - try (PlcConnection plcConnection = this.driverManager.getConnectionManager().getConnection(S7_URL + this.ip)) { - if (!plcConnection.getMetadata().isReadSupported()) { - LOG.error("The S7 on IP: " + this.ip + " does not support reading data"); - } - } catch (PlcConnectionException e) { - LOG.error("Could not establish connection to S7 with ip " + this.ip, e); - } catch (Exception e) { - LOG.error("Could not close connection to S7 with ip " + this.ip, e); - } - } - - - /** - * pullData is called iteratively according to the polling interval defined in getPollInterval. - */ - @Override - public void pullData() { - // Create PLC read request - try (PlcConnection plcConnection = this.driverManager.getConnectionManager().getConnection(S7_URL + this.ip)) { - readPlcData(plcConnection, this); - } catch (Exception e) { - LOG.error("Error while reading from PLC with IP {} ", this.ip, e); - } - } - - private void readPlcData(PlcConnection plcConnection, PlcReadResponseHandler handler) { - var readRequest = requestProvider.makeReadRequest(plcConnection, this.nodes); - // Execute the request - CompletableFuture asyncResponse = readRequest.execute(); - asyncResponse.whenComplete(handler::onReadResult); - } - - private Map readPlcDataSynchronized() throws Exception { - try (PlcConnection plcConnection = this.driverManager.getConnectionManager().getConnection(S7_URL + this.ip)) { - var readRequest = requestProvider.makeReadRequest(plcConnection, this.nodes); - // Execute the request - var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS); - return eventGenerator.makeEvent(readResponse); - } - } - - /** - * Define the polling interval of this adapter. Default is to poll every second - * - * @return PollingSettings - */ - @Override - public PollingSettings getPollingInterval() { - return PollingSettings.from(TimeUnit.MILLISECONDS, this.pollingInterval); - } - - /** - * Extracts the user configuration from the SpecificAdapterStreamDescription and sets the local variales - * - * @param extractor StaticPropertyExtractor - */ - private void getConfigurations(IStaticPropertyExtractor extractor) { - - this.ip = extractor.singleValueParameter(PLC_IP, String.class); - this.pollingInterval = extractor.singleValueParameter(PLC_POLLING_INTERVAL, Integer.class); - - this.nodes = new HashMap<>(); - - var selectedAlternative = extractor.selectedAlternativeInternalId(PLC_NODE_INPUT_ALTERNATIVES); - - if (selectedAlternative.equals(PLC_NODE_INPUT_COLLECTION_ALTERNATIVE)) { - // Alternative Simple - var csp = (CollectionStaticProperty) extractor.getStaticPropertyByName(PLC_NODES); - this.nodes = getNodeInformationFromCollectionStaticProperty(csp); - - } else { - // Alternative Advanced - var codePropertyInput = extractor.codeblockValue(PLC_CODE_BLOCK); - this.nodes = new ConfigurationParser().getNodeInformationFromCodeProperty(codePropertyInput); - } - this.eventGenerator = new PlcEventGenerator(this.nodes); - } - - - @Override - public void onReadResult(PlcReadResponse response, Throwable throwable) { - if (throwable != null) { - throwable.printStackTrace(); - LOG.error(throwable.getMessage()); - } else { - var event = eventGenerator.makeEvent(response); - // publish the final event - collector.collect(event); - } + this.connectionManager = connectionManager; } /** @@ -230,9 +112,9 @@ public void onReadResult(PlcReadResponse response, Throwable throwable) { */ @Override public IAdapterConfiguration declareConfig() { - return AdapterConfigurationBuilder.create(ID, 1, Plc4xS7Adapter::new) + return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xS7Adapter(connectionManager)) .withLocales(Locales.EN) - .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .withCategory(AdapterType.Manufacturing) .requiredTextParameter(Labels.withId(PLC_IP)) .requiredIntegerParameter(Labels.withId(PLC_POLLING_INTERVAL), 1000) @@ -255,10 +137,10 @@ public IAdapterConfiguration declareConfig() { public void onAdapterStarted(IAdapterParameterExtractor extractor, IEventCollector collector, IAdapterRuntimeContext adapterRuntimeContext) { - this.before(extractor.getStaticPropertyExtractor()); - this.collector = collector; + var settings = getConfigurations(extractor.getStaticPropertyExtractor()); + var plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector); this.pullAdapterScheduler = new PullAdapterScheduler(); - this.pullAdapterScheduler.schedule(this, extractor.getAdapterDescription().getElementId()); + this.pullAdapterScheduler.schedule(plcRequestReader, extractor.getAdapterDescription().getElementId()); } @Override @@ -270,19 +152,18 @@ public void onAdapterStopped(IAdapterParameterExtractor extractor, @Override public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor, IAdapterGuessSchemaContext adapterGuessSchemaContext) throws AdapterException { - // Extract user input try { - getConfigurations(extractor.getStaticPropertyExtractor()); + var settings = getConfigurations(extractor.getStaticPropertyExtractor()); - if (this.pollingInterval < 10) { - throw new AdapterException("Polling interval must be higher than 10. Current value: " + this.pollingInterval); + if (settings.pollingInterval() < 10) { + throw new AdapterException( + String.format("Polling interval must be higher than 10. Current value: %s", settings.pollingInterval()) + ); } GuessSchemaBuilder builder = GuessSchemaBuilder.create(); - List allProperties = new EventSchemaProvider().makeSchema(this.nodes); - - this.before(extractor.getStaticPropertyExtractor()); - var event = readPlcDataSynchronized(); + List allProperties = new EventSchemaProvider().makeSchema(settings.nodes()); + var event = new OneTimePlcRequestReader(connectionManager, settings, requestProvider).readPlcDataSynchronized(); builder.properties(allProperties); builder.preview(event); @@ -293,6 +174,34 @@ public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor, } } + /** + * Extracts the user configuration from the SpecificAdapterStreamDescription and sets the local variales + * + * @param extractor StaticPropertyExtractor + */ + private Plc4xConnectionSettings getConfigurations(IStaticPropertyExtractor extractor) { + + var ip = extractor.singleValueParameter(PLC_IP, String.class); + var pollingInterval = extractor.singleValueParameter(PLC_POLLING_INTERVAL, Integer.class); + + Map nodes; + + var selectedAlternative = extractor.selectedAlternativeInternalId(PLC_NODE_INPUT_ALTERNATIVES); + + if (selectedAlternative.equals(PLC_NODE_INPUT_COLLECTION_ALTERNATIVE)) { + // Alternative Simple + var csp = (CollectionStaticProperty) extractor.getStaticPropertyByName(PLC_NODES); + nodes = getNodeInformationFromCollectionStaticProperty(csp); + + } else { + // Alternative Advanced + var codePropertyInput = extractor.codeblockValue(PLC_CODE_BLOCK); + nodes = new ConfigurationParser().getNodeInformationFromCodeProperty(codePropertyInput); + } + + return new Plc4xConnectionSettings(S7_URL + ip, pollingInterval, nodes); + } + private Map getNodeInformationFromCollectionStaticProperty(CollectionStaticProperty csp) { var result = new HashMap(); diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/PlcReadResponseHandler.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/PlcReadResponseHandler.java deleted file mode 100644 index 2a4a44c762..0000000000 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/PlcReadResponseHandler.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.extensions.connectors.plc.adapter.s7; - -import org.apache.plc4x.java.api.messages.PlcReadResponse; - -public interface PlcReadResponseHandler { - - void onReadResult(PlcReadResponse response, - Throwable throwable); -} diff --git a/streampipes-extensions/streampipes-connectors-plc/src/test/java/org/apache/streampipes/extensions/connectors/plc/adapter/migration/config/Plc4xModbusAdapterVersionedConfig.java b/streampipes-extensions/streampipes-connectors-plc/src/test/java/org/apache/streampipes/extensions/connectors/plc/adapter/migration/config/Plc4xModbusAdapterVersionedConfig.java index 11509f5d5d..d88fbbb923 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/test/java/org/apache/streampipes/extensions/connectors/plc/adapter/migration/config/Plc4xModbusAdapterVersionedConfig.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/test/java/org/apache/streampipes/extensions/connectors/plc/adapter/migration/config/Plc4xModbusAdapterVersionedConfig.java @@ -21,12 +21,12 @@ import org.apache.streampipes.extensions.connectors.plc.adapter.modbus.Plc4xModbusAdapter; import org.apache.streampipes.model.AdapterType; import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.sdk.StaticProperties; import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.Options; -import org.apache.streampipes.sdk.utils.Assets; import static org.apache.streampipes.extensions.connectors.plc.adapter.modbus.Plc4xModbusAdapter.ID; import static org.apache.streampipes.extensions.connectors.plc.adapter.modbus.Plc4xModbusAdapter.PLC_IP; @@ -40,9 +40,9 @@ public class Plc4xModbusAdapterVersionedConfig { public static AdapterDescription getPlc4xModbusAdapterDescriptionV0() { - return AdapterConfigurationBuilder.create(ID, 0, Plc4xModbusAdapter::new) + return AdapterConfigurationBuilder.create(ID, 0, () -> new Plc4xModbusAdapter(null)) .withLocales(Locales.EN) - .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .withCategory(AdapterType.Manufacturing) .requiredTextParameter(Labels.withId(PLC_IP)) .requiredTextParameter(Labels.withId(PLC_PORT))