Skip to content

Commit

Permalink
Support output strategy in pipeline templates
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Oct 29, 2024
1 parent d47b815 commit 08666dc
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private List<CompactPipelineElement> makeTemplateConfig(AdapterDescription adapt
DATA_LAKE_CONNECTOR_ID,
adapterDescription.getCorrespondingDataStreamElementId(),
null,
null,
null
));
return pipelineElements;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ public record CompactPipelineElement(String type,
String ref,
String id,
List<String> connectedTo,
List<Map<String, Object>> configuration) {
List<Map<String, Object>> configuration,
OutputConfiguration output) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.pipeline.compact;

import com.fasterxml.jackson.annotation.JsonInclude;

import java.util.List;

@JsonInclude(JsonInclude.Include.NON_NULL)
public record OutputConfiguration(List<String> keep,
List<UserDefinedOutput> userDefined) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.pipeline.compact;

public record UserDefinedOutput(String fieldName,
String runtimeType,
String semanticType) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@
package org.apache.streampipes.manager.pipeline.compact.generation;

import org.apache.streampipes.manager.template.CompactConfigGenerator;
import org.apache.streampipes.model.output.CustomOutputStrategy;
import org.apache.streampipes.model.output.OutputStrategy;
import org.apache.streampipes.model.output.UserDefinedOutputStrategy;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
import org.apache.streampipes.model.pipeline.compact.OutputConfiguration;
import org.apache.streampipes.model.pipeline.compact.UserDefinedOutput;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.staticproperty.StaticProperty;

import java.util.ArrayList;
Expand All @@ -38,6 +45,7 @@ public List<CompactPipelineElement> convert(Pipeline pipeline) {
stream.getDom(),
stream.getElementId(),
null,
null,
null))
.forEach(pipelineElements::add);

Expand All @@ -47,7 +55,8 @@ public List<CompactPipelineElement> convert(Pipeline pipeline) {
processor.getDom(),
processor.getAppId(),
processor.getConnectedTo(),
getConfig(processor.getStaticProperties())))
getConfig(processor.getStaticProperties()),
getOutput(processor.getOutputStrategies().get(0))))
.forEach(pipelineElements::add);

pipeline.getActions().stream()
Expand All @@ -56,7 +65,8 @@ public List<CompactPipelineElement> convert(Pipeline pipeline) {
sink.getDom(),
sink.getAppId(),
sink.getConnectedTo(),
getConfig(sink.getStaticProperties())))
getConfig(sink.getStaticProperties()),
null))
.forEach(pipelineElements::add);

return pipelineElements;
Expand All @@ -66,11 +76,12 @@ private CompactPipelineElement createElement(String type,
String ref,
String elementId,
List<String> connectedTo,
List<Map<String, Object>> config) {
List<Map<String, Object>> config,
OutputConfiguration outputConfiguration) {
var connections = connectedTo != null ? connectedTo.stream()
.map(this::replaceId)
.toList() : null;
return new CompactPipelineElement(type, replaceId(ref), elementId, connections, config);
return new CompactPipelineElement(type, replaceId(ref), elementId, connections, config, outputConfiguration);
}

public List<Map<String, Object>> getConfig(List<StaticProperty> staticProperties) {
Expand All @@ -79,6 +90,23 @@ public List<Map<String, Object>> getConfig(List<StaticProperty> staticProperties
return configs;
}

public OutputConfiguration getOutput(OutputStrategy outputStrategy) {
if (outputStrategy instanceof CustomOutputStrategy) {
return new OutputConfiguration(((CustomOutputStrategy) outputStrategy).getSelectedPropertyKeys(), null);
} else if (outputStrategy instanceof UserDefinedOutputStrategy) {
return new OutputConfiguration(null, toCustomConfig(((UserDefinedOutputStrategy) outputStrategy).getEventProperties()));
} else {
return null;
}
}

private List<UserDefinedOutput> toCustomConfig(List<EventProperty> eventProperties) {
return eventProperties.stream().map(ep -> new UserDefinedOutput(
ep.getRuntimeName(),
((EventPropertyPrimitive) ep).getRuntimeType(),
ep.getSemanticType())).toList();
}

private String replaceId(String id) {
return id.replaceAll(InvocablePipelineElementGenerator.ID_PREFIX, "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ public DataProcessorInvocation generate(DataProcessorInvocation processor,
CompactPipelineElement pipelineElement) {
basicGenerator.apply(processor, pipelineElement);
var template = basicGenerator.makeTemplate(processor, pipelineElement);
return new DataProcessorTemplateHandler(template, processor, false)
var element = new DataProcessorTemplateHandler(template, processor, false)
.applyTemplateOnPipelineElement();

if (pipelineElement.output() != null) {
var outputStrategyGenerator = new OutputStrategyGenerator(pipelineElement.output());
element.getOutputStrategies().forEach(o -> o.accept(outputStrategyGenerator));
}
return element;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.manager.pipeline.compact.generation;

import org.apache.streampipes.model.output.AppendOutputStrategy;
import org.apache.streampipes.model.output.CustomOutputStrategy;
import org.apache.streampipes.model.output.CustomTransformOutputStrategy;
import org.apache.streampipes.model.output.FixedOutputStrategy;
import org.apache.streampipes.model.output.KeepOutputStrategy;
import org.apache.streampipes.model.output.ListOutputStrategy;
import org.apache.streampipes.model.output.OutputStrategyVisitor;
import org.apache.streampipes.model.output.TransformOutputStrategy;
import org.apache.streampipes.model.output.UserDefinedOutputStrategy;
import org.apache.streampipes.model.pipeline.compact.OutputConfiguration;
import org.apache.streampipes.model.pipeline.compact.UserDefinedOutput;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;

import java.util.List;

public class OutputStrategyGenerator implements OutputStrategyVisitor {

private final OutputConfiguration config;

public OutputStrategyGenerator(OutputConfiguration outputConfiguration) {
this.config = outputConfiguration;
}

@Override
public void visit(AppendOutputStrategy appendOutputStrategy) {

}

@Override
public void visit(CustomOutputStrategy customOutputStrategy) {
var keepConfig = config.keep();
if (keepConfig != null && !keepConfig.isEmpty()) {
customOutputStrategy.setSelectedPropertyKeys(keepConfig);
}
}

@Override
public void visit(CustomTransformOutputStrategy customTransformOutputStrategy) {

}

@Override
public void visit(FixedOutputStrategy fixedOutputStrategy) {

}

@Override
public void visit(KeepOutputStrategy keepOutputStrategy) {

}

@Override
public void visit(ListOutputStrategy listOutputStrategy) {

}

@Override
public void visit(TransformOutputStrategy transformOutputStrategy) {

}

@Override
public void visit(UserDefinedOutputStrategy userDefinedOutputStrategy) {
var userDefinedConfig = config.userDefined();
if (userDefinedConfig != null && !userDefinedConfig.isEmpty()) {
userDefinedOutputStrategy.setEventProperties(
toEp(userDefinedConfig)
);
}
}

private List<EventProperty> toEp(List<UserDefinedOutput> userDefinedOutput) {
return userDefinedOutput
.stream()
.map(this::toPrimitive)
.toList();
}

private EventProperty toPrimitive(UserDefinedOutput u) {
var ep = new EventPropertyPrimitive();
ep.setRuntimeName(u.fieldName());
ep.setSemanticType(u.semanticType());
ep.setRuntimeType(u.runtimeType());

return ep;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public PipelineModificationResult makePipeline(PipelineTemplateGenerationRequest
key,
stream.getElementId(),
List.of(),
List.of()
List.of(),
null
));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public CompactPipelineTemplate getTemplate() {
List.of(
Map.of("schema_update", "Update schema"),
Map.of("ignore_duplicates", false)
)
),
null
)
)
);
Expand Down
24 changes: 17 additions & 7 deletions ui/src/app/core-ui/help/help.component.html
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,28 @@
-->

<div class="sp-dialog-container">
<div class="sp-dialog-content p-15">
<h4>{{ pipelineElement.name }}</h4>
<p>
{{ pipelineElement.description }}
</p>

<div class="sp-dialog-content p-15" fxLayout="column">
<div fxLayout="row" fxFlex="100">
<div fxLayout="column" fxFlex>
<h4>{{ pipelineElement.name }}</h4>
<small>
{{ pipelineElement.description }}
</small>
</div>
<div class="element-id" fxLayoutAlign="end start">
<span>ID</span>&nbsp;<b>{{
isDataStream
? pipelineElement.elementId
: pipelineElement.appId
}}</b>
</div>
</div>
<mat-tab-group
color="accent"
[selectedIndex]="selectedTabIndex"
(selectedIndexChange)="selectedTabIndex = $event"
>
<mat-tab *ngFor="let tab of tabs" label="{{ tab }}"> </mat-tab>
<mat-tab *ngFor="let tab of tabs" label="{{ tab }}"></mat-tab>
</mat-tab-group>

<sp-pipeline-element-runtime-info
Expand Down
10 changes: 10 additions & 0 deletions ui/src/app/core-ui/help/help.component.scss
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@
*/

@import '../../../scss/sp/sp-dialog.scss';

.element-id {
border-radius: 5px;
margin-right: 10px;
margin-top: 5px;
margin-bottom: 5px;
font-size: small;
display: inline-block;
padding: 5px;
}
4 changes: 4 additions & 0 deletions ui/src/app/core-ui/help/help.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ export class HelpComponent implements OnInit {

@Input()
pipelineElement: PipelineElementUnion;
isDataStream: boolean;

constructor(private dialogRef: DialogRef<HelpComponent>) {}

ngOnInit() {
if (this.pipelineElement instanceof SpDataStream) {
this.tabs = this.availableTabs;
this.isDataStream = true;
} else {
this.tabs.push(this.availableTabs[1]);
this.selectedTabIndex = 1;
Expand All @@ -51,4 +53,6 @@ export class HelpComponent implements OnInit {
this.dialogRef.close();
});
}

protected readonly SpDataStream = SpDataStream;
}

0 comments on commit 08666dc

Please sign in to comment.