Skip to content

Commit

Permalink
2354 harmonize influx store and influx db client (#3154)
Browse files Browse the repository at this point in the history
* #2354 refactored both InfluxDbClient classes and TimeSeriesStorageInflux

* #2354 cleanup

* #2354 cleanup

* #2354 cleanup

* #2354 cleanup
  • Loading branch information
IsaakKrut authored Aug 27, 2024
1 parent 54b3f3a commit 9a8ef70
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public TimeSeriesStorageInflux(
InfluxClientProvider influxClientProvider
) throws SpRuntimeException {
super(measure);
influxDb = influxClientProvider.getInitializedInfluxDBClient(environment);
this.influxDb = influxClientProvider.getSetUpInfluxDBClient(environment);
propertyHandler = new PropertyHandler();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,50 @@

public class InfluxClientProvider {

private static final int DEFAULT_BATCH_SIZE = 2000;
private static final int DEFAULT_FLUSH_DURATION = 500;

private static final Logger LOG = LoggerFactory.getLogger(InfluxClientProvider.class);

/**
* Create a new InfluxDB client from Environment and ensures database is available
* @param environment Environment
* @return InfluxDB
*/
public InfluxDB getSetUpInfluxDBClient(Environment environment){
return getSetUpInfluxDBClient(InfluxConnectionSettings.from(environment));
}

/**
* Create a new InfluxDB client from Connection Settings and ensures database is available
* @param settings Connection Settings
* @return InfluxDB
*/
public InfluxDB getSetUpInfluxDBClient(InfluxConnectionSettings settings){
var influxDb = getInitializedInfluxDBClient(settings);
this.setupDatabaseAndBatching(influxDb, settings.getDatabaseName());

return influxDb;
}

/**
* Create a new InfluxDB client from provided settings and verify it's available
* @param settings Connection settings
* @return InfluxDB
*/
public InfluxDB getInitializedInfluxDBClient(InfluxConnectionSettings settings){
var influxDb = InfluxClientProvider.getInfluxDBClient(settings);

// Checking, if server is available
var response = influxDb.ping();
if (response.getVersion()
.equalsIgnoreCase("unknown")) {
throw new SpRuntimeException("Could not connect to InfluxDb Server: " + settings.getConnectionUrl());
}

return influxDb;
}

/**
* Create a new InfluxDB client from environment variables
*
Expand Down Expand Up @@ -68,20 +110,23 @@ public static InfluxDB getInfluxDBClient(InfluxConnectionSettings settings) {
}
}

/**
* Creates the specified database in the influxDb instance if it does not exist. Enables batching with default values
* @param influxDb The InfluxDB client instance
* @param databaseName The name of the database
*/
public void setupDatabaseAndBatching(InfluxDB influxDb, String databaseName) {
this.setupDatabaseAndBatching(influxDb, databaseName, DEFAULT_BATCH_SIZE, DEFAULT_FLUSH_DURATION);
}

public InfluxDB getInitializedInfluxDBClient(Environment environment) {

var settings = InfluxConnectionSettings.from(environment);
var influxDb = InfluxClientProvider.getInfluxDBClient(settings);
var databaseName = settings.getDatabaseName();

// Checking, if server is available
var response = influxDb.ping();
if (response.getVersion()
.equalsIgnoreCase("unknown")) {
throw new SpRuntimeException("Could not connect to InfluxDb Server: " + settings.getConnectionUrl());
}

/**
* Creates the specified database in the influxDb instance if it does not exist. Enables batching
* @param influxDb The InfluxDB client instance
* @param databaseName The name of the database
* @param batchSize Batch Size
* @param flushDuration Flush Duration
*/
public void setupDatabaseAndBatching(InfluxDB influxDb, String databaseName, int batchSize, int flushDuration) {
// Checking whether the database exists
if (!databaseExists(influxDb, databaseName)) {
LOG.info("Database '" + databaseName + "' not found. Gets created ...");
Expand All @@ -90,11 +135,7 @@ public InfluxDB getInitializedInfluxDBClient(Environment environment) {

// setting up the database
influxDb.setDatabase(databaseName);
var batchSize = 2000;
var flushDuration = 500;
influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);

return influxDb;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.streampipes.dataexplorer.influx;


import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
Expand Down Expand Up @@ -383,7 +384,7 @@ private TimeSeriesStorageInflux getInfluxStore(EventSchema eventSchema) {
);

var influxClientProviderMock = Mockito.mock(InfluxClientProvider.class);
Mockito.when(influxClientProviderMock.getInitializedInfluxDBClient(ArgumentMatchers.any()))
Mockito.when(influxClientProviderMock.getSetUpInfluxDBClient((Environment) ArgumentMatchers.any()))
.thenReturn(influxDBMock);

return new TimeSeriesStorageInflux(measure, null, influxClientProviderMock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void connect() throws AdapterException {

public void disconnect() {
if (connected) {
influxDb.close();
super.disconnect();
connected = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.streampipes.dataexplorer.influx.client.InfluxConnectionSettings;

import org.influxdb.InfluxDB;
import org.influxdb.dto.Pong;

public abstract class SharedInfluxClient {

Expand All @@ -42,12 +41,14 @@ public SharedInfluxClient(InfluxConnectionSettings connectionSettings,


protected void initClient() throws SpRuntimeException {
this.influxDb = InfluxClientProvider.getInfluxDBClient(connectionSettings);
InfluxClientProvider influxClientProvider = new InfluxClientProvider();
this.influxDb = influxClientProvider.getInitializedInfluxDBClient(connectionSettings);
}

// Checking, if server is available
Pong response = influxDb.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
throw new SpRuntimeException("Could not connect to InfluxDb Server: " + connectionSettings.getConnectionUrl());
}
/**
* Shuts down the connection to the InfluxDB server
*/
public void disconnect() {
influxDb.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@
import org.apache.streampipes.extensions.connectors.influx.shared.SharedInfluxClient;
import org.apache.streampipes.model.runtime.Event;

import org.influxdb.BatchOptions;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class InfluxDbClient extends SharedInfluxClient {

private static final Logger LOG = LoggerFactory.getLogger(InfluxDbClient.class);

private final String timestampField;
private final Integer batchSize;
Expand Down Expand Up @@ -66,17 +62,8 @@ public class InfluxDbClient extends SharedInfluxClient {
*/
private void connect() throws SpRuntimeException {
super.initClient();
var databaseName = connectionSettings.getDatabaseName();

// Checking whether the database exists
if (!influxClientProvider.databaseExists(influxDb, databaseName)) {
LOG.info("Database '" + databaseName + "' not found. Gets created ...");
influxClientProvider.createDatabase(influxDb, databaseName);
}

// setting up the database
influxDb.setDatabase(databaseName);
influxDb.enableBatch(BatchOptions.DEFAULTS.actions(batchSize).flushDuration(flushDuration));
influxClientProvider.setupDatabaseAndBatching(
influxDb, connectionSettings.getDatabaseName(), batchSize, flushDuration);
}

/**
Expand Down Expand Up @@ -107,11 +94,4 @@ void save(Event event) throws SpRuntimeException {

influxDb.write(p.build());
}

/**
* Shuts down the connection to the InfluxDB server
*/
void stop() {
influxDb.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void onEvent(Event event) {

@Override
public void onDetach() throws SpRuntimeException {
influxDbClient.stop();
influxDbClient.disconnect();
}

public static String prepareString(String s) {
Expand Down

0 comments on commit 9a8ef70

Please sign in to comment.