diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e43b0f9 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.DS_Store diff --git a/async-java-gradle b/async-java-gradle deleted file mode 160000 index 1f79136..0000000 --- a/async-java-gradle +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 1f79136d3c9c45061c44ac063f979f6672c64ab0 diff --git a/async-java-gradle/.gitignore b/async-java-gradle/.gitignore new file mode 100644 index 0000000..416a852 --- /dev/null +++ b/async-java-gradle/.gitignore @@ -0,0 +1,102 @@ +# Create .gitignore file +cat > .gitignore << 'EOL' +# Gradle specific +.gradle/ +build/ +out/ +bin/ + +# Compiled class files +*.class + +# Log files +*.log +logs/ + +# Package Files +*.jar +*.war +*.ear +*.nar + +# Virtual machine crash logs +hs_err_pid* + +# IDE specific files +.idea/ +*.iml +*.ipr +*.iws +.vscode/ +*.code-workspace + +# macOS specific +.DS_Store +.AppleDouble +.LSOverride + +# Eclipse specific +.classpath +.project +.settings/ +.metadata/ + +# NetBeans specific +nbproject/private/ +build/ +nbbuild/ +dist/ +nbdist/ +nbactions.xml +nb-configuration.xml + +# Temporary files +*.tmp +*.bak +*.swp +*~.nib + +# Dependency directories +node_modules/ + +# Sensitive or local configuration +*.env +local.properties + +# Compiled Python files +*.pyc + +# Backup files +*.bak +*.gho +*.ori +*.orig +*.tmp + +# Gradle build cache +.gradle-build-cache/ + +# JVM crash logs +hs_err_pid* + +# Compiled bytecode +*.class + +# Log backup files +*.log.* + +# Compiled native libraries +*.dll +*.so +*.dylib + +# Compiled static libraries +*.a +*.lib + +# Profiler and performance analysis files +*.hprof +EOL + +# Set appropriate permissions +chmod 644 .gitignore \ No newline at end of file diff --git a/async-java-gradle/README.md b/async-java-gradle/README.md new file mode 100644 index 0000000..d224abb --- /dev/null +++ b/async-java-gradle/README.md @@ -0,0 +1,177 @@ +# Concurrent Producer-Consumer Message Processing System + +## πŸš€ Project Overview + +This Java-based application demonstrates a sophisticated concurrent message processing system using the producer-consumer design pattern. It showcases advanced concurrent programming techniques, thread management, and configurable message processing. + +## πŸ”§ System Architecture + +### Architectural Diagram +``` ++-------------------+ +-------------------+ +| Producers | | Consumers | ++-------------------+ +-------------------+ +| - Generate msgs | | - Process msgs | +| - Put in channel | --> | - Consume from | ++-------------------+ | shared channel | + +-------------------+ + | Blocking Queue | + +-------------------+ +``` + +### Message Flow Diagram +``` + Producer 1 Producer 2 Producer N + | | | + v v v ++--------------------+ +--------------------+ +--------------------+ +| Generate Message | | Generate Message | | Generate Message | ++--------------------+ +--------------------+ +--------------------+ + | | | + | | | + v v v ++------------------------------------------------------------+ +| Blocking Message Queue | +| [Message 1] [Message 2] [Message 3] ... [Message N] | ++------------------------------------------------------------+ + | | | + | Concurrent | | + | Consumption | | + v v v ++--------------------+ +--------------------+ +--------------------+ +| Consumer 1 | | Consumer 2 | | Consumer N | +| - Process Message | | - Process Message | | - Process Message | +| - Track Duration | | - Track Duration | | - Track Duration | ++--------------------+ +--------------------+ +--------------------+ + | | | + v v v ++------------------------------------------------------------+ +| Processed Message Results | +| [Processed Msg] [Processed Msg] [Processed Msg] | ++------------------------------------------------------------+ +``` + + +### Key Components + +1. **Producers**: + - Generate unique messages + - Add messages to a shared blocking queue + - Simulate variable message creation time + +2. **Consumers**: + - Remove messages from the shared queue + - Process messages with variable processing time + - Track message processing duration + +3. **Shared Channel**: + - Thread-safe `ArrayBlockingQueue` + - Manages message transfer between producers and consumers + +## 🌟 Features + +- Configurable number of producers and consumers +- Dynamic thread pool management +- Message processing time tracking +- Graceful thread shutdown +- Comprehensive logging +- Command-line configuration + +## πŸ›  Technical Details + +### Core Classes + +- `MessageProcessor`: Orchestrates message processing +- `Message`: Immutable message representation +- `Duration`: Processing time tracking +- `ProducerConsumerApp`: CLI entry point + +### Concurrency Mechanisms + +- `ExecutorService` for thread management +- `ConcurrentHashMap` for thread-safe tracking +- `BlockingQueue` for inter-thread communication +- Configurable queue capacity + +## πŸ“¦ Technology Stack + +- **Language**: Java 17 +- **Concurrency**: Java Concurrent APIs +- **CLI**: Picocli +- **Logging**: SLF4J +- **Testing**: JUnit 5 + +## πŸš€ Getting Started + +### Prerequisites + +- Java 17 or higher +- Gradle 8.x + +### Installation + +1. Clone the repository +```bash +git clone https://github.com/yourusername/producer-consumer-app.git +cd producer-consumer-app +``` +2. Build the project +```bash +./gradlew clean build +``` +3. Run the application +```bash +# Default configuration +./gradlew run +``` + +```bash +./gradlew run --args="--numProducers 5 --numConsumers 5 --queueCapacity 100 --messageCreationDelay 1000 --messageProcessingDelay 500" +``` + +### Command-line Options + +- `-p, --producers`: Number of producers (default: 10) +- `-c, --consumers`: Number of consumers (default: 5) +- `-m, --messages`: Messages per producer (default: 5) + +## πŸ§ͺ Testing + +Run comprehensive test suite: +```bash +./gradlew test +``` + + +## πŸ“Š Performance Characteristics + +- Dynamic thread pool scaling +- Non-blocking message processing +- Configurable processing delays +- Detailed performance logging + +## πŸ” Logging + +Utilizes SLF4J for comprehensive logging: +- INFO level for system events +- DEBUG level for detailed message tracking +- ERROR level for exception handling + +## 🚧 Potential Improvements + +- Advanced metrics collection +- Configurable backoff strategies +- Enhanced error handling +- Prometheus/Micrometer integration for monitoring + +## πŸ“œ License + +MIT License + +## 🀝 Contributing + +1. Fork the repository +2. Create your feature branch +3. Commit your changes +4. Push to the branch +5. Create a new Pull Request diff --git a/async-java-gradle/build.gradle b/async-java-gradle/build.gradle new file mode 100644 index 0000000..39e50cc --- /dev/null +++ b/async-java-gradle/build.gradle @@ -0,0 +1,37 @@ +plugins { + id 'java' + id 'application' +} + +group 'com.example' +version '1.0-SNAPSHOT' + +repositories { + mavenCentral() +} + +dependencies { + implementation 'com.google.guava:guava:31.1-jre' + implementation 'info.picocli:picocli:4.7.5' + implementation 'org.slf4j:slf4j-api:2.0.9' + implementation 'ch.qos.logback:logback-classic:1.4.11' + + annotationProcessor 'info.picocli:picocli-codegen:4.7.5' + + // Test dependencies + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1' +} + +application { + mainClass = 'com.example.ProducerConsumerApp' +} + +java { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} + +test { + useJUnitPlatform() +} diff --git a/async-java-gradle/gradle/wrapper/gradle-wrapper.properties b/async-java-gradle/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..1af9e09 --- /dev/null +++ b/async-java-gradle/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/async-java-gradle/gradlew b/async-java-gradle/gradlew new file mode 100755 index 0000000..1aa94a4 --- /dev/null +++ b/async-java-gradle/gradlew @@ -0,0 +1,249 @@ +#!/bin/sh + +# +# Copyright Β© 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions Β«$varΒ», Β«${var}Β», Β«${var:-default}Β», Β«${var+SET}Β», +# Β«${var#prefix}Β», Β«${var%suffix}Β», and Β«$( cmd )Β»; +# * compound commands having a testable exit status, especially Β«caseΒ»; +# * various built-in commands including Β«commandΒ», Β«setΒ», and Β«ulimitΒ». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/async-java-gradle/gradlew.bat b/async-java-gradle/gradlew.bat new file mode 100644 index 0000000..6689b85 --- /dev/null +++ b/async-java-gradle/gradlew.bat @@ -0,0 +1,92 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/async-java-gradle/src/main/java/com/example/Duration.java b/async-java-gradle/src/main/java/com/example/Duration.java new file mode 100644 index 0000000..932a5e3 --- /dev/null +++ b/async-java-gradle/src/main/java/com/example/Duration.java @@ -0,0 +1,18 @@ +package com.example; + +public class Duration { + private long start; + private long end; + + public void setStart(long start) { + this.start = start; + } + + public void setEnd(long end) { + this.end = end; + } + + public long getProcessingTimeMs() { + return (end - start) / 1_000_000; + } +} \ No newline at end of file diff --git a/async-java-gradle/src/main/java/com/example/Message.java b/async-java-gradle/src/main/java/com/example/Message.java new file mode 100644 index 0000000..67e28f1 --- /dev/null +++ b/async-java-gradle/src/main/java/com/example/Message.java @@ -0,0 +1,9 @@ +package com.example; + +import java.util.UUID; + +public record Message(String id) { + public static Message create() { + return new Message(UUID.randomUUID().toString()); + } +} \ No newline at end of file diff --git a/async-java-gradle/src/main/java/com/example/MessageProcessor.java b/async-java-gradle/src/main/java/com/example/MessageProcessor.java new file mode 100644 index 0000000..73470e1 --- /dev/null +++ b/async-java-gradle/src/main/java/com/example/MessageProcessor.java @@ -0,0 +1,149 @@ +package com.example; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.*; + +public class MessageProcessor { + private static final Logger logger = LoggerFactory.getLogger(MessageProcessor.class); + private static final int DEFAULT_QUEUE_CAPACITY = 100; + private static final long SHUTDOWN_TIMEOUT_SECONDS = 60; + + private final int numProducers; + private final int numConsumers; + private final int numMessagesPerProducer; + private final BlockingQueue channel; + private final Map record; + private final List consumed; + + public MessageProcessor(int numProducers, int numConsumers, int numMessagesPerProducer) { + validateInputs(numProducers, numConsumers, numMessagesPerProducer); + + this.numProducers = numProducers; + this.numConsumers = numConsumers; + this.numMessagesPerProducer = numMessagesPerProducer; + this.channel = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + this.record = new ConcurrentHashMap<>(); + this.consumed = new CopyOnWriteArrayList<>(); + } + + private void validateInputs(int producers, int consumers, int messagesPerProducer) { + if (producers <= 0 || consumers <= 0 || messagesPerProducer <= 0) { + throw new IllegalArgumentException("Producers, consumers, and messages must be positive"); + } + } + + public ProcessingResult process() { + logger.info("Starting message processing: producers={}, consumers={}, messages_per_producer={}", + numProducers, numConsumers, numMessagesPerProducer); + + ExecutorService executorService = Executors.newCachedThreadPool(); + List> futures = new ArrayList<>(); + + try { + // Start producers + for (int p = 1; p <= numProducers; p++) { + int producerId = p; + futures.add(executorService.submit(() -> producer(producerId))); + } + + // Start consumers + for (int c = 1; c <= numConsumers; c++) { + int consumerId = c; + futures.add(executorService.submit(() -> consumer(consumerId))); + } + + // Wait for all tasks to complete + futures.forEach(this::awaitFuture); + + return new ProcessingResult(consumed, record); + } finally { + shutdownExecutorService(executorService); + } + } + + private void awaitFuture(Future future) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + logger.error("Error in task execution", e); + Thread.currentThread().interrupt(); + } + } + + private void shutdownExecutorService(ExecutorService executorService) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + private void producer(int id) { + Random rng = new Random(); + for (int i = 0; i < numMessagesPerProducer; i++) { + try { + Message msg = Message.create(); + Thread.sleep(100 + rng.nextInt(400)); + channel.put(msg); + Duration duration = new Duration(); + duration.setStart(System.nanoTime()); + record.put(msg, duration); + logger.debug("Message sent by producer_id={}, message={}, message_num={}", + id, msg.id(), i + 1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + logger.info("Producer completed: producer_id={}", id); + } + + private void consumer(int id) { + Random rng = new Random(); + while (!Thread.currentThread().isInterrupted()) { + try { + Message msg = channel.poll(1, TimeUnit.SECONDS); + if (msg == null) { + if (channel.isEmpty()) break; + continue; + } + Thread.sleep(200 + rng.nextInt(400)); + consumed.add(msg); + record.get(msg).setEnd(System.nanoTime()); + logger.debug("Message received by consumer_id={}, message={}", id, msg.id()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + logger.info("Consumer completed: consumer_id={}", id); + } + + public static class ProcessingResult { + private final List consumedMessages; + private final Map processingTimes; + + public ProcessingResult(List consumedMessages, Map processingTimes) { + this.consumedMessages = consumedMessages; + this.processingTimes = processingTimes; + } + + public List getConsumedMessages() { + return consumedMessages; + } + + public Map getProcessingTimes() { + return processingTimes; + } + } +} diff --git a/async-java-gradle/src/main/java/com/example/ProducerConsumerApp.java b/async-java-gradle/src/main/java/com/example/ProducerConsumerApp.java new file mode 100644 index 0000000..f25620c --- /dev/null +++ b/async-java-gradle/src/main/java/com/example/ProducerConsumerApp.java @@ -0,0 +1,50 @@ +package com.example; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import picocli.CommandLine; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; + +import java.util.concurrent.Callable; + +@Command(name = "producer-consumer", description = "A producer-consumer application") +public class ProducerConsumerApp implements Callable { + private static final Logger logger = LoggerFactory.getLogger(ProducerConsumerApp.class); + + @Option(names = {"-p", "--producers"}, description = "Number of producers", defaultValue = "10") + private int numProducers; + + @Option(names = {"-c", "--consumers"}, description = "Number of consumers", defaultValue = "5") + private int numConsumers; + + @Option(names = {"-m", "--messages"}, description = "Number of messages per producer", defaultValue = "5") + private int numMessagesPerProducer; + + public static void main(String[] args) { + int exitCode = new CommandLine(new ProducerConsumerApp()).execute(args); + System.exit(exitCode); + } + + @Override + public Integer call() { + logger.info("Starting application with producers={}, consumers={}, messages_per_producer={}", + numProducers, numConsumers, numMessagesPerProducer); + + try { + MessageProcessor processor = new MessageProcessor(numProducers, numConsumers, numMessagesPerProducer); + MessageProcessor.ProcessingResult result = processor.process(); + + logger.info("Total messages consumed: {}", result.getConsumedMessages().size()); + result.getProcessingTimes().forEach((msg, duration) -> + logger.info("Message {} processing time: {} ms", + msg.id(), duration.getProcessingTimeMs()) + ); + + return 0; + } catch (Exception e) { + logger.error("Error in producer-consumer application", e); + return 1; + } + } +} diff --git a/async-java-gradle/src/main/resources/logback.xml b/async-java-gradle/src/main/resources/logback.xml new file mode 100644 index 0000000..3aa5429 --- /dev/null +++ b/async-java-gradle/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/async-java-gradle/src/test/java/com/example/MessageProcessorTest.java b/async-java-gradle/src/test/java/com/example/MessageProcessorTest.java new file mode 100644 index 0000000..8ecac1a --- /dev/null +++ b/async-java-gradle/src/test/java/com/example/MessageProcessorTest.java @@ -0,0 +1,45 @@ +package com.example; + +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Map; + +class MessageProcessorTest { + + @Test + void testProcessingWithDefaultParameters() { + MessageProcessor processor = new MessageProcessor(2, 2, 5); + MessageProcessor.ProcessingResult result = processor.process(); + + assertNotNull(result); + assertEquals(10, result.getConsumedMessages().size()); + assertEquals(10, result.getProcessingTimes().size()); + } + + @Test + void testProcessingWithZeroProducers() { + assertThrows(IllegalArgumentException.class, () -> + new MessageProcessor(0, 2, 5) + ); + } + + @Test + void testProcessingWithZeroConsumers() { + assertThrows(IllegalArgumentException.class, () -> + new MessageProcessor(2, 0, 5) + ); + } + + @Test + void testProcessingMessageTimes() { + MessageProcessor processor = new MessageProcessor(1, 1, 5); + MessageProcessor.ProcessingResult result = processor.process(); + + Map processingTimes = result.getProcessingTimes(); + processingTimes.values().forEach(duration -> { + assertTrue(duration.getProcessingTimeMs() >= 0, + "Processing time should be non-negative"); + }); + } +} diff --git a/async-rust b/async-rust deleted file mode 160000 index 674c049..0000000 --- a/async-rust +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 674c0495f92b3d6d621944479afbc91036b391dc diff --git a/async-rust/.gitignore b/async-rust/.gitignore new file mode 100644 index 0000000..3a8cabc --- /dev/null +++ b/async-rust/.gitignore @@ -0,0 +1,2 @@ +/target +.idea diff --git a/async-rust/Cargo.lock b/async-rust/Cargo.lock new file mode 100644 index 0000000..61044a8 --- /dev/null +++ b/async-rust/Cargo.lock @@ -0,0 +1,618 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + +[[package]] +name = "async-rust" +version = "0.1.0" +dependencies = [ + "clap", + "env_logger", + "log", + "rand", + "tokio", + "uuid", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bitflags" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "clap" +version = "3.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" +dependencies = [ + "atty", + "bitflags 1.3.2", + "clap_lex", + "indexmap", + "strsim", + "termcolor", + "textwrap", +] + +[[package]] +name = "clap_lex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" +dependencies = [ + "os_str_bytes", +] + +[[package]] +name = "env_logger" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown", +] + +[[package]] +name = "libc" +version = "0.2.169" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" + +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ffbe83022cedc1d264172192511ae958937694cd57ce297164951b8b3568394" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "os_str_bytes" +version = "6.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" + +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" + +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "redox_syscall" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" +dependencies = [ + "bitflags 2.6.0", +] + +[[package]] +name = "regex" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + +[[package]] +name = "syn" +version = "2.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c786062daee0d6db1132800e623df74274a0a87322d8e183338e01b3d98d058" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9" + +[[package]] +name = "tokio" +version = "1.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" + +[[package]] +name = "uuid" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +dependencies = [ + "getrandom", + "rand", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/async-rust/Cargo.toml b/async-rust/Cargo.toml new file mode 100644 index 0000000..6d4aff0 --- /dev/null +++ b/async-rust/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "async-rust" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } +uuid = { version = "1.1", features = ["v4", "fast-rng"] } +clap = "3.2.25" +log = "0.4" +env_logger = "0.9" +rand = "0.8" diff --git a/async-rust/src/main.rs b/async-rust/src/main.rs new file mode 100644 index 0000000..9ac1435 --- /dev/null +++ b/async-rust/src/main.rs @@ -0,0 +1,157 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +use tokio::sync::broadcast; +use tokio::task; +use clap::{App, Arg}; +use log::{info, error}; + +type Message = String; + +struct MessageDuration { + start: Instant, + end: Option, +} + +#[tokio::main] +async fn main() { + env_logger::init(); + + let matches = App::new("Producer-Consumer") + .about("A producer-consumer application") + .arg(Arg::with_name("producers") + .short('p') + .long("producers") + .value_name("NUM_PRODUCERS") + .default_value("10") + .help("Number of producers")) + .arg(Arg::with_name("consumers") + .short('c') + .long("consumers") + .value_name("NUM_CONSUMERS") + .default_value("5") + .help("Number of consumers")) + .arg(Arg::with_name("messages") + .short('m') + .long("messages") + .value_name("NUM_MESSAGES_PER_PRODUCER") + .default_value("5") + .help("Number of messages per producer")) + .get_matches(); + + let num_producers = matches.value_of("producers").unwrap().parse().unwrap(); + let num_consumers = matches.value_of("consumers").unwrap().parse().unwrap(); + let num_messages_per_producer = matches.value_of("messages").unwrap().parse().unwrap(); + + info!("Starting with {} producers and {} consumers", num_producers, num_consumers); + + setup(num_producers, num_consumers, num_messages_per_producer).await; +} + +async fn setup(num_producers: usize, num_consumers: usize, num_messages_per_producer: usize) { + let record = Arc::new(Mutex::new(HashMap::new())); + run(num_producers, num_consumers, num_messages_per_producer, record.clone()).await; + info!("################ printing produce and consume delays ################"); + tokio::time::sleep(Duration::from_secs(2)).await; + print_delay(record); + info!("################ printing done ################"); +} + +async fn run(num_producers: usize, num_consumers: usize, num_messages_per_producer: usize, record: Arc>>) { + let (tx, _rx) = broadcast::channel::(100); + + let mut producer_handles = Vec::new(); + let mut consumer_handles = Vec::new(); + + // Spawn producers + for p in 1..=num_producers { + let tx_clone = tx.clone(); + let record_clone = record.clone(); + let handle = task::spawn(async move { + producer(p, tx_clone, num_messages_per_producer, record_clone).await; + }); + producer_handles.push(handle); + } + + // Spawn consumers + let consumed = Arc::new(Mutex::new(Vec::new())); + for c in 1..=num_consumers { + let rx = tx.subscribe(); + let consumed_clone = consumed.clone(); + let record_clone = record.clone(); + let handle = task::spawn(async move { + consumer(c, rx, consumed_clone, record_clone).await; + }); + consumer_handles.push(handle); + } + + // Wait for all producers to complete + for handle in producer_handles { + handle.await.unwrap(); + } + + // Drop the sender to signal no more messages will be sent + drop(tx); + + // Wait for all consumers to complete + for handle in consumer_handles { + handle.await.unwrap(); + } + + info!("All producers and consumers have completed."); + let consumed_messages = consumed.lock().unwrap(); + info!("Number of messages consumed: {}", consumed_messages.len()); +} + +async fn producer(id: usize, sender: broadcast::Sender, num_messages: usize, record: Arc>>) { + for _ in 0..num_messages { + let msg = uuid::Uuid::new_v4().to_string(); + let delay = rand::random::() % 400 + 100; + tokio::time::sleep(Duration::from_millis(delay)).await; + if let Err(e) = sender.send(msg.clone()) { + error!("Producer {} failed to send message: {}", id, e); + return; + } + record.lock().unwrap().insert(msg.clone(), MessageDuration { start: Instant::now(), end: None }); + info!("Producer {} sent {}", id, msg); + } + info!("Producer {} completed", id); +} + +async fn consumer( + id: usize, + mut receiver: broadcast::Receiver, + consumed: Arc>>, + record: Arc>>, +) { + while let Ok(msg) = receiver.recv().await { + // Skip if we've already processed this message (broadcast channels send to all consumers) + { + let mut consumed_msgs = consumed.lock().unwrap(); + if consumed_msgs.contains(&msg) { + continue; + } + // Process the message only if we haven't seen it before + info!("Consumer {} received {}", id, msg); + consumed_msgs.push(msg.clone()); + if let Some(duration) = record.lock().unwrap().get_mut(&msg) { + duration.end = Some(Instant::now()); + } + } // Lock is released here + + let delay = rand::random::() % 400 + 200; + tokio::time::sleep(Duration::from_millis(delay)).await; + } + info!("Consumer {} completed", id); +} + +fn print_delay(record: Arc>>) { + let record = record.lock().unwrap(); + for (msg, duration) in record.iter() { + if let Some(end) = duration.end { + let delay = end.duration_since(duration.start).as_millis(); + info!("Message {} took {} ms", msg, delay); + } + } +} + diff --git a/async-scala b/async-scala deleted file mode 160000 index c6fa96c..0000000 --- a/async-scala +++ /dev/null @@ -1 +0,0 @@ -Subproject commit c6fa96c7260cd6f884c7c6f8650d7f2e33a5e84a diff --git a/async-scala/.gitignore b/async-scala/.gitignore new file mode 100644 index 0000000..c8666bf --- /dev/null +++ b/async-scala/.gitignore @@ -0,0 +1,77 @@ +# SBT specific +target/ +project/project/ +project/target/ +project/boot/ +project/plugins/project/ + +# Scala specific +*.class +*.log + +# IntelliJ IDEA specific +.idea/ +.idea_modules/ +*.iml +*.ipr +*.iws + +# Eclipse specific +.classpath +.project +.settings/ +.metals/ +.bsp/ + +# Mac specific +.DS_Store + +# Logs +*.log +logs/ + +# Virtual machine crash logs +hs_err_pid* + +# Metals (Scala language server) +.metals/ +.bloop/ + +# VS Code +.vscode/ + +# Ensime +.ensime +.ensime_cache/ + +# Dotty/Scala 3 +.dotty-ide.json +.dotty-ide-artifact + +# Dependency management +dependency-reduced-pom.xml + +# Build artifacts +*.jar +!project/build.properties +!project/plugins.sbt + +# Environment files +.env +.env.local +.env.development +.env.test +.env.production + +# Version management files +.java-version +.jvmopts +.tool-versions + +# Optional: .sbtopts (if machine-specific) +# .sbtopts + +# Keep scalafmt configuration +!.scalafmt.conf + +.sbtopts \ No newline at end of file diff --git a/async-scala/.scalafmt.conf b/async-scala/.scalafmt.conf new file mode 100644 index 0000000..71a7c3a --- /dev/null +++ b/async-scala/.scalafmt.conf @@ -0,0 +1,14 @@ +version = "2.7.5" +maxColumn = 100 +align.preset = most +continuationIndent.defnSite = 2 +assumeStandardLibraryStripMargin = true +docstrings.style = Asterisk +lineEndings = preserve +includeCurlyBraceInSelectChains = false +danglingParentheses.preset = true +spaces { + inImportCurlyBraces = true +} +optIn.annotationNewlines = true +rewrite.rules = [SortImports, RedundantBraces] \ No newline at end of file diff --git a/async-scala/README.md b/async-scala/README.md new file mode 100644 index 0000000..0122222 --- /dev/null +++ b/async-scala/README.md @@ -0,0 +1,276 @@ +# Async Scala Producer-Consumer + +## πŸš€ Project Overview + +A robust, concurrent producer-consumer implementation in Scala demonstrating asynchronous message processing with configurable parameters. + +### Message Packet Flow Visualization +```Bash + +Producers Queue Consumers +--------- ------------- ----------- + +P1 ──┐ β”‚ β”‚ β”Œβ”€β”€ C1 + β”‚ β”‚ β”‚ β”‚ +P2 ─── ──► FIFO ──►│ Messages β”‚ ──► Consume ─── C2 + β”‚ β”‚ β”‚ β”‚ +P3 ─── β”‚ β”‚ β”œβ”€β”€ C3 + β”‚ β”‚ β”‚ β”‚ +P4 β”€β”€β”˜ β”‚ β”‚ └── C4 + +Legend: +P = Producer +C = Consumer +β–Ί = Message Flow +``` + +### More Detailed Flow: +```Bash +Producer Flow Consumer Flow +-------------- -------------- + + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Generate ID β”‚ β”‚ Process β”‚ + β”‚ (UUID) β”‚ β”‚ Message β”‚ + β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ + β–Ό β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Add to Channel β”‚ β”‚ Remove from β”‚ +β”‚ (Blocking Queue)β”‚ ────────────────► β”‚ Channel β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ + β–Ό β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Track Timestamp β”‚ β”‚ Calculate β”‚ +β”‚ & Metrics β”‚ β”‚ Processing Time β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### Comprehensive Flow Diagram: +```Bash +[Producers] [Shared Channel] [Consumers] + β”‚ β”‚ β”‚ + β–Ό β–Ό β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Generate β”‚ ──push── β”‚ LinkedBlockingβ”‚ ──poll── β”‚ Process β”‚ +β”‚ Message β”‚ β”‚ Queue β”‚ β”‚ Message β”‚ +β”‚ (UUID) β”‚ β”‚ (Max Size 100)β”‚ β”‚ (With Delay) β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ β”‚ + β–Ό β–Ό β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Log β”‚ β”‚ Track Message β”‚ β”‚ Log Processingβ”‚ +β”‚ Produce β”‚ β”‚ Metrics β”‚ β”‚ Metrics β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### Metrics Tracking Flow: +```Bash +[Message Creation] ──► [Queued] ──► [Consumed] ──► [Processed] + β”‚ β”‚ β”‚ β”‚ + β–Ό β–Ό β–Ό β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Generate ID β”‚ β”‚ Timestamp β”‚ β”‚ Start β”‚ β”‚ Calculate β”‚ +β”‚ β”‚ β”‚ Enqueue β”‚ β”‚ Processing β”‚ β”‚ Total Time β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### Performance Metrics Capture: +```Bash +Metric Capture Points: + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Message ID β”‚ + β”‚ (UUID) β”‚ + β””β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Produce Time β”‚ +β”‚ (Timestamp) β”‚ +β””β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Queue Wait Time β”‚ +β”‚ β”‚ +β””β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Consume Time β”‚ +β”‚ β”‚ +β””β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Processing Time β”‚ +β”‚ (Total Latency) β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + + +### ✨ Features + +- Concurrent message production and consumption +- Configurable number of producers and consumers +- Dynamic message generation +- Detailed performance metrics +- Flexible delay ranges +- Comprehensive logging + +## πŸ“‹ Prerequisites + +- Java 11+ (Recommended: Java 17 or 21) +- Scala 2.13.x +- SBT 1.9.x + +## πŸ›  Technology Stack + +- Scala +- SBT (Build Tool) +- ScalaTest (Unit Testing) +- Logback (Logging) +- Scopt (CLI Parsing) + +## πŸ“‚ Project Structure +```Bash +async-scala/ +β”œβ”€β”€ src/ +β”‚ β”œβ”€β”€ main/ +β”‚ β”‚ β”œβ”€β”€ scala/ +β”‚ β”‚ β”‚ └── com/example/ +β”‚ β”‚ β”‚ β”œβ”€β”€ Main.scala +β”‚ β”‚ β”‚ β”œβ”€β”€ ProducerConsumer.scala +β”‚ β”‚ β”‚ β”œβ”€β”€ ProducerConsumerConfig.scala +β”‚ β”‚ β”‚ └── ProducerConsumerMetrics.scala +β”‚ β”‚ └── resources/ +β”‚ β”‚ └── logback.xml +β”‚ └── test/ +β”‚ └── scala/ +β”‚ └── com/example/ +β”‚ └── ProducerConsumerSpec.scala +β”œβ”€β”€ project/ +β”‚ └── build.properties +└── build.sbt +``` + +## πŸ”§ Setup & Installation + +### 1. Clone the Repository +```Bash +git clone https://github.com/yourusername/async-scala.git +cd async-scala +``` + +### 2. Install Dependencies + +```Bash +#Install SBT (if not already installed) +#macOS +brew install sbt +``` + +### 3. Compile the Project +```Bash +sbt compile +``` + +### Run tests +```Bash +sbt test +``` + + +## πŸš€ Running the Application + +### Command Line Options +```Bash +sbt run +``` + +### Custom configuration + +```Bash + +### CLI Parameters + +- `--producers` (default: 10): Number of producer threads +- `--consumers` (default: 5): Number of consumer threads +- `--messages` (default: 5): Messages per producer +``` + +### Create Executable JAR + +```Bash +sbt assembly +``` + +### Run the JAR +```Bash +java -jar target/scala-2.13/async-scala-assembly-0.1.0.jar +``` + +## πŸ”§ Configuration + +Edit `ProducerConsumerConfig.scala` to modify default settings: +```Bash +scala +ProducerConsumerConfig( +producers = 10, +consumers = 5, +messagesPerProducer = 5, +maxQueueSize = 100, +producerDelayRange = (10, 100), +consumerDelayRange = (10, 100) +) +``` + + +## πŸ“ Logging + +Logging is configured in `logback.xml`. Logs are printed to console with timestamps. + +## πŸ“Š Performance Metrics + +The application tracks: +- Total messages processed +- Individual message processing times +- Producer and consumer performance + +## πŸ§ͺ Testing + +Run comprehensive test suite: +```Bash +sbt test +``` + + +Tests cover: +- Message processing +- Concurrent scenarios +- Edge cases + +## πŸ›  Troubleshooting + +- Ensure Java 11+ is installed +- Check SBT and Scala versions +- Verify dependencies in `build.sbt` + +## 🀝 Contributing + +1. Fork the repository +2. Create your feature branch +3. Commit changes +4. Push to the branch +5. Create a Pull Request + +## πŸ“„ License + +[MIT License] + +## πŸ” Additional Resources + +- [Scala Documentation](https://docs.scala-lang.org/) +- [SBT Documentation](https://www.scala-sbt.org/documentation.html) +- [ScalaTest Documentation](https://www.scalatest.org/) diff --git a/async-scala/build.properties b/async-scala/build.properties new file mode 100644 index 0000000..1a1a23b --- /dev/null +++ b/async-scala/build.properties @@ -0,0 +1,3 @@ +app.name=async-scala +app.version=0.1.0 +scala.version=2.12.12 \ No newline at end of file diff --git a/async-scala/build.sbt b/async-scala/build.sbt new file mode 100644 index 0000000..b611e91 --- /dev/null +++ b/async-scala/build.sbt @@ -0,0 +1,31 @@ +name := "async-scala" +version := "0.1.0" +scalaVersion := "2.13.12" + +libraryDependencies ++= Seq( + "com.github.scopt" %% "scopt" % "4.1.0", + + // Explicit SLF4J and Logback dependencies + "org.slf4j" % "slf4j-api" % "2.0.9", + "ch.qos.logback" % "logback-classic" % "1.4.11", + "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", + + "org.scalatest" %% "scalatest" % "3.2.17" % Test +) + +// Assembly configuration +assembly / assemblyMergeStrategy := { + case PathList("module-info.class") => MergeStrategy.discard + case x if x.endsWith("/module-info.class") => MergeStrategy.discard + case PathList("META-INF", xs @ _*) => MergeStrategy.first + case "reference.conf" => MergeStrategy.concat + case x if x.endsWith(".class") => MergeStrategy.last + case x if x.endsWith(".properties") => MergeStrategy.last + case _ => MergeStrategy.first +} + +// Main class configuration +assembly / mainClass := Some("com.example.Main") + +// Ensure all dependencies are packaged +assembly / fullClasspath := (assembly / fullClasspath).value \ No newline at end of file diff --git a/async-scala/project/Dependencies.scala b/async-scala/project/Dependencies.scala new file mode 100644 index 0000000..8353f0f --- /dev/null +++ b/async-scala/project/Dependencies.scala @@ -0,0 +1,33 @@ +import sbt._ + +object Dependencies { + object Versions { + val scala = "2.12.18" + val scalaLogging = "3.9.5" + val logback = "1.4.11" + val scopt = "4.1.0" + val collectionCompat = "2.11.0" + val scalatest = "3.2.17" + val mockito = "5.8.0" + val slf4j = "2.0.9" + } + + val core = Seq( + "org.scala-lang" % "scala-library" % Versions.scala, + "com.typesafe.scala-logging" %% "scala-logging" % Versions.scalaLogging, + "ch.qos.logback" % "logback-classic" % Versions.logback, + "org.slf4j" % "slf4j-api" % Versions.slf4j, + "com.github.scopt" %% "scopt" % Versions.scopt, + "org.scala-lang.modules" %% "scala-collection-compat" % Versions.collectionCompat, + "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" + ) + + val testing = Seq( + "org.scalatest" %% "scalatest" % Versions.scalatest % Test, + "org.mockito" % "mockito-core" % Versions.mockito % Test, + "org.scalatest" %% "scalatest-flatspec" % Versions.scalatest % Test, + "ch.qos.logback" % "logback-classic" % Versions.logback % Test + ) + + val all = core ++ testing +} \ No newline at end of file diff --git a/async-scala/project/build.properties b/async-scala/project/build.properties new file mode 100644 index 0000000..abbbce5 --- /dev/null +++ b/async-scala/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.9.8 diff --git a/async-scala/project/plugins.sbt b/async-scala/project/plugins.sbt new file mode 100644 index 0000000..b9af522 --- /dev/null +++ b/async-scala/project/plugins.sbt @@ -0,0 +1,10 @@ +// Core plugins +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0") +addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") +addSbtPlugin("com.github.sbt" % "sbt-release" % "1.1.0") + +// Add scoverage plugin +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3") + +// Remove native packager and scoverage for now to resolve conflicts \ No newline at end of file diff --git a/async-scala/src/main/resources/logback.xml b/async-scala/src/main/resources/logback.xml new file mode 100644 index 0000000..53aeeaf --- /dev/null +++ b/async-scala/src/main/resources/logback.xml @@ -0,0 +1,14 @@ + + + + + [%date{yyyy-MM-dd'T'HH:mm:ss'Z'}Z INFO async_rust] %msg%n + + + + + + + + + \ No newline at end of file diff --git a/async-scala/src/main/scala/com/example/Main.scala b/async-scala/src/main/scala/com/example/Main.scala new file mode 100644 index 0000000..48dead6 --- /dev/null +++ b/async-scala/src/main/scala/com/example/Main.scala @@ -0,0 +1,63 @@ +package com.example + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scopt.OParser +import org.slf4j.LoggerFactory + +object Main { + private val logger = LoggerFactory.getLogger(getClass) + + def main(args: Array[String]): Unit = { + // Define the command-line parser + val builder = OParser.builder[ProducerConsumerConfig] + val parser = { + import builder._ + OParser.sequence( + programName("async-scala"), + head("Async Scala Producer-Consumer"), + opt[Int]('p', "producers") + .action((x, c) => c.copy(producers = x)) + .text("number of producers") + .validate(x => if (x > 0) success else failure("Producers must be > 0")), + opt[Int]('c', "consumers") + .action((x, c) => c.copy(consumers = x)) + .text("number of consumers") + .validate(x => if (x > 0) success else failure("Consumers must be > 0")), + opt[Int]('m', "messages") + .action((x, c) => c.copy(messagesPerProducer = x)) + .text("messages per producer") + .validate(x => if (x > 0) success else failure("Messages must be > 0")), + help("help").text("prints this usage text") + ) + } + + // Default configuration + val defaultConfig = ProducerConsumerConfig( + producers = 10, + consumers = 5, + messagesPerProducer = 5 + ) + + // Parse the arguments + OParser.parse(parser, args, defaultConfig) match { + case Some(config) => + logger.info(s"Starting with ${config.producers} producers and ${config.consumers} consumers") + + val pc = new ProducerConsumer(config) + try { + val metricsFuture: Future[ProducerConsumerMetrics] = pc.run() + val metrics = Await.result(metricsFuture, 1.minute) + logger.info(s"Total messages processed: ${metrics.totalMessages}") + } catch { + case e: Exception => + logger.error("Error running producer-consumer", e) + sys.exit(1) + } + + case _ => + OParser.usage(parser) // Print usage information + sys.exit(1) + } + } +} diff --git a/async-scala/src/main/scala/com/example/Message.scala b/async-scala/src/main/scala/com/example/Message.scala new file mode 100644 index 0000000..c7f7ccb --- /dev/null +++ b/async-scala/src/main/scala/com/example/Message.scala @@ -0,0 +1,12 @@ +package com.example + +import java.util.UUID +import java.util.concurrent.TimeUnit + +case class Message( + id: String = UUID.randomUUID().toString, + createdAt: Long = System.nanoTime() +) { + def processingTimeMs(endTime: Long): Long = + TimeUnit.NANOSECONDS.toMillis(endTime - createdAt) +} diff --git a/async-scala/src/main/scala/com/example/ProducerConsumer.scala b/async-scala/src/main/scala/com/example/ProducerConsumer.scala new file mode 100644 index 0000000..818b2e9 --- /dev/null +++ b/async-scala/src/main/scala/com/example/ProducerConsumer.scala @@ -0,0 +1,94 @@ +package com.example + +import scala.concurrent.{Future, Promise, blocking} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.collection.concurrent.TrieMap +import org.slf4j.LoggerFactory +import java.util.UUID +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} + +class ProducerConsumer(config: ProducerConsumerConfig) { + private val logger = LoggerFactory.getLogger("async_rust") + + // Shared channel for producers and consumers + private val channel = new LinkedBlockingQueue[String](config.maxQueueSize) + + // Metrics tracking + private val messageMetrics = new TrieMap[String, Long]() + private val messageStartTimes = new TrieMap[String, Long]() + + def run(): Future[ProducerConsumerMetrics] = { + val promise = Promise[ProducerConsumerMetrics]() + + // Producers + val producerFutures = (1 to config.producers).map { producerId => + Future { + blocking { + (1 to config.messagesPerProducer).foreach { _ => + val messageId = UUID.randomUUID().toString + val startTime = System.currentTimeMillis() + messageStartTimes.put(messageId, startTime) + + // Simulate some work before sending + Thread.sleep(scala.util.Random.nextInt(config.producerDelayRange._2 - config.producerDelayRange._1) + config.producerDelayRange._1) + + channel.offer(messageId) + logger.info(s"Producer $producerId sent $messageId") + } + logger.info(s"Producer $producerId completed") + } + } + } + + // Consumers + val consumerFutures = (1 to config.consumers).map { consumerId => + Future { + blocking { + (1 to config.messagesPerProducer).foreach { _ => + val messageId = channel.poll(5, TimeUnit.SECONDS) + if (messageId != null) { + val startTime = messageStartTimes.getOrElse(messageId, System.currentTimeMillis()) + val processingTime = System.currentTimeMillis() - startTime + + logger.info(s"Consumer $consumerId received $messageId") + + // Update metrics + messageMetrics.put(messageId, processingTime) + } + } + logger.info(s"Consumer $consumerId completed") + } + } + } + + // Combine and process results + Future.sequence(producerFutures ++ consumerFutures).map { _ => + // Create metrics + val metrics = ProducerConsumerMetrics( + totalMessages = config.producers * config.messagesPerProducer, + messageTimings = messageMetrics.map { case (id, time) => + id -> MessageTiming(processingTimeMs = time) + }.toMap + ) + + logger.info("All producers and consumers have completed.") + logger.info(s"Number of messages consumed: ${metrics.totalMessages}") + + logger.info("################ printing produce and consume delays ################") + metrics.messageTimings.foreach { case (messageId, timing) => + logger.info(s"Message $messageId took ${timing.processingTimeMs} ms") + } + logger.info("################ printing done ################") + + promise.success(metrics) + metrics + }.recover { + case e: Exception => + logger.error("Error in producer-consumer", e) + promise.failure(e) + ProducerConsumerMetrics() + } + + promise.future + } +} diff --git a/async-scala/src/main/scala/com/example/ProducerConsumerConfig.scala b/async-scala/src/main/scala/com/example/ProducerConsumerConfig.scala new file mode 100644 index 0000000..9d67a5a --- /dev/null +++ b/async-scala/src/main/scala/com/example/ProducerConsumerConfig.scala @@ -0,0 +1,10 @@ +package com.example + +case class ProducerConsumerConfig( + producers: Int = 10, + consumers: Int = 5, + messagesPerProducer: Int = 5, + maxQueueSize: Int = 100, + producerDelayRange: (Int, Int) = (10, 100), + consumerDelayRange: (Int, Int) = (10, 100) +) \ No newline at end of file diff --git a/async-scala/src/main/scala/com/example/ProducerConsumerMetrics.scala b/async-scala/src/main/scala/com/example/ProducerConsumerMetrics.scala new file mode 100644 index 0000000..3ceeea2 --- /dev/null +++ b/async-scala/src/main/scala/com/example/ProducerConsumerMetrics.scala @@ -0,0 +1,10 @@ +package com.example + +case class ProducerConsumerMetrics( + totalMessages: Int = 0, + messageTimings: Map[String, MessageTiming] = Map.empty +) + +case class MessageTiming( + processingTimeMs: Long = 0 +) diff --git a/async-scala/src/test/scala/com/example/ProducerConsumerSpec.scala b/async-scala/src/test/scala/com/example/ProducerConsumerSpec.scala new file mode 100644 index 0000000..efe053a --- /dev/null +++ b/async-scala/src/test/scala/com/example/ProducerConsumerSpec.scala @@ -0,0 +1,78 @@ +package com.example + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import scala.concurrent.Await +import scala.concurrent.duration._ + +class ProducerConsumerSpec extends AnyFlatSpec with Matchers { + "ProducerConsumer" should "process all messages" in { + val config = ProducerConsumerConfig(producers = 2, consumers = 1, messagesPerProducer = 3) + val pc = new ProducerConsumer(config) + val metrics = Await.result(pc.run(), 1.minute) + metrics.totalMessages shouldBe 6 + } + + it should "handle empty queue correctly" in { + val config = ProducerConsumerConfig(producers = 0, consumers = 1, messagesPerProducer = 0) + val pc = new ProducerConsumer(config) + val metrics = Await.result(pc.run(), 1.minute) + metrics.totalMessages shouldBe 0 + } + + it should "handle multiple producers and consumers" in { + val config = ProducerConsumerConfig(producers = 5, consumers = 3, messagesPerProducer = 10) + val pc = new ProducerConsumer(config) + val metrics = Await.result(pc.run(), 1.minute) + metrics.totalMessages shouldBe 50 + } + + it should "handle backpressure correctly" in { + val config = ProducerConsumerConfig( + producers = 5, + consumers = 1, + messagesPerProducer = 10, + maxQueueSize = 10 + ) + + val pc = new ProducerConsumer(config) + val metrics = Await.result(pc.run(), 1.minute) + metrics.messageTimings.size should be <= config.maxQueueSize + } + + it should "process messages within reasonable time" in { + val config = ProducerConsumerConfig( + producers = 3, + consumers = 2, + messagesPerProducer = 5 + ) + + val pc = new ProducerConsumer(config) + val metrics = Await.result(pc.run(), 1.minute) + + // Check that all messages have processing times + metrics.messageTimings.values.foreach { timing => + timing.processingTimeMs should be >= 0L + } + } + + it should "handle different delay ranges" in { + val config = ProducerConsumerConfig( + producers = 2, + consumers = 2, + messagesPerProducer = 5, + producerDelayRange = (10, 50), + consumerDelayRange = (5, 30) + ) + + val pc = new ProducerConsumer(config) + val metrics = Await.result(pc.run(), 1.minute) + metrics.totalMessages shouldBe 10 + } +} + +object LoggingConfig { + def configureLogging(level: LogLevel = LogLevel.INFO): Unit = { + // Configure logging dynamically + } +}