From 943e406be0e9122eb0f93f19edb616bfb7577948 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Fri, 29 Sep 2023 12:52:59 +0200 Subject: [PATCH] feat(#1960): Allow modification of adapters with pipelines --- .../management/AdapterMasterManagement.java | 19 -- .../management/AdapterUpdateManagement.java | 208 ++++++++++++++++++ .../dataexplorer/commons/TimeSeriesStore.java | 2 +- .../DataExplorerSchemaManagement.java | 4 + .../connect/adapter/PipelineUpdateInfo.java | 80 +++++++ .../streampipes/model/pipeline/Pipeline.java | 11 + .../PipelineModificationGenerator.java | 21 +- .../PipelineVerificationHandlerV2.java | 59 +++-- .../mapping/MappingPropertyCalculator.java | 12 +- .../v2/pipeline/ApplyGroundingStep.java | 15 +- .../v2/pipeline/CheckCompletedVisitor.java | 24 +- .../matching/v2/pipeline/PrepareStep.java | 2 +- .../manager/pipeline/PipelineManager.java | 8 +- .../recommender/ElementRecommender.java | 3 +- .../rest/impl/PipelineResource.java | 1 + .../rest/impl/connect/AdapterResource.java | 22 +- .../src/lib/apis/adapter.service.ts | 23 +- .../src/lib/model/gen/streampipes-model.ts | 51 +++-- .../adapter-configuration.component.ts | 2 +- .../adapter-options-panel.component.html | 15 +- .../adapter-options-panel.component.scss | 3 +- ...start-adapter-configuration.component.html | 4 +- .../start-adapter-configuration.component.ts | 2 +- .../existing-adapters.component.html | 1 - .../existing-adapters.component.ts | 34 ++- .../adapter-started-dialog.component.html | 66 +++++- .../adapter-started-dialog.component.scss | 8 + .../adapter-started-dialog.component.ts | 39 +++- .../static-mapping-unary.component.ts | 1 - .../pipeline-assembly.component.ts | 4 + .../components/pipeline/pipeline.component.ts | 23 +- .../services/pipeline-validation.service.ts | 29 ++- .../pipeline-overview.component.html | 58 +++-- .../pipeline-overview.component.scss | 12 +- ui/src/app/pipelines/pipelines.component.ts | 1 - 35 files changed, 696 insertions(+), 171 deletions(-) create mode 100644 streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/PipelineUpdateInfo.java diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java index 3debbb8d58..c8d16e4d33 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java @@ -91,15 +91,7 @@ public String addAdapter(AdapterDescription ad, return ad.getElementId(); } - public void updateAdapter(AdapterDescription ad, - String principalSid) - throws AdapterException { - // update adapter in database - this.adapterResourceManager.encryptAndUpdate(ad); - // update data source in database - this.updateDataSource(ad); - } public AdapterDescription getAdapter(String elementId) throws AdapterException { List allAdapters = adapterInstanceStorage.getAllAdapters(); @@ -180,16 +172,6 @@ public void startStreamAdapter(String elementId) throws AdapterException { } } - private void updateDataSource(AdapterDescription ad) { - // get data source - SpDataStream dataStream = this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId()); - - SourcesManagement.updateDataStream(ad, dataStream); - - // Update data source in database - this.dataStreamResourceManager.update(dataStream); - } - private void installDataSource(SpDataStream stream, String principalSid, boolean publicElement) throws AdapterException { @@ -204,5 +186,4 @@ private void installDataSource(SpDataStream stream, private IAdapterStorage getAdapterInstanceStorage() { return new AdapterInstanceStorageImpl(); } - } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java new file mode 100644 index 0000000000..0b2b38a6ef --- /dev/null +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.connect.management.management; + +import org.apache.streampipes.commons.exceptions.connect.AdapterException; +import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2; +import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.pipeline.PipelineManager; +import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.connect.adapter.PipelineUpdateInfo; +import org.apache.streampipes.model.message.PipelineModificationMessage; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo; +import org.apache.streampipes.model.pipeline.PipelineHealthStatus; +import org.apache.streampipes.resource.management.AdapterResourceManager; +import org.apache.streampipes.resource.management.DataStreamResourceManager; +import org.apache.streampipes.resource.management.SpResourceManager; +import org.apache.streampipes.storage.management.StorageDispatcher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +public class AdapterUpdateManagement { + + private static final Logger LOG = LoggerFactory.getLogger(AdapterUpdateManagement.class); + + private final AdapterMasterManagement adapterMasterManagement; + private final AdapterResourceManager adapterResourceManager; + private final DataStreamResourceManager dataStreamResourceManager; + + public AdapterUpdateManagement(AdapterMasterManagement adapterMasterManagement) { + this.adapterMasterManagement = adapterMasterManagement; + this.adapterResourceManager = new SpResourceManager().manageAdapters(); + this.dataStreamResourceManager = new SpResourceManager().manageDataStreams(); + } + + public void updateAdapter(AdapterDescription ad) + throws AdapterException { + // update adapter in database + this.adapterResourceManager.encryptAndUpdate(ad); + boolean shouldRestart = ad.isRunning(); + + if (ad.isRunning()) { + this.adapterMasterManagement.stopStreamAdapter(ad.getElementId()); + } + + // update data source in database + this.updateDataSource(ad); + + // update pipelines + var affectedPipelines = PipelineManager.getPipelinesContainingElements(ad.getCorrespondingDataStreamElementId()); + + affectedPipelines.forEach(p -> { + var shouldRestartPipeline = p.isRunning(); + if (shouldRestartPipeline) { + Operations.stopPipeline(p, true); + } + var storedPipeline = PipelineManager.getPipeline(p.getPipelineId()); + var pipeline = applyUpdatedDataStream(storedPipeline, ad); + try { + var modificationMessage = Operations.validatePipeline(pipeline); + var updateInfo = makeUpdateInfo(modificationMessage, pipeline); + var modifiedPipeline = new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline(); + var canAutoMigrate = canAutoMigrate(modificationMessage); + if (!canAutoMigrate) { + modifiedPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION); + modifiedPipeline.setPipelineNotifications(toNotification(updateInfo)); + modifiedPipeline.setValid(false); + } + StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(modifiedPipeline); + if (shouldRestartPipeline && canAutoMigrate) { + Operations.startPipeline(PipelineManager.getPipeline(p.getPipelineId())); + } + } catch (Exception e) { + LOG.error("Could not update pipeline {}", pipeline.getName(), e); + } + }); + + if (shouldRestart) { + this.adapterMasterManagement.startStreamAdapter(ad.getElementId()); + } + } + + public List checkPipelineMigrations(AdapterDescription adapterDescription) { + var affectedPipelines = PipelineManager + .getPipelinesContainingElements(adapterDescription.getCorrespondingDataStreamElementId()); + var updateInfos = new ArrayList(); + + affectedPipelines.forEach(pipeline -> { + var updatedPipeline = applyUpdatedDataStream(pipeline, adapterDescription); + try { + var modificationMessage = Operations.validatePipeline(updatedPipeline); + var updateInfo = makeUpdateInfo(modificationMessage, updatedPipeline); + updateInfos.add(updateInfo); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + return updateInfos; + } + + private PipelineUpdateInfo makeUpdateInfo(PipelineModificationMessage modificationMessage, + Pipeline pipeline) { + var updateInfo = new PipelineUpdateInfo(); + updateInfo.setPipelineId(pipeline.getPipelineId()); + updateInfo.setPipelineName(pipeline.getName()); + updateInfo.setCanAutoMigrate(canAutoMigrate(modificationMessage)); + updateInfo.setValidationInfos(extractModificationWarnings(pipeline, modificationMessage)); + return updateInfo; + } + + private boolean canAutoMigrate(PipelineModificationMessage modificationMessage) { + return modificationMessage + .getPipelineModifications() + .stream() + .allMatch(m -> m.isPipelineElementValid() && m.getValidationInfos().isEmpty()); + } + + private List toNotification(PipelineUpdateInfo updateInfo) { + var notifications = new ArrayList(); + updateInfo.getValidationInfos().keySet().forEach((k) -> { + var msg = updateInfo + .getValidationInfos() + .get(k) + .stream() + .map(PipelineElementValidationInfo::getMessage) + .toList() + .toString(); + notifications.add(String.format("Adapter modification: %s: %s", k, msg)); + }); + return notifications; + } + + private Map> extractModificationWarnings( + Pipeline pipeline, + PipelineModificationMessage modificationMessage) { + var infos = new HashMap>(); + modificationMessage + .getPipelineModifications() + .stream() + .filter(v -> !v.getValidationInfos().isEmpty()) + .forEach(m -> infos.put(getPipelineElementName(pipeline, m.getElementId()), m.getValidationInfos())); + + return infos; + } + + private String getPipelineElementName(Pipeline pipeline, + String elementId) { + return Stream + .concat(pipeline.getSepas().stream(), pipeline.getActions().stream()) + .filter(p -> p.getElementId().equals(elementId)) + .findFirst() + .map(NamedStreamPipesEntity::getName) + .orElse(elementId); + } + + private Pipeline applyUpdatedDataStream(Pipeline originalPipeline, + AdapterDescription updatedAdapter) { + var updatedStreams = originalPipeline + .getStreams() + .stream() + .peek(s -> { + if (s.getElementId().equals(updatedAdapter.getCorrespondingDataStreamElementId())) { + s.setEventSchema(updatedAdapter.getEventSchema()); + } + }) + .toList(); + + originalPipeline.setStreams(updatedStreams); + + return originalPipeline; + } + + private void updateDataSource(AdapterDescription ad) { + // get data source + SpDataStream dataStream = this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId()); + + SourcesManagement.updateDataStream(ad, dataStream); + + // Update data source in database + this.dataStreamResourceManager.update(dataStream); + } +} diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java index 99ab512818..5b063fff7c 100644 --- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java @@ -43,7 +43,7 @@ public TimeSeriesStore(Environment environment, DataLakeMeasure measure, boolean enableImageStore) { - measure = DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure); + DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure); if (enableImageStore) { // TODO check if event properties are replaces correctly diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java index f28cf6cbdb..8c4dc4a344 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java @@ -60,6 +60,10 @@ public DataLakeMeasure createMeasurement(DataLakeMeasure measure) { DataLakeMeasure oldEntry = optional.get(); if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(), measure.getEventSchema().getEventProperties())) { + oldEntry.setEventSchema(measure.getEventSchema()); + oldEntry.setTimestampField(measure.getTimestampField()); + oldEntry.setPipelineName(measure.getPipelineName()); + getDataLakeStorage().updateDataLakeMeasure(oldEntry); return oldEntry; } } else { diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/PipelineUpdateInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/PipelineUpdateInfo.java new file mode 100644 index 0000000000..0d9f79f47b --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/PipelineUpdateInfo.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.connect.adapter; + +import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo; +import org.apache.streampipes.model.shared.annotation.TsModel; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@TsModel +public class PipelineUpdateInfo { + + private String pipelineId; + private String pipelineName; + private boolean canAutoMigrate; + private String migrationInfo; + private Map> validationInfos; + + public PipelineUpdateInfo() { + this.validationInfos = new HashMap<>(); + } + + public String getPipelineId() { + return pipelineId; + } + + public void setPipelineId(String pipelineId) { + this.pipelineId = pipelineId; + } + + public String getPipelineName() { + return pipelineName; + } + + public void setPipelineName(String pipelineName) { + this.pipelineName = pipelineName; + } + + public boolean isCanAutoMigrate() { + return canAutoMigrate; + } + + public void setCanAutoMigrate(boolean canAutoMigrate) { + this.canAutoMigrate = canAutoMigrate; + } + + public String getMigrationInfo() { + return migrationInfo; + } + + public void setMigrationInfo(String migrationInfo) { + this.migrationInfo = migrationInfo; + } + + public Map> getValidationInfos() { + return validationInfos; + } + + public void setValidationInfos(Map> validationInfos) { + this.validationInfos = validationInfos; + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java index a8f4be5f5b..7055ca29aa 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/Pipeline.java @@ -34,6 +34,7 @@ public class Pipeline extends ElementComposition { private boolean running; private boolean restartOnSystemReboot; + private boolean valid = true; private long startedAt; private long createdAt; @@ -159,6 +160,14 @@ public void setHealthStatus(PipelineHealthStatus healthStatus) { this.healthStatus = healthStatus; } + public boolean isValid() { + return valid; + } + + public void setValid(boolean valid) { + this.valid = valid; + } + public Pipeline clone() { Pipeline pipeline = new Pipeline(); pipeline.setName(name); @@ -173,6 +182,8 @@ public Pipeline clone() { pipeline.setHealthStatus(healthStatus); pipeline.setPipelineNotifications(pipelineNotifications); pipeline.setRev(rev); + pipeline.setValid(valid); + return pipeline; } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java index 3969c159b8..cbfa7ecfaa 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java @@ -27,6 +27,7 @@ import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.client.matching.MatchingResultMessage; import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.message.EdgeValidationStatusType; import org.apache.streampipes.model.message.Notification; import org.apache.streampipes.model.message.PipelineEdgeValidation; import org.apache.streampipes.model.message.PipelineModificationMessage; @@ -34,6 +35,7 @@ import org.apache.streampipes.model.pipeline.PipelineModification; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -70,6 +72,11 @@ public PipelineModificationMessage buildPipelineModificationMessage() { List edgesWithoutConnectedStream = collectEdgesWithoutStream(modifications); edgeValidations.addAll(edgesWithoutConnectedStream); message.setEdgeValidations(edgeValidations); + message.setPipelineValid( + edgeValidations + .stream() + .allMatch(e -> e.getStatus().getValidationStatusType() == EdgeValidationStatusType.COMPLETE) + && message.getPipelineModifications().stream().allMatch(PipelineModification::isPipelineElementValid)); return message; } @@ -90,13 +97,13 @@ private void addModification(NamedStreamPipesEntity source, modification.setElementId(t.getElementId()); try { pipelineValidator.apply(source, t, targets, validationInfos); - buildModification(modification, t); + buildModification(modification, t, t.getInputStreams(), true); edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.complete(source.getDom(), t.getDom())); } catch (SpValidationException e) { - //e.getErrorLog().forEach(log -> validationInfos.add(PipelineElementValidationInfo.error(log.toString()))); + e.getErrorLog().forEach(log -> validationInfos.add(PipelineElementValidationInfo.error(log.toString()))); edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.invalid(source.getDom(), t.getDom(), toNotifications(e.getErrorLog()))); - modification.setPipelineElementValid(false); + buildModification(modification, t, Collections.emptyList(), false); } modification.setValidationInfos(validationInfos); this.pipelineModifications.put(t.getDom(), modification); @@ -115,14 +122,16 @@ private List toList(Map map) { } private void buildModification(PipelineModification modification, - InvocableStreamPipesEntity t) { + InvocableStreamPipesEntity t, + List inputStreams, + boolean valid) { if (t instanceof DataProcessorInvocation) { modification.setOutputStrategies(((DataProcessorInvocation) t).getOutputStrategies()); modification.setOutputStream(((DataProcessorInvocation) t).getOutputStream()); } - modification.setInputStreams(t.getInputStreams()); + modification.setInputStreams(inputStreams); modification.setStaticProperties(t.getStaticProperties()); - modification.setPipelineElementValid(true); + modification.setPipelineElementValid(valid); } private Set getConnections(NamedStreamPipesEntity source) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java index 0775550c7c..ac3bed9550 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java @@ -23,6 +23,7 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.graph.DataSinkInvocation; import org.apache.streampipes.model.grounding.EventGrounding; import org.apache.streampipes.model.message.PipelineModificationMessage; import org.apache.streampipes.model.pipeline.Pipeline; @@ -46,27 +47,34 @@ public PipelineModificationMessage verifyPipeline() { return new PipelineModificationGenerator(graph).buildPipelineModificationMessage(); } + public Pipeline makeModifiedPipeline() { + var allElements = verifyAndBuildGraphs(false); + pipeline.setSepas(filterAndConvert(allElements, DataProcessorInvocation.class)); + pipeline.setActions(filterAndConvert(allElements, DataSinkInvocation.class)); + return pipeline; + } + + private List filterAndConvert(List elements, + Class clazz) { + return elements + .stream() + .filter(clazz::isInstance) + .map(clazz::cast) + .toList(); + } + public List verifyAndBuildGraphs(boolean ignoreUnconfigured) { - List pipelineModifications = verifyPipeline().getPipelineModifications(); - List allElements = new AllElementsProvider(pipeline).getAllElements(); - List result = new ArrayList<>(); + var pipelineModifications = verifyPipeline().getPipelineModifications(); + var allElements = new AllElementsProvider(pipeline).getAllElements(); + var result = new ArrayList(); allElements.forEach(pipelineElement -> { - Optional modificationOpt = getModification(pipelineElement.getDom(), pipelineModifications); + var modificationOpt = getModification(pipelineElement.getDom(), pipelineModifications); if (modificationOpt.isPresent()) { PipelineModification modification = modificationOpt.get(); if (pipelineElement instanceof InvocableStreamPipesEntity) { - ((InvocableStreamPipesEntity) pipelineElement).setInputStreams(modification.getInputStreams()); - ((InvocableStreamPipesEntity) pipelineElement).setStaticProperties(modification.getStaticProperties()); + applyModificationsForInvocable((InvocableStreamPipesEntity) pipelineElement, modification); if (pipelineElement instanceof DataProcessorInvocation) { - ((DataProcessorInvocation) pipelineElement).setOutputStream(modification.getOutputStream()); - if (((DataProcessorInvocation) pipelineElement).getOutputStream().getEventGrounding() == null) { - EventGrounding grounding = - new GroundingBuilder(pipelineElement, Collections.emptySet()).getEventGrounding(); - ((DataProcessorInvocation) pipelineElement).getOutputStream().setEventGrounding(grounding); - } - if (modification.getOutputStrategies() != null) { - ((DataProcessorInvocation) pipelineElement).setOutputStrategies(modification.getOutputStrategies()); - } + applyModificationsForDataProcessor((DataProcessorInvocation) pipelineElement, modification); } } if (!ignoreUnconfigured || modification.isPipelineElementValid()) { @@ -80,6 +88,27 @@ public List verifyAndBuildGraphs(boolean ignoreUnconfigu return result; } + private void applyModificationsForDataProcessor(DataProcessorInvocation pipelineElement, + PipelineModification modification) { + if (modification.getOutputStream() != null) { + pipelineElement.setOutputStream(modification.getOutputStream()); + if (pipelineElement.getOutputStream().getEventGrounding() == null) { + EventGrounding grounding = + new GroundingBuilder(pipelineElement, Collections.emptySet()).getEventGrounding(); + pipelineElement.getOutputStream().setEventGrounding(grounding); + } + } + if (modification.getOutputStrategies() != null) { + pipelineElement.setOutputStrategies(modification.getOutputStrategies()); + } + } + + private void applyModificationsForInvocable(InvocableStreamPipesEntity pipelineElement, + PipelineModification modification) { + pipelineElement.setInputStreams(modification.getInputStreams()); + pipelineElement.setStaticProperties(modification.getStaticProperties()); + } + private Optional getModification(String id, List modifications) { return modifications.stream().filter(m -> m.getDomId().equals(id)).findFirst(); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java index 079ccf50c6..ea552f02f5 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/mapping/MappingPropertyCalculator.java @@ -28,14 +28,12 @@ public class MappingPropertyCalculator { - private EventSchema schema; - private List availablePropertySelectors; - private EventProperty requirement; + private final EventSchema schema; + private final List availablePropertySelectors; + private final EventProperty requirement; - public MappingPropertyCalculator() { - } - - public MappingPropertyCalculator(EventSchema schema, List availablePropertySelectors, + public MappingPropertyCalculator(EventSchema schema, + List availablePropertySelectors, EventProperty requirement) { this.schema = schema; this.availablePropertySelectors = availablePropertySelectors; diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java index 8cf5c9e459..e691f285c0 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/ApplyGroundingStep.java @@ -67,13 +67,16 @@ public void apply(NamedStreamPipesEntity source, .setEventGrounding(selectedGrounding); } - target - .getInputStreams() - .get(getIndex(target)) - .setEventGrounding(selectedGrounding); + if (!target.getInputStreams().isEmpty()) { - if (target.getInputStreams().size() > 1) { - this.visitorHistory.put(target.getDom(), 1); + target + .getInputStreams() + .get(getIndex(target)) + .setEventGrounding(selectedGrounding); + + if (target.getInputStreams().size() > 1) { + this.visitorHistory.put(target.getDom(), 1); + } } } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java index caa87744dc..5274fff602 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java @@ -87,7 +87,13 @@ public void visit(MappingPropertyNary mappingPropertyNary) { .stream() .filter(p -> mappingPropertyNary.getMapsFromOptions().contains(p)) .collect(Collectors.toList())); - validationInfos.add(PipelineElementValidationInfo.info("Auto-updated invalid field selection")); + var info = PipelineElementValidationInfo.info( + String.format( + "Auto-updated invalid field selection: Fields updated to %s", + mappingPropertyNary.getSelectedProperties().toString() + ) + ); + validationInfos.add(info); } } @@ -95,10 +101,18 @@ public void visit(MappingPropertyNary mappingPropertyNary) { public void visit(MappingPropertyUnary mappingPropertyUnary) { if (existsSelection(mappingPropertyUnary)) { if (!(mappingPropertyUnary.getMapsFromOptions().contains(mappingPropertyUnary.getSelectedProperty()))) { - if (mappingPropertyUnary.getMapsFromOptions().size() > 0) { + if (!mappingPropertyUnary.getMapsFromOptions().isEmpty()) { + String existingSelector = mappingPropertyUnary.getSelectedProperty(); String firstSelector = mappingPropertyUnary.getMapsFromOptions().get(0); mappingPropertyUnary.setSelectedProperty(firstSelector); - validationInfos.add(PipelineElementValidationInfo.info("Auto-updated invalid field selection")); + var info = PipelineElementValidationInfo.info( + String.format( + "Auto-updated invalid field selection: Selected field %s was changed to %s", + existingSelector, + firstSelector + ) + ); + validationInfos.add(info); } } } else { @@ -142,10 +156,10 @@ public List getValidationInfos() { } private boolean existsSelection(MappingPropertyUnary mappingProperty) { - return !(mappingProperty.getSelectedProperty() == null || mappingProperty.getSelectedProperty().equals("")); + return !(mappingProperty.getSelectedProperty() == null || mappingProperty.getSelectedProperty().isEmpty()); } private boolean existsSelection(MappingPropertyNary mappingProperty) { - return !(mappingProperty.getSelectedProperties() == null || mappingProperty.getSelectedProperties().size() == 0); + return !(mappingProperty.getSelectedProperties() == null || mappingProperty.getSelectedProperties().isEmpty()); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PrepareStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PrepareStep.java index 6b7a0bfeff..2cb0d2557c 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PrepareStep.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PrepareStep.java @@ -36,7 +36,7 @@ public void apply(NamedStreamPipesEntity source, Set allTargets, List validationInfos) throws SpValidationException { if (target instanceof DataProcessorInvocation) { - if (target.getInputStreams() == null) { + if (target.getInputStreams() == null || target.getInputStreams().isEmpty()) { target.setInputStreams(new ArrayList<>()); for (int i = 0; i < target.getStreamRequirements().size(); i++) { target.getInputStreams().add(new SpDataStream()); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java index c8cc072b12..e849aaa7ea 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java @@ -136,8 +136,12 @@ public static List getPipelinesContainingElements(String elementId) { private static Stream mergePipelineElement(Pipeline pipeline) { return Stream.concat( - Stream.concat(pipeline.getStreams().stream(), pipeline.getSepas().stream()), - pipeline.getActions().stream()); + Stream.concat( + pipeline.getStreams().stream(), + pipeline.getSepas().stream() + ), + pipeline.getActions().stream() + ); } private static void preparePipelineBasics(String username, diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java index d6268d661b..fbfafe4bc9 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/recommender/ElementRecommender.java @@ -65,11 +65,10 @@ public PipelineElementRecommendationMessage findRecommendedElements() throws NoS Optional outputStream = getOutputStream(elementsProvider); outputStream.ifPresent(spDataStream -> validate(spDataStream, getAll())); } catch (Exception e) { - e.printStackTrace(); return recommendationMessage; } - if (recommendationMessage.getPossibleElements().size() == 0) { + if (recommendationMessage.getPossibleElements().isEmpty()) { throw new NoSuitableSepasAvailableException(); } else { recommendationMessage diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java index b42f9e02d0..25a17b5732 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java @@ -237,6 +237,7 @@ public Response overwritePipeline(@PathParam("pipelineId") String pipelineId, Pi storedPipeline.setPipelineCategories(pipeline.getPipelineCategories()); storedPipeline.setHealthStatus(pipeline.getHealthStatus()); storedPipeline.setPipelineNotifications(pipeline.getPipelineNotifications()); + storedPipeline.setValid(pipeline.isValid()); Operations.updatePipeline(storedPipeline); SuccessMessage message = Notifications.success("Pipeline modified"); message.addNotification(new Notification("id", pipelineId)); diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java index 7de6ed08c6..2f71f1444d 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java @@ -20,6 +20,7 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; +import org.apache.streampipes.connect.management.management.AdapterUpdateManagement; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.message.Notifications; import org.apache.streampipes.model.monitoring.SpLogMessage; @@ -79,12 +80,9 @@ public Response addAdapter(AdapterDescription adapterDescription) { @Produces(MediaType.APPLICATION_JSON) @PreAuthorize(AuthConstants.HAS_WRITE_ADAPTER_PRIVILEGE) public Response updateAdapter(AdapterDescription adapterDescription) { - var principalSid = getAuthenticatedUserSid(); - var username = getAuthenticatedUsername(); - LOG.info("User: " + username + " updates adapter " + adapterDescription.getElementId()); - + var updateManager = new AdapterUpdateManagement(managementService); try { - managementService.updateAdapter(adapterDescription, principalSid); + updateManager.updateAdapter(adapterDescription); } catch (AdapterException e) { LOG.error("Error while updating adapter with id " + adapterDescription.getElementId(), e); return ok(Notifications.error(e.getMessage())); @@ -93,6 +91,18 @@ public Response updateAdapter(AdapterDescription adapterDescription) { return ok(Notifications.success(adapterDescription.getElementId())); } + @PUT + @JacksonSerialized + @Produces(MediaType.APPLICATION_JSON) + @PreAuthorize(AuthConstants.HAS_WRITE_ADAPTER_PRIVILEGE) + @Path("pipeline-migration-preflight") + public Response performPipelineMigrationPreflight(AdapterDescription adapterDescription) { + var updateManager = new AdapterUpdateManagement(managementService); + var migrations = updateManager.checkPipelineMigrations(adapterDescription); + + return ok(migrations); + } + @GET @JacksonSerialized @Path("/{id}") @@ -148,7 +158,7 @@ public Response startAdapter(@PathParam("id") String adapterId) { public Response deleteAdapter(@PathParam("id") String elementId) { List pipelinesUsingAdapter = getPipelinesUsingAdapter(elementId); - if (pipelinesUsingAdapter.size() == 0) { + if (pipelinesUsingAdapter.isEmpty()) { try { managementService.deleteAdapter(elementId); return ok(Notifications.success("Adapter with id: " + elementId + " is deleted.")); diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts index af3c34c37c..a8a97c3778 100644 --- a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts +++ b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts @@ -22,7 +22,11 @@ import { map } from 'rxjs/operators'; import { Observable } from 'rxjs'; import { PlatformServicesCommons } from './commons.service'; -import { AdapterDescription, Message } from '../model/gen/streampipes-model'; +import { + AdapterDescription, + Message, + PipelineUpdateInfo, +} from '../model/gen/streampipes-model'; @Injectable({ providedIn: 'root' }) export class AdapterService { @@ -95,6 +99,23 @@ export class AdapterService { .pipe(map(response => Message.fromData(response as any))); } + performPipelineMigrationPreflight( + adapter: AdapterDescription, + ): Observable { + return this.http + .put( + `${this.connectPath}/master/adapters/pipeline-migration-preflight`, + adapter, + ) + .pipe( + map(response => { + return (response as any[]).map(p => + PipelineUpdateInfo.fromData(p), + ); + }), + ); + } + get adapterMasterUrl() { return `${this.connectPath}/master/adapters/`; } diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 36a810b513..2fa595cacf 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -1,26 +1,7 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.1.1185 on 2023-08-06 11:37:37. +// Generated using typescript-generator version 3.2.1263 on 2023-09-28 22:59:29. export class NamedStreamPipesEntity { '@class': @@ -2446,6 +2427,7 @@ export class Pipeline extends ElementComposition { restartOnSystemReboot: boolean; running: boolean; startedAt: number; + valid: boolean; static fromData(data: Pipeline, target?: Pipeline): Pipeline { if (!data) { @@ -2471,6 +2453,7 @@ export class Pipeline extends ElementComposition { instance.restartOnSystemReboot = data.restartOnSystemReboot; instance.running = data.running; instance.startedAt = data.startedAt; + instance.valid = data.valid; return instance; } } @@ -2959,6 +2942,32 @@ export class PipelineTemplateInvocation { } } +export class PipelineUpdateInfo { + canAutoMigrate: boolean; + migrationInfo: string; + pipelineId: string; + pipelineName: string; + validationInfos: { [index: string]: PipelineElementValidationInfo[] }; + + static fromData( + data: PipelineUpdateInfo, + target?: PipelineUpdateInfo, + ): PipelineUpdateInfo { + if (!data) { + return data; + } + const instance = target || new PipelineUpdateInfo(); + instance.canAutoMigrate = data.canAutoMigrate; + instance.migrationInfo = data.migrationInfo; + instance.pipelineId = data.pipelineId; + instance.pipelineName = data.pipelineName; + instance.validationInfos = __getCopyObjectFn( + __getCopyArrayFn(PipelineElementValidationInfo.fromData), + )(data.validationInfos); + return instance; + } +} + export class ProducedMessagesInfo extends MessagesInfo { totalProducedMessages: number; totalProducedMessagesSincePipelineStart: number; @@ -3521,6 +3530,7 @@ export class SpServiceRegistration { scheme: string; svcGroup: string; svcId: string; + svcType: string; tags: SpServiceTag[]; static fromData( @@ -3540,6 +3550,7 @@ export class SpServiceRegistration { instance.scheme = data.scheme; instance.svcGroup = data.svcGroup; instance.svcId = data.svcId; + instance.svcType = data.svcType; instance.tags = __getCopyArrayFn(SpServiceTag.fromData)(data.tags); return instance; } diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-configuration.component.ts b/ui/src/app/connect/components/adapter-configuration/adapter-configuration.component.ts index e2e0c7e331..e0d0ee64b2 100644 --- a/ui/src/app/connect/components/adapter-configuration/adapter-configuration.component.ts +++ b/ui/src/app/connect/components/adapter-configuration/adapter-configuration.component.ts @@ -65,7 +65,7 @@ export class AdapterConfigurationComponent implements OnInit { } removeSelection() { - this.router.navigate(['connect', 'create']).then(); + this.router.navigate(['connect']).then(); } clickSpecificSettingsNextButton() { diff --git a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html index c219c181fd..2bcfd840d1 100644 --- a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html +++ b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html @@ -17,13 +17,18 @@ -->
-
-
+
+
{{ optionIcon }}
-
-
-
+
+
+
@@ -119,6 +120,7 @@ optionDescription="Store all events of this source in the internal data store" optionIcon="save" dataCy="sp-store-in-datalake" + *ngIf="!isEditMode" (optionSelectedEmitter)="handlePersistOption($event)" > @@ -178,7 +180,7 @@ mat-button style="margin-left: 10px" > - Edit Adapter + Update Adapter
diff --git a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts index 6edb79139f..f7c6d453c2 100644 --- a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts +++ b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts @@ -141,7 +141,7 @@ export class StartAdapterConfigurationComponent implements OnInit { }); dialogRef.afterClosed().subscribe(() => { - this.adapterStartedEmitter.emit(); + //this.adapterStartedEmitter.emit(); }); } diff --git a/ui/src/app/connect/components/existing-adapters/existing-adapters.component.html b/ui/src/app/connect/components/existing-adapters/existing-adapters.component.html index e36efb2488..224ded7879 100644 --- a/ui/src/app/connect/components/existing-adapters/existing-adapters.component.html +++ b/ui/src/app/connect/components/existing-adapters/existing-adapters.component.html @@ -297,7 +297,6 @@
matTooltip="Edit adapter" data-cy="edit-adapter" matTooltipPosition="above" - [disabled]="adapter.running" (click)="editAdapter(adapter)" > { - if (effectedPipelines.length > 0) { - this.dialogService.open(CanNotEditAdapterDialog, { - panelType: PanelType.STANDARD_PANEL, - title: 'No edit possible', - width: '50vw', - data: { - pipelines: effectedPipelines, - }, - }); - } else { - this.router.navigate([ - 'connect', - 'edit', - adapter.elementId, - ]); - } + // if (effectedPipelines.length > 0) { + // this.dialogService.open(CanNotEditAdapterDialog, { + // panelType: PanelType.STANDARD_PANEL, + // title: 'No edit possible', + // width: '50vw', + // data: { + // pipelines: effectedPipelines, + // }, + // }); + // } else { + this.router.navigate(['connect', 'edit', adapter.elementId]); + //} }); } diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html index a76b9dddb7..8506681275 100644 --- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html @@ -19,7 +19,7 @@
-
+

- Please wait while your new adapter is being generated... + Please wait while your new adapter is being + {{ editMode ? 'updated' : 'generated' }}...

+
+

+ This adapter is used by + {{ adapterPipelineUpdateInfos.length }} pipelines. +

+
+
+
+ check Pipeline {{ + updateInfo.pipelineName + }} will be automatically migrated. +
+
+ warning +  Pipeline {{ + updateInfo.pipelineName + }} will be stopped and needs manual review. +
+
+
+
+ warning + Please check and possibly modify existing dashboards and + data views afterwards. +
+
+ +
+
-

Adapter successfully updated

+ check  +

Adapter successfully updated

Adapter successfully updated
>
-
+
, @@ -82,21 +87,43 @@ export class AdapterStartedDialog implements OnInit { ngOnInit() { if (this.editMode) { - this.editAdapter(); + this.initAdapterUpdatePreflight(); } else { this.addAdapter(); } } - editAdapter() { - this.adapterService.updateAdapter(this.adapter).subscribe(status => { - this.adapterStatus = status; + initAdapterUpdatePreflight(): void { + this.loading = true; + this.adapterService + .performPipelineMigrationPreflight(this.adapter) + .subscribe(res => { + if (res.length === 0) { + this.updateAdapter(); + } else { + this.adapterUpdatePreflight = true; + this.adapterPipelineUpdateInfos = res; + this.loading = false; + } + }); + } + + updateAdapter(): void { + this.loading = true; + this.adapterService.updateAdapter(this.adapter).subscribe(res => { + this.loading = false; + this.adapterStatus = res; this.adapterInstalled = true; + if (this.adapter.running) { + this.showAdapterPreview(res, this.adapter.elementId); + } }); } addAdapter() { + this.loading = true; this.adapterService.addAdapter(this.adapter).subscribe(status => { + this.loading = false; this.adapterStatus = status; if (status.success) { @@ -124,7 +151,6 @@ export class AdapterStartedDialog implements OnInit { } showAdapterPreview(status: Message, adapterElementId: string) { - // Start preview on streams and message for sets if (status.success) { this.getLiveViewPreview(adapterElementId); this.adapterInstalled = true; @@ -142,6 +168,7 @@ export class AdapterStartedDialog implements OnInit { this.restService .getSourceDetails(adapter.correspondingDataStreamElementId) .subscribe(st => { + this.showPreview = true; this.streamDescription = st; this.pollingActive = true; }); diff --git a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts index 89a9632128..8e85680eda 100644 --- a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts +++ b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts @@ -53,7 +53,6 @@ export class StaticMappingUnaryComponent onStatusChange(status: any) {} onValueChange(value: any) { - console.log(value); this.staticProperty.selectedProperty = value; this.emitUpdate(true); } diff --git a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts index 7507ffd11d..0cb54b1b7c 100644 --- a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts +++ b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly.component.ts @@ -251,6 +251,10 @@ export class PipelineAssemblyComponent implements OnInit, AfterViewInit { this.pipelineCanvasMetadata, pipelineModel, ); + pipeline.valid = this.pipelineValidationService.isValidPipeline( + pipelineModel, + this.preview, + ); pipeline.name = this.currentPipelineName; pipeline.description = this.currentPipelineDescription; if (this.currentModifiedPipelineId) { diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts index ccbfa82583..b096493b68 100644 --- a/ui/src/app/editor/components/pipeline/pipeline.component.ts +++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts @@ -149,7 +149,7 @@ export class PipelineComponent implements OnInit, OnDestroy { this.initPlumb(); } - validatePipeline() { + validatePipeline(pm?: PipelineModificationMessage) { setTimeout(() => { this.ngZone.run(() => { this.pipelineValid = @@ -158,6 +158,7 @@ export class PipelineComponent implements OnInit, OnDestroy { pe => !pe.settings.disabled, ), this.preview, + pm, ); }); }); @@ -301,7 +302,6 @@ export class PipelineComponent implements OnInit, OnDestroy { } checkTopicModel(pipelineElementConfig: PipelineElementConfig) { - console.log(pipelineElementConfig); setTimeout(() => { this.jsplumbService.dataStreamDropped( pipelineElementConfig.payload.dom, @@ -405,7 +405,9 @@ export class PipelineComponent implements OnInit, OnDestroy { edgeValidations, ); if (currentConnectionValid) { - this.validatePipeline(); + this.validatePipeline( + pipelineModificationMessage, + ); this.modifyPipeline( pipelineModificationMessage, ); @@ -620,15 +622,12 @@ export class PipelineComponent implements OnInit, OnDestroy { .updatePipeline(this.currentPipelineModel) .subscribe(pm => { this.modifyPipeline(pm); - // if (!(pipelineElementConfig.payload instanceof DataSinkInvocation)) { - // this.JsplumbBridge.activateEndpoint('out-' + pipelineElementConfig.payload.dom, pipelineElementConfig.settings.completed); - // } this.triggerPipelineCacheUpdate(); this.announceConfiguredElement(pipelineElementConfig); if (this.previewModeActive) { this.deletePipelineElementPreview(true); } - this.validatePipeline(); + this.validatePipeline(pm); }); } else { this.validatePipeline(); @@ -675,6 +674,14 @@ export class PipelineComponent implements OnInit, OnDestroy { ); this.objectProvider .updatePipeline(this.currentPipelineModel) - .subscribe(pm => this.modifyPipeline(pm)); + .subscribe(pm => { + this.modifyPipeline(pm); + this.pipelineValid = + this.pipelineValidationService.isValidPipeline( + this.rawPipelineModel, + this.preview, + pm, + ); + }); } } diff --git a/ui/src/app/editor/services/pipeline-validation.service.ts b/ui/src/app/editor/services/pipeline-validation.service.ts index df6e1ce454..791b632ba8 100644 --- a/ui/src/app/editor/services/pipeline-validation.service.ts +++ b/ui/src/app/editor/services/pipeline-validation.service.ts @@ -30,6 +30,7 @@ import { import { JsplumbFactoryService } from './jsplumb-factory.service'; import { UserErrorMessage } from '../../core-model/base/UserErrorMessage'; import { Connection } from '@jsplumb/browser-ui'; +import { PipelineModificationMessage } from '../../../../projects/streampipes/platform-services/src/lib/model/gen/streampipes-model'; @Injectable({ providedIn: 'root' }) export class PipelineValidationService { @@ -57,11 +58,19 @@ export class PipelineValidationService { 'Did you configure all elements?', "There's a pipeline element which is missing some configuration.", ), + new UserErrorMessage( + 'Invalid pipeline configuration', + 'Check the current pipeline structure for invalid connections and configurations', + ), ]; constructor(private jsplumbFactoryService: JsplumbFactoryService) {} - isValidPipeline(rawPipelineModel, previewConfig: boolean) { + isValidPipeline( + rawPipelineModel: PipelineElementConfig[], + previewConfig: boolean, + pm?: PipelineModificationMessage, + ) { const jsplumbBridge = this.jsplumbFactoryService.getJsplumbBridge(previewConfig); const streamInAssembly = this.isStreamInAssembly(rawPipelineModel); @@ -99,12 +108,18 @@ export class PipelineValidationService { this.errorMessages = []; } + if (pm && !pm.pipelineValid) { + this.errorMessages.push(this.availableErrorMessages[5]); + } + this.pipelineValid = streamInAssembly && actionInAssembly && allElementsConnected && onlyOnePipelineCreated && - allElementsConfigured; + allElementsConfigured && + (pm ? pm.pipelineValid : true); + return this.pipelineValid; } @@ -117,11 +132,11 @@ export class PipelineValidationService { } buildErrorMessages( - streamInAssembly, - actionInAssembly, - allElementsConnected, - onlyOnePipelineCreated, - allElementsConfigured, + streamInAssembly: boolean, + actionInAssembly: boolean, + allElementsConnected: boolean, + onlyOnePipelineCreated: boolean, + allElementsConfigured: boolean, ) { this.errorMessages = []; if (!streamInAssembly) { diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html index b9bf1c8f44..a428276dd5 100644 --- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html +++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.html @@ -36,7 +36,7 @@
- +
+ +
+
+ sync_problem +
@@ -99,7 +115,11 @@ toggleRunningOperation ) " - [disabled]="!hasPipelineWritePrivileges || starting" + [disabled]=" + !hasPipelineWritePrivileges || + starting || + !pipeline.valid + " *ngIf="!pipeline.running" > play_arrow @@ -127,7 +147,7 @@ Name -

{{ pipeline.name }}

+

{{ pipeline.name }}

{{ pipeline.description !== '' ? pipeline.description : '-' diff --git a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss index 3550266599..dd97d20ab8 100644 --- a/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss +++ b/ui/src/app/pipelines/components/pipeline-overview/pipeline-overview.component.scss @@ -28,8 +28,8 @@ } .mat-column-status { - width: 100px; - max-width: 100px; + width: 120px; + max-width: 120px; } .light { @@ -62,3 +62,11 @@ .ml-10 { margin-left: 10px; } + +.pipeline-notification { + color: #dede00; +} + +.notification-icon { + width: 50px; +} diff --git a/ui/src/app/pipelines/pipelines.component.ts b/ui/src/app/pipelines/pipelines.component.ts index 69b21303db..61e02c9d48 100644 --- a/ui/src/app/pipelines/pipelines.component.ts +++ b/ui/src/app/pipelines/pipelines.component.ts @@ -119,7 +119,6 @@ export class PipelinesComponent implements OnInit { getFunctions() { this.functionsService.getActiveFunctions().subscribe(functions => { this.functions = functions.map(f => f.functionId); - console.log(this.functions); this.functionsReady = true; }); }