Skip to content

Commit

Permalink
v1.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Kurilov committed Apr 21, 2018
1 parent e10b98d commit d1e2824
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 115 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ operation (leasing/releasing many connections at once).
## Gradle

```groovy
compile group: 'com.github.akurilov', name: 'netty-connection-pool', version: '0.1.6'
compile group: 'com.github.akurilov', name: 'netty-connection-pool', version: '1.0.1'
```

## Code Example
Expand Down
41 changes: 8 additions & 33 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ apply plugin: "maven"
apply plugin: "signing"

group = "com.github.akurilov"
version = "1.0.0"
version = "1.0.1"

ext {
moduleName = "${group}.netty.connection.pool"
depVersion = [
javaCommons: "[2.0.0,)",
javaConcurrent: "[2.0.0,)",
javaCommons: "[2.0.2,)",
netty: "4.1.23.Final",
]


}

repositories {
Expand All @@ -30,7 +31,6 @@ dependencies {

compile(
"com.github.akurilov:java-commons:${depVersion.javaCommons}",
"com.github.akurilov:java-concurrent:${depVersion.javaConcurrent}",
"io.netty:netty-common:${depVersion.netty}",
"io.netty:netty-transport:${depVersion.netty}",
)
Expand All @@ -42,57 +42,32 @@ dependencies {
}

compileJava {
sourceCompatibility = JavaVersion.VERSION_1_10
targetCompatibility = JavaVersion.VERSION_1_10
inputs.property("moduleName", moduleName)
doFirst {
options.compilerArgs = [
"--module-path", classpath.asPath,
]
classpath = files()
}
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

compileTestJava {
sourceCompatibility = JavaVersion.VERSION_1_10
targetCompatibility = JavaVersion.VERSION_1_10
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
inputs.property("moduleName", moduleName)
doFirst {
options.compilerArgs = [
"--module-path", classpath.asPath,
"--add-modules", "ALL-MODULE-PATH",
"--add-reads", "${moduleName}.test=junit",
"--patch-module", "$moduleName=" + files(sourceSets.test.java.outputDir).asPath,
]
classpath = files()
}
}

jar {
inputs.property("moduleName", moduleName)
archiveName "$project.name.$extension"
manifest {
attributes(
"Automatic-Module-Name": moduleName,
"Implementation-Version": version,
"Implementation-Title": "$name"
)
}
}

test {
inputs.property("moduleName", moduleName)
doFirst {
jvmArgs += [
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005",
"-XX:+HeapDumpOnOutOfMemoryError",
"--module-path", classpath.asPath,
'--add-exports', "$moduleName/${moduleName}.test=junit",
"--add-modules", "ALL-MODULE-PATH",
"--add-reads", "${moduleName}.test=junit",
"--patch-module", "$moduleName=" + files(sourceSets.test.java.outputDir).asPath,
]
classpath = files()
}
maxHeapSize "1g"
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public BasicMultiNodeConnPool(
connCounts = new Object2IntOpenHashMap<>(n);
failedConnAttemptCounts = new Object2IntOpenHashMap<>(n);

for(final var node : nodes) {
for(final String node : nodes) {
final InetSocketAddress nodeAddr;
if(node.contains(":")) {
final String addrParts[] = node.split(":");
Expand All @@ -86,11 +86,11 @@ public BasicMultiNodeConnPool(
.clone()
.remoteAddress(nodeAddr)
.handler(
new ChannelInitializer<>() {
new ChannelInitializer<Channel>() {
@Override
protected final void initChannel(final Channel conn)
throws Exception {
if(! conn.eventLoop().inEventLoop()) {
if(!conn.eventLoop().inEventLoop()) {
throw new AssertionError();
}
connPoolHandler.channelCreated(conn);
Expand All @@ -108,16 +108,16 @@ protected final void initChannel(final Channel conn)
public void preCreateConnections(final int count)
throws ConnectException, IllegalArgumentException {
if(count > 0) {
for(var i = 0; i < count; i ++) {
final var conn = connectToAnyNode();
for(int i = 0; i < count; i ++) {
final Channel conn = connectToAnyNode();
if(conn == null) {
throw new ConnectException(
"Failed to pre-create the connections to the target nodes"
);
}
final var nodeAddr = conn.attr(ATTR_KEY_NODE).get();
final String nodeAddr = conn.attr(ATTR_KEY_NODE).get();
if(conn.isActive()) {
final var connQueue = availableConns.get(nodeAddr);
final Queue<Channel> connQueue = availableConns.get(nodeAddr);
if(connQueue != null) {
connQueue.add(conn);
}
Expand Down Expand Up @@ -154,7 +154,7 @@ public final void operationComplete(final ChannelFuture future)
}
}
synchronized(allConns) {
final var nodeConns = allConns.get(nodeAddr);
final List<Channel> nodeConns = allConns.get(nodeAddr);
if(nodeConns != null) {
nodeConns.remove(conn);
}
Expand All @@ -174,10 +174,10 @@ private Channel connectToAnyNode()
// select the endpoint node having the minimum count of established connections
String nodeAddr = null;
String nextNodeAddr;
var minConnsCount = Integer.MAX_VALUE;
int minConnsCount = Integer.MAX_VALUE;
int nextConnsCount = 0;
final var i = ThreadLocalRandom.current().nextInt(n);
for(var j = i; j < n; j ++) {
final int i = ThreadLocalRandom.current().nextInt(n);
for(int j = i; j < n; j ++) {
nextNodeAddr = nodes[j % n];
nextConnsCount = connCounts.getInt(nextNodeAddr);
if(nextConnsCount == 0) {
Expand All @@ -199,7 +199,7 @@ private Channel connectToAnyNode()
"Failed to create a new connection to " + nodeAddr + ": " + e.toString()
);
if(connAttemptsLimit > 0) {
final var selectedNodeFailedConnAttemptsCount = failedConnAttemptCounts
final int selectedNodeFailedConnAttemptsCount = failedConnAttemptCounts
.getInt(nodeAddr) + 1;
failedConnAttemptCounts.put(
nodeAddr, selectedNodeFailedConnAttemptsCount
Expand All @@ -213,8 +213,8 @@ private Channel connectToAnyNode()
// the node having virtually Integer.MAX_VALUE established connections
// will never be selected by the algorithm
connCounts.put(nodeAddr, Integer.MAX_VALUE);
var allNodesExcluded = true;
for(final var node : nodes) {
boolean allNodesExcluded = true;
for(final String node : nodes) {
if(connCounts.getInt(node) < Integer.MAX_VALUE) {
allNodesExcluded = false;
break;
Expand Down Expand Up @@ -253,18 +253,18 @@ private Channel connectToAnyNode()

protected Channel connect(final String addr)
throws Exception {
final var bootstrap = bootstraps.get(addr);
final Bootstrap bootstrap = bootstraps.get(addr);
if(bootstrap != null) {
return bootstrap.connect().sync().channel();
}
return null;
}

protected Channel poll() {
final var i = ThreadLocalRandom.current().nextInt(n);
final int i = ThreadLocalRandom.current().nextInt(n);
Queue<Channel> connQueue;
Channel conn;
for(var j = i; j < i + n; j ++) {
for(int j = i; j < i + n; j ++) {
connQueue = availableConns.get(nodes[j % n]);
if(connQueue != null) {
conn = connQueue.poll();
Expand Down Expand Up @@ -295,7 +295,7 @@ public final Channel lease()
@Override
public final int lease(final List<Channel> conns, final int maxCount)
throws ConnectException {
var availableCount = concurrencyThrottle.drainPermits();
int availableCount = concurrencyThrottle.drainPermits();
if(availableCount == 0) {
return availableCount;
}
Expand All @@ -305,7 +305,7 @@ public final int lease(final List<Channel> conns, final int maxCount)
}

Channel conn;
for(var i = 0; i < availableCount; i ++) {
for(int i = 0; i < availableCount; i ++) {
if(null == (conn = poll())) {
conn = connectToAnyNode();
}
Expand All @@ -321,9 +321,9 @@ public final int lease(final List<Channel> conns, final int maxCount)

@Override
public final void release(final Channel conn) {
final var nodeAddr = conn.attr(ATTR_KEY_NODE).get();
final String nodeAddr = conn.attr(ATTR_KEY_NODE).get();
if(conn.isActive()) {
final var connQueue = availableConns.get(nodeAddr);
final Queue<Channel> connQueue = availableConns.get(nodeAddr);
if(connQueue != null) {
connQueue.add(conn);
}
Expand All @@ -337,7 +337,7 @@ public final void release(final Channel conn) {
public final void release(final List<Channel> conns) {
String nodeAddr;
Queue<Channel> connQueue;
for(final var conn : conns) {
for(final Channel conn : conns) {
nodeAddr = conn.attr(ATTR_KEY_NODE).get();
if(conn.isActive()) {
connQueue = availableConns.get(nodeAddr);
Expand All @@ -354,17 +354,17 @@ public void close()
throws IOException {
closeLock.lock();
int closedConnCount = 0;
for(final var nodeAddr: availableConns.keySet()) {
for(final var conn: availableConns.get(nodeAddr)) {
for(final String nodeAddr: availableConns.keySet()) {
for(final Channel conn: availableConns.get(nodeAddr)) {
if(conn.isOpen()) {
conn.close();
closedConnCount ++;
}
}
}
availableConns.clear();
for(final var nodeAddr: allConns.keySet()) {
for(final var conn: allConns.get(nodeAddr)) {
for(final String nodeAddr: allConns.keySet()) {
for(final Channel conn: allConns.get(nodeAddr)) {
if(conn.isOpen()) {
conn.close();
closedConnCount ++;
Expand Down
11 changes: 0 additions & 11 deletions src/main/java/module-info.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.github.akurilov.netty.connection.pool.test;

import com.github.akurilov.concurrent.ThreadUtil;

import static com.github.akurilov.netty.connection.pool.NonBlockingConnPool.ATTR_KEY_NODE;

import com.github.akurilov.netty.connection.pool.NonBlockingConnPool;
Expand Down Expand Up @@ -56,24 +54,23 @@ public static Collection<Object[]> generateData() {
public BasicMultiNodeConnPoolTest(final int concurrencyLevel, final int nodeCount) {
this.concurrencyLevel = concurrencyLevel;
this.nodeCount = nodeCount;
final var nodes = new String[nodeCount];
for(var i = 0; i < nodeCount; i ++) {
final String[] nodes = new String[nodeCount];
for(int i = 0; i < nodeCount; i ++) {
nodes[i] = Integer.toString(i);
}
try(
final var connPool = new BasicMultiNodeConnPoolMock(
final NonBlockingConnPool connPool = new BasicMultiNodeConnPoolMock(
new Semaphore(concurrencyLevel), nodes, new Bootstrap(),
new DummyChannelPoolHandler(), 12345, 0
)
) {
final var poolLoader = Executors.newFixedThreadPool(
ThreadUtil.getHardwareThreadCount()
);
for(var i = 0; i < ThreadUtil.getHardwareThreadCount(); i ++) {
final int coreCount = Runtime.getRuntime().availableProcessors();
final ExecutorService poolLoader = Executors.newFixedThreadPool(coreCount);
for(int i = 0; i < coreCount; i ++) {
poolLoader.submit(
() -> {
final var currThread = Thread.currentThread();
final var connBuff = new ArrayList<Channel>(BATCH_SIZE);
final Thread currThread = Thread.currentThread();
final List<Channel> connBuff = new ArrayList<>(BATCH_SIZE);
int j, k;
Channel c;
try {
Expand Down Expand Up @@ -110,7 +107,7 @@ public BasicMultiNodeConnPoolTest(final int concurrencyLevel, final int nodeCoun
} catch(final Throwable t) {
t.printStackTrace(System.err);
} finally {
final var connCountSum = nodeFreq.values().stream().mapToLong(LongAdder::sum).sum();
final long connCountSum = nodeFreq.values().stream().mapToLong(LongAdder::sum).sum();
System.out.println(
"concurrency = " + concurrencyLevel + ", nodes = " + nodeCount + " -> rate: " +
connCountSum / TEST_STEP_TIME_SECONDS
Expand All @@ -121,9 +118,9 @@ public BasicMultiNodeConnPoolTest(final int concurrencyLevel, final int nodeCoun
@Test
public void test() {
if(nodeCount > 1) {
final var connCountSum = nodeFreq.values().stream().mapToLong(LongAdder::sum).sum();
final var avgConnCountPerNode = connCountSum / nodeCount;
for(final var nodeAddr: nodeFreq.keySet()) {
final long connCountSum = nodeFreq.values().stream().mapToLong(LongAdder::sum).sum();
final long avgConnCountPerNode = connCountSum / nodeCount;
for(final String nodeAddr: nodeFreq.keySet()) {
assertTrue(nodeFreq.get(nodeAddr).sum() > 0);
assertEquals(
"Node count: " + nodeCount + ", node: \"" + nodeAddr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.github.akurilov.netty.connection.pool.test.util.DummyChannelPoolHandler;
import com.github.akurilov.netty.connection.pool.test.util.DummyClientChannelHandler;
import com.github.akurilov.netty.connection.pool.test.util.EpollConnDroppingServer;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -55,9 +56,9 @@ public void setUp()

serverMock = new EpollConnDroppingServer(DEFAULT_PORT, FAIL_EVERY_CONN_ATTEMPT);

final var concurrencyThrottle = new Semaphore(CONCURRENCY);
final Semaphore concurrencyThrottle = new Semaphore(CONCURRENCY);
group = new EpollEventLoopGroup();
final var bootstrap = new Bootstrap()
final Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(EpollSocketChannel.class)
.handler(
Expand Down Expand Up @@ -89,13 +90,13 @@ public void tearDown()
@Test
public void test()
throws Exception {
final var connCounter = new LongAdder();
final var executor = Executors.newFixedThreadPool(CONCURRENCY);
for(var i = 0; i < CONCURRENCY; i ++) {
final LongAdder connCounter = new LongAdder();
final ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY);
for(int i = 0; i < CONCURRENCY; i ++) {
executor.submit(
() -> {
Channel conn;
for(var j = 0; j < CONN_ATTEMPTS; j ++) {
for(int j = 0; j < CONN_ATTEMPTS; j ++) {
try {
while(null == (conn = connPool.lease())) {
Thread.sleep(1);
Expand Down
Loading

0 comments on commit d1e2824

Please sign in to comment.