diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java index f07366814b..7937866ca6 100644 --- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java +++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java @@ -38,8 +38,6 @@ public enum BackendConfig { config = SpServiceDiscovery.getSpConfig("backend"); config.register(BackendConfigKeys.IS_CONFIGURED, false, "Boolean that indicates whether streampipes is " + "already configured or not"); - config.register(BackendConfigKeys.IS_SETUP_RUNNING, false, - "Boolean that indicates whether the initial setup " + "is currently running"); } public String getJmsHost() { @@ -128,8 +126,4 @@ private String getJwtSecret() { private String makeDefaultJwtSecret() { return TokenGenerator.generateNewToken(); } - - public void updateSetupStatus(boolean status) { - config.setBoolean(BackendConfigKeys.IS_SETUP_RUNNING, status); - } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/CoreInitialInstallationProgress.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/CoreInitialInstallationProgress.java new file mode 100644 index 0000000000..d71acc7280 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/CoreInitialInstallationProgress.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.health; + +public enum CoreInitialInstallationProgress { + + INSTANCE; + + private boolean initiallyInstalling = false; + + public boolean isInitiallyInstalling() { + return initiallyInstalling; + } + + public void triggerInitiallyInstallingMode() { + this.initiallyInstalling = true; + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java index 46d2792eaa..88c9292ae9 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java @@ -35,7 +35,7 @@ public class ServiceHealthCheck implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ServiceHealthCheck.class); - private static final int MAX_UNHEALTHY_DURATION_BEFORE_REMOVAL_MS = 20000; + private static final int MAX_UNHEALTHY_DURATION_BEFORE_REMOVAL_MS = 60000; private final ServiceRegistrationManager serviceRegistrationManager; @@ -56,7 +56,7 @@ private void checkServiceHealth(SpServiceRegistration service) { try { var request = ExtensionServiceExecutions.extServiceGetRequest(healthCheckUrl); var response = request.execute(); - if (response.returnResponse().getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + if (response.returnResponse().getStatusLine().getStatusCode() != HttpStatus.SC_OK && !isStarting(service)) { processUnhealthyService(service); } else { if (service.getStatus() == SpServiceStatus.UNHEALTHY) { @@ -68,6 +68,10 @@ private void checkServiceHealth(SpServiceRegistration service) { } } + private boolean isStarting(SpServiceRegistration service) { + return service.getStatus() == SpServiceStatus.REGISTERED || service.getStatus() == SpServiceStatus.MIGRATING; + } + private void processUnhealthyService(SpServiceRegistration service) { if (service.getStatus() == SpServiceStatus.HEALTHY) { serviceRegistrationManager.applyServiceStatus( diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java index 5782317016..f7e14125a5 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java @@ -20,7 +20,6 @@ import org.apache.streampipes.commons.environment.Environment; import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable; -import org.apache.streampipes.config.backend.BackendConfig; import org.apache.streampipes.model.client.setup.InitialSettings; import org.slf4j.Logger; @@ -121,7 +120,6 @@ public void notifyFinished(int ec) { if (errorCount.get() > 0) { LOG.error("{} errors occurred during the setup process", errorCount); } else { - BackendConfig.INSTANCE.setIsConfigured(true); LOG.info("Initial setup completed successfully - you can now open the login page in the browser."); } LOG.info("\n\n**********\n\nAuto-Setup finished\n\n**********\n\n"); diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Setup.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Setup.java index 0874680c82..ea249e6f23 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Setup.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Setup.java @@ -19,8 +19,10 @@ package org.apache.streampipes.rest.impl; -import org.apache.streampipes.config.backend.BackendConfig; +import org.apache.streampipes.manager.health.CoreServiceStatusManager; import org.apache.streampipes.rest.core.base.impl.AbstractRestResource; +import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage; +import org.apache.streampipes.storage.management.StorageDispatcher; import com.google.gson.JsonObject; import io.swagger.v3.oas.annotations.Operation; @@ -34,6 +36,9 @@ @Path("/v2/setup") public class Setup extends AbstractRestResource { + private final ISpCoreConfigurationStorage storage = StorageDispatcher + .INSTANCE.getNoSqlStore().getSpCoreConfigurationStorage(); + @GET @Path("/configured") @Produces(MediaType.APPLICATION_JSON) @@ -41,13 +46,13 @@ public class Setup extends AbstractRestResource { tags = {"Configurated"}) public Response isConfigured() { JsonObject obj = new JsonObject(); - if (BackendConfig.INSTANCE.isConfigured()) { + var statusManager = new CoreServiceStatusManager(storage); + if (statusManager.isCoreReady()) { obj.addProperty("configured", true); - return ok(obj.toString()); } else { obj.addProperty("configured", false); - obj.addProperty("setupRunning", BackendConfig.INSTANCE.isConfigured()); - return ok(obj.toString()); + obj.addProperty("setupRunning", false); } + return ok(obj.toString()); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java index 6b28afe0a3..4d2d662028 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java @@ -18,8 +18,8 @@ package org.apache.streampipes.rest.impl.admin; -import org.apache.streampipes.config.backend.BackendConfig; import org.apache.streampipes.connect.management.management.AdapterMigrationManager; +import org.apache.streampipes.manager.health.CoreInitialInstallationProgress; import org.apache.streampipes.manager.health.CoreServiceStatusManager; import org.apache.streampipes.manager.health.ServiceRegistrationManager; import org.apache.streampipes.manager.migration.AdapterDescriptionMigration093; @@ -104,11 +104,17 @@ public Response performMigrations( var serviceManager = new ServiceRegistrationManager(extensionsServiceStorage); var extensionsServiceConfig = serviceManager.getService(serviceId); - if (BackendConfig.INSTANCE.isConfigured()) { + if (!CoreInitialInstallationProgress.INSTANCE.isInitiallyInstalling()) { new AdapterDescriptionMigration093(adapterDescriptionStorage).reinstallAdapters(extensionsServiceConfig); if (!migrationConfigs.isEmpty()) { - if (serviceManager.isAnyServiceMigrating() || !isCoreReady()) { - LOG.info("Refusing migration request since precondition is not met."); + var anyServiceMigrating = serviceManager.isAnyServiceMigrating(); + var coreReady = isCoreReady(); + if (anyServiceMigrating || !coreReady) { + LOG.info( + "Refusing migration request since precondition is not met (anyServiceMigratione={}, coreReady={}.", + anyServiceMigrating, + coreReady + ); return Response.status(HttpStatus.SC_CONFLICT).build(); } else { serviceManager.applyServiceStatus(serviceId, SpServiceStatus.MIGRATING); diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java index 49b218f738..9038afa4c0 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java @@ -17,8 +17,8 @@ */ package org.apache.streampipes.service.core; -import org.apache.streampipes.config.backend.BackendConfig; import org.apache.streampipes.connect.management.health.AdapterHealthCheck; +import org.apache.streampipes.manager.health.CoreInitialInstallationProgress; import org.apache.streampipes.manager.health.CoreServiceStatusManager; import org.apache.streampipes.manager.health.PipelineHealthCheck; import org.apache.streampipes.manager.health.ServiceHealthCheck; @@ -41,6 +41,7 @@ import org.apache.streampipes.service.core.migrations.MigrationsHandler; import org.apache.streampipes.storage.api.IPipelineStorage; import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage; +import org.apache.streampipes.storage.couchdb.impl.UserStorage; import org.apache.streampipes.storage.couchdb.utils.CouchDbViewGenerator; import org.apache.streampipes.storage.management.StorageDispatcher; @@ -121,6 +122,7 @@ public void init() { new CouchDbViewGenerator().createGenericDatabaseIfNotExists(); if (!isConfigured()) { + CoreInitialInstallationProgress.INSTANCE.triggerInitiallyInstallingMode(); doInitialSetup(); } else { // Check needs to be present since core configuration is part of migration @@ -161,20 +163,17 @@ private void scheduleHealthChecks(List checks) { } private boolean isConfigured() { - return BackendConfig.INSTANCE.isConfigured(); + return new UserStorage().existsDatabase(); } private void doInitialSetup() { LOG.info("\n\n**********\n\nWelcome to Apache StreamPipes!\n\n**********\n\n"); LOG.info("We will perform the initial setup, grab some coffee and cross your fingers ;-)..."); - - BackendConfig.INSTANCE.updateSetupStatus(true); LOG.info("Auto-setup will start in 5 seconds to make sure all services are running..."); try { TimeUnit.SECONDS.sleep(5); LOG.info("Starting installation procedure"); new AutoInstallation().startAutoInstallation(); - BackendConfig.INSTANCE.updateSetupStatus(false); } catch (InterruptedException e) { LOG.error("Ooops, something went wrong during the installation", e); } diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java index 0455dc1d8d..9d6851289f 100644 --- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java +++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java @@ -89,7 +89,7 @@ private List findService(int retryCount) { TimeUnit.MILLISECONDS.sleep(1000); return findService(retryCount); } catch (InterruptedException e) { - e.printStackTrace(); + LOG.warn("Could not find a service currently due to exception {}", e.getMessage()); return Collections.emptyList(); } } else { diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IUserStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IUserStorage.java index 1757e24682..cd4e0581f6 100644 --- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IUserStorage.java +++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IUserStorage.java @@ -47,4 +47,6 @@ public interface IUserStorage { Principal getUserById(String principalId); + boolean existsDatabase(); + } diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/UserStorage.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/UserStorage.java index b2d3c75c21..677aa21863 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/UserStorage.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/UserStorage.java @@ -25,9 +25,12 @@ import org.apache.streampipes.storage.couchdb.dao.CrudViewDao; import org.apache.streampipes.storage.couchdb.utils.Utils; +import org.apache.http.client.methods.HttpGet; +import org.lightcouch.NoDocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.URI; import java.util.List; import java.util.stream.Collectors; @@ -117,4 +120,14 @@ public Principal getUserById(String principalId) { return findWithNullIfEmpty(principalId, Principal.class); } + @Override + public boolean existsDatabase() { + var client = Utils.getCouchDbClient(Utils.USER_DB_NAME, false); + try { + client.executeRequest(new HttpGet(URI.create(client.getBaseUri() + "/" + Utils.USER_DB_NAME))); + return true; + } catch (NoDocumentException e) { + return false; + } + } } diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java index b9b48c3036..401001d9d4 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java @@ -32,6 +32,8 @@ public class Utils { + public static final String USER_DB_NAME = "users"; + public static CouchDbClient getCouchDbDataProcessorDescriptionClient() { return getCouchDbGsonClient("data-processor"); } @@ -135,7 +137,7 @@ public static CouchDbClient getCouchDbDashboardWidgetClient() { } public static CouchDbClient getCouchDbUserClient() { - return getCouchDbPrincipalClient("users"); + return getCouchDbPrincipalClient(USER_DB_NAME); } public static CouchDbClient getCouchDbInternalUsersClient() { @@ -188,15 +190,20 @@ private static CouchDbClient getCouchDbStandardSerializerClient(String dbname) { return new CouchDbClient(props(dbname)); } + public static CouchDbClient getCouchDbClient(String database, boolean createIfNotExists) { + return new CouchDbClient(props(database, createIfNotExists)); + } + public static CouchDbClient getCouchDbClient(String database) { return new CouchDbClient(props(database)); } - private static CouchDbProperties props(String dbname) { + private static CouchDbProperties props(String dbname, + boolean createDbIfNotExists) { var env = getEnvironment(); return new CouchDbProperties( dbname, - true, + createDbIfNotExists, env.getCouchDbProtocol().getValueOrDefault(), env.getCouchDbHost().getValueOrDefault(), env.getCouchDbPort().getValueOrDefault(), @@ -204,6 +211,10 @@ private static CouchDbProperties props(String dbname) { env.getCouchDbPassword().getValueOrDefault()); } + private static CouchDbProperties props(String dbname) { + return props(dbname, true); + } + public static String getDatabaseRoute(String databaseName) { return toUrl() + "/" + databaseName; }