Skip to content

Commit

Permalink
refactor(#2246): Refactor InfluxStore.onEvent (#2247)
Browse files Browse the repository at this point in the history
  • Loading branch information
tenthe authored Dec 1, 2023
1 parent 581fd25 commit 709d6e3
Show file tree
Hide file tree
Showing 12 changed files with 1,006 additions and 216 deletions.
12 changes: 11 additions & 1 deletion streampipes-data-explorer-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,23 @@
<artifactId>streampipes-commons</artifactId>
<version>0.95.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils</artifactId>
<version>0.95.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<!-- Others -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.commons.image.ImageStore;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxStore;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
Expand Down Expand Up @@ -50,7 +51,7 @@ public TimeSeriesStore(Environment environment,
this.imageStore = new ImageStore(measure, environment);
}

this.influxStore = new InfluxStore(measure, environment);
this.influxStore = new InfluxStore(measure, environment, new InfluxClientProvider());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,21 @@

import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class InfluxClientProvider {

private static final Logger LOG = LoggerFactory.getLogger(InfluxClientProvider.class);

/**
* Create a new InfluxDB client from environment variables
*
Expand Down Expand Up @@ -58,7 +67,78 @@ public static InfluxDB getInfluxDBClient(InfluxConnectionSettings settings) {
}
}


public InfluxDB getInitializedInfluxDBClient(Environment environment) {

var settings = InfluxConnectionSettings.from(environment);
var influxDb = InfluxClientProvider.getInfluxDBClient(settings);
var databaseName = settings.getDatabaseName();

// Checking, if server is available
var response = influxDb.ping();
if (response.getVersion()
.equalsIgnoreCase("unknown")) {
throw new SpRuntimeException("Could not connect to InfluxDb Server: " + settings.getConnectionUrl());
}

// Checking whether the database exists
if (!databaseExists(influxDb, databaseName)) {
LOG.info("Database '" + databaseName + "' not found. Gets created ...");
createDatabase(influxDb, databaseName);
}

// setting up the database
influxDb.setDatabase(databaseName);
var batchSize = 2000;
var flushDuration = 500;
influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);

return influxDb;
}

/**
* Creates a new database with the given name
*
* @param influxDb The InfluxDB client
* @param dbName The name of the database which should be created
*/
public void createDatabase(
InfluxDB influxDb,
String dbName
) throws SpRuntimeException {
if (!dbName.matches("^[a-zA-Z_]\\w*$")) {
throw new SpRuntimeException(
"Database name '" + dbName + "' not allowed. Allowed names: ^[a-zA-Z_][a-zA-Z0-9_]*$");
}
influxDb.query(new Query("CREATE DATABASE \"" + dbName + "\"", ""));
}

/**
* Checks whether the given database exists.
*
* @param influxDb The InfluxDB client instance
* @param dbName The name of the database, the method should look for
* @return True if the database exists, false otherwise
*/
public boolean databaseExists(
InfluxDB influxDb,
String dbName
) {
var queryResult = influxDb.query(new Query("SHOW DATABASES", ""));
for (List<Object> a : queryResult.getResults()
.get(0)
.getSeries()
.get(0)
.getValues()) {
if (!a.isEmpty() && dbName.equals(a.get(0))) {
return true;
}
}
return false;
}

private static Environment getEnvironment() {
return Environments.getEnvironment();
}

}

This file was deleted.

Loading

0 comments on commit 709d6e3

Please sign in to comment.