diff --git a/docker-compose.yml b/docker-compose.yml
index 5570b5f3d4..4ac2fed520 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -113,30 +113,23 @@ services:
spnet:
kafka:
- image: fogsyio/kafka:2.2.0
- hostname: kafka
- depends_on:
- - zookeeper
+ image: bitnami/kafka:3.5.1
environment:
- # see: https://github.com/confluentinc/schema-registry/issues/648
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
- KAFKA_LISTENERS: PLAINTEXT://:9092
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ - KAFKA_CFG_NODE_ID=0
+ - KAFKA_CFG_PROCESS_ROLES=controller,broker
+ - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
volumes:
- - kafka:/kafka
- - /var/run/docker.sock:/var/run/docker.sock
+ - kafka3:/bitnami
logging: *default-logging
- networks:
- spnet:
-
- zookeeper:
- image: fogsyio/zookeeper:3.4.13
- logging: *default-logging
- volumes:
- - zookeeper:/opt/zookeeper-3.4.13
+ restart: unless-stopped
networks:
spnet:
@@ -160,11 +153,10 @@ services:
spnet:
volumes:
- kafka:
+ kafka3:
files:
consul:
couchdb:
- zookeeper:
influxdb:
influxdb2:
backend:
diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
index 0076dd3542..5d6433c21c 100644
--- a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
@@ -17,17 +17,20 @@ version: "3.4"
services:
kafka:
ports:
+ - "9093:9093"
+ - "9092:9092"
- "9094:9094"
- depends_on:
- - zookeeper
environment:
- # see: https://github.com/confluentinc/schema-registry/issues/648
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 #(local, i.e. Single-Node with IDE + Docker)
- KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES: 5000012
- KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
- KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
+ # KRaft settings
+ - KAFKA_CFG_NODE_ID=0
+ - KAFKA_CFG_PROCESS_ROLES=controller,broker
+ - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ # Listeners
+ - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.yml b/installer/cli/deploy/standalone/kafka/docker-compose.yml
index 9bac791c4e..306663268d 100644
--- a/installer/cli/deploy/standalone/kafka/docker-compose.yml
+++ b/installer/cli/deploy/standalone/kafka/docker-compose.yml
@@ -16,24 +16,24 @@
version: "3.4"
services:
kafka:
- image: fogsyio/kafka:2.2.0
+ image: bitnami/kafka:3.5.1
hostname: kafka
- depends_on:
- - zookeeper
environment:
- # see: https://github.com/confluentinc/schema-registry/issues/648
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
- KAFKA_LISTENERS: PLAINTEXT://:9092
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES: 5000012
- KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
- KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
+ # KRaft settings
+ - KAFKA_CFG_NODE_ID=0
+ - KAFKA_CFG_PROCESS_ROLES=controller,broker
+ - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ # Listeners
+ - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+ - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
+ - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+ - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
volumes:
- - kafka:/kafka
- - /var/run/docker.sock:/var/run/docker.sock
+ - kafka3:/bitnami
logging:
driver: "json-file"
options:
@@ -43,7 +43,7 @@ services:
spnet:
volumes:
- kafka:
+ kafka3:
networks:
spnet:
diff --git a/installer/compose/docker-compose.yml b/installer/compose/docker-compose.yml
index 102fd87cc5..e4fe18a04b 100644
--- a/installer/compose/docker-compose.yml
+++ b/installer/compose/docker-compose.yml
@@ -91,32 +91,21 @@ services:
spnet:
kafka:
- image: fogsyio/kafka:2.2.0
- hostname: kafka
- depends_on:
- - zookeeper
+ image: bitnami/kafka:3.5.1
environment:
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
- KAFKA_LISTENERS: PLAINTEXT://:9092
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES: 5000012
- KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
- KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
- volumes:
- - kafka:/kafka
- - /var/run/docker.sock:/var/run/docker.sock
- logging: *default-logging
- restart: unless-stopped
- networks:
- spnet:
-
- zookeeper:
- image: fogsyio/zookeeper:3.4.13
+ - KAFKA_CFG_NODE_ID=0
+ - KAFKA_CFG_PROCESS_ROLES=controller,broker
+ - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
volumes:
- - zookeeper:/opt/zookeeper-3.4.13
+ - kafka3:/bitnami
logging: *default-logging
restart: unless-stopped
networks:
@@ -154,24 +143,12 @@ services:
networks:
spnet:
- sources-watertank-simulator:
- image: "${SP_DOCKER_REGISTRY}/sources-watertank-simulator:${SP_VERSION}"
- depends_on:
- - consul
- - kafka
- logging: *default-logging
- restart: unless-stopped
- networks:
- spnet:
-
-
volumes:
backend:
consul:
connect:
couchdb:
- kafka:
- zookeeper:
+ kafka3:
influxdb:
influxdb2:
files:
diff --git a/installer/k8s/README.md b/installer/k8s/README.md
index 135236c44a..cb2f641e84 100644
--- a/installer/k8s/README.md
+++ b/installer/k8s/README.md
@@ -224,20 +224,6 @@ rm -rf ${HOME}/streampipes-k8s
| external.kafka.persistence.pvName | Name of the Kafka PersistentVolume | "kafka-pv" |
-####Zookeeper common parameters
-| Parameter Name | Description | Value |
-|-------------------------------------------------|----------------------------------------------------------|------------------------------------------|
-| external.zookeeper.appName | ZooKeeper application name | "zookeeper" |
-| external.zookeeper.version | ZooKeeper version | 3.4.13 |
-| external.zookeeper.port | Port for the ZooKeeper service | 2181 |
-| external.zookeeper.service.name | Name of the ZooKeeper service | "zookeeper" |
-| external.zookeeper.service.port | TargetPort of the ZooKeeper service | 2181 |
-| external.zookeeper.persistence.storageClassName | Storage class name for ZooKeeper PVs | "hostpath" |
-| external.zookeeper.persistence.storageSize | Size of the ZooKeeper PV | "1Gi" |
-| external.zookeeper.persistence.claimName | Name of the ZooKeeper PersistentVolumeClaim | "zookeeper-pvc" |
-| external.zookeeper.persistence.pvName | Name of the ZooKeeper PersistentVolume | "zookeeper-pv" |
-
-
####Pulsar common parameters
| Parameter Name | Description | Value |
|-------------------------------------------------|----------------------------------------------------------|------------------------------------------|
diff --git a/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml b/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml
deleted file mode 100644
index 07314cdeb3..0000000000
--- a/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml
+++ /dev/null
@@ -1,74 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{{- if eq .Values.preferredBroker "kafka" }}
-apiVersion: apps/v1
-kind: Deployment
-metadata:
- name: {{ .Values.external.zookeeper.appName }}
-spec:
- selector:
- matchLabels:
- app: {{ .Values.external.zookeeper.appName }}
- replicas: 1
- template:
- metadata:
- labels:
- app: {{ .Values.external.zookeeper.appName }}
- spec:
- restartPolicy: {{ .Values.restartPolicy }}
- volumes:
- - name: {{ .Values.external.zookeeper.persistence.pvName }}
- persistentVolumeClaim:
- claimName: {{ .Values.external.zookeeper.persistence.claimName }}
- containers:
- #TODO: wurstmeister/zookeeper:latest is running ZK 3.4.13. Once this
- # changes, the mount path needs to be adapted
- - name: {{ .Values.external.zookeeper.appName }}
- image: fogsyio/zookeeper:{{ .Values.external.zookeeper.version }}
- imagePullPolicy: {{ .Values.pullPolicy }}
- ports:
- - containerPort: {{ .Values.external.zookeeper.port }}
- volumeMounts:
- - mountPath: "/opt/zookeeper-{{ .Values.external.zookeeper.version }}/data"
- name: {{ .Values.external.zookeeper.persistence.pvName }}
- livenessProbe:
- exec:
- command:
- - sh
- - -c
- - echo ruok | nc localhost {{ .Values.external.zookeeper.port }}
- initialDelaySeconds: {{ .Values.initialDelaySeconds }}
- periodSeconds: {{ .Values.periodSeconds }}
- failureThreshold: {{ .Values.failureThreshold }}
- readinessProbe:
- exec:
- command:
- - sh
- - -c
- - echo ruok | nc localhost {{ .Values.external.zookeeper.port }}
- initialDelaySeconds: {{ .Values.initialDelaySeconds }}
- periodSeconds: {{ .Values.periodSeconds }}
- failureThreshold: {{ .Values.failureThreshold }}
- startupProbe:
- exec:
- command:
- - sh
- - -c
- - echo ruok | nc localhost {{ .Values.external.zookeeper.port }}
- initialDelaySeconds: {{ .Values.initialDelaySeconds }}
- periodSeconds: {{ .Values.periodSeconds }}
- failureThreshold: {{ .Values.failureThreshold }}
-{{- end }}
\ No newline at end of file
diff --git a/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml b/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml
deleted file mode 100644
index 465963d6e8..0000000000
--- a/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{{- if eq .Values.preferredBroker "kafka" }}
-apiVersion: v1
-kind: PersistentVolume
-metadata:
- name: {{ .Values.external.zookeeper.persistence.pvName }}
-spec:
- storageClassName: {{ .Values.external.zookeeper.persistence.storageClassName }}
- capacity:
- storage: {{ .Values.external.zookeeper.persistence.storageSize }}
- accessModes:
- - {{ .Values.persistentVolumeAccessModes }}
- persistentVolumeReclaimPolicy: {{ .Values.persistentVolumeReclaimPolicy }}
- hostPath:
- path: {{ .Values.hostPath }}/zookeeper
----
-apiVersion: v1
-kind: PersistentVolumeClaim
-metadata:
- labels:
- app: {{ .Values.external.zookeeper.appName }}
- name: {{ .Values.external.zookeeper.persistence.claimName }}
-spec:
- storageClassName: {{ .Values.external.zookeeper.persistence.storageClassName }}
- accessModes:
- - {{ .Values.persistentVolumeAccessModes }}
- resources:
- requests:
- storage: {{ .Values.external.zookeeper.persistence.storageSize }}
-{{- end }}
\ No newline at end of file
diff --git a/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml b/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml
deleted file mode 100644
index 492d0558ea..0000000000
--- a/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{{- if eq .Values.preferredBroker "kafka" }}
-apiVersion: v1
-kind: Service
-metadata:
- name: {{ .Values.external.zookeeper.service.name }}
-spec:
- selector:
- app: {{ .Values.external.zookeeper.appName }}
- ports:
- - name: main
- protocol: TCP
- port: {{ .Values.external.zookeeper.port }}
- targetPort: {{ .Values.external.zookeeper.port }}
-{{- end }}
\ No newline at end of file
diff --git a/installer/k8s/values.yaml b/installer/k8s/values.yaml
index 4e6b3932b5..8e58afcfbf 100644
--- a/installer/k8s/values.yaml
+++ b/installer/k8s/values.yaml
@@ -150,18 +150,6 @@ external:
storageSize: "1Gi"
claimName: "kafka-pvc"
pvName: "kafka-pv"
- zookeeper:
- appName: "zookeeper"
- version: 3.4.13
- port: 2181
- service:
- name: "zookeeper"
- port: 2181
- persistence:
- storageClassName: "hostpath"
- storageSize: "1Gi"
- claimName: "zookeeper-pvc"
- pvName: "zookeeper-pv"
pulsar:
appName: "pulsar"
version: 3.0.0
diff --git a/pom.xml b/pom.xml
index 7746ddd87a..adede46055 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,7 +91,7 @@
5.3.0
2.5.0
3.0.2
- 3.4.0
+ 3.5.1
1.9.0
0.2.0
2.20.0
diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/TransformValueAdapterPipelineElement.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/TransformValueAdapterPipelineElement.java
index 37aacbad31..67ed380db3 100644
--- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/TransformValueAdapterPipelineElement.java
+++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/TransformValueAdapterPipelineElement.java
@@ -28,6 +28,7 @@
import org.apache.streampipes.connect.shared.preprocessing.transform.value.ValueTransformationRule;
import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
+import org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription;
@@ -75,6 +76,8 @@ public TransformValueAdapterPipelineElement(
var tmp = (ChangeDatatypeTransformationRuleDescription) ruleDescription;
rules.add(new DatatypeTransformationRule(tmp.getRuntimeKey(), tmp.getOriginalDatatypeXsd(),
tmp.getTargetDatatypeXsd()));
+ } else if (ruleDescription instanceof AddValueTransformationRuleDescription) {
+ // Do nothing since value rules are processed earlier
} else {
logger.error(
"Could not find the class for the rule description. This should never happen. "
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index 7938ecf657..eb07b66c6b 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -96,12 +96,11 @@ public void connect() {
LOG.info("Kafka producer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
this.brokerUrl = protocol.getBrokerHostname() + ":" + protocol.getKafkaPort();
this.topic = protocol.getTopicDefinition().getActualTopicName();
- String zookeeperHost = protocol.getZookeeperHost() + ":" + protocol.getZookeeperPort();
try {
createKafkaTopic(protocol);
} catch (ExecutionException | InterruptedException e) {
- LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
+ LOG.error("Could not create topic: " + topic + " on broker " + brokerUrl);
}
this.producer = new KafkaProducer<>(makeProperties(protocol, Collections.emptyList()));
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 3ab75d03e6..c75b686197 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -20,8 +20,10 @@
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import java.util.List;
import java.util.Properties;
import java.util.UUID;
@@ -54,9 +56,13 @@ public Properties makeDefaultProperties() {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT);
-
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+ props.put(
+ ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
+ getConfigOrDefault(protocol::getGroupInstanceId, UUID.randomUUID().toString())
+ );
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG_DEFAULT);
+ props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, List.of(CooperativeStickyAssignor.class));
return props;
}
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
index 9fe5228abe..a830a45239 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
@@ -22,10 +22,6 @@ public class KafkaTransportProtocol extends TransportProtocol {
private static final long serialVersionUID = -4067982203807146257L;
- private String zookeeperHost;
-
- private int zookeeperPort;
-
private int kafkaPort;
private Integer lingerMs;
@@ -42,40 +38,33 @@ public class KafkaTransportProtocol extends TransportProtocol {
private String groupId;
- public KafkaTransportProtocol(String kafkaHost, int kafkaPort, String topic) {
- super(kafkaHost, new SimpleTopicDefinition(topic));
- this.zookeeperHost = kafkaHost;
- this.zookeeperPort = kafkaPort;
- this.kafkaPort = kafkaPort;
- }
+ private String groupInstanceId;
- public KafkaTransportProtocol(String kafkaHost, int kafkaPort, String topic, String zookeeperHost,
- int zookeeperPort) {
+ public KafkaTransportProtocol(String kafkaHost,
+ int kafkaPort,
+ String topic) {
super(kafkaHost, new SimpleTopicDefinition(topic));
- this.zookeeperHost = zookeeperHost;
- this.zookeeperPort = zookeeperPort;
this.kafkaPort = kafkaPort;
}
public KafkaTransportProtocol(KafkaTransportProtocol other) {
super(other);
this.kafkaPort = other.getKafkaPort();
- this.zookeeperHost = other.getZookeeperHost();
- this.zookeeperPort = other.getZookeeperPort();
this.acks = other.getAcks();
this.batchSize = other.getBatchSize();
this.groupId = other.getGroupId();
+ this.groupInstanceId = other.getGroupInstanceId();
this.lingerMs = other.getLingerMs();
this.maxRequestSize = other.getMaxRequestSize();
this.messageMaxBytes = other.getMessageMaxBytes();
this.offset = other.getOffset();
}
- public KafkaTransportProtocol(String kafkaHost, Integer kafkaPort, WildcardTopicDefinition wildcardTopicDefinition) {
+ public KafkaTransportProtocol(String kafkaHost,
+ Integer kafkaPort,
+ WildcardTopicDefinition wildcardTopicDefinition) {
super(kafkaHost, wildcardTopicDefinition);
this.kafkaPort = kafkaPort;
- this.zookeeperHost = kafkaHost;
- this.zookeeperPort = kafkaPort;
}
public KafkaTransportProtocol() {
@@ -86,22 +75,6 @@ public static long getSerialVersionUID() {
return serialVersionUID;
}
- public String getZookeeperHost() {
- return zookeeperHost;
- }
-
- public void setZookeeperHost(String zookeeperHost) {
- this.zookeeperHost = zookeeperHost;
- }
-
- public int getZookeeperPort() {
- return zookeeperPort;
- }
-
- public void setZookeeperPort(int zookeeperPort) {
- this.zookeeperPort = zookeeperPort;
- }
-
public int getKafkaPort() {
return kafkaPort;
}
@@ -165,4 +138,12 @@ public String getMaxRequestSize() {
public void setMaxRequestSize(String maxRequestSize) {
this.maxRequestSize = maxRequestSize;
}
+
+ public String getGroupInstanceId() {
+ return groupInstanceId;
+ }
+
+ public void setGroupInstanceId(String groupInstanceId) {
+ this.groupInstanceId = groupInstanceId;
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index e1eb685f6d..e1eeb4f58f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -114,9 +114,7 @@ private TransportProtocol kafkaTopic() {
return new KafkaTransportProtocol(
messagingSettings.getKafkaHost(),
messagingSettings.getKafkaPort(),
- outputTopic,
- messagingSettings.getZookeeperHost(),
- messagingSettings.getZookeeperPort()
+ outputTopic
);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 1113c9a4c8..3c1b635b2e 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -28,8 +28,11 @@
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.preview.PipelinePreviewModel;
+import org.apache.streampipes.model.util.Cloner;
import java.util.ArrayList;
import java.util.List;
@@ -42,10 +45,11 @@ public class PipelinePreview {
public PipelinePreviewModel initiatePreview(Pipeline pipeline) {
String previewId = generatePreviewId();
pipeline.setActions(new ArrayList<>());
- List pipelineElements = new PipelineVerificationHandlerV2(pipeline)
- .verifyAndBuildGraphs(true)
- .stream()
- .collect(Collectors.toList());
+ List pipelineElements = new ArrayList<>(
+ new PipelineVerificationHandlerV2(pipeline)
+ .verifyAndBuildGraphs(true));
+
+ pipelineElements.forEach(this::applyGrounding);
invokeGraphs(filter(pipelineElements));
storeGraphs(previewId, pipelineElements);
@@ -53,6 +57,27 @@ public PipelinePreviewModel initiatePreview(Pipeline pipeline) {
return makePreviewModel(previewId, pipelineElements);
}
+ private void applyGrounding(NamedStreamPipesEntity pe) {
+ if (pe instanceof SpDataStream) {
+ var grounding = ((SpDataStream) pe).getEventGrounding();
+ applyGroupId(grounding);
+ } else if (pe instanceof DataProcessorInvocation) {
+ applyGroupId(((DataProcessorInvocation) pe).getOutputStream().getEventGrounding());
+ }
+ }
+
+ private void applyGroupId(EventGrounding grounding) {
+ var protocol = grounding.getTransportProtocol();
+ if (protocol instanceof KafkaTransportProtocol) {
+ var clonedProtocol = new Cloner().protocol(protocol);
+ String groupId = UUID.randomUUID().toString();
+ String groupInstanceId = UUID.randomUUID().toString();
+ ((KafkaTransportProtocol) clonedProtocol).setGroupId(groupId);
+ ((KafkaTransportProtocol) clonedProtocol).setGroupInstanceId(groupInstanceId);
+ grounding.setTransportProtocol(clonedProtocol);
+ }
+ }
+
public void deletePreview(String previewId) {
List graphs = ActivePipelinePreviews.INSTANCE.getInvocationGraphs(previewId);
detachGraphs(filter(graphs));
diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
index d1e31832e4..c9eba57657 100644
--- a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
+++ b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
@@ -27,7 +27,7 @@
public class TestUtils {
public static TransportProtocol kafkaProtocol() {
- return new KafkaTransportProtocol("localhost", 9092, "abc", "localhost", 2181);
+ return new KafkaTransportProtocol("localhost", 9092, "abc");
}
public static TransportProtocol jmsProtocol() {
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
index f9c25f11c8..f20e15f40f 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
@@ -36,7 +36,7 @@ public class Protocols {
* containing URL and topic where data arrives.
*/
public static KafkaTransportProtocol kafka(String kafkaHost, Integer kafkaPort, String topic) {
- return new KafkaTransportProtocol(kafkaHost, kafkaPort, topic, kafkaHost, kafkaPort);
+ return new KafkaTransportProtocol(kafkaHost, kafkaPort, topic);
}
/**
diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index 36a810b513..9ad6fc81db 100644
--- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -2007,6 +2007,7 @@ export class KafkaTransportProtocol extends TransportProtocol {
'acks': string;
'batchSize': string;
'groupId': string;
+ 'groupInstanceId': string;
'kafkaPort': number;
'lingerMs': number;
'maxRequestSize': string;
diff --git a/ui/src/app/connect/components/runtime-info/pipeline-element-runtime-info.component.ts b/ui/src/app/connect/components/runtime-info/pipeline-element-runtime-info.component.ts
index 01e07422b0..af1c318b40 100644
--- a/ui/src/app/connect/components/runtime-info/pipeline-element-runtime-info.component.ts
+++ b/ui/src/app/connect/components/runtime-info/pipeline-element-runtime-info.component.ts
@@ -16,19 +16,21 @@
*
*/
-import { Component, Input, OnDestroy, OnInit } from '@angular/core';
+import { Component, Input, OnDestroy } from '@angular/core';
import {
EventPropertyUnion,
+ KafkaTransportProtocol,
SpDataStream,
} from '@streampipes/platform-services';
import { RestService } from '../../services/rest.service';
+import { UUID } from 'angular2-uuid';
@Component({
selector: 'sp-pipeline-element-runtime-info',
templateUrl: './pipeline-element-runtime-info.component.html',
styleUrls: ['./pipeline-element-runtime-info.component.scss'],
})
-export class PipelineElementRuntimeInfoComponent implements OnInit, OnDestroy {
+export class PipelineElementRuntimeInfoComponent implements OnDestroy {
@Input()
streamDescription: SpDataStream;
@@ -38,14 +40,27 @@ export class PipelineElementRuntimeInfoComponent implements OnInit, OnDestroy {
timer: any;
runtimeDataError = false;
+ groupId: string;
+ groupInstanceId = 'runtime-info';
+
constructor(private restService: RestService) {}
- ngOnInit(): void {
- this.checkPollingStart();
+ checkAndAssignGroupId(): void {
+ if (!this.groupId && this.streamDescription) {
+ if (
+ this.streamDescription.eventGrounding
+ .transportProtocols[0] instanceof KafkaTransportProtocol
+ ) {
+ this.streamDescription.eventGrounding.transportProtocols[0].groupId =
+ UUID.UUID();
+ this.streamDescription.eventGrounding.transportProtocols[0].groupInstanceId =
+ this.groupInstanceId;
+ }
+ }
}
checkPollingStart() {
- if (this._pollingActive) {
+ if (this._pollingActive && this.streamDescription) {
this.getLatestRuntimeInfo();
}
}
@@ -103,6 +118,7 @@ export class PipelineElementRuntimeInfoComponent implements OnInit, OnDestroy {
@Input()
set pollingActive(pollingActive: boolean) {
this._pollingActive = pollingActive;
+ this.checkAndAssignGroupId();
this.checkPollingStart();
}
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
index 73056e6462..875683cd94 100644
--- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
+++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
@@ -21,6 +21,7 @@ import { ShepherdService } from '../../../services/tour/shepherd.service';
import { RestService } from '../../services/rest.service';
import {
AdapterDescription,
+ AdapterService,
ErrorMessage,
Message,
PipelineOperationStatus,
@@ -29,7 +30,6 @@ import {
} from '@streampipes/platform-services';
import { DialogRef } from '@streampipes/shared-ui';
import { PipelineInvocationBuilder } from '../../../core-services/template/PipelineInvocationBuilder';
-import { AdapterService } from '../../../../../projects/streampipes/platform-services/src/lib/apis/adapter.service';
@Component({
selector: 'sp-dialog-adapter-started-dialog',
@@ -187,7 +187,12 @@ export class AdapterStartedDialog implements OnInit {
.subscribe(pipelineOperationStatus => {
this.pipelineOperationStatus =
pipelineOperationStatus;
- this.startAdapter(message, adapterElementId);
+ setTimeout(() => {
+ this.startAdapter(
+ message,
+ adapterElementId,
+ );
+ }, 3000);
});
},
res => {
diff --git a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
index 8e57fc405e..30fd188c83 100644
--- a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
+++ b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
@@ -56,13 +56,12 @@ export class PipelineElementPreviewComponent implements OnInit {
) {
this.runtimeData = data;
}
-
- this.timer = setTimeout(() => {
- this.getLatestRuntimeInfo();
- }, 1000);
} else {
this.runtimeDataError = true;
}
+ this.timer = setTimeout(() => {
+ this.getLatestRuntimeInfo();
+ }, 1000);
});
}
}