Skip to content

Commit

Permalink
v1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Kurilov committed Apr 11, 2018
1 parent 5b64226 commit e10b98d
Show file tree
Hide file tree
Showing 17 changed files with 184 additions and 199 deletions.
126 changes: 71 additions & 55 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,103 +11,115 @@ apply plugin: "maven"
apply plugin: "signing"

group = "com.github.akurilov"
version = "0.1.6"

tasks.withType(JavaCompile) {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

task("create-dirs").doLast(
{
sourceSets*.java.srcDirs*.each { it.mkdirs() }
sourceSets*.resources.srcDirs*.each { it.mkdirs() }
}
)

repositories {
mavenCentral()
}
version = "1.0.0"

ext {
moduleName = "${group}.netty.connection.pool"
depVersion = [
commonsJava: "[1.4.1,)",
mongoose: "[3.6,)",
netty: "4.1.17.Final",
javaCommons: "[2.0.0,)",
javaConcurrent: "[2.0.0,)",
netty: "4.1.23.Final",
]
}

configurations {
core
compileOnly {
extendsFrom core
}
testCompile {
extendsFrom core
}
repositories {
mavenCentral()
}

dependencies {

compile(
"com.github.akurilov:java-commons:${depVersion.commonsJava}",
"com.github.akurilov:java-commons:${depVersion.javaCommons}",
"com.github.akurilov:java-concurrent:${depVersion.javaConcurrent}",
"io.netty:netty-common:${depVersion.netty}",
"io.netty:netty-transport:${depVersion.netty}",
)

testCompile(
"junit:junit:[4,)",
"com.github.emc-mongoose:mongoose-api-model:${depVersion.mongoose}",
"io.netty:netty-transport-native-epoll:${depVersion.netty}:linux-x86_64",
)
}

test {
/*
systemProperty "com.sun.management.jmxremote", "true"
systemProperty "com.sun.management.jmxremote.port", "9010"
systemProperty "com.sun.management.jmxremote.local.only", "false"
systemProperty "com.sun.management.jmxremote.authenticate", "false"
systemProperty "com.sun.management.jmxremote.ssl", "false"
*/
testLogging {
events "passed", "skipped", "failed", "standardOut"
showExceptions = true
showStandardStreams = true
compileJava {
sourceCompatibility = JavaVersion.VERSION_1_10
targetCompatibility = JavaVersion.VERSION_1_10
inputs.property("moduleName", moduleName)
doFirst {
options.compilerArgs = [
"--module-path", classpath.asPath,
]
classpath = files()
}
}

def getCoreClassPath() {
return configurations.core.dependencies
.collect { dep -> ["..", (dep.name + ".jar")].join(File.separator) }
.join(" ")
}

static def getClassPath(final configuration) {
return configuration
.collect { file -> ["..", "lib", file.name].join(File.separator) }
.join(" ")
compileTestJava {
sourceCompatibility = JavaVersion.VERSION_1_10
targetCompatibility = JavaVersion.VERSION_1_10
inputs.property("moduleName", moduleName)
doFirst {
options.compilerArgs = [
"--module-path", classpath.asPath,
"--add-modules", "ALL-MODULE-PATH",
"--add-reads", "${moduleName}.test=junit",
"--patch-module", "$moduleName=" + files(sourceSets.test.java.outputDir).asPath,
]
classpath = files()
}
}

def fullClassPath = getCoreClassPath() + " " + getClassPath(configurations.runtime)

jar {
inputs.property("moduleName", moduleName)
archiveName "$project.name.$extension"
manifest {
attributes(
"Class-Path": fullClassPath,
"Automatic-Module-Name": moduleName,
"Implementation-Version": version,
"Implementation-Title": "$name"
)
}
}

test {
inputs.property("moduleName", moduleName)
doFirst {
jvmArgs += [
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005",
"-XX:+HeapDumpOnOutOfMemoryError",
"--module-path", classpath.asPath,
'--add-exports', "$moduleName/${moduleName}.test=junit",
"--add-modules", "ALL-MODULE-PATH",
"--add-reads", "${moduleName}.test=junit",
"--patch-module", "$moduleName=" + files(sourceSets.test.java.outputDir).asPath,
]
classpath = files()
}
maxHeapSize "1g"
/*
systemProperty "com.sun.management.jmxremote", "true"
systemProperty "com.sun.management.jmxremote.port", "9010"
systemProperty "com.sun.management.jmxremote.rmi.port", "9010"
systemProperty "com.sun.management.jmxremote.local.only", "false"
systemProperty "com.sun.management.jmxremote.authenticate", "false"
systemProperty "com.sun.management.jmxremote.ssl", "false"
*/
testLogging {
events "passed", "skipped", "failed", "standardOut"
showExceptions = true
showStandardStreams = true
}
}

task sourcesJar(type: Jar, dependsOn: classes) {
classifier = "sources"
archiveName "$project.name-$classifier.$extension"
from sourceSets.main.allSource
}

javadoc {
options.addStringOption("-module-path", classpath.asPath)
}

task javadocJar(type: Jar, dependsOn: javadoc) {
classifier = "javadoc"
archiveName "$project.name-$classifier.$extension"
Expand Down Expand Up @@ -190,3 +202,7 @@ uploadArchives {
}
}
}

task wrapper(type: Wrapper) {
gradleVersion = "4.6"
}
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
3 changes: 1 addition & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#Tue Nov 14 20:40:44 MSK 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-3.5-bin.zip
6 changes: 3 additions & 3 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"

warn ( ) {
warn () {
echo "$*"
}

die ( ) {
die () {
echo
echo "$*"
echo
Expand Down Expand Up @@ -155,7 +155,7 @@ if $cygwin ; then
fi

# Escape application args
save ( ) {
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public BasicMultiNodeConnPool(
connCounts = new Object2IntOpenHashMap<>(n);
failedConnAttemptCounts = new Object2IntOpenHashMap<>(n);

for(final String node : nodes) {
for(final var node : nodes) {
final InetSocketAddress nodeAddr;
if(node.contains(":")) {
final String addrParts[] = node.split(":");
Expand All @@ -86,11 +86,11 @@ public BasicMultiNodeConnPool(
.clone()
.remoteAddress(nodeAddr)
.handler(
new ChannelInitializer<Channel>() {
new ChannelInitializer<>() {
@Override
protected final void initChannel(final Channel conn)
throws Exception {
if(!conn.eventLoop().inEventLoop()) {
if(! conn.eventLoop().inEventLoop()) {
throw new AssertionError();
}
connPoolHandler.channelCreated(conn);
Expand All @@ -108,16 +108,16 @@ protected final void initChannel(final Channel conn)
public void preCreateConnections(final int count)
throws ConnectException, IllegalArgumentException {
if(count > 0) {
for(int i = 0; i < count; i ++) {
final Channel conn = connectToAnyNode();
for(var i = 0; i < count; i ++) {
final var conn = connectToAnyNode();
if(conn == null) {
throw new ConnectException(
"Failed to pre-create the connections to the target nodes"
);
}
final String nodeAddr = conn.attr(ATTR_KEY_NODE).get();
final var nodeAddr = conn.attr(ATTR_KEY_NODE).get();
if(conn.isActive()) {
final Queue<Channel> connQueue = availableConns.get(nodeAddr);
final var connQueue = availableConns.get(nodeAddr);
if(connQueue != null) {
connQueue.add(conn);
}
Expand Down Expand Up @@ -154,7 +154,7 @@ public final void operationComplete(final ChannelFuture future)
}
}
synchronized(allConns) {
final List<Channel> nodeConns = allConns.get(nodeAddr);
final var nodeConns = allConns.get(nodeAddr);
if(nodeConns != null) {
nodeConns.remove(conn);
}
Expand All @@ -174,9 +174,10 @@ private Channel connectToAnyNode()
// select the endpoint node having the minimum count of established connections
String nodeAddr = null;
String nextNodeAddr;
int minConnsCount = Integer.MAX_VALUE, nextConnsCount = 0;
final int i = ThreadLocalRandom.current().nextInt(n);
for(int j = i; j < n; j ++) {
var minConnsCount = Integer.MAX_VALUE;
int nextConnsCount = 0;
final var i = ThreadLocalRandom.current().nextInt(n);
for(var j = i; j < n; j ++) {
nextNodeAddr = nodes[j % n];
nextConnsCount = connCounts.getInt(nextNodeAddr);
if(nextConnsCount == 0) {
Expand All @@ -198,7 +199,7 @@ private Channel connectToAnyNode()
"Failed to create a new connection to " + nodeAddr + ": " + e.toString()
);
if(connAttemptsLimit > 0) {
final int selectedNodeFailedConnAttemptsCount = failedConnAttemptCounts
final var selectedNodeFailedConnAttemptsCount = failedConnAttemptCounts
.getInt(nodeAddr) + 1;
failedConnAttemptCounts.put(
nodeAddr, selectedNodeFailedConnAttemptsCount
Expand All @@ -212,8 +213,8 @@ private Channel connectToAnyNode()
// the node having virtually Integer.MAX_VALUE established connections
// will never be selected by the algorithm
connCounts.put(nodeAddr, Integer.MAX_VALUE);
boolean allNodesExcluded = true;
for(final String node : nodes) {
var allNodesExcluded = true;
for(final var node : nodes) {
if(connCounts.getInt(node) < Integer.MAX_VALUE) {
allNodesExcluded = false;
break;
Expand Down Expand Up @@ -252,18 +253,18 @@ private Channel connectToAnyNode()

protected Channel connect(final String addr)
throws Exception {
final Bootstrap bootstrap = bootstraps.get(addr);
final var bootstrap = bootstraps.get(addr);
if(bootstrap != null) {
return bootstrap.connect().sync().channel();
}
return null;
}

protected Channel poll() {
final int i = ThreadLocalRandom.current().nextInt(n);
final var i = ThreadLocalRandom.current().nextInt(n);
Queue<Channel> connQueue;
Channel conn;
for(int j = i; j < i + n; j ++) {
for(var j = i; j < i + n; j ++) {
connQueue = availableConns.get(nodes[j % n]);
if(connQueue != null) {
conn = connQueue.poll();
Expand Down Expand Up @@ -294,7 +295,7 @@ public final Channel lease()
@Override
public final int lease(final List<Channel> conns, final int maxCount)
throws ConnectException {
int availableCount = concurrencyThrottle.drainPermits();
var availableCount = concurrencyThrottle.drainPermits();
if(availableCount == 0) {
return availableCount;
}
Expand All @@ -304,7 +305,7 @@ public final int lease(final List<Channel> conns, final int maxCount)
}

Channel conn;
for(int i = 0; i < availableCount; i ++) {
for(var i = 0; i < availableCount; i ++) {
if(null == (conn = poll())) {
conn = connectToAnyNode();
}
Expand All @@ -320,9 +321,9 @@ public final int lease(final List<Channel> conns, final int maxCount)

@Override
public final void release(final Channel conn) {
final String nodeAddr = conn.attr(ATTR_KEY_NODE).get();
final var nodeAddr = conn.attr(ATTR_KEY_NODE).get();
if(conn.isActive()) {
final Queue<Channel> connQueue = availableConns.get(nodeAddr);
final var connQueue = availableConns.get(nodeAddr);
if(connQueue != null) {
connQueue.add(conn);
}
Expand All @@ -336,7 +337,7 @@ public final void release(final Channel conn) {
public final void release(final List<Channel> conns) {
String nodeAddr;
Queue<Channel> connQueue;
for(final Channel conn : conns) {
for(final var conn : conns) {
nodeAddr = conn.attr(ATTR_KEY_NODE).get();
if(conn.isActive()) {
connQueue = availableConns.get(nodeAddr);
Expand All @@ -353,17 +354,17 @@ public void close()
throws IOException {
closeLock.lock();
int closedConnCount = 0;
for(final String nodeAddr: availableConns.keySet()) {
for(final Channel conn: availableConns.get(nodeAddr)) {
for(final var nodeAddr: availableConns.keySet()) {
for(final var conn: availableConns.get(nodeAddr)) {
if(conn.isOpen()) {
conn.close();
closedConnCount ++;
}
}
}
availableConns.clear();
for(final String nodeAddr: allConns.keySet()) {
for(final Channel conn: allConns.get(nodeAddr)) {
for(final var nodeAddr: allConns.keySet()) {
for(final var conn: allConns.get(nodeAddr)) {
if(conn.isOpen()) {
conn.close();
closedConnCount ++;
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module com.github.akurilov.netty.connection.pool {

requires com.github.akurilov.commons;
requires com.github.akurilov.concurrent;
requires io.netty.common;
requires io.netty.transport;
requires java.base;
requires java.logging;

exports com.github.akurilov.netty.connection.pool;
}
Loading

0 comments on commit e10b98d

Please sign in to comment.