Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into 1960-better-support-mo…
Browse files Browse the repository at this point in the history
…dification-of-existing-adapters
  • Loading branch information
bossenti committed Oct 17, 2023
2 parents 1ea8613 + 7ddebea commit 9ea80a3
Show file tree
Hide file tree
Showing 26 changed files with 987 additions and 774 deletions.
8 changes: 1 addition & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.9</httpcore.version>
<httpcore-osgi.version>4.4.9</httpcore-osgi.version>
<immutable-value.version>2.9.3</immutable-value.version>
<influxdb.version>2.23</influxdb.version>
<j2html.version>1.6.0</j2html.version>
<jackson.version>2.15.0</jackson.version>
Expand Down Expand Up @@ -175,7 +174,7 @@
<scala-parser-combinators.version>2.2.0</scala-parser-combinators.version>
<slack-api.version>1.4.0</slack-api.version>
<xerces.version>2.12.2</xerces.version>
<inlong.version>1.7.0</inlong.version>
<inlong.version>1.9.0</inlong.version>

<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
<maven.javadoc.plugin.version>3.1.1</maven.javadoc.plugin.version>
Expand Down Expand Up @@ -1314,11 +1313,6 @@
<artifactId>jna</artifactId>
<version>${jna.version}</version>
</dependency>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
<version>${immutable-value.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
Expand Down
44 changes: 22 additions & 22 deletions streampipes-client-python/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions streampipes-client-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ flake8 = "6.1.0"
interrogate = { version = "1.5.0", extras = ["png"] }
isort = "5.12.0"
mypy = "1.6.0"
ruff = "0.0.280"
pre-commit = "3.4.0"
ruff = "0.1.0"
pre-commit = "3.5.0"
pytest = "7.4.0"
pytest-cov = "4.1.0"
pyupgrade = "3.15.0"
Expand Down
14 changes: 14 additions & 0 deletions streampipes-client-python/streampipes/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class StreamPipesClient:
dataStreamApi: DataStreamEndpoint
Instance of the data stream endpoint
Raises
------
AttributeError:
In case an invalid configuration of the `StreamPipesClientConfig` is passed
Examples
--------
Expand Down Expand Up @@ -117,6 +122,15 @@ def __init__(
client_config: StreamPipesClientConfig,
logging_level: Optional[int] = logging.INFO,
):
# validate client config
# `https_disabled` and `port` 443 is an invalid configuration
if client_config.https_disabled and client_config.port == 443:
raise AttributeError(
"Invalid configuration passed! The given client configuration has "
"`https_disabled` set to `True` and `port` set to `443`.\n "
"If you want to connect to port 443, use `https_disabled=False` or "
"alternatively connect to port `80`."
)
self.client_config = client_config

# set up a requests session
Expand Down
12 changes: 12 additions & 0 deletions streampipes-client-python/tests/client/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ def test_client_create(self, server_version: MagicMock):
self.assertTrue(isinstance(result.dataLakeMeasureApi, DataLakeMeasureEndpoint))
self.assertEqual(result.base_api_path, "https://localhost:443/streampipes-backend/")

def test_client_create_invalid_config(self):

with self.assertRaises(AttributeError):
StreamPipesClient.create(
client_config=StreamPipesClientConfig(
credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
host_address="localhost",
https_disabled=True,
port=443,
)
)

@patch("streampipes.client.client.logger", autospec=True)
@patch("streampipes.endpoint.endpoint.APIEndpoint._make_request", autospec=True)
def test_client_describe(self, make_request: MagicMock, mocked_logger: MagicMock):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,20 @@ public class IfmAlMqttAdapter implements StreamPipesAdapter {

public static final String ID = "org.apache.streampipes.connect.iiot.adapters.iolink";

public static final String KEY_PORT_INFORMATION = "/iolinkmaster/port[%s]/iolinkdevice/pdin";

private static final Logger LOG = LoggerFactory.getLogger(IfmAlMqttAdapter.class);

private static final String PORTS = "ports";
private static final String SENSOR_TYPE = "sensor_type";

private final IParser parser;
private boolean missingEventDataDetected = false;
private boolean missingPortInformationDetected = false;

private MqttConsumer mqttConsumer;
private MqttConfig mqttConfig;

private final IParser parser;

private List<String> ports;

public IfmAlMqttAdapter() {
Expand Down Expand Up @@ -110,25 +114,61 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,

for (int i = 0; i < ports.size(); i++) {

var portResult = getMap(payload,
"/iolinkmaster/port[%s]/iolinkdevice/pdin".formatted(ports.get(i)));
var eventData = (String) portResult.get("data");

var parsedEvent = sensor.parseEvent(eventData);
parsedEvent.put("timestamp", System.currentTimeMillis() + i);
parsedEvent.put("port", "port" + ports.get(i));
parsedEvent.put(SensorVVB001.IO_LINK_MASTER_SN, serialnumber);

collector.collect(parsedEvent);
String keyPortInformation = KEY_PORT_INFORMATION.formatted(ports.get(i));

Map<String, Object> portResult;
if (payload.containsKey(keyPortInformation)) {

portResult = getMap(payload, keyPortInformation);

try {
String eventData;
if (portResult.containsKey("data")) {
eventData = (String) portResult.get("data");

var parsedEvent = sensor.parseEvent(eventData);
parsedEvent.put("timestamp", System.currentTimeMillis() + i);
parsedEvent.put("port", "port" + ports.get(i));
parsedEvent.put(SensorVVB001.IO_LINK_MASTER_SN, serialnumber);

collector.collect(parsedEvent);
} else {
if (!missingEventDataDetected) {
adapterRuntimeContext
.getLogger()
.warn("Payload for port %s does not contain event data".formatted(i), "");
LOG.error(
"IoLink event does not look like expected. "
+ "No port information found for port {}.", i);
missingEventDataDetected = true;
}
}
} catch (Exception e) {
adapterRuntimeContext
.getLogger()
.error(e);
LOG.error("Data from IOLink sensor could not be extracted for port {}: {}", i, e);
}

} else {
if (!missingPortInformationDetected) {
adapterRuntimeContext
.getLogger()
.warn("Event does not contain information about port " + i, "");
LOG.error("IoLink event does not look like expected. No port information found for port {}.", i);
missingPortInformationDetected = true;
}
}
}
});
} catch (Exception e) {
} catch (ParseException e) {
adapterRuntimeContext
.getLogger()
.error(e);
LOG.error("Could not parse event", e);
.getLogger()
.error(e);
LOG.error("IOLink master event could not be parsed.", e);
}
});
}
);

Thread thread = new Thread(this.mqttConsumer);
thread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils</artifactId>
<version>0.93.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@

public class MergeByTimeProcessor extends StreamPipesDataProcessor {

private static final String TIMESTAMP_MAPPING_STREAM_1_KEY = "timestamp_mapping_stream_1";
private static final String TIMESTAMP_MAPPING_STREAM_2_KEY = "timestamp_mapping_stream_2";
private static final String NUMBER_MAPPING = "number_mapping";
private static final String TIME_INTERVAL = "time-interval";
protected static final String TIMESTAMP_MAPPING_STREAM_1_KEY = "timestamp_mapping_stream_1";
protected static final String TIMESTAMP_MAPPING_STREAM_2_KEY = "timestamp_mapping_stream_2";
protected static final String NUMBER_MAPPING = "number_mapping";
protected static final String TIME_INTERVAL = "time-interval";

private List<String> outputKeySelectors;
private String timestampFieldStream0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@

public class TextFilterProcessor extends StreamPipesDataProcessor {

private static final String KEYWORD_ID = "keyword";
private static final String OPERATION_ID = "operation";
private static final String MAPPING_PROPERTY_ID = "text";
protected static final String KEYWORD_ID = "keyword";
protected static final String OPERATION_ID = "operation";
protected static final String MAPPING_PROPERTY_ID = "text";

private String keyword;
private StringOperator stringOperator;
Expand Down
Loading

0 comments on commit 9ea80a3

Please sign in to comment.