Skip to content

Commit

Permalink
Improve startup behaviour (#2105)
Browse files Browse the repository at this point in the history
* feat(#2002): Align adapter registration with other pipeline elements

* Fix checkstyle

* style: remove trailing whitespace

* Add initial draft of migration concept

* refactor: fix logger configuration

* refactor: extend storage implementations by method to get all instances by the app id

* refactor: update generated typescript model

* feat: add version to models & builders

* refactor: implement string representation of Notification

* feat: implement data model for migration

* feat: register migrations at service

* Revert "refactor: implement string representation of Notification"

This reverts commit 646e792.

* refactor: use correct Notification class

* feat: introduce migrate extensions resource

* feat: introduce migrate adapter endpoint

* feat: implement adapter migration at the core

* remove data lake migration

* ensure order & uniqueness of migrations

* remove redundant exception

* remove redundant exception

* add tests

* remove outdated test

* refactor: separate adapter migration from pipeline element migrations

* refactor: move MigrationResult to StreamPipes model

* refactor: migration result

* refactor: introduce generic migration request

* feat: send migration requests to core

* feat: process migrations at core

* refactor: remove legacy generic

* refactor: introduce versioned StreamPipes entity

* refactor: remove deprecated generic type

* feat: implement migration for processing elements & data sinks

* docs: add endpoint documentation

* refactor: move to correct module

* feature: add update for descriptions

* refactor: adapt ProcessingElementBuilder to be capable of versions

* refactor: minor improvements

* style: fix checkstyle issues

* refactor: remove legacy type definition

* refactor: update generated TS models

* fix: add missing license header

* Fix adapter model migration, add OPC adapter migration as sample

* Fix typo

* Extract MigrationResource logic into smaller units

* Use single request for submitting migrations from extensions to core

* Improve execution order of migrations and service startup tasks

* Improve exception logging

* Improve pipeline health check

* Properly execute migration of adapter models

* Fix adapter model migration

* Improve service health check

* Add more checks to adapter migration

* Simplify migration request handling

* Fix registration

---------

Co-authored-by: bossenti <[email protected]>
  • Loading branch information
dominikriemer and bossenti authored Nov 2, 2023
1 parent 2c60d52 commit 552f3fe
Show file tree
Hide file tree
Showing 36 changed files with 692 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.List;
import java.util.Map;

public class AdapterHealthCheck {
public class AdapterHealthCheck implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(AdapterHealthCheck.class);

Expand All @@ -52,6 +52,11 @@ public AdapterHealthCheck(IAdapterStorage adapterStorage,
this.adapterMasterManagement = adapterMasterManagement;
}

@Override
public void run() {
this.checkAndRestoreAdapters();
}

/**
* In this method it is checked which adapters are currently running.
* Then it calls all workers to validate if the adapter instance is
Expand Down Expand Up @@ -114,7 +119,7 @@ public Map<String, AdapterDescription> getAdaptersToRecover(
allRunningInstancesOfOneWorker.forEach(adapterDescription ->
allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId()));
} catch (AdapterException e) {
e.printStackTrace();
LOG.info("Could not recover adapter at endpoint {} due to {}", adapterEndpointUrl, e.getMessage());
}
});

Expand All @@ -130,10 +135,9 @@ public void recoverAdapters(Map<String, AdapterDescription> adaptersToRecover) {
this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
}
} catch (AdapterException e) {
LOG.warn("Could not start adapter {}", adapterDescription.getName(), e);
LOG.warn("Could not start adapter {} ({})", adapterDescription.getName(), e.getMessage());
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,12 @@ public static void stopStreamAdapter(String baseUrl,

public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException {
try {
LOG.info("Requesting all running adapter description instances: " + url);
var responseString = ExtensionServiceExecutions
.extServiceGetRequest(url)
.execute().returnContent().asString();

return JacksonSerializer.getObjectMapper().readValue(responseString, List.class);
} catch (IOException e) {
LOG.error("List of running adapters could not be fetched", e);
throw new AdapterException("List of running adapters could not be fetched from: " + url);
}
}
Expand Down Expand Up @@ -112,9 +110,6 @@ private static void triggerAdapterStateChange(AdapterDescription ad,
var exception = getSerializer().readValue(responseString, AdapterException.class);
throw new AdapterException(exception.getMessage(), exception.getCause());
}

LOG.info("Adapter {} on endpoint: " + url + " with Response: ", ad.getName() + responseString);

} catch (IOException e) {
LOG.error("Adapter was not {} successfully", action, e);
throw new AdapterException("Adapter was not " + action + " successfully with url " + url, e);
Expand Down Expand Up @@ -153,8 +148,7 @@ public static RuntimeOptionsResponse getConfiguration(String workerEndpoint,
throw new SpConfigurationException(exception.getMessage(), exception.getCause());
}
} catch (IOException e) {
e.printStackTrace();
throw new AdapterException("Could not resolve runtime configurations from " + url);
throw new AdapterException("Could not resolve runtime configurations from " + url, e);
}
}

Expand All @@ -178,11 +172,10 @@ public static byte[] getIconAsset(String baseUrl) throws AdapterException {
String url = baseUrl + "/assets/icon";

try {
byte[] responseString = Request.Get(url)
return Request.Get(url)
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asBytes();
return responseString;
} catch (IOException e) {
LOG.error(e.getMessage());
throw new AdapterException("Could not get icon endpoint: " + url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;

public interface IAdapterMigrator extends IModelMigrator<AdapterDescription, IStaticPropertyExtractor> {
public interface IAdapterMigrator extends IModelMigrator<AdapterDescription, IStaticPropertyExtractor> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
import org.apache.streampipes.model.graph.DataSinkInvocation;

public interface DataSinkMigrator extends IModelMigrator<DataSinkInvocation, IDataSinkParameterExtractor> {
public interface IDataSinkMigrator extends IModelMigrator<DataSinkInvocation, IDataSinkParameterExtractor> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class SpCoreConfiguration {
private GeneralConfig generalConfig;

private boolean isConfigured;
private SpCoreConfigurationStatus serviceStatus;

private String assetDir;
private String filesDir;
Expand Down Expand Up @@ -120,4 +121,12 @@ public EmailTemplateConfig getEmailTemplateConfig() {
public void setEmailTemplateConfig(EmailTemplateConfig emailTemplateConfig) {
this.emailTemplateConfig = emailTemplateConfig;
}

public SpCoreConfigurationStatus getServiceStatus() {
return this.serviceStatus;
}

public void setServiceStatus(SpCoreConfigurationStatus serviceStatus) {
this.serviceStatus = serviceStatus;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.configuration;

public enum SpCoreConfigurationStatus {
INSTALLING,
MIGRATING,
READY
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public String getRev(JsonObject adapter) {
return adapter.get(REV).getAsString();
}

public String getAppId(JsonObject adapter) {
return adapter.get("properties").getAsJsonObject().get(APP_ID).getAsString();
}

public void updateType(JsonObject adapter,
String typeFieldName) {
adapter.add(typeFieldName, new JsonPrimitive(AdapterModels.NEW_MODEL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class SpServiceRegistration {
private int port;
private List<SpServiceTag> tags;
private String healthCheckPath;
private boolean healthy = true;
private long firstTimeSeenUnhealthy = 0;
private SpServiceStatus status = SpServiceStatus.REGISTERED;

public SpServiceRegistration() {
}
Expand Down Expand Up @@ -133,14 +133,6 @@ public void setRev(String rev) {
this.rev = rev;
}

public boolean isHealthy() {
return healthy;
}

public void setHealthy(boolean healthy) {
this.healthy = healthy;
}

public String getScheme() {
return scheme;
}
Expand Down Expand Up @@ -168,4 +160,12 @@ public String getSvcType() {
public void setSvcType(String svcType) {
this.svcType = svcType;
}

public SpServiceStatus getStatus() {
return status;
}

public void setStatus(SpServiceStatus status) {
this.status = status;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.extensions.svcdiscovery;

public enum SpServiceStatus {
REGISTERED,
MIGRATING,
HEALTHY,
UNHEALTHY
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public DataProcessorInvocation(DataProcessorInvocation other) {
public DataProcessorInvocation(DataProcessorDescription sepa, String domId) {
this(sepa);
this.dom = domId;
this.serviceTagPrefix = SpServiceTagPrefix.DATA_PROCESSOR;
}

public DataProcessorInvocation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public DataSinkInvocation(DataSinkDescription other) {
public DataSinkInvocation(DataSinkDescription sec, String domId) {
this(sec);
this.setDom(domId);
this.serviceTagPrefix = SpServiceTagPrefix.DATA_SINK;
}

public DataSinkInvocation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private List<String> getServiceEndpoints() {

private String selectService() throws NoServiceEndpointsAvailableException {
List<String> serviceEndpoints = getServiceEndpoints();
if (serviceEndpoints.size() > 0) {
if (!serviceEndpoints.isEmpty()) {
return getServiceEndpoints().get(0);
} else {
LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.manager.health;

import org.apache.streampipes.model.configuration.SpCoreConfiguration;
import org.apache.streampipes.model.configuration.SpCoreConfigurationStatus;
import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoreServiceStatusManager {

private static final Logger LOG = LoggerFactory.getLogger(CoreServiceStatusManager.class);

private final ISpCoreConfigurationStorage storage;

public CoreServiceStatusManager(ISpCoreConfigurationStorage storage) {
this.storage = storage;
}

public boolean existsConfig() {
return storage.exists();
}

public boolean isCoreReady() {
return existsConfig() && storage.get().getServiceStatus() == SpCoreConfigurationStatus.READY;
}

public void updateCoreStatus(SpCoreConfigurationStatus status) {
var config = storage.get();
config.setServiceStatus(status);
storage.updateElement(config);
logService(config);
}

private void logService(SpCoreConfiguration coreConfig) {
LOG.info(
"Core is now in {} state",
coreConfig.getServiceStatus()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline;

public class PipelineHealthCheck implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class);
Expand All @@ -68,7 +70,7 @@ public void checkAndRestorePipelineElements() {
pipelinesStats.setRunningPipelines(runningPipelines.size());
pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines() - pipelinesStats.getRunningPipelines());

if (runningPipelines.size() > 0) {
if (!runningPipelines.isEmpty()) {
Map<String, List<InvocableStreamPipesEntity>> endpointMap = generateEndpointMap();
List<String> allRunningInstances = findRunningInstances(endpointMap.keySet());

Expand Down Expand Up @@ -115,15 +117,16 @@ public void checkAndRestorePipelineElements() {
}
});
if (shouldUpdatePipeline.get()) {
if (failedInstances.size() > 0) {
pipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
var currentPipeline = getPipeline(pipeline.getPipelineId());
if (!failedInstances.isEmpty()) {
currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
pipelinesStats.failedIncrease();
} else if (recoveredInstances.size() > 0) {
pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
} else if (!recoveredInstances.isEmpty()) {
currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
pipelinesStats.attentionRequiredIncrease();
}
pipeline.setPipelineNotifications(pipelineNotifications);
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
currentPipeline.setPipelineNotifications(pipelineNotifications);
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(currentPipeline);
}
});
int healthNum = pipelinesStats.getRunningPipelines() - pipelinesStats.getFailedPipelines()
Expand Down Expand Up @@ -233,13 +236,11 @@ private List<Pipeline> getRunningPipelines(List<Pipeline> allPipelines) {
}

private List<Pipeline> getAllPipelines() {
List<Pipeline> allPipelines = StorageDispatcher
return StorageDispatcher
.INSTANCE
.getNoSqlStore()
.getPipelineStorageAPI()
.getAllPipelines();

return allPipelines;
}

private int getElementsCount(List<Pipeline> allPipelines){
Expand Down
Loading

0 comments on commit 552f3fe

Please sign in to comment.