Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Custom Message Processor for Hub & Things services. #35

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions message-processor/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/

### VS Code ###
.vscode/
140 changes: 140 additions & 0 deletions message-processor/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iot-things-examples</artifactId>
<groupId>com.bosch.iot.things.examples</groupId>
<version>0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>message-processor</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Bosch IoT Things :: Custom Message Processor</name>
<description>Message processor application for Bosch IoT Hub/Things services</description>

<properties>
<java.version>8</java.version>
<!-- Dependency versions -->
<stack.version>3.8.2</stack.version>
<proton.version>0.33.2</proton.version>
<rx.version>2.1.0</rx.version>
<rs.version>1.0.3</rs.version>
<junit.version>4.12</junit.version>
<!-- Test properties -->
<proton.trace.frames>false</proton.trace.frames>
<spring.version>2.2.0.RELEASE</spring.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-dependencies</artifactId>
<version>${stack.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>proton-j</artifactId>
<version>${proton.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<version>${stack.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-unit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>${rs.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.version}</version>
<scope>test</scope>
</dependency>
<!-- codegen -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-codegen</artifactId>
<classifier>processor</classifier>
<scope>provided</scope>
</dependency>
<!-- doc -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-docgen</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
<configuration>
<mainClass>com.bosch.iot.things.example.message.processor.MessageProcessorApplication</mainClass>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.bosch.iot.things.example.message.processor;

import com.bosch.iot.things.example.message.processor.transport.AmqpClient;
import com.bosch.iot.things.example.message.processor.transport.AmqpServer;
import io.vertx.core.Vertx;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

@Configuration
public class ApplicationConfiguration {

@Bean
public Vertx vertx() { return Vertx.vertx(); }

@Bean
@DependsOn("client")
public AmqpServer server(){ return new AmqpServer(); }

@Bean
public AmqpClient client(){ return new AmqpClient(); }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.bosch.iot.things.example.message.processor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MessageProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(MessageProcessorApplication.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.bosch.iot.things.example.message.processor.downstream;

import com.bosch.iot.things.example.message.processor.processing.MessageProcessor;
import com.bosch.iot.things.example.message.processor.transport.AmqpClient;
import com.bosch.iot.things.example.message.processor.utils.Utils;
import io.vertx.proton.ProtonSender;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

@Component
public class ThingsToHubFlow {

private static final Logger log = LoggerFactory.getLogger(ThingsToHubFlow.class);

@Autowired
private volatile AmqpClient amqpClient;

@Autowired
private Utils utils;

@Autowired
private MessageProcessor messageProcessingService;

private Map<String, ProtonSender> hubSenders = new HashMap<>();

public void init(Set<String> thingsReceiverAddresses) {
thingsReceiverAddresses.forEach(address -> {
ProtonSender hubSender = amqpClient.getHubConnection().createSender(address);
hubSender.open();
hubSenders.put(address, hubSender);
log.info("Hub sender created for: " + address);
});
}

public void forwardToHub(Message msg, String address) {
log.debug("Sending message to HUB service");
Message processedMessage = this.messageProcessingService.decrypt(msg);
utils.logMessageInfo(msg, address);
ProtonSender hubSender = hubSenders.get(address);
hubSender.send(processedMessage, delivery -> {
log.info(String.format("The message was received by the HUB service: remote state=%s, remotely settled=%s",
delivery.getRemoteState(), delivery.remotelySettled()));
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.bosch.iot.things.example.message.processor.processing;

import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
import org.springframework.stereotype.Service;

@Service
public class MessageProcessingService implements MessageProcessor{

@Override
public Message encrypt(Message message) { return getProcessedMessage(message); }

@Override
public Message decrypt(Message message) { return getProcessedMessage(message); }

public Message getProcessedMessage(Message message) {
Section body = message.getBody();
if (body instanceof Data) {
String msgString = ((Data) body).getValue().toString();
//Simple modification of payload
String modifiedBody = msgString.replaceAll("60", "70");
message.setBody((new Data(new Binary(modifiedBody.getBytes()))));
} else if (body instanceof AmqpValue) {
String msgString = ((AmqpValue) body).getValue().toString();
//Simple modification of payload
String modifiedBody = msgString.replaceAll("60", "70");
message.setBody(new AmqpValue(modifiedBody));
}
return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.bosch.iot.things.example.message.processor.processing;

import org.apache.qpid.proton.message.Message;

public interface MessageProcessor {
Message encrypt(Message message);

Message decrypt(Message message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.bosch.iot.things.example.message.processor.transport;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import javax.annotation.PostConstruct;
import java.util.Objects;

public class AmqpClient {

private static final Logger log = LoggerFactory.getLogger(AmqpClient.class);

@Autowired
private Vertx vertx;

@Value(value = "${hub.host}")
private String hubHost;

@Value(value = "${hub.port}")
private Integer hubPort;

@Value(value = "${hub.username}")
private String hubUsername;

@Value(value = "${hub.password}")
private String hubPassword;

private ProtonClient hubClient;
private ProtonConnection hubConnection;

@PostConstruct
private void start() {
setHubClient(ProtonClient.create(vertx));
}

private Future<ProtonConnection> connectHubClient() {
Promise<ProtonConnection> promise = Promise.promise();
hubClient.connect(new ProtonClientOptions().setSsl(Boolean.TRUE), hubHost, hubPort, hubUsername, hubPassword, res -> {
if (res.succeeded()) {
log.info("Connected to Hub server!");
setHubConnection(res.result());
hubConnection.open();
promise.complete();
} else {
log.error(String.format("Error connecting to Hub server! Cause - %s", res.cause().getMessage()));
promise.fail(res.cause());
}
});
return promise.future();
}

public Future<ProtonConnection> requestHubConnection() {
if (Objects.isNull(this.getHubConnection())) {
return this.connectHubClient();
}
return Future.succeededFuture();
}

public void disconnectFromHub() {
if (Objects.nonNull(this.getHubConnection()) && !this.getHubConnection().isDisconnected()) {
getHubConnection().close().disconnect();
setHubConnection(null);
log.info("Disconnected from Hub server!");
}
}

public ProtonClient getHubClient() {
return hubClient;
}

private void setHubClient(ProtonClient hubClient) {
this.hubClient = hubClient;
}

public ProtonConnection getHubConnection() {
return hubConnection;
}

private void setHubConnection(ProtonConnection hubConnection) {
this.hubConnection = hubConnection;
}

}
Loading