feat(#1960): Allow modification of adapters with pipelines (#1979)
* feat(#1960): Allow modification of adapters with pipelines

* Add license header

* Add data model for schema diff management

* Improve schema editor and static value handling

* Fix schema preview of nested fields

* Improve schema updates

* Improve download dialog

* Remove unused stylesheets

* Redirect to adapter overview after adapter creation

* Improve modification of measurement units

* Fix update of measurement units

* Properly display labels and runtime type changes in schema editor

* Fix modification of conversion rules

* Show selected timestamp transformation rules

* Add warning hint and value validation to correction value rule

* Fix NullPointer when adding static rules

* Fix unit transformation

* Remove unused component

* Add metadata object to properties

* Modified tests (#2079)

* Update typescript model

* Remove obsolete service

* Add logging message

---------

Co-authored-by: Tim Bossenmaier <[email protected]>
Co-authored-by: Marcelfrueh <[email protected]>
3 people authored Oct 27, 2023
1 parent bb32475 commit f9f1bd5
Showing 82 changed files with 2,310 additions and 930 deletions.
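
The core of the change: updating an adapter no longer just overwrites its description, it also migrates the pipelines that consume the adapter's data stream. A minimal sketch of the intended call flow, assuming a caller that already holds the updated AdapterDescription and an AdapterMasterManagement instance (both variable names below are placeholders; the REST wiring is not part of this excerpt):

// Sketch only — "updatedDescription" and "adapterMasterManagement" are assumed to exist.
var updateManagement = new AdapterUpdateManagement(adapterMasterManagement);

// Stops the adapter if it is running, updates the stored data stream, re-validates every
// pipeline that consumes it, and restarts the adapter and all auto-migratable pipelines.
updateManagement.updateAdapter(updatedDescription);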
@@ -91,15 +91,7 @@ public String addAdapter(AdapterDescription ad,
return ad.getElementId();
}

public void updateAdapter(AdapterDescription ad,
String principalSid)
throws AdapterException {
// update adapter in database
this.adapterResourceManager.encryptAndUpdate(ad);

// update data source in database
this.updateDataSource(ad);
}

public AdapterDescription getAdapter(String elementId) throws AdapterException {
List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
@@ -180,16 +172,6 @@ public void startStreamAdapter(String elementId) throws AdapterException {
}
}

private void updateDataSource(AdapterDescription ad) {
// get data source
SpDataStream dataStream = this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId());

SourcesManagement.updateDataStream(ad, dataStream);

// Update data source in database
this.dataStreamResourceManager.update(dataStream);
}

private void installDataSource(SpDataStream stream,
String principalSid,
boolean publicElement) throws AdapterException {
@@ -204,5 +186,4 @@ private void installDataSource(SpDataStream stream,
private IAdapterStorage getAdapterInstanceStorage() {
return new AdapterInstanceStorageImpl();
}

}
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.connect.management.management;

import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.manager.pipeline.PipelineManager;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.PipelineUpdateInfo;
import org.apache.streampipes.model.message.PipelineModificationMessage;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo;
import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
import org.apache.streampipes.resource.management.AdapterResourceManager;
import org.apache.streampipes.resource.management.DataStreamResourceManager;
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.storage.management.StorageDispatcher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class AdapterUpdateManagement {

private static final Logger LOG = LoggerFactory.getLogger(AdapterUpdateManagement.class);

private final AdapterMasterManagement adapterMasterManagement;
private final AdapterResourceManager adapterResourceManager;
private final DataStreamResourceManager dataStreamResourceManager;

public AdapterUpdateManagement(AdapterMasterManagement adapterMasterManagement) {
this.adapterMasterManagement = adapterMasterManagement;
this.adapterResourceManager = new SpResourceManager().manageAdapters();
this.dataStreamResourceManager = new SpResourceManager().manageDataStreams();
}

public void updateAdapter(AdapterDescription ad)
throws AdapterException {
// update adapter in database
this.adapterResourceManager.encryptAndUpdate(ad);
boolean shouldRestart = ad.isRunning();

if (ad.isRunning()) {
this.adapterMasterManagement.stopStreamAdapter(ad.getElementId());
}

// update data source in database
this.updateDataSource(ad);

// update pipelines
var affectedPipelines = PipelineManager.getPipelinesContainingElements(ad.getCorrespondingDataStreamElementId());

affectedPipelines.forEach(p -> {
var shouldRestartPipeline = p.isRunning();
if (shouldRestartPipeline) {
Operations.stopPipeline(p, true);
}
var storedPipeline = PipelineManager.getPipeline(p.getPipelineId());
var pipeline = applyUpdatedDataStream(storedPipeline, ad);
try {
var modificationMessage = Operations.validatePipeline(pipeline);
var updateInfo = makeUpdateInfo(modificationMessage, pipeline);
var modifiedPipeline = new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
var canAutoMigrate = canAutoMigrate(modificationMessage);
if (!canAutoMigrate) {
modifiedPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
modifiedPipeline.setPipelineNotifications(toNotification(updateInfo));
modifiedPipeline.setValid(false);
}
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(modifiedPipeline);
if (shouldRestartPipeline && canAutoMigrate) {
Operations.startPipeline(PipelineManager.getPipeline(p.getPipelineId()));
}
} catch (Exception e) {
LOG.error("Could not update pipeline {}", pipeline.getName(), e);
}
});

if (shouldRestart) {
this.adapterMasterManagement.startStreamAdapter(ad.getElementId());
}
}

public List<PipelineUpdateInfo> checkPipelineMigrations(AdapterDescription adapterDescription) {
var affectedPipelines = PipelineManager
.getPipelinesContainingElements(adapterDescription.getCorrespondingDataStreamElementId());
var updateInfos = new ArrayList<PipelineUpdateInfo>();

affectedPipelines.forEach(pipeline -> {
var updatedPipeline = applyUpdatedDataStream(pipeline, adapterDescription);
try {
var modificationMessage = Operations.validatePipeline(updatedPipeline);
var updateInfo = makeUpdateInfo(modificationMessage, updatedPipeline);
updateInfos.add(updateInfo);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

return updateInfos;
}

private PipelineUpdateInfo makeUpdateInfo(PipelineModificationMessage modificationMessage,
Pipeline pipeline) {
var updateInfo = new PipelineUpdateInfo();
updateInfo.setPipelineId(pipeline.getPipelineId());
updateInfo.setPipelineName(pipeline.getName());
updateInfo.setCanAutoMigrate(canAutoMigrate(modificationMessage));
updateInfo.setValidationInfos(extractModificationWarnings(pipeline, modificationMessage));
return updateInfo;
}

private boolean canAutoMigrate(PipelineModificationMessage modificationMessage) {
return modificationMessage
.getPipelineModifications()
.stream()
.allMatch(m -> m.isPipelineElementValid() && m.getValidationInfos().isEmpty());
}

private List<String> toNotification(PipelineUpdateInfo updateInfo) {
var notifications = new ArrayList<String>();
updateInfo.getValidationInfos().keySet().forEach((k) -> {
var msg = updateInfo
.getValidationInfos()
.get(k)
.stream()
.map(PipelineElementValidationInfo::getMessage)
.toList()
.toString();
notifications.add(String.format("Adapter modification: %s: %s", k, msg));
});
return notifications;
}

private Map<String, List<PipelineElementValidationInfo>> extractModificationWarnings(
Pipeline pipeline,
PipelineModificationMessage modificationMessage) {
var infos = new HashMap<String, List<PipelineElementValidationInfo>>();
modificationMessage
.getPipelineModifications()
.stream()
.filter(v -> !v.getValidationInfos().isEmpty())
.forEach(m -> infos.put(getPipelineElementName(pipeline, m.getElementId()), m.getValidationInfos()));

return infos;
}

private String getPipelineElementName(Pipeline pipeline,
String elementId) {
return Stream
.concat(pipeline.getSepas().stream(), pipeline.getActions().stream())
.filter(p -> p.getElementId().equals(elementId))
.findFirst()
.map(NamedStreamPipesEntity::getName)
.orElse(elementId);
}

private Pipeline applyUpdatedDataStream(Pipeline originalPipeline,
AdapterDescription updatedAdapter) {
var updatedStreams = originalPipeline
.getStreams()
.stream()
.peek(s -> {
if (s.getElementId().equals(updatedAdapter.getCorrespondingDataStreamElementId())) {
s.setEventSchema(updatedAdapter.getEventSchema());
}
})
.toList();

originalPipeline.setStreams(updatedStreams);

return originalPipeline;
}

private void updateDataSource(AdapterDescription ad) {
// get data source
SpDataStream dataStream = this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId());

SourcesManagement.updateDataStream(ad, dataStream);

// Update data source in database
this.dataStreamResourceManager.update(dataStream);
}
}
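
checkPipelineMigrations is the read-only counterpart of updateAdapter: it validates each affected pipeline against the new schema without persisting anything. A hedged sketch of a pre-flight check a caller might run before committing the update (accessor names such as isCanAutoMigrate() and getPipelineName() mirror the setters used above and are assumptions):

// Hypothetical pre-flight check before calling updateAdapter.
List<PipelineUpdateInfo> updateInfos = updateManagement.checkPipelineMigrations(updatedDescription);
boolean needsAttention = updateInfos.stream().anyMatch(info -> !info.isCanAutoMigrate());
if (needsAttention) {
  updateInfos.forEach(info ->
      LOG.warn("Pipeline {} cannot be migrated automatically: {}",
          info.getPipelineName(), info.getValidationInfos()));
}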
@@ -31,6 +31,7 @@
import org.apache.streampipes.serializers.json.JacksonSerializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpStatus;
import org.apache.http.client.fluent.Response;
import org.apache.http.util.EntityUtils;
@@ -41,21 +42,21 @@

public class GuessManagement {

private static Logger logger = LoggerFactory.getLogger(GuessManagement.class);
private WorkerUrlProvider workerUrlProvider;
private static final Logger LOG = LoggerFactory.getLogger(GuessManagement.class);
private final WorkerUrlProvider workerUrlProvider;
private final ObjectMapper objectMapper;

public GuessManagement() {
this.workerUrlProvider = new WorkerUrlProvider();
this.objectMapper = JacksonSerializer.getObjectMapper();
}

public GuessSchema guessSchema(AdapterDescription adapterDescription)
throws ParseException, WorkerAdapterException, NoServiceEndpointsAvailableException, IOException {
var workerUrl = workerUrlProvider.getWorkerBaseUrl(adapterDescription.getAppId());
workerUrl = workerUrl + WorkerPaths.getGuessSchemaPath();

var objectMapper = JacksonSerializer.getObjectMapper();
var workerUrl = getWorkerUrl(adapterDescription.getAppId());
var description = objectMapper.writeValueAsString(adapterDescription);
logger.info("Guess schema at: " + workerUrl);

LOG.info("Guess schema at: " + workerUrl);
Response requestResponse = ExtensionServiceExecutions
.extServicePostRequest(workerUrl, description)
.execute();
@@ -71,6 +72,11 @@ public GuessSchema guessSchema(AdapterDescription adapterDescription)
}
}

private String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException {
var baseUrl = workerUrlProvider.getWorkerBaseUrl(appId);
return baseUrl + WorkerPaths.getGuessSchemaPath();
}

public String performAdapterEventPreview(AdapterEventPreview previewRequest) throws JsonProcessingException {
return new AdapterEventPreviewPipeline(previewRequest).makePreview();
}
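
The GuessManagement refactoring keeps the public contract and only moves URL construction and ObjectMapper handling into the instance. A minimal usage sketch (hypothetical caller, not part of this commit; "adapterDescription" and "LOG" are assumed to exist):

var guessManagement = new GuessManagement();
try {
  // Sends the adapter description to the worker selected by its app id and returns the guessed schema.
  GuessSchema guessSchema = guessManagement.guessSchema(adapterDescription);
} catch (NoServiceEndpointsAvailableException e) {
  // No worker service is currently registered for this adapter's app id.
  LOG.warn("No worker endpoint available for app id {}", adapterDescription.getAppId(), e);
} catch (ParseException | WorkerAdapterException | IOException e) {
  LOG.error("Schema guessing failed", e);
}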
@@ -34,13 +34,14 @@
import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyNested;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.utils.Datatypes;

import java.net.URI;
import java.util.List;
import java.util.Objects;

import static org.apache.streampipes.connect.shared.preprocessing.utils.ConversionUtils.findPrimitiveProperty;
import static org.apache.streampipes.connect.shared.preprocessing.utils.ConversionUtils.findProperty;
@@ -106,7 +107,21 @@ public void visit(AddTimestampRuleDescription rule) {

@Override
public void visit(AddValueTransformationRuleDescription rule) {
this.properties.add(EpProperties.stringEp(Labels.empty(), rule.getRuntimeKey(), ""));
var property = new EventPropertyPrimitive();
property.setElementId("http://eventProperty.de/staticValue/" + rule.getStaticValue());
property.setRuntimeName(rule.getRuntimeKey());
property.setRuntimeType(rule.getDatatype());
property.setLabel(rule.getLabel());
property.setDescription(rule.getDescription());
property.setPropertyScope(rule.getPropertyScope().name());

if (Objects.nonNull(rule.getSemanticType())) {
property.setDomainProperties(List.of(URI.create(rule.getSemanticType())));
}
if (Objects.nonNull(rule.getMeasurementUnit())) {
property.setMeasurementUnit(URI.create(rule.getMeasurementUnit()));
}
this.properties.add(property);
}

@Override
@@ -117,20 +132,30 @@ public void visit(ChangeDatatypeTransformationRuleDescription rule) {

@Override
public void visit(CorrectionValueTransformationRuleDescription rule) {
// does not affect schema
var property = findPrimitiveProperty(properties, rule.getRuntimeKey());
var metadata = property.getAdditionalMetadata();
metadata.put("operator", rule.getOperator());
metadata.put("correctionValue", rule.getCorrectionValue());
}

@Override
public void visit(TimestampTranfsformationRuleDescription rule) {
var property = findPrimitiveProperty(properties, rule.getRuntimeKey());
property.setDomainProperties(List.of(URI.create("http://schema.org/DateTime")));
property.setRuntimeType(Datatypes.Long.toString());
var metadata = property.getAdditionalMetadata();
metadata.put("mode", rule.getMode());
metadata.put("formatString", rule.getFormatString());
metadata.put("multiplier", rule.getMultiplier());
}

@Override
public void visit(UnitTransformRuleDescription rule) {
var property = findPrimitiveProperty(properties, rule.getRuntimeKey());
property.setMeasurementUnit(URI.create(rule.getToUnitRessourceURL()));
var metadata = property.getAdditionalMetadata();
metadata.put("fromMeasurementUnit", rule.getFromUnitRessourceURL());
metadata.put("toMeasurementUnit", rule.getToUnitRessourceURL());
}

@Override
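
The visitor now records rule parameters as additional metadata on the affected property instead of leaving the schema untouched, which is what allows previously configured rules to be shown again in the schema editor. A hedged sketch of reading that metadata back (assuming additionalMetadata is a Map<String, Object>; the runtime key "timestamp" is a placeholder, the map keys are the ones written above):

// Hypothetical read-back of the metadata written by visit(TimestampTranfsformationRuleDescription).
EventPropertyPrimitive timestampProperty = findPrimitiveProperty(properties, "timestamp");
Map<String, Object> metadata = timestampProperty.getAdditionalMetadata();
Object mode = metadata.get("mode");
Object formatString = metadata.get("formatString");
Object multiplier = metadata.get("multiplier");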
@@ -91,7 +91,8 @@ public void visit(AddTimestampRuleDescription ruleDesc) {
public void visit(AddValueTransformationRuleDescription ruleDesc) {
rules.add(new AddValueTransformationRule(
ruleDesc.getRuntimeKey(),
ruleDesc.getStaticValue()));
ruleDesc.getStaticValue(),
ruleDesc.getDatatype()));
}

@Override
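
Passing the datatype into AddValueTransformationRule implies the static value is no longer attached as a plain string. The rule's implementation is not part of this excerpt; a hedged sketch of what a datatype-aware conversion might look like (assumption, not the actual AddValueTransformationRule code):

// Illustration only: convert the configured static value into the declared runtime type.
private Object convertStaticValue(String staticValue, String datatype) {
  if (Datatypes.Integer.toString().equals(datatype)) {
    return Integer.parseInt(staticValue);
  } else if (Datatypes.Long.toString().equals(datatype)) {
    return Long.parseLong(staticValue);
  } else if (Datatypes.Float.toString().equals(datatype)) {
    return Float.parseFloat(staticValue);
  } else if (Datatypes.Boolean.toString().equals(datatype)) {
    return Boolean.parseBoolean(staticValue);
  }
  return staticValue;
}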