Skip to content

Commit

Permalink
feat(#2252): enhance data lake sink schema management options (#2263)
Browse files Browse the repository at this point in the history
  • Loading branch information
tenthe authored Dec 6, 2023
1 parent 4cecb6e commit 306d30d
Show file tree
Hide file tree
Showing 27 changed files with 702 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public TimeSeriesStore(Environment environment,
DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure);

if (enableImageStore) {
// TODO check if event properties are replaces correctly
this.imageStore = new ImageStore(measure, environment);
}

Expand All @@ -67,11 +66,6 @@ public boolean onEvent(Event event) throws SpRuntimeException {
return true;
}


public boolean alterRetentionTime(DataLakeMeasure dataLakeMeasure) {
return true;
}

public void close() throws SpRuntimeException {
if (imageStore != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ public boolean databaseExists(
return false;
}


/** Resolves the current StreamPipes {@code Environment} via the {@code Environments} factory. */
private static Environment getEnvironment() {
return Environments.getEnvironment();
}


}
12 changes: 11 additions & 1 deletion streampipes-data-explorer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,23 @@
<artifactId>streampipes-storage-management</artifactId>
<version>0.95.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils</artifactId>
<version>0.95.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,64 +21,103 @@
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyList;
import org.apache.streampipes.model.schema.EventPropertyNested;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.storage.api.IDataLakeStorage;
import org.apache.streampipes.storage.couchdb.utils.Utils;
import org.apache.streampipes.storage.management.StorageDispatcher;

import com.google.gson.JsonObject;
import org.lightcouch.CouchDbClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DataExplorerSchemaManagement implements IDataExplorerSchemaManagement {

private static final Logger LOG = LoggerFactory.getLogger(DataExplorerSchemaManagement.class);

IDataLakeStorage dataLakeStorage;

/**
 * @param dataLakeStorage storage backend used to persist and look up {@code DataLakeMeasure} metadata
 */
public DataExplorerSchemaManagement(IDataLakeStorage dataLakeStorage) {
this.dataLakeStorage = dataLakeStorage;
}

@Override
public List<DataLakeMeasure> getAllMeasurements() {
// Delegates to DataExplorerUtils rather than the injected storage instance.
return DataExplorerUtils.getInfos();
}

@Override
public DataLakeMeasure getById(String elementId) {
return getDataLakeStorage().findOne(elementId);
return dataLakeStorage.findOne(elementId);
}

/**
* For new measurements an entry is generated in the database. For existing measurements the schema is updated
* according to the update strategy defined by the measurement.
*/
@Override
public DataLakeMeasure createMeasurement(DataLakeMeasure measure) {
List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
Optional<DataLakeMeasure> optional =
dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName()))
.findFirst();

if (optional.isPresent()) {
DataLakeMeasure oldEntry = optional.get();
if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(),
measure.getEventSchema().getEventProperties())) {
oldEntry.setEventSchema(measure.getEventSchema());
oldEntry.setTimestampField(measure.getTimestampField());
oldEntry.setPipelineName(measure.getPipelineName());
getDataLakeStorage().updateDataLakeMeasure(oldEntry);
return oldEntry;
}
public DataLakeMeasure createOrUpdateMeasurement(DataLakeMeasure measure) {

setDefaultUpdateStrategyIfNoneProvided(measure);

var existingMeasure = getExistingMeasureByName(measure.getMeasureName());

if (existingMeasure.isEmpty()) {
setSchemaVersionAndStoreMeasurement(measure);
} else {
measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
getDataLakeStorage().storeDataLakeMeasure(measure);
return measure;
handleExistingMeasurement(measure, existingMeasure.get());
}

return measure;
}

/**
 * Applies the measure's schema update strategy to a measurement that already
 * exists in the database.
 *
 * @param measure         the incoming measurement definition
 * @param existingMeasure the measurement currently stored under the same name
 */
private void handleExistingMeasurement(
    DataLakeMeasure measure,
    DataLakeMeasure existingMeasure
) {
  // Reuse the stored element id so the update targets the existing document.
  measure.setElementId(existingMeasure.getElementId());

  var strategy = measure.getSchemaUpdateStrategy();
  if (DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA.equals(strategy)) {
    // UPDATE_SCHEMA: the stored schema is fully replaced by the new one.
    updateMeasurement(measure);
  } else {
    // Otherwise the stored schema is merged with the new one.
    unifyEventSchemaAndUpdateMeasure(measure, existingMeasure);
  }
}


/**
 * Looks up the stored measure whose measure name matches the given one.
 *
 * @param measureName name to search for
 * @return the matching measure, or empty if none is stored under that name
 */
private Optional<DataLakeMeasure> getExistingMeasureByName(String measureName) {
  var allMeasures = dataLakeStorage.getAllDataLakeMeasures();
  return allMeasures
      .stream()
      .filter(candidate -> candidate.getMeasureName().equals(measureName))
      .findFirst();
}

/**
 * Falls back to {@code DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA} when the
 * measure does not specify an update strategy of its own.
 */
private static void setDefaultUpdateStrategyIfNoneProvided(DataLakeMeasure measure) {
if (measure.getSchemaUpdateStrategy() == null) {
measure.setSchemaUpdateStrategy(DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA);
}
}

@Override
public void deleteMeasurement(String elementId) {
if (getDataLakeStorage().findOne(elementId) != null) {
getDataLakeStorage().deleteDataLakeMeasure(elementId);
if (dataLakeStorage.findOne(elementId) != null) {
dataLakeStorage.deleteDataLakeMeasure(elementId);
} else {
throw new IllegalArgumentException("Could not find measure with this ID");
}
Expand All @@ -88,12 +127,23 @@ public void deleteMeasurement(String elementId) {
public boolean deleteMeasurementByName(String measureName) {
boolean isSuccess = false;
CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
List<JsonObject> docs = couchDbClient.view("_all_docs")
.includeDocs(true)
.query(JsonObject.class);

for (JsonObject document : docs) {
if (document.get("measureName").toString().replace("\"", "").equals(measureName)) {
couchDbClient.remove(document.get("_id").toString().replace("\"", ""),
document.get("_rev").toString().replace("\"", ""));
if (document.get("measureName")
.toString()
.replace("\"", "")
.equals(measureName)) {
couchDbClient.remove(
document.get("_id")
.toString()
.replace("\"", ""),
document.get("_rev")
.toString()
.replace("\"", "")
);
isSuccess = true;
break;
}
Expand All @@ -102,58 +152,73 @@ public boolean deleteMeasurementByName(String measureName) {
try {
couchDbClient.close();
} catch (IOException e) {
e.printStackTrace();
LOG.error("Could not close CouchDB client", e);
}
return isSuccess;
}

@Override
public void updateMeasurement(DataLakeMeasure measure) {
var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
var existingMeasure = dataLakeStorage.findOne(measure.getElementId());
if (existingMeasure != null) {
measure.setRev(existingMeasure.getRev());
getDataLakeStorage().updateDataLakeMeasure(measure);
dataLakeStorage.updateDataLakeMeasure(measure);
} else {
getDataLakeStorage().storeDataLakeMeasure(measure);
dataLakeStorage.storeDataLakeMeasure(measure);
}
}

private IDataLakeStorage getDataLakeStorage() {
return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
// Stamps a brand-new measurement with the current schema version before persisting it.
private void setSchemaVersionAndStoreMeasurement(DataLakeMeasure measure) {
measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
dataLakeStorage.storeDataLakeMeasure(measure);
}

private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
if (prop1.size() != prop2.size()) {
return false;
}

return prop1.stream().allMatch(prop -> {

for (EventProperty property : prop2) {
if (prop.getRuntimeName().equals(property.getRuntimeName())) {

//primitive
if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) {
if (((EventPropertyPrimitive) prop)
.getRuntimeType()
.equals(((EventPropertyPrimitive) property).getRuntimeType())) {
return true;
}

//list
} else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) {
return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()),
Collections.singletonList(((EventPropertyList) property).getEventProperty()));

//nested
} else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) {
return compareEventProperties(((EventPropertyNested) prop).getEventProperties(),
((EventPropertyNested) property).getEventProperties());
}
}
}
return false;
/**
 * Merges the event schemas of the existing and the incoming measurement and
 * persists the resulting measure in the database.
 *
 * @param measure         incoming measurement; receives the merged schema
 * @param existingMeasure measurement currently stored under the same name
 */
private void unifyEventSchemaAndUpdateMeasure(
    DataLakeMeasure measure,
    DataLakeMeasure existingMeasure
) {
  // Existing properties are passed first so they take precedence on merge.
  measure.getEventSchema().setEventProperties(
      getUnifiedEventProperties(existingMeasure, measure));
  updateMeasurement(measure);
}

});
/**
 * Returns the union of the event properties of the two measures, unique by
 * runtime name. When both measures define a property with the same runtime
 * name, the property of {@code measure1} wins (first occurrence is kept).
 * The encounter order of the properties is preserved: all properties of
 * {@code measure1} first, then the new ones of {@code measure2}.
 *
 * @param measure1 measure whose properties take precedence (the existing one)
 * @param measure2 measure contributing additional properties (the new one)
 * @return merged, order-preserving list of unique event properties
 */
private List<EventProperty> getUnifiedEventProperties(
    DataLakeMeasure measure1,
    DataLakeMeasure measure2
) {
  // Combine the event properties from both measures into a single Stream
  var allMeasurementProperties = Stream.concat(
      measure1.getEventSchema()
              .getEventProperties()
              .stream(),
      measure2.getEventSchema()
              .getEventProperties()
              .stream()
  );

  // Deduplicate by runtime name, keeping the first occurrence.
  // A LinkedHashMap supplier is required: the default toMap collector returns
  // a HashMap, which would scramble the schema's property order.
  var unifiedEventProperties = allMeasurementProperties
      .collect(Collectors.toMap(
          EventProperty::getRuntimeName,
          Function.identity(),
          (firstProperty, duplicateProperty) -> firstProperty,
          LinkedHashMap::new
      ))
      .values();
  return new ArrayList<>(unifiedEventProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public interface IDataExplorerSchemaManagement {

DataLakeMeasure getById(String elementId);

DataLakeMeasure createMeasurement(DataLakeMeasure measure);
DataLakeMeasure createOrUpdateMeasurement(DataLakeMeasure measure);

void deleteMeasurement(String elementId);

Expand Down
Loading

0 comments on commit 306d30d

Please sign in to comment.