Skip to content

Commit

Permalink
chore: Code cleanup of pipeline management (#3126)
Browse files Browse the repository at this point in the history
* chore: Code cleanup of pipeline management

* Fix checkstyle

* Fix checkstyle

* Fix pipeline update

* Fix bug
  • Loading branch information
dominikriemer authored Aug 11, 2024
1 parent 225a213 commit 3ac47b9
Show file tree
Hide file tree
Showing 34 changed files with 68 additions and 794 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.streampipes.connect.management.management;

import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.manager.pipeline.PipelineManager;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
Expand Down Expand Up @@ -77,12 +77,12 @@ public void updateAdapter(AdapterDescription ad)
affectedPipelines.forEach(p -> {
var shouldRestartPipeline = p.isRunning();
if (shouldRestartPipeline) {
Operations.stopPipeline(p, true);
new PipelineExecutor(p).stopPipeline(true);
}
var storedPipeline = PipelineManager.getPipeline(p.getPipelineId());
var pipeline = applyUpdatedDataStream(storedPipeline, ad);
try {
var modificationMessage = Operations.validatePipeline(pipeline);
var modificationMessage = new PipelineVerificationHandlerV2(pipeline).verifyPipeline();
var updateInfo = makeUpdateInfo(modificationMessage, pipeline);
var modifiedPipeline = new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
var canAutoMigrate = canAutoMigrate(modificationMessage);
Expand All @@ -93,7 +93,7 @@ public void updateAdapter(AdapterDescription ad)
}
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updateElement(modifiedPipeline);
if (shouldRestartPipeline && canAutoMigrate) {
Operations.startPipeline(PipelineManager.getPipeline(p.getPipelineId()));
new PipelineExecutor(PipelineManager.getPipeline(p.getPipelineId())).startPipeline();
}
} catch (Exception e) {
LOG.error("Could not update pipeline {}", pipeline.getName(), e);
Expand All @@ -113,7 +113,7 @@ public List<PipelineUpdateInfo> checkPipelineMigrations(AdapterDescription adapt
affectedPipelines.forEach(pipeline -> {
var updatedPipeline = applyUpdatedDataStream(pipeline, adapterDescription);
try {
var modificationMessage = Operations.validatePipeline(updatedPipeline);
var modificationMessage = new PipelineVerificationHandlerV2(updatedPipeline).verifyPipeline();
var updateInfo = makeUpdateInfo(modificationMessage, updatedPipeline);
updateInfos.add(updateInfo);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.streampipes.manager.data;

import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;

import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -30,10 +29,6 @@ public static List<SpDataStream> findStreams(PipelineGraph pipelineGraph) {
return find(pipelineGraph, SpDataStream.class);
}

public static List<InvocableStreamPipesEntity> findInvocableElements(PipelineGraph pipelineGraph) {
return find(pipelineGraph, InvocableStreamPipesEntity.class);
}

private static <T> List<T> find(PipelineGraph pipelineGraph, Class<T> clazz) {
return pipelineGraph
.vertexSet()
Expand Down
Loading

0 comments on commit 3ac47b9

Please sign in to comment.