Skip to content
This repository has been archived by the owner on May 14, 2018. It is now read-only.

Commit

Permalink
v2.0.1: not finished, save
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Kurilov committed Apr 21, 2018
1 parent cdba0d5 commit 8b375df
Show file tree
Hide file tree
Showing 14 changed files with 130 additions and 166 deletions.
35 changes: 6 additions & 29 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ allprojects {
apply plugin: "maven"
apply plugin: "signing"
group = "com.github.akurilov"
version = "2.0.0"
version = "2.0.1"
}

ext.moduleName = "${group}.concurrent"
Expand All @@ -21,52 +21,29 @@ repositories {
}

dependencies {
compile("com.github.akurilov:java-commons:[2.0.0,)")
compile("com.github.akurilov:java-commons:[2.0.1,)")
testCompile("junit:junit:4.12")
}

compileJava {
sourceCompatibility = JavaVersion.VERSION_1_10
targetCompatibility = JavaVersion.VERSION_1_10
inputs.property("moduleName", moduleName)
doFirst {
options.compilerArgs = [
"--module-path", classpath.asPath,
]
classpath = files()
}
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

compileTestJava {
sourceCompatibility = JavaVersion.VERSION_1_10
targetCompatibility = JavaVersion.VERSION_1_10
inputs.property("moduleName", moduleName)
doFirst {
options.compilerArgs = [
"--module-path", classpath.asPath,
"--add-modules", "ALL-MODULE-PATH",
"--add-reads", "${moduleName}.test=junit",
"--patch-module", "$moduleName=" + files(sourceSets.test.java.outputDir).asPath,
]
classpath = files()
}
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

jar {
inputs.property("moduleName", moduleName)
manifest {
attributes (
"Automatic-Module-Name": moduleName,
"Implementation-Version": version,
"Implementation-Title": rootProject.name,
)
}
}

javadoc {
options.addStringOption("-module-path", classpath.asPath)
}

task sourcesJar(type: Jar, dependsOn: classes) {
classifier = "sources"
from sourceSets.main.allSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public final AsyncRunnableBase await()
@Override
public boolean await(final long timeout, final TimeUnit timeUnit)
throws IllegalStateException, InterruptedException {
final var timeOutMilliSec = timeUnit.toMillis(timeout);
final var t = System.currentTimeMillis();
final long timeOutMilliSec = timeUnit.toMillis(timeout);
final long t = System.currentTimeMillis();
while(isStarted() || isShutdown()) {
if(System.currentTimeMillis() - t >= timeOutMilliSec) {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.github.akurilov.concurrent;

public interface IndexThrottle {

boolean tryAcquire(final int index);

int tryAcquire(final int index, final int times);
}
12 changes: 6 additions & 6 deletions src/main/java/com/github/akurilov/concurrent/RateThrottle.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
/**
* A semaphore-like non-blocking throttle which permits at the given rate.
*/
public final class RateThrottle<X>
implements Throttle<X> {
public final class RateThrottle
implements Throttle {

private final long periodNanos;
private volatile long startTime = -1;
Expand All @@ -27,10 +27,10 @@ public RateThrottle(final double rateLimit) {
}

@Override
public final boolean tryAcquire(final X item) {
public final boolean tryAcquire() {
synchronized(this) {
if(startTime > 0) {
final var periodCount = (nanoTime() - startTime) / periodNanos;
final long periodCount = (nanoTime() - startTime) / periodNanos;
if(periodCount > acquiredCount) {
acquiredCount ++;
return true;
Expand All @@ -46,10 +46,10 @@ public final boolean tryAcquire(final X item) {
}

@Override
public final int tryAcquire(final X item, final int requiredCount) {
public final int tryAcquire(final int requiredCount) {
synchronized(this) {
if(startTime > 0) {
final var availableCount = (int) (
final int availableCount = (int) (
(nanoTime() - startTime) / periodNanos - acquiredCount
);
if(availableCount > requiredCount) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.github.akurilov.concurrent;

import java.util.Arrays;

/**
Created by kurila on 29.03.16.
An throttle which uses the map of weights.
The throttle determines the weight for each I/O task and makes the decision.
The weight is used to pass the I/O task with specific ratio for the different keys.
*/
public final class SequentialWeightsThrottle {

// initial weight map (constant)
private final int[] weights;
private final int[] remainingWeights;

public SequentialWeightsThrottle(final int[] weights)
throws IllegalArgumentException {
this.weights = Arrays.copyOf(weights, weights.length);
remainingWeights = new int[weights.length];
resetRemainingWeights();
}

private void resetRemainingWeights()
throws IllegalArgumentException {
for(int i = 0; i < weights.length; i ++) {
remainingWeights[i] = weights[i];
}
}

private void ensureRemainingWeights() {
for(int i = 0; i < weights.length; i ++) {
if(remainingWeights[i] > 0) {
return;
}
}
resetRemainingWeights();
}

public final boolean tryAcquire(final int index) {
synchronized(remainingWeights) {
ensureRemainingWeights();
final int remainingWeight = remainingWeights[index];
if(remainingWeight > 0) {
remainingWeights[index] = remainingWeight - 1;
return true;
} else {
return false;
}
}
}

public final int tryAcquire(final int index, final int times) {
if(times == 0) {
return 0;
}
synchronized(remainingWeights) {
ensureRemainingWeights();
final int remainingWeight = remainingWeights[index];
if(times > remainingWeight) {
remainingWeights[index] = 0;
return remainingWeight;
} else {
remainingWeights[index] = remainingWeight - times;
return times;
}
}
}
}
8 changes: 3 additions & 5 deletions src/main/java/com/github/akurilov/concurrent/Throttle.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@
Throttle can make a decision about the specified thing to pass or to wait.
The throttle calls are not blocking so the caller should block if the throttle tells so.
*/
public interface Throttle<X> {
public interface Throttle {

/**
Request a permit about a thing
@param thing the subject of the permit
@return true if the thing should be passed, false otherwise
*/
boolean tryAcquire(final X thing);
boolean tryAcquire();

/**
Request permits about a set of things
@param thing the subject of the permits
@param times how many permits is requested
@return how many permits are got
*/
int tryAcquire(final X thing, int times);
int tryAcquire(int times);
}
74 changes: 0 additions & 74 deletions src/main/java/com/github/akurilov/concurrent/WeightThrottle.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ protected void doStart() {
*/
@Override
public final void invoke() {
var t = System.nanoTime();
invokeTimed(t);
invokeTimed(System.nanoTime());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ public CoroutinesExecutor() {
}

public CoroutinesExecutor(final boolean backgroundFlag) {
final var svcThreadCount = Runtime.getRuntime().availableProcessors();
final int svcThreadCount = Runtime.getRuntime().availableProcessors();
executor = new ThreadPoolExecutor(
svcThreadCount, svcThreadCount, 0, TimeUnit.DAYS, new ArrayBlockingQueue<>(1),
new ContextAwareThreadFactory("coroutine-processor-", true, null)
);
this.backgroundFlag = backgroundFlag;
for(var i = 0; i < svcThreadCount; i ++) {
final var svcWorkerTask = new CoroutinesExecutorTask(coroutines, backgroundFlag);
for(int i = 0; i < svcThreadCount; i ++) {
final CoroutinesExecutorTask svcWorkerTask = new CoroutinesExecutorTask(
coroutines, backgroundFlag
);
executor.submit(svcWorkerTask);
workers.add(svcWorkerTask);
svcWorkerTask.start();
Expand All @@ -54,22 +56,24 @@ public void stop(final Coroutine coroutine) {
}

public void setThreadCount(final int threadCount) {
final var newThreadCount = threadCount > 0 ?
final int newThreadCount = threadCount > 0 ?
threadCount : Runtime.getRuntime().availableProcessors();
final var oldThreadCount = executor.getCorePoolSize();
final int oldThreadCount = executor.getCorePoolSize();
if(newThreadCount != oldThreadCount) {
executor.setCorePoolSize(newThreadCount);
executor.setMaximumPoolSize(newThreadCount);
if(newThreadCount > oldThreadCount) {
for(var i = oldThreadCount; i < newThreadCount; i ++) {
final var execTask = new CoroutinesExecutorTask(coroutines, backgroundFlag);
for(int i = oldThreadCount; i < newThreadCount; i ++) {
final CoroutinesExecutorTask execTask = new CoroutinesExecutorTask(
coroutines, backgroundFlag
);
executor.submit(execTask);
workers.add(execTask);
execTask.start();
}
} else { // less, remove some active service worker tasks
try {
for(var i = oldThreadCount - 1; i >= newThreadCount; i --) {
for(int i = oldThreadCount - 1; i >= newThreadCount; i --) {
workers.remove(i).close();
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public final void run() {
break;
}
} else {
for(final var nextCoroutine : coroutines) {
for(final Coroutine nextCoroutine : coroutines) {
try {
if(nextCoroutine.isStarted()) {
nextCoroutine.invoke();
Expand Down
Loading

0 comments on commit 8b375df

Please sign in to comment.