Skip to content

Commit

Permalink
Add more logging to pipeline element installation
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Sep 13, 2023
1 parent e9864c0 commit b1fa08f
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@

package org.apache.streampipes.manager.endpoint;

import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpoint;
import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
import org.apache.streampipes.serializers.json.JacksonSerializer;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.http.client.fluent.Request;
import org.apache.http.message.BasicHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,18 +48,17 @@ public List<ExtensionsServiceEndpointItem> getItems() {

private List<ExtensionsServiceEndpointItem> getEndpointItems(ExtensionsServiceEndpoint e) {
try {
String result = Request.Get(e.getEndpointUrl())
.addHeader(new BasicHeader("Accept", "application/json"))
.connectTimeout(1000)
String result = ExtensionServiceExecutions
.extServiceGetRequest(e.getEndpointUrl())
.execute()
.returnContent()
.asString();

return JacksonSerializer.getObjectMapper()
.readValue(result, new TypeReference<List<ExtensionsServiceEndpointItem>>() {
.readValue(result, new TypeReference<>() {
});
} catch (IOException e1) {
logger.warn("Processing Element Descriptions could not be fetched from RDF endpoint: " + e.getEndpointUrl());
logger.warn("Processing Element Descriptions could not be fetched from endpoint: " + e.getEndpointUrl());
return new ArrayList<>();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void run() {
do {
endpoints = new EndpointFetcher().getEndpoints();
numberOfAttempts++;
if (endpoints.size() == 0) {
if (endpoints.isEmpty()) {
LOG.info("Found 0 endpoints - waiting {} seconds to make sure all endpoints have properly started",
SLEEP_TIME_SECONDS);
try {
Expand All @@ -64,7 +64,7 @@ public void run() {
e.printStackTrace();
}
}
} while (endpoints.size() == 0 && numberOfAttempts < MAX_RETRIES);
} while (endpoints.isEmpty() && numberOfAttempts < MAX_RETRIES);
LOG.info("Found {} endpoints from which we will install extensions.", endpoints.size());
LOG.info(
"Further available extensions can always be installed by navigating to the 'Install pipeline elements' view");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@
import org.apache.streampipes.model.client.endpoint.ExtensionsServiceEndpointItem;
import org.apache.streampipes.model.message.Message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PipelineElementInstallationStep extends InstallationStep {

private static final Logger LOG = LoggerFactory.getLogger(PipelineElementInstallationStep.class);

private final ExtensionsServiceEndpoint endpoint;
private final String principalSid;

Expand All @@ -42,6 +47,7 @@ public PipelineElementInstallationStep(ExtensionsServiceEndpoint endpoint,
public void install() {
List<Message> statusMessages = new ArrayList<>();
List<ExtensionsServiceEndpointItem> items = Operations.getEndpointUriContents(Collections.singletonList(endpoint));
LOG.info("Found {} endpoint items for endpoint {}", items.size(), endpoint.getEndpointUrl());
for (ExtensionsServiceEndpointItem item : items) {
statusMessages.add(new EndpointItemParser().parseAndAddEndpointItem(item.getUri(),
principalSid, true));
Expand Down

0 comments on commit b1fa08f

Please sign in to comment.