Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(#1960): Allow modification of adapters with pipelines #1979

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
943e406
feat(#1960): Allow modification of adapters with pipelines
dominikriemer Sep 29, 2023
bf37e88
Add license header
dominikriemer Sep 29, 2023
7aa0bfd
Merge branch 'dev' into 1960-better-support-modification-of-existing-…
dominikriemer Oct 12, 2023
d00ca09
Add data model for schema diff management
dominikriemer Oct 12, 2023
c97b683
Improve schema editor and static value handling
dominikriemer Oct 13, 2023
16dc365
Fix schema preview of nested fields
dominikriemer Oct 15, 2023
08c328b
Improve schema updates
dominikriemer Oct 15, 2023
ead5f69
Improve download dialog
dominikriemer Oct 16, 2023
e62d805
Remove unused stylesheets
dominikriemer Oct 16, 2023
1ea8613
Redirect to adapter overview after adapter creation
dominikriemer Oct 17, 2023
9ea80a3
Merge remote-tracking branch 'origin/dev' into 1960-better-support-mo…
bossenti Oct 17, 2023
44ea8e9
Improve modification of measurement units
dominikriemer Oct 17, 2023
cdbc83a
Merge branch '1960-better-support-modification-of-existing-adapters' …
dominikriemer Oct 17, 2023
26248d1
Fix update of measurement units
dominikriemer Oct 17, 2023
0d8d8b0
Properly display labels and runtime type changes in schema editor
dominikriemer Oct 17, 2023
329bf44
Fix modification of conversion rules
dominikriemer Oct 17, 2023
21675dc
Show selected timestamp transformation rules
dominikriemer Oct 17, 2023
4b84368
Add warning hint and value validation to correction value rule
dominikriemer Oct 17, 2023
5f54f9a
Fix NullPointer when adding static rules
dominikriemer Oct 23, 2023
4a3d58e
Fix unit transformation
dominikriemer Oct 23, 2023
6a4f1b1
Remove unused component
dominikriemer Oct 23, 2023
b3b31eb
Add metadata object to properties
dominikriemer Oct 23, 2023
1b88fbf
modified tests (#2079)
Marcelfrueh Oct 23, 2023
71ea3e9
Merge branch 'dev' into 1960-better-support-modification-of-existing-…
dominikriemer Oct 24, 2023
00ae926
Update typescript model
dominikriemer Oct 24, 2023
93231f4
Remove obsolete service
dominikriemer Oct 24, 2023
a65396c
Add logging message
dominikriemer Oct 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,7 @@ public String addAdapter(AdapterDescription ad,
return ad.getElementId();
}

public void updateAdapter(AdapterDescription ad,
String principalSid)
throws AdapterException {
// update adapter in database
this.adapterResourceManager.encryptAndUpdate(ad);

// update data source in database
this.updateDataSource(ad);
}

public AdapterDescription getAdapter(String elementId) throws AdapterException {
List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();
Expand Down Expand Up @@ -180,16 +172,6 @@ public void startStreamAdapter(String elementId) throws AdapterException {
}
}

private void updateDataSource(AdapterDescription ad) {
// get data source
SpDataStream dataStream = this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId());

SourcesManagement.updateDataStream(ad, dataStream);

// Update data source in database
this.dataStreamResourceManager.update(dataStream);
}

private void installDataSource(SpDataStream stream,
String principalSid,
boolean publicElement) throws AdapterException {
Expand All @@ -204,5 +186,4 @@ private void installDataSource(SpDataStream stream,
private IAdapterStorage getAdapterInstanceStorage() {
return new AdapterInstanceStorageImpl();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.connect.management.management;

import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.manager.pipeline.PipelineManager;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.PipelineUpdateInfo;
import org.apache.streampipes.model.message.PipelineModificationMessage;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo;
import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
import org.apache.streampipes.resource.management.AdapterResourceManager;
import org.apache.streampipes.resource.management.DataStreamResourceManager;
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.storage.management.StorageDispatcher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class AdapterUpdateManagement {

private static final Logger LOG = LoggerFactory.getLogger(AdapterUpdateManagement.class);

private final AdapterMasterManagement adapterMasterManagement;
private final AdapterResourceManager adapterResourceManager;
private final DataStreamResourceManager dataStreamResourceManager;

public AdapterUpdateManagement(AdapterMasterManagement adapterMasterManagement) {
this.adapterMasterManagement = adapterMasterManagement;
this.adapterResourceManager = new SpResourceManager().manageAdapters();
this.dataStreamResourceManager = new SpResourceManager().manageDataStreams();
}

public void updateAdapter(AdapterDescription ad)
throws AdapterException {
// update adapter in database
this.adapterResourceManager.encryptAndUpdate(ad);
boolean shouldRestart = ad.isRunning();

if (ad.isRunning()) {
this.adapterMasterManagement.stopStreamAdapter(ad.getElementId());
}

// update data source in database
this.updateDataSource(ad);

// update pipelines
var affectedPipelines = PipelineManager.getPipelinesContainingElements(ad.getCorrespondingDataStreamElementId());

affectedPipelines.forEach(p -> {
var shouldRestartPipeline = p.isRunning();
if (shouldRestartPipeline) {
Operations.stopPipeline(p, true);
}
var storedPipeline = PipelineManager.getPipeline(p.getPipelineId());
var pipeline = applyUpdatedDataStream(storedPipeline, ad);
try {
var modificationMessage = Operations.validatePipeline(pipeline);
var updateInfo = makeUpdateInfo(modificationMessage, pipeline);
var modifiedPipeline = new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
var canAutoMigrate = canAutoMigrate(modificationMessage);
if (!canAutoMigrate) {
modifiedPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
modifiedPipeline.setPipelineNotifications(toNotification(updateInfo));
modifiedPipeline.setValid(false);
}
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(modifiedPipeline);
if (shouldRestartPipeline && canAutoMigrate) {
Operations.startPipeline(PipelineManager.getPipeline(p.getPipelineId()));
}
} catch (Exception e) {
LOG.error("Could not update pipeline {}", pipeline.getName(), e);
}
});

if (shouldRestart) {
this.adapterMasterManagement.startStreamAdapter(ad.getElementId());
}
}

public List<PipelineUpdateInfo> checkPipelineMigrations(AdapterDescription adapterDescription) {
var affectedPipelines = PipelineManager
.getPipelinesContainingElements(adapterDescription.getCorrespondingDataStreamElementId());
var updateInfos = new ArrayList<PipelineUpdateInfo>();

affectedPipelines.forEach(pipeline -> {
var updatedPipeline = applyUpdatedDataStream(pipeline, adapterDescription);
try {
var modificationMessage = Operations.validatePipeline(updatedPipeline);
var updateInfo = makeUpdateInfo(modificationMessage, updatedPipeline);
updateInfos.add(updateInfo);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

return updateInfos;
}

private PipelineUpdateInfo makeUpdateInfo(PipelineModificationMessage modificationMessage,
Pipeline pipeline) {
var updateInfo = new PipelineUpdateInfo();
updateInfo.setPipelineId(pipeline.getPipelineId());
updateInfo.setPipelineName(pipeline.getName());
updateInfo.setCanAutoMigrate(canAutoMigrate(modificationMessage));
updateInfo.setValidationInfos(extractModificationWarnings(pipeline, modificationMessage));
return updateInfo;
}

private boolean canAutoMigrate(PipelineModificationMessage modificationMessage) {
return modificationMessage
.getPipelineModifications()
.stream()
.allMatch(m -> m.isPipelineElementValid() && m.getValidationInfos().isEmpty());
}

private List<String> toNotification(PipelineUpdateInfo updateInfo) {
var notifications = new ArrayList<String>();
updateInfo.getValidationInfos().keySet().forEach((k) -> {
var msg = updateInfo
.getValidationInfos()
.get(k)
.stream()
.map(PipelineElementValidationInfo::getMessage)
.toList()
.toString();
notifications.add(String.format("Adapter modification: %s: %s", k, msg));
});
return notifications;
}

private Map<String, List<PipelineElementValidationInfo>> extractModificationWarnings(
Pipeline pipeline,
PipelineModificationMessage modificationMessage) {
var infos = new HashMap<String, List<PipelineElementValidationInfo>>();
modificationMessage
.getPipelineModifications()
.stream()
.filter(v -> !v.getValidationInfos().isEmpty())
.forEach(m -> infos.put(getPipelineElementName(pipeline, m.getElementId()), m.getValidationInfos()));

return infos;
}

private String getPipelineElementName(Pipeline pipeline,
String elementId) {
return Stream
.concat(pipeline.getSepas().stream(), pipeline.getActions().stream())
.filter(p -> p.getElementId().equals(elementId))
.findFirst()
.map(NamedStreamPipesEntity::getName)
.orElse(elementId);
}

private Pipeline applyUpdatedDataStream(Pipeline originalPipeline,
AdapterDescription updatedAdapter) {
var updatedStreams = originalPipeline
.getStreams()
.stream()
.peek(s -> {
if (s.getElementId().equals(updatedAdapter.getCorrespondingDataStreamElementId())) {
s.setEventSchema(updatedAdapter.getEventSchema());
}
})
.toList();

originalPipeline.setStreams(updatedStreams);

return originalPipeline;
}

private void updateDataSource(AdapterDescription ad) {
// get data source
SpDataStream dataStream = this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId());

SourcesManagement.updateDataStream(ad, dataStream);

// Update data source in database
this.dataStreamResourceManager.update(dataStream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.streampipes.serializers.json.JacksonSerializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpStatus;
import org.apache.http.client.fluent.Response;
import org.apache.http.util.EntityUtils;
Expand All @@ -41,21 +42,21 @@

public class GuessManagement {

private static Logger logger = LoggerFactory.getLogger(GuessManagement.class);
private WorkerUrlProvider workerUrlProvider;
private static final Logger LOG = LoggerFactory.getLogger(GuessManagement.class);
private final WorkerUrlProvider workerUrlProvider;
private final ObjectMapper objectMapper;

public GuessManagement() {
this.workerUrlProvider = new WorkerUrlProvider();
this.objectMapper = JacksonSerializer.getObjectMapper();
}

public GuessSchema guessSchema(AdapterDescription adapterDescription)
throws ParseException, WorkerAdapterException, NoServiceEndpointsAvailableException, IOException {
var workerUrl = workerUrlProvider.getWorkerBaseUrl(adapterDescription.getAppId());
workerUrl = workerUrl + WorkerPaths.getGuessSchemaPath();

var objectMapper = JacksonSerializer.getObjectMapper();
var workerUrl = getWorkerUrl(adapterDescription.getAppId());
var description = objectMapper.writeValueAsString(adapterDescription);
logger.info("Guess schema at: " + workerUrl);

LOG.info("Guess schema at: " + workerUrl);
Response requestResponse = ExtensionServiceExecutions
.extServicePostRequest(workerUrl, description)
.execute();
Expand All @@ -71,6 +72,11 @@ public GuessSchema guessSchema(AdapterDescription adapterDescription)
}
}

private String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException {
var baseUrl = workerUrlProvider.getWorkerBaseUrl(appId);
return baseUrl + WorkerPaths.getGuessSchemaPath();
}

public String performAdapterEventPreview(AdapterEventPreview previewRequest) throws JsonProcessingException {
return new AdapterEventPreviewPipeline(previewRequest).makePreview();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@
import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyNested;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.util.Cloner;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.utils.Datatypes;

import java.net.URI;
import java.util.List;
import java.util.Objects;

import static org.apache.streampipes.connect.shared.preprocessing.utils.ConversionUtils.findPrimitiveProperty;
import static org.apache.streampipes.connect.shared.preprocessing.utils.ConversionUtils.findProperty;
Expand Down Expand Up @@ -106,7 +107,21 @@ public void visit(AddTimestampRuleDescription rule) {

@Override
public void visit(AddValueTransformationRuleDescription rule) {
this.properties.add(EpProperties.stringEp(Labels.empty(), rule.getRuntimeKey(), ""));
var property = new EventPropertyPrimitive();
property.setElementId("http://eventProperty.de/staticValue/" + rule.getStaticValue());
property.setRuntimeName(rule.getRuntimeKey());
property.setRuntimeType(rule.getDatatype());
property.setLabel(rule.getLabel());
property.setDescription(rule.getDescription());
property.setPropertyScope(rule.getPropertyScope().name());

if (Objects.nonNull(rule.getSemanticType())) {
property.setDomainProperties(List.of(URI.create(rule.getSemanticType())));
}
if (Objects.nonNull(rule.getMeasurementUnit())) {
property.setMeasurementUnit(URI.create(rule.getMeasurementUnit()));
}
this.properties.add(property);
}

@Override
Expand All @@ -117,20 +132,30 @@ public void visit(ChangeDatatypeTransformationRuleDescription rule) {

@Override
public void visit(CorrectionValueTransformationRuleDescription rule) {
// does not affect schema
var property = findPrimitiveProperty(properties, rule.getRuntimeKey());
var metadata = property.getAdditionalMetadata();
metadata.put("operator", rule.getOperator());
metadata.put("correctionValue", rule.getCorrectionValue());
}

@Override
public void visit(TimestampTranfsformationRuleDescription rule) {
var property = findPrimitiveProperty(properties, rule.getRuntimeKey());
property.setDomainProperties(List.of(URI.create("http://schema.org/DateTime")));
property.setRuntimeType(Datatypes.Long.toString());
var metadata = property.getAdditionalMetadata();
metadata.put("mode", rule.getMode());
metadata.put("formatString", rule.getFormatString());
metadata.put("multiplier", rule.getMultiplier());
}

@Override
public void visit(UnitTransformRuleDescription rule) {
var property = findPrimitiveProperty(properties, rule.getRuntimeKey());
property.setMeasurementUnit(URI.create(rule.getToUnitRessourceURL()));
var metadata = property.getAdditionalMetadata();
metadata.put("fromMeasurementUnit", rule.getFromUnitRessourceURL());
metadata.put("toMeasurementUnit", rule.getToUnitRessourceURL());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void visit(AddTimestampRuleDescription ruleDesc) {
public void visit(AddValueTransformationRuleDescription ruleDesc) {
rules.add(new AddValueTransformationRule(
ruleDesc.getRuntimeKey(),
ruleDesc.getStaticValue()));
ruleDesc.getStaticValue(),
ruleDesc.getDatatype()));
}

@Override
Expand Down
Loading
Loading