diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index bdbcb41059..c3e6f969f5 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -95,7 +95,20 @@ public enum Envs { SP_HEALTH_SERVICE_MAX_UNHEALTHY_TIME_MS("SP_HEALTH_SERVICE_MAX_UNHEALTHY_TIME_MS", "60000"), - SP_INITIAL_WAIT_BEFORE_INSTALLATION_MS("SP_INITIAL_WAIT_BEFORE_INSTALLATION_MS", "5000"); + SP_INITIAL_WAIT_BEFORE_INSTALLATION_MS("SP_INITIAL_WAIT_BEFORE_INSTALLATION_MS", "5000"), + + // Broker defaults + + SP_KAFKA_HOST("SP_KAFKA_HOST", "kafka"), + SP_KAFKA_PORT("SP_KAFKA_PORT", "9092"), + + SP_MQTT_HOST("SP_MQTT_HOST", "mosquitto"), + SP_MQTT_PORT("SP_MQTT_PORT", "1883"), + + SP_NATS_HOST("SP_NATS_HOST", "nats"), + SP_NATS_PORT("SP_NATS_PORT", "4222"), + + SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"); private final String envVariableName; private String defaultValue; diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java index 3a5831ae84..c1db1c1851 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java @@ -249,6 +249,41 @@ public IntEnvironmentVariable getInitialWaitTimeBeforeInstallationInMillis() { return new IntEnvironmentVariable(Envs.SP_INITIAL_WAIT_BEFORE_INSTALLATION_MS); } + @Override + public StringEnvironmentVariable getKafkaHost() { + return new StringEnvironmentVariable(Envs.SP_KAFKA_HOST); + } + + @Override + public IntEnvironmentVariable getKafkaPort() { + return new IntEnvironmentVariable(Envs.SP_KAFKA_PORT); + } + + @Override + public StringEnvironmentVariable getMqttHost() { + return new StringEnvironmentVariable(Envs.SP_MQTT_HOST); + } + + @Override + public IntEnvironmentVariable getMqttPort() { + return new IntEnvironmentVariable(Envs.SP_MQTT_PORT); + } + + @Override + public StringEnvironmentVariable getNatsHost() { + return new StringEnvironmentVariable(Envs.SP_NATS_HOST); + } + + @Override + public IntEnvironmentVariable getNatsPort() { + return new IntEnvironmentVariable(Envs.SP_NATS_PORT); + } + + @Override + public StringEnvironmentVariable getPulsarUrl() { + return new StringEnvironmentVariable(Envs.SP_PULSAR_URL); + } + @Override public StringEnvironmentVariable getConsulLocation() { return new StringEnvironmentVariable(Envs.SP_CONSUL_LOCATION); diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java index 30c6749c2a..913855924e 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java @@ -131,4 +131,16 @@ public interface Environment { IntEnvironmentVariable getInitialWaitTimeBeforeInstallationInMillis(); + // Broker defaults + StringEnvironmentVariable getKafkaHost(); + IntEnvironmentVariable getKafkaPort(); + + StringEnvironmentVariable getMqttHost(); + IntEnvironmentVariable getMqttPort(); + + StringEnvironmentVariable getNatsHost(); + IntEnvironmentVariable getNatsPort(); + + StringEnvironmentVariable getPulsarUrl(); + } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/DefaultMessagingSettings.java b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/DefaultMessagingSettings.java index d2f21ea0cb..457ec36b14 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/DefaultMessagingSettings.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/DefaultMessagingSettings.java @@ -32,8 +32,6 @@ public MessagingSettings make() { protocolList = switch (env.getPrioritizedProtocol().getValueOrDefault().toLowerCase()) { case "mqtt" -> Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS, SpProtocol.NATS, SpProtocol.PULSAR); - case "kafka" -> - Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS, SpProtocol.NATS, SpProtocol.PULSAR); case "jms" -> Arrays.asList(SpProtocol.JMS, SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.NATS, SpProtocol.PULSAR); case "nats" -> @@ -58,16 +56,16 @@ public MessagingSettings make() { defaultSettings.setJmsHost("activemq"); defaultSettings.setJmsPort(61616); - defaultSettings.setMqttHost("mosquitto"); - defaultSettings.setMqttPort(1883); + defaultSettings.setMqttHost(env.getMqttHost().getValueOrDefault()); + defaultSettings.setMqttPort(env.getMqttPort().getValueOrDefault()); - defaultSettings.setNatsHost("nats"); - defaultSettings.setNatsPort(4222); + defaultSettings.setNatsHost(env.getNatsHost().getValueOrDefault()); + defaultSettings.setNatsPort(env.getNatsPort().getValueOrDefault()); - defaultSettings.setKafkaHost("kafka"); - defaultSettings.setKafkaPort(9092); + defaultSettings.setKafkaHost(env.getKafkaHost().getValueOrDefault()); + defaultSettings.setKafkaPort(env.getKafkaPort().getValueOrDefault()); - defaultSettings.setPulsarUrl("pulsar://localhost:6650"); + defaultSettings.setPulsarUrl(env.getPulsarUrl().getValueOrDefault()); defaultSettings.setZookeeperHost("zookeeper"); defaultSettings.setZookeeperPort(2181);