From 02b60f7692d7e3aa5a0c8a80a00e22d56b8a4b6a Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Tue, 3 Sep 2024 17:37:23 +0200 Subject: [PATCH] fix: Assign new elementIds in pipeline preview (#3211) --- .../model/preview/PipelinePreviewModel.java | 14 ++++---- .../PipelineModificationGenerator.java | 8 +++-- .../PipelineVerificationHandlerV2.java | 4 ++- .../AbstractPipelineValidationStep.java | 10 +----- .../v2/pipeline/IPipelineValidationStep.java | 34 +++++++++++++++++++ .../v2/pipeline/PipelineValidationSteps.java | 2 +- .../v2/pipeline/PipelineValidator.java | 11 +++--- .../manager/preview/PipelinePreview.java | 29 ++++++++++++++-- .../src/lib/model/gen/streampipes-model.ts | 10 +++--- ...ipeline-assembly-drawing-area.component.ts | 4 ++- .../pipeline-element-preview.component.ts | 8 +++-- .../dropped-pipeline-element.component.html | 6 ++-- ui/src/app/services/live-preview.service.ts | 8 +++-- 13 files changed, 107 insertions(+), 41 deletions(-) create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/IPipelineValidationStep.java diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/preview/PipelinePreviewModel.java b/streampipes-model/src/main/java/org/apache/streampipes/model/preview/PipelinePreviewModel.java index b63db03d75..021c6d428f 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/preview/PipelinePreviewModel.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/preview/PipelinePreviewModel.java @@ -19,16 +19,18 @@ import org.apache.streampipes.model.shared.annotation.TsModel; -import java.util.List; +import java.util.HashMap; +import java.util.Map; @TsModel public class PipelinePreviewModel { private String previewId; - private List supportedPipelineElementDomIds; + private Map elementIdMappings; public PipelinePreviewModel() { + this.elementIdMappings = new HashMap<>(); } public String getPreviewId() { @@ -39,11 +41,11 @@ public void setPreviewId(String previewId) { this.previewId = previewId; } - public List getSupportedPipelineElementDomIds() { - return supportedPipelineElementDomIds; + public Map getElementIdMappings() { + return elementIdMappings; } - public void setSupportedPipelineElementDomIds(List supportedPipelineElementDomIds) { - this.supportedPipelineElementDomIds = supportedPipelineElementDomIds; + public void setElementIdMappings(Map elementIdMappings) { + this.elementIdMappings = elementIdMappings; } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java index cbfa7ecfaa..362833f842 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java @@ -20,6 +20,7 @@ import org.apache.streampipes.manager.data.PipelineGraph; import org.apache.streampipes.manager.data.PipelineGraphHelpers; +import org.apache.streampipes.manager.matching.v2.pipeline.IPipelineValidationStep; import org.apache.streampipes.manager.matching.v2.pipeline.PipelineValidator; import org.apache.streampipes.manager.matching.v2.pipeline.SpValidationException; import org.apache.streampipes.model.SpDataStream; @@ -49,10 +50,11 @@ public class PipelineModificationGenerator { private final Map edgeValidations; private final PipelineValidator pipelineValidator; - public PipelineModificationGenerator(PipelineGraph pipelineGraph) { + public PipelineModificationGenerator(PipelineGraph pipelineGraph, + List steps) { this.pipelineGraph = pipelineGraph; this.pipelineModifications = new HashMap<>(); - this.pipelineValidator = new PipelineValidator(); + this.pipelineValidator = new PipelineValidator(steps); this.edgeValidations = new HashMap<>(); } @@ -147,6 +149,6 @@ private List toNotifications(List matchingR return matchingResultMessages .stream() .map(m -> new Notification(m.getTitle(), m.toString())) - .collect(Collectors.toList()); + .toList(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java index ac3bed9550..a00a5c43b0 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java @@ -19,6 +19,7 @@ import org.apache.streampipes.manager.data.PipelineGraph; import org.apache.streampipes.manager.data.PipelineGraphBuilder; +import org.apache.streampipes.manager.matching.v2.pipeline.PipelineValidationSteps; import org.apache.streampipes.manager.recommender.AllElementsProvider; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.base.NamedStreamPipesEntity; @@ -44,7 +45,8 @@ public PipelineVerificationHandlerV2(Pipeline pipeline) { public PipelineModificationMessage verifyPipeline() { PipelineGraph graph = new PipelineGraphBuilder(pipeline).buildGraph(); - return new PipelineModificationGenerator(graph).buildPipelineModificationMessage(); + var steps = new PipelineValidationSteps().collect(); + return new PipelineModificationGenerator(graph, steps).buildPipelineModificationMessage(); } public Pipeline makeModifiedPipeline() { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/AbstractPipelineValidationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/AbstractPipelineValidationStep.java index 9043d495ba..7af64b589c 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/AbstractPipelineValidationStep.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/AbstractPipelineValidationStep.java @@ -19,26 +19,18 @@ package org.apache.streampipes.manager.matching.v2.pipeline; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.client.matching.MatchingResultMessage; -import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -public abstract class AbstractPipelineValidationStep { +public abstract class AbstractPipelineValidationStep implements IPipelineValidationStep { protected final Map visitorHistory = new HashMap<>(); - public abstract void apply(NamedStreamPipesEntity source, - InvocableStreamPipesEntity target, - Set allTargets, - List validationInfos) throws SpValidationException; - public List getNewErrorLog() { return new ArrayList<>(); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/IPipelineValidationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/IPipelineValidationStep.java new file mode 100644 index 0000000000..ae8ed493d9 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/IPipelineValidationStep.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.matching.v2.pipeline; + +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo; + +import java.util.List; +import java.util.Set; + +public interface IPipelineValidationStep { + + void apply(NamedStreamPipesEntity source, + InvocableStreamPipesEntity target, + Set allTargets, + List validationInfos) throws SpValidationException; +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidationSteps.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidationSteps.java index 4ead31cbab..0e78372e46 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidationSteps.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidationSteps.java @@ -23,7 +23,7 @@ public class PipelineValidationSteps { - public List collect() { + public List collect() { return Arrays.asList( new PrepareStep(), new ApplyGroundingStep(), diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java index 6648082f86..439330c535 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java @@ -25,19 +25,20 @@ import java.util.List; import java.util.Set; -public class PipelineValidator { +public class PipelineValidator implements IPipelineValidationStep { - private final List steps; + private final List steps; - public PipelineValidator() { - this.steps = new PipelineValidationSteps().collect(); + public PipelineValidator(List steps) { + this.steps = steps; } + @Override public void apply(NamedStreamPipesEntity source, InvocableStreamPipesEntity target, Set allTargets, List validationInfos) throws SpValidationException { - for (AbstractPipelineValidationStep step : steps) { + for (var step : steps) { step.apply(source, target, allTargets, validationInfos); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java index 6795d6f67c..0e6b22bdc6 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java @@ -30,10 +30,13 @@ import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.preview.PipelinePreviewModel; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -45,18 +48,20 @@ public class PipelinePreview { public PipelinePreviewModel initiatePreview(Pipeline pipeline) { String previewId = generatePreviewId(); + var elementIdMappings = new HashMap(); pipeline.setActions(new ArrayList<>()); List pipelineElements = new ArrayList<>( new PipelineVerificationHandlerV2(pipeline) .verifyAndBuildGraphs(true) ); + rewriteElementIds(pipelineElements, elementIdMappings); invokeGraphs(filter(pipelineElements)); storeGraphs(previewId, pipelineElements); LOG.info("Preview pipeline {} started", previewId); - return makePreviewModel(previewId, pipelineElements); + return makePreviewModel(previewId, elementIdMappings); } public void deletePreview(String previewId) { @@ -84,6 +89,24 @@ public Map getPipelineElementPreviewStreams(String preview )); } + private void rewriteElementIds(List pipelineElements, + Map elementIdMappings) { + pipelineElements + .forEach(pe -> { + if (pe instanceof DataProcessorInvocation) { + var originalElementId = pe.getElementId(); + var newElementId = (String.format( + "%s:%s", + StringUtils.substringBeforeLast(pe.getElementId(), ":"), + RandomStringUtils.randomAlphanumeric(5))); + pe.setElementId(newElementId); + elementIdMappings.put(originalElementId, newElementId); + } else { + elementIdMappings.put(pe.getElementId(), pe.getElementId()); + } + }); + } + private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException { return new ExtensionsServiceEndpointGenerator() .getEndpointResourceUrl( @@ -124,10 +147,10 @@ private String generatePreviewId() { } private PipelinePreviewModel makePreviewModel(String previewId, - List graphs) { + Map elementIdMappings) { PipelinePreviewModel previewModel = new PipelinePreviewModel(); previewModel.setPreviewId(previewId); - previewModel.setSupportedPipelineElementDomIds(collectElementIds(graphs)); + previewModel.setElementIdMappings(elementIdMappings); return previewModel; } diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 6edbaea91e..e03d42804b 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2024-08-28 16:22:24. +// Generated using typescript-generator version 3.2.1263 on 2024-09-03 16:16:50. export class NamedStreamPipesEntity implements Storable { '@class': @@ -2867,8 +2867,8 @@ export class PipelineOperationStatus { } export class PipelinePreviewModel { + elementIdMappings: { [index: string]: string }; previewId: string; - supportedPipelineElementDomIds: string[]; static fromData( data: PipelinePreviewModel, @@ -2878,10 +2878,10 @@ export class PipelinePreviewModel { return data; } const instance = target || new PipelinePreviewModel(); + instance.elementIdMappings = __getCopyObjectFn(__identity())( + data.elementIdMappings, + ); instance.previewId = data.previewId; - instance.supportedPipelineElementDomIds = __getCopyArrayFn( - __identity(), - )(data.supportedPipelineElementDomIds); return instance; } } diff --git a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-drawing-area/pipeline-assembly-drawing-area.component.ts b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-drawing-area/pipeline-assembly-drawing-area.component.ts index c461f67e10..8a0d66bc51 100644 --- a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-drawing-area/pipeline-assembly-drawing-area.component.ts +++ b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-drawing-area/pipeline-assembly-drawing-area.component.ts @@ -160,7 +160,9 @@ export class PipelineAssemblyDrawingAreaComponent implements OnInit { const data = this.livePreviewService.convert( res as HttpDownloadProgressEvent, ); - this.livePreviewService.eventSub.next(data); + if (data) { + this.livePreviewService.eventSub.next(data); + } }); }); } else { diff --git a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts index 1e0d949ff8..821f5b1a7b 100644 --- a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts +++ b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts @@ -20,6 +20,7 @@ import { Component, Input, OnDestroy, OnInit } from '@angular/core'; import { Subscription } from 'rxjs'; import { KeyValue } from '@angular/common'; import { LivePreviewService } from '../../../services/live-preview.service'; +import { PipelinePreviewModel } from '@streampipes/platform-services'; @Component({ selector: 'sp-pipeline-element-preview', @@ -28,7 +29,7 @@ import { LivePreviewService } from '../../../services/live-preview.service'; }) export class PipelineElementPreviewComponent implements OnInit, OnDestroy { @Input() - previewId: string; + pipelinePreview: PipelinePreviewModel; @Input() elementId: string; @@ -53,7 +54,10 @@ export class PipelineElementPreviewComponent implements OnInit, OnDestroy { getLatestRuntimeInfo() { this.previewSub = this.livePreviewService.eventSub.subscribe(event => { if (event) { - this.runtimeData = event[this.elementId]; + this.runtimeData = + event[ + this.pipelinePreview.elementIdMappings[this.elementId] + ]; } else { this.runtimeDataError = true; } diff --git a/ui/src/app/editor/components/pipeline/dropped-pipeline-element/dropped-pipeline-element.component.html b/ui/src/app/editor/components/pipeline/dropped-pipeline-element/dropped-pipeline-element.component.html index 6cf404f42d..e32ce03c46 100644 --- a/ui/src/app/editor/components/pipeline/dropped-pipeline-element/dropped-pipeline-element.component.html +++ b/ui/src/app/editor/components/pipeline/dropped-pipeline-element/dropped-pipeline-element.component.html @@ -96,11 +96,11 @@ diff --git a/ui/src/app/services/live-preview.service.ts b/ui/src/app/services/live-preview.service.ts index 8f6ccb39fd..53c0558fad 100644 --- a/ui/src/app/services/live-preview.service.ts +++ b/ui/src/app/services/live-preview.service.ts @@ -28,7 +28,11 @@ export class LivePreviewService { convert(event: HttpDownloadProgressEvent) { const { partialText } = event; - const chunks = partialText.split('\n'); - return JSON.parse(chunks[chunks.length - 2]); + if (partialText) { + const chunks = partialText.split('\n'); + return JSON.parse(chunks[chunks.length - 2]); + } else { + return undefined; + } } }