Skip to content

Commit

Permalink
chore: Improve connection handling of PLC adapters (#2951)
Browse files Browse the repository at this point in the history
* chore: Improve connection handling of PLC adapters

* Cleanup pom

* Use PlcConnectionManager API
  • Loading branch information
dominikriemer authored Jun 21, 2024
1 parent ce32db8 commit 2b2cdec
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 31 deletions.
1 change: 0 additions & 1 deletion streampipes-extensions/streampipes-connectors-plc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,4 @@
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport {
@Override
public List<StreamPipesAdapter> adapters() {
var driverManager = PlcDriverManager.getDefault();
var cachedConnectionManager = CachedPlcConnectionManager.getBuilder().build();
var cachedConnectionManager = CachedPlcConnectionManager
.getBuilder(driverManager.getConnectionManager())
.build();
var adapters = new ArrayList<>(List.of(
new Plc4xModbusAdapter(cachedConnectionManager),
new Plc4xS7Adapter(cachedConnectionManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.AdapterConfigurationProvider;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.EventSchemaProvider;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.MetadataOptionGenerator;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.ContinuousPlcRequestReader;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.OneTimePlcRequestReader;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.connection.PlcRequestProvider;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.AdapterConfigurationProvider;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.config.MetadataOptionGenerator;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.model.Plc4xConnectionExtractor;
import org.apache.streampipes.extensions.management.connect.PullAdapterScheduler;
import org.apache.streampipes.model.connect.guess.GuessSchema;
Expand All @@ -45,8 +45,7 @@
import org.apache.plc4x.java.api.PlcDriver;
import org.apache.plc4x.java.api.metadata.Option;
import org.apache.plc4x.java.api.metadata.OptionMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;

import java.util.List;
import java.util.function.Function;
Expand All @@ -57,9 +56,6 @@

public class GenericPlc4xAdapter implements StreamPipesAdapter, SupportsRuntimeConfig {

private static final Logger LOG = LoggerFactory.getLogger(GenericPlc4xAdapter.class);


private PullAdapterScheduler pullAdapterScheduler;
private final PlcRequestProvider requestProvider;
private final EventSchemaProvider schemaProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
package org.apache.streampipes.extensions.connectors.plc.adapter.generic.config;

import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.assets.PlcAdapterAssetResolver;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.GenericPlc4xAdapter;
import org.apache.streampipes.extensions.connectors.plc.adapter.generic.assets.PlcAdapterAssetResolver;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.CodeLanguage;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;

import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.PlcDriver;
Expand Down Expand Up @@ -55,7 +55,7 @@ public IAdapterConfiguration makeConfig(PlcDriver driver,
() -> new GenericPlc4xAdapter(driver, connectionManager)
)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withAssetResolver(
new PlcAdapterAssetResolver("org.apache.streampipes.connect.iiot.adapters.plc4x.generic", appId, driver)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ContinuousPlcRequestReader
extends OneTimePlcRequestReader implements IPullAdapter {
Expand All @@ -51,23 +50,15 @@ public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,
@Override
public void pullData() throws RuntimeException {
try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) {
if (!plcConnection.isConnected()) {
plcConnection.connect();
}
readPlcData(plcConnection);
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
var event = eventGenerator.makeEvent(readResponse);
collector.collect(event);
} catch (Exception e) {
LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e);
}
}

private void readPlcData(PlcConnection plcConnection)
throws ExecutionException, InterruptedException, TimeoutException {
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var response = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
var event = eventGenerator.makeEvent(response);
collector.collect(event);
}

@Override
public IPollingSettings getPollingInterval() {
return PollingSettings.from(TimeUnit.MILLISECONDS, settings.pollingInterval());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ public Map<String, Object> readPlcDataSynchronized() throws Exception {
if (!plcConnection.getMetadata().isReadSupported()) {
throw new AdapterException("This PLC does not support reading data");
}
if (!plcConnection.isConnected()) {
plcConnection.connect();
}
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
return eventGenerator.makeEvent(readResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,10 @@ public class Plc4xS7Adapter implements StreamPipesAdapter {
""";

private final PlcConnectionManager connectionManager;
private final PlcRequestProvider requestProvider;

private PullAdapterScheduler pullAdapterScheduler;

private final PlcRequestProvider requestProvider;

public Plc4xS7Adapter(PlcConnectionManager connectionManager) {
this.requestProvider = new PlcRequestProvider();
this.connectionManager = connectionManager;
Expand Down

0 comments on commit 2b2cdec

Please sign in to comment.