Skip to content

Commit

Permalink
improve split array processor support nested field
Browse files Browse the repository at this point in the history
Signed-off-by: huangyanfeng <[email protected]>
  • Loading branch information
yanfeng1992 committed Jun 19, 2024
1 parent 183d662 commit b040245
Showing 1 changed file with 5 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,15 @@
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.AbstractField;
import org.apache.streampipes.model.runtime.field.ListField;
import org.apache.streampipes.model.runtime.field.NestedField;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyList;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
Expand All @@ -44,19 +38,14 @@
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SplitArrayProcessor extends StreamPipesDataProcessor
implements ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {
public class SplitArrayProcessor extends StreamPipesDataProcessor {

public static final String KEEP_PROPERTIES_ID = "keep";
public static final String ARRAY_FIELD_ID = "array-field";
public static final String VALUE = "array_value";

private String arrayField;
private List<String> keepProperties;

@Override
public DataProcessorDescription declareModel() {
Expand All @@ -65,45 +54,19 @@ public DataProcessorDescription declareModel() {
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.requiredStream(StreamRequirementsBuilder.create()
.requiredPropertyWithNaryMapping(EpRequirements.anyProperty(),
Labels.withId(KEEP_PROPERTIES_ID),
PropertyScope.NONE)
.requiredPropertyWithUnaryMapping(EpRequirements.listRequirement(),
Labels.withId(ARRAY_FIELD_ID),
PropertyScope.NONE)
.build())
.outputStrategy(OutputStrategies.customTransformation())
.outputStrategy(OutputStrategies.userDefined())
.build();
}

@Override
public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement,
ProcessingElementParameterExtractor extractor)
throws SpRuntimeException {
String arrayFieldSelector = extractor.mappingPropertyValue(ARRAY_FIELD_ID);
List<String> keepPropertySelectors = extractor.mappingPropertyValues(KEEP_PROPERTIES_ID);

List<EventProperty> outProperties = new ArrayList<>();
EventPropertyList arrayProperty = (EventPropertyList) extractor.getEventPropertyBySelector(arrayFieldSelector);
EventProperty newProperty = arrayProperty.getEventProperty();
newProperty.setRuntimeName(VALUE);
newProperty.setLabel("Array Value");
newProperty.setDescription("Contains values of the array. Created by Split Array processor.");

List<EventProperty> keepProperties = extractor.getEventPropertiesBySelector
(keepPropertySelectors);
outProperties.add(newProperty);
outProperties.addAll(keepProperties);

return new EventSchema(outProperties);
}

@Override
public void onInvocation(ProcessorParams parameters,
SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD_ID);
keepProperties = parameters.extractor().mappingPropertyValues(KEEP_PROPERTIES_ID);
}

@Override
Expand All @@ -122,20 +85,15 @@ public void onEvent(Event event,
});

for (AbstractField field : allEvents) {
Event outEvent = new Event();
if (field instanceof NestedField) {
for (Map.Entry<String, AbstractField> key : ((NestedField) field).getRawValue().entrySet()) {
outEvent.addField(key.getValue());
event.addField(key.getValue());
}
} else {
outEvent.addField(SplitArrayProcessor.VALUE, field);
}

for (String propertyName : keepProperties) {
outEvent.addField(event.getFieldBySelector(propertyName));
event.addField(arrayField, field);
}

collector.collect(outEvent);
collector.collect(event);
}
}

Expand Down

0 comments on commit b040245

Please sign in to comment.