From 3ac47b9813db3d825812d27720c8a3ba78c872e7 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Sun, 11 Aug 2024 21:53:25 +0200 Subject: [PATCH] chore: Code cleanup of pipeline management (#3126) * chore: Code cleanup of pipeline management * Fix checkstyle * Fix checkstyle * Fix pipeline update * Fix bug --- .../exceptions/NoMatchingFormatException.java | 28 ---- .../NoMatchingJsonSchemaException.java | 22 --- .../NoMatchingProtocolException.java | 23 --- .../exceptions/NoMatchingSchemaException.java | 28 ---- .../exceptions/NoSepaInPipelineException.java | 28 ---- .../NoValidConnectionException.java | 23 --- .../exceptions/NoValidSecTypeException.java | 23 --- .../exceptions/NoValidSepTypeException.java | 23 --- .../NoValidSepaStructureException.java | 23 --- .../exceptions/NoValidSepaTypeException.java | 23 --- .../RemoteServerNotAccessibleException.java | 47 ------- .../exceptions/TooManyEdgesException.java | 23 --- .../management/AdapterUpdateManagement.java | 10 +- .../manager/data/PipelineGraphHelpers.java | 5 - .../manager/execution/PipelineExecutor.java | 7 +- .../extensions/ExtensionItemInstaller.java | 6 +- .../manager/matching/ConnectionValidator.java | 76 ---------- .../matching/v2/utils/MatchingUtils.java | 2 +- .../migration/AbstractMigrationManager.java | 4 +- .../PipelineElementMigrationManager.java | 4 +- .../ExtensionsServiceLogExecutor.java | 6 - .../manager/operations/Operations.java | 132 ------------------ .../manager/pipeline/PipelineManager.java | 23 ++- .../PipelineElementTemplateVisitor.java | 4 - .../template/PipelineTemplateGenerator.java | 64 +-------- .../PipelineTemplateInvocationHandler.java | 21 +-- .../util/PipelineVerificationUtils.java | 56 -------- .../streampipes/manager/util/TreeUtils.java | 68 --------- .../rest/impl/ContainerProvidedOptions.java | 4 +- .../streampipes/rest/impl/DataStream.java | 4 +- .../rest/impl/PipelineResource.java | 26 +--- .../rest/impl/PipelineTemplate.java | 18 ++- .../service/core/PostStartupTask.java | 4 +- .../core/StreamPipesCoreApplication.java | 4 +- 34 files changed, 68 insertions(+), 794 deletions(-) delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingFormatException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingJsonSchemaException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingProtocolException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingSchemaException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSepaInPipelineException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidConnectionException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSecTypeException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepTypeException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaTypeException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/RemoteServerNotAccessibleException.java delete mode 100644 streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/TooManyEdgesException.java delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionValidator.java delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineVerificationUtils.java delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TreeUtils.java diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingFormatException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingFormatException.java deleted file mode 100644 index 75e024b3d3..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingFormatException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class NoMatchingFormatException extends Exception { - - /** - * - */ - private static final long serialVersionUID = -3381149054836186412L; - -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingJsonSchemaException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingJsonSchemaException.java deleted file mode 100644 index b79ead019b..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingJsonSchemaException.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class NoMatchingJsonSchemaException extends Exception { -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingProtocolException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingProtocolException.java deleted file mode 100644 index b72747ebea..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingProtocolException.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class NoMatchingProtocolException extends Exception { - -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingSchemaException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingSchemaException.java deleted file mode 100644 index 05c1a961f3..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingSchemaException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class NoMatchingSchemaException extends Exception { - - /** - * - */ - private static final long serialVersionUID = 1L; - -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSepaInPipelineException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSepaInPipelineException.java deleted file mode 100644 index e844968d3e..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSepaInPipelineException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class NoSepaInPipelineException extends Exception { - - /** - * - */ - private static final long serialVersionUID = 1L; - -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidConnectionException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidConnectionException.java deleted file mode 100644 index b2fb57c4fb..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidConnectionException.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class NoValidConnectionException extends Exception { - -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSecTypeException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSecTypeException.java deleted file mode 100644 index 8bffbbf8e0..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSecTypeException.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class NoValidSecTypeException { - -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepTypeException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepTypeException.java deleted file mode 100644 index f20417ee10..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepTypeException.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class NoValidSepTypeException { - -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java deleted file mode 100644 index 7a9d7c5dd2..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class NoValidSepaStructureException { - -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaTypeException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaTypeException.java deleted file mode 100644 index d3712af38c..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaTypeException.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class NoValidSepaTypeException { - -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/RemoteServerNotAccessibleException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/RemoteServerNotAccessibleException.java deleted file mode 100644 index 93c587f450..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/RemoteServerNotAccessibleException.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class RemoteServerNotAccessibleException extends Exception { - - /** - * - */ - private static final long serialVersionUID = 1L; - - private String serverUrl; - - public RemoteServerNotAccessibleException(String message, String serverUrl) { - super(message); - this.serverUrl = serverUrl; - } - - public RemoteServerNotAccessibleException(RemoteServerNotAccessibleException e) { - super(e.getMessage()); - this.serverUrl = e.getServerUrl(); - } - - public String getServerUrl() { - return serverUrl; - } - - public void setServerUrl(String serverUrl) { - this.serverUrl = serverUrl; - } -} diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/TooManyEdgesException.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/TooManyEdgesException.java deleted file mode 100644 index 5b406f0b1a..0000000000 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/TooManyEdgesException.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.commons.exceptions; - -public class TooManyEdgesException extends Exception { - -} diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java index 43af954fa2..9aae62888d 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java @@ -19,8 +19,8 @@ package org.apache.streampipes.connect.management.management; import org.apache.streampipes.commons.exceptions.connect.AdapterException; +import org.apache.streampipes.manager.execution.PipelineExecutor; import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2; -import org.apache.streampipes.manager.operations.Operations; import org.apache.streampipes.manager.pipeline.PipelineManager; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.base.NamedStreamPipesEntity; @@ -77,12 +77,12 @@ public void updateAdapter(AdapterDescription ad) affectedPipelines.forEach(p -> { var shouldRestartPipeline = p.isRunning(); if (shouldRestartPipeline) { - Operations.stopPipeline(p, true); + new PipelineExecutor(p).stopPipeline(true); } var storedPipeline = PipelineManager.getPipeline(p.getPipelineId()); var pipeline = applyUpdatedDataStream(storedPipeline, ad); try { - var modificationMessage = Operations.validatePipeline(pipeline); + var modificationMessage = new PipelineVerificationHandlerV2(pipeline).verifyPipeline(); var updateInfo = makeUpdateInfo(modificationMessage, pipeline); var modifiedPipeline = new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline(); var canAutoMigrate = canAutoMigrate(modificationMessage); @@ -93,7 +93,7 @@ public void updateAdapter(AdapterDescription ad) } StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updateElement(modifiedPipeline); if (shouldRestartPipeline && canAutoMigrate) { - Operations.startPipeline(PipelineManager.getPipeline(p.getPipelineId())); + new PipelineExecutor(PipelineManager.getPipeline(p.getPipelineId())).startPipeline(); } } catch (Exception e) { LOG.error("Could not update pipeline {}", pipeline.getName(), e); @@ -113,7 +113,7 @@ public List checkPipelineMigrations(AdapterDescription adapt affectedPipelines.forEach(pipeline -> { var updatedPipeline = applyUpdatedDataStream(pipeline, adapterDescription); try { - var modificationMessage = Operations.validatePipeline(updatedPipeline); + var modificationMessage = new PipelineVerificationHandlerV2(updatedPipeline).verifyPipeline(); var updateInfo = makeUpdateInfo(modificationMessage, updatedPipeline); updateInfos.add(updateInfo); } catch (Exception e) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java index 0291c92030..e5693317c0 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java @@ -19,7 +19,6 @@ package org.apache.streampipes.manager.data; import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import java.util.List; import java.util.stream.Collectors; @@ -30,10 +29,6 @@ public static List findStreams(PipelineGraph pipelineGraph) { return find(pipelineGraph, SpDataStream.class); } - public static List findInvocableElements(PipelineGraph pipelineGraph) { - return find(pipelineGraph, InvocableStreamPipesEntity.class); - } - private static List find(PipelineGraph pipelineGraph, Class clazz) { return pipelineGraph .vertexSet() diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java index bc14e46f58..1371c8b7b3 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java @@ -27,19 +27,16 @@ public class PipelineExecutor { private final Pipeline pipeline; - private final boolean forceStop; - public PipelineExecutor(Pipeline pipeline, - boolean forceStop) { + public PipelineExecutor(Pipeline pipeline) { this.pipeline = pipeline; - this.forceStop = forceStop; } public PipelineOperationStatus startPipeline() { return executeOperation(PipelineExecutionTaskFactory.makeStartPipelineTasks(pipeline)); } - public PipelineOperationStatus stopPipeline() { + public PipelineOperationStatus stopPipeline(boolean forceStop) { return executeOperation(PipelineExecutionTaskFactory.makeStopPipelineTasks(pipeline, forceStop)); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java index bfa691964d..16fc95a029 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java @@ -20,7 +20,7 @@ import org.apache.streampipes.commons.exceptions.SepaParseException; import org.apache.streampipes.manager.api.extensions.IExtensionsResourceUrlProvider; import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; -import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.verification.extractor.TypeExtractor; import org.apache.streampipes.model.extensions.ExtensionItemInstallationRequest; import org.apache.streampipes.model.message.Message; @@ -38,13 +38,13 @@ public Message installExtension(ExtensionItemInstallationRequest req, String principalSid) throws IOException, SepaParseException { var descriptionUrl = getDescriptionUrl(req); var description = fetchDescription(descriptionUrl); - return Operations.verifyAndAddElement(description, principalSid, req.publicElement()); + return new TypeExtractor(description).getTypeVerifier().verifyAndAdd(principalSid, req.publicElement()); } public Message updateExtension(ExtensionItemInstallationRequest req) throws IOException, SepaParseException { var descriptionUrl = getDescriptionUrl(req); var description = fetchDescription(descriptionUrl); - return Operations.verifyAndUpdateElement(description); + return new TypeExtractor(description).getTypeVerifier().verifyAndUpdate(); } private String getDescriptionUrl(ExtensionItemInstallationRequest req) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionValidator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionValidator.java deleted file mode 100644 index 4b3ded7949..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionValidator.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.manager.matching; - -import org.apache.streampipes.manager.matching.v2.ElementVerification; -import org.apache.streampipes.manager.util.TreeUtils; -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; -import org.apache.streampipes.model.client.exception.InvalidConnectionException; -import org.apache.streampipes.model.graph.DataProcessorInvocation; -import org.apache.streampipes.model.pipeline.Pipeline; - -import java.util.List; - -public class ConnectionValidator { - - private final Pipeline pipeline; - private final List invocationGraphs; - private final InvocableStreamPipesEntity rootPipelineElement; - private final ElementVerification verifier; - - public ConnectionValidator(Pipeline pipeline, - List invocationGraphs, - InvocableStreamPipesEntity rootPipelineElement) { - this.pipeline = pipeline; - this.invocationGraphs = invocationGraphs; - this.rootPipelineElement = rootPipelineElement; - this.verifier = new ElementVerification(); - } - - public List validateConnection() throws InvalidConnectionException { - boolean verified = true; - InvocableStreamPipesEntity rightElement = rootPipelineElement; - List connectedTo = rootPipelineElement.getConnectedTo(); - - for (String domId : connectedTo) { - NamedStreamPipesEntity element = TreeUtils.findSEPAElement(domId, pipeline.getSepas(), pipeline.getStreams()); - if (element instanceof SpDataStream) { - SpDataStream leftSpDataStream = (SpDataStream) element; - if (!(verifier.verify(leftSpDataStream, rightElement))) { - verified = false; - } - } else { - DataProcessorInvocation ancestor = findInvocationGraph(invocationGraphs, element.getDom()); - if (!(verifier.verify(ancestor, rightElement))) { - verified = false; - } - } - } - if (!verified) { - throw new InvalidConnectionException(verifier.getErrorLog()); - } - - return invocationGraphs; - } - - private DataProcessorInvocation findInvocationGraph(List graphs, String domId) { - return (DataProcessorInvocation) TreeUtils.findByDomId(domId, graphs); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java index c0f5c8b432..f20a8e7727 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java @@ -21,7 +21,7 @@ public class MatchingUtils { public static boolean nullCheck(Object offer, Object requirement) { - return ((offer == null) && (requirement == null)) || (requirement == null); + return (requirement == null); } public static boolean nullCheckRightNullDisallowed(Object offer, Object requirement) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java index 5493bbf69d..a27b19b400 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java @@ -20,7 +20,7 @@ import org.apache.streampipes.commons.exceptions.SepaParseException; import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; -import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.verification.extractor.TypeExtractor; import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity; import org.apache.streampipes.model.extensions.migration.MigrationRequest; import org.apache.streampipes.model.message.Notification; @@ -139,7 +139,7 @@ protected void performUpdate(String requestUrl) { .execute() .returnContent() .asString(); - var updateResult = Operations.verifyAndUpdateElement(entityPayload); + var updateResult = new TypeExtractor(entityPayload).getTypeVerifier().verifyAndUpdate(); if (!updateResult.isSuccess()) { LOG.error( "Updating the pipeline element description failed: {}", diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java index 40ff88acf3..5ac2ba0a58 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java @@ -176,8 +176,8 @@ protected void handleFailedMigrations(Pipeline pipeline, List public void stopPipeline(Pipeline pipeline) { - var pipelineExecutor = new PipelineExecutor(pipeline, true); - var pipelineStopResult = pipelineExecutor.stopPipeline(); + var pipelineExecutor = new PipelineExecutor(pipeline); + var pipelineStopResult = pipelineExecutor.stopPipeline(true); if (pipelineStopResult.isSuccess()) { LOG.info("Pipeline successfully stopped."); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java index 8d76e31252..9f2f430b20 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java @@ -22,12 +22,10 @@ import org.apache.streampipes.commons.constants.InstanceIdExtractor; import org.apache.streampipes.commons.prometheus.pipelines.PipelineFlowStats; import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; -import org.apache.streampipes.model.client.user.Principal; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.graph.DataProcessorInvocation; import org.apache.streampipes.model.graph.DataSinkInvocation; import org.apache.streampipes.model.monitoring.SpEndpointMonitoringInfo; -import org.apache.streampipes.resource.management.SpResourceManager; import org.apache.streampipes.serializers.json.JacksonSerializer; import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes; @@ -97,10 +95,6 @@ private Request makeRequest(String serviceEndpointUrl) { return ExtensionServiceExecutions.extServiceGetRequest(makeLogUrl(serviceEndpointUrl)); } - private Principal getServiceAdmin() { - return new SpResourceManager().manageUsers().getServiceAdmin(); - } - private List getActiveExtensionsEndpoints() { return SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints( DefaultSpServiceTypes.EXT, diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java deleted file mode 100644 index f2addd2b20..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.operations; - -import org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException; -import org.apache.streampipes.commons.exceptions.SepaParseException; -import org.apache.streampipes.manager.execution.PipelineExecutor; -import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2; -import org.apache.streampipes.manager.recommender.ElementRecommender; -import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler; -import org.apache.streampipes.manager.storage.PipelineStorageService; -import org.apache.streampipes.manager.template.PipelineTemplateGenerator; -import org.apache.streampipes.manager.template.PipelineTemplateInvocationGenerator; -import org.apache.streampipes.manager.template.PipelineTemplateInvocationHandler; -import org.apache.streampipes.manager.topic.WildcardTopicGenerator; -import org.apache.streampipes.manager.verification.extractor.TypeExtractor; -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.message.Message; -import org.apache.streampipes.model.message.PipelineModificationMessage; -import org.apache.streampipes.model.pipeline.Pipeline; -import org.apache.streampipes.model.pipeline.PipelineElementRecommendationMessage; -import org.apache.streampipes.model.pipeline.PipelineOperationStatus; -import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; -import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; -import org.apache.streampipes.model.template.PipelineTemplateDescription; -import org.apache.streampipes.model.template.PipelineTemplateInvocation; -import org.apache.streampipes.storage.management.StorageDispatcher; - -import java.util.ArrayList; -import java.util.List; - - -/** - * class that provides several (partial) pipeline verification methods - */ - -public class Operations { - - /** - * @param pipeline the pipeline to validate - * @return PipelineModificationMessage a message containing desired pipeline modifications - */ - public static PipelineModificationMessage validatePipeline(Pipeline pipeline) throws Exception { - return new PipelineVerificationHandlerV2(pipeline).verifyPipeline(); - } - - public static Message verifyAndAddElement(String graphData, - String principalSid, - boolean publicElement) throws SepaParseException { - return new TypeExtractor(graphData).getTypeVerifier().verifyAndAdd(principalSid, publicElement); - } - - public static Message verifyAndUpdateElement(String graphData) throws SepaParseException { - return new TypeExtractor(graphData).getTypeVerifier().verifyAndUpdate(); - } - - public static PipelineElementRecommendationMessage findRecommendedElements(Pipeline partialPipeline, - String baseRecId) - throws NoSuitableSepasAvailableException { - return new ElementRecommender(partialPipeline, baseRecId).findRecommendedElements(); - } - - public static void storePipeline(Pipeline pipeline) { - new PipelineStorageService(pipeline).addPipeline(); - } - - public static void updatePipeline(Pipeline pipeline) { - new PipelineStorageService(pipeline).updatePipeline(); - } - - public static PipelineOperationStatus startPipeline(Pipeline pipeline) { - return new PipelineExecutor(pipeline, false).startPipeline(); - } - - public static List stopAllPipelines(boolean forceStop) { - List status = new ArrayList<>(); - List pipelines = - StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll(); - - pipelines.forEach(p -> { - if (p.isRunning()) { - status.add(Operations.stopPipeline(p, forceStop)); - } - }); - return status; - } - - public static PipelineOperationStatus stopPipeline(Pipeline pipeline, - boolean forceStop) { - return new PipelineExecutor(pipeline, forceStop).stopPipeline(); - } - - public static SpDataStream updateActualTopic(SpDataStream stream) { - return new WildcardTopicGenerator(stream).computeActualTopic(); - } - - public static RuntimeOptionsResponse fetchRemoteOptions(RuntimeOptionsRequest request) { - return new ContainerProvidedOptionsHandler().fetchRemoteOptions(request); - } - - public static List getAllPipelineTemplates() { - return new PipelineTemplateGenerator().getAllPipelineTemplates(); - } - - public static PipelineOperationStatus handlePipelineTemplateInvocation( - String userSid, - PipelineTemplateInvocation pipelineTemplateInvocation) { - return new PipelineTemplateInvocationHandler(userSid, pipelineTemplateInvocation).handlePipelineInvocation(); - } - - public static PipelineTemplateInvocation getPipelineInvocationTemplate( - SpDataStream dataStream, - PipelineTemplateDescription pipelineTemplateDescription) { - return new PipelineTemplateInvocationGenerator(dataStream, pipelineTemplateDescription).generateInvocation(); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java index e57675ecba..ca477d73db 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java @@ -19,8 +19,9 @@ package org.apache.streampipes.manager.pipeline; import org.apache.streampipes.commons.random.UUIDGenerator; -import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.execution.PipelineExecutor; import org.apache.streampipes.manager.permission.PermissionManager; +import org.apache.streampipes.manager.storage.PipelineStorageService; import org.apache.streampipes.model.base.NamedStreamPipesEntity; import org.apache.streampipes.model.client.user.Permission; import org.apache.streampipes.model.pipeline.Pipeline; @@ -30,6 +31,7 @@ import org.apache.streampipes.storage.api.IPipelineStorage; import org.apache.streampipes.storage.management.StorageDispatcher; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Objects; @@ -71,7 +73,7 @@ public static String addPipeline(String principalSid, ? UUIDGenerator.generateUuid() : pipeline.getPipelineId(); preparePipelineBasics(principalSid, pipeline, pipelineId); - Operations.storePipeline(pipeline); + new PipelineStorageService(pipeline).addPipeline(); Permission permission = new PermissionManager().makePermission(pipeline, principalSid); getPermissionStorage().persist(permission); @@ -88,7 +90,7 @@ public static String addPipeline(String principalSid, */ public static PipelineOperationStatus startPipeline(String pipelineId) { Pipeline pipeline = getPipeline(pipelineId); - return Operations.startPipeline(pipeline); + return new PipelineExecutor(pipeline).startPipeline(); } /** @@ -103,7 +105,7 @@ public static PipelineOperationStatus stopPipeline(String pipelineId, boolean forceStop) { Pipeline pipeline = getPipeline(pipelineId); - return Operations.stopPipeline(pipeline, forceStop); + return new PipelineExecutor(pipeline).stopPipeline(forceStop); } /** @@ -119,6 +121,19 @@ public static void deletePipeline(String pipelineId) { } } + public static List stopAllPipelines(boolean forceStop) { + List status = new ArrayList<>(); + List pipelines = + StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll(); + + pipelines.forEach(p -> { + if (p.isRunning()) { + status.add(new PipelineExecutor(p).stopPipeline(forceStop)); + } + }); + return status; + } + /** * Checks for the pipelines that contain the processing element diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java index b301a5243a..a32bd5d6d3 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java @@ -202,10 +202,6 @@ public void visit(RuntimeResolvableGroupStaticProperty groupStaticProperty) { // TODO not yet supported } - private Object getValue(StaticProperty sp) { - return ((Map) configs.get(sp.getInternalName())).get("value"); - } - private List getValueAsList(StaticProperty sp) { return (List) configs.get(sp.getInternalName()); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java index 4531fc88f2..86c46bc742 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java @@ -18,15 +18,9 @@ package org.apache.streampipes.manager.template; import org.apache.streampipes.commons.exceptions.ElementNotFoundException; -import org.apache.streampipes.manager.matching.v2.ElementVerification; import org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate; import org.apache.streampipes.manager.template.instances.PipelineTemplate; -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.graph.DataProcessorDescription; -import org.apache.streampipes.model.graph.DataProcessorInvocation; import org.apache.streampipes.model.graph.DataSinkDescription; -import org.apache.streampipes.model.graph.DataSinkInvocation; import org.apache.streampipes.model.template.PipelineTemplateDescription; import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage; import org.apache.streampipes.storage.management.StorageDispatcher; @@ -43,7 +37,7 @@ public class PipelineTemplateGenerator { Logger logger = LoggerFactory.getLogger(PipelineTemplateGenerator.class); - private List availableDescriptions = new ArrayList<>(); + private final List availableDescriptions = new ArrayList<>(); public List getAllPipelineTemplates() { @@ -65,62 +59,6 @@ public List getAllPipelineTemplates() { return availableDescriptions; } - public List getCompatibleTemplates(String streamId) { - List compatibleTemplates = new ArrayList<>(); - ElementVerification verifier = new ElementVerification(); - SpDataStream streamOffer = null; - - try { - streamOffer = getStream(streamId); - streamOffer = new SpDataStream(streamOffer); - if (streamOffer != null) { - for (PipelineTemplateDescription pipelineTemplateDescription : getAllPipelineTemplates()) { - // TODO make this work for 2+ input streams - InvocableStreamPipesEntity entity = - cloneInvocation(pipelineTemplateDescription.getBoundTo().get(0).getPipelineElementTemplate()); - if (verifier.verify(streamOffer, entity)) { - compatibleTemplates.add(pipelineTemplateDescription); - } - } - } - - } catch (ElementNotFoundException e) { - e.printStackTrace(); - } - - return compatibleTemplates; - } - - private InvocableStreamPipesEntity cloneInvocation(InvocableStreamPipesEntity pipelineElementTemplate) { - if (pipelineElementTemplate instanceof DataProcessorInvocation) { - return new DataProcessorInvocation((DataProcessorInvocation) pipelineElementTemplate); - } else { - return new DataSinkInvocation((DataSinkInvocation) pipelineElementTemplate); - } - } - - protected SpDataStream getStream(String streamId) throws ElementNotFoundException { - SpDataStream result = getStorage() - .getEventStreamById(streamId); - - if (result == null) { - throw new ElementNotFoundException("Data stream " + streamId + " is not installed!"); - } - - return result; - } - - protected DataProcessorDescription getProcessor(String id) throws ElementNotFoundException { - DataProcessorDescription result = getStorage() - .getDataProcessorByAppId(id); - - if (result == null) { - throw new ElementNotFoundException("Data processor " + id + " is not installed!"); - } - - return result; - } - protected DataSinkDescription getSink(String id) throws ElementNotFoundException { try { return getStorage() diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java index 0848abdac9..7b1748a9ef 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java @@ -17,8 +17,9 @@ */ package org.apache.streampipes.manager.template; -import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.execution.PipelineExecutor; import org.apache.streampipes.manager.permission.PermissionManager; +import org.apache.streampipes.manager.storage.PipelineStorageService; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.client.user.Permission; import org.apache.streampipes.model.pipeline.Pipeline; @@ -33,9 +34,9 @@ public class PipelineTemplateInvocationHandler { - private PipelineTemplateInvocation pipelineTemplateInvocation; - private PipelineTemplateDescription pipelineTemplateDescription; - private String username; + private final PipelineTemplateInvocation pipelineTemplateInvocation; + private final PipelineTemplateDescription pipelineTemplateDescription; + private final String username; public PipelineTemplateInvocationHandler(String username, PipelineTemplateInvocation pipelineTemplateInvocation) { this.username = username; @@ -43,26 +44,18 @@ public PipelineTemplateInvocationHandler(String username, PipelineTemplateInvoca this.pipelineTemplateDescription = getTemplateById(pipelineTemplateInvocation.getPipelineTemplateId()); } - public PipelineTemplateInvocationHandler(String username, PipelineTemplateInvocation pipelineTemplateInvocation, - PipelineTemplateDescription pipelineTemplateDescription) { - this.username = username; - this.pipelineTemplateInvocation = pipelineTemplateInvocation; - this.pipelineTemplateDescription = pipelineTemplateDescription; - } - - public PipelineOperationStatus handlePipelineInvocation() { Pipeline pipeline = new PipelineGenerator(pipelineTemplateInvocation.getDataStreamId(), pipelineTemplateDescription, pipelineTemplateInvocation.getKviName()).makePipeline(); pipeline.setCreatedByUser(username); pipeline.setCreatedAt(System.currentTimeMillis()); replaceStaticProperties(pipeline); - Operations.storePipeline(pipeline); + new PipelineStorageService(pipeline).addPipeline(); Permission permission = new PermissionManager().makePermission(pipeline, username); StorageDispatcher.INSTANCE.getNoSqlStore().getPermissionStorage().persist(permission); Pipeline storedPipeline = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getElementById(pipeline.getPipelineId()); - return Operations.startPipeline(storedPipeline); + return new PipelineExecutor(storedPipeline).startPipeline(); } private void replaceStaticProperties(Pipeline pipeline) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineVerificationUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineVerificationUtils.java deleted file mode 100644 index 404c34f5e4..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineVerificationUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.util; - -import org.apache.streampipes.commons.exceptions.NoSepaInPipelineException; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.pipeline.Pipeline; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -public class PipelineVerificationUtils { - - /** - * returns the root node of a partial pipeline (a pipeline without an action) - * - * @param pipeline - * @return {@link org.apache.streampipes.model.base.InvocableStreamPipesEntity} - */ - - public static InvocableStreamPipesEntity getRootNode(Pipeline pipeline) throws NoSepaInPipelineException { - List elements = new ArrayList<>(); - elements.addAll(pipeline.getSepas()); - elements.addAll(pipeline.getActions()); - - List unconfiguredElements = elements - .stream() - .filter(e -> !e.isConfigured()) - .collect(Collectors.toList()); - - - if (unconfiguredElements.size() != 1) { - throw new NoSepaInPipelineException(); - } else { - return unconfiguredElements.get(0); - } - - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TreeUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TreeUtils.java deleted file mode 100644 index 4986bba5a2..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TreeUtils.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.util; - -import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; -import org.apache.streampipes.model.graph.DataProcessorInvocation; - -import java.util.ArrayList; -import java.util.List; - -public class TreeUtils { - - /** - * @param id the DOM ID - * @param sepas list of sepas in model-client format - * @param streams list of streams in model-client format - * @return a SEPA-client element - */ - - public static NamedStreamPipesEntity findSEPAElement(String id, List sepas, - List - streams) { - List allElements = new ArrayList<>(); - allElements.addAll(sepas); - allElements.addAll(streams); - - for (NamedStreamPipesEntity element : allElements) { - if (id.equals(element.getDom())) { - return element; - } - } - //TODO - return null; - } - - /** - * @param id the DOM ID - * @param graphs list of invocation graphs - * @return an invocation graph with a given DOM Id - */ - public static InvocableStreamPipesEntity findByDomId(String id, List graphs) { - for (InvocableStreamPipesEntity graph : graphs) { - if (graph.getDom().equals(id)) { - return graph; - } - } - //TODO - return null; - } -} diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java index cce62bc080..1d41f47cc4 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java @@ -17,7 +17,7 @@ */ package org.apache.streampipes.rest.impl; -import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler; import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; import org.apache.streampipes.rest.core.base.impl.AbstractRestResource; @@ -38,6 +38,6 @@ public class ContainerProvidedOptions extends AbstractRestResource { consumes = MediaType.APPLICATION_JSON_VALUE ) public ResponseEntity fetchRemoteOptions(@RequestBody RuntimeOptionsRequest request) { - return ok(Operations.fetchRemoteOptions(request)); + return ok(new ContainerProvidedOptionsHandler().fetchRemoteOptions(request)); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java index db72f9085f..be98a833ab 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java @@ -17,7 +17,7 @@ */ package org.apache.streampipes.rest.impl; -import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.topic.WildcardTopicGenerator; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.rest.core.base.impl.AbstractRestResource; @@ -34,6 +34,6 @@ public class DataStream extends AbstractRestResource { @PostMapping(path = "/update", produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity getStreamsBySource(@RequestBody SpDataStream stream) { - return ok(Operations.updateActualTopic(stream)); + return ok(new WildcardTopicGenerator(stream).computeActualTopic()); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java index 75e636cd6b..a358839ec1 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java @@ -18,16 +18,12 @@ package org.apache.streampipes.rest.impl; -import org.apache.streampipes.commons.exceptions.NoMatchingFormatException; -import org.apache.streampipes.commons.exceptions.NoMatchingJsonSchemaException; -import org.apache.streampipes.commons.exceptions.NoMatchingProtocolException; -import org.apache.streampipes.commons.exceptions.NoMatchingSchemaException; import org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException; -import org.apache.streampipes.commons.exceptions.RemoteServerNotAccessibleException; import org.apache.streampipes.manager.execution.status.PipelineStatusManager; -import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2; import org.apache.streampipes.manager.pipeline.PipelineManager; -import org.apache.streampipes.model.client.exception.InvalidConnectionException; +import org.apache.streampipes.manager.recommender.ElementRecommender; +import org.apache.streampipes.manager.storage.PipelineStorageService; import org.apache.streampipes.model.message.ErrorMessage; import org.apache.streampipes.model.message.Message; import org.apache.streampipes.model.message.Notification; @@ -178,7 +174,7 @@ public ResponseEntity addPipeline(@RequestBody Pipeline pipeline public PipelineElementRecommendationMessage recommend(@RequestBody Pipeline pipeline, @PathVariable("recId") String baseRecElement) { try { - return Operations.findRecommendedElements(pipeline, baseRecElement); + return new ElementRecommender(pipeline, baseRecElement).findRecommendedElements(); } catch (JsonSyntaxException e) { throw new SpNotificationException( HttpStatus.BAD_REQUEST, @@ -207,19 +203,9 @@ public PipelineElementRecommendationMessage recommend(@RequestBody Pipeline pipe @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE) public ResponseEntity validatePipeline(@RequestBody Pipeline pipeline) { try { - return ok(Operations.validatePipeline(pipeline)); + return ok(new PipelineVerificationHandlerV2(pipeline).verifyPipeline()); } catch (JsonSyntaxException e) { return badRequest(new Notification(NotificationType.UNKNOWN_ERROR, e.getMessage())); - } catch (NoMatchingSchemaException e) { - return badRequest(new Notification(NotificationType.NO_VALID_CONNECTION, e.getMessage())); - } catch (NoMatchingFormatException e) { - return badRequest(new Notification(NotificationType.NO_MATCHING_FORMAT_CONNECTION, e.getMessage())); - } catch (NoMatchingProtocolException e) { - return badRequest(new Notification(NotificationType.NO_MATCHING_PROTOCOL_CONNECTION, e.getMessage())); - } catch (RemoteServerNotAccessibleException | NoMatchingJsonSchemaException e) { - return serverError(new Notification(NotificationType.REMOTE_SERVER_NOT_ACCESSIBLE, e.getMessage())); - } catch (InvalidConnectionException e) { - return badRequest(e.getErrorLog()); } catch (Exception e) { LOG.error(e.getMessage()); return serverError(new Notification(NotificationType.UNKNOWN_ERROR, e.getMessage())); @@ -244,7 +230,7 @@ public ResponseEntity overwritePipeline(@PathVariable("pipelineI storedPipeline.setHealthStatus(pipeline.getHealthStatus()); storedPipeline.setPipelineNotifications(pipeline.getPipelineNotifications()); storedPipeline.setValid(pipeline.isValid()); - Operations.updatePipeline(storedPipeline); + new PipelineStorageService(storedPipeline).updatePipeline(); SuccessMessage message = Notifications.success("Pipeline modified"); message.addNotification(new Notification("id", pipelineId)); return ok(message); diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java index f3496c1213..5957fe01e2 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java @@ -17,7 +17,9 @@ */ package org.apache.streampipes.rest.impl; -import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.template.PipelineTemplateGenerator; +import org.apache.streampipes.manager.template.PipelineTemplateInvocationGenerator; +import org.apache.streampipes.manager.template.PipelineTemplateInvocationHandler; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.SpDataStreamContainer; import org.apache.streampipes.model.message.Notifications; @@ -67,7 +69,10 @@ public ResponseEntity getPipelineTemplateInvocation( var pipelineTemplateDescriptionOpt = getPipelineTemplateDescription(pipelineTemplateId); if (pipelineTemplateDescriptionOpt.isPresent()) { PipelineTemplateInvocation invocation = - Operations.getPipelineInvocationTemplate(dataStream, pipelineTemplateDescriptionOpt.get()); + new PipelineTemplateInvocationGenerator( + dataStream, + pipelineTemplateDescriptionOpt.get() + ).generateInvocation(); PipelineTemplateInvocation clonedInvocation = new PipelineTemplateInvocation(invocation); return ok(new PipelineTemplateInvocation(clonedInvocation)); } else { @@ -85,14 +90,15 @@ public ResponseEntity getPipelineTemplateInvocation( public ResponseEntity generatePipeline( @RequestBody PipelineTemplateInvocation pipelineTemplateInvocation) { - PipelineOperationStatus status = Operations - .handlePipelineTemplateInvocation(getAuthenticatedUserSid(), pipelineTemplateInvocation); - + PipelineOperationStatus status = new PipelineTemplateInvocationHandler( + getAuthenticatedUserSid(), + pipelineTemplateInvocation + ).handlePipelineInvocation(); return ok(status); } private Optional getPipelineTemplateDescription(String pipelineTemplateId) { - return Operations + return new PipelineTemplateGenerator() .getAllPipelineTemplates() .stream() .filter(pt -> pt.getAppId().equals(pipelineTemplateId)) diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java index f7f0c32d22..06c7257bf0 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java @@ -20,8 +20,8 @@ import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement; +import org.apache.streampipes.manager.execution.PipelineExecutor; import org.apache.streampipes.manager.health.ServiceHealthCheck; -import org.apache.streampipes.manager.operations.Operations; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.PipelineOperationStatus; @@ -120,7 +120,7 @@ private void startAllPreviouslyStoppedPipelines() { } private void startPipeline(Pipeline pipeline, boolean restartOnReboot) { - PipelineOperationStatus status = Operations.startPipeline(pipeline); + PipelineOperationStatus status = new PipelineExecutor(pipeline).startPipeline(); if (status.isSuccess()) { LOG.info("Pipeline {} successfully restarted", status.getPipelineName()); Pipeline storedPipeline = getPipelineStorage().getElementById(pipeline.getPipelineId()); diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java index f9c062b546..610cf23f41 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java @@ -26,7 +26,7 @@ import org.apache.streampipes.manager.health.PipelineHealthCheck; import org.apache.streampipes.manager.health.ServiceHealthCheck; import org.apache.streampipes.manager.monitoring.pipeline.ExtensionsServiceLogExecutor; -import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.pipeline.PipelineManager; import org.apache.streampipes.manager.setup.AutoInstallation; import org.apache.streampipes.manager.setup.StreamPipesEnvChecker; import org.apache.streampipes.messaging.SpProtocolManager; @@ -221,7 +221,7 @@ public void onExit() { }); LOG.info("Gracefully stopping all running pipelines..."); - List status = Operations.stopAllPipelines(true); + List status = PipelineManager.stopAllPipelines(true); status.forEach(s -> { if (s.isSuccess()) { LOG.info("Pipeline {} successfully stopped", s.getPipelineName());