Skip to content

Commit

Permalink
chore: Improve connection handling of PLC adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Jun 20, 2024
1 parent 210ceb8 commit 2927d7e
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 43 deletions.
7 changes: 7 additions & 0 deletions streampipes-extensions/streampipes-connectors-plc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,12 @@
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>maven_central</id>
<name>Maven Central</name>
<url>https://repo.maven.apache.org/maven2/</url>
</repository>
</repositories>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport {
@Override
public List<StreamPipesAdapter> adapters() {
var driverManager = PlcDriverManager.getDefault();
var cachedConnectionManager = CachedPlcConnectionManager.getBuilder().build();
var cachedConnectionManager = CachedPlcConnectionManager
.getBuilder(driverManager.getConnectionManager())
.build();
var adapters = new ArrayList<>(List.of(
new Plc4xModbusAdapter(cachedConnectionManager),
new Plc4xS7Adapter(cachedConnectionManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.GenericPlc4xAdapter;

import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.PlcDriverManager;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,7 +43,7 @@ public class GenericAdapterGenerator {


public List<StreamPipesAdapter> makeAvailableAdapters(PlcDriverManager driverManager,
PlcConnectionManager connectionManager) {
CachedPlcConnectionManager connectionManager) {
var adapters = new ArrayList<StreamPipesAdapter>();
var protocolCodes = getDrivers(driverManager);
protocolCodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,23 @@
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.AdapterConfigurationProvider;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.EventSchemaProvider;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.MetadataOptionGenerator;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.ContinuousPlcRequestReader;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.OneTimePlcRequestReader;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.PlcRequestProvider;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.AdapterConfigurationProvider;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.MetadataOptionGenerator;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionExtractor;
import org.apache.streampipes.extensions.management.connect.PullAdapterScheduler;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.staticproperty.RuntimeResolvableGroupStaticProperty;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;

import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.PlcDriver;
import org.apache.plc4x.java.api.metadata.Option;
import org.apache.plc4x.java.api.metadata.OptionMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;

import java.util.List;
import java.util.function.Function;
Expand All @@ -57,18 +55,15 @@

public class GenericPlc4xAdapter implements StreamPipesAdapter, SupportsRuntimeConfig {

private static final Logger LOG = LoggerFactory.getLogger(GenericPlc4xAdapter.class);


private PullAdapterScheduler pullAdapterScheduler;
private final PlcRequestProvider requestProvider;
private final EventSchemaProvider schemaProvider;

private final PlcDriver driver;
private final PlcConnectionManager connectionManager;
private final CachedPlcConnectionManager connectionManager;

public GenericPlc4xAdapter(PlcDriver driver,
PlcConnectionManager connectionManager) {
CachedPlcConnectionManager connectionManager) {
this.requestProvider = new PlcRequestProvider();
this.schemaProvider = new EventSchemaProvider();
this.driver = driver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
package org.apache.streampipes.extensions.connectors.plc.adapter.generic.config;

import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.assets.PlcAdapterAssetResolver;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.GenericPlc4xAdapter;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.assets.PlcAdapterAssetResolver;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.CodeLanguage;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;

import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.PlcDriver;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;

import static org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xLabels.PLC_CODE_BLOCK;
import static org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xLabels.PLC_IP;
Expand All @@ -46,7 +46,7 @@ public class AdapterConfigurationProvider {


public IAdapterConfiguration makeConfig(PlcDriver driver,
PlcConnectionManager connectionManager) {
CachedPlcConnectionManager connectionManager) {
var driverMetadata = driver.getMetadata();
var appId = getAdapterAppId(driver);
var adapterBuilder = AdapterConfigurationBuilder.create(
Expand All @@ -55,7 +55,7 @@ public IAdapterConfiguration makeConfig(PlcDriver driver,
() -> new GenericPlc4xAdapter(driver, connectionManager)
)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withAssetResolver(
new PlcAdapterAssetResolver("org.apache.streampipes.connect.iiot.adapters.plc4x.generic", appId, driver)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings;

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ContinuousPlcRequestReader
extends OneTimePlcRequestReader implements IPullAdapter {
Expand All @@ -40,7 +38,7 @@ public class ContinuousPlcRequestReader

private final IEventCollector collector;

public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,
public ContinuousPlcRequestReader(CachedPlcConnectionManager connectionManager,
Plc4xConnectionSettings settings,
PlcRequestProvider requestProvider,
IEventCollector collector) {
Expand All @@ -51,23 +49,15 @@ public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,
@Override
public void pullData() throws RuntimeException {
try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) {
if (!plcConnection.isConnected()) {
plcConnection.connect();
}
readPlcData(plcConnection);
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
var event = eventGenerator.makeEvent(readResponse);
collector.collect(event);
} catch (Exception e) {
LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e);
}
}

private void readPlcData(PlcConnection plcConnection)
throws ExecutionException, InterruptedException, TimeoutException {
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var response = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
var event = eventGenerator.makeEvent(response);
collector.collect(event);
}

@Override
public IPollingSettings getPollingInterval() {
return PollingSettings.from(TimeUnit.MILLISECONDS, settings.pollingInterval());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionSettings;

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;

import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -33,9 +33,9 @@ public class OneTimePlcRequestReader {
protected final PlcRequestProvider requestProvider;
protected final PlcEventGenerator eventGenerator;

protected final PlcConnectionManager connectionManager;
protected final CachedPlcConnectionManager connectionManager;

public OneTimePlcRequestReader(PlcConnectionManager connectionManager,
public OneTimePlcRequestReader(CachedPlcConnectionManager connectionManager,
Plc4xConnectionSettings settings,
PlcRequestProvider requestProvider) {
this.connectionManager = connectionManager;
Expand All @@ -50,9 +50,6 @@ public Map<String, Object> readPlcDataSynchronized() throws Exception {
if (!plcConnection.getMetadata().isReadSupported()) {
throw new AdapterException("This PLC does not support reading data");
}
if (!plcConnection.isConnected()) {
plcConnection.connect();
}
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
return eventGenerator.makeEvent(readResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;

import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -93,13 +93,13 @@ public class Plc4xS7Adapter implements StreamPipesAdapter {
// [1] https://plc4x.apache.org/users/protocols/s7.html
""";

private final PlcConnectionManager connectionManager;
private final CachedPlcConnectionManager connectionManager;

private PullAdapterScheduler pullAdapterScheduler;

private final PlcRequestProvider requestProvider;

public Plc4xS7Adapter(PlcConnectionManager connectionManager) {
public Plc4xS7Adapter(CachedPlcConnectionManager connectionManager) {
this.requestProvider = new PlcRequestProvider();
this.connectionManager = connectionManager;
}
Expand Down

0 comments on commit 2927d7e

Please sign in to comment.