
Commit

refactor: move influx specific implementations to `DataExplorerInfluxQueryExecutor` (#2781)

* refactor: move influx specific implementations to `DataExplorerInfluxQueryExecutor`

* style: fix indentation

* fix: improve result check
bossenti authored Apr 25, 2024
1 parent c6be90d commit f6dfc87
Showing 2 changed files with 56 additions and 51 deletions.
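The substance of the change: `DataExplorerQueryManagement` no longer talks to InfluxDB itself; deleting measurement data and resolving tag values are delegated to `DataExplorerInfluxQueryExecutor`. A minimal caller-side sketch of the resulting API, using only the two executor methods visible in the diff below (the wrapper class and the sample field list are hypothetical):

import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
import org.apache.streampipes.model.datalake.DataLakeMeasure;

import java.util.Map;

// Hypothetical caller, for illustration only: all InfluxDB-specific work
// is now routed through DataExplorerInfluxQueryExecutor.
public class DelegationSketch {

  public boolean wipeMeasurement(DataLakeMeasure measure) {
    var queryExecutor = new DataExplorerInfluxQueryExecutor();
    return queryExecutor.deleteData(measure); // false signals a failed delete
  }

  public Map<String, Object> tagsFor(String measurementId) {
    // fields is a comma-separated list of tag keys to resolve
    return new DataExplorerInfluxQueryExecutor().getTagValues(measurementId, "sensorId,location");
  }
}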
DataExplorerQueryManagement.java

@@ -18,33 +18,22 @@
 package org.apache.streampipes.dataexplorer;
 
-import org.apache.streampipes.commons.environment.Environment;
-import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
-import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
+import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
 import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
 import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
 import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
-import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.QueryResultProvider;
 import org.apache.streampipes.dataexplorer.query.StreamedQueryResultProvider;
 import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.SpQueryResult;
 
-import org.influxdb.InfluxDB;
-import org.influxdb.dto.Query;
-import org.influxdb.dto.QueryResult;
-
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class DataExplorerQueryManagement implements IDataExplorerQueryManagement {
 
@@ -72,10 +61,11 @@ public void getDataAsStream(ProvidedRestQueryParams params,
   @Override
   public boolean deleteAllData() {
     List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+    var queryExecutor = new DataExplorerInfluxQueryExecutor();
+
     for (DataLakeMeasure measure : allMeasurements) {
-      QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
-      if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
+      boolean success = queryExecutor.deleteData(measure);
+      if (!success) {
         return false;
       }
     }
@@ -85,14 +75,13 @@ public boolean deleteAllData() {
   @Override
   public boolean deleteData(String measurementID) {
     List<DataLakeMeasure> allMeasurements = getAllMeasurements();
-    for (DataLakeMeasure measure : allMeasurements) {
-      if (measure.getMeasureName().equals(measurementID)) {
-        QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
-
-        return !queryResult.hasError();
-      }
-    }
-    return false;
+    var measureToDeleteOpt = allMeasurements.stream()
+        .filter(measure -> measure.getMeasureName().equals(measurementID))
+        .findFirst();
+
+    return measureToDeleteOpt.filter(measure -> new DataExplorerInfluxQueryExecutor().deleteData(measure))
+        .isPresent();
   }
 
   @Override
@@ -105,38 +94,10 @@ public SpQueryResult deleteData(String measurementID, Long startDate, Long endDate) {
   @Override
   public Map<String, Object> getTagValues(String measurementId,
                                           String fields) {
-    InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
-    String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault();
-    Map<String, Object> tags = new HashMap<>();
-    if (fields != null && !(fields.isEmpty())) {
-      List<String> fieldList = Arrays.asList(fields.split(","));
-      fieldList.forEach(f -> {
-        String q =
-            "SHOW TAG VALUES ON \"" + databaseName + "\" FROM \"" + measurementId
-                + "\" WITH KEY = \"" + f + "\"";
-        Query query = new Query(q);
-        QueryResult queryResult = influxDB.query(query);
-        queryResult.getResults().forEach(res -> {
-          res.getSeries().forEach(series -> {
-            if (!series.getValues().isEmpty()) {
-              String field = series.getValues().get(0).get(0).toString();
-              List<String> values =
-                  series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
-              tags.put(field, values);
-            }
-          });
-        });
-      });
-    }
-
-    return tags;
+    return new DataExplorerInfluxQueryExecutor().getTagValues(measurementId, fields);
   }
 
   private List<DataLakeMeasure> getAllMeasurements() {
     return this.dataExplorerSchemaManagement.getAllMeasurements();
   }
-
-  private Environment getEnvironment() {
-    return Environments.getEnvironment();
-  }
 }
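One subtlety in the rewritten `deleteData(String measurementID)` above: the `Optional.filter` predicate carries the delete as a side effect, so the method returns true only when a measurement with the given name exists and the executor reports a successful delete. A behaviorally equivalent imperative version, shown for illustration only:

// Imperative equivalent of the Optional chain above (illustration only).
public boolean deleteData(String measurementID) {
  List<DataLakeMeasure> allMeasurements = getAllMeasurements();
  for (DataLakeMeasure measure : allMeasurements) {
    if (measure.getMeasureName().equals(measurementID)) {
      // The delete happens here, just as it does inside Optional.filter.
      return new DataExplorerInfluxQueryExecutor().deleteData(measure);
    }
  }
  return false; // no measurement with that name
}

Unknown measurements still yield false, as before; what changes is that the delete now receives the stored `DataLakeMeasure` rather than a freshly constructed `new DataLakeMeasure(measurementID, null)`, and the error handling is centralized in the executor shown in the second file.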
DataExplorerInfluxQueryExecutor.java

@@ -18,12 +18,13 @@
 package org.apache.streampipes.dataexplorer.influx;
 
-import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
 import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
 import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
 import org.apache.streampipes.dataexplorer.query.DataExplorerQueryExecutor;
+import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.DataSeries;
 import org.apache.streampipes.model.datalake.SpQueryResult;
 
@@ -32,9 +33,15 @@
 import org.influxdb.dto.QueryResult;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
+import static org.apache.streampipes.commons.environment.Environments.getEnvironment;
+
 public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Query, QueryResult> {
 
@@ -142,6 +149,43 @@ private Query getQueryWithDatabaseName(Query query) {
   }
 
   private String getDatabaseName() {
-    return Environments.getEnvironment().getTsStorageBucket().getValueOrDefault();
+    return getEnvironment().getTsStorageBucket().getValueOrDefault();
   }
 
+  public Map<String, Object> getTagValues(String measurementId, String fields) {
+    try (final InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient()) {
+      Map<String, Object> tags = new HashMap<>();
+      if (fields != null && !(fields.isEmpty())) {
+        List<String> fieldList = Arrays.asList(fields.split(","));
+        fieldList.forEach(f -> {
+          String q =
+              "SHOW TAG VALUES ON \"" + getDatabaseName() + "\" FROM \"" + measurementId
+                  + "\" WITH KEY = \"" + f + "\"";
+          Query query = new Query(q);
+          QueryResult queryResult = influxDB.query(query);
+          queryResult.getResults().forEach(res -> {
+            res.getSeries().forEach(series -> {
+              if (!series.getValues().isEmpty()) {
+                String field = series.getValues().get(0).get(0).toString();
+                List<String> values =
+                    series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+                tags.put(field, values);
+              }
+            });
+          });
+        });
+      }
+
+      return tags;
+    }
+  }
+
+  public boolean deleteData(DataLakeMeasure measure) {
+    QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
+
+    return !queryResult.hasError() && (queryResult.getResults() == null || queryResult.getResults()
+        .get(0)
+        .getError() == null);
+  }
 }
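Two details in the new executor methods are easy to miss. First, the index arithmetic in `getTagValues` relies on the shape of an InfluxQL `SHOW TAG VALUES` response, whose series rows are `[key, value]` pairs: `get(0)` is the tag key and `get(1)` the tag value. A standalone sketch of that parsing against hand-built rows (hypothetical sensor values, for illustration):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TagValueParsingSketch {
  public static void main(String[] args) {
    // Hypothetical SHOW TAG VALUES rows: each row is a [key, value] pair.
    List<List<Object>> rows = List.of(
        List.of("sensorId", "sensor-01"),
        List.of("sensorId", "sensor-02"));

    Map<String, Object> tags = new HashMap<>();
    if (!rows.isEmpty()) {
      String field = rows.get(0).get(0).toString(); // tag key, e.g. "sensorId"
      List<String> values = rows.stream()
          .map(v -> v.get(1).toString())            // tag value column
          .collect(Collectors.toList());
      tags.put(field, values);
    }
    System.out.println(tags); // {sensorId=[sensor-01, sensor-02]}
  }
}

Second, the commit message's "improve result check" lives in `deleteData(DataLakeMeasure)`: the old inline check in `deleteAllData` dereferenced `getResults().get(0)` unconditionally, while the new check treats a null result list as success and only otherwise inspects the first result's error.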
