From 9da0ea063c28e36b6e3eb00440b0a3e42ce4ba86 Mon Sep 17 00:00:00 2001 From: Yasen Aleksandrov Date: Wed, 27 Nov 2019 14:18:12 +0200 Subject: [PATCH 1/3] Added message-processor module. --- message-processor/.gitignore | 31 ++++ message-processor/pom.xml | 137 ++++++++++++++++ .../processor/ApplicationConfiguration.java | 23 +++ .../example/message/processor/Constants.java | 9 ++ .../MessageProcessorApplication.java | 11 ++ .../processor/downstream/ThingsToHubFlow.java | 59 +++++++ .../processing/MessageProcessingService.java | 28 ++++ .../processor/transport/AmqpClient.java | 126 +++++++++++++++ .../processor/transport/AmqpServer.java | 153 ++++++++++++++++++ .../processor/upstream/HubToThingsFlow.java | 84 ++++++++++ .../src/main/resources/application.yml | 15 ++ pom.xml | 3 +- 12 files changed, 678 insertions(+), 1 deletion(-) create mode 100644 message-processor/.gitignore create mode 100644 message-processor/pom.xml create mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java create mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java create mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/MessageProcessorApplication.java create mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java create mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java create mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java create mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java create mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java create mode 100644 message-processor/src/main/resources/application.yml diff --git a/message-processor/.gitignore b/message-processor/.gitignore new file mode 100644 index 0000000..a2a3040 --- /dev/null +++ b/message-processor/.gitignore @@ -0,0 +1,31 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/** +!**/src/test/** + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ + +### VS Code ### +.vscode/ diff --git a/message-processor/pom.xml b/message-processor/pom.xml new file mode 100644 index 0000000..e6910c3 --- /dev/null +++ b/message-processor/pom.xml @@ -0,0 +1,137 @@ + + + + iot-things-examples + com.bosch.iot.things.examples + 0-SNAPSHOT + + 4.0.0 + + message-processor + 0.0.1-SNAPSHOT + Bosch IoT Things :: Custom Message Processor + Message processor application for Bosch IoT Hub/Things services + + + 8 + + 3.8.2 + 0.33.2 + + false + 2.2.0.RELEASE + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring.version} + pom + import + + + io.vertx + vertx-dependencies + ${stack.version} + pom + import + + + + + + + org.springframework.boot + spring-boot-starter-webflux + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + io.vertx + vertx-core + + + org.apache.qpid + proton-j + ${proton.version} + + + io.vertx + vertx-proton + 3.8.2 + + + junit + junit + 4.12 + test + + + io.vertx + vertx-unit + test + + + org.reactivestreams + reactive-streams-tck + 1.0.3 + test + + + io.reactivex.rxjava2 + rxjava + 2.1.0 + test + + + + io.vertx + vertx-codegen + processor + provided + + + + io.vertx + vertx-docgen + provided + + + org.slf4j + slf4j-api + + + + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring.version} + + + + repackage + + + com.bosch.iot.things.example.message.processor.MessageProcessorApplication + + + + + + + \ No newline at end of file diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java new file mode 100644 index 0000000..7f5974c --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java @@ -0,0 +1,23 @@ +package com.bosch.iot.things.example.message.processor; + +import com.bosch.iot.things.example.message.processor.transport.AmqpClient; +import com.bosch.iot.things.example.message.processor.transport.AmqpServer; +import io.vertx.core.Vertx; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; + +@Configuration +public class ApplicationConfiguration { + + @Bean + public Vertx vertx() { return Vertx.vertx(); } + + @Bean + public AmqpServer server(){ return new AmqpServer(); } + + @Bean + @DependsOn("server") + public AmqpClient client(){ return new AmqpClient(); } + +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java new file mode 100644 index 0000000..4163a9e --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java @@ -0,0 +1,9 @@ +package com.bosch.iot.things.example.message.processor; + +public class Constants { + public static final String TELEMETRY_ENDPOINT = "telemetry/"; + public static final String EVENT_ENDPOINT = "event/"; + public static final String CONTROL_ENDPOINT = "control/"; + public static final String CONTROL_REPLY_ENDPOINT = "/replies"; + public static final int TIME_OUT = 2000; +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/MessageProcessorApplication.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/MessageProcessorApplication.java new file mode 100644 index 0000000..1e7d74a --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/MessageProcessorApplication.java @@ -0,0 +1,11 @@ +package com.bosch.iot.things.example.message.processor; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class MessageProcessorApplication { + public static void main(String[] args) { + SpringApplication.run(MessageProcessorApplication.class, args); + } +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java new file mode 100644 index 0000000..19d4cf0 --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java @@ -0,0 +1,59 @@ +package com.bosch.iot.things.example.message.processor.downstream; + +import com.bosch.iot.things.example.message.processor.Constants; +import com.bosch.iot.things.example.message.processor.processing.MessageProcessingService; +import com.bosch.iot.things.example.message.processor.transport.AmqpClient; +import io.vertx.proton.*; +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import static com.bosch.iot.things.example.message.processor.Constants.TIME_OUT; + +@Component +public class ThingsToHubFlow { + + private static final Logger log = LoggerFactory.getLogger(ThingsToHubFlow.class); + + @Autowired + private volatile AmqpClient amqpClient; + + @Autowired + private MessageProcessingService messageProcessingService; + + @Value(value = "${tenant.id}") + private String tenantId; + + @PostConstruct + private void start() { + if (amqpClient.isConnected(TIME_OUT)) { + receiveFromThingsSentToHub(); + } else { + log.error("Either connection to local server or connection to hub server not initialized"); + } + } + + private void receiveFromThingsSentToHub() { + ProtonConnection localConnection = amqpClient.getLocalConnection(); + ProtonReceiver receiver = localConnection.createReceiver(Constants.CONTROL_ENDPOINT + tenantId); + receiver.handler((delivery, msg) -> { + log.debug("Received command message with content: " + msg.getBody().toString()); + Message processedMessage = this.messageProcessingService.getProcessedMessage(msg); + forwardToHub(amqpClient.getHubConnection(), processedMessage); + }).open(); + } + + private void forwardToHub(ProtonConnection hubConnection, Message msg) { + ProtonSender sender = hubConnection.createSender(msg.getAddress()); + sender.open(); + sender.send(msg, delivery -> { + log.info(String.format("The message was received by the HUB service: remote state=%s, remotely settled=%s", + delivery.getRemoteState(), delivery.remotelySettled())); + }); + } +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java new file mode 100644 index 0000000..abd6bc4 --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java @@ -0,0 +1,28 @@ +package com.bosch.iot.things.example.message.processor.processing; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.Message; +import org.springframework.stereotype.Service; + +@Service +public class MessageProcessingService { + + public Message getProcessedMessage(Message message) { + Section body = message.getBody(); + if (body instanceof Data) { + String msgString = ((Data) body).getValue().toString(); + //Simple modification of payload + String modifiedBody = msgString.replaceAll("60", "70"); + message.setBody((new Data(new Binary(modifiedBody.getBytes())))); + } else if (body instanceof AmqpValue) { + String msgString = ((AmqpValue) body).getValue().toString(); + //Simple modification of payload + String modifiedBody = msgString.replaceAll("60", "70"); + message.setBody(new AmqpValue(modifiedBody)); + } + return message; + } +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java new file mode 100644 index 0000000..b3b6c08 --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java @@ -0,0 +1,126 @@ +package com.bosch.iot.things.example.message.processor.transport; + +import io.vertx.core.Vertx; +import io.vertx.proton.ProtonClient; +import io.vertx.proton.ProtonClientOptions; +import io.vertx.proton.ProtonConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import javax.annotation.PostConstruct; +import java.util.Objects; + +import static java.lang.Thread.sleep; + +public class AmqpClient { + + private static final Logger log = LoggerFactory.getLogger(AmqpClient.class); + + @Autowired + private Vertx vertx; + + @Value(value = "${hub.client.host}") + private String hubHost; + + @Value(value = "${hub.client.port}") + private Integer hubPort; + + @Value(value = "${hub.client.username}") + private String hubUsername; + + @Value(value = "${hub.client.password}") + private String hubPassword; + + @Value(value = "${local.client.host}") + private String localHost; + + @Value(value = "${local.client.port}") + private Integer localPort; + + private ProtonClient localClient; + private ProtonConnection localConnection; + private ProtonClient hubClient; + private ProtonConnection hubConnection; + + + @PostConstruct + private void start() { + setLocalClient(ProtonClient.create(vertx)); + setHubClient(ProtonClient.create(vertx)); + connectLocalClient(localClient); + connectHubClient(hubClient); + } + + private void connectLocalClient(ProtonClient localClient) { + localClient.connect(localHost, localPort, res -> { + if (res.succeeded()) { + log.info("We're connected to local AMQP server! " + res.result().getHostname()); + setLocalConnection(res.result()); + localConnection.open(); + } else { + log.error("Error connecting to local AMQP server!"); + res.cause().printStackTrace(); + } + }); + } + + private void connectHubClient(ProtonClient hubClient) { + hubClient.connect(new ProtonClientOptions().setSsl(Boolean.TRUE), hubHost, hubPort, hubUsername, hubPassword, res -> { + if (res.succeeded()) { + log.info("We're connected to Hub server!"); + setHubConnection(res.result()); + hubConnection.open(); + } else { + log.error("Error connecting to Hub server!"); + res.cause().printStackTrace(); + } + }); + } + + public boolean isConnected(int timeout) { + try { + while (timeout > 0 && (Objects.isNull(this.getLocalConnection()) || Objects.isNull(this.getHubConnection()))) { + sleep(100); + timeout -= 100; + } + } catch (Throwable throwable) { + throwable.printStackTrace(); + } + return Objects.nonNull(this.getLocalConnection()) && Objects.nonNull(this.getHubConnection()); + } + + public ProtonClient getLocalClient() { + return localClient; + } + + private void setLocalClient(ProtonClient localClient) { + this.localClient = localClient; + } + + public ProtonConnection getLocalConnection() { + return localConnection; + } + + private void setLocalConnection(ProtonConnection localConnection) { + this.localConnection = localConnection; + } + + public ProtonClient getHubClient() { + return hubClient; + } + + private void setHubClient(ProtonClient hubClient) { + this.hubClient = hubClient; + } + + public ProtonConnection getHubConnection() { + return hubConnection; + } + + private void setHubConnection(ProtonConnection hubConnection) { + this.hubConnection = hubConnection; + } + +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java new file mode 100644 index 0000000..fdf74ea --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java @@ -0,0 +1,153 @@ +package com.bosch.iot.things.example.message.processor.transport; + +import io.vertx.core.Vertx; +import io.vertx.proton.ProtonConnection; +import io.vertx.proton.ProtonSender; +import io.vertx.proton.ProtonServer; +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.Map; + +import static com.bosch.iot.things.example.message.processor.Constants.CONTROL_ENDPOINT; +import static com.bosch.iot.things.example.message.processor.Constants.CONTROL_REPLY_ENDPOINT; + +public class AmqpServer { + + private static final Logger log = LoggerFactory.getLogger(AmqpServer.class); + + @Autowired + private Vertx vertx; + + @Value(value = "${tenant.id}") + private String tenantId; + + @Value(value = "${local.client.port}") + private Integer localPort; + + private ProtonServer localServer; + + private ProtonConnection connection; + + private Map senders = new HashMap<>(); + + @PostConstruct + private void start() { + // Create the Vert.x AMQP server instance + setLocalServer(ProtonServer.create(vertx)); + listen(); + } + + private void listen(){ + localServer.connectHandler((connection) -> { + setConnection(connection); + handleConnection(connection); + }).listen(localPort,(res) -> { + if (res.succeeded()) { + log.info("Listening on: " + res.result().actualPort()); + } else { + log.error("Error while starting local AMQP server!"); + res.cause().printStackTrace(); + } + }); + } + + private void handleConnection(ProtonConnection connection) { + connectionHandler(connection); + connectionReceiverHandler(connection); + connectionSenderHandler(connection); + } + + public void connectionHandler(ProtonConnection connection) { + connection.openHandler(res -> { + log.info("Client connected: " + connection.getRemoteContainer()); + connection.open(); + }).closeHandler(c -> { + log.info("Client closing amqp connection: " + connection.getRemoteContainer()); + connection.close(); + connection.disconnect(); + }).disconnectHandler(c -> { + log.info("Client socket disconnected: " + connection.getRemoteContainer()); + connection.disconnect(); + }).sessionOpenHandler(session -> session.open()); + } + + private void connectionReceiverHandler(ProtonConnection connection) { + connection.receiverOpenHandler(receiver -> { + log.info("Receiving from: " + receiver.getRemoteTarget().getAddress()); + receiver.setTarget(receiver.getRemoteTarget()) + .handler((delivery, msg) -> { + String address = receiver.getRemoteTarget().getAddress(); + logMessageInfo(msg, address); + sendMessage(msg, address); + }).open(); + }); + } + + private void logMessageInfo(Message msg, String address) { + log.debug("MSG.BODY_TYPE: " + msg.getBody().getType()); + log.debug("MSG.ADDRESS: " + msg.getAddress()); + log.debug("MSG.REPLY_TO: " + msg.getReplyTo()); + log.debug("MSG.SUBJECT: " + msg.getSubject()); + log.info("message to:" + address); + log.debug("body: " + msg.getBody().toString()); + } + + private void sendMessage(Message msg, String address) { + if (address.startsWith(CONTROL_ENDPOINT) && !address.endsWith(CONTROL_REPLY_ENDPOINT)) { + forwardToHubService(msg, CONTROL_ENDPOINT + tenantId); + } else { + forwardToThingsService(msg, address); + } + } + + private void forwardToHubService(Message msg, String address) { + ProtonSender sender = senders.get(address); + log.debug("Sending message to HUB service"); + log.debug("payload: " + msg.getBody().toString()); + sender.send(msg, delivery -> { + log.info(String.format("The message was received by the local client : remote state=%s, remotely settled=%s message format=%d", + delivery.getRemoteState(), delivery.remotelySettled(), delivery.getMessageFormat())); + }); + } + private void forwardToThingsService(Message msg, String address) { + ProtonSender sender = senders.get(address); + log.debug("Sending message to THINGS service"); + log.debug("payload: " + msg.getBody().toString()); + sender.send(msg, delivery -> { + log.info(String.format("The message was received by THINGS service : remote state=%s, remotely settled=%s message format=%d", + delivery.getRemoteState(), delivery.remotelySettled(), delivery.getMessageFormat())); + }); + } + + private void connectionSenderHandler(ProtonConnection connection) { + connection.senderOpenHandler(sender -> { + String senderAddress = sender.getRemoteSource().getAddress(); + log.info("Sending to client from: " + senderAddress); + sender.setSource(sender.getRemoteSource()); + sender.open(); + senders.put(senderAddress, sender); + }); + } + + public ProtonServer getLocalServer() { + return localServer; + } + + private void setLocalServer(ProtonServer server) { + localServer = server; + } + + public ProtonConnection getConnection() { + return connection; + } + + private void setConnection(ProtonConnection connection) { + this.connection = connection; + } +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java new file mode 100644 index 0000000..c61d40f --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java @@ -0,0 +1,84 @@ +package com.bosch.iot.things.example.message.processor.upstream; + +import com.bosch.iot.things.example.message.processor.Constants; +import com.bosch.iot.things.example.message.processor.processing.MessageProcessingService; +import com.bosch.iot.things.example.message.processor.transport.AmqpClient; +import io.vertx.proton.*; +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static com.bosch.iot.things.example.message.processor.Constants.TIME_OUT; + +@Component +public class HubToThingsFlow { + + private static final Logger log = LoggerFactory.getLogger(HubToThingsFlow.class); + + @Autowired + private AmqpClient amqpClient; + + @Autowired + private MessageProcessingService messageProcessingService; + + @Value(value = "${tenant.id}") + private String tenantId; + + private Map hubToLocalServerSenders = new HashMap<>(); + + @PostConstruct + public void start() { + if (amqpClient.isConnected(TIME_OUT)) { + receiveFromHubSendToThings(amqpClient.getHubConnection()); + } else { + log.error("Either connection to local server or connection to hub server not initialized"); + } + } + + private void receiveFromHubSendToThings(ProtonConnection hubConnection) { + createHubReceiver(Constants.TELEMETRY_ENDPOINT + tenantId, hubConnection); + createHubReceiver(Constants.EVENT_ENDPOINT + tenantId, hubConnection); + createHubReceiver(Constants.CONTROL_ENDPOINT + tenantId + Constants.CONTROL_REPLY_ENDPOINT, hubConnection); + } + + private void createHubReceiver(String address, ProtonConnection hubConnection) { + ProtonReceiver hubReceiver = hubConnection.createReceiver(address); + createHubToThingsSender(address); + sendToThingsService(hubReceiver); + } + + private void createHubToThingsSender(String address) { + ProtonSender hubToThingsSender = amqpClient.getLocalConnection().createSender(address); + hubToLocalServerSenders.put(address, hubToThingsSender); + } + + private void sendToThingsService(ProtonReceiver hubReceiver) { + hubReceiver.handler((delivery, msg) -> { + String address = hubReceiver.getRemoteSource().getAddress(); + log.debug("Received message from HUB service with content: " + msg.getBody().toString()); + Message processedMessage = this.messageProcessingService.getProcessedMessage(msg); + forwardToThings(processedMessage, address); + }).open(); + } + + private void forwardToThings(Message message, String address) { + ProtonSender sender = hubToLocalServerSenders.get(address); + if (Objects.nonNull(sender)) { + sender.open(); + log.debug("Sending message to local server"); + sender.send(message, delivery -> { + log.info(String.format("The message was received by the local server: remote state=%s, remotely settled=%s", delivery.getRemoteState(), delivery.remotelySettled())); + }); + } else { + log.error(String.format("Can't forward message to local server over address: %s", address)); + } + } +} \ No newline at end of file diff --git a/message-processor/src/main/resources/application.yml b/message-processor/src/main/resources/application.yml new file mode 100644 index 0000000..ee8fee5 --- /dev/null +++ b/message-processor/src/main/resources/application.yml @@ -0,0 +1,15 @@ +hub: + client: + host: + port: + username: + password: +tenant: + id: +local: + client: + host: + port: +logging: + level: + com.bosch.iot.things.example.message.processor: INFO \ No newline at end of file diff --git a/pom.xml b/pom.xml index b022d4e..5f0662a 100755 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,7 @@ things-batch-importer octopus-telemetry octopus-bidirectional + message-processor @@ -57,7 +58,7 @@ 4.0.0 1.7.12 - 1.1.3 + 1.2.3 UTF-8 From eddfa48d7c3c258dccd98b8b93248a2069e38aa3 Mon Sep 17 00:00:00 2001 From: Yasen Aleksandrov Date: Fri, 24 Jan 2020 14:46:24 +0200 Subject: [PATCH 2/3] Changed deprecated control endpoint with command. Address some more issues mentioned in PR comments. --- message-processor/pom.xml | 11 +++++---- .../example/message/processor/Constants.java | 5 ++-- .../processor/downstream/ThingsToHubFlow.java | 11 +++++---- .../processing/MessageProcessingService.java | 8 ++++++- .../processing/MessageProcessor.java | 9 ++++++++ .../processor/transport/AmqpServer.java | 23 ++----------------- .../processor/upstream/HubToThingsFlow.java | 8 +++---- 7 files changed, 38 insertions(+), 37 deletions(-) create mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessor.java diff --git a/message-processor/pom.xml b/message-processor/pom.xml index e6910c3..0a6e649 100644 --- a/message-processor/pom.xml +++ b/message-processor/pom.xml @@ -19,6 +19,9 @@ 3.8.2 0.33.2 + 2.1.0 + 1.0.3 + 4.12 false 2.2.0.RELEASE @@ -71,12 +74,12 @@ io.vertx vertx-proton - 3.8.2 + ${stack.version} junit junit - 4.12 + ${junit.version} test @@ -87,13 +90,13 @@ org.reactivestreams reactive-streams-tck - 1.0.3 + ${rs.version} test io.reactivex.rxjava2 rxjava - 2.1.0 + ${rx.version} test diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java index 4163a9e..ab6e882 100644 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java @@ -3,7 +3,8 @@ public class Constants { public static final String TELEMETRY_ENDPOINT = "telemetry/"; public static final String EVENT_ENDPOINT = "event/"; - public static final String CONTROL_ENDPOINT = "control/"; - public static final String CONTROL_REPLY_ENDPOINT = "/replies"; + public static final String COMMAND_RESPONSE = "command_response/"; + public static final String COMMAND_ENDPOINT = "command/"; + public static final String REPLIES = "/replies"; public static final int TIME_OUT = 2000; } diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java index 19d4cf0..7bf86c3 100644 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java @@ -1,7 +1,7 @@ package com.bosch.iot.things.example.message.processor.downstream; import com.bosch.iot.things.example.message.processor.Constants; -import com.bosch.iot.things.example.message.processor.processing.MessageProcessingService; +import com.bosch.iot.things.example.message.processor.processing.MessageProcessor; import com.bosch.iot.things.example.message.processor.transport.AmqpClient; import io.vertx.proton.*; import org.apache.qpid.proton.message.Message; @@ -13,6 +13,7 @@ import javax.annotation.PostConstruct; +import static com.bosch.iot.things.example.message.processor.Constants.COMMAND_ENDPOINT; import static com.bosch.iot.things.example.message.processor.Constants.TIME_OUT; @Component @@ -24,7 +25,7 @@ public class ThingsToHubFlow { private volatile AmqpClient amqpClient; @Autowired - private MessageProcessingService messageProcessingService; + private MessageProcessor messageProcessingService; @Value(value = "${tenant.id}") private String tenantId; @@ -40,16 +41,16 @@ private void start() { private void receiveFromThingsSentToHub() { ProtonConnection localConnection = amqpClient.getLocalConnection(); - ProtonReceiver receiver = localConnection.createReceiver(Constants.CONTROL_ENDPOINT + tenantId); + ProtonReceiver receiver = localConnection.createReceiver(Constants.COMMAND_ENDPOINT + tenantId); receiver.handler((delivery, msg) -> { log.debug("Received command message with content: " + msg.getBody().toString()); - Message processedMessage = this.messageProcessingService.getProcessedMessage(msg); + Message processedMessage = this.messageProcessingService.decrypt(msg); forwardToHub(amqpClient.getHubConnection(), processedMessage); }).open(); } private void forwardToHub(ProtonConnection hubConnection, Message msg) { - ProtonSender sender = hubConnection.createSender(msg.getAddress()); + ProtonSender sender = hubConnection.createSender(COMMAND_ENDPOINT + tenantId); sender.open(); sender.send(msg, delivery -> { log.info(String.format("The message was received by the HUB service: remote state=%s, remotely settled=%s", diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java index abd6bc4..a2f3966 100644 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessingService.java @@ -8,7 +8,13 @@ import org.springframework.stereotype.Service; @Service -public class MessageProcessingService { +public class MessageProcessingService implements MessageProcessor{ + + @Override + public Message encrypt(Message message) { return getProcessedMessage(message); } + + @Override + public Message decrypt(Message message) { return getProcessedMessage(message); } public Message getProcessedMessage(Message message) { Section body = message.getBody(); diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessor.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessor.java new file mode 100644 index 0000000..62a5f3d --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/processing/MessageProcessor.java @@ -0,0 +1,9 @@ +package com.bosch.iot.things.example.message.processor.processing; + +import org.apache.qpid.proton.message.Message; + +public interface MessageProcessor { + Message encrypt(Message message); + + Message decrypt(Message message); +} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java index fdf74ea..b3b1f3a 100644 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java @@ -14,8 +14,7 @@ import java.util.HashMap; import java.util.Map; -import static com.bosch.iot.things.example.message.processor.Constants.CONTROL_ENDPOINT; -import static com.bosch.iot.things.example.message.processor.Constants.CONTROL_REPLY_ENDPOINT; +import static com.bosch.iot.things.example.message.processor.Constants.*; public class AmqpServer { @@ -99,28 +98,10 @@ private void logMessageInfo(Message msg, String address) { } private void sendMessage(Message msg, String address) { - if (address.startsWith(CONTROL_ENDPOINT) && !address.endsWith(CONTROL_REPLY_ENDPOINT)) { - forwardToHubService(msg, CONTROL_ENDPOINT + tenantId); - } else { - forwardToThingsService(msg, address); - } - } - - private void forwardToHubService(Message msg, String address) { - ProtonSender sender = senders.get(address); - log.debug("Sending message to HUB service"); - log.debug("payload: " + msg.getBody().toString()); - sender.send(msg, delivery -> { - log.info(String.format("The message was received by the local client : remote state=%s, remotely settled=%s message format=%d", - delivery.getRemoteState(), delivery.remotelySettled(), delivery.getMessageFormat())); - }); - } - private void forwardToThingsService(Message msg, String address) { ProtonSender sender = senders.get(address); - log.debug("Sending message to THINGS service"); log.debug("payload: " + msg.getBody().toString()); sender.send(msg, delivery -> { - log.info(String.format("The message was received by THINGS service : remote state=%s, remotely settled=%s message format=%d", + log.info(String.format("The message was received : remote state=%s, remotely settled=%s message format=%d", delivery.getRemoteState(), delivery.remotelySettled(), delivery.getMessageFormat())); }); } diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java index c61d40f..582833c 100644 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java @@ -1,7 +1,7 @@ package com.bosch.iot.things.example.message.processor.upstream; import com.bosch.iot.things.example.message.processor.Constants; -import com.bosch.iot.things.example.message.processor.processing.MessageProcessingService; +import com.bosch.iot.things.example.message.processor.processing.MessageProcessor; import com.bosch.iot.things.example.message.processor.transport.AmqpClient; import io.vertx.proton.*; import org.apache.qpid.proton.message.Message; @@ -27,7 +27,7 @@ public class HubToThingsFlow { private AmqpClient amqpClient; @Autowired - private MessageProcessingService messageProcessingService; + private MessageProcessor messageProcessingService; @Value(value = "${tenant.id}") private String tenantId; @@ -46,7 +46,7 @@ public void start() { private void receiveFromHubSendToThings(ProtonConnection hubConnection) { createHubReceiver(Constants.TELEMETRY_ENDPOINT + tenantId, hubConnection); createHubReceiver(Constants.EVENT_ENDPOINT + tenantId, hubConnection); - createHubReceiver(Constants.CONTROL_ENDPOINT + tenantId + Constants.CONTROL_REPLY_ENDPOINT, hubConnection); + createHubReceiver(Constants.COMMAND_RESPONSE + tenantId + Constants.REPLIES, hubConnection); } private void createHubReceiver(String address, ProtonConnection hubConnection) { @@ -64,7 +64,7 @@ private void sendToThingsService(ProtonReceiver hubReceiver) { hubReceiver.handler((delivery, msg) -> { String address = hubReceiver.getRemoteSource().getAddress(); log.debug("Received message from HUB service with content: " + msg.getBody().toString()); - Message processedMessage = this.messageProcessingService.getProcessedMessage(msg); + Message processedMessage = this.messageProcessingService.encrypt(msg); forwardToThings(processedMessage, address); }).open(); } From eef08da682a50f55686cf964db482e9bc0796d82 Mon Sep 17 00:00:00 2001 From: Yasen Aleksandrov Date: Thu, 30 Jan 2020 15:57:35 +0200 Subject: [PATCH 3/3] Simplified architecture by removing internal proton client. Initialize Hub connection upon Things connecting to internal amqp server. --- .../processor/ApplicationConfiguration.java | 2 +- .../example/message/processor/Constants.java | 10 --- .../processor/downstream/ThingsToHubFlow.java | 52 +++++------ .../processor/transport/AmqpClient.java | 81 +++++------------ .../processor/transport/AmqpServer.java | 86 ++++++++++--------- .../processor/upstream/HubToThingsFlow.java | 71 ++++++--------- .../message/processor/utils/Utils.java | 21 +++++ .../src/main/resources/application.yml | 17 ++-- 8 files changed, 145 insertions(+), 195 deletions(-) delete mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java create mode 100644 message-processor/src/main/java/com/bosch/iot/things/example/message/processor/utils/Utils.java diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java index 7f5974c..8f89e5d 100644 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/ApplicationConfiguration.java @@ -14,10 +14,10 @@ public class ApplicationConfiguration { public Vertx vertx() { return Vertx.vertx(); } @Bean + @DependsOn("client") public AmqpServer server(){ return new AmqpServer(); } @Bean - @DependsOn("server") public AmqpClient client(){ return new AmqpClient(); } } diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java deleted file mode 100644 index ab6e882..0000000 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/Constants.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.bosch.iot.things.example.message.processor; - -public class Constants { - public static final String TELEMETRY_ENDPOINT = "telemetry/"; - public static final String EVENT_ENDPOINT = "event/"; - public static final String COMMAND_RESPONSE = "command_response/"; - public static final String COMMAND_ENDPOINT = "command/"; - public static final String REPLIES = "/replies"; - public static final int TIME_OUT = 2000; -} diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java index 7bf86c3..0b89af1 100644 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/downstream/ThingsToHubFlow.java @@ -1,20 +1,18 @@ package com.bosch.iot.things.example.message.processor.downstream; -import com.bosch.iot.things.example.message.processor.Constants; import com.bosch.iot.things.example.message.processor.processing.MessageProcessor; import com.bosch.iot.things.example.message.processor.transport.AmqpClient; -import io.vertx.proton.*; +import com.bosch.iot.things.example.message.processor.utils.Utils; +import io.vertx.proton.ProtonSender; import org.apache.qpid.proton.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; - -import static com.bosch.iot.things.example.message.processor.Constants.COMMAND_ENDPOINT; -import static com.bosch.iot.things.example.message.processor.Constants.TIME_OUT; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; @Component public class ThingsToHubFlow { @@ -25,34 +23,28 @@ public class ThingsToHubFlow { private volatile AmqpClient amqpClient; @Autowired - private MessageProcessor messageProcessingService; + private Utils utils; - @Value(value = "${tenant.id}") - private String tenantId; + @Autowired + private MessageProcessor messageProcessingService; - @PostConstruct - private void start() { - if (amqpClient.isConnected(TIME_OUT)) { - receiveFromThingsSentToHub(); - } else { - log.error("Either connection to local server or connection to hub server not initialized"); - } - } + private Map hubSenders = new HashMap<>(); - private void receiveFromThingsSentToHub() { - ProtonConnection localConnection = amqpClient.getLocalConnection(); - ProtonReceiver receiver = localConnection.createReceiver(Constants.COMMAND_ENDPOINT + tenantId); - receiver.handler((delivery, msg) -> { - log.debug("Received command message with content: " + msg.getBody().toString()); - Message processedMessage = this.messageProcessingService.decrypt(msg); - forwardToHub(amqpClient.getHubConnection(), processedMessage); - }).open(); + public void init(Set thingsReceiverAddresses) { + thingsReceiverAddresses.forEach(address -> { + ProtonSender hubSender = amqpClient.getHubConnection().createSender(address); + hubSender.open(); + hubSenders.put(address, hubSender); + log.info("Hub sender created for: " + address); + }); } - private void forwardToHub(ProtonConnection hubConnection, Message msg) { - ProtonSender sender = hubConnection.createSender(COMMAND_ENDPOINT + tenantId); - sender.open(); - sender.send(msg, delivery -> { + public void forwardToHub(Message msg, String address) { + log.debug("Sending message to HUB service"); + Message processedMessage = this.messageProcessingService.decrypt(msg); + utils.logMessageInfo(msg, address); + ProtonSender hubSender = hubSenders.get(address); + hubSender.send(processedMessage, delivery -> { log.info(String.format("The message was received by the HUB service: remote state=%s, remotely settled=%s", delivery.getRemoteState(), delivery.remotelySettled())); }); diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java index b3b6c08..740db68 100644 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpClient.java @@ -1,5 +1,7 @@ package com.bosch.iot.things.example.message.processor.transport; +import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.proton.ProtonClient; import io.vertx.proton.ProtonClientOptions; @@ -12,8 +14,6 @@ import javax.annotation.PostConstruct; import java.util.Objects; -import static java.lang.Thread.sleep; - public class AmqpClient { private static final Logger log = LoggerFactory.getLogger(AmqpClient.class); @@ -21,90 +21,55 @@ public class AmqpClient { @Autowired private Vertx vertx; - @Value(value = "${hub.client.host}") + @Value(value = "${hub.host}") private String hubHost; - @Value(value = "${hub.client.port}") + @Value(value = "${hub.port}") private Integer hubPort; - @Value(value = "${hub.client.username}") + @Value(value = "${hub.username}") private String hubUsername; - @Value(value = "${hub.client.password}") + @Value(value = "${hub.password}") private String hubPassword; - @Value(value = "${local.client.host}") - private String localHost; - - @Value(value = "${local.client.port}") - private Integer localPort; - - private ProtonClient localClient; - private ProtonConnection localConnection; private ProtonClient hubClient; private ProtonConnection hubConnection; - @PostConstruct private void start() { - setLocalClient(ProtonClient.create(vertx)); setHubClient(ProtonClient.create(vertx)); - connectLocalClient(localClient); - connectHubClient(hubClient); - } - - private void connectLocalClient(ProtonClient localClient) { - localClient.connect(localHost, localPort, res -> { - if (res.succeeded()) { - log.info("We're connected to local AMQP server! " + res.result().getHostname()); - setLocalConnection(res.result()); - localConnection.open(); - } else { - log.error("Error connecting to local AMQP server!"); - res.cause().printStackTrace(); - } - }); } - private void connectHubClient(ProtonClient hubClient) { + private Future connectHubClient() { + Promise promise = Promise.promise(); hubClient.connect(new ProtonClientOptions().setSsl(Boolean.TRUE), hubHost, hubPort, hubUsername, hubPassword, res -> { if (res.succeeded()) { - log.info("We're connected to Hub server!"); + log.info("Connected to Hub server!"); setHubConnection(res.result()); hubConnection.open(); + promise.complete(); } else { - log.error("Error connecting to Hub server!"); - res.cause().printStackTrace(); + log.error(String.format("Error connecting to Hub server! Cause - %s", res.cause().getMessage())); + promise.fail(res.cause()); } }); + return promise.future(); } - public boolean isConnected(int timeout) { - try { - while (timeout > 0 && (Objects.isNull(this.getLocalConnection()) || Objects.isNull(this.getHubConnection()))) { - sleep(100); - timeout -= 100; - } - } catch (Throwable throwable) { - throwable.printStackTrace(); + public Future requestHubConnection() { + if (Objects.isNull(this.getHubConnection())) { + return this.connectHubClient(); } - return Objects.nonNull(this.getLocalConnection()) && Objects.nonNull(this.getHubConnection()); + return Future.succeededFuture(); } - public ProtonClient getLocalClient() { - return localClient; - } - - private void setLocalClient(ProtonClient localClient) { - this.localClient = localClient; - } - - public ProtonConnection getLocalConnection() { - return localConnection; - } - - private void setLocalConnection(ProtonConnection localConnection) { - this.localConnection = localConnection; + public void disconnectFromHub() { + if (Objects.nonNull(this.getHubConnection()) && !this.getHubConnection().isDisconnected()) { + getHubConnection().close().disconnect(); + setHubConnection(null); + log.info("Disconnected from Hub server!"); + } } public ProtonClient getHubClient() { diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java index b3b1f3a..3c394c9 100644 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/transport/AmqpServer.java @@ -1,10 +1,13 @@ package com.bosch.iot.things.example.message.processor.transport; +import com.bosch.iot.things.example.message.processor.downstream.ThingsToHubFlow; +import com.bosch.iot.things.example.message.processor.upstream.HubToThingsFlow; +import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.proton.ProtonConnection; import io.vertx.proton.ProtonSender; import io.vertx.proton.ProtonServer; -import org.apache.qpid.proton.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -12,9 +15,9 @@ import javax.annotation.PostConstruct; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; - -import static com.bosch.iot.things.example.message.processor.Constants.*; +import java.util.Set; public class AmqpServer { @@ -23,17 +26,22 @@ public class AmqpServer { @Autowired private Vertx vertx; - @Value(value = "${tenant.id}") - private String tenantId; + @Autowired + private HubToThingsFlow hubToThingsFlow; + + @Autowired + private ThingsToHubFlow thingsToHubFlow; + + @Autowired + private AmqpClient amqpClient; - @Value(value = "${local.client.port}") + @Value(value = "${local.server.port}") private Integer localPort; private ProtonServer localServer; - private ProtonConnection connection; - - private Map senders = new HashMap<>(); + private Map thingsSenders = new HashMap<>(); + private Set thingsReceiverAddresses = new HashSet<>(); @PostConstruct private void start() { @@ -51,69 +59,69 @@ private void listen(){ log.info("Listening on: " + res.result().actualPort()); } else { log.error("Error while starting local AMQP server!"); - res.cause().printStackTrace(); + log.error(res.cause().getMessage()); } }); } private void handleConnection(ProtonConnection connection) { - connectionHandler(connection); - connectionReceiverHandler(connection); - connectionSenderHandler(connection); + Future connectionSetUp = connectionHandler(connection) + .compose(c -> connectionSenderHandler(connection)) + .compose(c -> connectionReceiverHandler(connection)) + .compose(c -> amqpClient.requestHubConnection()); + connectionSetUp.setHandler(res ->{ + if (res.succeeded()) { + hubToThingsFlow.init(thingsSenders); + thingsToHubFlow.init(thingsReceiverAddresses); + } + }); } - public void connectionHandler(ProtonConnection connection) { + private Future connectionHandler(ProtonConnection connection) { + Promise connectionOpenPromise = Promise.promise(); connection.openHandler(res -> { log.info("Client connected: " + connection.getRemoteContainer()); connection.open(); + connectionOpenPromise.complete(); }).closeHandler(c -> { log.info("Client closing amqp connection: " + connection.getRemoteContainer()); connection.close(); connection.disconnect(); + amqpClient.disconnectFromHub(); }).disconnectHandler(c -> { log.info("Client socket disconnected: " + connection.getRemoteContainer()); connection.disconnect(); + amqpClient.disconnectFromHub(); }).sessionOpenHandler(session -> session.open()); + return connectionOpenPromise.future(); } - private void connectionReceiverHandler(ProtonConnection connection) { + private Future connectionReceiverHandler(ProtonConnection connection) { + Promise receiverPromise = Promise.promise(); connection.receiverOpenHandler(receiver -> { - log.info("Receiving from: " + receiver.getRemoteTarget().getAddress()); + String address = receiver.getRemoteTarget().getAddress(); + log.info("Receiving from: " + address); receiver.setTarget(receiver.getRemoteTarget()) .handler((delivery, msg) -> { - String address = receiver.getRemoteTarget().getAddress(); - logMessageInfo(msg, address); - sendMessage(msg, address); + thingsToHubFlow.forwardToHub(msg, address); }).open(); + thingsReceiverAddresses.add(address); + receiverPromise.complete(); }); + return receiverPromise.future(); } - private void logMessageInfo(Message msg, String address) { - log.debug("MSG.BODY_TYPE: " + msg.getBody().getType()); - log.debug("MSG.ADDRESS: " + msg.getAddress()); - log.debug("MSG.REPLY_TO: " + msg.getReplyTo()); - log.debug("MSG.SUBJECT: " + msg.getSubject()); - log.info("message to:" + address); - log.debug("body: " + msg.getBody().toString()); - } - - private void sendMessage(Message msg, String address) { - ProtonSender sender = senders.get(address); - log.debug("payload: " + msg.getBody().toString()); - sender.send(msg, delivery -> { - log.info(String.format("The message was received : remote state=%s, remotely settled=%s message format=%d", - delivery.getRemoteState(), delivery.remotelySettled(), delivery.getMessageFormat())); - }); - } - - private void connectionSenderHandler(ProtonConnection connection) { + private Future connectionSenderHandler(ProtonConnection connection) { + Promise senderPromise = Promise.promise(); connection.senderOpenHandler(sender -> { String senderAddress = sender.getRemoteSource().getAddress(); log.info("Sending to client from: " + senderAddress); sender.setSource(sender.getRemoteSource()); sender.open(); - senders.put(senderAddress, sender); + thingsSenders.put(senderAddress, sender); + senderPromise.complete(); }); + return senderPromise.future(); } public ProtonServer getLocalServer() { diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java index 582833c..90c1a20 100644 --- a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/upstream/HubToThingsFlow.java @@ -1,22 +1,17 @@ package com.bosch.iot.things.example.message.processor.upstream; -import com.bosch.iot.things.example.message.processor.Constants; import com.bosch.iot.things.example.message.processor.processing.MessageProcessor; import com.bosch.iot.things.example.message.processor.transport.AmqpClient; -import io.vertx.proton.*; +import com.bosch.iot.things.example.message.processor.utils.Utils; +import io.vertx.proton.ProtonReceiver; +import io.vertx.proton.ProtonSender; import org.apache.qpid.proton.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; -import java.util.HashMap; import java.util.Map; -import java.util.Objects; - -import static com.bosch.iot.things.example.message.processor.Constants.TIME_OUT; @Component public class HubToThingsFlow { @@ -27,58 +22,40 @@ public class HubToThingsFlow { private AmqpClient amqpClient; @Autowired - private MessageProcessor messageProcessingService; - - @Value(value = "${tenant.id}") - private String tenantId; - - private Map hubToLocalServerSenders = new HashMap<>(); + private Utils utils; - @PostConstruct - public void start() { - if (amqpClient.isConnected(TIME_OUT)) { - receiveFromHubSendToThings(amqpClient.getHubConnection()); - } else { - log.error("Either connection to local server or connection to hub server not initialized"); - } - } + @Autowired + private MessageProcessor messageProcessingService; - private void receiveFromHubSendToThings(ProtonConnection hubConnection) { - createHubReceiver(Constants.TELEMETRY_ENDPOINT + tenantId, hubConnection); - createHubReceiver(Constants.EVENT_ENDPOINT + tenantId, hubConnection); - createHubReceiver(Constants.COMMAND_RESPONSE + tenantId + Constants.REPLIES, hubConnection); + public void init(Map thingsSenders) { + thingsSenders.entrySet().stream().forEach(entry -> forwardToThings(entry.getKey(), entry.getValue())); } - private void createHubReceiver(String address, ProtonConnection hubConnection) { - ProtonReceiver hubReceiver = hubConnection.createReceiver(address); - createHubToThingsSender(address); - sendToThingsService(hubReceiver); + public void forwardToThings(String address, ProtonSender thingsSender) { + ProtonReceiver hubReceiver = createHubReceiver(address); + log.info("Hub receiver created for:" + address); + sendToThingsService(hubReceiver, thingsSender); } - private void createHubToThingsSender(String address) { - ProtonSender hubToThingsSender = amqpClient.getLocalConnection().createSender(address); - hubToLocalServerSenders.put(address, hubToThingsSender); + private ProtonReceiver createHubReceiver(String address) { + return amqpClient.getHubConnection().createReceiver(address); } - private void sendToThingsService(ProtonReceiver hubReceiver) { + private void sendToThingsService(ProtonReceiver hubReceiver, ProtonSender thingsSender) { hubReceiver.handler((delivery, msg) -> { - String address = hubReceiver.getRemoteSource().getAddress(); log.debug("Received message from HUB service with content: " + msg.getBody().toString()); Message processedMessage = this.messageProcessingService.encrypt(msg); - forwardToThings(processedMessage, address); + send(processedMessage, thingsSender); }).open(); } - private void forwardToThings(Message message, String address) { - ProtonSender sender = hubToLocalServerSenders.get(address); - if (Objects.nonNull(sender)) { - sender.open(); - log.debug("Sending message to local server"); - sender.send(message, delivery -> { - log.info(String.format("The message was received by the local server: remote state=%s, remotely settled=%s", delivery.getRemoteState(), delivery.remotelySettled())); - }); - } else { - log.error(String.format("Can't forward message to local server over address: %s", address)); - } + private void send(Message message, ProtonSender sender) { + log.debug("Sending message to THINGS service"); + String address = sender.getRemoteSource().getAddress(); + utils.logMessageInfo(message, address); + sender.send(message, delivery -> { + log.info(String.format("The message was received by the THINGS service: remote state=%s, remotely settled=%s", + delivery.getRemoteState(), delivery.remotelySettled())); + }); } } \ No newline at end of file diff --git a/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/utils/Utils.java b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/utils/Utils.java new file mode 100644 index 0000000..0d0d08c --- /dev/null +++ b/message-processor/src/main/java/com/bosch/iot/things/example/message/processor/utils/Utils.java @@ -0,0 +1,21 @@ +package com.bosch.iot.things.example.message.processor.utils; + +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class Utils { + + private static final Logger log = LoggerFactory.getLogger(Utils.class); + + public void logMessageInfo(Message msg, String address) { + log.debug("MSG.BODY_TYPE: " + msg.getBody().getType()); + log.debug("MSG.ADDRESS: " + msg.getAddress()); + log.debug("MSG.REPLY_TO: " + msg.getReplyTo()); + log.debug("MSG.SUBJECT: " + msg.getSubject()); + log.info("message to:" + address); + log.debug("body: " + msg.getBody().toString()); + } +} diff --git a/message-processor/src/main/resources/application.yml b/message-processor/src/main/resources/application.yml index ee8fee5..548353d 100644 --- a/message-processor/src/main/resources/application.yml +++ b/message-processor/src/main/resources/application.yml @@ -1,15 +1,12 @@ hub: - client: - host: - port: - username: - password: -tenant: - id: + host: + port: + username: + password: local: - client: - host: + server: port: logging: level: - com.bosch.iot.things.example.message.processor: INFO \ No newline at end of file + com.bosch.iot.things.example.message.processor: INFO +spring.main.web-application-type: none \ No newline at end of file