Skip to content

Commit

Permalink
improvement: Optimize startup behaviour (#2132)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer authored Nov 3, 2023
1 parent dd990b7 commit f94ec1a
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public enum BackendConfig {
config = SpServiceDiscovery.getSpConfig("backend");
config.register(BackendConfigKeys.IS_CONFIGURED, false,
"Boolean that indicates whether streampipes is " + "already configured or not");
config.register(BackendConfigKeys.IS_SETUP_RUNNING, false,
"Boolean that indicates whether the initial setup " + "is currently running");
}

public String getJmsHost() {
Expand Down Expand Up @@ -128,8 +126,4 @@ private String getJwtSecret() {
private String makeDefaultJwtSecret() {
return TokenGenerator.generateNewToken();
}

public void updateSetupStatus(boolean status) {
config.setBoolean(BackendConfigKeys.IS_SETUP_RUNNING, status);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.manager.health;

public enum CoreInitialInstallationProgress {

INSTANCE;

private boolean initiallyInstalling = false;

public boolean isInitiallyInstalling() {
return initiallyInstalling;
}

public void triggerInitiallyInstallingMode() {
this.initiallyInstalling = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ServiceHealthCheck implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(ServiceHealthCheck.class);

private static final int MAX_UNHEALTHY_DURATION_BEFORE_REMOVAL_MS = 20000;
private static final int MAX_UNHEALTHY_DURATION_BEFORE_REMOVAL_MS = 60000;

private final ServiceRegistrationManager serviceRegistrationManager;

Expand All @@ -56,7 +56,7 @@ private void checkServiceHealth(SpServiceRegistration service) {
try {
var request = ExtensionServiceExecutions.extServiceGetRequest(healthCheckUrl);
var response = request.execute();
if (response.returnResponse().getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
if (response.returnResponse().getStatusLine().getStatusCode() != HttpStatus.SC_OK && !isStarting(service)) {
processUnhealthyService(service);
} else {
if (service.getStatus() == SpServiceStatus.UNHEALTHY) {
Expand All @@ -68,6 +68,10 @@ private void checkServiceHealth(SpServiceRegistration service) {
}
}

private boolean isStarting(SpServiceRegistration service) {
return service.getStatus() == SpServiceStatus.REGISTERED || service.getStatus() == SpServiceStatus.MIGRATING;
}

private void processUnhealthyService(SpServiceRegistration service) {
if (service.getStatus() == SpServiceStatus.HEALTHY) {
serviceRegistrationManager.applyServiceStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.model.client.setup.InitialSettings;

import org.slf4j.Logger;
Expand Down Expand Up @@ -121,7 +120,6 @@ public void notifyFinished(int ec) {
if (errorCount.get() > 0) {
LOG.error("{} errors occurred during the setup process", errorCount);
} else {
BackendConfig.INSTANCE.setIsConfigured(true);
LOG.info("Initial setup completed successfully - you can now open the login page in the browser.");
}
LOG.info("\n\n**********\n\nAuto-Setup finished\n\n**********\n\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.streampipes.rest.impl;


import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.manager.health.CoreServiceStatusManager;
import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;

import com.google.gson.JsonObject;
import io.swagger.v3.oas.annotations.Operation;
Expand All @@ -34,20 +36,23 @@
@Path("/v2/setup")
public class Setup extends AbstractRestResource {

private final ISpCoreConfigurationStorage storage = StorageDispatcher
.INSTANCE.getNoSqlStore().getSpCoreConfigurationStorage();

@GET
@Path("/configured")
@Produces(MediaType.APPLICATION_JSON)
@Operation(summary = "Endpoint is used by UI to determine whether the core is running upon startup",
tags = {"Configurated"})
public Response isConfigured() {
JsonObject obj = new JsonObject();
if (BackendConfig.INSTANCE.isConfigured()) {
var statusManager = new CoreServiceStatusManager(storage);
if (statusManager.isCoreReady()) {
obj.addProperty("configured", true);
return ok(obj.toString());
} else {
obj.addProperty("configured", false);
obj.addProperty("setupRunning", BackendConfig.INSTANCE.isConfigured());
return ok(obj.toString());
obj.addProperty("setupRunning", false);
}
return ok(obj.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.streampipes.rest.impl.admin;

import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.connect.management.management.AdapterMigrationManager;
import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
import org.apache.streampipes.manager.health.CoreServiceStatusManager;
import org.apache.streampipes.manager.health.ServiceRegistrationManager;
import org.apache.streampipes.manager.migration.AdapterDescriptionMigration093;
Expand Down Expand Up @@ -104,11 +104,17 @@ public Response performMigrations(

var serviceManager = new ServiceRegistrationManager(extensionsServiceStorage);
var extensionsServiceConfig = serviceManager.getService(serviceId);
if (BackendConfig.INSTANCE.isConfigured()) {
if (!CoreInitialInstallationProgress.INSTANCE.isInitiallyInstalling()) {
new AdapterDescriptionMigration093(adapterDescriptionStorage).reinstallAdapters(extensionsServiceConfig);
if (!migrationConfigs.isEmpty()) {
if (serviceManager.isAnyServiceMigrating() || !isCoreReady()) {
LOG.info("Refusing migration request since precondition is not met.");
var anyServiceMigrating = serviceManager.isAnyServiceMigrating();
var coreReady = isCoreReady();
if (anyServiceMigrating || !coreReady) {
LOG.info(
"Refusing migration request since precondition is not met (anyServiceMigratione={}, coreReady={}.",
anyServiceMigrating,
coreReady
);
return Response.status(HttpStatus.SC_CONFLICT).build();
} else {
serviceManager.applyServiceStatus(serviceId, SpServiceStatus.MIGRATING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.streampipes.service.core;

import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.connect.management.health.AdapterHealthCheck;
import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
import org.apache.streampipes.manager.health.CoreServiceStatusManager;
import org.apache.streampipes.manager.health.PipelineHealthCheck;
import org.apache.streampipes.manager.health.ServiceHealthCheck;
Expand All @@ -41,6 +41,7 @@
import org.apache.streampipes.service.core.migrations.MigrationsHandler;
import org.apache.streampipes.storage.api.IPipelineStorage;
import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage;
import org.apache.streampipes.storage.couchdb.impl.UserStorage;
import org.apache.streampipes.storage.couchdb.utils.CouchDbViewGenerator;
import org.apache.streampipes.storage.management.StorageDispatcher;

Expand Down Expand Up @@ -121,6 +122,7 @@ public void init() {
new CouchDbViewGenerator().createGenericDatabaseIfNotExists();

if (!isConfigured()) {
CoreInitialInstallationProgress.INSTANCE.triggerInitiallyInstallingMode();
doInitialSetup();
} else {
// Check needs to be present since core configuration is part of migration
Expand Down Expand Up @@ -161,20 +163,17 @@ private void scheduleHealthChecks(List<Runnable> checks) {
}

private boolean isConfigured() {
return BackendConfig.INSTANCE.isConfigured();
return new UserStorage().existsDatabase();
}

private void doInitialSetup() {
LOG.info("\n\n**********\n\nWelcome to Apache StreamPipes!\n\n**********\n\n");
LOG.info("We will perform the initial setup, grab some coffee and cross your fingers ;-)...");

BackendConfig.INSTANCE.updateSetupStatus(true);
LOG.info("Auto-setup will start in 5 seconds to make sure all services are running...");
try {
TimeUnit.SECONDS.sleep(5);
LOG.info("Starting installation procedure");
new AutoInstallation().startAutoInstallation();
BackendConfig.INSTANCE.updateSetupStatus(false);
} catch (InterruptedException e) {
LOG.error("Ooops, something went wrong during the installation", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private List<SpServiceRegistration> findService(int retryCount) {
TimeUnit.MILLISECONDS.sleep(1000);
return findService(retryCount);
} catch (InterruptedException e) {
e.printStackTrace();
LOG.warn("Could not find a service currently due to exception {}", e.getMessage());
return Collections.emptyList();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,6 @@ public interface IUserStorage {

Principal getUserById(String principalId);

boolean existsDatabase();

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import org.apache.streampipes.storage.couchdb.dao.CrudViewDao;
import org.apache.streampipes.storage.couchdb.utils.Utils;

import org.apache.http.client.methods.HttpGet;
import org.lightcouch.NoDocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -117,4 +120,14 @@ public Principal getUserById(String principalId) {
return findWithNullIfEmpty(principalId, Principal.class);
}

@Override
public boolean existsDatabase() {
var client = Utils.getCouchDbClient(Utils.USER_DB_NAME, false);
try {
client.executeRequest(new HttpGet(URI.create(client.getBaseUri() + "/" + Utils.USER_DB_NAME)));
return true;
} catch (NoDocumentException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

public class Utils {

public static final String USER_DB_NAME = "users";

public static CouchDbClient getCouchDbDataProcessorDescriptionClient() {
return getCouchDbGsonClient("data-processor");
}
Expand Down Expand Up @@ -135,7 +137,7 @@ public static CouchDbClient getCouchDbDashboardWidgetClient() {
}

public static CouchDbClient getCouchDbUserClient() {
return getCouchDbPrincipalClient("users");
return getCouchDbPrincipalClient(USER_DB_NAME);
}

public static CouchDbClient getCouchDbInternalUsersClient() {
Expand Down Expand Up @@ -188,22 +190,31 @@ private static CouchDbClient getCouchDbStandardSerializerClient(String dbname) {
return new CouchDbClient(props(dbname));
}

public static CouchDbClient getCouchDbClient(String database, boolean createIfNotExists) {
return new CouchDbClient(props(database, createIfNotExists));
}

public static CouchDbClient getCouchDbClient(String database) {
return new CouchDbClient(props(database));
}

private static CouchDbProperties props(String dbname) {
private static CouchDbProperties props(String dbname,
boolean createDbIfNotExists) {
var env = getEnvironment();
return new CouchDbProperties(
dbname,
true,
createDbIfNotExists,
env.getCouchDbProtocol().getValueOrDefault(),
env.getCouchDbHost().getValueOrDefault(),
env.getCouchDbPort().getValueOrDefault(),
env.getCouchDbUsername().getValueOrDefault(),
env.getCouchDbPassword().getValueOrDefault());
}

private static CouchDbProperties props(String dbname) {
return props(dbname, true);
}

public static String getDatabaseRoute(String databaseName) {
return toUrl() + "/" + databaseName;
}
Expand Down

0 comments on commit f94ec1a

Please sign in to comment.