From 7783d880303ab050695ad662f7f3ad6cfb121b88 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Thu, 31 Oct 2024 21:01:57 +0100 Subject: [PATCH] Support output strategies in pipeline code (#3312) * Add visitor to output strategies * Support output strategy in pipeline templates * Update ts model * Add license * Fix checkstyle --- .../compact/PersistPipelineHandler.java | 1 + .../model/output/AppendOutputStrategy.java | 7 +- .../model/output/CustomOutputStrategy.java | 7 +- .../output/CustomTransformOutputStrategy.java | 6 +- .../model/output/FixedOutputStrategy.java | 4 + .../model/output/KeepOutputStrategy.java | 5 + .../model/output/ListOutputStrategy.java | 4 + .../model/output/OutputStrategy.java | 2 + .../model/output/OutputStrategyVisitor.java | 38 ++++++ .../model/output/TransformOutputStrategy.java | 5 + .../output/UserDefinedOutputStrategy.java | 5 + .../compact/CompactPipelineElement.java | 3 +- .../pipeline/compact/OutputConfiguration.java | 28 +++++ .../pipeline/compact/UserDefinedOutput.java | 24 ++++ .../generation/CompactPipelineConverter.java | 39 ++++++- ...DataProcessorPipelineElementGenerator.java | 8 +- .../generation/OutputStrategyGenerator.java | 108 ++++++++++++++++++ .../CompactPipelineTemplateManagement.java | 3 +- .../PersistDataLakePipelineTemplate.java | 3 +- .../src/lib/model/gen/streampipes-model.ts | 44 ++++++- .../adapter-started-dialog.component.ts | 1 + ui/src/app/core-ui/help/help.component.html | 24 ++-- ui/src/app/core-ui/help/help.component.scss | 10 ++ ui/src/app/core-ui/help/help.component.ts | 4 + 24 files changed, 362 insertions(+), 21 deletions(-) create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategyVisitor.java create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java index dd0afb7ef7..8b53e019f6 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java @@ -94,6 +94,7 @@ private List makeTemplateConfig(AdapterDescription adapt DATA_LAKE_CONNECTOR_ID, adapterDescription.getCorrespondingDataStreamElementId(), null, + null, null )); return pipelineElements; diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/output/AppendOutputStrategy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/output/AppendOutputStrategy.java index 9d22313bc4..fa7ccca3c4 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/output/AppendOutputStrategy.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/output/AppendOutputStrategy.java @@ -26,8 +26,6 @@ public class AppendOutputStrategy extends OutputStrategy { - private static final long serialVersionUID = 7202888911899551012L; - private List eventProperties; public AppendOutputStrategy() { @@ -35,6 +33,11 @@ public AppendOutputStrategy() { eventProperties = new ArrayList<>(); } + @Override + public void accept(OutputStrategyVisitor visitor) { + visitor.visit(this); + } + public AppendOutputStrategy(AppendOutputStrategy other) { super(other); this.setEventProperties(new Cloner().properties(other.getEventProperties())); diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomOutputStrategy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomOutputStrategy.java index 4468ba14f2..fa859aeef4 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomOutputStrategy.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomOutputStrategy.java @@ -23,8 +23,6 @@ public class CustomOutputStrategy extends OutputStrategy { - private static final long serialVersionUID = -5858193127308435472L; - private List selectedPropertyKeys; private boolean outputRight; @@ -77,4 +75,9 @@ public List getAvailablePropertyKeys() { public void setAvailablePropertyKeys(List availablePropertyKeys) { this.availablePropertyKeys = availablePropertyKeys; } + + @Override + public void accept(OutputStrategyVisitor visitor) { + visitor.visit(this); + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomTransformOutputStrategy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomTransformOutputStrategy.java index 91d1a6ae3d..426a8d29eb 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomTransformOutputStrategy.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomTransformOutputStrategy.java @@ -32,7 +32,6 @@ public CustomTransformOutputStrategy() { this.eventProperties = new ArrayList<>(); } - public CustomTransformOutputStrategy(CustomTransformOutputStrategy other) { super(other); this.eventProperties = new Cloner().properties(other.getEventProperties()); @@ -45,4 +44,9 @@ public List getEventProperties() { public void setEventProperties(List eventProperties) { this.eventProperties = eventProperties; } + + @Override + public void accept(OutputStrategyVisitor visitor) { + visitor.visit(this); + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/output/FixedOutputStrategy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/output/FixedOutputStrategy.java index e698348efd..1558aada49 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/output/FixedOutputStrategy.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/output/FixedOutputStrategy.java @@ -50,5 +50,9 @@ public void setEventProperties(List eventProperties) { this.eventProperties = eventProperties; } + @Override + public void accept(OutputStrategyVisitor visitor) { + visitor.visit(this); + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/output/KeepOutputStrategy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/output/KeepOutputStrategy.java index d455e7b4c2..0c8b9b0e00 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/output/KeepOutputStrategy.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/output/KeepOutputStrategy.java @@ -63,4 +63,9 @@ public boolean isKeepBoth() { public void setKeepBoth(boolean keepBoth) { this.keepBoth = keepBoth; } + + @Override + public void accept(OutputStrategyVisitor visitor) { + visitor.visit(this); + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/output/ListOutputStrategy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/output/ListOutputStrategy.java index 4040d9387c..b2e1f4ba51 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/output/ListOutputStrategy.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/output/ListOutputStrategy.java @@ -46,5 +46,9 @@ public void setPropertyName(String propertyName) { this.propertyName = propertyName; } + @Override + public void accept(OutputStrategyVisitor visitor) { + visitor.visit(this); + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategy.java index 05484f1679..ab19a8d89b 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategy.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategy.java @@ -75,4 +75,6 @@ public List getRenameRules() { public void setRenameRules(List renameRules) { this.renameRules = renameRules; } + + public abstract void accept(OutputStrategyVisitor visitor); } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategyVisitor.java b/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategyVisitor.java new file mode 100644 index 0000000000..024e22b994 --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategyVisitor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.output; + +public interface OutputStrategyVisitor { + + void visit(AppendOutputStrategy appendOutputStrategy); + + void visit(CustomOutputStrategy customOutputStrategy); + + void visit(CustomTransformOutputStrategy customTransformOutputStrategy); + + void visit(FixedOutputStrategy fixedOutputStrategy); + + void visit(KeepOutputStrategy keepOutputStrategy); + + void visit(ListOutputStrategy listOutputStrategy); + + void visit(TransformOutputStrategy transformOutputStrategy); + + void visit(UserDefinedOutputStrategy userDefinedOutputStrategy); +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/output/TransformOutputStrategy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/output/TransformOutputStrategy.java index 1ca951654b..7e9de992ad 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/output/TransformOutputStrategy.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/output/TransformOutputStrategy.java @@ -43,4 +43,9 @@ public List getTransformOperations() { public void setTransformOperations(List transformOperations) { this.transformOperations = transformOperations; } + + @Override + public void accept(OutputStrategyVisitor visitor) { + visitor.visit(this); + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java index 7f8f725eef..622d1e25ec 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java @@ -42,4 +42,9 @@ public List getEventProperties() { public void setEventProperties(List eventProperties) { this.eventProperties = eventProperties; } + + @Override + public void accept(OutputStrategyVisitor visitor) { + visitor.visit(this); + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java index b145c2aad3..f027e60e05 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java @@ -28,5 +28,6 @@ public record CompactPipelineElement(String type, String ref, String id, List connectedTo, - List> configuration) { + List> configuration, + OutputConfiguration output) { } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java new file mode 100644 index 0000000000..f9d88fa97d --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.pipeline.compact; + +import com.fasterxml.jackson.annotation.JsonInclude; + +import java.util.List; + +@JsonInclude(JsonInclude.Include.NON_NULL) +public record OutputConfiguration(List keep, + List userDefined) { +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java new file mode 100644 index 0000000000..22b9b924a1 --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.pipeline.compact; + +public record UserDefinedOutput(String fieldName, + String runtimeType, + String semanticType) { +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java index bebc3b9286..c3230822b3 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java @@ -19,8 +19,15 @@ package org.apache.streampipes.manager.pipeline.compact.generation; import org.apache.streampipes.manager.template.CompactConfigGenerator; +import org.apache.streampipes.model.output.CustomOutputStrategy; +import org.apache.streampipes.model.output.OutputStrategy; +import org.apache.streampipes.model.output.UserDefinedOutputStrategy; import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement; +import org.apache.streampipes.model.pipeline.compact.OutputConfiguration; +import org.apache.streampipes.model.pipeline.compact.UserDefinedOutput; +import org.apache.streampipes.model.schema.EventProperty; +import org.apache.streampipes.model.schema.EventPropertyPrimitive; import org.apache.streampipes.model.staticproperty.StaticProperty; import java.util.ArrayList; @@ -38,6 +45,7 @@ public List convert(Pipeline pipeline) { stream.getDom(), stream.getElementId(), null, + null, null)) .forEach(pipelineElements::add); @@ -47,7 +55,8 @@ public List convert(Pipeline pipeline) { processor.getDom(), processor.getAppId(), processor.getConnectedTo(), - getConfig(processor.getStaticProperties()))) + getConfig(processor.getStaticProperties()), + getOutput(processor.getOutputStrategies().get(0)))) .forEach(pipelineElements::add); pipeline.getActions().stream() @@ -56,7 +65,8 @@ public List convert(Pipeline pipeline) { sink.getDom(), sink.getAppId(), sink.getConnectedTo(), - getConfig(sink.getStaticProperties()))) + getConfig(sink.getStaticProperties()), + null)) .forEach(pipelineElements::add); return pipelineElements; @@ -66,11 +76,12 @@ private CompactPipelineElement createElement(String type, String ref, String elementId, List connectedTo, - List> config) { + List> config, + OutputConfiguration outputConfiguration) { var connections = connectedTo != null ? connectedTo.stream() .map(this::replaceId) .toList() : null; - return new CompactPipelineElement(type, replaceId(ref), elementId, connections, config); + return new CompactPipelineElement(type, replaceId(ref), elementId, connections, config, outputConfiguration); } public List> getConfig(List staticProperties) { @@ -79,6 +90,26 @@ public List> getConfig(List staticProperties return configs; } + public OutputConfiguration getOutput(OutputStrategy outputStrategy) { + if (outputStrategy instanceof CustomOutputStrategy) { + return new OutputConfiguration(((CustomOutputStrategy) outputStrategy).getSelectedPropertyKeys(), null); + } else if (outputStrategy instanceof UserDefinedOutputStrategy) { + return new OutputConfiguration( + null, + toCustomConfig(((UserDefinedOutputStrategy) outputStrategy).getEventProperties()) + ); + } else { + return null; + } + } + + private List toCustomConfig(List eventProperties) { + return eventProperties.stream().map(ep -> new UserDefinedOutput( + ep.getRuntimeName(), + ((EventPropertyPrimitive) ep).getRuntimeType(), + ep.getSemanticType())).toList(); + } + private String replaceId(String id) { return id.replaceAll(InvocablePipelineElementGenerator.ID_PREFIX, ""); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java index c736293c20..89f7fd10dd 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java @@ -35,7 +35,13 @@ public DataProcessorInvocation generate(DataProcessorInvocation processor, CompactPipelineElement pipelineElement) { basicGenerator.apply(processor, pipelineElement); var template = basicGenerator.makeTemplate(processor, pipelineElement); - return new DataProcessorTemplateHandler(template, processor, false) + var element = new DataProcessorTemplateHandler(template, processor, false) .applyTemplateOnPipelineElement(); + + if (pipelineElement.output() != null) { + var outputStrategyGenerator = new OutputStrategyGenerator(pipelineElement.output()); + element.getOutputStrategies().forEach(o -> o.accept(outputStrategyGenerator)); + } + return element; } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java new file mode 100644 index 0000000000..2cb4cd6067 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.pipeline.compact.generation; + +import org.apache.streampipes.model.output.AppendOutputStrategy; +import org.apache.streampipes.model.output.CustomOutputStrategy; +import org.apache.streampipes.model.output.CustomTransformOutputStrategy; +import org.apache.streampipes.model.output.FixedOutputStrategy; +import org.apache.streampipes.model.output.KeepOutputStrategy; +import org.apache.streampipes.model.output.ListOutputStrategy; +import org.apache.streampipes.model.output.OutputStrategyVisitor; +import org.apache.streampipes.model.output.TransformOutputStrategy; +import org.apache.streampipes.model.output.UserDefinedOutputStrategy; +import org.apache.streampipes.model.pipeline.compact.OutputConfiguration; +import org.apache.streampipes.model.pipeline.compact.UserDefinedOutput; +import org.apache.streampipes.model.schema.EventProperty; +import org.apache.streampipes.model.schema.EventPropertyPrimitive; + +import java.util.List; + +public class OutputStrategyGenerator implements OutputStrategyVisitor { + + private final OutputConfiguration config; + + public OutputStrategyGenerator(OutputConfiguration outputConfiguration) { + this.config = outputConfiguration; + } + + @Override + public void visit(AppendOutputStrategy appendOutputStrategy) { + + } + + @Override + public void visit(CustomOutputStrategy customOutputStrategy) { + var keepConfig = config.keep(); + if (keepConfig != null && !keepConfig.isEmpty()) { + customOutputStrategy.setSelectedPropertyKeys(keepConfig); + } + } + + @Override + public void visit(CustomTransformOutputStrategy customTransformOutputStrategy) { + + } + + @Override + public void visit(FixedOutputStrategy fixedOutputStrategy) { + + } + + @Override + public void visit(KeepOutputStrategy keepOutputStrategy) { + + } + + @Override + public void visit(ListOutputStrategy listOutputStrategy) { + + } + + @Override + public void visit(TransformOutputStrategy transformOutputStrategy) { + + } + + @Override + public void visit(UserDefinedOutputStrategy userDefinedOutputStrategy) { + var userDefinedConfig = config.userDefined(); + if (userDefinedConfig != null && !userDefinedConfig.isEmpty()) { + userDefinedOutputStrategy.setEventProperties( + toEp(userDefinedConfig) + ); + } + } + + private List toEp(List userDefinedOutput) { + return userDefinedOutput + .stream() + .map(this::toPrimitive) + .toList(); + } + + private EventProperty toPrimitive(UserDefinedOutput u) { + var ep = new EventPropertyPrimitive(); + ep.setRuntimeName(u.fieldName()); + ep.setSemanticType(u.semanticType()); + ep.setRuntimeType(u.runtimeType()); + + return ep; + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java index 0bf308cfc4..1238a0aeda 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java @@ -57,7 +57,8 @@ public PipelineModificationResult makePipeline(PipelineTemplateGenerationRequest key, stream.getElementId(), List.of(), - List.of() + List.of(), + null )); }); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java index caa6e7bf94..00c768f4a7 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java @@ -52,7 +52,8 @@ public CompactPipelineTemplate getTemplate() { List.of( Map.of("schema_update", "Update schema"), Map.of("ignore_duplicates", false) - ) + ), + null ) ) ); diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index cf07248af8..3aa23a65bf 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2024-10-11 10:41:46. +// Generated using typescript-generator version 3.2.1263 on 2024-10-29 10:04:46. export class NamedStreamPipesEntity implements Storable { '@class': @@ -844,6 +844,7 @@ export class CompactPipelineElement { configuration: { [index: string]: any }[]; connectedTo: string[]; id: string; + output: OutputConfiguration; ref: string; type: string; @@ -862,6 +863,7 @@ export class CompactPipelineElement { data.connectedTo, ); instance.id = data.id; + instance.output = OutputConfiguration.fromData(data.output); instance.ref = data.ref; instance.type = data.type; return instance; @@ -2605,6 +2607,26 @@ export class Option { } } +export class OutputConfiguration { + keep: string[]; + userDefined: UserDefinedOutput[]; + + static fromData( + data: OutputConfiguration, + target?: OutputConfiguration, + ): OutputConfiguration { + if (!data) { + return data; + } + const instance = target || new OutputConfiguration(); + instance.keep = __getCopyArrayFn(__identity())(data.keep); + instance.userDefined = __getCopyArrayFn(UserDefinedOutput.fromData)( + data.userDefined, + ); + return instance; + } +} + export class Pipeline implements Storable { _id: string; _rev: string; @@ -4056,6 +4078,26 @@ export class UnitTransformRuleDescription extends ValueTransformationRuleDescrip } } +export class UserDefinedOutput { + fieldName: string; + runtimeType: string; + semanticType: string; + + static fromData( + data: UserDefinedOutput, + target?: UserDefinedOutput, + ): UserDefinedOutput { + if (!data) { + return data; + } + const instance = target || new UserDefinedOutput(); + instance.fieldName = data.fieldName; + instance.runtimeType = data.runtimeType; + instance.semanticType = data.semanticType; + return instance; + } +} + export class UserDefinedOutputStrategy extends OutputStrategy { '@class': 'org.apache.streampipes.model.output.UserDefinedOutputStrategy'; 'eventProperties': EventPropertyUnion[]; diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts index 459327fd25..17cc830821 100644 --- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts @@ -278,6 +278,7 @@ export class AdapterStartedDialog implements OnInit { configuration: undefined, id: adapter.correspondingDataStreamElementId, connectedTo: undefined, + output: undefined, }); return template; } diff --git a/ui/src/app/core-ui/help/help.component.html b/ui/src/app/core-ui/help/help.component.html index ecc82fc6a8..fcdfbe7332 100644 --- a/ui/src/app/core-ui/help/help.component.html +++ b/ui/src/app/core-ui/help/help.component.html @@ -17,18 +17,28 @@ -->
-
-

{{ pipelineElement.name }}

-

- {{ pipelineElement.description }} -

- +
+
+
+

{{ pipelineElement.name }}

+ + {{ pipelineElement.description }} + +
+
+ ID {{ + isDataStream + ? pipelineElement.elementId + : pipelineElement.appId + }} +
+
- + ) {} ngOnInit() { if (this.pipelineElement instanceof SpDataStream) { this.tabs = this.availableTabs; + this.isDataStream = true; } else { this.tabs.push(this.availableTabs[1]); this.selectedTabIndex = 1; @@ -51,4 +53,6 @@ export class HelpComponent implements OnInit { this.dialogRef.close(); }); } + + protected readonly SpDataStream = SpDataStream; }