Skip to content

Commit

Permalink
fix(#3325): compact adapters do not add a default property scope (#3328)
Browse files Browse the repository at this point in the history
* fix(#3325): Add better logging if count of events in measurement fails

* fix(#3325): Add a test to validate file stream adapter via compact API
  • Loading branch information
tenthe authored Nov 12, 2024
1 parent 7349246 commit 7105d35
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,45 @@
import org.apache.streampipes.model.datalake.AggregationFunction;
import org.apache.streampipes.model.datalake.DataLakeMeasure;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class DataLakeMeasurementCounterInflux extends DataLakeMeasurementCounter {

private static final Logger LOG = LoggerFactory.getLogger(DataLakeMeasurementCounterInflux.class);

private static final String COUNT_FIELD = "count";

public DataLakeMeasurementCounterInflux(List<DataLakeMeasure> allMeasurements,
List<String> measurementNames) {
public DataLakeMeasurementCounterInflux(
List<DataLakeMeasure> allMeasurements,
List<String> measurementNames
) {
super(allMeasurements, measurementNames);
}

@Override
protected CompletableFuture<Integer> createQueryAsAsyncFuture(DataLakeMeasure measure) {
return CompletableFuture.supplyAsync(() -> {
var firstColumn = getFirstMeasurementProperty(measure);
if (firstColumn == null) {
LOG.error(
"Could not count events in measurement: {}, because no measurement property was found in event schema",
measure.getMeasureName()
);
return 0;
}

var builder = DataLakeInfluxQueryBuilder
.create(measure.getMeasureName()).withEndTime(System.currentTimeMillis())
.create(measure.getMeasureName())
.withEndTime(System.currentTimeMillis())
.withAggregatedColumn(firstColumn, AggregationFunction.COUNT);
var queryResult = new DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), Optional.empty(), true);
if (queryResult.getTotal() > 0) {
return extractResult(queryResult, COUNT_FIELD);
} else {
return 0;
}

return queryResult.getTotal() > 0 ? extractResult(queryResult, COUNT_FIELD) : 0;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ public abstract class DataLakeMeasurementCounter implements IDataLakeMeasurement
protected final List<DataLakeMeasure> allMeasurements;
protected final List<String> measurementNames;

public DataLakeMeasurementCounter(List<DataLakeMeasure> allMeasurements,
List<String> measurementNames) {
public DataLakeMeasurementCounter(
List<DataLakeMeasure> allMeasurements,
List<String> measurementNames
) {
this.allMeasurements = allMeasurements;
this.measurementNames = measurementNames;
}
Expand All @@ -52,12 +54,14 @@ public DataLakeMeasurementCounter(List<DataLakeMeasure> allMeasurements,
public Map<String, Integer> countMeasurementSizes() {

// create async futures so that count queries can be executed parallel
Map<String, CompletableFuture<Integer>> countQueriesFutures = measurementNames.stream()
Map<String, CompletableFuture<Integer>> countQueriesFutures = measurementNames
.stream()
.map(this::getMeasure)
.filter(Objects::nonNull)
.collect(Collectors.toMap(
DataLakeMeasure::getMeasureName,
this::createQueryAsAsyncFuture)
DataLakeMeasure::getMeasureName,
this::createQueryAsAsyncFuture
)
);

return getQueryResults(countQueriesFutures);
Expand All @@ -72,7 +76,8 @@ public Map<String, Integer> countMeasurementSizes() {
private DataLakeMeasure getMeasure(String measureName) {
return allMeasurements
.stream()
.filter(m -> m.getMeasureName().equals(measureName))
.filter(m -> m.getMeasureName()
.equals(measureName))
.findFirst()
.orElse(null);
}
Expand All @@ -83,7 +88,7 @@ private DataLakeMeasure getMeasure(String measureName) {
* @param queryFutures A Map containing the futures of
* asynchronous count queries mapped by their respective keys.
* @return A Map representing the results of the queries, where each key corresponds to
* a measure name and the value is the count result.
* a measure name and the value is the count result.
*/
private Map<String, Integer> getQueryResults(Map<String, CompletableFuture<Integer>> queryFutures) {
Map<String, Integer> resultPerMeasure = new HashMap<>();
Expand All @@ -106,18 +111,34 @@ private Map<String, Integer> getQueryResults(Map<String, CompletableFuture<Integ
* @return The runtime name of the first measurement property, or null if no such property is found.
*/
protected String getFirstMeasurementProperty(DataLakeMeasure measure) {
return measure.getEventSchema().getEventProperties()
var propertyRuntimeName = measure
.getEventSchema()
.getEventProperties()
.stream()
.filter(ep -> ep.getPropertyScope() != null
&& ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
&& ep.getPropertyScope()
.equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
.map(EventProperty::getRuntimeName)
.findFirst()
.orElse(null);

if (propertyRuntimeName == null) {
LOG.error("No measurement property was found in the event schema found for measure {}", measure.getMeasureName());
}

return propertyRuntimeName;
}

protected Integer extractResult(SpQueryResult queryResult, String fieldName) {
return ((Double) (
queryResult.getAllDataSeries().get(0).getRows().get(0).get(queryResult.getHeaders().indexOf(fieldName)))
return (
(Double) (
queryResult.getAllDataSeries()
.get(0)
.getRows()
.get(0)
.get(queryResult.getHeaders()
.indexOf(fieldName))
)
).intValue();
}

Expand Down
2 changes: 2 additions & 0 deletions ui/cypress/fixtures/connect/compact/compactTest.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
timestamp;value;temperature
1000;1.0;0.1
40 changes: 40 additions & 0 deletions ui/cypress/fixtures/connect/compact/fileReplay.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

id: sp:adapterdescription:IfmzfQ
name: File Stream Adapter Test
description: ''
appId: org.apache.streampipes.connect.iiot.protocol.stream.file
configuration:
- filePath: compactTest.csv
- replaceTimestamp:
- ''
- replayOnce: 'no'
- speed: keepOriginalTime
- delimiter: ;
format: CSV
header:
- Header
schema:
timestamp:
description: ''
propertyScope: HEADER_PROPERTY
semanticType: http://schema.org/DateTime
transform:
rename: {}
measurementUnit: {}
createOptions:
persist: true
start: true
16 changes: 16 additions & 0 deletions ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import { ConnectUtils } from '../../../support/utils/connect/ConnectUtils';
import { CompactAdapterUtils } from '../../../support/utils/connect/CompactAdapterUtils';
import { PipelineUtils } from '../../../support/utils/pipeline/PipelineUtils';
import { FileManagementUtils } from '../../../support/utils/FileManagementUtils';

describe('Add Compact Adapters', () => {
beforeEach('Setup Test', () => {
Expand Down Expand Up @@ -96,4 +97,19 @@ describe('Add Compact Adapters', () => {
},
);
});

it('Add file stream adapter via the compact API. Start Adapter with Pipeline', () => {
FileManagementUtils.addFile('connect/compact/compactTest.csv');

cy.readFile('cypress/fixtures/connect/compact/fileReplay.yml').then(
ymlDescription => {
CompactAdapterUtils.storeCompactYmlAdapter(ymlDescription).then(
() => {
ConnectUtils.validateAdapterIsRunning();
PipelineUtils.checkAmountOfPipelinesPipeline(1);
},
);
},
);
});
});

0 comments on commit 7105d35

Please sign in to comment.