diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java index 3e927e9779..07c5058b6c 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java @@ -108,7 +108,7 @@ public Map getTagValues(String measurementId, InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient(); String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault(); Map tags = new HashMap<>(); - if (fields != null && !("".equals(fields))) { + if (fields != null && !(fields.isEmpty())) { List fieldList = Arrays.asList(fields.split(",")); fieldList.forEach(f -> { String q = @@ -118,7 +118,7 @@ public Map getTagValues(String measurementId, QueryResult queryResult = influxDB.query(query); queryResult.getResults().forEach(res -> { res.getSeries().forEach(series -> { - if (series.getValues().size() > 0) { + if (!series.getValues().isEmpty()) { String field = series.getValues().get(0).get(0).toString(); List values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList()); diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCount.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCount.java new file mode 100644 index 0000000000..af8619fef5 --- /dev/null +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCount.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.dataexplorer.influx; + +import org.apache.streampipes.dataexplorer.param.model.AggregationFunction; +import org.apache.streampipes.model.datalake.DataLakeMeasure; +import org.apache.streampipes.model.datalake.SpQueryResult; +import org.apache.streampipes.model.schema.EventProperty; +import org.apache.streampipes.model.schema.PropertyScope; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +public class DataLakeMeasurementCount { + + private final List allMeasurements; + private final List measurementNames; + + private static final String COUNT_FIELD = "count"; + + public DataLakeMeasurementCount(List allMeasurements, + List measurementNames) { + this.allMeasurements = allMeasurements; + this.measurementNames = measurementNames; + } + + public Map countMeasurementSizes() { + Map> futures = measurementNames.stream() + .distinct() + .map(this::getMeasure) + .collect(Collectors.toMap(DataLakeMeasure::getMeasureName, m -> CompletableFuture.supplyAsync(() -> { + var firstColumn = getFirstColumn(m); + var builder = DataLakeInfluxQueryBuilder + .create(m.getMeasureName()).withEndTime(System.currentTimeMillis()) + .withAggregatedColumn(firstColumn, AggregationFunction.COUNT); + var queryResult = new DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), true); + if (queryResult.getTotal() > 0) { + var headers = queryResult.getHeaders(); + return extractResult(queryResult, headers); + } else { + return 0; + } + }))); + + Map result = new HashMap<>(); + futures.entrySet().forEach((entry -> { + try { + result.put(entry.getKey(), entry.getValue().get()); + } catch (InterruptedException | ExecutionException e) { + result.put(entry.getKey(), 0); + } + })); + + return result; + } + + private Integer extractResult(SpQueryResult queryResult, + List headers) { + return ((Double) ( + queryResult.getAllDataSeries().get(0).getRows().get(0).get(headers.indexOf(COUNT_FIELD))) + ).intValue(); + } + + private DataLakeMeasure getMeasure(String measureName) { + return allMeasurements + .stream() + .filter(m -> m.getMeasureName().equals(measureName)) + .findFirst() + .orElse(null); + } + + private String getFirstColumn(DataLakeMeasure measure) { + return measure.getEventSchema().getEventProperties() + .stream() + .filter(ep -> ep.getPropertyScope() != null + && ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name())) + .map(EventProperty::getRuntimeName) + .findFirst() + .orElse(null); + } +} diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java index 847aa8a5d3..c40f2e56b2 100644 --- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java +++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java @@ -20,6 +20,7 @@ import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement; import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement; +import org.apache.streampipes.dataexplorer.influx.DataLakeMeasurementCount; import org.apache.streampipes.model.datalake.DataLakeMeasure; import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; import org.apache.streampipes.rest.shared.annotation.JacksonSerialized; @@ -32,9 +33,11 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; +import java.util.List; import java.util.Objects; @Path("/v4/datalake/measure") @@ -55,6 +58,15 @@ public Response addDataLake(DataLakeMeasure dataLakeMeasure) { return ok(result); } + @GET + @JacksonSerialized + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + public Response getDataLakeInfos(@QueryParam("filter") List measurementNames) { + var allMeasurements = this.dataLakeMeasureManagement.getAllMeasurements(); + return ok(new DataLakeMeasurementCount(allMeasurements, measurementNames).countMeasurementSizes()); + } + @GET @JacksonSerialized @Produces(MediaType.APPLICATION_JSON) diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts index e27a8d8471..5dadf7ced0 100644 --- a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts +++ b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts @@ -45,6 +45,16 @@ export class DatalakeRestService { return this.baseUrl + '/api/v4/datalake/measure'; } + getMeasurementEntryCounts( + measurementNameFilter: string[], + ): Observable> { + return this.http + .get(`${this.dataLakeMeasureUrl}`, { + params: { filter: measurementNameFilter }, + }) + .pipe(map(r => r as Record)); + } + getAllMeasurementSeries(): Observable { const url = this.dataLakeUrl + '/measurements/'; return this.http.get(url).pipe( diff --git a/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.html b/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.html index d3a02f191b..b202b2955f 100644 --- a/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.html +++ b/ui/src/app/configuration/datalake-configuration/datalake-configuration.component.html @@ -92,7 +92,17 @@

data-cy="datalake-number-of-events" *matCellDef="let configurationEntry" > - {{ configurationEntry.events | number }} + Loading + + + {{ configurationEntry.events | number }} + @@ -202,8 +212,9 @@

; @@ -65,8 +60,10 @@ export class DatalakeConfigurationComponent implements OnInit { 'remove', ]; + pageSize = 15; + constructor( - // protected dataLakeRestService: DatalakeRestService, + private http: HttpClient, private datalakeRestService: DatalakeRestService, private dataViewDataExplorerService: DataViewDataExplorerService, private dialogService: DialogService, @@ -94,7 +91,7 @@ export class DatalakeConfigurationComponent implements OnInit { allMeasurements.forEach(measurement => { const entry = new DataLakeConfigurationEntry(); entry.name = measurement.measureName; - + entry.events = -1; inUseMeasurements.forEach(inUseMeasurement => { if ( inUseMeasurement.measureName === @@ -108,32 +105,13 @@ export class DatalakeConfigurationComponent implements OnInit { } } }); - - // get the amount of events from the database - const propertyName = - this.getFirstNoneDimensionProperty( - measurement.eventSchema, - ); - const field: FieldConfig = { - runtimeName: propertyName, - aggregations: ['COUNT'], - selected: true, - numeric: false, - }; - this.datalakeRestService - .getData( - measurement.measureName, - this.buildQ(field), - ) - .subscribe((res: SpQueryResult) => { - // read the count value from the result - entry.events = - res.allDataSeries[0].rows[0][1]; - }); - this.availableMeasurements.push(entry); }); + this.availableMeasurements.sort((a, b) => + a.name.localeCompare(b.name), + ); + this.receiveMeasurementSizes(0); this.dataSource = new MatTableDataSource( this.availableMeasurements, ); @@ -142,21 +120,9 @@ export class DatalakeConfigurationComponent implements OnInit { this.dataSource.sort = this.sort; }); }); - // this.availableMeasurements = response; }); } - private getFirstNoneDimensionProperty(eventSchema: EventSchema): string { - const propertyName = eventSchema.eventProperties.find( - ep => ep.propertyScope !== 'DIMENSION_PROPERTY', - ); - if (!propertyName) { - return '*'; - } else { - return propertyName.runtimeName; - } - } - cleanDatalakeIndex(measurementIndex: string) { const dialogRef: DialogRef = this.dialogService.open(DeleteDatalakeIndexComponent, { @@ -208,9 +174,25 @@ export class DatalakeConfigurationComponent implements OnInit { }); } - private buildQ(column: FieldConfig): DatalakeQueryParameters { - return DatalakeQueryParameterBuilder.create(0, new Date().getTime()) - .withColumnFilter([column], true) - .build(); + onPageChange(event: any) { + this.receiveMeasurementSizes(event.pageIndex); + } + + receiveMeasurementSizes(pageIndex: number) { + const start = pageIndex * this.pageSize; + const end = start + this.pageSize; + const measurements = this.availableMeasurements + .slice(start, end) + .filter(m => m.events === -1) + .map(m => m.name); + this.datalakeRestService + .getMeasurementEntryCounts(measurements) + .subscribe(res => { + this.availableMeasurements.forEach(m => { + if (res[m.name] !== undefined) { + m.events = res[m.name]; + } + }); + }); } }