diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java index 4d2d662028..340f57d3e4 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java @@ -103,39 +103,44 @@ public Response performMigrations( List migrationConfigs) { var serviceManager = new ServiceRegistrationManager(extensionsServiceStorage); - var extensionsServiceConfig = serviceManager.getService(serviceId); - if (!CoreInitialInstallationProgress.INSTANCE.isInitiallyInstalling()) { - new AdapterDescriptionMigration093(adapterDescriptionStorage).reinstallAdapters(extensionsServiceConfig); - if (!migrationConfigs.isEmpty()) { - var anyServiceMigrating = serviceManager.isAnyServiceMigrating(); - var coreReady = isCoreReady(); - if (anyServiceMigrating || !coreReady) { - LOG.info( - "Refusing migration request since precondition is not met (anyServiceMigratione={}, coreReady={}.", - anyServiceMigrating, - coreReady - ); - return Response.status(HttpStatus.SC_CONFLICT).build(); - } else { - serviceManager.applyServiceStatus(serviceId, SpServiceStatus.MIGRATING); - var adapterMigrations = filterConfigs(migrationConfigs, List.of(SpServiceTagPrefix.ADAPTER)); - var pipelineElementMigrations = filterConfigs( - migrationConfigs, - List.of(SpServiceTagPrefix.DATA_PROCESSOR, SpServiceTagPrefix.DATA_SINK) - ); - - new AdapterMigrationManager(adapterStorage).handleMigrations(extensionsServiceConfig, adapterMigrations); - new PipelineElementMigrationManager( - pipelineStorage, - dataProcessorStorage, - dataSinkStorage) - .handleMigrations(extensionsServiceConfig, pipelineElementMigrations); + try { + var extensionsServiceConfig = serviceManager.getService(serviceId); + if (!CoreInitialInstallationProgress.INSTANCE.isInitiallyInstalling()) { + new AdapterDescriptionMigration093(adapterDescriptionStorage).reinstallAdapters(extensionsServiceConfig); + if (!migrationConfigs.isEmpty()) { + var anyServiceMigrating = serviceManager.isAnyServiceMigrating(); + var coreReady = isCoreReady(); + if (anyServiceMigrating || !coreReady) { + LOG.info( + "Refusing migration request since precondition is not met (anyServiceMigratione={}, coreReady={}.", + anyServiceMigrating, + coreReady + ); + return Response.status(HttpStatus.SC_CONFLICT).build(); + } else { + serviceManager.applyServiceStatus(serviceId, SpServiceStatus.MIGRATING); + var adapterMigrations = filterConfigs(migrationConfigs, List.of(SpServiceTagPrefix.ADAPTER)); + var pipelineElementMigrations = filterConfigs( + migrationConfigs, + List.of(SpServiceTagPrefix.DATA_PROCESSOR, SpServiceTagPrefix.DATA_SINK) + ); + + new AdapterMigrationManager(adapterStorage).handleMigrations(extensionsServiceConfig, adapterMigrations); + new PipelineElementMigrationManager( + pipelineStorage, + dataProcessorStorage, + dataSinkStorage) + .handleMigrations(extensionsServiceConfig, pipelineElementMigrations); + } } } + new ServiceRegistrationManager(extensionsServiceStorage) + .applyServiceStatus(extensionsServiceConfig.getSvcId(), SpServiceStatus.HEALTHY); + return ok(); + } catch (IllegalArgumentException e) { + LOG.warn("Refusing migration request since the service {} is not registered.", serviceId); + return notFound(); } - new ServiceRegistrationManager(extensionsServiceStorage) - .applyServiceStatus(extensionsServiceConfig.getSvcId(), SpServiceStatus.HEALTHY); - return ok(); } private boolean isCoreReady() { diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CoreRequestSubmitter.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CoreRequestSubmitter.java index ab7f631d02..d5c2e01af0 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CoreRequestSubmitter.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CoreRequestSubmitter.java @@ -18,11 +18,15 @@ package org.apache.streampipes.service.extensions; +import org.apache.streampipes.client.api.IStreamPipesClient; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; +import org.apache.streampipes.model.migration.ModelMigratorConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -51,4 +55,37 @@ public void submitRepeatedRequest(Supplier request, } } } + + public void submitRegistrationRequest(IStreamPipesClient client, + SpServiceRegistration serviceReg) { + submitRepeatedRequest( + () -> { + client.adminApi().registerService(serviceReg); + return true; + }, + "Successfully registered service at core.", + String.format( + "Could not register service at core at url %s", + client.getConnectionConfig().getBaseUrl() + )); + } + + public void submitMigrationRequest(IStreamPipesClient client, + List migrationConfigs, + String serviceId, + SpServiceRegistration serviceReg) { + submitRepeatedRequest( + () -> { + try { + client.adminApi().registerMigrations(migrationConfigs, serviceId); + return true; + } catch (RuntimeException e) { + submitRegistrationRequest(client, serviceReg); + submitMigrationRequest(client, migrationConfigs, serviceId, serviceReg); + return true; + } + }, + "Successfully sent migration request", + "Core currently doesn't accept migration requests."); + } } diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java index 24f128cc47..ac1aed13b0 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java @@ -22,6 +22,7 @@ import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver; import org.apache.streampipes.extensions.management.init.DeclarersSingleton; import org.apache.streampipes.extensions.management.model.SpServiceDefinition; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; import org.apache.streampipes.service.extensions.function.StreamPipesFunctionHandler; import org.apache.streampipes.service.extensions.security.WebSecurityConfig; @@ -47,18 +48,13 @@ public void onExit() { } @Override - public void afterServiceRegistered(SpServiceDefinition serviceDef) { + public void afterServiceRegistered(SpServiceDefinition serviceDef, + SpServiceRegistration serviceReg) { StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance(); // register all migrations at StreamPipes Core var migrationConfigs = serviceDef.getMigrators().stream().map(IModelMigrator::config).toList(); - new CoreRequestSubmitter().submitRepeatedRequest( - () -> { - client.adminApi().registerMigrations(migrationConfigs, serviceId()); - return true; - }, - "Successfully sent migration request", - "Core currently doesn't accept migration requests."); + new CoreRequestSubmitter().submitMigrationRequest(client, migrationConfigs, serviceId(), serviceReg); // initialize all function instances StreamPipesFunctionHandler.INSTANCE.initializeFunctions(serviceDef.getServiceGroup()); diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java index 0913fd2587..11c351c8f4 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java @@ -43,7 +43,6 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServiceBase { private static final Logger LOG = LoggerFactory.getLogger(StreamPipesExtensionsServiceBase.class); - private static final int RETRY_INTERVAL_SECONDS = 3; public void init() { SpServiceDefinition serviceDef = provideServiceDefinition(); @@ -67,7 +66,8 @@ public void init(SpServiceDefinition serviceDef) { public abstract SpServiceDefinition provideServiceDefinition(); - public abstract void afterServiceRegistered(SpServiceDefinition serviceDef); + public abstract void afterServiceRegistered(SpServiceDefinition serviceDef, + SpServiceRegistration serviceReg); public void startExtensionsService(Class serviceClass, SpServiceDefinition serviceDef, @@ -90,21 +90,12 @@ public void startExtensionsService(Class serviceClass, networkingConfig ); - this.afterServiceRegistered(serviceDef); + this.afterServiceRegistered(serviceDef, req); } private void registerService(SpServiceRegistration serviceRegistration) { var client = new StreamPipesClientResolver().makeStreamPipesClientInstance(); - new CoreRequestSubmitter().submitRepeatedRequest( - () -> { - client.adminApi().registerService(serviceRegistration); - return true; - }, - "Successfully registered service at core.", - String.format( - "Could not register service at core at url %s", - client.getConnectionConfig().getBaseUrl() - )); + new CoreRequestSubmitter().submitRegistrationRequest(client, serviceRegistration); } protected List getServiceTags() {