diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java index 3debbb8d58..c8d16e4d33 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java @@ -91,15 +91,7 @@ public String addAdapter(AdapterDescription ad, return ad.getElementId(); } - public void updateAdapter(AdapterDescription ad, - String principalSid) - throws AdapterException { - // update adapter in database - this.adapterResourceManager.encryptAndUpdate(ad); - // update data source in database - this.updateDataSource(ad); - } public AdapterDescription getAdapter(String elementId) throws AdapterException { List allAdapters = adapterInstanceStorage.getAllAdapters(); @@ -180,16 +172,6 @@ public void startStreamAdapter(String elementId) throws AdapterException { } } - private void updateDataSource(AdapterDescription ad) { - // get data source - SpDataStream dataStream = this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId()); - - SourcesManagement.updateDataStream(ad, dataStream); - - // Update data source in database - this.dataStreamResourceManager.update(dataStream); - } - private void installDataSource(SpDataStream stream, String principalSid, boolean publicElement) throws AdapterException { @@ -204,5 +186,4 @@ private void installDataSource(SpDataStream stream, private IAdapterStorage getAdapterInstanceStorage() { return new AdapterInstanceStorageImpl(); } - } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java new file mode 100644 index 0000000000..0b2b38a6ef --- /dev/null +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.connect.management.management; + +import org.apache.streampipes.commons.exceptions.connect.AdapterException; +import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2; +import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.pipeline.PipelineManager; +import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.connect.adapter.PipelineUpdateInfo; +import org.apache.streampipes.model.message.PipelineModificationMessage; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo; +import org.apache.streampipes.model.pipeline.PipelineHealthStatus; +import org.apache.streampipes.resource.management.AdapterResourceManager; +import org.apache.streampipes.resource.management.DataStreamResourceManager; +import org.apache.streampipes.resource.management.SpResourceManager; +import org.apache.streampipes.storage.management.StorageDispatcher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +public class AdapterUpdateManagement { + + private static final Logger LOG = LoggerFactory.getLogger(AdapterUpdateManagement.class); + + private final AdapterMasterManagement adapterMasterManagement; + private final AdapterResourceManager adapterResourceManager; + private final DataStreamResourceManager dataStreamResourceManager; + + public AdapterUpdateManagement(AdapterMasterManagement adapterMasterManagement) { + this.adapterMasterManagement = adapterMasterManagement; + this.adapterResourceManager = new SpResourceManager().manageAdapters(); + this.dataStreamResourceManager = new SpResourceManager().manageDataStreams(); + } + + public void updateAdapter(AdapterDescription ad) + throws AdapterException { + // update adapter in database + this.adapterResourceManager.encryptAndUpdate(ad); + boolean shouldRestart = ad.isRunning(); + + if (ad.isRunning()) { + this.adapterMasterManagement.stopStreamAdapter(ad.getElementId()); + } + + // update data source in database + this.updateDataSource(ad); + + // update pipelines + var affectedPipelines = PipelineManager.getPipelinesContainingElements(ad.getCorrespondingDataStreamElementId()); + + affectedPipelines.forEach(p -> { + var shouldRestartPipeline = p.isRunning(); + if (shouldRestartPipeline) { + Operations.stopPipeline(p, true); + } + var storedPipeline = PipelineManager.getPipeline(p.getPipelineId()); + var pipeline = applyUpdatedDataStream(storedPipeline, ad); + try { + var modificationMessage = Operations.validatePipeline(pipeline); + var updateInfo = makeUpdateInfo(modificationMessage, pipeline); + var modifiedPipeline = new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline(); + var canAutoMigrate = canAutoMigrate(modificationMessage); + if (!canAutoMigrate) { + modifiedPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION); + modifiedPipeline.setPipelineNotifications(toNotification(updateInfo)); + modifiedPipeline.setValid(false); + } + StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(modifiedPipeline); + if (shouldRestartPipeline && canAutoMigrate) { + Operations.startPipeline(PipelineManager.getPipeline(p.getPipelineId())); + } + } catch (Exception e) { + LOG.error("Could not update pipeline {}", pipeline.getName(), e); + } + }); + + if (shouldRestart) { + this.adapterMasterManagement.startStreamAdapter(ad.getElementId()); + } + } + + public List checkPipelineMigrations(AdapterDescription adapterDescription) { + var affectedPipelines = PipelineManager + .getPipelinesContainingElements(adapterDescription.getCorrespondingDataStreamElementId()); + var updateInfos = new ArrayList(); + + affectedPipelines.forEach(pipeline -> { + var updatedPipeline = applyUpdatedDataStream(pipeline, adapterDescription); + try { + var modificationMessage = Operations.validatePipeline(updatedPipeline); + var updateInfo = makeUpdateInfo(modificationMessage, updatedPipeline); + updateInfos.add(updateInfo); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + return updateInfos; + } + + private PipelineUpdateInfo makeUpdateInfo(PipelineModificationMessage modificationMessage, + Pipeline pipeline) { + var updateInfo = new PipelineUpdateInfo(); + updateInfo.setPipelineId(pipeline.getPipelineId()); + updateInfo.setPipelineName(pipeline.getName()); + updateInfo.setCanAutoMigrate(canAutoMigrate(modificationMessage)); + updateInfo.setValidationInfos(extractModificationWarnings(pipeline, modificationMessage)); + return updateInfo; + } + + private boolean canAutoMigrate(PipelineModificationMessage modificationMessage) { + return modificationMessage + .getPipelineModifications() + .stream() + .allMatch(m -> m.isPipelineElementValid() && m.getValidationInfos().isEmpty()); + } + + private List toNotification(PipelineUpdateInfo updateInfo) { + var notifications = new ArrayList(); + updateInfo.getValidationInfos().keySet().forEach((k) -> { + var msg = updateInfo + .getValidationInfos() + .get(k) + .stream() + .map(PipelineElementValidationInfo::getMessage) + .toList() + .toString(); + notifications.add(String.format("Adapter modification: %s: %s", k, msg)); + }); + return notifications; + } + + private Map> extractModificationWarnings( + Pipeline pipeline, + PipelineModificationMessage modificationMessage) { + var infos = new HashMap>(); + modificationMessage + .getPipelineModifications() + .stream() + .filter(v -> !v.getValidationInfos().isEmpty()) + .forEach(m -> infos.put(getPipelineElementName(pipeline, m.getElementId()), m.getValidationInfos())); + + return infos; + } + + private String getPipelineElementName(Pipeline pipeline, + String elementId) { + return Stream + .concat(pipeline.getSepas().stream(), pipeline.getActions().stream()) + .filter(p -> p.getElementId().equals(elementId)) + .findFirst() + .map(NamedStreamPipesEntity::getName) + .orElse(elementId); + } + + private Pipeline applyUpdatedDataStream(Pipeline originalPipeline, + AdapterDescription updatedAdapter) { + var updatedStreams = originalPipeline + .getStreams() + .stream() + .peek(s -> { + if (s.getElementId().equals(updatedAdapter.getCorrespondingDataStreamElementId())) { + s.setEventSchema(updatedAdapter.getEventSchema()); + } + }) + .toList(); + + originalPipeline.setStreams(updatedStreams); + + return originalPipeline; + } + + private void updateDataSource(AdapterDescription ad) { + // get data source + SpDataStream dataStream = this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId()); + + SourcesManagement.updateDataStream(ad, dataStream); + + // Update data source in database + this.dataStreamResourceManager.update(dataStream); + } +} diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java index a8cae5b72b..104bae82e0 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java @@ -31,6 +31,7 @@ import org.apache.streampipes.serializers.json.JacksonSerializer; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpStatus; import org.apache.http.client.fluent.Response; import org.apache.http.util.EntityUtils; @@ -41,21 +42,21 @@ public class GuessManagement { - private static Logger logger = LoggerFactory.getLogger(GuessManagement.class); - private WorkerUrlProvider workerUrlProvider; + private static final Logger LOG = LoggerFactory.getLogger(GuessManagement.class); + private final WorkerUrlProvider workerUrlProvider; + private final ObjectMapper objectMapper; public GuessManagement() { this.workerUrlProvider = new WorkerUrlProvider(); + this.objectMapper = JacksonSerializer.getObjectMapper(); } public GuessSchema guessSchema(AdapterDescription adapterDescription) throws ParseException, WorkerAdapterException, NoServiceEndpointsAvailableException, IOException { - var workerUrl = workerUrlProvider.getWorkerBaseUrl(adapterDescription.getAppId()); - workerUrl = workerUrl + WorkerPaths.getGuessSchemaPath(); - - var objectMapper = JacksonSerializer.getObjectMapper(); + var workerUrl = getWorkerUrl(adapterDescription.getAppId()); var description = objectMapper.writeValueAsString(adapterDescription); - logger.info("Guess schema at: " + workerUrl); + + LOG.info("Guess schema at: " + workerUrl); Response requestResponse = ExtensionServiceExecutions .extServicePostRequest(workerUrl, description) .execute(); @@ -71,6 +72,11 @@ public GuessSchema guessSchema(AdapterDescription adapterDescription) } } + private String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException { + var baseUrl = workerUrlProvider.getWorkerBaseUrl(appId); + return baseUrl + WorkerPaths.getGuessSchemaPath(); + } + public String performAdapterEventPreview(AdapterEventPreview previewRequest) throws JsonProcessingException { return new AdapterEventPreviewPipeline(previewRequest).makePreview(); } diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/convert/ToTransformedSchemaConverter.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/convert/ToTransformedSchemaConverter.java index bac84f5058..f42c6a55df 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/convert/ToTransformedSchemaConverter.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/convert/ToTransformedSchemaConverter.java @@ -34,13 +34,14 @@ import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription; import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventPropertyNested; +import org.apache.streampipes.model.schema.EventPropertyPrimitive; import org.apache.streampipes.model.util.Cloner; import org.apache.streampipes.sdk.helpers.EpProperties; -import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.utils.Datatypes; import java.net.URI; import java.util.List; +import java.util.Objects; import static org.apache.streampipes.connect.shared.preprocessing.utils.ConversionUtils.findPrimitiveProperty; import static org.apache.streampipes.connect.shared.preprocessing.utils.ConversionUtils.findProperty; @@ -106,7 +107,21 @@ public void visit(AddTimestampRuleDescription rule) { @Override public void visit(AddValueTransformationRuleDescription rule) { - this.properties.add(EpProperties.stringEp(Labels.empty(), rule.getRuntimeKey(), "")); + var property = new EventPropertyPrimitive(); + property.setElementId("http://eventProperty.de/staticValue/" + rule.getStaticValue()); + property.setRuntimeName(rule.getRuntimeKey()); + property.setRuntimeType(rule.getDatatype()); + property.setLabel(rule.getLabel()); + property.setDescription(rule.getDescription()); + property.setPropertyScope(rule.getPropertyScope().name()); + + if (Objects.nonNull(rule.getSemanticType())) { + property.setDomainProperties(List.of(URI.create(rule.getSemanticType()))); + } + if (Objects.nonNull(rule.getMeasurementUnit())) { + property.setMeasurementUnit(URI.create(rule.getMeasurementUnit())); + } + this.properties.add(property); } @Override @@ -117,7 +132,10 @@ public void visit(ChangeDatatypeTransformationRuleDescription rule) { @Override public void visit(CorrectionValueTransformationRuleDescription rule) { - // does not affect schema + var property = findPrimitiveProperty(properties, rule.getRuntimeKey()); + var metadata = property.getAdditionalMetadata(); + metadata.put("operator", rule.getOperator()); + metadata.put("correctionValue", rule.getCorrectionValue()); } @Override @@ -125,12 +143,19 @@ public void visit(TimestampTranfsformationRuleDescription rule) { var property = findPrimitiveProperty(properties, rule.getRuntimeKey()); property.setDomainProperties(List.of(URI.create("http://schema.org/DateTime"))); property.setRuntimeType(Datatypes.Long.toString()); + var metadata = property.getAdditionalMetadata(); + metadata.put("mode", rule.getMode()); + metadata.put("formatString", rule.getFormatString()); + metadata.put("multiplier", rule.getMultiplier()); } @Override public void visit(UnitTransformRuleDescription rule) { var property = findPrimitiveProperty(properties, rule.getRuntimeKey()); property.setMeasurementUnit(URI.create(rule.getToUnitRessourceURL())); + var metadata = property.getAdditionalMetadata(); + metadata.put("fromMeasurementUnit", rule.getFromUnitRessourceURL()); + metadata.put("toMeasurementUnit", rule.getToUnitRessourceURL()); } @Override diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/generator/StatelessTransformationRuleGeneratorVisitor.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/generator/StatelessTransformationRuleGeneratorVisitor.java index 1c7038c84d..7adf1b3765 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/generator/StatelessTransformationRuleGeneratorVisitor.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/generator/StatelessTransformationRuleGeneratorVisitor.java @@ -91,7 +91,8 @@ public void visit(AddTimestampRuleDescription ruleDesc) { public void visit(AddValueTransformationRuleDescription ruleDesc) { rules.add(new AddValueTransformationRule( ruleDesc.getRuntimeKey(), - ruleDesc.getStaticValue())); + ruleDesc.getStaticValue(), + ruleDesc.getDatatype())); } @Override diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/AddValueTransformationRule.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/AddValueTransformationRule.java index c1b8ff688a..77b8b12f20 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/AddValueTransformationRule.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/AddValueTransformationRule.java @@ -18,6 +18,7 @@ package org.apache.streampipes.connect.shared.preprocessing.transform.schema; +import org.apache.streampipes.connect.shared.DatatypeUtils; import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; import java.util.Map; @@ -26,15 +27,20 @@ public class AddValueTransformationRule implements TransformationRule { private final String runtimeKey; private final String value; + private final String datatype; - public AddValueTransformationRule(String runtimeKey, String value) { + public AddValueTransformationRule(String runtimeKey, + String value, + String datatype) { this.runtimeKey = runtimeKey; this.value = value; + this.datatype = datatype; } @Override public Map apply(Map event) { - event.put(runtimeKey, value); + var convertedValue = DatatypeUtils.convertValue(value, datatype); + event.put(runtimeKey, convertedValue); return event; } diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java index 99ab512818..5b063fff7c 100644 --- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java @@ -43,7 +43,7 @@ public TimeSeriesStore(Environment environment, DataLakeMeasure measure, boolean enableImageStore) { - measure = DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure); + DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure); if (enableImageStore) { // TODO check if event properties are replaces correctly diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java index f28cf6cbdb..8c4dc4a344 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java @@ -60,6 +60,10 @@ public DataLakeMeasure createMeasurement(DataLakeMeasure measure) { DataLakeMeasure oldEntry = optional.get(); if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(), measure.getEventSchema().getEventProperties())) { + oldEntry.setEventSchema(measure.getEventSchema()); + oldEntry.setTimestampField(measure.getTimestampField()); + oldEntry.setPipelineName(measure.getPipelineName()); + getDataLakeStorage().updateDataLakeMeasure(oldEntry); return oldEntry; } } else { diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/GuessManagement.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/GuessManagement.java index 9f561a34ab..6720c6a299 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/GuessManagement.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/GuessManagement.java @@ -60,12 +60,19 @@ public GuessSchema guessSchema(AdapterDescription adapterDescription) throws Ada var extractor = AdapterParameterExtractor.from(adapterDescription, registeredParsers); + LOG.info("Requesting the event schema for: " + adapterDescription.getAppId()); + try { - var schema = adapterInstance + var guessedSchemaObj = adapterInstance .onSchemaRequested(extractor, guessSchemaContext); - LOG.info("Start guessing schema for: " + adapterDescription.getAppId()); - return schema; + if (!adapterDescription.getEventSchema().getEventProperties().isEmpty()) { + new SchemaUpdateManagement().computeSchemaChanges(adapterDescription, guessedSchemaObj); + } else { + guessedSchemaObj.setTargetSchema(guessedSchemaObj.getEventSchema()); + } + + return guessedSchemaObj; } catch (ParseException e) { LOG.error(e.toString()); diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/SchemaUpdateManagement.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/SchemaUpdateManagement.java new file mode 100644 index 0000000000..954fbfec99 --- /dev/null +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/SchemaUpdateManagement.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.management.connect; + +import org.apache.streampipes.connect.shared.preprocessing.convert.SchemaConverter; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.connect.guess.GuessSchema; +import org.apache.streampipes.model.connect.rules.TransformationRuleDescription; +import org.apache.streampipes.model.schema.EventProperty; +import org.apache.streampipes.model.schema.EventPropertyPrimitive; +import org.apache.streampipes.model.schema.EventSchema; + +import java.util.List; +import java.util.Optional; + +public class SchemaUpdateManagement { + + public void computeSchemaChanges(AdapterDescription adapterDescription, + GuessSchema schema) { + var actualFields = schema.getEventSchema().getEventProperties(); + var previousFields = adapterDescription.getEventSchema().getEventProperties(); + var transformationRules = adapterDescription.getRules(); + + var removedFields = findRemovedFields(actualFields, previousFields); + var currentOriginalSchema = new SchemaConverter().toOriginalSchema( + adapterDescription.getEventSchema(), + transformationRules + ); + var guessedOriginalSchema = schema.getEventSchema(); + var modifiedSchema = modifySchema(currentOriginalSchema, guessedOriginalSchema); + var modifiedRules = modifyTransformationRules(transformationRules, modifiedSchema); + var transformedSchema = new SchemaConverter().toTransformedSchema(modifiedSchema, modifiedRules); + schema.setEventSchema(modifiedSchema); + schema.setTargetSchema(transformedSchema); + + schema.setRemovedProperties(removedFields); + } + + private List modifyTransformationRules(List rules, + EventSchema modifiedSchema) { + var properties = modifiedSchema.getEventProperties(); + var visitor = new TransformationRuleUpdateVisitor(properties, rules); + rules.forEach(rule -> rule.accept(visitor)); + return visitor.getValidRules(); + } + + private EventSchema modifySchema(EventSchema currentOriginalSchema, + EventSchema guessedOriginalSchema) { + return new EventSchema( + guessedOriginalSchema + .getEventProperties() + .stream() + .map(ep -> { + var currentEpOpt = getExistingEp(ep, currentOriginalSchema); + return currentEpOpt.orElse(ep); + }) + .toList() + ); + } + + private Optional getExistingEp(EventProperty ep, + EventSchema currentOriginalSchema) { + return currentOriginalSchema + .getEventProperties() + .stream() + .filter(currEp -> epExists(ep, currEp)) + .findFirst(); + } + + private boolean epExists(EventProperty ep, + EventProperty currEp) { + return ep.getClass().equals(currEp.getClass()) + && ep.getRuntimeName().equals(currEp.getRuntimeName()) + && isSameDatatype(ep, currEp); + } + + private boolean isSameDatatype(EventProperty ep, EventProperty currEp) { + if (ep instanceof EventPropertyPrimitive && currEp instanceof EventPropertyPrimitive) { + return ((EventPropertyPrimitive) ep).getRuntimeType() + .equals(((EventPropertyPrimitive) currEp).getRuntimeType()); + } else { + return true; + } + } + + private List findRemovedFields(List actualFields, + List previousFields) { + return previousFields + .stream() + .filter(field -> !existsField(field, actualFields)) + .toList(); + } + + private boolean existsField(EventProperty field, + List actualFields) { + return actualFields + .stream() + .anyMatch(currField -> field.getRuntimeName().equals(currField.getRuntimeName())); + } +} diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/TransformationRuleUpdateVisitor.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/TransformationRuleUpdateVisitor.java new file mode 100644 index 0000000000..ef357e4c48 --- /dev/null +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/TransformationRuleUpdateVisitor.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.management.connect; + +import org.apache.streampipes.connect.shared.preprocessing.utils.Utils; +import org.apache.streampipes.model.connect.rules.ITransformationRuleVisitor; +import org.apache.streampipes.model.connect.rules.TransformationRuleDescription; +import org.apache.streampipes.model.connect.rules.schema.CreateNestedRuleDescription; +import org.apache.streampipes.model.connect.rules.schema.DeleteRuleDescription; +import org.apache.streampipes.model.connect.rules.schema.MoveRuleDescription; +import org.apache.streampipes.model.connect.rules.schema.RenameRuleDescription; +import org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription; +import org.apache.streampipes.model.connect.rules.stream.RemoveDuplicatesTransformationRuleDescription; +import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription; +import org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription; +import org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription; +import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription; +import org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription; +import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription; +import org.apache.streampipes.model.schema.EventProperty; + +import java.util.ArrayList; +import java.util.List; + +public class TransformationRuleUpdateVisitor implements ITransformationRuleVisitor { + + private final List existingPropertyRuntimeNames; + private final List validRules; + private final List allRules; + + + public TransformationRuleUpdateVisitor(List existingProperties, + List allRules) { + this.existingPropertyRuntimeNames = existingProperties + .stream() + .map(EventProperty::getRuntimeName).toList(); + this.allRules = allRules; + this.validRules = new ArrayList<>(); + } + + @Override + public void visit(CreateNestedRuleDescription rule) { + validRules.add(rule); + } + + @Override + public void visit(DeleteRuleDescription rule) { + if (containsKey(rule.getRuntimeKey())) { + validRules.add(rule); + } + } + + @Override + public void visit(MoveRuleDescription rule) { + if (containsKey(rule.getOldRuntimeKey())) { + validRules.add(rule); + } + } + + @Override + public void visit(RenameRuleDescription rule) { + if (containsKey(rule.getOldRuntimeKey())) { + validRules.add(rule); + } + } + + @Override + public void visit(EventRateTransformationRuleDescription rule) { + // Do nothing + } + + @Override + public void visit(RemoveDuplicatesTransformationRuleDescription rule) { + // Do nothing + } + + @Override + public void visit(AddTimestampRuleDescription rule) { + validRules.add(rule); + } + + @Override + public void visit(AddValueTransformationRuleDescription rule) { + validRules.add(rule); + } + + @Override + public void visit(ChangeDatatypeTransformationRuleDescription rule) { + if (containsKey(rule.getRuntimeKey())) { + validRules.add(rule); + } + } + + @Override + public void visit(CorrectionValueTransformationRuleDescription rule) { + if (containsKey(rule.getRuntimeKey())) { + validRules.add(rule); + } + } + + @Override + public void visit(TimestampTranfsformationRuleDescription rule) { + if (containsKey(rule.getRuntimeKey())) { + validRules.add(rule); + } + } + + @Override + public void visit(UnitTransformRuleDescription rule) { + if (containsKey(rule.getRuntimeKey())) { + validRules.add(rule); + } + } + + private boolean containsKey(String fullRuntimeKey) { + var runtimeKeys = Utils.toKeyArray(fullRuntimeKey); + if (!runtimeKeys.isEmpty()) { + return this.existingPropertyRuntimeNames.contains(runtimeKeys.get(0)) || inRenameRule(runtimeKeys.get(0)); + } else { + return false; + } + } + + private boolean inRenameRule(String runtimeKey) { + return this.allRules + .stream() + .filter(rule -> rule instanceof RenameRuleDescription) + .anyMatch(rule -> ((RenameRuleDescription) rule).getNewRuntimeKey().equals(runtimeKey)); + } + + public List getValidRules() { + return validRules; + } + +} diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java index 7edd585ecb..b703b94b3c 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java @@ -30,7 +30,7 @@ public class AdapterPipeline implements IAdapterPipeline { private List pipelineElements; private IAdapterPipelineElement pipelineSink; - private EventSchema resultingEventSchema; + private final EventSchema resultingEventSchema; public AdapterPipeline(List pipelineElements, EventSchema resultingEventSchema) { diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/PipelineUpdateInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/PipelineUpdateInfo.java new file mode 100644 index 0000000000..0d9f79f47b --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/PipelineUpdateInfo.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.connect.adapter; + +import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo; +import org.apache.streampipes.model.shared.annotation.TsModel; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@TsModel +public class PipelineUpdateInfo { + + private String pipelineId; + private String pipelineName; + private boolean canAutoMigrate; + private String migrationInfo; + private Map> validationInfos; + + public PipelineUpdateInfo() { + this.validationInfos = new HashMap<>(); + } + + public String getPipelineId() { + return pipelineId; + } + + public void setPipelineId(String pipelineId) { + this.pipelineId = pipelineId; + } + + public String getPipelineName() { + return pipelineName; + } + + public void setPipelineName(String pipelineName) { + this.pipelineName = pipelineName; + } + + public boolean isCanAutoMigrate() { + return canAutoMigrate; + } + + public void setCanAutoMigrate(boolean canAutoMigrate) { + this.canAutoMigrate = canAutoMigrate; + } + + public String getMigrationInfo() { + return migrationInfo; + } + + public void setMigrationInfo(String migrationInfo) { + this.migrationInfo = migrationInfo; + } + + public Map> getValidationInfos() { + return validationInfos; + } + + public void setValidationInfos(Map> validationInfos) { + this.validationInfos = validationInfos; + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/guess/GuessSchema.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/guess/GuessSchema.java index dfa446b4aa..c72e74fcae 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/guess/GuessSchema.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/guess/GuessSchema.java @@ -18,6 +18,9 @@ package org.apache.streampipes.model.connect.guess; +import org.apache.streampipes.model.connect.rules.TransformationRuleDescription; +import org.apache.streampipes.model.message.Notification; +import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.model.shared.annotation.TsModel; @@ -34,23 +37,22 @@ public class GuessSchema { public EventSchema eventSchema; - - //public List> eventPreview; - + public EventSchema targetSchema; private List eventPreview; - + private List modifiedRules; public Map fieldStatusInfo; + // for adapter updates + private List removedProperties; + private List updateNotifications; + public GuessSchema() { super(); + this.modifiedRules = new ArrayList<>(); this.eventPreview = new ArrayList<>(); this.fieldStatusInfo = new HashMap<>(); - } - - public GuessSchema(GuessSchema other) { - this.eventSchema = other.getEventSchema() != null ? new EventSchema(other.getEventSchema()) : null; - this.eventPreview = other.getEventPreview(); - this.fieldStatusInfo = other.getFieldStatusInfo(); + this.removedProperties = new ArrayList<>(); + this.updateNotifications = new ArrayList<>(); } public EventSchema getEventSchema() { @@ -61,6 +63,14 @@ public void setEventSchema(EventSchema eventSchema) { this.eventSchema = eventSchema; } + public EventSchema getTargetSchema() { + return targetSchema; + } + + public void setTargetSchema(EventSchema targetSchema) { + this.targetSchema = targetSchema; + } + public List getEventPreview() { return eventPreview; } @@ -77,6 +87,30 @@ public void setFieldStatusInfo(Map fieldStatusInfo) { this.fieldStatusInfo = fieldStatusInfo; } + public List getRemovedProperties() { + return removedProperties; + } + + public void setRemovedProperties(List removedProperties) { + this.removedProperties = removedProperties; + } + + public List getUpdateNotifications() { + return updateNotifications; + } + + public void setUpdateNotifications(List updateNotifications) { + this.updateNotifications = updateNotifications; + } + + public List getModifiedRules() { + return modifiedRules; + } + + public void setModifiedRules(List modifiedRules) { + this.modifiedRules = modifiedRules; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRulePriority.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRulePriority.java index d3d1587fd2..3fcb8ba429 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRulePriority.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRulePriority.java @@ -24,8 +24,8 @@ public enum TransformationRulePriority { ADD_VALUE(110), RENAME(210), - MOVE(220), CREATE_NESTED(230), + MOVE(235), DELETE(240), CHANGE_UNIT(310), diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/value/AddValueTransformationRuleDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/value/AddValueTransformationRuleDescription.java index f077dd09ba..1c32b33f4d 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/value/AddValueTransformationRuleDescription.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/value/AddValueTransformationRuleDescription.java @@ -20,12 +20,20 @@ import org.apache.streampipes.model.connect.rules.ITransformationRuleVisitor; import org.apache.streampipes.model.connect.rules.TransformationRulePriority; +import org.apache.streampipes.model.schema.PropertyScope; +import org.apache.streampipes.vocabulary.XSD; public class AddValueTransformationRuleDescription extends ValueTransformationRuleDescription { private String runtimeKey; private String staticValue; + private String datatype = XSD.STRING.toString(); + private String semanticType; + private String measurementUnit; + private String label; + private String description; + private PropertyScope propertyScope = PropertyScope.MEASUREMENT_PROPERTY; public AddValueTransformationRuleDescription() { super(); @@ -53,6 +61,54 @@ public void setStaticValue(String staticValue) { this.staticValue = staticValue; } + public String getDatatype() { + return datatype; + } + + public void setDatatype(String datatype) { + this.datatype = datatype; + } + + public String getSemanticType() { + return semanticType; + } + + public void setSemanticType(String semanticType) { + this.semanticType = semanticType; + } + + public String getMeasurementUnit() { + return measurementUnit; + } + + public void setMeasurementUnit(String measurementUnit) { + this.measurementUnit = measurementUnit; + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public PropertyScope getPropertyScope() { + return propertyScope; + } + + public void setPropertyScope(PropertyScope propertyScope) { + this.propertyScope = propertyScope; + } + @Override public void accept(ITransformationRuleVisitor visitor) { visitor.visit(this); diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java index a8f4be5f5b..7055ca29aa 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java @@ -34,6 +34,7 @@ public class Pipeline extends ElementComposition { private boolean running; private boolean restartOnSystemReboot; + private boolean valid = true; private long startedAt; private long createdAt; @@ -159,6 +160,14 @@ public void setHealthStatus(PipelineHealthStatus healthStatus) { this.healthStatus = healthStatus; } + public boolean isValid() { + return valid; + } + + public void setValid(boolean valid) { + this.valid = valid; + } + public Pipeline clone() { Pipeline pipeline = new Pipeline(); pipeline.setName(name); @@ -173,6 +182,8 @@ public Pipeline clone() { pipeline.setHealthStatus(healthStatus); pipeline.setPipelineNotifications(pipelineNotifications); pipeline.setRev(rev); + pipeline.setValid(valid); + return pipeline; } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/schema/EventProperty.java b/streampipes-model/src/main/java/org/apache/streampipes/model/schema/EventProperty.java index 9cc33eae6b..73ea4da700 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/schema/EventProperty.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/schema/EventProperty.java @@ -26,7 +26,9 @@ import java.net.URI; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; @JsonSubTypes({ @@ -57,10 +59,13 @@ public abstract class EventProperty { private String runtimeId; + private Map additionalMetadata; + public EventProperty() { this.elementId = ElementIdGenerator.makeElementId(EventProperty.class); this.domainProperties = new ArrayList<>(); + this.additionalMetadata = new HashMap<>(); } public EventProperty(EventProperty other) { @@ -73,6 +78,7 @@ public EventProperty(EventProperty other) { this.propertyScope = other.getPropertyScope(); this.runtimeId = other.getRuntimeId(); this.index = other.getIndex(); + this.additionalMetadata = other.getAdditionalMetadata(); } public EventProperty(List subClassOf) { @@ -168,6 +174,14 @@ public void setElementId(String elementId) { this.elementId = elementId; } + public Map getAdditionalMetadata() { + return additionalMetadata; + } + + public void setAdditionalMetadata(Map additionalMetadata) { + this.additionalMetadata = additionalMetadata; + } + @Override public int hashCode() { return Objects.hash(elementId, label, description, runtimeName, required, domainProperties, propertyScope, index, diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java index 3969c159b8..cbfa7ecfaa 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java @@ -27,6 +27,7 @@ import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.client.matching.MatchingResultMessage; import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.message.EdgeValidationStatusType; import org.apache.streampipes.model.message.Notification; import org.apache.streampipes.model.message.PipelineEdgeValidation; import org.apache.streampipes.model.message.PipelineModificationMessage; @@ -34,6 +35,7 @@ import org.apache.streampipes.model.pipeline.PipelineModification; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -70,6 +72,11 @@ public PipelineModificationMessage buildPipelineModificationMessage() { List edgesWithoutConnectedStream = collectEdgesWithoutStream(modifications); edgeValidations.addAll(edgesWithoutConnectedStream); message.setEdgeValidations(edgeValidations); + message.setPipelineValid( + edgeValidations + .stream() + .allMatch(e -> e.getStatus().getValidationStatusType() == EdgeValidationStatusType.COMPLETE) + && message.getPipelineModifications().stream().allMatch(PipelineModification::isPipelineElementValid)); return message; } @@ -90,13 +97,13 @@ private void addModification(NamedStreamPipesEntity source, modification.setElementId(t.getElementId()); try { pipelineValidator.apply(source, t, targets, validationInfos); - buildModification(modification, t); + buildModification(modification, t, t.getInputStreams(), true); edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.complete(source.getDom(), t.getDom())); } catch (SpValidationException e) { - //e.getErrorLog().forEach(log -> validationInfos.add(PipelineElementValidationInfo.error(log.toString()))); + e.getErrorLog().forEach(log -> validationInfos.add(PipelineElementValidationInfo.error(log.toString()))); edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.invalid(source.getDom(), t.getDom(), toNotifications(e.getErrorLog()))); - modification.setPipelineElementValid(false); + buildModification(modification, t, Collections.emptyList(), false); } modification.setValidationInfos(validationInfos); this.pipelineModifications.put(t.getDom(), modification); @@ -115,14 +122,16 @@ private List toList(Map map) { } private void buildModification(PipelineModification modification, - InvocableStreamPipesEntity t) { + InvocableStreamPipesEntity t, + List inputStreams, + boolean valid) { if (t instanceof DataProcessorInvocation) { modification.setOutputStrategies(((DataProcessorInvocation) t).getOutputStrategies()); modification.setOutputStream(((DataProcessorInvocation) t).getOutputStream()); } - modification.setInputStreams(t.getInputStreams()); + modification.setInputStreams(inputStreams); modification.setStaticProperties(t.getStaticProperties()); - modification.setPipelineElementValid(true); + modification.setPipelineElementValid(valid); } private Set getConnections(NamedStreamPipesEntity source) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java index 0775550c7c..ac3bed9550 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java @@ -23,6 +23,7 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.graph.DataSinkInvocation; import org.apache.streampipes.model.grounding.EventGrounding; import org.apache.streampipes.model.message.PipelineModificationMessage; import org.apache.streampipes.model.pipeline.Pipeline; @@ -46,27 +47,34 @@ public PipelineModificationMessage verifyPipeline() { return new PipelineModificationGenerator(graph).buildPipelineModificationMessage(); } + public Pipeline makeModifiedPipeline() { + var allElements = verifyAndBuildGraphs(false); + pipeline.setSepas(filterAndConvert(allElements, DataProcessorInvocation.class)); + pipeline.setActions(filterAndConvert(allElements, DataSinkInvocation.class)); + return pipeline; + } + + private List filterAndConvert(List elements, + Class clazz) { + return elements + .stream() + .filter(clazz::isInstance) + .map(clazz::cast) + .toList(); + } + public List verifyAndBuildGraphs(boolean ignoreUnconfigured) { - List pipelineModifications = verifyPipeline().getPipelineModifications(); - List allElements = new AllElementsProvider(pipeline).getAllElements(); - List result = new ArrayList<>(); + var pipelineModifications = verifyPipeline().getPipelineModifications(); + var allElements = new AllElementsProvider(pipeline).getAllElements(); + var result = new ArrayList(); allElements.forEach(pipelineElement -> { - Optional modificationOpt = getModification(pipelineElement.getDom(), pipelineModifications); + var modificationOpt = getModification(pipelineElement.getDom(), pipelineModifications); if (modificationOpt.isPresent()) { PipelineModification modification = modificationOpt.get(); if (pipelineElement instanceof InvocableStreamPipesEntity) { - ((InvocableStreamPipesEntity) pipelineElement).setInputStreams(modification.getInputStreams()); - ((InvocableStreamPipesEntity) pipelineElement).setStaticProperties(modification.getStaticProperties()); + applyModificationsForInvocable((InvocableStreamPipesEntity) pipelineElement, modification); if (pipelineElement instanceof DataProcessorInvocation) { - ((DataProcessorInvocation) pipelineElement).setOutputStream(modification.getOutputStream()); - if (((DataProcessorInvocation) pipelineElement).getOutputStream().getEventGrounding() == null) { - EventGrounding grounding = - new GroundingBuilder(pipelineElement, Collections.emptySet()).getEventGrounding(); - ((DataProcessorInvocation) pipelineElement).getOutputStream().setEventGrounding(grounding); - } - if (modification.getOutputStrategies() != null) { - ((DataProcessorInvocation) pipelineElement).setOutputStrategies(modification.getOutputStrategies()); - } + applyModificationsForDataProcessor((DataProcessorInvocation) pipelineElement, modification); } } if (!ignoreUnconfigured || modification.isPipelineElementValid()) { @@ -80,6 +88,27 @@ public List verifyAndBuildGraphs(boolean ignoreUnconfigu return result; } + private void applyModificationsForDataProcessor(DataProcessorInvocation pipelineElement, + PipelineModification modification) { + if (modification.getOutputStream() != null) { + pipelineElement.setOutputStream(modification.getOutputStream()); + if (pipelineElement.getOutputStream().getEventGrounding() == null) { + EventGrounding grounding = + new GroundingBuilder(pipelineElement, Collections.emptySet()).getEventGrounding(); + pipelineElement.getOutputStream().setEventGrounding(grounding); + } + } + if (modification.getOutputStrategies() != null) { + pipelineElement.setOutputStrategies(modification.getOutputStrategies()); + } + } + + private void applyModificationsForInvocable(InvocableStreamPipesEntity pipelineElement, + PipelineModification modification) { + pipelineElement.setInputStreams(modification.getInputStreams()); + pipelineElement.setStaticProperties(modification.getStaticProperties()); + } + private Optional getModification(String id, List modifications) { return modifications.stream().filter(m -> m.getDomId().equals(id)).findFirst(); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java index 079ccf50c6..ea552f02f5 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java @@ -28,14 +28,12 @@ public class MappingPropertyCalculator { - private EventSchema schema; - private List availablePropertySelectors; - private EventProperty requirement; + private final EventSchema schema; + private final List availablePropertySelectors; + private final EventProperty requirement; - public MappingPropertyCalculator() { - } - - public MappingPropertyCalculator(EventSchema schema, List availablePropertySelectors, + public MappingPropertyCalculator(EventSchema schema, + List availablePropertySelectors, EventProperty requirement) { this.schema = schema; this.availablePropertySelectors = availablePropertySelectors; diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java index 8cf5c9e459..e691f285c0 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java @@ -67,13 +67,16 @@ public void apply(NamedStreamPipesEntity source, .setEventGrounding(selectedGrounding); } - target - .getInputStreams() - .get(getIndex(target)) - .setEventGrounding(selectedGrounding); + if (!target.getInputStreams().isEmpty()) { - if (target.getInputStreams().size() > 1) { - this.visitorHistory.put(target.getDom(), 1); + target + .getInputStreams() + .get(getIndex(target)) + .setEventGrounding(selectedGrounding); + + if (target.getInputStreams().size() > 1) { + this.visitorHistory.put(target.getDom(), 1); + } } } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java index caa87744dc..5274fff602 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java @@ -87,7 +87,13 @@ public void visit(MappingPropertyNary mappingPropertyNary) { .stream() .filter(p -> mappingPropertyNary.getMapsFromOptions().contains(p)) .collect(Collectors.toList())); - validationInfos.add(PipelineElementValidationInfo.info("Auto-updated invalid field selection")); + var info = PipelineElementValidationInfo.info( + String.format( + "Auto-updated invalid field selection: Fields updated to %s", + mappingPropertyNary.getSelectedProperties().toString() + ) + ); + validationInfos.add(info); } } @@ -95,10 +101,18 @@ public void visit(MappingPropertyNary mappingPropertyNary) { public void visit(MappingPropertyUnary mappingPropertyUnary) { if (existsSelection(mappingPropertyUnary)) { if (!(mappingPropertyUnary.getMapsFromOptions().contains(mappingPropertyUnary.getSelectedProperty()))) { - if (mappingPropertyUnary.getMapsFromOptions().size() > 0) { + if (!mappingPropertyUnary.getMapsFromOptions().isEmpty()) { + String existingSelector = mappingPropertyUnary.getSelectedProperty(); String firstSelector = mappingPropertyUnary.getMapsFromOptions().get(0); mappingPropertyUnary.setSelectedProperty(firstSelector); - validationInfos.add(PipelineElementValidationInfo.info("Auto-updated invalid field selection")); + var info = PipelineElementValidationInfo.info( + String.format( + "Auto-updated invalid field selection: Selected field %s was changed to %s", + existingSelector, + firstSelector + ) + ); + validationInfos.add(info); } } } else { @@ -142,10 +156,10 @@ public List getValidationInfos() { } private boolean existsSelection(MappingPropertyUnary mappingProperty) { - return !(mappingProperty.getSelectedProperty() == null || mappingProperty.getSelectedProperty().equals("")); + return !(mappingProperty.getSelectedProperty() == null || mappingProperty.getSelectedProperty().isEmpty()); } private boolean existsSelection(MappingPropertyNary mappingProperty) { - return !(mappingProperty.getSelectedProperties() == null || mappingProperty.getSelectedProperties().size() == 0); + return !(mappingProperty.getSelectedProperties() == null || mappingProperty.getSelectedProperties().isEmpty()); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PrepareStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PrepareStep.java index 6b7a0bfeff..2cb0d2557c 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PrepareStep.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PrepareStep.java @@ -36,7 +36,7 @@ public void apply(NamedStreamPipesEntity source, Set allTargets, List validationInfos) throws SpValidationException { if (target instanceof DataProcessorInvocation) { - if (target.getInputStreams() == null) { + if (target.getInputStreams() == null || target.getInputStreams().isEmpty()) { target.setInputStreams(new ArrayList<>()); for (int i = 0; i < target.getStreamRequirements().size(); i++) { target.getInputStreams().add(new SpDataStream()); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java index c8cc072b12..e849aaa7ea 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java @@ -136,8 +136,12 @@ public static List getPipelinesContainingElements(String elementId) { private static Stream mergePipelineElement(Pipeline pipeline) { return Stream.concat( - Stream.concat(pipeline.getStreams().stream(), pipeline.getSepas().stream()), - pipeline.getActions().stream()); + Stream.concat( + pipeline.getStreams().stream(), + pipeline.getSepas().stream() + ), + pipeline.getActions().stream() + ); } private static void preparePipelineBasics(String username, diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java index d6268d661b..a87a5a3721 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java @@ -38,6 +38,9 @@ import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.streampipes.storage.management.StorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -45,6 +48,8 @@ public class ElementRecommender { + private static final Logger LOG = LoggerFactory.getLogger(ElementRecommender.class); + private final Pipeline pipeline; private final String baseRecDomId; private final PipelineElementRecommendationMessage recommendationMessage; @@ -65,11 +70,11 @@ public PipelineElementRecommendationMessage findRecommendedElements() throws NoS Optional outputStream = getOutputStream(elementsProvider); outputStream.ifPresent(spDataStream -> validate(spDataStream, getAll())); } catch (Exception e) { - e.printStackTrace(); + LOG.warn("Could not find root node or output stream of provided pipeline"); return recommendationMessage; } - if (recommendationMessage.getPossibleElements().size() == 0) { + if (recommendationMessage.getPossibleElements().isEmpty()) { throw new NoSuitableSepasAvailableException(); } else { recommendationMessage diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java index b42f9e02d0..25a17b5732 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java @@ -237,6 +237,7 @@ public Response overwritePipeline(@PathParam("pipelineId") String pipelineId, Pi storedPipeline.setPipelineCategories(pipeline.getPipelineCategories()); storedPipeline.setHealthStatus(pipeline.getHealthStatus()); storedPipeline.setPipelineNotifications(pipeline.getPipelineNotifications()); + storedPipeline.setValid(pipeline.isValid()); Operations.updatePipeline(storedPipeline); SuccessMessage message = Notifications.success("Pipeline modified"); message.addNotification(new Notification("id", pipelineId)); diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java index 7de6ed08c6..2f71f1444d 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java @@ -20,6 +20,7 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; +import org.apache.streampipes.connect.management.management.AdapterUpdateManagement; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.message.Notifications; import org.apache.streampipes.model.monitoring.SpLogMessage; @@ -79,12 +80,9 @@ public Response addAdapter(AdapterDescription adapterDescription) { @Produces(MediaType.APPLICATION_JSON) @PreAuthorize(AuthConstants.HAS_WRITE_ADAPTER_PRIVILEGE) public Response updateAdapter(AdapterDescription adapterDescription) { - var principalSid = getAuthenticatedUserSid(); - var username = getAuthenticatedUsername(); - LOG.info("User: " + username + " updates adapter " + adapterDescription.getElementId()); - + var updateManager = new AdapterUpdateManagement(managementService); try { - managementService.updateAdapter(adapterDescription, principalSid); + updateManager.updateAdapter(adapterDescription); } catch (AdapterException e) { LOG.error("Error while updating adapter with id " + adapterDescription.getElementId(), e); return ok(Notifications.error(e.getMessage())); @@ -93,6 +91,18 @@ public Response updateAdapter(AdapterDescription adapterDescription) { return ok(Notifications.success(adapterDescription.getElementId())); } + @PUT + @JacksonSerialized + @Produces(MediaType.APPLICATION_JSON) + @PreAuthorize(AuthConstants.HAS_WRITE_ADAPTER_PRIVILEGE) + @Path("pipeline-migration-preflight") + public Response performPipelineMigrationPreflight(AdapterDescription adapterDescription) { + var updateManager = new AdapterUpdateManagement(managementService); + var migrations = updateManager.checkPipelineMigrations(adapterDescription); + + return ok(migrations); + } + @GET @JacksonSerialized @Path("/{id}") @@ -148,7 +158,7 @@ public Response startAdapter(@PathParam("id") String adapterId) { public Response deleteAdapter(@PathParam("id") String elementId) { List pipelinesUsingAdapter = getPipelinesUsingAdapter(elementId); - if (pipelinesUsingAdapter.size() == 0) { + if (pipelinesUsingAdapter.isEmpty()) { try { managementService.deleteAdapter(elementId); return ok(Notifications.success("Adapter with id: " + elementId + " is deleted.")); diff --git a/ui/cypress/support/utils/ConnectEventSchemaUtils.ts b/ui/cypress/support/utils/ConnectEventSchemaUtils.ts index 2172d066ee..1d933399ff 100644 --- a/ui/cypress/support/utils/ConnectEventSchemaUtils.ts +++ b/ui/cypress/support/utils/ConnectEventSchemaUtils.ts @@ -121,10 +121,16 @@ export class ConnectEventSchemaUtils { cy.dataCy('edit-' + propertyName.toLowerCase(), { timeout: 10000, }).click({ force: true }); - cy.dataCy('connect-schema-unit-from-input', { timeout: 10000 }).should( - 'have.value', - fromUnit, - ); + + cy.dataCy('connect-schema-unit-from-dropdown').eq(0).clear(); + cy.dataCy('connect-schema-unit-from-dropdown').eq(0).type(fromUnit); + cy.dataCy('connect-schema-unit-from-dropdown', { + timeout: 10000, + }).should('have.value', fromUnit); + //cy.dataCy("connect-schema-unit-transform-btn").click(); + cy.dataCy('connect-schema-unit-to-dropdown') + .contains(toUnit) + .click({ force: true }); cy.dataCy('connect-schema-unit-to-dropdown', { timeout: 10000, }).contains(toUnit); @@ -133,7 +139,7 @@ export class ConnectEventSchemaUtils { 'have.length', 1, ); - cy.dataCy('sp-save-edit-property').click(); + cy.dataCy('sp-save-edit-property').click({ force: true }); } public static addStaticProperty( @@ -146,12 +152,15 @@ export class ConnectEventSchemaUtils { cy.wait(100); // Edit new property - cy.dataCy('edit-key_0', { timeout: 10000 }).click(); - - cy.dataCy('connect-edit-field-runtime-name', { timeout: 10000 }).type( + cy.dataCy('connect-add-field-name', { timeout: 10000 }).type( '{backspace}{backspace}{backspace}{backspace}{backspace}' + propertyName, ); + cy.dataCy('connect-add-field-name-button').click(); + + cy.dataCy('edit-' + propertyName.toLowerCase()).click(); + + cy.dataCy('connect-edit-field-static-value').clear(); cy.dataCy('connect-edit-field-static-value', { timeout: 10000 }).type( propertyValue, ); @@ -212,7 +221,7 @@ export class ConnectEventSchemaUtils { cy.dataCy('sp-connect-schema-editor', { timeout: 10000 }).should( 'be.visible', ); - cy.get('#event-schema-next-button').click(); + cy.dataCy('sp-event-schema-next-button').click(); } public static clickEditProperty(propertyName: string) { diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts b/ui/cypress/support/utils/connect/ConnectUtils.ts index facf4c2d34..b6a3f28e80 100644 --- a/ui/cypress/support/utils/connect/ConnectUtils.ts +++ b/ui/cypress/support/utils/connect/ConnectUtils.ts @@ -260,7 +260,7 @@ export class ConnectUtils { adapterConfiguration: AdapterInput, expectedFile: string, ignoreTime: boolean, - waitTime = 0, + waitTime = 1000, ) { ConnectUtils.startAdapter(adapterConfiguration, true); diff --git a/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts b/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts index 5bad56c0a0..28e4fe04a2 100644 --- a/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts +++ b/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts @@ -32,9 +32,6 @@ describe('Test Edit Adapter', () => { ConnectUtils.goToConnect(); - // check that edit button is deactivated while adapter is running - ConnectBtns.editAdapter().should('be.disabled'); - // stop adapter ConnectBtns.stopAdapter().click(); @@ -52,12 +49,7 @@ describe('Test Edit Adapter', () => { ConnectUtils.configureAdapter(newUserConfiguration); - cy.get('.schema-validation-text-warning').contains('Edit mode'); - // Update event schema - ConnectBtns.refreshSchema().click(); - - cy.dataCy('schema-validation-ok').should('not.exist'); ConnectUtils.finishEventSchemaConfiguration(); @@ -65,7 +57,7 @@ describe('Test Edit Adapter', () => { ConnectBtns.storeEditAdapter().click(); - cy.dataCy('info-adapter-successfully-updated', { + cy.dataCy('sp-connect-adapter-success-added', { timeout: 60000, }).should('be.visible'); diff --git a/ui/cypress/tests/adapter/editAdapterWithPipeline.smoke.spec.ts b/ui/cypress/tests/adapter/editAdapterWithPipeline.smoke.spec.ts deleted file mode 100644 index 07cf664318..0000000000 --- a/ui/cypress/tests/adapter/editAdapterWithPipeline.smoke.spec.ts +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { ConnectUtils } from '../../support/utils/connect/ConnectUtils'; -import { PipelineUtils } from '../../support/utils/PipelineUtils'; -import { ConnectBtns } from '../../support/utils/connect/ConnectBtns'; - -describe('Test Edit Adapter', () => { - beforeEach('Setup Test', () => { - // To set up test add a stream adapter that can be configured - cy.initStreamPipesTest(); - - ConnectUtils.addMachineDataSimulator('simulator', true); - }); - - it('Perform Test', () => { - // ensure edit mode is disabled - ConnectUtils.goToConnect(); - ConnectBtns.editAdapter().should('be.disabled'); - - // stop adapter - ConnectBtns.stopAdapter().click(); - ConnectBtns.startAdapter().should('be.visible'); - - // ensure edit mode is still disabled - ConnectBtns.editAdapter().should('not.be.disabled'); - ConnectBtns.editAdapter().click(); - - cy.dataCy('can-not-edit-adapter-dialog-warning').should('be.visible'); - cy.dataCy('can-not-edit-adapter-dialog-close').click(); - - // Delete pipeline - PipelineUtils.deletePipeline(); - - // edit mode is enabled - ConnectUtils.goToConnect(); - ConnectBtns.editAdapter().should('not.be.disabled'); - }); -}); diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts index af3c34c37c..a8a97c3778 100644 --- a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts +++ b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts @@ -22,7 +22,11 @@ import { map } from 'rxjs/operators'; import { Observable } from 'rxjs'; import { PlatformServicesCommons } from './commons.service'; -import { AdapterDescription, Message } from '../model/gen/streampipes-model'; +import { + AdapterDescription, + Message, + PipelineUpdateInfo, +} from '../model/gen/streampipes-model'; @Injectable({ providedIn: 'root' }) export class AdapterService { @@ -95,6 +99,23 @@ export class AdapterService { .pipe(map(response => Message.fromData(response as any))); } + performPipelineMigrationPreflight( + adapter: AdapterDescription, + ): Observable { + return this.http + .put( + `${this.connectPath}/master/adapters/pipeline-migration-preflight`, + adapter, + ) + .pipe( + map(response => { + return (response as any[]).map(p => + PipelineUpdateInfo.fromData(p), + ); + }), + ); + } + get adapterMasterUrl() { return `${this.connectPath}/master/adapters/`; } diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 04a5fb54cf..1d04b839d6 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2023-10-19 12:31:37. +// Generated using typescript-generator version 3.2.1263 on 2023-10-24 20:02:02. export class NamedStreamPipesEntity { '@class': @@ -319,7 +319,13 @@ export class AddTimestampRuleDescription extends ValueTransformationRuleDescript export class AddValueTransformationRuleDescription extends ValueTransformationRuleDescription { '@class': 'org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription'; + 'datatype': string; + 'description': string; + 'label': string; + 'measurementUnit': string; + 'propertyScope': PropertyScope; 'runtimeKey': string; + 'semanticType': string; 'staticValue': string; static 'fromData'( @@ -331,7 +337,13 @@ export class AddValueTransformationRuleDescription extends ValueTransformationRu } const instance = target || new AddValueTransformationRuleDescription(); super.fromData(data, instance); + instance.datatype = data.datatype; + instance.description = data.description; + instance.label = data.label; + instance.measurementUnit = data.measurementUnit; + instance.propertyScope = data.propertyScope; instance.runtimeKey = data.runtimeKey; + instance.semanticType = data.semanticType; instance.staticValue = data.staticValue; return instance; } @@ -1535,6 +1547,7 @@ export class EventProperty { | 'org.apache.streampipes.model.schema.EventPropertyList' | 'org.apache.streampipes.model.schema.EventPropertyNested' | 'org.apache.streampipes.model.schema.EventPropertyPrimitive'; + 'additionalMetadata': { [index: string]: any }; 'description': string; 'domainProperties': string[]; 'elementId': string; @@ -1554,6 +1567,9 @@ export class EventProperty { } const instance = target || new EventProperty(); instance['@class'] = data['@class']; + instance.additionalMetadata = __getCopyObjectFn(__identity())( + data.additionalMetadata, + ); instance.description = data.description; instance.domainProperties = __getCopyArrayFn(__identity())( data.domainProperties, @@ -1919,6 +1935,10 @@ export class GuessSchema { 'eventPreview': string[]; 'eventSchema': EventSchema; 'fieldStatusInfo': { [index: string]: FieldStatusInfo }; + 'modifiedRules': TransformationRuleDescriptionUnion[]; + 'removedProperties': EventPropertyUnion[]; + 'targetSchema': EventSchema; + 'updateNotifications': Notification[]; static 'fromData'(data: GuessSchema, target?: GuessSchema): GuessSchema { if (!data) { @@ -1933,6 +1953,16 @@ export class GuessSchema { instance.fieldStatusInfo = __getCopyObjectFn(FieldStatusInfo.fromData)( data.fieldStatusInfo, ); + instance.modifiedRules = __getCopyArrayFn( + TransformationRuleDescription.fromDataUnion, + )(data.modifiedRules); + instance.removedProperties = __getCopyArrayFn( + EventProperty.fromDataUnion, + )(data.removedProperties); + instance.targetSchema = EventSchema.fromData(data.targetSchema); + instance.updateNotifications = __getCopyArrayFn(Notification.fromData)( + data.updateNotifications, + ); return instance; } } @@ -2448,6 +2478,7 @@ export class Pipeline extends ElementComposition { restartOnSystemReboot: boolean; running: boolean; startedAt: number; + valid: boolean; static fromData(data: Pipeline, target?: Pipeline): Pipeline { if (!data) { @@ -2473,6 +2504,7 @@ export class Pipeline extends ElementComposition { instance.restartOnSystemReboot = data.restartOnSystemReboot; instance.running = data.running; instance.startedAt = data.startedAt; + instance.valid = data.valid; return instance; } } @@ -2961,6 +2993,32 @@ export class PipelineTemplateInvocation { } } +export class PipelineUpdateInfo { + canAutoMigrate: boolean; + migrationInfo: string; + pipelineId: string; + pipelineName: string; + validationInfos: { [index: string]: PipelineElementValidationInfo[] }; + + static fromData( + data: PipelineUpdateInfo, + target?: PipelineUpdateInfo, + ): PipelineUpdateInfo { + if (!data) { + return data; + } + const instance = target || new PipelineUpdateInfo(); + instance.canAutoMigrate = data.canAutoMigrate; + instance.migrationInfo = data.migrationInfo; + instance.pipelineId = data.pipelineId; + instance.pipelineName = data.pipelineName; + instance.validationInfos = __getCopyObjectFn( + __getCopyArrayFn(PipelineElementValidationInfo.fromData), + )(data.validationInfos); + return instance; + } +} + export class ProducedMessagesInfo extends MessagesInfo { totalProducedMessages: number; totalProducedMessagesSincePipelineStart: number; @@ -3521,6 +3579,7 @@ export class SpServiceRegistration { port: number; rev: string; scheme: string; + serviceUrl: string; svcGroup: string; svcId: string; svcType: string; @@ -3541,6 +3600,7 @@ export class SpServiceRegistration { instance.port = data.port; instance.rev = data.rev; instance.scheme = data.scheme; + instance.serviceUrl = data.serviceUrl; instance.svcGroup = data.svcGroup; instance.svcId = data.svcId; instance.svcType = data.svcType; @@ -4002,6 +4062,12 @@ export type OutputStrategyUnion = export type PipelineHealthStatus = 'OK' | 'REQUIRES_ATTENTION' | 'FAILURE'; +export type PropertyScope = + | 'HEADER_PROPERTY' + | 'DIMENSION_PROPERTY' + | 'MEASUREMENT_PROPERTY' + | 'NONE'; + export type SelectionStaticPropertyUnion = | AnyStaticProperty | OneOfStaticProperty; @@ -4020,8 +4086,7 @@ export type SpServiceTagPrefix = | 'ADAPTER' | 'DATA_STREAM' | 'DATA_PROCESSOR' - | 'DATA_SINK' - | 'DATA_SET'; + | 'DATA_SINK'; export type StaticPropertyType = | 'AnyStaticProperty' diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-configuration.component.html b/ui/src/app/connect/components/adapter-configuration/adapter-configuration.component.html index 96ecde4a98..e1380b45f2 100644 --- a/ui/src/app/connect/components/adapter-configuration/adapter-configuration.component.html +++ b/ui/src/app/connect/components/adapter-configuration/adapter-configuration.component.html @@ -48,12 +48,9 @@ Configure fields Start Adapter marked as timestamp +
+ + S + +
Measurement + >Measurement + Dimension + >Dimension + Header diff --git a/ui/src/app/connect/components/adapter-configuration/schema-editor/event-property-row/event-property-row.component.scss b/ui/src/app/connect/components/adapter-configuration/schema-editor/event-property-row/event-property-row.component.scss index 60d1eb5f31..c3c5c92172 100644 --- a/ui/src/app/connect/components/adapter-configuration/schema-editor/event-property-row/event-property-row.component.scss +++ b/ui/src/app/connect/components/adapter-configuration/schema-editor/event-property-row/event-property-row.component.scss @@ -90,3 +90,14 @@ width: 170px; text-align: center; } + +.static-value-info-outer { + width: 20px; +} + +.static-value-info { + background: var(--color-default-text); + color: var(--color-bg-0); + padding: 3px; + border-radius: 5px; +} diff --git a/ui/src/app/connect/components/adapter-configuration/schema-editor/event-property-row/event-property-row.component.ts b/ui/src/app/connect/components/adapter-configuration/schema-editor/event-property-row/event-property-row.component.ts index e9c970e158..ac24ea577b 100644 --- a/ui/src/app/connect/components/adapter-configuration/schema-editor/event-property-row/event-property-row.component.ts +++ b/ui/src/app/connect/components/adapter-configuration/schema-editor/event-property-row/event-property-row.component.ts @@ -19,7 +19,6 @@ import { Component, EventEmitter, Input, OnInit, Output } from '@angular/core'; import { UUID } from 'angular2-uuid'; import { TreeNode } from '@circlon/angular-tree-component'; -import { MatDialog } from '@angular/material/dialog'; import { EventProperty, EventPropertyList, @@ -28,9 +27,12 @@ import { EventPropertyUnion, EventSchema, FieldStatusInfo, + TransformationRuleDescription, } from '@streampipes/platform-services'; import { EditEventPropertyComponent } from '../../../../dialog/edit-event-property/edit-event-property.component'; import { DialogService, PanelType } from '@streampipes/shared-ui'; +import { StaticValueTransformService } from '../../../../services/static-value-transform.service'; +import { EventPropertyUtilsService } from '../../../../services/event-property-utils.service'; import { ShepherdService } from '../../../../../services/tour/shepherd.service'; @Component({ @@ -53,19 +55,24 @@ export class EventPropertyRowComponent implements OnInit { @Output() countSelectedChange = new EventEmitter(); label: string; + isPrimitive = false; isNested = false; isList = false; + isStaticValue = false; + timestampProperty = false; showFieldStatus = false; runtimeType: string; originalRuntimeType: string; originalRuntimeName: string; + originalProperty: EventPropertyUnion; constructor( - private dialog: MatDialog, + private staticValueService: StaticValueTransformService, private dialogService: DialogService, + private epUtils: EventPropertyUtilsService, private shepherdService: ShepherdService, ) {} @@ -74,27 +81,17 @@ export class EventPropertyRowComponent implements OnInit { this.isPrimitive = this.isEventPropertyPrimitive(this.node.data); this.isList = this.isEventPropertyList(this.node.data); this.isNested = this.isEventPropertyNested(this.node.data); + this.isStaticValue = this.staticValueService.isStaticValueProperty( + this.node.data.elementId, + ); this.timestampProperty = this.isTimestampProperty(this.node.data); if (this.node.data instanceof EventProperty) { - const originalProperty = this.findOriginalProperty( + this.originalProperty = this.epUtils.findPropertyByElementId( this.originalEventSchema.eventProperties, + this.node.data.elementId, ); - if (originalProperty) { - this.originalRuntimeName = originalProperty.runtimeName; - this.showFieldStatus = - this.fieldStatusInfo && - this.fieldStatusInfo[this.originalRuntimeName] !== - undefined; - if (this.isPrimitive) { - this.originalRuntimeType = this.parseType( - originalProperty.runtimeType, - ); - this.runtimeType = this.parseType( - (this.node.data as EventPropertyPrimitive).runtimeType, - ); - } - } + this.checkAndDisplayProperties(); } if (!this.node.data.propertyScope) { @@ -102,21 +99,27 @@ export class EventPropertyRowComponent implements OnInit { } } - private findOriginalProperty(properties: EventPropertyUnion[]): any { - let result: EventPropertyUnion | undefined; + private checkAndDisplayProperties() { + if (this.originalProperty) { + this.applyDisplayedProperties(this.originalProperty); + } else { + this.applyDisplayedProperties(this.node.data); + } + } - for (const property of properties) { - if (property.elementId === this.node.data.elementId) { - result = property; - break; - } else if (property instanceof EventPropertyNested) { - result = this.findOriginalProperty(property.eventProperties); - if (result) { - break; - } - } + private applyDisplayedProperties(ep: EventProperty) { + this.originalRuntimeName = ep.runtimeName; + this.showFieldStatus = + this.fieldStatusInfo && + this.fieldStatusInfo[this.originalRuntimeName] !== undefined; + if (this.isPrimitive) { + this.originalRuntimeType = this.parseType( + (ep as EventPropertyPrimitive).runtimeType, + ); + this.runtimeType = this.parseType( + (this.node.data as EventPropertyPrimitive).runtimeType, + ); } - return result; } private parseType(runtimeType: string) { @@ -174,6 +177,7 @@ export class EventPropertyRowComponent implements OnInit { width: '50vw', data: { property: data, + originalProperty: this.originalProperty, isEditable: this.isEditable, }, }); @@ -181,6 +185,8 @@ export class EventPropertyRowComponent implements OnInit { dialogRef.afterClosed().subscribe(refresh => { this.timestampProperty = this.isTimestampProperty(this.node.data); + this.label = this.getLabel(this.node.data); + this.checkAndDisplayProperties(); this.refreshTreeEmitter.emit(true); }); } diff --git a/ui/src/app/connect/components/adapter-configuration/schema-editor/event-schema/event-schema.component.html b/ui/src/app/connect/components/adapter-configuration/schema-editor/event-schema/event-schema.component.html index cabb6ba21b..0438a55142 100644 --- a/ui/src/app/connect/components/adapter-configuration/schema-editor/event-schema/event-schema.component.html +++ b/ui/src/app/connect/components/adapter-configuration/schema-editor/event-schema/event-schema.component.html @@ -33,8 +33,11 @@ (); - @Output() - adapterChange = new EventEmitter(); - @Output() - eventSchemaChange = new EventEmitter(); - @Output() - oldEventSchemaChange = new EventEmitter(); @Output() goBackEmitter: EventEmitter = new EventEmitter(); @@ -94,8 +91,7 @@ export class EventSchemaComponent implements OnChanges { @Output() clickNextEmitter: EventEmitter = new EventEmitter(); - @ViewChild(TreeComponent, { static: true }) - tree: TreeComponent; + _tree: TreeComponent; schemaGuess: GuessSchema = new GuessSchema(); countSelected = 0; @@ -103,7 +99,7 @@ export class EventSchemaComponent implements OnChanges { isError = false; isPreviewEnabled = false; errorMessage: SpLogMessage; - nodes: EventProperty[] = new Array(); + nodes: EventPropertyUnion[] = new Array(); validEventSchema = false; schemaErrorHints: UserErrorMessage[] = []; @@ -118,8 +114,8 @@ export class EventSchemaComponent implements OnChanges { }, allowDrop: (node, { parent }) => { return ( - parent.data.eventProperties !== undefined && - parent.parent !== null + parent.data instanceof EventPropertyNested || + parent.data.virtual ); }, displayField: 'runTimeName', @@ -146,28 +142,20 @@ export class EventSchemaComponent implements OnChanges { guessSchema => { this.eventPreview = guessSchema.eventPreview; this.fieldStatusInfo = guessSchema.fieldStatusInfo; - this.eventSchema = guessSchema.eventSchema; - this.eventSchema.eventProperties.sort((a, b) => { + this.targetSchema = guessSchema.targetSchema; + this.targetSchema.eventProperties.sort((a, b) => { return a.runtimeName < b.runtimeName ? -1 : 1; }); - this.eventSchemaChange.emit(this.eventSchema); this.schemaGuess = guessSchema; - this.validEventSchema = this.checkIfValid(this.eventSchema); - - this.oldEventSchema = EventSchema.fromData( - this.eventSchema, - new EventSchema(), - ); - this.oldEventSchemaChange.emit(this.oldEventSchema); - - this.refreshTree(); + this.originalSchema = guessSchema.eventSchema; + this.validEventSchema = this.checkIfValid(this.targetSchema); this.isEditable = true; this.isEditableChange.emit(true); this.isLoading = false; this.refreshedEventSchema = true; - + this.refreshTree(); if ( guessSchema.eventPreview && guessSchema.eventPreview.length > 0 @@ -179,17 +167,24 @@ export class EventSchemaComponent implements OnChanges { this.errorMessage = errorMessage.error; this.isError = true; this.isLoading = false; - this.eventSchema = new EventSchema(); + this.targetSchema = new EventSchema(); }, ); } public refreshTree(refreshPreview = true): void { - this.nodes = new Array(); - this.nodes.push(this.eventSchema as unknown as EventProperty); - this.validEventSchema = this.checkIfValid(this.eventSchema); - if (refreshPreview) { - this.updatePreview(); + if (this.targetSchema && this.targetSchema.eventProperties) { + this.nodes = new Array(); + this.nodes.push(...this.targetSchema.eventProperties); + this.validEventSchema = this.checkIfValid(this.targetSchema); + if (refreshPreview) { + this.updatePreview(); + } + setTimeout(() => { + if (this._tree) { + this._tree.treeModel.expandAll(); + } + }); } } @@ -202,8 +197,9 @@ export class EventSchemaComponent implements OnChanges { nested.eventProperties = []; nested.domainProperties = []; nested.runtimeName = 'nested'; + nested.additionalMetadata = {}; if (!eventProperty) { - this.eventSchema.eventProperties.push(nested); + this.targetSchema.eventProperties.push(nested); } else { eventProperty.eventProperties.push(nested); } @@ -211,7 +207,7 @@ export class EventSchemaComponent implements OnChanges { } public removeSelectedProperties(eventProperties?: any): void { - eventProperties = eventProperties || this.eventSchema.eventProperties; + eventProperties = eventProperties || this.targetSchema.eventProperties; for (let i = eventProperties.length - 1; i >= 0; --i) { if (eventProperties[i].eventProperties) { this.removeSelectedProperties( @@ -226,19 +222,20 @@ export class EventSchemaComponent implements OnChanges { this.refreshTree(); } - public addStaticValueProperty(): void { + public addStaticValueProperty(runtimeName: string): void { const eventProperty = new EventPropertyPrimitive(); eventProperty['@class'] = 'org.apache.streampipes.model.schema.EventPropertyPrimitive'; eventProperty.elementId = - 'http://eventProperty.de/staticValue/' + UUID.UUID(); + this.staticValueTransformService.makeDefaultElementId(); - eventProperty.runtimeName = 'key_0'; - (eventProperty as any).staticValue = ''; + eventProperty.runtimeName = runtimeName; eventProperty.runtimeType = this.dataTypesService.getStringTypeUrl(); eventProperty.domainProperties = []; + eventProperty.propertyScope = 'DIMENSION_PROPERTY'; + eventProperty.additionalMetadata = {}; - this.eventSchema.eventProperties.push(eventProperty); + this.targetSchema.eventProperties.push(eventProperty); this.refreshTree(); } @@ -255,17 +252,19 @@ export class EventSchemaComponent implements OnChanges { eventProperty.domainProperties = ['http://schema.org/DateTime']; eventProperty.propertyScope = 'HEADER_PROPERTY'; eventProperty.runtimeType = 'http://www.w3.org/2001/XMLSchema#long'; + eventProperty.additionalMetadata = {}; - this.eventSchema.eventProperties.push(eventProperty); + this.targetSchema.eventProperties.push(eventProperty); this.refreshTree(); } public updatePreview(): void { this.isPreviewEnabled = false; - this.transformationRuleService.setOldEventSchema(this.oldEventSchema); - this.transformationRuleService.setNewEventSchema(this.eventSchema); const ruleDescriptions = - this.transformationRuleService.getTransformationRuleDescriptions(); + this.transformationRuleService.getTransformationRuleDescriptions( + this.originalSchema, + this.targetSchema, + ); if (this.eventPreview && this.eventPreview.length > 0) { this.restService .getAdapterEventPreview({ @@ -298,10 +297,10 @@ export class EventSchemaComponent implements OnChanges { } private checkIfValid(eventSchema: EventSchema): boolean { - let hasTimestamp = false; + this.timestampPresent = false; eventSchema.eventProperties.forEach(p => { if (p.domainProperties.indexOf('http://schema.org/DateTime') > -1) { - hasTimestamp = true; + this.timestampPresent = true; } }); @@ -311,7 +310,7 @@ export class EventSchemaComponent implements OnChanges { this.setEventSchemaEditWarning(); } - if (!hasTimestamp) { + if (!this.timestampPresent) { this.schemaErrorHints.push( new UserErrorMessage( 'Missing Timestamp', @@ -338,6 +337,29 @@ export class EventSchemaComponent implements OnChanges { } } - return hasTimestamp; + return this.timestampPresent; + } + + getOriginalSchema(): EventSchema { + return this.originalSchema; + } + + getTargetSchema(): EventSchema { + this.targetSchema.eventProperties = this.nodes; + return this.targetSchema; + } + + onNodeMove(event: any) { + this.targetSchema.eventProperties = this.nodes; + this.updatePreview(); + } + + @ViewChild('tree') + set tree(treeComponent: TreeComponent) { + this._tree = treeComponent; + } + + get tree(): TreeComponent { + return this._tree; } } diff --git a/ui/src/app/connect/components/adapter-configuration/schema-editor/schema-editor-header/schema-editor-header.component.html b/ui/src/app/connect/components/adapter-configuration/schema-editor/schema-editor-header/schema-editor-header.component.html index 8fea1dd7b4..268fbae655 100644 --- a/ui/src/app/connect/components/adapter-configuration/schema-editor/schema-editor-header/schema-editor-header.component.html +++ b/ui/src/app/connect/components/adapter-configuration/schema-editor/schema-editor-header/schema-editor-header.component.html @@ -33,16 +33,51 @@ mat-button data-cy="connect-add-static-property" matTooltip="Add a static value to event" - (click)="addStaticValueProperty()" + [matMenuTriggerFor]="staticValueMenu" > add  Add static value + +
+
+
+
Field name
+ + + +
+
+ +
+
+
+
diff --git a/ui/src/app/connect/components/existing-adapters/existing-adapters.component.html b/ui/src/app/connect/components/existing-adapters/existing-adapters.component.html index fa14b6045d..b3beb28264 100644 --- a/ui/src/app/connect/components/existing-adapters/existing-adapters.component.html +++ b/ui/src/app/connect/components/existing-adapters/existing-adapters.component.html @@ -325,7 +325,6 @@
matTooltip="Edit adapter" data-cy="edit-adapter" matTooltipPosition="above" - [disabled]="adapter.running" (click)="editAdapter(adapter)" > { - if (effectedPipelines.length > 0) { - this.dialogService.open(CanNotEditAdapterDialog, { - panelType: PanelType.STANDARD_PANEL, - title: 'No edit possible', - width: '50vw', - data: { - pipelines: effectedPipelines, - }, - }); - } else { - this.router.navigate([ - 'connect', - 'edit', - adapter.elementId, - ]); - } + // if (effectedPipelines.length > 0) { + // this.dialogService.open(CanNotEditAdapterDialog, { + // panelType: PanelType.STANDARD_PANEL, + // title: 'No edit possible', + // width: '50vw', + // data: { + // pipelines: effectedPipelines, + // }, + // }); + // } else { + this.router.navigate(['connect', 'edit', adapter.elementId]); + //} }); } diff --git a/ui/src/app/connect/connect.module.ts b/ui/src/app/connect/connect.module.ts index 7feaba1af8..3de5aa826e 100644 --- a/ui/src/app/connect/connect.module.ts +++ b/ui/src/app/connect/connect.module.ts @@ -35,7 +35,6 @@ import { MatInputModule } from '@angular/material/input'; import { AdapterStartedDialog } from './dialog/adapter-started/adapter-started-dialog.component'; import { DataTypesService } from './services/data-type.service'; import { StaticPropertyUtilService } from '../core-ui/static-properties/static-property-util.service'; -import { TransformationRuleService } from './services/transformation-rule.service'; import { ConnectService } from './services/connect.service'; import { AdapterDescriptionComponent } from './components/data-marketplace/adapter-description/adapter-description.component'; import { DataMarketplaceComponent } from './components/data-marketplace/data-marketplace.component'; @@ -85,6 +84,10 @@ import { SpAdapterDetailsMetricsComponent } from './components/adapter-details/a import { CanNotEditAdapterDialog } from './dialog/can-not-edit-adapter-dialog/can-not-edit-adapter-dialog.component'; import { AllAdapterActionsComponent } from './dialog/start-all-adapters/all-adapter-actions-dialog.component'; import { AdapterSettingsComponent } from './components/adapter-configuration/adapter-settings/adapter-settings.component'; +import { SpAdapterStartedLoadingComponent } from './dialog/adapter-started/adapter-started-loading/adapter-started-loading.component'; +import { SpAdapterStartedSuccessComponent } from './dialog/adapter-started/adapter-started-success/adapter-started-success.component'; +import { SpAdapterStartedUpdateMigrationComponent } from './dialog/adapter-started/adapter-started-update-migration/adapter-started-update-migration.component'; +import { SpAdapterStartedPreviewComponent } from './dialog/adapter-started/adapter-started-preview/adapter-started-preview.component'; @NgModule({ imports: [ @@ -178,6 +181,10 @@ import { AdapterSettingsComponent } from './components/adapter-configuration/ada SpAdapterDetailsLogsComponent, SpAdapterDetailsMetricsComponent, SpAdapterOptionsPanelComponent, + SpAdapterStartedPreviewComponent, + SpAdapterStartedLoadingComponent, + SpAdapterStartedSuccessComponent, + SpAdapterStartedUpdateMigrationComponent, SpAdapterTemplateDialogComponent, SpConnectFilterToolbarComponent, NewAdapterComponent, @@ -190,7 +197,6 @@ import { AdapterSettingsComponent } from './components/adapter-configuration/ada RestService, ConnectService, DataTypesService, - TransformationRuleService, StaticPropertyUtilService, UnitProviderService, TimestampPipe, diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html index a76b9dddb7..3e623b6140 100644 --- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html @@ -19,110 +19,38 @@
-
-
- Loading - -
-
-

- Please wait while your new adapter is being generated... -

-
-
-
-
-

Adapter successfully updated

-
-
-
-
-
- warning -  {{ - templateErrorMessage.notifications[0].title - }} -
-
-
+
+ - - - -
-
+ -
-
- done -  Your new data stream is now available in - the pipeline editor. -
-
-
- -
-
-
-
- error - Something went wrong during the adapter - installation. -
-
+ *ngIf="adapterInstalled" + [adapterInstallationSuccessMessage]=" + adapterInstallationSuccessMessage + " + [adapterStatus]="adapterStatus" + [pipelineOperationStatus]="pipelineOperationStatus" + [saveInDataLake]="saveInDataLake" + [templateErrorMessage]="templateErrorMessage" + > +
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.scss b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.scss index 2d6f701343..704f843e4b 100644 --- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.scss +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.scss @@ -17,17 +17,3 @@ */ @import 'src/scss/sp/sp-dialog'; - -.info-message { - text-align: center; - font-size: 14pt; - margin-top: 20px; - margin-bottom: 20px; -} - -.warning-message { - border: 1px solid var(--color-warn); - padding: 10px; - font-size: 12pt; - background: var(--color-bg-2); -} diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts index 73056e6462..f467d4f7d6 100644 --- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts @@ -18,18 +18,17 @@ import { Component, Input, OnInit } from '@angular/core'; import { ShepherdService } from '../../../services/tour/shepherd.service'; -import { RestService } from '../../services/rest.service'; import { AdapterDescription, + AdapterService, ErrorMessage, Message, PipelineOperationStatus, PipelineTemplateService, - SpDataStream, + PipelineUpdateInfo, } from '@streampipes/platform-services'; import { DialogRef } from '@streampipes/shared-ui'; import { PipelineInvocationBuilder } from '../../../core-services/template/PipelineInvocationBuilder'; -import { AdapterService } from '../../../../../projects/streampipes/platform-services/src/lib/apis/adapter.service'; @Component({ selector: 'sp-dialog-adapter-started-dialog', @@ -39,7 +38,6 @@ import { AdapterService } from '../../../../../projects/streampipes/platform-ser export class AdapterStartedDialog implements OnInit { adapterInstalled = false; public adapterStatus: Message; - public streamDescription: SpDataStream; pollingActive = false; public pipelineOperationStatus: PipelineOperationStatus; @@ -71,63 +69,131 @@ export class AdapterStartedDialog implements OnInit { @Input() startAdapterNow = true; templateErrorMessage: ErrorMessage; + adapterUpdatePreflight = false; + adapterPipelineUpdateInfos: PipelineUpdateInfo[]; + loading = false; + loadingText = ''; + showPreview = false; + adapterInstallationSuccessMessage = ''; + adapterElementId = ''; constructor( public dialogRef: DialogRef, private adapterService: AdapterService, - private restService: RestService, private shepherdService: ShepherdService, private pipelineTemplateService: PipelineTemplateService, ) {} ngOnInit() { if (this.editMode) { - this.editAdapter(); + this.initAdapterUpdatePreflight(); } else { this.addAdapter(); } } - editAdapter() { - this.adapterService.updateAdapter(this.adapter).subscribe(status => { - this.adapterStatus = status; - this.adapterInstalled = true; - }); + initAdapterUpdatePreflight(): void { + this.loadingText = `Checking migrations for adapter ${this.adapter.name}`; + this.loading = true; + this.adapterService + .performPipelineMigrationPreflight(this.adapter) + .subscribe(res => { + if (res.length === 0) { + this.updateAdapter(); + } else { + this.adapterUpdatePreflight = true; + this.adapterPipelineUpdateInfos = res; + this.loading = false; + } + }); } - addAdapter() { - this.adapterService.addAdapter(this.adapter).subscribe(status => { - this.adapterStatus = status; - - if (status.success) { - const adapterElementId = status.notifications[0].title; + updateAdapter(): void { + this.loadingText = `Updating adapter ${this.adapter.name}`; + this.loading = true; + this.adapterService.updateAdapter(this.adapter).subscribe( + res => { + this.adapterStatus = res; + this.onAdapterReady( + `Adapter ${this.adapter.name} was successfully updated and is available in the pipeline editor.`, + ); + }, + error => { + this.onAdapterFailure(error.error.title); + }, + ); + } - if (this.saveInDataLake) { - this.startSaveInDataLakePipeline(adapterElementId, status); + addAdapter() { + this.loadingText = `Creating adapter ${this.adapter.name}`; + this.loading = true; + this.adapterService.addAdapter(this.adapter).subscribe( + status => { + this.adapterStatus = status; + if (status.success) { + const adapterElementId = status.notifications[0].title; + if (this.saveInDataLake) { + this.startSaveInDataLakePipeline(adapterElementId); + } else { + this.startAdapter(adapterElementId, true); + } } else { - this.startAdapter(status, adapterElementId); + const errorMsg = + status.notifications.length > 0 + ? status.notifications[0].title + : 'Unknown Error'; + this.onAdapterFailure(errorMsg); } - } - }); + }, + error => { + this.onAdapterFailure(error.error.title); + }, + ); } - startAdapter(status: Message, adapterElementId: string) { + startAdapter(adapterElementId: string, showPreview = false) { + const successMessage = + 'Your new data stream is now available in the pipeline editor.'; if (this.startAdapterNow) { + this.adapterElementId = adapterElementId; + this.loadingText = `Starting adapter ${this.adapter.name}`; this.adapterService .startAdapterByElementId(adapterElementId) - .subscribe(startStatus => { - this.showAdapterPreview(startStatus, adapterElementId); - }); + .subscribe( + startStatus => { + this.onAdapterReady(successMessage, showPreview); + }, + error => { + this.onAdapterFailure(error.error.title); + }, + ); } else { - this.showAdapterPreview(status, adapterElementId); + this.onAdapterReady(successMessage, false); } } - showAdapterPreview(status: Message, adapterElementId: string) { - // Start preview on streams and message for sets - if (status.success) { - this.getLiveViewPreview(adapterElementId); - this.adapterInstalled = true; + onAdapterFailure(errorMessageText: string) { + this.adapterInstalled = true; + this.adapterStatus = { + success: false, + elementName: this.adapter.name, + notifications: [ + { + title: errorMessageText, + description: '', + additionalInformation: '', + }, + ], + }; + this.loading = false; + } + + onAdapterReady(successMessage: string, showPreview = false): void { + this.adapterInstallationSuccessMessage = successMessage; + this.adapterInstalled = true; + this.loading = false; + if (showPreview) { + this.showPreview = true; } } @@ -137,21 +203,8 @@ export class AdapterStartedDialog implements OnInit { this.shepherdService.trigger('confirm_adapter_started_button'); } - private getLiveViewPreview(adapterElementId: string) { - this.adapterService.getAdapter(adapterElementId).subscribe(adapter => { - this.restService - .getSourceDetails(adapter.correspondingDataStreamElementId) - .subscribe(st => { - this.streamDescription = st; - this.pollingActive = true; - }); - }); - } - - private startSaveInDataLakePipeline( - adapterElementId: string, - message: Message, - ) { + private startSaveInDataLakePipeline(adapterElementId: string) { + this.loadingText = 'Creating pipeline to persist data stream'; this.adapterService.getAdapter(adapterElementId).subscribe(adapter => { const pipelineId = 'org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate'; @@ -184,15 +237,20 @@ export class AdapterStartedDialog implements OnInit { .createPipelineTemplateInvocation( pipelineInvocation, ) - .subscribe(pipelineOperationStatus => { - this.pipelineOperationStatus = - pipelineOperationStatus; - this.startAdapter(message, adapterElementId); - }); + .subscribe( + pipelineOperationStatus => { + this.pipelineOperationStatus = + pipelineOperationStatus; + this.startAdapter(adapterElementId, true); + }, + error => { + this.onAdapterFailure(error.error.title); + }, + ); }, res => { this.templateErrorMessage = res.error; - this.startAdapter(message, adapterElementId); + this.startAdapter(adapterElementId); }, ); }); diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-loading/adapter-started-loading.component.html b/ui/src/app/connect/dialog/adapter-started/adapter-started-loading/adapter-started-loading.component.html new file mode 100644 index 0000000000..34024865d4 --- /dev/null +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-loading/adapter-started-loading.component.html @@ -0,0 +1,34 @@ + + +
+
+ Loading + +
+
+

+ {{ textMessage }} +

+
+
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-loading/adapter-started-loading.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-loading/adapter-started-loading.component.ts new file mode 100644 index 0000000000..48f4f9338e --- /dev/null +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-loading/adapter-started-loading.component.ts @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Component, Input } from '@angular/core'; + +@Component({ + selector: 'sp-adapter-started-loading', + templateUrl: './adapter-started-loading.component.html', +}) +export class SpAdapterStartedLoadingComponent { + @Input() + textMessage = ''; +} diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html new file mode 100644 index 0000000000..26ed16172e --- /dev/null +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html @@ -0,0 +1,24 @@ + + +
+ +
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts new file mode 100644 index 0000000000..2b8fd3b27a --- /dev/null +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Component, Input, OnDestroy, OnInit } from '@angular/core'; +import { AdapterService, SpDataStream } from '@streampipes/platform-services'; +import { RestService } from '../../../services/rest.service'; + +@Component({ + selector: 'sp-adapter-started-preview', + templateUrl: './adapter-started-preview.component.html', +}) +export class SpAdapterStartedPreviewComponent implements OnInit, OnDestroy { + @Input() + streamDescription: SpDataStream; + + @Input() + adapterElementId: string; + + pollingActive = false; + + constructor( + private adapterService: AdapterService, + private restService: RestService, + ) {} + + ngOnInit() { + this.getLiveViewPreview(); + } + + private getLiveViewPreview() { + this.adapterService + .getAdapter(this.adapterElementId) + .subscribe(adapter => { + this.restService + .getSourceDetails(adapter.correspondingDataStreamElementId) + .subscribe(st => { + this.streamDescription = st; + this.pollingActive = true; + }); + }); + } + + ngOnDestroy(): void { + this.pollingActive = false; + } +} diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.html b/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.html new file mode 100644 index 0000000000..85346b2101 --- /dev/null +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.html @@ -0,0 +1,88 @@ + + +
+
+
+
+ done +  {{ adapterInstallationSuccessMessage }} +
+
+
+
+
+ warning +  {{ templateErrorMessage.notifications[0].title }} +
+
+
+ + + +
+
+
+
+ error +

Something went wrong during the adapter installation.

+
+
+ {{ + adapterStatus.notifications.length > 0 + ? adapterStatus.notifications[0].title + : 'Unknown Error' + }} +
+
+
+
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.scss b/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.scss new file mode 100644 index 0000000000..333c430cd6 --- /dev/null +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.scss @@ -0,0 +1,33 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +.info-message { + text-align: center; + font-size: 14pt; + margin-top: 20px; + margin-bottom: 20px; + border: 1px solid var(--color-bg-2); + background: var(--color-bg-1); +} + +.warning-message { + border: 1px solid var(--color-warn); + padding: 10px; + font-size: 12pt; + background: var(--color-bg-2); +} diff --git a/ui/cypress/tests/adapter/randomDataSimulatorStream.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.ts similarity index 54% rename from ui/cypress/tests/adapter/randomDataSimulatorStream.ts rename to ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.ts index e20e1b7b73..69b8ad3375 100644 --- a/ui/cypress/tests/adapter/randomDataSimulatorStream.ts +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.ts @@ -16,23 +16,31 @@ * */ -import { ConnectUtils } from '../../support/utils/connect/ConnectUtils'; -import { AdapterBuilder } from '../../support/builder/AdapterBuilder'; +import { Component, Input } from '@angular/core'; +import { + ErrorMessage, + Message, + PipelineOperationStatus, +} from '@streampipes/platform-services'; -describe('Test Random Data Simulator Stream Adapter', () => { - beforeEach('Setup Test', () => { - cy.initStreamPipesTest(); - }); +@Component({ + selector: 'sp-adapter-started-success', + templateUrl: './adapter-started-success.component.html', + styleUrls: ['./adapter-started-success.component.scss'], +}) +export class SpAdapterStartedSuccessComponent { + @Input() + templateErrorMessage: ErrorMessage; - it('Perform Test', () => { - const adapterInput = AdapterBuilder.create( - 'Random_Data_Simulator_\\(Stream\\)', - ) - .setName('Random Data Simulator Adapter Test') - .addInput('input', 'wait-time-ms', '1000') - .build(); + @Input() + adapterInstallationSuccessMessage = ''; - ConnectUtils.testAdapter(adapterInput); - ConnectUtils.deleteAdapter(); - }); -}); + @Input() + pipelineOperationStatus: PipelineOperationStatus; + + @Input() + saveInDataLake: boolean; + + @Input() + adapterStatus: Message; +} diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-update-migration/adapter-started-update-migration.component.html b/ui/src/app/connect/dialog/adapter-started/adapter-started-update-migration/adapter-started-update-migration.component.html new file mode 100644 index 0000000000..85da7bdb47 --- /dev/null +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-update-migration/adapter-started-update-migration.component.html @@ -0,0 +1,57 @@ + + +
+

+ This adapter is used by + {{ adapterPipelineUpdateInfos.length }} pipelines. +

+
+
+
+ check Pipeline {{ + updateInfo.pipelineName + }} will be automatically migrated. +
+
+ warning +  Pipeline {{ updateInfo.pipelineName }} will be stopped and needs manual review. +
+
+
+
+ warning + Please check and possibly modify existing dashboards and data views + afterwards. +
+
+ +
+
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-update-migration/adapter-started-update-migration.component.scss b/ui/src/app/connect/dialog/adapter-started/adapter-started-update-migration/adapter-started-update-migration.component.scss new file mode 100644 index 0000000000..832b51d64d --- /dev/null +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-update-migration/adapter-started-update-migration.component.scss @@ -0,0 +1,23 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +.migration-infos { + padding: 10px; + margin-top: 10px; + margin-bottom: 10px; +} diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-update-migration/adapter-started-update-migration.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-update-migration/adapter-started-update-migration.component.ts new file mode 100644 index 0000000000..7c02f77b52 --- /dev/null +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-update-migration/adapter-started-update-migration.component.ts @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Component, EventEmitter, Input, Output } from '@angular/core'; +import { PipelineUpdateInfo } from '@streampipes/platform-services'; + +@Component({ + selector: 'sp-adapter-started-update-migration', + templateUrl: './adapter-started-update-migration.component.html', + styleUrls: ['./adapter-started-update-migration.component.scss'], +}) +export class SpAdapterStartedUpdateMigrationComponent { + @Input() + adapterPipelineUpdateInfos: PipelineUpdateInfo[] = []; + + @Output() + startUpdateEmitter: EventEmitter = new EventEmitter(); +} diff --git a/ui/src/app/connect/dialog/edit-event-property/components/edit-unit-transformation/edit-unit-transformation.component.html b/ui/src/app/connect/dialog/edit-event-property/components/edit-unit-transformation/edit-unit-transformation.component.html index ba0b0cb630..cec9b61ff4 100644 --- a/ui/src/app/connect/dialog/edit-event-property/components/edit-unit-transformation/edit-unit-transformation.component.html +++ b/ui/src/app/connect/dialog/edit-event-property/components/edit-unit-transformation/edit-unit-transformation.component.html @@ -27,52 +27,35 @@ >
- + {{ unit.label }} - - -
+
+ +
+
+ sync_problem +
@@ -99,7 +115,11 @@ toggleRunningOperation ) " - [disabled]="!hasPipelineWritePrivileges || starting" + [disabled]=" + !hasPipelineWritePrivileges || + starting || + !pipeline.valid + " *ngIf="!pipeline.running" > play_arrow @@ -127,7 +147,7 @@ Name -

{{ pipeline.name }}

+

{{ pipeline.name }}

{{ pipeline.description !== '' ? pipeline.description : '-' diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss index 3550266599..dd97d20ab8 100644 --- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss +++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss @@ -28,8 +28,8 @@ } .mat-column-status { - width: 100px; - max-width: 100px; + width: 120px; + max-width: 120px; } .light { @@ -62,3 +62,11 @@ .ml-10 { margin-left: 10px; } + +.pipeline-notification { + color: #dede00; +} + +.notification-icon { + width: 50px; +}