This Java-based application demonstrates a sophisticated concurrent message processing system using the producer-consumer design pattern. It showcases advanced concurrent programming techniques, thread management, and configurable message processing.
+-------------------+ +-------------------+
| Producers | | Consumers |
+-------------------+ +-------------------+
| - Generate msgs | | - Process msgs |
| - Put in channel | --> | - Consume from |
+-------------------+ | shared channel |
+-------------------+
| Blocking Queue |
+-------------------+
Producer 1 Producer 2 Producer N
| | |
v v v
+--------------------+ +--------------------+ +--------------------+
| Generate Message | | Generate Message | | Generate Message |
+--------------------+ +--------------------+ +--------------------+
| | |
| | |
v v v
+------------------------------------------------------------+
| Blocking Message Queue |
| [Message 1] [Message 2] [Message 3] ... [Message N] |
+------------------------------------------------------------+
| | |
| Concurrent | |
| Consumption | |
v v v
+--------------------+ +--------------------+ +--------------------+
| Consumer 1 | | Consumer 2 | | Consumer N |
| - Process Message | | - Process Message | | - Process Message |
| - Track Duration | | - Track Duration | | - Track Duration |
+--------------------+ +--------------------+ +--------------------+
| | |
v v v
+------------------------------------------------------------+
| Processed Message Results |
| [Processed Msg] [Processed Msg] [Processed Msg] |
+------------------------------------------------------------+
-
Producers:
- Generate unique messages
- Add messages to a shared blocking queue
- Simulate variable message creation time
-
Consumers:
- Remove messages from the shared queue
- Process messages with variable processing time
- Track message processing duration
-
Shared Channel:
- Thread-safe
ArrayBlockingQueue
- Manages message transfer between producers and consumers
- Thread-safe
- Configurable number of producers and consumers
- Dynamic thread pool management
- Message processing time tracking
- Graceful thread shutdown
- Comprehensive logging
- Command-line configuration
MessageProcessor
: Orchestrates message processingMessage
: Immutable message representationDuration
: Processing time trackingProducerConsumerApp
: CLI entry point
ExecutorService
for thread managementConcurrentHashMap
for thread-safe trackingBlockingQueue
for inter-thread communication- Configurable queue capacity
- Language: Java 17
- Concurrency: Java Concurrent APIs
- CLI: Picocli
- Logging: SLF4J
- Testing: JUnit 5
- Java 17 or higher
- Gradle 8.x
- Clone the repository
git clone https://github.com/yourusername/producer-consumer-app.git
cd producer-consumer-app
- Build the project
./gradlew clean build
- Run the application
# Default configuration
./gradlew run
./gradlew run --args="--numProducers 5 --numConsumers 5 --queueCapacity 100 --messageCreationDelay 1000 --messageProcessingDelay 500"
-p, --producers
: Number of producers (default: 10)-c, --consumers
: Number of consumers (default: 5)-m, --messages
: Messages per producer (default: 5)
Run comprehensive test suite:
./gradlew test
- Dynamic thread pool scaling
- Non-blocking message processing
- Configurable processing delays
- Detailed performance logging
Utilizes SLF4J for comprehensive logging:
- INFO level for system events
- DEBUG level for detailed message tracking
- ERROR level for exception handling
- Advanced metrics collection
- Configurable backoff strategies
- Enhanced error handling
- Prometheus/Micrometer integration for monitoring
MIT License
- Fork the repository
- Create your feature branch
- Commit your changes
- Push to the branch
- Create a new Pull Request