Skip to content

Commit

Permalink
Updated versions, replaced Spring Cloud Streams by Spring Kafka, remo…
Browse files Browse the repository at this point in the history
…ved C7 from Kafka example (related to #73)
  • Loading branch information
berndruecker committed May 23, 2022
1 parent 7e62b96 commit 41c5022
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 54 deletions.
42 changes: 14 additions & 28 deletions kafka/java/monitor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,30 @@
<version>0.0.1-SNAPSHOT</version>
<properties>
<maven.compiler.release>8</maven.compiler.release>
<spring.boot.version>2.2.5.RELEASE</spring.boot.version>
<spring-cloud-stream.version>Horsham.RELEASE</spring-cloud-stream.version>
<spring.boot.version>2.6.7</spring.boot.version>
<spring.kafka.version>2.8.5</spring.kafka.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<artifactId>spring-boot-starter-json</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>
</dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId>
</dependency> -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring.kafka.version}</version>
</dependency>

<!-- Webjars for JS libraries and CSS -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,30 @@
package io.flowing.retail.monitor.messages;

import java.nio.charset.Charset;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.flowing.retail.monitor.domain.PastEvent;
import io.flowing.retail.monitor.persistence.LogRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@EnableBinding(Sink.class)
public class MessageListener {

private static final String TOPIC_NAME = "flowing-retail";

@Autowired
private SimpMessagingTemplate simpMessageTemplate;

@Autowired
private ObjectMapper objectMapper;

@StreamListener(target = Sink.INPUT)
@Transactional
public void messageReceived(byte[] messageJsonBytes) throws Exception {
String messageJson = new String(messageJsonBytes, "UTF-8");
@KafkaListener(id = "monitor", topics = TOPIC_NAME)
public void messageReceived(String messageJson) throws Exception {
Message<JsonNode> message = objectMapper.readValue( //
messageJson, //
new TypeReference<Message<JsonNode>>() {});
Expand Down
14 changes: 4 additions & 10 deletions kafka/java/monitor/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
spring.application.name=monitor
server.port = 8095

spring.cloud.stream.bindings.output.destination=flowing-retail
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.input.destination=flowing-retail
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=monitor

spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:29092

server.port = 8095
# Kafka
spring.kafka.bootstrap-servers=localhost:29092
spring.kafka.consumer.auto-offset-reset=earliest
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ShipGoodsAdapter {
@Autowired
private OrderRepository orderRepository;

@ZeebeWorker(type = "ship-goods")
@ZeebeWorker(type = "ship-goods", autoComplete = true)
public Map<String, String> handle(ActivatedJob job) {
OrderFlowContext context = OrderFlowContext.fromMap(job.getVariablesAsMap());
Order order = orderRepository.findById(context.getOrderId()).get();
Expand Down

0 comments on commit 41c5022

Please sign in to comment.