diff --git a/message-processor/.gitignore b/message-processor/.gitignore new file mode 100644 index 0000000..a2a3040 --- /dev/null +++ b/message-processor/.gitignore @@ -0,0 +1,31 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/** +!**/src/test/** + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ + +### VS Code ### +.vscode/ diff --git a/message-processor/pom.xml b/message-processor/pom.xml new file mode 100644 index 0000000..0a6e649 --- /dev/null +++ b/message-processor/pom.xml @@ -0,0 +1,140 @@ + + + + iot-things-examples + com.bosch.iot.things.examples + 0-SNAPSHOT + + 4.0.0 + + message-processor + 0.0.1-SNAPSHOT + Bosch IoT Things :: Custom Message Processor + Message processor application for Bosch IoT Hub/Things services + + + 8 + + 3.8.2 + 0.33.2 + 2.1.0 + 1.0.3 + 4.12 + + false + 2.2.0.RELEASE + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring.version} + pom + import + + + io.vertx + vertx-dependencies + ${stack.version} + pom + import + + + + + + + org.springframework.boot + spring-boot-starter-webflux + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + io.vertx + vertx-core + + + org.apache.qpid + proton-j + ${proton.version} + + + io.vertx + vertx-proton + ${stack.version} + + + junit + junit + ${junit.version} + test + + + io.vertx + vertx-unit + test + + + org.reactivestreams + reactive-streams-tck + ${rs.version} + test + + + io.reactivex.rxjava2 + rxjava + ${rx.version} + test + + + + io.vertx + vertx-codegen + processor + provided + + + + io.vertx + vertx-docgen + provided + + + org.slf4j + slf4j-api + + + + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring.version} + + + + repackage + + + com.bosch.iot.things.example.message.processor.MessageProcessorApplication + + + + + + + \ No newline at end of file diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java new file mode 100644 index 0000000..8f89e5d --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java @@ -0,0 +1,23 @@ +package com.bosch.iot.things.example.message.processor; + +import com.bosch.iot.things.example.message.processor.transport.AmqpClient; +import com.bosch.iot.things.example.message.processor.transport.AmqpServer; +import io.vertx.core.Vertx; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; + +@Configuration +public class ApplicationConfiguration { + + @Bean + public Vertx vertx() { return Vertx.vertx(); } + + @Bean + @DependsOn("client") + public AmqpServer server(){ return new AmqpServer(); } + + @Bean + public AmqpClient client(){ return new AmqpClient(); } + +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/MessageProcessorApplication.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/MessageProcessorApplication.java new file mode 100644 index 0000000..1e7d74a --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/MessageProcessorApplication.java @@ -0,0 +1,11 @@ +package com.bosch.iot.things.example.message.processor; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class MessageProcessorApplication { + public static void main(String[] args) { + SpringApplication.run(MessageProcessorApplication.class, args); + } +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java new file mode 100644 index 0000000..0b89af1 --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java @@ -0,0 +1,52 @@ +package com.bosch.iot.things.example.message.processor.downstream; + +import com.bosch.iot.things.example.message.processor.processing.MessageProcessor; +import com.bosch.iot.things.example.message.processor.transport.AmqpClient; +import com.bosch.iot.things.example.message.processor.utils.Utils; +import io.vertx.proton.ProtonSender; +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +@Component +public class ThingsToHubFlow { + + private static final Logger log = LoggerFactory.getLogger(ThingsToHubFlow.class); + + @Autowired + private volatile AmqpClient amqpClient; + + @Autowired + private Utils utils; + + @Autowired + private MessageProcessor messageProcessingService; + + private Map hubSenders = new HashMap<>(); + + public void init(Set thingsReceiverAddresses) { + thingsReceiverAddresses.forEach(address -> { + ProtonSender hubSender = amqpClient.getHubConnection().createSender(address); + hubSender.open(); + hubSenders.put(address, hubSender); + log.info("Hub sender created for: " + address); + }); + } + + public void forwardToHub(Message msg, String address) { + log.debug("Sending message to HUB service"); + Message processedMessage = this.messageProcessingService.decrypt(msg); + utils.logMessageInfo(msg, address); + ProtonSender hubSender = hubSenders.get(address); + hubSender.send(processedMessage, delivery -> { + log.info(String.format("The message was received by the HUB service: remote state=%s, remotely settled=%s", + delivery.getRemoteState(), delivery.remotelySettled())); + }); + } +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java new file mode 100644 index 0000000..a2f3966 --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java @@ -0,0 +1,34 @@ +package com.bosch.iot.things.example.message.processor.processing; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.Message; +import org.springframework.stereotype.Service; + +@Service +public class MessageProcessingService implements MessageProcessor{ + + @Override + public Message encrypt(Message message) { return getProcessedMessage(message); } + + @Override + public Message decrypt(Message message) { return getProcessedMessage(message); } + + public Message getProcessedMessage(Message message) { + Section body = message.getBody(); + if (body instanceof Data) { + String msgString = ((Data) body).getValue().toString(); + //Simple modification of payload + String modifiedBody = msgString.replaceAll("60", "70"); + message.setBody((new Data(new Binary(modifiedBody.getBytes())))); + } else if (body instanceof AmqpValue) { + String msgString = ((AmqpValue) body).getValue().toString(); + //Simple modification of payload + String modifiedBody = msgString.replaceAll("60", "70"); + message.setBody(new AmqpValue(modifiedBody)); + } + return message; + } +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessor.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessor.java new file mode 100644 index 0000000..62a5f3d --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessor.java @@ -0,0 +1,9 @@ +package com.bosch.iot.things.example.message.processor.processing; + +import org.apache.qpid.proton.message.Message; + +public interface MessageProcessor { + Message encrypt(Message message); + + Message decrypt(Message message); +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java new file mode 100644 index 0000000..740db68 --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java @@ -0,0 +1,91 @@ +package com.bosch.iot.things.example.message.processor.transport; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.proton.ProtonClient; +import io.vertx.proton.ProtonClientOptions; +import io.vertx.proton.ProtonConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import javax.annotation.PostConstruct; +import java.util.Objects; + +public class AmqpClient { + + private static final Logger log = LoggerFactory.getLogger(AmqpClient.class); + + @Autowired + private Vertx vertx; + + @Value(value = "${hub.host}") + private String hubHost; + + @Value(value = "${hub.port}") + private Integer hubPort; + + @Value(value = "${hub.username}") + private String hubUsername; + + @Value(value = "${hub.password}") + private String hubPassword; + + private ProtonClient hubClient; + private ProtonConnection hubConnection; + + @PostConstruct + private void start() { + setHubClient(ProtonClient.create(vertx)); + } + + private Future connectHubClient() { + Promise promise = Promise.promise(); + hubClient.connect(new ProtonClientOptions().setSsl(Boolean.TRUE), hubHost, hubPort, hubUsername, hubPassword, res -> { + if (res.succeeded()) { + log.info("Connected to Hub server!"); + setHubConnection(res.result()); + hubConnection.open(); + promise.complete(); + } else { + log.error(String.format("Error connecting to Hub server! Cause - %s", res.cause().getMessage())); + promise.fail(res.cause()); + } + }); + return promise.future(); + } + + public Future requestHubConnection() { + if (Objects.isNull(this.getHubConnection())) { + return this.connectHubClient(); + } + return Future.succeededFuture(); + } + + public void disconnectFromHub() { + if (Objects.nonNull(this.getHubConnection()) && !this.getHubConnection().isDisconnected()) { + getHubConnection().close().disconnect(); + setHubConnection(null); + log.info("Disconnected from Hub server!"); + } + } + + public ProtonClient getHubClient() { + return hubClient; + } + + private void setHubClient(ProtonClient hubClient) { + this.hubClient = hubClient; + } + + public ProtonConnection getHubConnection() { + return hubConnection; + } + + private void setHubConnection(ProtonConnection hubConnection) { + this.hubConnection = hubConnection; + } + +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java new file mode 100644 index 0000000..3c394c9 --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java @@ -0,0 +1,142 @@ +package com.bosch.iot.things.example.message.processor.transport; + +import com.bosch.iot.things.example.message.processor.downstream.ThingsToHubFlow; +import com.bosch.iot.things.example.message.processor.upstream.HubToThingsFlow; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.proton.ProtonConnection; +import io.vertx.proton.ProtonSender; +import io.vertx.proton.ProtonServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class AmqpServer { + + private static final Logger log = LoggerFactory.getLogger(AmqpServer.class); + + @Autowired + private Vertx vertx; + + @Autowired + private HubToThingsFlow hubToThingsFlow; + + @Autowired + private ThingsToHubFlow thingsToHubFlow; + + @Autowired + private AmqpClient amqpClient; + + @Value(value = "${local.server.port}") + private Integer localPort; + + private ProtonServer localServer; + private ProtonConnection connection; + private Map thingsSenders = new HashMap<>(); + private Set thingsReceiverAddresses = new HashSet<>(); + + @PostConstruct + private void start() { + // Create the Vert.x AMQP server instance + setLocalServer(ProtonServer.create(vertx)); + listen(); + } + + private void listen(){ + localServer.connectHandler((connection) -> { + setConnection(connection); + handleConnection(connection); + }).listen(localPort,(res) -> { + if (res.succeeded()) { + log.info("Listening on: " + res.result().actualPort()); + } else { + log.error("Error while starting local AMQP server!"); + log.error(res.cause().getMessage()); + } + }); + } + + private void handleConnection(ProtonConnection connection) { + Future connectionSetUp = connectionHandler(connection) + .compose(c -> connectionSenderHandler(connection)) + .compose(c -> connectionReceiverHandler(connection)) + .compose(c -> amqpClient.requestHubConnection()); + connectionSetUp.setHandler(res ->{ + if (res.succeeded()) { + hubToThingsFlow.init(thingsSenders); + thingsToHubFlow.init(thingsReceiverAddresses); + } + }); + } + + private Future connectionHandler(ProtonConnection connection) { + Promise connectionOpenPromise = Promise.promise(); + connection.openHandler(res -> { + log.info("Client connected: " + connection.getRemoteContainer()); + connection.open(); + connectionOpenPromise.complete(); + }).closeHandler(c -> { + log.info("Client closing amqp connection: " + connection.getRemoteContainer()); + connection.close(); + connection.disconnect(); + amqpClient.disconnectFromHub(); + }).disconnectHandler(c -> { + log.info("Client socket disconnected: " + connection.getRemoteContainer()); + connection.disconnect(); + amqpClient.disconnectFromHub(); + }).sessionOpenHandler(session -> session.open()); + return connectionOpenPromise.future(); + } + + private Future connectionReceiverHandler(ProtonConnection connection) { + Promise receiverPromise = Promise.promise(); + connection.receiverOpenHandler(receiver -> { + String address = receiver.getRemoteTarget().getAddress(); + log.info("Receiving from: " + address); + receiver.setTarget(receiver.getRemoteTarget()) + .handler((delivery, msg) -> { + thingsToHubFlow.forwardToHub(msg, address); + }).open(); + thingsReceiverAddresses.add(address); + receiverPromise.complete(); + }); + return receiverPromise.future(); + } + + private Future connectionSenderHandler(ProtonConnection connection) { + Promise senderPromise = Promise.promise(); + connection.senderOpenHandler(sender -> { + String senderAddress = sender.getRemoteSource().getAddress(); + log.info("Sending to client from: " + senderAddress); + sender.setSource(sender.getRemoteSource()); + sender.open(); + thingsSenders.put(senderAddress, sender); + senderPromise.complete(); + }); + return senderPromise.future(); + } + + public ProtonServer getLocalServer() { + return localServer; + } + + private void setLocalServer(ProtonServer server) { + localServer = server; + } + + public ProtonConnection getConnection() { + return connection; + } + + private void setConnection(ProtonConnection connection) { + this.connection = connection; + } +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java new file mode 100644 index 0000000..90c1a20 --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java @@ -0,0 +1,61 @@ +package com.bosch.iot.things.example.message.processor.upstream; + +import com.bosch.iot.things.example.message.processor.processing.MessageProcessor; +import com.bosch.iot.things.example.message.processor.transport.AmqpClient; +import com.bosch.iot.things.example.message.processor.utils.Utils; +import io.vertx.proton.ProtonReceiver; +import io.vertx.proton.ProtonSender; +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +public class HubToThingsFlow { + + private static final Logger log = LoggerFactory.getLogger(HubToThingsFlow.class); + + @Autowired + private AmqpClient amqpClient; + + @Autowired + private Utils utils; + + @Autowired + private MessageProcessor messageProcessingService; + + public void init(Map thingsSenders) { + thingsSenders.entrySet().stream().forEach(entry -> forwardToThings(entry.getKey(), entry.getValue())); + } + + public void forwardToThings(String address, ProtonSender thingsSender) { + ProtonReceiver hubReceiver = createHubReceiver(address); + log.info("Hub receiver created for:" + address); + sendToThingsService(hubReceiver, thingsSender); + } + + private ProtonReceiver createHubReceiver(String address) { + return amqpClient.getHubConnection().createReceiver(address); + } + + private void sendToThingsService(ProtonReceiver hubReceiver, ProtonSender thingsSender) { + hubReceiver.handler((delivery, msg) -> { + log.debug("Received message from HUB service with content: " + msg.getBody().toString()); + Message processedMessage = this.messageProcessingService.encrypt(msg); + send(processedMessage, thingsSender); + }).open(); + } + + private void send(Message message, ProtonSender sender) { + log.debug("Sending message to THINGS service"); + String address = sender.getRemoteSource().getAddress(); + utils.logMessageInfo(message, address); + sender.send(message, delivery -> { + log.info(String.format("The message was received by the THINGS service: remote state=%s, remotely settled=%s", + delivery.getRemoteState(), delivery.remotelySettled())); + }); + } +} \ No newline at end of file diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/utils/Utils.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/utils/Utils.java new file mode 100644 index 0000000..0d0d08c --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/utils/Utils.java @@ -0,0 +1,21 @@ +package com.bosch.iot.things.example.message.processor.utils; + +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class Utils { + + private static final Logger log = LoggerFactory.getLogger(Utils.class); + + public void logMessageInfo(Message msg, String address) { + log.debug("MSG.BODY_TYPE: " + msg.getBody().getType()); + log.debug("MSG.ADDRESS: " + msg.getAddress()); + log.debug("MSG.REPLY_TO: " + msg.getReplyTo()); + log.debug("MSG.SUBJECT: " + msg.getSubject()); + log.info("message to:" + address); + log.debug("body: " + msg.getBody().toString()); + } +} diff --git a/message-processor/src/main/resources/application.yml b/message-processor/src/main/resources/application.yml new file mode 100644 index 0000000..548353d --- /dev/null +++ b/message-processor/src/main/resources/application.yml @@ -0,0 +1,12 @@ +hub: + host: + port: + username: + password: +local: + server: + port: +logging: + level: + com.bosch.iot.things.example.message.processor: INFO +spring.main.web-application-type: none \ No newline at end of file diff --git a/pom.xml b/pom.xml index b022d4e..5f0662a 100755 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,7 @@ things-batch-importer octopus-telemetry octopus-bidirectional + message-processor @@ -57,7 +58,7 @@ 4.0.0 1.7.12 - 1.1.3 + 1.2.3 UTF-8