diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
index db11222324..0a58b8b29a 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
+++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
@@ -22,16 +22,23 @@
 import org.apache.streampipes.model.datalake.AggregationFunction;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class DataLakeMeasurementCounterInflux extends DataLakeMeasurementCounter {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DataLakeMeasurementCounterInflux.class);
+
   private static final String COUNT_FIELD = "count";
 
-  public DataLakeMeasurementCounterInflux(List<DataLakeMeasure> allMeasurements,
-                                          List<String> measurementNames) {
+  public DataLakeMeasurementCounterInflux(
+      List<DataLakeMeasure> allMeasurements,
+      List<String> measurementNames
+  ) {
     super(allMeasurements, measurementNames);
   }
 
@@ -39,15 +46,21 @@ public DataLakeMeasurementCounterInflux(List<DataLakeMeasure> allMeasurements,
   protected CompletableFuture<Integer> createQueryAsAsyncFuture(DataLakeMeasure measure) {
     return CompletableFuture.supplyAsync(() -> {
       var firstColumn = getFirstMeasurementProperty(measure);
+      if (firstColumn == null) {
+        LOG.error(
+            "Could not count events in measurement: {}, because no measurement property was found in the event schema",
+            measure.getMeasureName()
+        );
+        return 0;
+      }
+
       var builder = DataLakeInfluxQueryBuilder
-          .create(measure.getMeasureName()).withEndTime(System.currentTimeMillis())
+          .create(measure.getMeasureName())
+          .withEndTime(System.currentTimeMillis())
           .withAggregatedColumn(firstColumn, AggregationFunction.COUNT);
       var queryResult = new DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), Optional.empty(), true);
-      if (queryResult.getTotal() > 0) {
-        return extractResult(queryResult, COUNT_FIELD);
-      } else {
-        return 0;
-      }
+
+      return queryResult.getTotal() > 0 ? extractResult(queryResult, COUNT_FIELD) : 0;
     });
   }
 }
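An illustrative sketch (not part of the patch) of the control flow that the guard above introduces: a measure whose schema yields no measurement property now short-circuits to a count of 0 instead of feeding a null column into the query builder. GuardedCountSketch, firstColumnLookup and countQuery are hypothetical stand-ins for getFirstMeasurementProperty and the InfluxDB round trip, using plain JDK types only.

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

public class GuardedCountSketch {

  // Mirrors the guarded branch of createQueryAsAsyncFuture: resolve the column,
  // bail out with 0 if it is missing, otherwise run the (stubbed) count query.
  static CompletableFuture<Integer> countAsync(Supplier<String> firstColumnLookup,
                                               Function<String, Integer> countQuery) {
    return CompletableFuture.supplyAsync(() -> {
      var firstColumn = firstColumnLookup.get();
      if (firstColumn == null) {
        // the real implementation logs the affected measure name here
        return 0;
      }
      return countQuery.apply(firstColumn);
    });
  }

  public static void main(String[] args) {
    System.out.println(countAsync(() -> "temperature", column -> 42).join()); // 42
    System.out.println(countAsync(() -> null, column -> 42).join());          // 0
  }
}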
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
index 844ce5b573..bc3a551ed7 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
@@ -42,8 +42,10 @@ public abstract class DataLakeMeasurementCounter implements IDataLakeMeasurement
   protected final List<DataLakeMeasure> allMeasurements;
   protected final List<String> measurementNames;
 
-  public DataLakeMeasurementCounter(List<DataLakeMeasure> allMeasurements,
-                                    List<String> measurementNames) {
+  public DataLakeMeasurementCounter(
+      List<DataLakeMeasure> allMeasurements,
+      List<String> measurementNames
+  ) {
     this.allMeasurements = allMeasurements;
     this.measurementNames = measurementNames;
   }
@@ -52,12 +54,14 @@ public DataLakeMeasurementCounter(List<DataLakeMeasure> allMeasurements,
   public Map<String, Integer> countMeasurementSizes() {
 
     // create async futures so that count queries can be executed parallel
-    Map<String, CompletableFuture<Integer>> countQueriesFutures = measurementNames.stream()
+    Map<String, CompletableFuture<Integer>> countQueriesFutures = measurementNames
+        .stream()
         .map(this::getMeasure)
         .filter(Objects::nonNull)
         .collect(Collectors.toMap(
-        DataLakeMeasure::getMeasureName,
-        this::createQueryAsAsyncFuture)
+            DataLakeMeasure::getMeasureName,
+            this::createQueryAsAsyncFuture
+        )
         );
 
     return getQueryResults(countQueriesFutures);
@@ -72,7 +76,8 @@ public Map<String, Integer> countMeasurementSizes() {
   private DataLakeMeasure getMeasure(String measureName) {
     return allMeasurements
         .stream()
-        .filter(m -> m.getMeasureName().equals(measureName))
+        .filter(m -> m.getMeasureName()
+            .equals(measureName))
         .findFirst()
         .orElse(null);
   }
@@ -83,7 +88,7 @@ private DataLakeMeasure getMeasure(String measureName) {
    * @param queryFutures A Map containing the futures of
    *                     asynchronous count queries mapped by their respective keys.
    * @return A Map representing the results of the queries, where each key corresponds to
-   *        a measure name and the value is the count result.
+   *         a measure name and the value is the count result.
    */
   private Map<String, Integer> getQueryResults(Map<String, CompletableFuture<Integer>> queryFutures) {
     Map<String, Integer> resultPerMeasure = new HashMap<>();
@@ -106,18 +111,34 @@ private Map<String, Integer> getQueryResults(Map<String, CompletableFuture<Integer>> queryFutures) {
   }
 
   protected String getFirstMeasurementProperty(DataLakeMeasure measure) {
-    return measure.getEventSchema().getEventProperties()
+    var propertyRuntimeName = measure.getEventSchema()
+        .getEventProperties()
         .stream()
         .filter(ep -> ep.getPropertyScope() != null
-            && ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
+            && ep.getPropertyScope()
+                .equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
         .map(EventProperty::getRuntimeName)
         .findFirst()
         .orElse(null);
+
+    if (propertyRuntimeName == null) {
+      LOG.error("No measurement property was found in the event schema for measure {}", measure.getMeasureName());
+    }
+
+    return propertyRuntimeName;
   }
 
   protected Integer extractResult(SpQueryResult queryResult, String fieldName) {
-    return ((Double) (
-        queryResult.getAllDataSeries().get(0).getRows().get(0).get(queryResult.getHeaders().indexOf(fieldName)))
+    return (
+        (Double) (
+            queryResult.getAllDataSeries()
+                .get(0)
+                .getRows()
+                .get(0)
+                .get(queryResult.getHeaders()
+                    .indexOf(fieldName))
+        )
     ).intValue();
   }
 }
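For context, a minimal sketch (outside the patch) of the fan-out/join pattern that countMeasurementSizes builds on: one CompletableFuture per measurement, keyed by measure name, then joined into a result map. countQuery is a hypothetical stand-in for createQueryAsAsyncFuture, and join() stands in for the per-future get() with exception handling that the real getQueryResults performs.

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ParallelCountSketch {

  static Map<String, Integer> countAll(List<String> measurementNames,
                                       Function<String, Integer> countQuery) {
    // fan out: start one asynchronous count per measurement, keyed by its name
    Map<String, CompletableFuture<Integer>> futures = measurementNames
        .stream()
        .collect(Collectors.toMap(
            name -> name,
            name -> CompletableFuture.supplyAsync(() -> countQuery.apply(name))
        ));

    // join: wait for every query to finish and collect the counts per measure
    return futures.entrySet()
        .stream()
        .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().join()));
  }

  public static void main(String[] args) {
    var counts = countAll(List.of("flowrate", "pressure"), name -> name.length());
    System.out.println(counts); // e.g. {pressure=8, flowrate=8} (map order is unspecified)
  }
}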
diff --git a/ui/cypress/fixtures/connect/compact/compactTest.csv b/ui/cypress/fixtures/connect/compact/compactTest.csv
new file mode 100644
index 0000000000..6cf117f7c1
--- /dev/null
+++ b/ui/cypress/fixtures/connect/compact/compactTest.csv
@@ -0,0 +1,2 @@
+timestamp;value;temperature
+1000;1.0;0.1
diff --git a/ui/cypress/fixtures/connect/compact/fileReplay.yml b/ui/cypress/fixtures/connect/compact/fileReplay.yml
new file mode 100644
index 0000000000..3c57795f10
--- /dev/null
+++ b/ui/cypress/fixtures/connect/compact/fileReplay.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+id: sp:adapterdescription:IfmzfQ
+name: File Stream Adapter Test
+description: ''
+appId: org.apache.streampipes.connect.iiot.protocol.stream.file
+configuration:
+  - filePath: compactTest.csv
+  - replaceTimestamp:
+      - ''
+  - replayOnce: 'no'
+  - speed: keepOriginalTime
+  - delimiter: ;
+    format: CSV
+    header:
+      - Header
+schema:
+  timestamp:
+    description: ''
+    propertyScope: HEADER_PROPERTY
+    semanticType: http://schema.org/DateTime
+transform:
+  rename: {}
+  measurementUnit: {}
+createOptions:
+  persist: true
+  start: true
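For illustration only: the counter changes above resolve the column to aggregate on from the event schema, and the fixture's columns make that concrete. The sketch below rebuilds a schema like one the file adapter might derive from compactTest.csv and applies the same MEASUREMENT_PROPERTY filter as getFirstMeasurementProperty. It assumes the event-schema classes from streampipes-model (EventSchema, EventPropertyPrimitive, PropertyScope); the scope given to the temperature column is an assumption, since the fixture only pins timestamp to HEADER_PROPERTY.

import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;

import java.util.List;

public class FirstMeasurementPropertySketch {

  // Same selection logic as getFirstMeasurementProperty: the first property whose
  // scope is MEASUREMENT_PROPERTY, or null if the schema has none.
  static String firstMeasurementProperty(EventSchema schema) {
    return schema.getEventProperties()
        .stream()
        .filter(ep -> ep.getPropertyScope() != null
            && ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
        .map(EventProperty::getRuntimeName)
        .findFirst()
        .orElse(null);
  }

  public static void main(String[] args) {
    var timestamp = new EventPropertyPrimitive();
    timestamp.setRuntimeName("timestamp");
    timestamp.setPropertyScope(PropertyScope.HEADER_PROPERTY.name());

    var temperature = new EventPropertyPrimitive();
    temperature.setRuntimeName("temperature");
    temperature.setPropertyScope(PropertyScope.MEASUREMENT_PROPERTY.name());

    var schema = new EventSchema();
    List<EventProperty> bothColumns = List.of(timestamp, temperature);
    schema.setEventProperties(bothColumns);
    System.out.println(firstMeasurementProperty(schema)); // temperature

    List<EventProperty> timestampOnly = List.of(timestamp);
    schema.setEventProperties(timestampOnly);
    System.out.println(firstMeasurementProperty(schema)); // null -> the counter now logs and reports 0
  }
}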
diff --git a/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts b/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
index c19e345f8d..57d2466c15 100644
--- a/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
+++ b/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
@@ -19,6 +19,7 @@
 import { ConnectUtils } from '../../../support/utils/connect/ConnectUtils';
 import { CompactAdapterUtils } from '../../../support/utils/connect/CompactAdapterUtils';
 import { PipelineUtils } from '../../../support/utils/pipeline/PipelineUtils';
+import { FileManagementUtils } from '../../../support/utils/FileManagementUtils';
 
 describe('Add Compact Adapters', () => {
     beforeEach('Setup Test', () => {
@@ -96,4 +97,19 @@ describe('Add Compact Adapters', () => {
             },
         );
     });
+
+    it('Add file stream adapter via the compact API. Start Adapter with Pipeline', () => {
+        FileManagementUtils.addFile('connect/compact/compactTest.csv');
+
+        cy.readFile('cypress/fixtures/connect/compact/fileReplay.yml').then(
+            ymlDescription => {
+                CompactAdapterUtils.storeCompactYmlAdapter(ymlDescription).then(
+                    () => {
+                        ConnectUtils.validateAdapterIsRunning();
+                        PipelineUtils.checkAmountOfPipelinesPipeline(1);
+                    },
+                );
+            },
+        );
+    });
 });