Skip to content

Commit

Permalink
feat : implement concurrent consumers using separate thread (#617)
Browse files Browse the repository at this point in the history
* feat : implement concurrent consumers using saperate thread

* adds ReadMe and docker compose

* updates with sequence diagram

* updates seq diagram
  • Loading branch information
rajadilipkolli authored Dec 26, 2024
1 parent fcbff8c commit 651bfef
Show file tree
Hide file tree
Showing 17 changed files with 1,070 additions and 1 deletion.
35 changes: 35 additions & 0 deletions .github/workflows/kafka-boot-concurrent-kafka-consumer.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: boot-concurrent-kafka-consumer

on:
push:
paths:
- "kafka-spring-boot/boot-concurrent-kafka-consumer/**"
branches: [ main ]
pull_request:
paths:
- "kafka-spring-boot/boot-concurrent-kafka-consumer/**"
types:
- opened
- synchronize
- reopened

jobs:
build:

runs-on: ubuntu-latest
defaults:
run:
working-directory: kafka-spring-boot/boot-concurrent-kafka-consumer

steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up JDK
uses: actions/[email protected]
with:
java-version: '21'
distribution: 'temurin'
cache: 'maven'
- name: Build with Maven
run: ./mvnw -B clean verify --file pom.xml
3 changes: 2 additions & 1 deletion kafka-spring-boot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ This folder contains sample kafka producer and consumer using java, spring & Spr
| Type | Description |
|------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------|
| [Spring Boot Kafka Sample](./boot-kafka-sample) | Producing and consuming Messages using Spring Boot |
| [Multiple Producers & Consumers](./spring-boot-multiple-producers-consumers) | Producing and consuming Messages from Multiple Producer and consumers using Spring Boot |
| [Multiple Producers & Consumers](./boot-multiple-producers-consumers) | Producing and consuming Messages from Multiple Producer and consumers using Spring Boot |
| [Concurrent Consumer](./boot-concurrent-kafka-consumer) | Consuming Messages Concurrently using new Thread using Spring Boot |
2 changes: 2 additions & 0 deletions kafka-spring-boot/boot-concurrent-kafka-consumer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

/target
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
wrapperVersion=3.3.2
distributionType=only-script
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.9/apache-maven-3.9.9-bin.zip
123 changes: 123 additions & 0 deletions kafka-spring-boot/boot-concurrent-kafka-consumer/ReadMe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Concurrent Kafka Consumer

This project demonstrates a multi-threaded Kafka consumer approach using Spring Boot, with a dedicated implementation that processes messages concurrently across different threads.

### Sequence Diagram

```mermaid
sequenceDiagram
participant C as ConcurrentKafkaConsumer
participant A as AppListener
participant P as ToUpperStringProcessor
C->>C: Spring Boot initializes application
Note over C: Configure concurrent consumers
C->>A: Kafka messages arrive, triggering onMessage()
par Thread 1
A->>A: Retrieve or initialize ThreadLocal<ToUpperStringProcessor>
A->>P: processString(value)
P->>P: Converts to uppercase & enqueues result
and Thread 2
A->>A: Retrieve or initialize ThreadLocal<ToUpperStringProcessor>
A->>P: processString(value)
P->>P: Converts to uppercase & enqueues result
end
Note over A,P: Additional messages handled similarly
alt Error occurs
A->>A: Handle error & retry/dead letter
end
A->>A: onEvent(ConsumerStoppedEvent) -> Remove ThreadLocal processor
```

## Project Structure

### 1. `ConcurrentKafkaConsumer`
- **Location**: `com.example.kafka`
- **Description**:
- Serves as the Spring Boot application entry point.
- Annotated with `@SpringBootApplication`.
- The `main` method runs the Spring application using `SpringApplication.run`.

### 2. `AppListener`
- **Location**: `com.example.kafka.listener`
- **Description**:
- A `@Component` that subscribes to a Kafka topic (referenced by `SPRING_KAFKA_TEST_TOPIC`).
- Uses a `ThreadLocal<ToUpperStringProcessor>` to manage message processing on a per-thread basis.
- Processes messages in the `onMessage` method and handles resource cleanup in `onEvent` when the consumer stops.

### 3. `ToUpperStringProcessor`
- **Location**: `com.example.kafka.processor`
- **Description**:
- A `@Component` with prototype scope, ensuring each instance is unique (e.g., one per thread).
- Transforms incoming strings to uppercase and stores them in a `BlockingQueue`.
- Offers methods like `distinctQueuedData()` to retrieve unique processed values.

## Message Processing Sequence

1. **Startup and Configuration**
- The `ConcurrentKafkaConsumer` class launches the Spring Boot application.
- Kafka consumer properties are configured in `application.properties`, including concurrency level and bootstrap servers.

2. **Receiving Messages**
- The `AppListener` class is annotated with `@KafkaListener`, making it an endpoint for Kafka messages.
- When a message arrives, `onMessage` is triggered, and a `ToUpperStringProcessor` is retrieved or created for the current thread.

3. **Processing**
- The processor converts the message to uppercase and stores it in a thread-safe queue.
- Each thread maintains its own `ToUpperStringProcessor`, eliminating conflicts between parallel consumers.

4. **Resource Cleanup**
- When a consumer stops, `AppListener`’s `onEvent` method removes the processor from its `ThreadLocal`, preventing resource leaks.

## Running the Application

1. **Build and Start**

```
./mvnw clean package
./mvnw spring-boot:run
```
2. **Send Kafka Messages**
Send messages to the topic defined in `application.properties` (default is `SPRING_KAFKA_TEST_TOPIC`).
3. **Observe Logs**
Each thread logs its own unique processor ID and message processing. Check the console or log files for details.
## Key Advantages
- **Increased Throughput**: Multiple threads can consume and process messages in parallel.
- **Isolation**: Each thread has a dedicated processor, avoiding synchronization overload.
- **Scalability**: The concurrency level can be fine-tuned via application properties.
### Format code
```shell
./mvnw spotless:apply
```

### Run tests

```shell
./mvnw clean verify
```

### Run locally

```shell
$ docker-compose -f docker/docker-compose.yml up -d
$ ./mvnw spring-boot:run -Dspring-boot.run.profiles=local
```

### Using Testcontainers at Development Time
You can run `TestConcurrentKafkaConsumer.java` from your IDE directly.
You can also run the application using Maven as follows:

```shell
./mvnw spring-boot:test-run
```


### Useful Links
* Actuator Endpoint: http://localhost:8080/actuator
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
services:

broker:
image: apache/kafka-native:3.8.1
hostname: broker
ports:
- "9092:9092"
healthcheck:
test: [ "CMD-SHELL", "nc -z localhost 9092" ]
interval: 30s
timeout: 5s
retries: 3
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
#Use two listeners with different names, it will force Kafka to communicate with itself via internal
#listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
KAFKA_LISTENERS: INTERNAL://broker:29092,EXTERNAL_HOST://0.0.0.0:9092,CONTROLLER://broker:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://broker:29092,EXTERNAL_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 32
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9094
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 9223372036854775807
CLUSTER_ID: 4L6g3nShT-eMCtK--X86sw

kafdrop:
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "broker:29092"
depends_on:
broker:
condition: service_healthy
Loading

0 comments on commit 651bfef

Please sign in to comment.