Skip to content

Commit

Permalink
adds ReadMe and docker compose
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Dec 26, 2024
1 parent 86ef429 commit 20f3051
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 4 deletions.
38 changes: 38 additions & 0 deletions kafka-spring-boot/boot-concurrent-kafka-consumer/ReadMe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Concurrent Kafka Consumer

This project demonstrates a multi-threaded Kafka consumer approach using Spring Boot, with a dedicated implementation that processes messages concurrently across different threads.

### Sequence Diagram



### Format code

```shell
./mvnw spotless:apply
```

### Run tests

```shell
./mvnw clean verify
```

### Run locally

```shell
$ docker-compose -f docker/docker-compose.yml up -d
$ ./mvnw spring-boot:run -Dspring-boot.run.profiles=local
```

### Using Testcontainers at Development Time
You can run `TestConcurrentKafkaConsumer.java` from your IDE directly.
You can also run the application using Maven as follows:

```shell
./mvnw spring-boot:test-run
```


### Useful Links
* Actuator Endpoint: http://localhost:8080/actuator
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
services:

broker:
image: apache/kafka-native:3.8.1
hostname: broker
ports:
- "9092:9092"
healthcheck:
test: [ "CMD-SHELL", "nc -z localhost 9092" ]
interval: 30s
timeout: 5s
retries: 3
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
#Use two listeners with different names, it will force Kafka to communicate with itself via internal
#listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
KAFKA_LISTENERS: INTERNAL://broker:29092,EXTERNAL_HOST://0.0.0.0:9092,CONTROLLER://broker:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://broker:29092,EXTERNAL_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 32
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9094
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 9223372036854775807
CLUSTER_ID: 4L6g3nShT-eMCtK--X86sw

kafdrop:
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "broker:29092"
depends_on:
broker:
condition: service_healthy
3 changes: 2 additions & 1 deletion kafka-spring-boot/boot-concurrent-kafka-consumer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
Expand Down Expand Up @@ -74,6 +74,7 @@
<importOrder/>
<removeUnusedImports/>
<formatAnnotations/>
<endWithNewline/>
</java>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.example.kafka.listener.AppListener;
import com.example.kafka.processor.ToUpperStringProcessor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.apache.kafka.clients.admin.NewTopic;
Expand All @@ -20,7 +21,7 @@ public class ProducerConfig {

@Bean("toUpperStringProcessors")
List<ToUpperStringProcessor> toUpperStringProcessors() {
return new ArrayList<>();
return Collections.synchronizedList(new ArrayList<>());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ void onMessage() {

/**
* Asserts specific behaviors of the StringProcessor instances.
* Verifies that there are exactly two StringProcessor instances, each with a non-empty queue,
* and that the data in the queues of these two processors are distinct.
* Verifies that there are exactly three toUpperStringProcessor instances, each with a non-empty queue,
* and that the data in the queues of these three processors are distinct.
*/
private void assertStringProcessorsBehaviors() {
await().untilAsserted(() -> assertThat(toUpperStringProcessors).hasSize(3));
Expand Down

0 comments on commit 20f3051

Please sign in to comment.