Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
# Conflicts:
#	build.gradle
  • Loading branch information
Andrey Kurilov committed Dec 2, 2017
2 parents b336ba1 + 6cbc652 commit c0c5ca3
Show file tree
Hide file tree
Showing 14 changed files with 245 additions and 59 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ operation (leasing/releasing many connections at once).
## Gradle

```groovy
compile group: 'com.github.akurilov', name: 'netty-connection-pool', version: '0.1.1'
compile group: 'com.github.akurilov', name: 'netty-connection-pool', version: '0.1.2'
```

## Code Example
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ apply plugin: "maven"
apply plugin: "signing"

group = "com.github.akurilov"
version = "0.1.2"
version = "0.1.4"

tasks.withType(JavaCompile) {
sourceCompatibility = JavaVersion.VERSION_1_8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ private CloseChannelListener(final String nodeAddr, final Channel conn) {
@Override
public final void operationComplete(final ChannelFuture future)
throws Exception {
LOG.fine("Connection to " + nodeAddr + " closed");
closeLock.lock();
try {
synchronized(connCounts) {
Expand Down Expand Up @@ -192,16 +193,6 @@ private Channel connectToAnyNode()
LOG.fine("New connection to \"" + nodeAddr + "\"");
try {
conn = connect(nodeAddr);
conn.closeFuture().addListener(new CloseChannelListener(nodeAddr, conn));
conn.attr(ATTR_KEY_NODE).set(nodeAddr);
allConns.computeIfAbsent(nodeAddr, na -> new ArrayList<>()).add(conn);
synchronized(connCounts) {
connCounts.put(nodeAddr, connCounts.getInt(nodeAddr) + 1);
}
if(connAttemptsLimit > 0) {
// reset the connection failures counter if connected successfully
failedConnAttemptCounts.put(nodeAddr, 0);
}
} catch(final Exception e) {
LOG.warning(
"Failed to create a new connection to " + nodeAddr + ": " + e.toString()
Expand Down Expand Up @@ -242,12 +233,30 @@ private Channel connectToAnyNode()
}
}

if(conn != null) {
conn.closeFuture().addListener(new CloseChannelListener(nodeAddr, conn));
conn.attr(ATTR_KEY_NODE).set(nodeAddr);
allConns.computeIfAbsent(nodeAddr, na -> new ArrayList<>()).add(conn);
synchronized(connCounts) {
connCounts.put(nodeAddr, connCounts.getInt(nodeAddr) + 1);
}
if(connAttemptsLimit > 0) {
// reset the connection failures counter if connected successfully
failedConnAttemptCounts.put(nodeAddr, 0);
}
LOG.fine("New connection to " + nodeAddr + " created");
}

return conn;
}

protected Channel connect(final String addr)
throws Exception {
return bootstraps.get(addr).connect().sync().channel();
final Bootstrap bootstrap = bootstraps.get(addr);
if(bootstrap != null) {
return bootstrap.connect().sync().channel();
}
return null;
}

protected Channel poll() {
Expand All @@ -256,9 +265,11 @@ protected Channel poll() {
Channel conn;
for(int j = i; j < i + n; j ++) {
connQueue = availableConns.get(nodes[j % n]);
conn = connQueue.poll();
if(conn != null && conn.isActive()) {
return conn;
if(connQueue != null) {
conn = connQueue.poll();
if(conn != null && conn.isActive()) {
return conn;
}
}
}
return null;
Expand Down Expand Up @@ -341,17 +352,27 @@ public final void release(final List<Channel> conns) {
public void close()
throws IOException {
closeLock.lock();
availableConns.clear();
bootstraps.clear();
int closedConnCount = 0;
for(final String nodeAddr : allConns.keySet()) {
for(final Channel conn : allConns.get(nodeAddr)) {
conn.close();
closedConnCount ++;
for(final String nodeAddr: availableConns.keySet()) {
for(final Channel conn: availableConns.get(nodeAddr)) {
if(conn.isOpen()) {
conn.close();
closedConnCount ++;
}
}
}
availableConns.clear();
for(final String nodeAddr: allConns.keySet()) {
for(final Channel conn: allConns.get(nodeAddr)) {
if(conn.isOpen()) {
conn.close();
closedConnCount ++;
}
}
}
connCounts.clear();
allConns.clear();
LOG.fine("Closed all " + closedConnCount + " connections");
bootstraps.clear();
connCounts.clear();
LOG.fine("Closed " + closedConnCount + " connections");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import com.github.akurilov.commons.concurrent.ThreadUtil;
import static com.github.akurilov.netty.connection.pool.NonBlockingConnPool.ATTR_KEY_NODE;

import com.github.akurilov.netty.connection.pool.mock.BasicMultiNodeConnPoolMock;
import com.github.akurilov.netty.connection.pool.mock.DummyChannelPoolHandler;
import com.github.akurilov.netty.connection.pool.util.BasicMultiNodeConnPoolMock;
import com.github.akurilov.netty.connection.pool.util.DummyChannelPoolHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;

Expand Down Expand Up @@ -106,8 +106,8 @@ public BasicMultiNodeConnPoolTest(final int concurrencyLevel, final int nodeCoun
e.printStackTrace();
}
poolLoader.shutdownNow();
} catch(final IOException e) {
e.printStackTrace(System.err);
} catch(final Throwable t) {
t.printStackTrace(System.err);
} finally {
final long connCountSum = nodeFreq.values().stream().mapToLong(LongAdder::sum).sum();
System.out.println(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.github.akurilov.netty.connection.pool;

import com.github.akurilov.netty.connection.pool.mock.DummyChannelPoolHandler;
import com.github.akurilov.netty.connection.pool.mock.DummyClientChannelHandler;
import com.github.akurilov.netty.connection.pool.mock.EpollConnDroppingServer;
import com.github.akurilov.netty.connection.pool.util.DummyChannelPoolHandler;
import com.github.akurilov.netty.connection.pool.util.DummyClientChannelHandler;
import com.github.akurilov.netty.connection.pool.util.EpollConnDroppingServer;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -97,10 +97,8 @@ public void test()
Channel conn;
for(int j = 0; j < CONN_ATTEMPTS; j ++) {
try {
conn = connPool.lease();
if(conn == null) {
LockSupport.parkNanos(1);
continue;
while(null == (conn = connPool.lease())) {
Thread.sleep(1);
}
conn.writeAndFlush(PAYLOAD.retain()).sync();
connPool.release(conn);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.github.akurilov.netty.connection.pool;

import com.github.akurilov.netty.connection.pool.util.DummyChannelPoolHandler;
import com.github.akurilov.netty.connection.pool.util.DummyClientChannelHandler;
import com.github.akurilov.netty.connection.pool.util.EpollConnDroppingServer;

import com.github.akurilov.netty.connection.pool.util.PortTools;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import static org.junit.Assert.assertEquals;

public class EpollConnLeakTest {

private static final int CONCURRENCY = 100;
private static final String[] NODES = new String[] { "127.0.0.1" };
private static final ChannelPoolHandler CPH = new DummyChannelPoolHandler();
private static final int DEFAULT_PORT = 9876;
private static final long TEST_TIME_SECONDS = 30;
private static final int FAIL_EVERY_CONN_ATTEMPT = 0;
private static final ByteBuf PAYLOAD = Unpooled.directBuffer(0x1000).writeZero(0x1000);

Closeable serverMock;
NonBlockingConnPool connPool;
EventLoopGroup group;

@Before
public void setUp()
throws Exception {

serverMock = new EpollConnDroppingServer(DEFAULT_PORT, FAIL_EVERY_CONN_ATTEMPT);

// create
final Semaphore concurrencyThrottle = new Semaphore(CONCURRENCY);
group = new EpollEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(EpollSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected final void initChannel(final SocketChannel conn)
throws Exception {
conn.pipeline().addLast(new DummyClientChannelHandler());
}
}
)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true);
connPool = new BasicMultiNodeConnPool(
concurrencyThrottle, NODES, bootstrap, CPH, DEFAULT_PORT, 0
);
connPool.preCreateConnections(CONCURRENCY);

// use
final ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY);
for(int i = 0; i < CONCURRENCY; i ++) {
executor.submit(
(Runnable) () -> {
Channel conn;
while(true) {
try {
conn = connPool.lease();
if(conn == null) {
LockSupport.parkNanos(1);
continue;
}
try {
conn.writeAndFlush(PAYLOAD.retain()).sync();
} finally {
connPool.release(conn);
}
} catch(final Throwable cause) {
cause.printStackTrace(System.err);
break;
}
}
}
);
}
TimeUnit.SECONDS.sleep(TEST_TIME_SECONDS);

// close
executor.shutdownNow();
connPool.close();
group.shutdownGracefully();
serverMock.close();
TimeUnit.SECONDS.sleep(1);
}

@After
public void tearDown()
throws Exception {
}

@Test
public void testNoConnectionsAreAfterPoolClosed()
throws Exception {
final int actualConnCount = PortTools.getConnectionCount("127.0.0.1:" + DEFAULT_PORT);
assertEquals(
"Connection count should be equal to 0 after pool has been closed, but got "
+ actualConnCount, 0, actualConnCount
);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.github.akurilov.netty.connection.pool;

import com.github.akurilov.netty.connection.pool.mock.DummyChannelPoolHandler;
import com.github.akurilov.netty.connection.pool.mock.DummyClientChannelHandler;
import com.github.akurilov.netty.connection.pool.mock.NioConnDroppingServer;
import com.github.akurilov.netty.connection.pool.util.DummyChannelPoolHandler;
import com.github.akurilov.netty.connection.pool.util.DummyClientChannelHandler;
import com.github.akurilov.netty.connection.pool.util.NioConnDroppingServer;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
Expand All @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.IntStream;

/**
* Created by andrey on 16.11.17.
Expand Down Expand Up @@ -96,10 +97,8 @@ public void test()
Channel conn;
for(int j = 0; j < CONN_ATTEMPTS; j ++) {
try {
conn = connPool.lease();
if(conn == null) {
LockSupport.parkNanos(1);
continue;
while(null == (conn = connPool.lease())) {
Thread.sleep(1);
}
conn.writeAndFlush(PAYLOAD.retain()).sync();
connPool.release(conn);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.github.akurilov.netty.connection.pool.mock;
package com.github.akurilov.netty.connection.pool.util;

import com.github.akurilov.netty.connection.pool.BasicMultiNodeConnPool;
import com.github.akurilov.netty.connection.pool.NonBlockingConnPool;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.github.akurilov.netty.connection.pool.mock;
package com.github.akurilov.netty.connection.pool.util;

import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPoolHandler;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.github.akurilov.netty.connection.pool.mock;
package com.github.akurilov.netty.connection.pool.util;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.github.akurilov.netty.connection.pool.mock;
package com.github.akurilov.netty.connection.pool.util;

import com.emc.mongoose.api.model.data.DataInput;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.github.akurilov.netty.connection.pool.mock;
package com.github.akurilov.netty.connection.pool.util;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -37,19 +37,22 @@ public EpollConnDroppingServer(final int port, final int dropEveryRequest)
new ChannelInitializer<SocketChannel>() {
@Override
public final void initChannel(final SocketChannel ch) {
ch.pipeline().addLast(
new SimpleChannelInboundHandler<Object>() {
@Override
protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg)
throws Exception {
if(0 == reqCounter.incrementAndGet() % dropEveryRequest) {
final Channel conn = ctx.channel();
System.out.println("Dropping the connection " + conn);
conn.close();
if(dropEveryRequest > 0) {
ch.pipeline().addLast(
new SimpleChannelInboundHandler<Object>() {
@Override
protected final void channelRead0(
final ChannelHandlerContext ctx, final Object msg
) throws Exception {
if(0 == reqCounter.incrementAndGet() % dropEveryRequest) {
final Channel conn = ctx.channel();
System.out.println("Dropping the connection " + conn);
conn.close();
}
}
}
}
);
);
}
}
}
);
Expand Down
Loading

0 comments on commit c0c5ca3

Please sign in to comment.