Skip to content

Commit

Permalink
Integrate extensions service discovery & configuration management int…
Browse files Browse the repository at this point in the history
…o core (#1815)

* Remove consul dependency from service discovery

* Migrate configuration storage from Consul to CouchDB

* Add service health check

* Move configs to messaging settings

* Migrate configuration section in UI

* Migrate configuration section in UI

* Rename consul configuration components in UI

* Add messaging layer settings to configuration section

* Migrate config extractors

* Improve service registration and deregistration

* Cleanup, delete obsolete code

* Remove empty css file

* Fix imports

* Modify setup to install pipeline elements in background

* Fix client resolution

* Fix typo in Cypress config
  • Loading branch information
dominikriemer authored Sep 10, 2023
1 parent 8e4d866 commit 001e0e2
Show file tree
Hide file tree
Showing 173 changed files with 3,138 additions and 2,307 deletions.
5 changes: 5 additions & 0 deletions streampipes-client-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
<artifactId>streampipes-model</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-service-discovery-api</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,24 @@

package org.apache.streampipes.client.api;

import org.apache.streampipes.model.config.MessagingSettings;
import org.apache.streampipes.model.configuration.MessagingSettings;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.configuration.SpServiceConfiguration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.function.FunctionDefinition;

import java.util.List;

public interface IAdminApi {

void registerService(SpServiceRegistration serviceRegistration);

void deregisterService(String serviceId);

void registerServiceConfiguration(SpServiceConfiguration serviceConfiguration);

SpServiceConfiguration getServiceConfiguration(String serviceGroup);

void registerAdapters(List<AdapterDescription> adapters);

void registerFunctions(List<FunctionDefinition> functions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
import org.apache.streampipes.client.http.DeleteRequest;
import org.apache.streampipes.client.http.GetRequest;
import org.apache.streampipes.client.http.PostRequestWithPayloadResponse;
import org.apache.streampipes.client.http.PostRequestWithoutPayload;
import org.apache.streampipes.client.http.PostRequestWithoutPayloadResponse;
import org.apache.streampipes.client.http.PutRequest;
import org.apache.streampipes.client.model.StreamPipesClientConfig;
import org.apache.streampipes.client.serializer.ObjectSerializer;
import org.apache.streampipes.client.serializer.Serializer;
import org.apache.streampipes.client.util.StreamPipesApiPath;
import org.apache.streampipes.commons.exceptions.SpHttpErrorStatusCode;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;

import java.util.Optional;

public class AbstractClientApi {

protected StreamPipesClientConfig clientConfig;
Expand All @@ -41,6 +45,11 @@ protected <T> T post(StreamPipesApiPath apiPath, Class<T> responseClass) {
return new PostRequestWithPayloadResponse<>(clientConfig, apiPath, serializer, responseClass).executeRequest();
}

protected void post(StreamPipesApiPath apiPath) {
ObjectSerializer<Void, Void> serializer = new ObjectSerializer<>();
new PostRequestWithoutPayload<>(clientConfig, apiPath, serializer).executeRequest();
}

protected <T> void post(StreamPipesApiPath apiPath, T object) {
ObjectSerializer<T, Void> serializer = new ObjectSerializer<>();
new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, serializer, object).executeRequest();
Expand Down Expand Up @@ -73,4 +82,18 @@ protected <T> T getSingle(StreamPipesApiPath apiPath, Class<T> targetClass) thro
ObjectSerializer<Void, T> serializer = new ObjectSerializer<>();
return new GetRequest<>(clientConfig, apiPath, targetClass, serializer).executeRequest();
}

protected <T> Optional<T> getSingleOpt(StreamPipesApiPath apiPath,
Class<T> targetClass) throws SpRuntimeException {
try {
ObjectSerializer<Void, T> serializer = new ObjectSerializer<>();
return Optional.of(new GetRequest<>(clientConfig, apiPath, targetClass, serializer).executeRequest());
} catch (SpHttpErrorStatusCode e) {
if (e.getHttpStatusCode() == 404) {
return Optional.empty();
} else {
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import org.apache.streampipes.client.model.StreamPipesClientConfig;
import org.apache.streampipes.client.util.StreamPipesApiPath;
import org.apache.streampipes.model.config.MessagingSettings;
import org.apache.streampipes.model.configuration.MessagingSettings;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.configuration.SpServiceConfiguration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.function.FunctionDefinition;
import org.apache.streampipes.model.message.SuccessMessage;

Expand All @@ -32,6 +34,29 @@ public AdminApi(StreamPipesClientConfig clientConfig) {
super(clientConfig);
}

@Override
public void registerService(SpServiceRegistration serviceRegistration) {
post(getExtensionsServiceRegistrationPath(), serviceRegistration);
}

@Override
public void deregisterService(String serviceId) {
post(getExtensionsServiceRegistrationPath().addToPath(serviceId));
}

@Override
public void registerServiceConfiguration(SpServiceConfiguration serviceConfiguration) {
post(getExtensionsServiceConfigurationPath(), serviceConfiguration);
}

@Override
public SpServiceConfiguration getServiceConfiguration(String serviceGroup) {
var opt = getSingleOpt(
getExtensionsServiceConfigurationPath().addToPath(serviceGroup), SpServiceConfiguration.class);

return opt.orElseGet(SpServiceConfiguration::new);
}

@Override
public void registerAdapters(List<AdapterDescription> adapters) {
post(getConnectPath(), adapters);
Expand All @@ -52,10 +77,21 @@ public MessagingSettings getMessagingSettings() {
return getSingle(getMessagingSettingsPath(), MessagingSettings.class);
}

private StreamPipesApiPath getExtensionsServiceRegistrationPath() {
return StreamPipesApiPath
.fromBaseApiPath()
.addToPath("extensions-services");
}

private StreamPipesApiPath getExtensionsServiceConfigurationPath() {
return StreamPipesApiPath
.fromBaseApiPath()
.addToPath("extensions-services-configurations");
}

private StreamPipesApiPath getMessagingSettingsPath() {
return StreamPipesApiPath
.fromBaseApiPath()
.addToPath("consul")
.addToPath("messaging");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,17 @@ public T executeRequest() {
try {
HttpResponse response = request.execute().returnResponse();
StatusLine status = response.getStatusLine();
if (status.getStatusCode() == HttpStatus.SC_OK) {
if (status.getStatusCode() == HttpStatus.SC_OK || status.getStatusCode() == HttpStatus.SC_CREATED) {
return afterRequest(serializer, response.getEntity());
} else {
switch (status.getStatusCode()) {
case HttpStatus.SC_UNAUTHORIZED:
throw new SpHttpErrorStatusCode(
" 401 - Access to this resource is forbidden - did you provide a poper API key or client secret?",
401);
case HttpStatus.SC_NOT_FOUND:
throw new SpHttpErrorStatusCode(" 404 - The requested resource could not be found.", 404);
default:
throw new SpHttpErrorStatusCode(status.getStatusCode() + " - " + status.getReasonPhrase(),
status.getStatusCode());
case HttpStatus.SC_UNAUTHORIZED -> throw new SpHttpErrorStatusCode(
" 401 - Access to this resource is forbidden - did you provide a poper API key or client secret?",
401);
case HttpStatus.SC_NOT_FOUND ->
throw new SpHttpErrorStatusCode(" 404 - The requested resource could not be found.", 404);
default -> throw new SpHttpErrorStatusCode(status.getStatusCode() + " - " + status.getReasonPhrase(),
status.getStatusCode());
}
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public abstract class PostRequest<K, V, T> extends HttpRequest<K, V, T> {

private K body;
private boolean withBody;
private final boolean withBody;

public PostRequest(StreamPipesClientConfig clientConfig,
StreamPipesApiPath apiPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,23 @@
*
*/

table {
width: 100%;
}
package org.apache.streampipes.client.http;

.service-icon-passing {
cursor: default;
color: #4caf50;
}
import org.apache.streampipes.client.model.StreamPipesClientConfig;
import org.apache.streampipes.client.serializer.Serializer;
import org.apache.streampipes.client.util.StreamPipesApiPath;

.service-icon-critical {
cursor: default;
color: #f44336;
}
import org.apache.http.HttpEntity;

tr.consul-configuration-detail-row {
height: 0;
}
public class PostRequestWithoutPayload<K, V> extends PostRequest<K, V, Void> {

tr.consul-configuration-row:not(.consul-configuration-row-expanded):hover {
background: var(--color-bg-2);
}

tr.consul-configuration-row:not(.consul-configuration-row-expanded):active {
background: var(--color-bg-1);
}

.consul-configuration-row td {
border-bottom-width: 0;
}

.consul-configuration-detail {
overflow: hidden;
display: flex;
}

.mat-table {
}
public PostRequestWithoutPayload(StreamPipesClientConfig clientConfig,
StreamPipesApiPath apiPath, Serializer<K, V, Void> serializer) {
super(clientConfig, apiPath, serializer);
}

.consul-service {
padding: 5px;
@Override
protected Void afterRequest(Serializer<K, V, Void> serializer, HttpEntity entity) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ public enum Envs {
SP_CONSUL_HOST("SP_CONSUL_HOST",
DefaultEnvValues.CONSUL_HOST_DEFAULT,
DefaultEnvValues.LOCALHOST),

SP_CORE_SCHEME("SP_CORE_SCHEME", "http", "http"),
SP_CORE_HOST("SP_CORE_HOST", "backend", "localhost"),
SP_CORE_PORT("SP_CORE_PORT", "8030", "8030"),

SP_CONSUL_PORT("SP_CONSUL_PORT", DefaultEnvValues.CONSUL_PORT_DEFAULT),
SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS", DefaultEnvValues.SP_KAFKA_RETENTION_MS_DEFAULT),
SP_PRIORITIZED_PROTOCOL("SP_PRIORITIZED_PROTOCOL", "kafka"),
SP_JWT_SECRET("JWT_SECRET"),
SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE"),
SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ public IntEnvironmentVariable getServicePort() {
return new IntEnvironmentVariable(Envs.SP_PORT);
}

@Override
public StringEnvironmentVariable getSpCoreScheme() {
return new StringEnvironmentVariable(Envs.SP_CORE_SCHEME);
}

@Override
public StringEnvironmentVariable getSpCoreHost() {
return new StringEnvironmentVariable(Envs.SP_CORE_HOST);
}

@Override
public IntEnvironmentVariable getSpCorePort() {
return new IntEnvironmentVariable(Envs.SP_CORE_PORT);
}

@Override
public StringEnvironmentVariable getTsStorageProtocol() {
return new StringEnvironmentVariable(Envs.SP_TS_STORAGE_PROTOCOL);
Expand Down Expand Up @@ -149,6 +164,11 @@ public StringEnvironmentVariable getKafkaRetentionTimeMs() {
return new StringEnvironmentVariable(Envs.SP_KAFKA_RETENTION_MS);
}

@Override
public StringEnvironmentVariable getPrioritizedProtocol() {
return new StringEnvironmentVariable(Envs.SP_PRIORITIZED_PROTOCOL);
}

@Override
public BooleanEnvironmentVariable getSetupInstallPipelineElements() {
return new BooleanEnvironmentVariable(Envs.SP_SETUP_INSTALL_PIPELINE_ELEMENTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public interface Environment {

IntEnvironmentVariable getServicePort();

StringEnvironmentVariable getSpCoreScheme();
StringEnvironmentVariable getSpCoreHost();
IntEnvironmentVariable getSpCorePort();

// Time series storage env variables

Expand Down Expand Up @@ -89,6 +92,8 @@ public interface Environment {
// Messaging
StringEnvironmentVariable getKafkaRetentionTimeMs();

StringEnvironmentVariable getPrioritizedProtocol();


// Setup
BooleanEnvironmentVariable getSetupInstallPipelineElements();
Expand Down
5 changes: 5 additions & 0 deletions streampipes-config/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<artifactId>streampipes-model</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-model-client</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-serializers-json</artifactId>
Expand Down
Loading

0 comments on commit 001e0e2

Please sign in to comment.