From 306d30dcde22ea8a262ae8aef5dbe1e96448ac88 Mon Sep 17 00:00:00 2001 From: Philipp Zehnder Date: Wed, 6 Dec 2023 12:16:47 +0100 Subject: [PATCH] feat(#2252): enhance data lake sink schema management options (#2263) --- .../dataexplorer/commons/TimeSeriesStore.java | 6 - .../commons/influx/InfluxClientProvider.java | 2 + streampipes-data-explorer/pom.xml | 12 +- .../DataExplorerSchemaManagement.java | 203 +++++++++++------ .../api/IDataExplorerSchemaManagement.java | 2 +- .../query/AutoAggregationHandler.java | 35 ++- .../DataExplorerSchemaManagementTest.java | 204 ++++++++++++++++++ .../influx/sink/InfluxDbClient.java | 17 +- .../streampipes-sinks-internal-jvm/pom.xml | 11 + .../InternalSinksExtensionModuleExports.java | 3 +- .../internal/jvm/datalake/DataLakeSink.java | 56 +++-- .../migration/DataLakeSinkMigrationV1.java | 79 +++++++ .../documentation.md | 22 +- .../strings.en | 3 + .../DataLakeSinkMigrationV1Test.java | 62 ++++++ .../model/datalake/DataLakeMeasure.java | 10 + .../DataLakeMeasureSchemaUpdateStrategy.java | 34 +++ .../ps/DataLakeMeasureResourceV4.java | 14 +- .../streampipes/ps/DataLakeResourceV4.java | 16 +- .../migration/DataSinkMigrationResource.java | 4 + .../streampipes/rest/ResetManagement.java | 8 +- .../lib/model/gen/streampipes-model-client.ts | 2 +- .../src/lib/model/gen/streampipes-model.ts | 14 +- .../adapter-started-dialog.component.ts | 4 + .../template/PipelineInvocationBuilder.ts | 20 ++ .../save-pipeline/save-pipeline.component.ts | 1 - ui/src/app/editor/services/jsplumb.service.ts | 1 - 27 files changed, 702 insertions(+), 143 deletions(-) create mode 100644 streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagementTest.java create mode 100644 streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/migration/DataLakeSinkMigrationV1.java create mode 100644 
streampipes-extensions/streampipes-sinks-internal-jvm/src/test/java/org/apache/streampipes/sinks/internal/jvm/migration/DataLakeSinkMigrationV1Test.java create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasureSchemaUpdateStrategy.java diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java index 31ce5cbf34..4bc6628696 100644 --- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java @@ -47,7 +47,6 @@ public TimeSeriesStore(Environment environment, DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure); if (enableImageStore) { - // TODO check if event properties are replaces correctly this.imageStore = new ImageStore(measure, environment); } @@ -67,11 +66,6 @@ public boolean onEvent(Event event) throws SpRuntimeException { return true; } - - public boolean alterRetentionTime(DataLakeMeasure dataLakeMeasure) { - return true; - } - public void close() throws SpRuntimeException { if (imageStore != null) { try { diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxClientProvider.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxClientProvider.java index 9a9e7a1273..a95ecfac7e 100644 --- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxClientProvider.java +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxClientProvider.java @@ -137,8 +137,10 @@ public boolean databaseExists( return false; } + private static 
Environment getEnvironment() { return Environments.getEnvironment(); } + } diff --git a/streampipes-data-explorer/pom.xml b/streampipes-data-explorer/pom.xml index c68a436d7d..2b18b0790e 100644 --- a/streampipes-data-explorer/pom.xml +++ b/streampipes-data-explorer/pom.xml @@ -48,13 +48,23 @@ streampipes-storage-management 0.95.0-SNAPSHOT + + org.apache.streampipes + streampipes-test-utils + 0.95.0-SNAPSHOT + test + junit junit test - + + org.mockito + mockito-core + test + org.influxdb influxdb-java diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java index 8c4dc4a344..7db5bad83b 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java @@ -21,24 +21,34 @@ import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement; import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils; import org.apache.streampipes.model.datalake.DataLakeMeasure; +import org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy; import org.apache.streampipes.model.schema.EventProperty; -import org.apache.streampipes.model.schema.EventPropertyList; -import org.apache.streampipes.model.schema.EventPropertyNested; -import org.apache.streampipes.model.schema.EventPropertyPrimitive; import org.apache.streampipes.storage.api.IDataLakeStorage; import org.apache.streampipes.storage.couchdb.utils.Utils; -import org.apache.streampipes.storage.management.StorageDispatcher; import com.google.gson.JsonObject; import org.lightcouch.CouchDbClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Collections; +import java.util.ArrayList; import 
java.util.List; import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class DataExplorerSchemaManagement implements IDataExplorerSchemaManagement { + private static final Logger LOG = LoggerFactory.getLogger(DataExplorerSchemaManagement.class); + + IDataLakeStorage dataLakeStorage; + + public DataExplorerSchemaManagement(IDataLakeStorage dataLakeStorage) { + this.dataLakeStorage = dataLakeStorage; + } + @Override public List getAllMeasurements() { return DataExplorerUtils.getInfos(); @@ -46,39 +56,68 @@ public List getAllMeasurements() { @Override public DataLakeMeasure getById(String elementId) { - return getDataLakeStorage().findOne(elementId); + return dataLakeStorage.findOne(elementId); } + /** + * For new measurements an entry is generated in the database. For existing measurements the schema is updated + * according to the update strategy defined by the measurement. + */ @Override - public DataLakeMeasure createMeasurement(DataLakeMeasure measure) { - List dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures(); - Optional optional = - dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName())) - .findFirst(); - - if (optional.isPresent()) { - DataLakeMeasure oldEntry = optional.get(); - if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(), - measure.getEventSchema().getEventProperties())) { - oldEntry.setEventSchema(measure.getEventSchema()); - oldEntry.setTimestampField(measure.getTimestampField()); - oldEntry.setPipelineName(measure.getPipelineName()); - getDataLakeStorage().updateDataLakeMeasure(oldEntry); - return oldEntry; - } + public DataLakeMeasure createOrUpdateMeasurement(DataLakeMeasure measure) { + + setDefaultUpdateStrategyIfNoneProvided(measure); + + var existingMeasure = getExistingMeasureByName(measure.getMeasureName()); + + if (existingMeasure.isEmpty()) { + 
setSchemaVersionAndStoreMeasurement(measure); } else { - measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION); - getDataLakeStorage().storeDataLakeMeasure(measure); - return measure; + handleExistingMeasurement(measure, existingMeasure.get()); } return measure; } + /** + * Distinguishes between the update strategies for existing measurements + */ + private void handleExistingMeasurement( + DataLakeMeasure measure, + DataLakeMeasure existingMeasure + ) { + measure.setElementId(existingMeasure.getElementId()); + if (DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA.equals(measure.getSchemaUpdateStrategy())) { + // For the update schema strategy the old schema is overwritten with the new one + updateMeasurement(measure); + } else { + // For the extend existing schema strategy the old schema is merged with the new one + unifyEventSchemaAndUpdateMeasure(measure, existingMeasure); + } + } + + + /** + * Returns the existing measure that has the provided measure name + */ + private Optional getExistingMeasureByName(String measureName) { + return dataLakeStorage.getAllDataLakeMeasures() + .stream() + .filter(m -> m.getMeasureName() + .equals(measureName)) + .findFirst(); + } + + private static void setDefaultUpdateStrategyIfNoneProvided(DataLakeMeasure measure) { + if (measure.getSchemaUpdateStrategy() == null) { + measure.setSchemaUpdateStrategy(DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA); + } + } + @Override public void deleteMeasurement(String elementId) { - if (getDataLakeStorage().findOne(elementId) != null) { - getDataLakeStorage().deleteDataLakeMeasure(elementId); + if (dataLakeStorage.findOne(elementId) != null) { + dataLakeStorage.deleteDataLakeMeasure(elementId); } else { throw new IllegalArgumentException("Could not find measure with this ID"); } @@ -88,12 +127,23 @@ public void deleteMeasurement(String elementId) { public boolean deleteMeasurementByName(String measureName) { boolean isSuccess = false; CouchDbClient couchDbClient = 
Utils.getCouchDbDataLakeClient(); - List docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class); + List docs = couchDbClient.view("_all_docs") + .includeDocs(true) + .query(JsonObject.class); for (JsonObject document : docs) { - if (document.get("measureName").toString().replace("\"", "").equals(measureName)) { - couchDbClient.remove(document.get("_id").toString().replace("\"", ""), - document.get("_rev").toString().replace("\"", "")); + if (document.get("measureName") + .toString() + .replace("\"", "") + .equals(measureName)) { + couchDbClient.remove( + document.get("_id") + .toString() + .replace("\"", ""), + document.get("_rev") + .toString() + .replace("\"", "") + ); isSuccess = true; break; } @@ -102,58 +152,73 @@ public boolean deleteMeasurementByName(String measureName) { try { couchDbClient.close(); } catch (IOException e) { - e.printStackTrace(); + LOG.error("Could not close CouchDB client", e); } return isSuccess; } @Override public void updateMeasurement(DataLakeMeasure measure) { - var existingMeasure = getDataLakeStorage().findOne(measure.getElementId()); + var existingMeasure = dataLakeStorage.findOne(measure.getElementId()); if (existingMeasure != null) { measure.setRev(existingMeasure.getRev()); - getDataLakeStorage().updateDataLakeMeasure(measure); + dataLakeStorage.updateDataLakeMeasure(measure); } else { - getDataLakeStorage().storeDataLakeMeasure(measure); + dataLakeStorage.storeDataLakeMeasure(measure); } } - private IDataLakeStorage getDataLakeStorage() { - return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage(); + private void setSchemaVersionAndStoreMeasurement(DataLakeMeasure measure) { + measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION); + dataLakeStorage.storeDataLakeMeasure(measure); } - private boolean compareEventProperties(List prop1, List prop2) { - if (prop1.size() != prop2.size()) { - return false; - } - - return prop1.stream().allMatch(prop -> { - - for (EventProperty 
property : prop2) { - if (prop.getRuntimeName().equals(property.getRuntimeName())) { - - //primitive - if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) { - if (((EventPropertyPrimitive) prop) - .getRuntimeType() - .equals(((EventPropertyPrimitive) property).getRuntimeType())) { - return true; - } - - //list - } else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) { - return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()), - Collections.singletonList(((EventPropertyList) property).getEventProperty())); - - //nested - } else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) { - return compareEventProperties(((EventPropertyNested) prop).getEventProperties(), - ((EventPropertyNested) property).getEventProperties()); - } - } - } - return false; + /** + * First the event schemas of the measurements are merged and then the measure is updated in the database + */ + private void unifyEventSchemaAndUpdateMeasure( + DataLakeMeasure measure, + DataLakeMeasure existingMeasure + ) { + var properties = getUnifiedEventProperties( + existingMeasure, + measure + ); + + measure + .getEventSchema() + .setEventProperties(properties); + + updateMeasurement(measure); + } - }); + /** + * Returns the union of the unique event properties of the two measures. + * They are unique by runtime name. 
+ */ + private List getUnifiedEventProperties( + DataLakeMeasure measure1, + DataLakeMeasure measure2 + ) { + // Combine the event properties from both measures into a single Stream + var allMeasurementProperties = Stream.concat( + measure1.getEventSchema() + .getEventProperties() + .stream(), + measure2.getEventSchema() + .getEventProperties() + .stream() + ); + + // Filter event properties by removing duplicate runtime names + // If there are duplicate keys, choose the first occurrence + var unifiedEventProperties = allMeasurementProperties + .collect(Collectors.toMap( + EventProperty::getRuntimeName, + Function.identity(), + (eventProperty, eventProperty2) -> eventProperty + )) + .values(); + return new ArrayList<>(unifiedEventProperties); } } diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java index d8dc29648a..a210b515b3 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java @@ -28,7 +28,7 @@ public interface IDataExplorerSchemaManagement { DataLakeMeasure getById(String elementId); - DataLakeMeasure createMeasurement(DataLakeMeasure measure); + DataLakeMeasure createOrUpdateMeasurement(DataLakeMeasure measure); void deleteMeasurement(String elementId); diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/AutoAggregationHandler.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/AutoAggregationHandler.java index 1bff44a1d7..68bf23171a 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/AutoAggregationHandler.java +++ 
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/AutoAggregationHandler.java @@ -24,6 +24,7 @@ import org.apache.streampipes.dataexplorer.param.model.SelectColumn; import org.apache.streampipes.dataexplorer.querybuilder.DataLakeQueryOrdering; import org.apache.streampipes.model.datalake.SpQueryResult; +import org.apache.streampipes.storage.management.StorageDispatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +63,10 @@ public AutoAggregationHandler(ProvidedRestQueryParams params) { } private IDataExplorerQueryManagement getDataLakeQueryManagement() { - return new DataExplorerQueryManagement(new DataExplorerSchemaManagement()); + var dataLakeStorage = StorageDispatcher.INSTANCE + .getNoSqlStore() + .getDataLakeStorage(); + return new DataExplorerQueryManagement(new DataExplorerSchemaManagement(dataLakeStorage)); } public ProvidedRestQueryParams makeAutoAggregationQueryParams() throws IllegalArgumentException { @@ -106,7 +110,13 @@ public Integer getCount(String fieldName) { SpQueryResult result = dataLakeQueryManagement.getData(countParams, true); - return result.getTotal() > 0 ? ((Double) result.getAllDataSeries().get(0).getRows().get(0).get(1)).intValue() : 0; + return result.getTotal() > 0 ? 
( + (Double) result.getAllDataSeries() + .get(0) + .getRows() + .get(0) + .get(1) + ).intValue() : 0; } private SpQueryResult fireQuery(ProvidedRestQueryParams params) { @@ -116,7 +126,8 @@ private SpQueryResult fireQuery(ProvidedRestQueryParams params) { private int getAggregationValue(SpQueryResult newest, SpQueryResult oldest) throws ParseException { long timerange = extractTimestamp(newest) - extractTimestamp(oldest); double v = timerange / MAX_RETURN_LIMIT; - return Double.valueOf(v).intValue(); + return Double.valueOf(v) + .intValue(); } private SpQueryResult getSingleRecord(DataLakeQueryOrdering order) throws ParseException { @@ -131,8 +142,12 @@ private SpQueryResult getSingleRecord(DataLakeQueryOrdering order) throws ParseE private String transformColumns(String rawQuery) { List columns = - Arrays.stream(rawQuery.split(COMMA)).map(SelectColumn::fromApiQueryString).collect(Collectors.toList()); - return columns.stream().map(SelectColumn::getOriginalField).collect(Collectors.joining(COMMA)); + Arrays.stream(rawQuery.split(COMMA)) + .map(SelectColumn::fromApiQueryString) + .collect(Collectors.toList()); + return columns.stream() + .map(SelectColumn::getOriginalField) + .collect(Collectors.joining(COMMA)); } private String getSampleField(SpQueryResult result) { @@ -145,8 +160,14 @@ private String getSampleField(SpQueryResult result) { } private long extractTimestamp(SpQueryResult result) throws ParseException { - int timestampIndex = result.getHeaders().indexOf(TIMESTAMP_FIELD); - return tryParseDate(result.getAllDataSeries().get(0).getRows().get(0).get(timestampIndex).toString()).getTime(); + int timestampIndex = result.getHeaders() + .indexOf(TIMESTAMP_FIELD); + return tryParseDate(result.getAllDataSeries() + .get(0) + .getRows() + .get(0) + .get(timestampIndex) + .toString()).getTime(); } private Date tryParseDate(String v) throws ParseException { diff --git 
a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagementTest.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagementTest.java new file mode 100644 index 0000000000..baa6053b99 --- /dev/null +++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagementTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.streampipes.dataexplorer; + +import org.apache.streampipes.model.datalake.DataLakeMeasure; +import org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy; +import org.apache.streampipes.model.schema.EventProperty; +import org.apache.streampipes.storage.api.IDataLakeStorage; +import org.apache.streampipes.test.generator.EventPropertyPrimitiveTestBuilder; +import org.apache.streampipes.test.generator.EventSchemaTestBuilder; +import org.apache.streampipes.vocabulary.XSD; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.net.URI; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DataExplorerSchemaManagementTest { + public static final String NEW_PROPERTY = "newProperty"; + public static final String OLD_PROPERTY = "oldProperty"; + + private IDataLakeStorage dataLakeStorageMock; + + @Before + public void setUp() { + dataLakeStorageMock = mock(IDataLakeStorage.class); + } + + @Test + public void createMeasurementThatNotExisted() { + when(dataLakeStorageMock.getAllDataLakeMeasures()).thenReturn(List.of()); + when(dataLakeStorageMock.storeDataLakeMeasure(any())).thenReturn(true); + var schemaManagement = new DataExplorerSchemaManagement(dataLakeStorageMock); + + var oldMeasure = getSampleMeasure( + DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA, + List.of() + ); + var resultingMeasure = schemaManagement.createOrUpdateMeasurement(oldMeasure); + + assertEquals(oldMeasure.getMeasureName(), resultingMeasure.getMeasureName()); + verify(dataLakeStorageMock, Mockito.times(1)) + .storeDataLakeMeasure(any()); + } + + + @Test + public void createMeasurementWithUpdateStrategy() { + + var 
oldMeasure = getSampleMeasure( + DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA, + List.of( + getEventProperty(OLD_PROPERTY, XSD.STRING) + ) + ); + + when(dataLakeStorageMock.getAllDataLakeMeasures()).thenReturn(List.of(oldMeasure)); + when(dataLakeStorageMock.findOne(any())).thenReturn(oldMeasure); + var schemaManagement = new DataExplorerSchemaManagement(dataLakeStorageMock); + + var newMeasure = getNewMeasure(DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA); + + var resultMeasure = schemaManagement.createOrUpdateMeasurement(newMeasure); + + assertEquals(newMeasure.getMeasureName(), resultMeasure.getMeasureName()); + verify(dataLakeStorageMock, Mockito.times(1)) + .updateDataLakeMeasure(any()); + assertFalse(containsPropertyWithName(resultMeasure, OLD_PROPERTY)); + assertTrue(containsPropertyWithName(resultMeasure, NEW_PROPERTY)); + + } + + + @Test + public void createMeasurementWithExtendSchemaStrategy() { + + var oldMeasure = getSampleMeasure( + DataLakeMeasureSchemaUpdateStrategy.EXTEND_EXISTING_SCHEMA, + List.of( + getEventProperty(OLD_PROPERTY, XSD.STRING) + ) + ); + when(dataLakeStorageMock.getAllDataLakeMeasures()).thenReturn(List.of(oldMeasure)); + when(dataLakeStorageMock.findOne(any())).thenReturn(oldMeasure); + var schemaManagement = new DataExplorerSchemaManagement(dataLakeStorageMock); + var newMeasure = getNewMeasure(DataLakeMeasureSchemaUpdateStrategy.EXTEND_EXISTING_SCHEMA); + + var resultMeasure = schemaManagement.createOrUpdateMeasurement(newMeasure); + + assertEquals(newMeasure.getMeasureName(), resultMeasure.getMeasureName()); + verify(dataLakeStorageMock, Mockito.times(1)).updateDataLakeMeasure(any()); + assertTrue(containsPropertyWithName(resultMeasure, OLD_PROPERTY)); + assertTrue(containsPropertyWithName(resultMeasure, NEW_PROPERTY)); + } + + + @Test + public void createMeasurementWithExtendSchemaStrategyAndDifferentPropertyTypes() { + var oldMeasure = getSampleMeasure( + DataLakeMeasureSchemaUpdateStrategy.EXTEND_EXISTING_SCHEMA, + 
List.of( + getEventProperty(OLD_PROPERTY, XSD.STRING), + getEventProperty(NEW_PROPERTY, XSD.INTEGER) + ) + ); + + when(dataLakeStorageMock.getAllDataLakeMeasures()).thenReturn(List.of(oldMeasure)); + when(dataLakeStorageMock.findOne(any())).thenReturn(oldMeasure); + + var schemaManagement = new DataExplorerSchemaManagement(dataLakeStorageMock); + + var newMeasure = getNewMeasure(DataLakeMeasureSchemaUpdateStrategy.EXTEND_EXISTING_SCHEMA); + + var resultMeasure = schemaManagement.createOrUpdateMeasurement(newMeasure); + assertEquals(newMeasure.getMeasureName(), resultMeasure.getMeasureName()); + verify(dataLakeStorageMock, Mockito.times(1)).updateDataLakeMeasure(any()); + assertEquals( + 2, + resultMeasure.getEventSchema() + .getEventProperties() + .size() + ); + assertTrue(containsPropertyWithName(resultMeasure, OLD_PROPERTY)); + assertTrue(containsPropertyWithName(resultMeasure, NEW_PROPERTY)); + } + + private EventProperty getEventProperty( + String runtimeName, + URI runtimeType + ) { + return EventPropertyPrimitiveTestBuilder + .create() + .withRuntimeName(runtimeName) + .withRuntimeType(runtimeType) + .build(); + } + + private DataLakeMeasure getNewMeasure(DataLakeMeasureSchemaUpdateStrategy updateStrategy) { + return getSampleMeasure( + updateStrategy, + List.of(getEventProperty(NEW_PROPERTY, XSD.STRING)) + ); + } + + private DataLakeMeasure getSampleMeasure( + DataLakeMeasureSchemaUpdateStrategy updateStrategy, + List eventProperties + ) { + var measure = new DataLakeMeasure(); + measure.setMeasureName("testMeasure"); + measure.setSchemaUpdateStrategy(updateStrategy); + + measure.setEventSchema( + EventSchemaTestBuilder + .create() + .withEventProperties( + eventProperties + ) + .build() + ); + + return measure; + } + + private boolean containsPropertyWithName( + DataLakeMeasure measure, + String runtimeName + ) { + return measure + .getEventSchema() + .getEventProperties() + .stream() + .anyMatch( + eventProperty -> eventProperty.getRuntimeName() + 
.equals(runtimeName) + ); + } +} \ No newline at end of file diff --git a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java index 63b8ce2f6d..1be9306661 100644 --- a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java +++ b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java @@ -26,7 +26,6 @@ import org.influxdb.BatchOptions; import org.influxdb.dto.Point; -import org.influxdb.dto.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +71,7 @@ private void connect() throws SpRuntimeException { // Checking whether the database exists if (!influxClientProvider.databaseExists(influxDb, databaseName)) { LOG.info("Database '" + databaseName + "' not found. Gets created ..."); - createDatabase(databaseName); + influxClientProvider.createDatabase(influxDb, databaseName); } // setting up the database @@ -80,20 +79,6 @@ private void connect() throws SpRuntimeException { influxDb.enableBatch(BatchOptions.DEFAULTS.actions(batchSize).flushDuration(flushDuration)); } - - /** - * Creates a new database with the given name - * - * @param dbName The name of the database which should be created - */ - private void createDatabase(String dbName) throws SpRuntimeException { - if (!dbName.matches("^[a-zA-Z_][a-zA-Z0-9_]*$")) { - throw new SpRuntimeException( - "Databasename '" + dbName + "' not allowed. 
Allowed names: ^[a-zA-Z_][a-zA-Z0-9_]*$"); - } - influxDb.query(new Query("CREATE DATABASE \"" + dbName + "\"", "")); - } - /** * Saves an event to the connnected InfluxDB database * diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml index f41f811d8b..34680e814d 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml @@ -76,6 +76,17 @@ commons-pool2 + + + junit + junit + test + + + org.mockito + mockito-core + test + diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java index ccf27a5da5..a32aaa7cfc 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java @@ -23,6 +23,7 @@ import org.apache.streampipes.extensions.api.migration.IModelMigrator; import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement; import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink; +import org.apache.streampipes.sinks.internal.jvm.migration.DataLakeSinkMigrationV1; import org.apache.streampipes.sinks.internal.jvm.notification.NotificationProducer; import java.util.Collections; @@ -44,6 +45,6 @@ public List> pipelineElements() { @Override public List> migrators() { - return Collections.emptyList(); + return List.of(new DataLakeSinkMigrationV1()); } } diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java index 82d8bf2b5a..cbf1511334 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java @@ -24,24 +24,29 @@ import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; import org.apache.streampipes.model.DataSinkType; import org.apache.streampipes.model.datalake.DataLakeMeasure; +import org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy; import org.apache.streampipes.model.graph.DataSinkDescription; import org.apache.streampipes.model.runtime.Event; -import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.model.schema.PropertyScope; import org.apache.streampipes.sdk.builder.DataSinkBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.Options; import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.wrapper.params.compat.SinkParams; import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink; - public class DataLakeSink extends StreamPipesDataSink { private static final String DATABASE_MEASUREMENT_KEY = "db_measurement"; private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping"; + public static final String SCHEMA_UPDATE_KEY = "schema_update"; + + public static final String SCHEMA_UPDATE_OPTION = "Update schema"; + + public 
static final String EXTEND_EXISTING_SCHEMA_OPTION = "Extend existing schema"; private TimeSeriesStore timeSeriesStore; @@ -49,32 +54,53 @@ public class DataLakeSink extends StreamPipesDataSink { @Override public DataSinkDescription declareModel() { return DataSinkBuilder - .create("org.apache.streampipes.sinks.internal.jvm.datalake", 0) + .create("org.apache.streampipes.sinks.internal.jvm.datalake", 1) .withLocales(Locales.EN) .withAssets(Assets.DOCUMENTATION, Assets.ICON) .category(DataSinkType.INTERNAL) - .requiredStream(StreamRequirementsBuilder.create() - .requiredPropertyWithUnaryMapping( - EpRequirements.timestampReq(), - Labels.withId(TIMESTAMP_MAPPING_KEY), - PropertyScope.NONE) - .build()) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredPropertyWithUnaryMapping( + EpRequirements.timestampReq(), + Labels.withId(TIMESTAMP_MAPPING_KEY), + PropertyScope.NONE + ) + .build()) .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY)) + .requiredSingleValueSelection( + Labels.withId(SCHEMA_UPDATE_KEY), + Options.from(SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION) + ) + .build(); } @Override public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException { - String timestampField = parameters.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY); - String measureName = parameters.extractor().singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class); - EventSchema eventSchema = parameters.getInputSchemaInfos().get(0).getEventSchema(); + var extractor = parameters.extractor(); + var timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY); + var measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class); + var eventSchema = parameters.getInputSchemaInfos() + .get(0) + .getEventSchema(); + + + var measure = new DataLakeMeasure(measureName, timestampField, eventSchema); + + var schemaUpdateOptionString = 
extractor.selectedSingleValue(SCHEMA_UPDATE_KEY, String.class); - DataLakeMeasure measure = new DataLakeMeasure(measureName, timestampField, eventSchema); + if (schemaUpdateOptionString.equals(EXTEND_EXISTING_SCHEMA_OPTION)) { + measure.setSchemaUpdateStrategy(DataLakeMeasureSchemaUpdateStrategy.EXTEND_EXISTING_SCHEMA); + } else { + measure.setSchemaUpdateStrategy(DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA); + } - this.timeSeriesStore = new TimeSeriesStore(Environments.getEnvironment(), + this.timeSeriesStore = new TimeSeriesStore( + Environments.getEnvironment(), runtimeContext.getStreamPipesClient(), measure, - true); + true + ); } diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/migration/DataLakeSinkMigrationV1.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/migration/DataLakeSinkMigrationV1.java new file mode 100644 index 0000000000..b63c35999f --- /dev/null +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/migration/DataLakeSinkMigrationV1.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.sinks.internal.jvm.migration; + +import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor; +import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.graph.DataSinkInvocation; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.model.staticproperty.OneOfStaticProperty; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Options; +import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink; + +public class DataLakeSinkMigrationV1 implements IDataSinkMigrator { + + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig( + "org.apache.streampipes.sinks.internal.jvm.datalake", + SpServiceTagPrefix.DATA_SINK, + 0, + 1 + ); + } + + /** + * Adds the static property for schema update to the sink and selects the option to update the + * schema as a default + */ + @Override + public MigrationResult migrate( + DataSinkInvocation element, + IDataSinkParameterExtractor extractor + ) throws RuntimeException { + var oneOfStaticProperty = createDefaultSchemaUpdateStrategy(); + + element.getStaticProperties() + .add(oneOfStaticProperty); + + return MigrationResult.success(element); + } + + private static OneOfStaticProperty createDefaultSchemaUpdateStrategy() { + var label = Labels.from( + DataLakeSink.SCHEMA_UPDATE_KEY, + DataLakeSink.SCHEMA_UPDATE_OPTION, + "Update existing schemas with the new one or extend the existing schema with new properties" + ); + var schemaUpdateStaticProperty = new OneOfStaticProperty( + label.getInternalId(), + label.getLabel(), + label.getDescription() + ); + 
+ var options = Options.from(DataLakeSink.SCHEMA_UPDATE_OPTION, DataLakeSink.EXTEND_EXISTING_SCHEMA_OPTION); + options.get(0) + .setSelected(true); + schemaUpdateStaticProperty.setOptions(options); + return schemaUpdateStaticProperty; + } +} diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md index b9661f99bc..2d58b7e3d9 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md @@ -27,7 +27,8 @@ ## Description Stores events in the internal data lake so that data can be visualized in the live dashboard or in the data explorer. -Simply create a pipeline with a data lake sink, switch to one of the data exploration tool and start exploring your data! +Simply create a pipeline with a data lake sink, switch to one of the data exploration tool and start exploring your +data! *** @@ -40,7 +41,22 @@ This sink requires an event that provides a timestamp value (a field that is mar ## Configuration -### Index +### Identifier -The name of the storage group of this event. +The name of the measurement (table) where the events are stored. +### Schema Update Options + +The Schema Update Options dictate the behavior when encountering a measurement (table) with the same identifier. + +#### Option 1: Update Schema + +- **Description:** Overrides the existing schema. +- **Effect on Data:** The data remains in the data lake, but accessing old data is restricted to file export. +- **Impact on Features:** Other StreamPipes features, such as the Data Explorer, will only display the new event schema. 
+ +#### Option 2: Extend Existing Schema + +- **Description:** Keeps old event fields in the event schema. +- **Strategy:** This follows an append-only strategy, allowing continued work with historic data. +- **Consideration:** Old properties may exist for which no new data is generated. diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en index b79cc538b4..0b604bdfd9 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en @@ -23,3 +23,6 @@ db_measurement.description=The name of the identifier under which the data is to timestamp_mapping.title=Timestamp Field timestamp_mapping.description=The value which contains a timestamp + +schema_update.title=Schema Update +schema_update.description=Update existing schemas with the new one or extend the existing schema with new properties diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/test/java/org/apache/streampipes/sinks/internal/jvm/migration/DataLakeSinkMigrationV1Test.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/test/java/org/apache/streampipes/sinks/internal/jvm/migration/DataLakeSinkMigrationV1Test.java new file mode 100644 index 0000000000..a162ded7db --- /dev/null +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/test/java/org/apache/streampipes/sinks/internal/jvm/migration/DataLakeSinkMigrationV1Test.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.sinks.internal.jvm.migration; + +import org.apache.streampipes.model.graph.DataSinkInvocation; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.staticproperty.OneOfStaticProperty; +import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor; +import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink; + +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class DataLakeSinkMigrationV1Test { + + @Test + public void migrate() { + var dataLakeSinkMigrationV1 = new DataLakeSinkMigrationV1(); + + var extractor = mock(DataSinkParameterExtractor.class); + var invocation = new DataSinkInvocation(); + invocation.setStaticProperties(new ArrayList<>()); + + var actual = dataLakeSinkMigrationV1.migrate(invocation, extractor); + + assertTrue(actual.success()); + assertEquals(actual.element() + .getStaticProperties() + .size(), 1); + var schemaUpdateStaticProperty = getOneOfStaticProperty(actual); + assertEquals(schemaUpdateStaticProperty.getInternalName(), DataLakeSink.SCHEMA_UPDATE_KEY); + 
assertEquals(schemaUpdateStaticProperty.getOptions().get(0).isSelected(), true); + assertEquals(schemaUpdateStaticProperty.getOptions().get(1).isSelected(), false); + } + + private static OneOfStaticProperty getOneOfStaticProperty(MigrationResult actual) { + return (OneOfStaticProperty) actual.element() + .getStaticProperties() + .get(0); + } +} \ No newline at end of file diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java index 44076e6059..2d9441b283 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java @@ -51,6 +51,8 @@ public class DataLakeMeasure { private String schemaVersion; + private DataLakeMeasureSchemaUpdateStrategy schemaUpdateStrategy = DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA; + public DataLakeMeasure() { super(); } @@ -129,6 +131,14 @@ public void setTimestampField(String timestampField) { this.timestampField = timestampField; } + public DataLakeMeasureSchemaUpdateStrategy getSchemaUpdateStrategy() { + return schemaUpdateStrategy; + } + + public void setSchemaUpdateStrategy(DataLakeMeasureSchemaUpdateStrategy schemaUpdateStrategy) { + this.schemaUpdateStrategy = schemaUpdateStrategy; + } + /** * This can be used to get the name of the timestamp property without the stream prefix * diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasureSchemaUpdateStrategy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasureSchemaUpdateStrategy.java new file mode 100644 index 0000000000..acd5bc232b --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasureSchemaUpdateStrategy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.datalake; + +/** + * This enum contains the different options that are available for the update strategy of the data lake measure + */ +public enum DataLakeMeasureSchemaUpdateStrategy { + /** + * This strategy will update the schema of the data lake measure when it changes + */ + UPDATE_SCHEMA, + /** + * This strategy will extend the schema of the data lake measure when it changes and keep the old fields + */ + EXTEND_EXISTING_SCHEMA + +} \ No newline at end of file diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java index c40f2e56b2..7ed976cbc8 100644 --- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java +++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java @@ -24,6 +24,7 @@ import org.apache.streampipes.model.datalake.DataLakeMeasure; import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; import org.apache.streampipes.rest.shared.annotation.JacksonSerialized; +import 
org.apache.streampipes.storage.management.StorageDispatcher; import jakarta.ws.rs.Consumes; import jakarta.ws.rs.DELETE; @@ -46,7 +47,10 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource { private final IDataExplorerSchemaManagement dataLakeMeasureManagement; public DataLakeMeasureResourceV4() { - this.dataLakeMeasureManagement = new DataExplorerSchemaManagement(); + var dataLakeStorage = StorageDispatcher.INSTANCE + .getNoSqlStore() + .getDataLakeStorage(); + this.dataLakeMeasureManagement = new DataExplorerSchemaManagement(dataLakeStorage); } @POST @@ -54,7 +58,7 @@ public DataLakeMeasureResourceV4() { @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) public Response addDataLake(DataLakeMeasure dataLakeMeasure) { - DataLakeMeasure result = this.dataLakeMeasureManagement.createMeasurement(dataLakeMeasure); + DataLakeMeasure result = this.dataLakeMeasureManagement.createOrUpdateMeasurement(dataLakeMeasure); return ok(result); } @@ -85,8 +89,10 @@ public Response getDataLakeMeasure(@PathParam("id") String elementId) { @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) @Path("{id}") - public Response updateDataLakeMeasure(@PathParam("id") String elementId, - DataLakeMeasure measure) { + public Response updateDataLakeMeasure( + @PathParam("id") String elementId, + DataLakeMeasure measure + ) { if (elementId.equals(measure.getElementId())) { try { this.dataLakeMeasureManagement.updateMeasurement(measure); diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java index a0d798c370..f2ba8b46c2 100644 --- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java +++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java @@ -27,6 +27,7 @@ import 
org.apache.streampipes.model.datalake.SpQueryResult; import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.rest.core.base.impl.AbstractRestResource; +import org.apache.streampipes.storage.management.StorageDispatcher; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -35,8 +36,6 @@ import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import jakarta.ws.rs.Consumes; import jakarta.ws.rs.DELETE; @@ -80,22 +79,25 @@ @Path("v4/datalake") public class DataLakeResourceV4 extends AbstractRestResource { - private static final Logger logger = LoggerFactory.getLogger(DataLakeResourceV4.class); - private DataExplorerQueryManagement dataLakeManagement; private final DataExplorerSchemaManagement dataExplorerSchemaManagement; public DataLakeResourceV4() { - this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement(); + var dataLakeStorage = StorageDispatcher.INSTANCE + .getNoSqlStore() + .getDataLakeStorage(); + this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement(dataLakeStorage); this.dataLakeManagement = new DataExplorerQueryManagement(dataExplorerSchemaManagement); } public DataLakeResourceV4(DataExplorerQueryManagement dataLakeManagement) { + var dataLakeStorage = StorageDispatcher.INSTANCE + .getNoSqlStore() + .getDataLakeStorage(); this.dataLakeManagement = dataLakeManagement; - this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement(); + this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement(dataLakeStorage); } - @DELETE @Path("/measurements/{measurementID}") @Operation(summary = "Remove data from a single measurement series with given id", tags = {"Data Lake"}, diff --git 
a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java index ceea800b11..397fce7b49 100644 --- a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java @@ -25,6 +25,7 @@ import org.apache.streampipes.rest.shared.annotation.JacksonSerialized; import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor; + import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.media.Content; @@ -34,9 +35,12 @@ import jakarta.ws.rs.Consumes; import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; + +@Path("api/v1/migrations/sink") public class DataSinkMigrationResource extends MigrateExtensionsResource< DataSinkInvocation, IDataSinkParameterExtractor, diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java index 6638ccc3e6..ff60661596 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java @@ -22,7 +22,6 @@ import org.apache.streampipes.connect.management.management.AdapterMasterManagement; import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement; import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement; -import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement; import org.apache.streampipes.manager.file.FileManager; import 
org.apache.streampipes.manager.pipeline.PipelineCacheManager; import org.apache.streampipes.manager.pipeline.PipelineCanvasMetadataCacheManager; @@ -95,8 +94,11 @@ public static void reset(String username) { }); // Remove all data in data lake - IDataExplorerSchemaManagement dataLakeMeasureManagement = new DataExplorerSchemaManagement(); - DataExplorerQueryManagement dataExplorerQueryManagement = + var dataLakeStorage = StorageDispatcher.INSTANCE + .getNoSqlStore() + .getDataLakeStorage(); + var dataLakeMeasureManagement = new DataExplorerSchemaManagement(dataLakeStorage); + var dataExplorerQueryManagement = new DataExplorerQueryManagement(dataLakeMeasureManagement); List allMeasurements = dataLakeMeasureManagement.getAllMeasurements(); allMeasurements.forEach(measurement -> { diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts index fe8e13571f..2947b55107 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts @@ -19,7 +19,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2023-10-27 10:44:54. +// Generated using typescript-generator version 3.2.1263 on 2023-12-04 13:14:26. export class ExtensionsServiceEndpointItem { appId: string; diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 85f9a230b6..f1613ff344 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -16,11 +16,10 @@ * specific language governing permissions and limitations * under the License. 
*/ - /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2023-10-30 22:49:29. +// Generated using typescript-generator version 3.2.1263 on 2023-12-04 13:14:24. export class NamedStreamPipesEntity { '@class': @@ -45,10 +44,6 @@ export class NamedStreamPipesEntity { 'includesLocales': boolean; 'internallyManaged': boolean; 'name': string; - /** - * @deprecated - */ - 'uri': string; static 'fromData'( data: NamedStreamPipesEntity, @@ -78,7 +73,6 @@ export class NamedStreamPipesEntity { instance.includesLocales = data.includesLocales; instance.internallyManaged = data.internallyManaged; instance.name = data.name; - instance.uri = data.uri; return instance; } } @@ -1157,6 +1151,7 @@ export class DataLakeMeasure { 'pipelineId': string; 'pipelineIsRunning': boolean; 'pipelineName': string; + 'schemaUpdateStrategy': DataLakeMeasureSchemaUpdateStrategy; 'schemaVersion': string; 'timestampField': string; @@ -1176,6 +1171,7 @@ export class DataLakeMeasure { instance.pipelineId = data.pipelineId; instance.pipelineIsRunning = data.pipelineIsRunning; instance.pipelineName = data.pipelineName; + instance.schemaUpdateStrategy = data.schemaUpdateStrategy; instance.schemaVersion = data.schemaVersion; instance.timestampField = data.timestampField; return instance; @@ -4063,6 +4059,10 @@ export type ConfigurationScope = | 'CONTAINER_GLOBAL_CONFIG' | 'PIPELINE_ELEMENT_CONFIG'; +export type DataLakeMeasureSchemaUpdateStrategy = + | 'UPDATE_SCHEMA' + | 'EXTEND_EXISTING_SCHEMA'; + export type EdgeValidationStatusType = 'COMPLETE' | 'INCOMPLETE' | 'INVALID'; export type EventPropertyUnion = diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts index f467d4f7d6..9b8ef3ce13 100644 --- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts +++ 
b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts @@ -231,6 +231,10 @@ export class AdapterStartedDialog implements OnInit { 'timestamp_mapping', 's0::' + this.dataLakeTimestampField, ) + .setOneOfStaticProperty( + 'schema_update', + 'Update schema', + ) .build(); this.pipelineTemplateService diff --git a/ui/src/app/core-services/template/PipelineInvocationBuilder.ts b/ui/src/app/core-services/template/PipelineInvocationBuilder.ts index 03715cf48a..9192a735f3 100644 --- a/ui/src/app/core-services/template/PipelineInvocationBuilder.ts +++ b/ui/src/app/core-services/template/PipelineInvocationBuilder.ts @@ -57,6 +57,26 @@ export class PipelineInvocationBuilder { return this; } + public setOneOfStaticProperty(name: string, value: string) { + this.pipelineTemplateInvocation.staticProperties.forEach(property => { + if ( + // property instanceof OneOfStaticProperty && + property['@class'] === + 'org.apache.streampipes.model.staticproperty.OneOfStaticProperty' && + 'jsplumb_domId2' + name === property.internalName + ) { + // set selected for selected option + property.options.forEach(option => { + if (option.name === value) { + option.selected = true; + } + }); + } + }); + + return this; + } + public setMappingPropertyUnary(name: string, value: string) { this.pipelineTemplateInvocation.staticProperties.forEach(property => { if ( diff --git a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts index f9f12f1bc3..b52f66ed86 100644 --- a/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts +++ b/ui/src/app/editor/dialog/save-pipeline/save-pipeline.component.ts @@ -180,7 +180,6 @@ export class SavePipelineComponent implements OnInit { entity.elementId.substring(0, lastIdIndex + 1) + this.jsplumbService.makeId(5); entity.elementId = newElementId; - entity.uri = newElementId; } storePipelineCanvasMetadata(pipelineId: string, updateMode: boolean) { diff 
--git a/ui/src/app/editor/services/jsplumb.service.ts b/ui/src/app/editor/services/jsplumb.service.ts index 5303ae18d2..cd82716212 100644 --- a/ui/src/app/editor/services/jsplumb.service.ts +++ b/ui/src/app/editor/services/jsplumb.service.ts @@ -312,7 +312,6 @@ export class JsplumbService { newElementId: string, ) { pipelineElement.elementId = newElementId; - pipelineElement.uri = newElementId; } makeId(count: number) {