Skip to content

Commit

Permalink
fix: Add timestamp to Image enricher, fix SO error (#2924)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer authored Jun 11, 2024
1 parent 2bd36aa commit 052c7dc
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.streampipes.processors.imageprocessing.jvm.processor.genericclassification.GenericImageClassificationProcessor;
import org.apache.streampipes.processors.imageprocessing.jvm.processor.imagecropper.ImageCropperProcessor;
import org.apache.streampipes.processors.imageprocessing.jvm.processor.imageenrichment.ImageEnrichmentProcessor;
import org.apache.streampipes.processors.imageprocessing.jvm.processor.migration.ImageEnrichmentProcessorMigrationv1;
import org.apache.streampipes.processors.imageprocessing.jvm.processor.qrreader.QrCodeReaderProcessor;

import java.util.Collections;
Expand All @@ -48,6 +49,8 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {

@Override
public List<IModelMigrator<?, ?>> migrators() {
return Collections.emptyList();
return List.of(
new ImageEnrichmentProcessorMigrationv1()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ public ImageTransformer(Event in) {
super(in);
}

public Optional<BufferedImage> getImage(String imageProperty) {

return getImage(imageProperty);
}

public List<Map<String, Object>> getAllBoxCoordinates(String boxArrayProperty) {
List<Map<String, AbstractField>> allBoxes = in.getFieldBySelector(boxArrayProperty)
.getAsList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.ImagePropertyConstants;
Expand All @@ -32,7 +33,6 @@
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

Expand All @@ -49,17 +49,21 @@
import java.util.Optional;

public class ImageEnrichmentProcessor extends StreamPipesDataProcessor {

public static final String ID = "org.apache.streampipes.processor.imageclassification.jvm.image-enricher";

private String imageProperty;
private String boxArray;

@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder.create("org.apache.streampipes.processor.imageclassification.jvm.image-enricher")
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
return ProcessingElementBuilder.create(ID, 1)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.category(DataProcessorType.IMAGE_PROCESSING)
.requiredStream(RequiredBoxStream.getBoxStream())
.outputStrategy(OutputStrategies.fixed(
EpProperties.timestampProperty("timestamp"),
EpProperties.stringEp(Labels.empty(), ImagePropertyConstants.IMAGE.getProperty(),
"https://image.com")
))
Expand Down Expand Up @@ -118,6 +122,7 @@ public void onEvent(Event in, SpOutputCollector out) throws SpRuntimeException {

if (finalImage.isPresent()) {
Event outEvent = new Event();
outEvent.addField("timestamp", System.currentTimeMillis());
outEvent.addField(ImagePropertyConstants.IMAGE.getProperty(),
Base64.getEncoder().encodeToString(finalImage.get()));
out.collect(outEvent);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.imageprocessing.jvm.processor.migration;

import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor;
import org.apache.streampipes.extensions.api.migration.IDataProcessorMigrator;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.migration.MigrationResult;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.ImagePropertyConstants;
import org.apache.streampipes.processors.imageprocessing.jvm.processor.imageenrichment.ImageEnrichmentProcessor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;

import java.util.List;

public class ImageEnrichmentProcessorMigrationv1 implements IDataProcessorMigrator {
@Override
public ModelMigratorConfig config() {
return new ModelMigratorConfig(
ImageEnrichmentProcessor.ID,
SpServiceTagPrefix.DATA_PROCESSOR,
0,
1);
}

@Override
public MigrationResult<DataProcessorInvocation> migrate(DataProcessorInvocation element,
IDataProcessorParameterExtractor extractor)
throws RuntimeException {
element.setOutputStrategies(List.of(
OutputStrategies.fixed(
EpProperties.timestampProperty("timestamp"),
EpProperties.stringEp(Labels.empty(), ImagePropertyConstants.IMAGE.getProperty(),
"https://image.com")
)
));
return MigrationResult.success(element);
}
}

0 comments on commit 052c7dc

Please sign in to comment.