diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java index 1aa54c870d..1896f33194 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointFetcher.java @@ -31,19 +31,19 @@ public class EndpointFetcher { public List getEndpoints() { List endpoints = SpServiceDiscovery.getServiceDiscovery().getActivePipelineElementEndpoints(); - List servicerdExtensionsServiceEndpoints = new LinkedList<>(); + List serviceExtensionsServiceEndpoints = new LinkedList<>(); for (String endpoint : endpoints) { ExtensionsServiceEndpoint extensionsServiceEndpoint = new ExtensionsServiceEndpoint(endpoint); - servicerdExtensionsServiceEndpoints.add(extensionsServiceEndpoint); + serviceExtensionsServiceEndpoints.add(extensionsServiceEndpoint); } List databasedExtensionsServiceEndpoints = StorageDispatcher.INSTANCE.getNoSqlStore() .getRdfEndpointStorage() .getExtensionsServiceEndpoints(); List concatList = - Stream.of(databasedExtensionsServiceEndpoints, servicerdExtensionsServiceEndpoints) + Stream.of(databasedExtensionsServiceEndpoints, serviceExtensionsServiceEndpoints) .flatMap(Collection::stream) .collect(Collectors.toList()); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java index 6be697a01d..df9158aadc 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/endpoint/EndpointItemFetcher.java @@ -29,12 +29,13 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; public class EndpointItemFetcher { Logger logger = LoggerFactory.getLogger(EndpointItemFetcher.class); - private List extensionsServiceEndpoints; + private final List extensionsServiceEndpoints; public EndpointItemFetcher(List extensionsServiceEndpoints) { this.extensionsServiceEndpoints = extensionsServiceEndpoints; @@ -58,8 +59,8 @@ private List getEndpointItems(ExtensionsServiceEn .readValue(result, new TypeReference<>() { }); } catch (IOException e1) { - logger.warn("Processing Element Descriptions could not be fetched from endpoint: " + e.getEndpointUrl()); - return new ArrayList<>(); + logger.warn("Processing Element Descriptions could not be fetched from endpoint: " + e.getEndpointUrl(), e); + return Collections.emptyList(); } } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java index 092806580b..d6f2ef7ae8 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java @@ -63,6 +63,8 @@ public void run() { } catch (InterruptedException e) { e.printStackTrace(); } + } else { + LOG.info("Found endpoint"); } } while (endpoints.isEmpty() && numberOfAttempts < MAX_RETRIES); LOG.info("Found {} endpoints from which we will install extensions.", endpoints.size()); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java index 081379d3c0..339a9f738a 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java @@ -29,13 +29,17 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; public class PipelineElementInstallationStep extends InstallationStep { private static final Logger LOG = LoggerFactory.getLogger(PipelineElementInstallationStep.class); + private static final int MAX_RETRIES = 3; private final ExtensionsServiceEndpoint endpoint; private final String principalSid; + private int retries = 0; + public PipelineElementInstallationStep(ExtensionsServiceEndpoint endpoint, String principalSid) { @@ -47,16 +51,31 @@ public PipelineElementInstallationStep(ExtensionsServiceEndpoint endpoint, public void install() { List statusMessages = new ArrayList<>(); List items = Operations.getEndpointUriContents(Collections.singletonList(endpoint)); - LOG.info("Found {} endpoint items for endpoint {}", items.size(), endpoint.getEndpointUrl()); - for (ExtensionsServiceEndpointItem item : items) { - statusMessages.add(new EndpointItemParser().parseAndAddEndpointItem(item.getUri(), - principalSid, true)); - } - - if (statusMessages.stream().allMatch(Message::isSuccess)) { - logSuccess(getTitle()); + if (items.isEmpty() && retries <= MAX_RETRIES) { + retries++; + LOG.info( + "Endpoint available but no extensions yet found, so we will retry to fetch pipeline elements ({}/{})", + retries, + MAX_RETRIES + ); + try { + TimeUnit.SECONDS.sleep(1); + install(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } else { - logFailure(getTitle()); + LOG.info("Found {} endpoint items for endpoint {}", items.size(), endpoint.getEndpointUrl()); + for (ExtensionsServiceEndpointItem item : items) { + statusMessages.add(new EndpointItemParser().parseAndAddEndpointItem(item.getUri(), + principalSid, true)); + } + + if (statusMessages.stream().allMatch(Message::isSuccess)) { + logSuccess(getTitle()); + } else { + logFailure(getTitle()); + } } } diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java index 2ad6bed1f8..ea3ff85d55 100644 --- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java +++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java @@ -80,17 +80,19 @@ private boolean allFiltersSupported(SpServiceRegistration service, private List findService(int retryCount) { var services = serviceStorage.getAll(); - if (services.size() == 0) { + if (services.isEmpty()) { if (retryCount < MAX_RETRIES) { try { retryCount++; - TimeUnit.SECONDS.sleep(10); + LOG.info("Could not find any extensions services, retrying ({}/{})", retryCount, MAX_RETRIES); + TimeUnit.MILLISECONDS.sleep(1000); return findService(retryCount); } catch (InterruptedException e) { e.printStackTrace(); return Collections.emptyList(); } } else { + LOG.info("No service found"); return Collections.emptyList(); } } else {