
Commit

feat(#2023): Provide REST endpoint to fetch measurement counts (#2033)
* feat(#2023): Provide REST endpoint to fetch measurement counts

* Remove logging output
dominikriemer authored Oct 17, 2023
1 parent 7ddebea commit b792afb
Showing 6 changed files with 168 additions and 52 deletions.
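In short, this commit adds a DataLakeMeasurementCount helper that counts the rows of each requested measurement through parallel Influx COUNT queries and exposes the result via a new GET endpoint on /v4/datalake/measure with a repeatable filter query parameter. On the frontend, DatalakeRestService gains a matching getMeasurementEntryCounts method, and the data lake configuration view now loads counts lazily per paginator page, showing a spinner until each count arrives.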
File 1 of 6 (modified, Java): Influx class containing getTagValues(...)
@@ -108,7 +108,7 @@ public Map<String, Object> getTagValues(String measurementId,
InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault();
Map<String, Object> tags = new HashMap<>();
-    if (fields != null && !("".equals(fields))) {
+    if (fields != null && !(fields.isEmpty())) {
List<String> fieldList = Arrays.asList(fields.split(","));
fieldList.forEach(f -> {
String q =
@@ -118,7 +118,7 @@ public Map<String, Object> getTagValues(String measurementId,
QueryResult queryResult = influxDB.query(query);
queryResult.getResults().forEach(res -> {
res.getSeries().forEach(series -> {
-    if (series.getValues().size() > 0) {
+    if (!series.getValues().isEmpty()) {
String field = series.getValues().get(0).get(0).toString();
List<String> values =
series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
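Both hunks above are small readability cleanups: the empty-string comparison and the size() > 0 check are replaced with isEmpty().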
File 2 of 6 (new file, Java): DataLakeMeasurementCount.java
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.influx;

import org.apache.streampipes.dataexplorer.param.model.AggregationFunction;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.SpQueryResult;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.PropertyScope;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class DataLakeMeasurementCount {

private final List<DataLakeMeasure> allMeasurements;
private final List<String> measurementNames;

private static final String COUNT_FIELD = "count";

public DataLakeMeasurementCount(List<DataLakeMeasure> allMeasurements,
List<String> measurementNames) {
this.allMeasurements = allMeasurements;
this.measurementNames = measurementNames;
}

public Map<String, Integer> countMeasurementSizes() {
Map<String, CompletableFuture<Integer>> futures = measurementNames.stream()
.distinct()
.map(this::getMeasure)
.collect(Collectors.toMap(DataLakeMeasure::getMeasureName, m -> CompletableFuture.supplyAsync(() -> {
var firstColumn = getFirstColumn(m);
var builder = DataLakeInfluxQueryBuilder
.create(m.getMeasureName()).withEndTime(System.currentTimeMillis())
.withAggregatedColumn(firstColumn, AggregationFunction.COUNT);
var queryResult = new DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), true);
if (queryResult.getTotal() > 0) {
var headers = queryResult.getHeaders();
return extractResult(queryResult, headers);
} else {
return 0;
}
})));

Map<String, Integer> result = new HashMap<>();
futures.entrySet().forEach((entry -> {
try {
result.put(entry.getKey(), entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
result.put(entry.getKey(), 0);
}
}));

return result;
}

private Integer extractResult(SpQueryResult queryResult,
List<String> headers) {
return ((Double) (
queryResult.getAllDataSeries().get(0).getRows().get(0).get(headers.indexOf(COUNT_FIELD)))
).intValue();
}

private DataLakeMeasure getMeasure(String measureName) {
return allMeasurements
.stream()
.filter(m -> m.getMeasureName().equals(measureName))
.findFirst()
.orElse(null);
}

private String getFirstColumn(DataLakeMeasure measure) {
return measure.getEventSchema().getEventProperties()
.stream()
.filter(ep -> ep.getPropertyScope() != null
&& ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
.map(EventProperty::getRuntimeName)
.findFirst()
.orElse(null);
}
}
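countMeasurementSizes() fans out one asynchronous COUNT query per distinct measurement name and joins the results into a name-to-count map, falling back to 0 for any query that fails. The standalone sketch below illustrates only that pattern, outside of StreamPipes; countFunction is a hypothetical stand-in for the Influx COUNT query that DataLakeMeasurementCount builds with DataLakeInfluxQueryBuilder.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

public class ConcurrentCountSketch {

  // One async count per distinct name, collected into name -> count,
  // with 0 as the fallback when a count cannot be retrieved.
  public static Map<String, Integer> countAll(List<String> names,
                                              ToIntFunction<String> countFunction) {
    Map<String, CompletableFuture<Integer>> futures = names.stream()
        .distinct()
        .collect(Collectors.toMap(
            name -> name,
            name -> CompletableFuture.supplyAsync(() -> countFunction.applyAsInt(name))));

    Map<String, Integer> result = new HashMap<>();
    futures.forEach((name, future) -> {
      try {
        result.put(name, future.get());
      } catch (InterruptedException | ExecutionException e) {
        result.put(name, 0);
      }
    });
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical stand-in for the per-measurement COUNT query.
    Map<String, Integer> counts =
        countAll(List.of("flowrate", "temperature", "flowrate"), name -> name.length() * 100);
    System.out.println(counts); // e.g. {temperature=1100, flowrate=800}
  }
}

Running the counts concurrently keeps the endpoint's latency close to that of the slowest single COUNT query rather than the sum of all of them.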
File 3 of 6 (modified, Java): REST resource for /v4/datalake/measure
@@ -20,6 +20,7 @@

import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.influx.DataLakeMeasurementCount;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -32,9 +33,11 @@
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import java.util.List;
import java.util.Objects;

@Path("/v4/datalake/measure")
@@ -55,6 +58,15 @@ public Response addDataLake(DataLakeMeasure dataLakeMeasure) {
return ok(result);
}

@GET
@JacksonSerialized
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public Response getDataLakeInfos(@QueryParam("filter") List<String> measurementNames) {
var allMeasurements = this.dataLakeMeasureManagement.getAllMeasurements();
return ok(new DataLakeMeasurementCount(allMeasurements, measurementNames).countMeasurementSizes());
}

@GET
@JacksonSerialized
@Produces(MediaType.APPLICATION_JSON)
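The new handler can be exercised with a plain HTTP GET that repeats the filter parameter once per measurement name. The sketch below is illustrative only: host, port and the /api prefix are assumptions, and the credentials required by the auth-guarded resource are omitted; the response is a JSON object mapping each requested measurement name to its count, matching the Map<String, Integer> produced above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MeasurementCountClient {

  public static void main(String[] args) throws Exception {
    // Hypothetical base URL; adapt host, port and path prefix, and add the
    // authentication headers your deployment requires.
    String url = "http://localhost:8030/api/v4/datalake/measure"
        + "?filter=flowrate&filter=temperature";

    HttpRequest request = HttpRequest.newBuilder(URI.create(url))
        .header("Accept", "application/json")
        .GET()
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());

    // Expected response shape, per the resource above:
    // {"flowrate": 120350, "temperature": 98421}
    System.out.println(response.body());
  }
}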
File 4 of 6 (modified, TypeScript): DatalakeRestService
@@ -45,6 +45,16 @@ export class DatalakeRestService {
return this.baseUrl + '/api/v4/datalake/measure';
}

getMeasurementEntryCounts(
measurementNameFilter: string[],
): Observable<Record<string, number>> {
return this.http
.get(`${this.dataLakeMeasureUrl}`, {
params: { filter: measurementNameFilter },
})
.pipe(map(r => r as Record<string, number>));
}

getAllMeasurementSeries(): Observable<DataLakeMeasure[]> {
const url = this.dataLakeUrl + '/measurements/';
return this.http.get(url).pipe(
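On the client side, getMeasurementEntryCounts sends the measurement names as repeated filter query parameters (Angular's HttpClient expands the string array that way), which matches the List<String> @QueryParam on the backend resource, and types the response as Record<string, number>.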
File 5 of 6 (modified, HTML): data lake configuration table template
@@ -92,7 +92,17 @@ <h4 style="margin-bottom: 0px">
data-cy="datalake-number-of-events"
*matCellDef="let configurationEntry"
>
- {{ configurationEntry.events | number }}
+ <mat-spinner
+     [diameter]="20"
+     fxLayoutAlign="center"
+     style="margin: 10px 0 5px 0"
+     color="accent"
+     *ngIf="configurationEntry.events < 0"
+     >Loading
+ </mat-spinner>
+ <span *ngIf="configurationEntry.events >= 0">
+     {{ configurationEntry.events | number }}
+ </span>
</td>
</ng-container>

@@ -202,8 +212,9 @@ <h4 style="margin-bottom: 0px">
</div>
<div fxFlex="100" fxLayoutAlign="end end">
<mat-paginator
[pageSizeOptions]="[5, 10, 25, 100]"
[pageSize]="20"
[pageSizeOptions]="[pageSize]"
[pageSize]="pageSize"
(page)="onPageChange($event)"
></mat-paginator>
</div>
<div
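In the template, a count of -1 acts as a "not loaded yet" sentinel: the spinner is rendered while configurationEntry.events is negative and the formatted number once the count has been fetched. The paginator now uses the component's single pageSize and notifies the component about page changes.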
File 6 of 6 (modified, TypeScript): DatalakeConfigurationComponent
@@ -20,13 +20,8 @@ import { Component, OnInit, ViewChild } from '@angular/core';
import { MatTableDataSource } from '@angular/material/table';
import { DataLakeConfigurationEntry } from './datalake-configuration-entry';
import {
DatalakeQueryParameterBuilder,
DatalakeQueryParameters,
DatalakeRestService,
DataViewDataExplorerService,
EventSchema,
FieldConfig,
SpQueryResult,
} from '@streampipes/platform-services';
import { MatPaginator } from '@angular/material/paginator';
import { MatSort } from '@angular/material/sort';
@@ -40,6 +35,7 @@ import { DeleteDatalakeIndexComponent } from '../dialog/delete-datalake-index/de
import { SpConfigurationTabs } from '../configuration-tabs';
import { SpConfigurationRoutes } from '../configuration.routes';
import { DataDownloadDialogComponent } from '../../core-ui/data-download-dialog/data-download-dialog.component';
import { HttpClient } from '@angular/common/http';

@Component({
selector: 'sp-datalake-configuration',
@@ -50,7 +46,6 @@ export class DatalakeConfigurationComponent implements OnInit {
tabs = SpConfigurationTabs.getTabs();

@ViewChild(MatPaginator) paginator: MatPaginator;
pageSize = 1;
@ViewChild(MatSort) sort: MatSort;

dataSource: MatTableDataSource<DataLakeConfigurationEntry>;
@@ -65,8 +60,10 @@
'remove',
];

pageSize = 15;

constructor(
// protected dataLakeRestService: DatalakeRestService,
private http: HttpClient,
private datalakeRestService: DatalakeRestService,
private dataViewDataExplorerService: DataViewDataExplorerService,
private dialogService: DialogService,
@@ -94,7 +91,7 @@
allMeasurements.forEach(measurement => {
const entry = new DataLakeConfigurationEntry();
entry.name = measurement.measureName;

entry.events = -1;
inUseMeasurements.forEach(inUseMeasurement => {
if (
inUseMeasurement.measureName ===
@@ -108,32 +105,13 @@
}
}
});

// get the amount of events from the database
const propertyName =
this.getFirstNoneDimensionProperty(
measurement.eventSchema,
);
const field: FieldConfig = {
runtimeName: propertyName,
aggregations: ['COUNT'],
selected: true,
numeric: false,
};
this.datalakeRestService
.getData(
measurement.measureName,
this.buildQ(field),
)
.subscribe((res: SpQueryResult) => {
// read the count value from the result
entry.events =
res.allDataSeries[0].rows[0][1];
});

this.availableMeasurements.push(entry);
});

this.availableMeasurements.sort((a, b) =>
a.name.localeCompare(b.name),
);
this.receiveMeasurementSizes(0);
this.dataSource = new MatTableDataSource(
this.availableMeasurements,
);
@@ -142,21 +120,9 @@
this.dataSource.sort = this.sort;
});
});
// this.availableMeasurements = response;
});
}

private getFirstNoneDimensionProperty(eventSchema: EventSchema): string {
const propertyName = eventSchema.eventProperties.find(
ep => ep.propertyScope !== 'DIMENSION_PROPERTY',
);
if (!propertyName) {
return '*';
} else {
return propertyName.runtimeName;
}
}

cleanDatalakeIndex(measurementIndex: string) {
const dialogRef: DialogRef<DeleteDatalakeIndexComponent> =
this.dialogService.open(DeleteDatalakeIndexComponent, {
@@ -208,9 +174,25 @@
});
}

private buildQ(column: FieldConfig): DatalakeQueryParameters {
return DatalakeQueryParameterBuilder.create(0, new Date().getTime())
.withColumnFilter([column], true)
.build();
onPageChange(event: any) {
this.receiveMeasurementSizes(event.pageIndex);
}

receiveMeasurementSizes(pageIndex: number) {
const start = pageIndex * this.pageSize;
const end = start + this.pageSize;
const measurements = this.availableMeasurements
.slice(start, end)
.filter(m => m.events === -1)
.map(m => m.name);
this.datalakeRestService
.getMeasurementEntryCounts(measurements)
.subscribe(res => {
this.availableMeasurements.forEach(m => {
if (res[m.name] !== undefined) {
m.events = res[m.name];
}
});
});
}
}
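The component no longer issues one COUNT query per measurement while building the table; the inline getData call and the getFirstNoneDimensionProperty and buildQ helpers (together with their query-builder imports) are removed. Instead, every entry starts with events = -1, the list is sorted by name, and receiveMeasurementSizes(pageIndex) asks the new endpoint for counts of the measurements on the visible paginator page only, skipping entries that already have a count; onPageChange repeats this on every page switch.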
