From d3883dbaca49dbf4264a079c484c62550c6cf477 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Tue, 12 Sep 2023 23:27:59 +0200 Subject: [PATCH] Improve and centralize HTTP requests to extensions --- .../management/health/AdapterHealthCheck.java | 7 +- .../management/GuessManagement.java | 9 +-- .../management/SourcesManagement.java | 17 ----- .../management/WorkerRestClient.java | 25 +++---- .../execution/ExtensionServiceExecutions.java | 65 +++++++++++++++++++ .../PipelineElementEndpointHealthCheck.java | 5 +- .../manager/health/ServiceHealthCheck.java | 5 +- .../ExtensionsServiceLogExecutor.java | 5 +- .../manager/setup/StreamPipesEnvChecker.java | 5 ++ .../manager/util/AuthTokenUtils.java | 5 ++ .../security/UnauthenticatedInterfaces.java | 10 ++- 11 files changed, 105 insertions(+), 53 deletions(-) create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java index c575e9a43c..07d70cf7dd 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java @@ -26,6 +26,9 @@ import org.apache.streampipes.storage.api.IAdapterStorage; import org.apache.streampipes.storage.couchdb.CouchDbStorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -33,6 +36,8 @@ public class AdapterHealthCheck { + private static final Logger LOG = LoggerFactory.getLogger(AdapterHealthCheck.class); + private final IAdapterStorage adapterStorage; private final AdapterMasterManagement adapterMasterManagement; @@ -125,7 +130,7 @@ public void recoverAdapters(Map adaptersToRecover) { this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId()); } } catch (AdapterException e) { - e.printStackTrace(); + LOG.warn("Could not start adapter {}", adapterDescription.getName(), e); } } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java index 383e889cba..ba99844883 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java @@ -24,6 +24,7 @@ import org.apache.streampipes.connect.management.AdapterEventPreviewPipeline; import org.apache.streampipes.connect.management.util.WorkerPaths; import org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException; +import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.guess.AdapterEventPreview; import org.apache.streampipes.model.connect.guess.GuessSchema; @@ -31,9 +32,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.http.HttpStatus; -import org.apache.http.client.fluent.Request; import org.apache.http.client.fluent.Response; -import org.apache.http.entity.ContentType; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,10 +56,8 @@ public GuessSchema guessSchema(AdapterDescription adapterDescription) var objectMapper = JacksonSerializer.getObjectMapper(); var description = objectMapper.writeValueAsString(adapterDescription); logger.info("Guess schema at: " + workerUrl); - Response requestResponse = Request.Post(workerUrl) - .bodyString(description, ContentType.APPLICATION_JSON) - .connectTimeout(1000) - .socketTimeout(100000) + Response requestResponse = ExtensionServiceExecutions + .extServicePostRequest(workerUrl, null, description) .execute(); var httpResponse = requestResponse.returnResponse(); diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/SourcesManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/SourcesManagement.java index da2c96a01f..74eefd7345 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/SourcesManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/SourcesManagement.java @@ -21,26 +21,9 @@ import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.grounding.EventGrounding; -import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SourcesManagement { - private final Logger logger = LoggerFactory.getLogger(SourcesManagement.class); - - private final AdapterInstanceStorageImpl adapterInstanceStorage; - - - public SourcesManagement(AdapterInstanceStorageImpl adapterStorage) { - this.adapterInstanceStorage = adapterStorage; - } - - public SourcesManagement() { - this(new AdapterInstanceStorageImpl()); - } - public static SpDataStream updateDataStream(AdapterDescription adapterDescription, SpDataStream oldDataStream) { diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java index adefb1f6ee..58f6a49bc1 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java @@ -21,6 +21,7 @@ import org.apache.streampipes.commons.exceptions.SpConfigurationException; import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.connect.management.util.WorkerPaths; +import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; @@ -35,7 +36,6 @@ import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; import org.apache.http.client.fluent.Request; -import org.apache.http.entity.ContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,10 +73,8 @@ public static void stopStreamAdapter(String baseUrl, public static List getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException { try { logger.info("Requesting all running adapter description instances: " + url); - - var responseString = Request.Get(url) - .connectTimeout(1000) - .socketTimeout(100000) + var responseString = ExtensionServiceExecutions + .extServiceGetRequest(url) .execute().returnContent().asString(); List result = JacksonSerializer.getObjectMapper().readValue(responseString, List.class); @@ -108,7 +106,7 @@ private static void triggerAdapterStateChange(AdapterDescription ad, try { String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(ad); - var response = triggerPost(url, adapterDescription); + var response = triggerPost(url, ad.getElementId(), adapterDescription); var responseString = getResponseBody(response); if (response.getStatusLine().getStatusCode() != 200) { @@ -116,7 +114,7 @@ private static void triggerAdapterStateChange(AdapterDescription ad, throw new AdapterException(exception.getMessage(), exception.getCause()); } - logger.info("Adapter {} on endpoint: " + url + " with Response: " + responseString); + logger.info("Adapter {} on endpoint: " + url + " with Response: ", ad.getName() + responseString); } catch (IOException e) { logger.error("Adapter was not {} successfully", action, e); @@ -129,12 +127,10 @@ private static String getResponseBody(HttpResponse response) throws IOException } private static HttpResponse triggerPost(String url, + String elementId, String payload) throws IOException { - return Request.Post(url) - .bodyString(payload, ContentType.APPLICATION_JSON) - .connectTimeout(1000) - .socketTimeout(100000) - .execute().returnResponse(); + var request = ExtensionServiceExecutions.extServicePostRequest(url, elementId, payload); + return request.execute().returnResponse(); } public static RuntimeOptionsResponse getConfiguration(String workerEndpoint, @@ -145,10 +141,7 @@ public static RuntimeOptionsResponse getConfiguration(String workerEndpoint, try { String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest); - var response = Request.Post(url) - .bodyString(payload, ContentType.APPLICATION_JSON) - .connectTimeout(1000) - .socketTimeout(100000) + var response = ExtensionServiceExecutions.extServicePostRequest(url, payload) .execute() .returnResponse(); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java new file mode 100644 index 0000000000..68071248da --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.execution; + +import org.apache.streampipes.manager.util.AuthTokenUtils; +import org.apache.streampipes.resource.management.SpResourceManager; + +import org.apache.http.client.fluent.Request; +import org.apache.http.entity.ContentType; + +public class ExtensionServiceExecutions { + + public static Request extServiceGetRequest(String url) { + return Request + .Get(url) + .addHeader("Authorization", AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid())) + .connectTimeout(10000); + } + + + private static String getServiceAdminSid() { + return new SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId(); + } + + public static Request extServicePostRequest(String url, + String payload) { + return authenticatedPostRequest(url, AuthTokenUtils.getAuthTokenForCurrentUser(), payload); + } + + public static Request extServicePostRequest(String url, + String elementId, + String payload) { + return authenticatedPostRequest( + url, + AuthTokenUtils.getAuthToken(elementId), + payload + ); + } + + private static Request authenticatedPostRequest(String url, + String token, + String payload) { + return Request.Post(url) + .addHeader("Authorization", token) + .bodyString(payload, ContentType.APPLICATION_JSON) + .connectTimeout(1000) + .socketTimeout(100000); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java index 8d9dc937e0..9bec880a89 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineElementEndpointHealthCheck.java @@ -17,10 +17,10 @@ */ package org.apache.streampipes.manager.health; +import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; import org.apache.streampipes.serializers.json.JacksonSerializer; import com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.http.client.fluent.Request; import java.io.IOException; import java.util.Arrays; @@ -37,7 +37,8 @@ public PipelineElementEndpointHealthCheck(String endpointUrl) { } public List checkRunningInstances() throws IOException { - return asList(Request.Get(makeRequestUrl()).execute().returnContent().toString()); + var request = ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl()); + return asList(request.execute().returnContent().toString()); } private List asList(String json) throws JsonProcessingException { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java index 768430d5a8..dadfe9c64e 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java @@ -18,11 +18,11 @@ package org.apache.streampipes.manager.health; +import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.storage.api.CRUDStorage; import org.apache.streampipes.storage.management.StorageDispatcher; -import org.apache.http.client.fluent.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +51,8 @@ private void checkServiceHealth(SpServiceRegistration service) { String healthCheckUrl = makeHealthCheckUrl(service); try { - var response = Request.Get(healthCheckUrl).execute(); + var request = ExtensionServiceExecutions.extServiceGetRequest(healthCheckUrl); + var response = request.execute(); if (response.returnResponse().getStatusLine().getStatusCode() != 200) { processUnhealthyService(service); } else { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java index ee8e19885f..124c216c22 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java @@ -19,7 +19,7 @@ package org.apache.streampipes.manager.monitoring.pipeline; -import org.apache.streampipes.manager.util.AuthTokenUtils; +import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; import org.apache.streampipes.model.client.user.Principal; import org.apache.streampipes.model.monitoring.SpEndpointMonitoringInfo; import org.apache.streampipes.resource.management.SpResourceManager; @@ -61,8 +61,7 @@ public void triggerUpdate() { } private Request makeRequest(String serviceEndpointUrl) { - return Request.Get(makeLogUrl(serviceEndpointUrl)) - .addHeader("Authorization", AuthTokenUtils.getAuthTokenForUser(getServiceAdmin())); + return ExtensionServiceExecutions.extServiceGetRequest(makeLogUrl(serviceEndpointUrl)); } private Principal getServiceAdmin() { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/StreamPipesEnvChecker.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/StreamPipesEnvChecker.java index 17dc5072e8..c4fc1bba08 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/StreamPipesEnvChecker.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/StreamPipesEnvChecker.java @@ -70,7 +70,12 @@ private void updateJwtSettings() { if (signingMode.exists()) { localAuthConfig.setJwtSigningMode(JwtSigningMode.valueOf(signingMode.getValue())); + } else { + if (localAuthConfig.getJwtSigningMode() != JwtSigningMode.HMAC) { + localAuthConfig.setJwtSigningMode(JwtSigningMode.HMAC); + } } + if (jwtSecret.exists()) { localAuthConfig.setTokenSecret(jwtSecret.getValue()); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/AuthTokenUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/AuthTokenUtils.java index 43fb7089e2..12c88359be 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/AuthTokenUtils.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/AuthTokenUtils.java @@ -28,6 +28,11 @@ public class AuthTokenUtils { + public static String getAuthTokenForCurrentUser() { + Authentication auth = SecurityContextHolder.getContext().getAuthentication(); + return makeBearerToken(new JwtTokenProvider().createToken(auth)); + } + public static String getAuthToken(String resourceId) { if (SecurityContextHolder.getContext().getAuthentication() != null) { Authentication auth = SecurityContextHolder.getContext().getAuthentication(); diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/UnauthenticatedInterfaces.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/UnauthenticatedInterfaces.java index bd28e9c298..de1a715dbc 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/UnauthenticatedInterfaces.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/UnauthenticatedInterfaces.java @@ -27,12 +27,10 @@ public class UnauthenticatedInterfaces { public static Collection get() { return Arrays.asList( - "/svchealth/*", - "/", - "/sec/**", - "/sepa/**", - "/stream/**", - "/api/v1/worker/**" + "/sec/*/assets/**", + "/sepa/*/assets/**", + "/stream/*/assets/**", + "/api/v1/worker/adapters/*/assets/**" ); } }