A robust, concurrent producer-consumer implementation in Scala demonstrating asynchronous message processing with configurable parameters.
Producers Queue Consumers
--------- ------------- -----------
P1 ──┐ │ │ ┌── C1
│ │ │ │
P2 ──┤ ──► FIFO ──►│ Messages │ ──► Consume ┤── C2
│ │ │ │
P3 ──┤ │ │ ├── C3
│ │ │ │
P4 ──┘ │ │ └── C4
Legend:
P = Producer
C = Consumer
► = Message Flow
Producer Flow Consumer Flow
-------------- --------------
┌─────────────┐ ┌─────────────┐
│ Generate ID │ │ Process │
│ (UUID) │ │ Message │
└─────┬───────┘ └─────┬───────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Add to Channel │ │ Remove from │
│ (Blocking Queue)│ ────────────────► │ Channel │
└─────────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Track Timestamp │ │ Calculate │
│ & Metrics │ │ Processing Time │
└─────────────────┘ └─────────────────┘
[Producers] [Shared Channel] [Consumers]
│ │ │
▼ ▼ ▼
┌───────────┐ ┌───────────────┐ ┌───────────────┐
│ Generate │ ──push── │ LinkedBlocking│ ──poll── │ Process │
│ Message │ │ Queue │ │ Message │
│ (UUID) │ │ (Max Size 100)│ │ (With Delay) │
└───────────┘ └───────────────┘ └───────────────┘
│ │ │
▼ ▼ ▼
┌───────────┐ ┌───────────────┐ ┌───────────────┐
│ Log │ │ Track Message │ │ Log Processing│
│ Produce │ │ Metrics │ │ Metrics │
└───────────┘ └───────────────┘ └───────────────┘
[Message Creation] ──► [Queued] ──► [Consumed] ──► [Processed]
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Generate ID │ │ Timestamp │ │ Start │ │ Calculate │
│ │ │ Enqueue │ │ Processing │ │ Total Time │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
Metric Capture Points:
┌───────────────┐
│ Message ID │
│ (UUID) │
└───┬───────────┘
│
▼
┌─────────────────┐
│ Produce Time │
│ (Timestamp) │
└───┬─────────────┘
│
▼
┌─────────────────┐
│ Queue Wait Time │
│ │
└───┬─────────────┘
│
▼
┌─────────────────┐
│ Consume Time │
│ │
└───┬─────────────┘
│
▼
┌─────────────────┐
│ Processing Time │
│ (Total Latency) │
└─────────────────┘
- Concurrent message production and consumption
- Configurable number of producers and consumers
- Dynamic message generation
- Detailed performance metrics
- Flexible delay ranges
- Comprehensive logging
- Java 11+ (Recommended: Java 17 or 21)
- Scala 2.13.x
- SBT 1.9.x
- Scala
- SBT (Build Tool)
- ScalaTest (Unit Testing)
- Logback (Logging)
- Scopt (CLI Parsing)
async-scala/
├── src/
│ ├── main/
│ │ ├── scala/
│ │ │ └── com/example/
│ │ │ ├── Main.scala
│ │ │ ├── ProducerConsumer.scala
│ │ │ ├── ProducerConsumerConfig.scala
│ │ │ └── ProducerConsumerMetrics.scala
│ │ └── resources/
│ │ └── logback.xml
│ └── test/
│ └── scala/
│ └── com/example/
│ └── ProducerConsumerSpec.scala
├── project/
│ └── build.properties
└── build.sbt
git clone https://github.com/yourusername/async-scala.git
cd async-scala
#Install SBT (if not already installed)
#macOS
brew install sbt
sbt compile
sbt test
sbt run
### CLI Parameters
- `--producers` (default: 10): Number of producer threads
- `--consumers` (default: 5): Number of consumer threads
- `--messages` (default: 5): Messages per producer
sbt assembly
java -jar target/scala-2.13/async-scala-assembly-0.1.0.jar
Edit ProducerConsumerConfig.scala
to modify default settings:
scala
ProducerConsumerConfig(
producers = 10,
consumers = 5,
messagesPerProducer = 5,
maxQueueSize = 100,
producerDelayRange = (10, 100),
consumerDelayRange = (10, 100)
)
Logging is configured in logback.xml
. Logs are printed to console with timestamps.
The application tracks:
- Total messages processed
- Individual message processing times
- Producer and consumer performance
Run comprehensive test suite:
sbt test
Tests cover:
- Message processing
- Concurrent scenarios
- Edge cases
- Ensure Java 11+ is installed
- Check SBT and Scala versions
- Verify dependencies in
build.sbt
- Fork the repository
- Create your feature branch
- Commit changes
- Push to the branch
- Create a Pull Request
[MIT License]