Skip to content

Commit

Permalink
Use PLC4X connection cache
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Jun 19, 2024
1 parent 6ab094b commit 52b713f
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 70 deletions.
7 changes: 6 additions & 1 deletion streampipes-extensions/streampipes-connectors-plc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-driver-all</artifactId>
<version>0.12.0</version>
<version>${plc4x.version}</version>
<type>pom</type>
<exclusions>
<exclusion>
Expand All @@ -72,6 +72,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-connection-cache</artifactId>
<version>${plc4x.version}</version>
</dependency>


<!-- Test dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapter;

import org.apache.plc4x.java.api.PlcDriverManager;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -39,12 +40,12 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport {
@Override
public List<StreamPipesAdapter> adapters() {
var driverManager = PlcDriverManager.getDefault();
var connectionManager = driverManager.getConnectionManager();
var adapters = new ArrayList<StreamPipesAdapter>(List.of(
new Plc4xModbusAdapter(),
new Plc4xS7Adapter(driverManager, connectionManager)
var cachedConnectionManager = CachedPlcConnectionManager.getBuilder().build();
var adapters = new ArrayList<>(List.of(
new Plc4xModbusAdapter(cachedConnectionManager),
new Plc4xS7Adapter(cachedConnectionManager)
));
adapters.addAll(new GenericAdapterGenerator().makeAvailableAdapters(driverManager, connectionManager));
adapters.addAll(new GenericAdapterGenerator().makeAvailableAdapters(driverManager, cachedConnectionManager));
return adapters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public class GenericPlc4xAdapter implements StreamPipesAdapter, SupportsRuntimeC
private PullAdapterScheduler pullAdapterScheduler;
private final PlcRequestProvider requestProvider;
private final EventSchemaProvider schemaProvider;
private ContinuousPlcRequestReader plcRequestReader;

private final PlcDriver driver;
private final PlcConnectionManager connectionManager;
Expand All @@ -88,19 +87,14 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
var settings = new Plc4xConnectionExtractor(
extractor.getStaticPropertyExtractor(), driver.getProtocolCode()
).makeSettings();
this.plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector);
var plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector);
this.pullAdapterScheduler = new PullAdapterScheduler();
this.pullAdapterScheduler.schedule(plcRequestReader, extractor.getAdapterDescription().getElementId());
}

@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext) {
try {
this.plcRequestReader.closeConnection();
} catch (Exception e) {
LOG.error("Error when closing connection", e);
}
this.pullAdapterScheduler.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,7 +39,6 @@ public class ContinuousPlcRequestReader
private static final Logger LOG = LoggerFactory.getLogger(ContinuousPlcRequestReader.class);

private final IEventCollector collector;
private PlcConnection plcConnection;

public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,
Plc4xConnectionSettings settings,
Expand All @@ -52,21 +50,16 @@ public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,

@Override
public void pullData() throws RuntimeException {
try {
var connection = getConnection();
readPlcData(connection);
try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) {
if (!plcConnection.isConnected()) {
plcConnection.connect();
}
readPlcData(plcConnection);
} catch (Exception e) {
LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e);
}
}

private PlcConnection getConnection() throws PlcConnectionException {
if (plcConnection == null || !plcConnection.isConnected()) {
this.plcConnection = connectionManager.getConnection(settings.connectionString());
}
return this.plcConnection;
}

private void readPlcData(PlcConnection plcConnection)
throws ExecutionException, InterruptedException, TimeoutException {
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
Expand All @@ -79,10 +72,4 @@ private void readPlcData(PlcConnection plcConnection)
public IPollingSettings getPollingInterval() {
return PollingSettings.from(TimeUnit.MILLISECONDS, settings.pollingInterval());
}

public void closeConnection() throws Exception {
if (this.plcConnection != null && this.plcConnection.isConnected()) {
this.plcConnection.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public Map<String, Object> readPlcDataSynchronized() throws Exception {
if (!plcConnection.getMetadata().isReadSupported()) {
throw new AdapterException("This PLC does not support reading data");
}
if (!plcConnection.isConnected()) {
plcConnection.connect();
}
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
return eventGenerator.makeEvent(readResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
Expand All @@ -44,11 +45,10 @@
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sdk.utils.Datatypes;

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
Expand Down Expand Up @@ -101,11 +101,9 @@ public class Plc4xModbusAdapter implements StreamPipesAdapter, IPullAdapter {
* Connection to the PLC
*/
private PlcConnection plcConnection;
private PlcConnectionManager connectionManager;

/**
* Empty constructor and a constructor with SpecificAdapterStreamDescription are mandatory
*/
public Plc4xModbusAdapter() {
public Plc4xModbusAdapter(PlcConnectionManager connectionManager) {
}

/**
Expand Down Expand Up @@ -179,7 +177,7 @@ private void before(IStaticPropertyExtractor extractor) throws AdapterException
getConfigurations(extractor);

try {
this.plcConnection = PlcDriverManager.getDefault().getConnectionManager().getConnection(
this.plcConnection = connectionManager.getConnection(
"modbus-tcp:tcp://" + this.ip + ":" + this.port + "?unit-identifier=" + this.slaveID);

if (!this.plcConnection.getMetadata().isReadSupported()) {
Expand Down Expand Up @@ -213,16 +211,13 @@ public void pullData() {
}
PlcReadRequest readRequest = builder.build();


// Execute the request
PlcReadResponse response = null;

try {
response = readRequest.execute().get();
} catch (InterruptedException ie) {
} catch (InterruptedException | ExecutionException ie) {
ie.printStackTrace();
} catch (ExecutionException ee) {
ee.printStackTrace();
}

// Create an event containing the value of the PLC
Expand Down Expand Up @@ -273,9 +268,9 @@ public PollingSettings getPollingInterval() {
*/
@Override
public IAdapterConfiguration declareConfig() {
return AdapterConfigurationBuilder.create(ID, 1, Plc4xModbusAdapter::new)
return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xModbusAdapter(connectionManager))
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withCategory(AdapterType.Manufacturing)
.requiredTextParameter(Labels.withId(PLC_IP)).requiredIntegerParameter(Labels.withId(PLC_PORT))
.requiredTextParameter(Labels.withId(PLC_NODE_ID)).requiredCollection(Labels.withId(PLC_NODES),
Expand All @@ -298,8 +293,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
}

@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext)
throws AdapterException {
public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) {
this.pullAdapterScheduler.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@
import org.apache.streampipes.sdk.helpers.Options;

import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.PlcDriverManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -68,8 +65,6 @@ public class Plc4xS7Adapter implements StreamPipesAdapter {
*/
public static final String ID = "org.apache.streampipes.connect.iiot.adapters.plc4x.s7";

private static final Logger LOG = LoggerFactory.getLogger(Plc4xS7Adapter.class);

private static final String S7_URL = "s7://";

/**
Expand Down Expand Up @@ -98,23 +93,14 @@ public class Plc4xS7Adapter implements StreamPipesAdapter {
// [1] https://plc4x.apache.org/users/protocols/s7.html
""";

/**
* Values of user configuration parameters
*/
private Map<String, String> nodes;

private final PlcDriverManager driverManager;
private final PlcConnectionManager connectionManager;

private PullAdapterScheduler pullAdapterScheduler;
private ContinuousPlcRequestReader plcRequestReader;

private final PlcRequestProvider requestProvider;

public Plc4xS7Adapter(PlcDriverManager driverManager,
PlcConnectionManager connectionManager) {
public Plc4xS7Adapter(PlcConnectionManager connectionManager) {
this.requestProvider = new PlcRequestProvider();
this.driverManager = driverManager;
this.connectionManager = connectionManager;
}

Expand All @@ -126,7 +112,7 @@ public Plc4xS7Adapter(PlcDriverManager driverManager,
*/
@Override
public IAdapterConfiguration declareConfig() {
return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xS7Adapter(driverManager, connectionManager))
return AdapterConfigurationBuilder.create(ID, 1, () -> new Plc4xS7Adapter(connectionManager))
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withCategory(AdapterType.Manufacturing)
Expand All @@ -152,19 +138,14 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext) {
var settings = getConfigurations(extractor.getStaticPropertyExtractor());
this.plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector);
var plcRequestReader = new ContinuousPlcRequestReader(connectionManager, settings, requestProvider, collector);
this.pullAdapterScheduler = new PullAdapterScheduler();
this.pullAdapterScheduler.schedule(plcRequestReader, extractor.getAdapterDescription().getElementId());
}

@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext) {
try {
this.plcRequestReader.closeConnection();
} catch (Exception e) {
LOG.error("Error when closing connection", e);
}
this.pullAdapterScheduler.shutdown();
}

Expand Down Expand Up @@ -203,7 +184,7 @@ private Plc4xConnectionSettings getConfigurations(IStaticPropertyExtractor extra
var ip = extractor.singleValueParameter(PLC_IP, String.class);
var pollingInterval = extractor.singleValueParameter(PLC_POLLING_INTERVAL, Integer.class);

Map<String, String> nodes = Map.of();
Map<String, String> nodes;

var selectedAlternative = extractor.selectedAlternativeInternalId(PLC_NODE_INPUT_ALTERNATIVES);

Expand All @@ -215,7 +196,7 @@ private Plc4xConnectionSettings getConfigurations(IStaticPropertyExtractor extra
} else {
// Alternative Advanced
var codePropertyInput = extractor.codeblockValue(PLC_CODE_BLOCK);
this.nodes = new ConfigurationParser().getNodeInformationFromCodeProperty(codePropertyInput);
nodes = new ConfigurationParser().getNodeInformationFromCodeProperty(codePropertyInput);
}

return new Plc4xConnectionSettings(S7_URL + ip, pollingInterval, nodes);
Expand Down

0 comments on commit 52b713f

Please sign in to comment.