Skip to content

Commit

Permalink
fix(#2944): Fix connection error in PLC S7 connector (#2946)
Browse files Browse the repository at this point in the history
* fix(#2944): Fix connection error in PLC S7 connector

* Use PLC4X connection cache

* Remove PlcResponseHandler

* Fix test
  • Loading branch information
dominikriemer committed Jun 20, 2024
1 parent c6ab700 commit 8711c77
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 239 deletions.
7 changes: 6 additions & 1 deletion streampipes-extensions/streampipes-connectors-plc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-driver-all</artifactId>
<version>0.12.0</version>
<version>${plc4x.version}</version>
<type>pom</type>
<exclusions>
<exclusion>
Expand All @@ -72,6 +72,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-connection-cache</artifactId>
<version>${plc4x.version}</version>
</dependency>


<!-- Test dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,24 @@
import org.apache.streampipes.extensions.connectors.plc.adapter.modbus.Plc4xModbusAdapter;
import org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapter;

import org.apache.plc4x.java.api.PlcDriverManager;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PlcConnectorsModuleExport implements IExtensionModuleExport {

@Override
public List<StreamPipesAdapter> adapters() {
var adapters = new ArrayList<StreamPipesAdapter>(List.of(
new Plc4xModbusAdapter(),
new Plc4xS7Adapter()
var driverManager = PlcDriverManager.getDefault();
var cachedConnectionManager = CachedPlcConnectionManager.getBuilder().build();
var adapters = new ArrayList<>(List.of(
new Plc4xModbusAdapter(cachedConnectionManager),
new Plc4xS7Adapter(cachedConnectionManager)
));
adapters.addAll(new GenericAdapterGenerator().makeAvailableAdapters());
adapters.addAll(new GenericAdapterGenerator().makeAvailableAdapters(driverManager, cachedConnectionManager));
return adapters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.GenericPlc4xAdapter;

import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.PlcDriverManager;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.slf4j.Logger;
Expand All @@ -41,11 +42,10 @@ public class GenericAdapterGenerator {
);


public List<StreamPipesAdapter> makeAvailableAdapters() {
public List<StreamPipesAdapter> makeAvailableAdapters(PlcDriverManager driverManager,
PlcConnectionManager connectionManager) {
var adapters = new ArrayList<StreamPipesAdapter>();
var protocolCodes = getDrivers();
var driverManager = PlcDriverManager.getDefault();
var connectionManager = driverManager.getConnectionManager();
var protocolCodes = getDrivers(driverManager);
protocolCodes
.stream()
.filter(pc -> !excludedDrivers.contains(pc))
Expand All @@ -60,9 +60,8 @@ public List<StreamPipesAdapter> makeAvailableAdapters() {
return adapters;
}

private Set<String> getDrivers() {
return PlcDriverManager
.getDefault()
private Set<String> getDrivers(PlcDriverManager driverManager) {
return driverManager
.getProtocolCodes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public class GenericPlc4xAdapter implements StreamPipesAdapter, SupportsRuntimeC
private PullAdapterScheduler pullAdapterScheduler;
private final PlcRequestProvider requestProvider;
private final EventSchemaProvider schemaProvider;
private ContinuousPlcRequestReader plcRequestReader;

private final PlcDriver driver;
private final PlcConnectionManager connectionManager;
Expand All @@ -88,19 +87,14 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
var settings = new Plc4xConnectionExtractor(
extractor.getStaticPropertyExtractor(), driver.getProtocolCode()
).makeSettings();
this.plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector);
var plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector);
this.pullAdapterScheduler = new PullAdapterScheduler();
this.pullAdapterScheduler.schedule(plcRequestReader, extractor.getAdapterDescription().getElementId());
}

@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext) {
try {
this.plcRequestReader.closeConnection();
} catch (Exception e) {
LOG.error("Error when closing connection", e);
}
this.pullAdapterScheduler.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,23 @@
import org.apache.streampipes.extensions.api.connect.IPollingSettings;
import org.apache.streampipes.extensions.api.connect.IPullAdapter;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionSettings;
import org.apache.streampipes.extensions.connectors.plc.adapter.s7.PlcReadResponseHandler;
import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings;

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ContinuousPlcRequestReader
extends OneTimePlcRequestReader implements IPullAdapter, PlcReadResponseHandler {
extends OneTimePlcRequestReader implements IPullAdapter {

private static final Logger LOG = LoggerFactory.getLogger(ContinuousPlcRequestReader.class);

private final IEventCollector collector;
private PlcConnection plcConnection;

public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,
Plc4xConnectionSettings settings,
Expand All @@ -53,45 +50,26 @@ public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,

@Override
public void pullData() throws RuntimeException {
try {
var connection = getConnection();
readPlcData(connection, this);
try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) {
if (!plcConnection.isConnected()) {
plcConnection.connect();
}
readPlcData(plcConnection);
} catch (Exception e) {
LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e);
}
}

private PlcConnection getConnection() throws PlcConnectionException {
if (plcConnection == null || !plcConnection.isConnected()) {
this.plcConnection = connectionManager.getConnection(settings.connectionString());
}
return this.plcConnection;
}

private void readPlcData(PlcConnection plcConnection, PlcReadResponseHandler handler) {
private void readPlcData(PlcConnection plcConnection)
throws ExecutionException, InterruptedException, TimeoutException {
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
CompletableFuture<? extends PlcReadResponse> asyncResponse = readRequest.execute();
asyncResponse.whenComplete(handler::onReadResult);
var response = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
var event = eventGenerator.makeEvent(response);
collector.collect(event);
}

@Override
public IPollingSettings getPollingInterval() {
return PollingSettings.from(TimeUnit.MILLISECONDS, settings.pollingInterval());
}

@Override
public void onReadResult(PlcReadResponse response, Throwable throwable) {
if (throwable != null) {
LOG.error(throwable.getMessage());
} else {
var event = eventGenerator.makeEvent(response);
collector.collect(event);
}
}

public void closeConnection() throws Exception {
if (this.plcConnection != null && this.plcConnection.isConnected()) {
this.plcConnection.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection;

import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionSettings;

import org.apache.plc4x.java.api.PlcConnection;
Expand Down Expand Up @@ -46,6 +47,12 @@ public OneTimePlcRequestReader(PlcConnectionManager connectionManager,
public Map<String, Object> readPlcDataSynchronized() throws Exception {
var connectionString = settings.connectionString();
try (PlcConnection plcConnection = connectionManager.getConnection(connectionString)) {
if (!plcConnection.getMetadata().isReadSupported()) {
throw new AdapterException("This PLC does not support reading data");
}
if (!plcConnection.isConnected()) {
plcConnection.connect();
}
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
return eventGenerator.makeEvent(readResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
Expand All @@ -44,11 +45,10 @@
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sdk.utils.Datatypes;

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
Expand Down Expand Up @@ -101,11 +101,9 @@ public class Plc4xModbusAdapter implements StreamPipesAdapter, IPullAdapter {
* Connection to the PLC
*/
private PlcConnection plcConnection;
private PlcConnectionManager connectionManager;

/**
* Empty constructor and a constructor with SpecificAdapterStreamDescription are mandatory
*/
public Plc4xModbusAdapter() {
public Plc4xModbusAdapter(PlcConnectionManager connectionManager) {
}

/**
Expand Down Expand Up @@ -179,7 +177,7 @@ private void before(IStaticPropertyExtractor extractor) throws AdapterException
getConfigurations(extractor);

try {
this.plcConnection = PlcDriverManager.getDefault().getConnectionManager().getConnection(
this.plcConnection = connectionManager.getConnection(
"modbus-tcp:tcp://" + this.ip + ":" + this.port + "?unit-identifier=" + this.slaveID);

if (!this.plcConnection.getMetadata().isReadSupported()) {
Expand Down Expand Up @@ -213,16 +211,13 @@ public void pullData() {
}
PlcReadRequest readRequest = builder.build();


// Execute the request
PlcReadResponse response = null;

try {
response = readRequest.execute().get();
} catch (InterruptedException ie) {
} catch (InterruptedException | ExecutionException ie) {
ie.printStackTrace();
} catch (ExecutionException ee) {
ee.printStackTrace();
}

// Create an event containing the value of the PLC
Expand Down Expand Up @@ -273,9 +268,9 @@ public PollingSettings getPollingInterval() {
*/
@Override
public IAdapterConfiguration declareConfig() {
return AdapterConfigurationBuilder.create(ID, 1, Plc4xModbusAdapter::new)
return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xModbusAdapter(connectionManager))
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withCategory(AdapterType.Manufacturing)
.requiredTextParameter(Labels.withId(PLC_IP)).requiredIntegerParameter(Labels.withId(PLC_PORT))
.requiredTextParameter(Labels.withId(PLC_NODE_ID)).requiredCollection(Labels.withId(PLC_NODES),
Expand All @@ -298,8 +293,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
}

@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext)
throws AdapterException {
public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) {
this.pullAdapterScheduler.shutdown();
}

Expand Down
Loading

0 comments on commit 8711c77

Please sign in to comment.