Skip to content

Commit

Permalink
feat(#2002): Align adapter registration with other pipeline elements (#…
Browse files Browse the repository at this point in the history
…2008)

* feat(#2002): Align adapter registration with other pipeline elements

* Fix checkstyle

* style: remove trailing whitespace

---------

Co-authored-by: bossenti <[email protected]>
  • Loading branch information
dominikriemer and bossenti authored Oct 27, 2023
1 parent f8be8f6 commit 5987161
Show file tree
Hide file tree
Showing 58 changed files with 776 additions and 632 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.streampipes.client.api;

import org.apache.streampipes.model.configuration.MessagingSettings;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.configuration.SpServiceConfiguration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.function.FunctionDefinition;
Expand All @@ -36,8 +35,6 @@ public interface IAdminApi {

SpServiceConfiguration getServiceConfiguration(String serviceGroup);

void registerAdapters(List<AdapterDescription> adapters);

void registerFunctions(List<FunctionDefinition> functions);

void deregisterFunction(String functionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.streampipes.client.model.StreamPipesClientConfig;
import org.apache.streampipes.client.util.StreamPipesApiPath;
import org.apache.streampipes.model.configuration.MessagingSettings;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.configuration.SpServiceConfiguration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.function.FunctionDefinition;
Expand Down Expand Up @@ -57,11 +56,6 @@ public SpServiceConfiguration getServiceConfiguration(String serviceGroup) {
return opt.orElseGet(SpServiceConfiguration::new);
}

@Override
public void registerAdapters(List<AdapterDescription> adapters) {
post(getConnectPath(), adapters);
}

@Override
public void registerFunctions(List<FunctionDefinition> functions) {
post(getFunctionsPath(), functions);
Expand Down Expand Up @@ -95,14 +89,6 @@ private StreamPipesApiPath getMessagingSettingsPath() {
.addToPath("messaging");
}

private StreamPipesApiPath getConnectPath() {
return StreamPipesApiPath
.fromBaseApiPath()
.addToPath("connect")
.addToPath("master")
.addToPath("administration");
}

private StreamPipesApiPath getFunctionsPath() {
return StreamPipesApiPath
.fromBaseApiPath()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@

package org.apache.streampipes.connect.management.management;

import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.connect.management.health.AdapterHealthCheck;
import org.apache.streampipes.connect.management.health.AdapterOperationLock;
import org.apache.streampipes.manager.assets.AssetManager;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
import org.apache.streampipes.resource.management.PermissionResourceManager;
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand All @@ -44,24 +51,7 @@ public WorkerAdministrationManagement() {
this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
}

public void register(List<AdapterDescription> availableAdapterDescription) {
List<AdapterDescription> alreadyRegisteredAdapters = this.adapterDescriptionStorage.getAllAdapters();

availableAdapterDescription.forEach(adapterDescription -> {

// only install once adapter description per service group
boolean alreadyInstalled =
alreadyRegisteredAdapters.stream().anyMatch(a -> a.getAppId().equals(adapterDescription.getAppId()));
if (!alreadyInstalled) {
this.adapterDescriptionStorage.storeAdapter(adapterDescription);
}
});

int retryCount = 0;
checkAndRestore(retryCount);
}

private void checkAndRestore(int retryCount) {
public void checkAndRestore(int retryCount) {
if (AdapterOperationLock.INSTANCE.isLocked()) {
LOG.info("Adapter operation already in progress, {}/{}", (retryCount + 1), MAX_RETRIES);
if (retryCount <= MAX_RETRIES) {
Expand All @@ -83,4 +73,32 @@ private void checkAndRestore(int retryCount) {
AdapterOperationLock.INSTANCE.unlock();
}
}

public void performAdapterMigrations(List<SpServiceTag> tags) {
var installedAdapters = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage().getAllAdapters();
var adminSid = new SpResourceManager().manageUsers().getAdminUser().getPrincipalId();
installedAdapters.stream()
.filter(adapter -> tags.stream().anyMatch(tag -> tag.getValue().equals(adapter.getAppId())))
.forEach(adapter -> {
if (!AssetManager.existsAssetDir(adapter.getAppId())) {
try {
LOG.info("Updating assets for adapter {}", adapter.getAppId());
AssetManager.storeAsset(SpServiceUrlProvider.ADAPTER, adapter.getAppId());
} catch (IOException | NoServiceEndpointsAvailableException e) {
LOG.error(
"Could not fetch asset for adapter {}, please try to manually update this adapter.",
adapter.getAppId(),
e);
}
}
var permissionStorage = CouchDbStorageManager.INSTANCE.getPermissionStorage();
var elementId = adapter.getElementId();
var permissions = permissionStorage.getUserPermissionsForObject(elementId);
if (permissions.isEmpty()) {
LOG.info("Adding default permission for adapter {}", adapter.getAppId());
new PermissionResourceManager()
.createDefault(elementId, AdapterDescription.class, adminSid, true);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(
.extServiceGetRequest(url)
.execute().returnContent().asString();

List<AdapterDescription> result = JacksonSerializer.getObjectMapper().readValue(responseString, List.class);

return result;
return JacksonSerializer.getObjectMapper().readValue(responseString, List.class);
} catch (IOException e) {
LOG.error("List of running adapters could not be fetched", e);
throw new AdapterException("List of running adapters could not be fetched from: " + url);
Expand All @@ -106,7 +104,7 @@ private static void triggerAdapterStateChange(AdapterDescription ad,
try {
String adapterDescription = JacksonSerializer.getObjectMapper().writeValueAsString(ad);

var response = triggerPost(url, ad.getElementId(), adapterDescription);
var response = triggerPost(url, ad.getCorrespondingDataStreamElementId(), adapterDescription);
var responseString = getResponseBody(response);

if (response.getStatusLine().getStatusCode() != 200) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@

import static org.apache.streampipes.extensions.management.util.LocalesUtil.makePath;

public class LabelGenerator {
public class LabelGenerator<T extends NamedStreamPipesEntity> {

private static final Logger LOG = LoggerFactory.getLogger(LabelGenerator.class);

private static final String Delimiter = ".";
private static final String Title = "title";
private static final String Description = "description";

private NamedStreamPipesEntity desc;
private T desc;

public LabelGenerator(NamedStreamPipesEntity desc) {
public LabelGenerator(T desc) {
this.desc = desc;
}

public NamedStreamPipesEntity generateLabels() throws IOException {
public T generateLabels() throws IOException {
if (existsLocalesFile()) {
Properties props = makeProperties();
desc.setName(getTitle(props, desc.getAppId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public static byte[] getAsset(String appId, String assetName) throws IOException
return Files.readAllBytes(Paths.get(getAssetPath(appId, assetName)));
}

public static boolean existsAssetDir(String appId) {
var directory = new File(getAssetDir(appId));
return directory.exists() && directory.isDirectory();
}

public static void storeAsset(SpServiceUrlProvider spServiceUrlProvider,
String appId) throws IOException, NoServiceEndpointsAvailableException {
InputStream assetStream = new AssetFetcher(spServiceUrlProvider, appId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@
*/
package org.apache.streampipes.manager.endpoint;

import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.NotificationType;
import org.apache.streampipes.model.message.Notifications;

import org.apache.http.client.fluent.Request;

import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

public class EndpointItemParser {

public Message parseAndAddEndpointItem(String url,
String principalSid,
boolean publicElement) {
try {
url = URLDecoder.decode(url, "UTF-8");
url = URLDecoder.decode(url, StandardCharsets.UTF_8);
String payload = parseURIContent(url);
return Operations.verifyAndAddElement(payload, principalSid, publicElement);
} catch (Exception e) {
Expand All @@ -44,9 +44,8 @@ public Message parseAndAddEndpointItem(String url,
}

private String parseURIContent(String url) throws IOException {
return Request
.Get(url)
.addHeader("Accept", "application/json")
return ExtensionServiceExecutions
.extServiceGetRequest(url)
.execute()
.returnContent()
.asString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.streampipes.manager.execution.endpoint;

import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

Expand All @@ -28,7 +31,15 @@
public class ExtensionsServiceEndpointUtils {

public static SpServiceUrlProvider getPipelineElementType(NamedStreamPipesEntity entity) {
return isDataProcessor(entity) ? SpServiceUrlProvider.DATA_PROCESSOR : SpServiceUrlProvider.DATA_SINK;
if (isDataProcessor(entity)) {
return SpServiceUrlProvider.DATA_PROCESSOR;
} else if (isDataSink(entity)) {
return SpServiceUrlProvider.DATA_SINK;
} else if (isAdapter(entity)) {
return SpServiceUrlProvider.ADAPTER;
} else {
throw new RuntimeException("Could not find service url for entity " + entity.getClass().getCanonicalName());
}
}

public static SpServiceUrlProvider getPipelineElementType(String appId) {
Expand All @@ -43,4 +54,12 @@ public static SpServiceUrlProvider getPipelineElementType(String appId) {
private static boolean isDataProcessor(NamedStreamPipesEntity entity) {
return entity instanceof DataProcessorInvocation || entity instanceof DataProcessorDescription;
}

private static boolean isDataSink(NamedStreamPipesEntity entity) {
return entity instanceof DataSinkInvocation || entity instanceof DataSinkDescription;
}

private static boolean isAdapter(NamedStreamPipesEntity entity) {
return entity instanceof AdapterDescription;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private List<ConsumableStreamPipesEntity> getAllDataProcessors() {
return getTripleStore()
.getAllDataProcessors()
.stream()
.filter(e -> userObjects.stream().anyMatch(u -> u.equals(e.getElementId())))
.filter(e -> userObjects.stream().anyMatch(u -> u.equals(e.getAppId())))
.map(DataProcessorDescription::new)
.collect(Collectors.toList());
}
Expand All @@ -172,7 +172,7 @@ private List<ConsumableStreamPipesEntity> getAllDataSinks() {
return getTripleStore()
.getAllDataSinks()
.stream()
.filter(e -> userObjects.stream().anyMatch(u -> u.equals(e.getElementId())))
.filter(e -> userObjects.stream().anyMatch(u -> u.equals(e.getAppId())))
.map(DataSinkDescription::new)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.manager.verification;

import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.manager.assets.AssetManager;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import java.io.IOException;

public class AdapterVerifier extends ElementVerifier<AdapterDescription> {

public AdapterVerifier(String graphData) throws SepaParseException {
super(graphData, AdapterDescription.class);
}

@Override
protected StorageState store() {
var storageState = StorageState.STORED;
if (!storageApi.exists(elementDescription)) {
storageApi.storeAdapterDescription(elementDescription);
} else {
storageState = StorageState.ALREADY_STORED;
}
return storageState;
}

@Override
protected void update() {
storageApi.update(elementDescription);
}

@Override
protected void storeAssets() throws IOException, NoServiceEndpointsAvailableException {
if (elementDescription.isIncludesAssets()) {
AssetManager.storeAsset(SpServiceUrlProvider.ADAPTER, elementDescription.getAppId());
}
}

@Override
protected void updateAssets() throws IOException, NoServiceEndpointsAvailableException {
if (elementDescription.isIncludesAssets()) {
AssetManager.deleteAsset(elementDescription.getAppId());
storeAssets();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected StorageState store() {
if (!storageApi.exists(elementDescription)) {
storageApi.storeDataProcessor(elementDescription);
} else {
storageState = StorageState.ALREADY_IN_SESAME;
storageState = StorageState.ALREADY_STORED;
}
return storageState;
}
Expand All @@ -63,12 +63,4 @@ protected void storeAssets() throws IOException, NoServiceEndpointsAvailableExce
}
}

@Override
protected void updateAssets() throws IOException, NoServiceEndpointsAvailableException {
if (elementDescription.isIncludesAssets()) {
AssetManager.deleteAsset(elementDescription.getAppId());
storeAssets();
}
}

}
Loading

0 comments on commit 5987161

Please sign in to comment.