From 7673041565668117cb07599e07d5d0b72574a8a7 Mon Sep 17 00:00:00 2001 From: zhupengfei Date: Tue, 10 Dec 2024 00:51:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=97=B6=E6=8C=87=E5=AE=9A=E7=AB=AF=E5=8F=A3?= =?UTF-8?q?=EF=BC=8C=E9=87=8D=E8=BF=9E=E6=97=B6=E4=BB=8D=E7=84=B6=E6=8C=87?= =?UTF-8?q?=E5=AE=9A=E7=AB=AF=E5=8F=A3=E3=80=82=E5=A2=9E=E5=8A=A0=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E9=9A=8F=E6=9C=BA=E7=AB=AF=E5=8F=A3=EF=BC=8C=E9=87=8D?= =?UTF-8?q?=E8=BF=9E=E6=97=B6=E4=BB=8D=E7=84=B6=E9=9A=8F=E6=9C=BA=E7=AB=AF?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/ConnectionCompletionHandler.java | 2 + .../main/java/org/tio/client/TioClient.java | 825 +++++++++--------- 2 files changed, 412 insertions(+), 415 deletions(-) diff --git a/src/core/src/main/java/org/tio/client/ConnectionCompletionHandler.java b/src/core/src/main/java/org/tio/client/ConnectionCompletionHandler.java index bcd5d20..6f1c7c6 100644 --- a/src/core/src/main/java/org/tio/client/ConnectionCompletionHandler.java +++ b/src/core/src/main/java/org/tio/client/ConnectionCompletionHandler.java @@ -304,6 +304,8 @@ private void handler(Void result, ConnectionCompletionVo attachment, Throwable t if (reconnConf != null) { channelContext = new ClientChannelContext(tioClientConfig, asynchronousSocketChannel); channelContext.setServerNode(serverNode); + channelContext.setBindIp(bindIp); + channelContext.setBindPort(bindPort); } } diff --git a/src/core/src/main/java/org/tio/client/TioClient.java b/src/core/src/main/java/org/tio/client/TioClient.java index 2d699d2..700cb9c 100644 --- a/src/core/src/main/java/org/tio/client/TioClient.java +++ b/src/core/src/main/java/org/tio/client/TioClient.java @@ -194,6 +194,7 @@ recommend that a file or class name and description of purpose be included on package org.tio.client; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.channels.AsynchronousChannelGroup; @@ -218,423 +219,417 @@ recommend that a file or class name and description of purpose be included on import org.tio.utils.lock.SetWithLock; /** - * * @author tanyaowu * 2017年4月1日 上午9:29:58 */ public class TioClient { - private static Logger log = LoggerFactory.getLogger(TioClient.class); - - private AsynchronousChannelGroup channelGroup; - - private TioClientConfig tioClientConfig; - - /** - * @param serverIp 可以为空 - * @param serverPort - * @param aioDecoder - * @param aioEncoder - * @param tioHandler - * - * @author tanyaowu - * @throws IOException - * - */ - public TioClient(final TioClientConfig tioClientConfig) throws IOException { - super(); - this.tioClientConfig = tioClientConfig; - this.channelGroup = AsynchronousChannelGroup.withThreadPool(tioClientConfig.groupExecutor); - - startHeartbeatTask(); - startReconnTask(); - } - - /** - * - * @param serverNode - * @throws Exception - * - * @author tanyaowu - * - */ - public void asynConnect(Node serverNode) throws Exception { - asynConnect(serverNode, null); - } - - /** - * - * @param serverNode - * @param timeout - * @throws Exception - * - * @author tanyaowu - * - */ - public void asynConnect(Node serverNode, Integer timeout) throws Exception { - asynConnect(serverNode, null, null, timeout); - } - - /** - * - * @param serverNode - * @param bindIp - * @param bindPort - * @param timeout - * @throws Exception - * - * @author tanyaowu - * - */ - public void asynConnect(Node serverNode, String bindIp, Integer bindPort, Integer timeout) throws Exception { - connect(serverNode, bindIp, bindPort, null, timeout, false); - } - - /** - * - * @param serverNode - * @return - * @throws Exception - * - * @author tanyaowu - * - */ - public ClientChannelContext connect(Node serverNode) throws Exception { - return connect(serverNode, null); - } - - /** - * - * @param serverNode - * @param timeout - * @return - * @throws Exception - * @author tanyaowu - */ - public ClientChannelContext connect(Node serverNode, Integer timeout) throws Exception { - return connect(serverNode, null, 0, timeout); - } - - /** - * - * @param serverNode - * @param bindIp - * @param bindPort - * @param initClientChannelContext - * @param timeout 超时时间,单位秒 - * @return - * @throws Exception - * @author tanyaowu - */ - public ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout) throws Exception { - return connect(serverNode, bindIp, bindPort, initClientChannelContext, timeout, true); - } - - /** - * - * @param serverNode - * @param bindIp - * @param bindPort - * @param initClientChannelContext - * @param timeout 超时时间,单位秒 - * @param isSyn true: 同步, false: 异步 - * @return - * @throws Exception - * @author tanyaowu - */ - private ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout, boolean isSyn) - throws Exception { - - AsynchronousSocketChannel asynchronousSocketChannel = null; - ClientChannelContext channelContext = null; - boolean isReconnect = initClientChannelContext != null; - // TioClientListener tioClientListener = tioClientConfig.getTioClientListener(); - - long start = SystemTimer.currTime; - asynchronousSocketChannel = AsynchronousSocketChannel.open(channelGroup); - long end = SystemTimer.currTime; - long iv = end - start; - if (iv >= 100) { - log.error("{}, open 耗时:{} ms", channelContext, iv); - } - - asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); - asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - - InetSocketAddress bind = null; - if (bindPort != null && bindPort > 0) { - if (false == StrUtil.isBlank(bindIp)) { - bind = new InetSocketAddress(bindIp, bindPort); - } else { - bind = new InetSocketAddress(bindPort); - } - } - - if (bind != null) { - asynchronousSocketChannel.bind(bind); - } - - channelContext = initClientChannelContext; - - start = SystemTimer.currTime; - - InetSocketAddress inetSocketAddress = new InetSocketAddress(serverNode.getIp(), serverNode.getPort()); - - ConnectionCompletionVo attachment = new ConnectionCompletionVo(channelContext, this, isReconnect, asynchronousSocketChannel, serverNode, bindIp, bindPort); - - if (isSyn) { - Integer realTimeout = timeout; - if (realTimeout == null) { - realTimeout = 5; - } - - CountDownLatch countDownLatch = new CountDownLatch(1); - attachment.setCountDownLatch(countDownLatch); - - try { - asynchronousSocketChannel.connect(inetSocketAddress, attachment, tioClientConfig.getConnectionCompletionHandler()); - } catch (Throwable e) { - tioClientConfig.getConnectionCompletionHandler().failed(e, attachment); - return attachment.getChannelContext(); - } - - @SuppressWarnings("unused") - boolean f = countDownLatch.await(realTimeout, TimeUnit.SECONDS); - return attachment.getChannelContext(); - } else { - try { - asynchronousSocketChannel.connect(inetSocketAddress, attachment, tioClientConfig.getConnectionCompletionHandler()); - } catch (Throwable e) { - tioClientConfig.getConnectionCompletionHandler().failed(e, attachment); - } - return null; - } - } - - /** - * - * @param serverNode - * @param bindIp - * @param bindPort - * @param timeout 超时时间,单位秒 - * @return - * @throws Exception - * - * @author tanyaowu - * - */ - public ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, Integer timeout) throws Exception { - return connect(serverNode, bindIp, bindPort, null, timeout); - } - - /** - * @return the channelGroup - */ - public AsynchronousChannelGroup getChannelGroup() { - return channelGroup; - } - - /** - * @return the tioClientConfig - */ - public TioClientConfig getTioClientConfig() { - return tioClientConfig; - } - - /** - * - * @param channelContext - * @param timeout 单位秒 - * @return - * @throws Exception - * - * @author tanyaowu - * - */ - public void reconnect(ClientChannelContext channelContext, Integer timeout) throws Exception { - connect(channelContext.getServerNode(), channelContext.getBindIp(), channelContext.getBindPort(), channelContext, timeout); - } - - /** - * @param tioClientConfig the tioClientConfig to set - */ - public void setTioClientConfig(TioClientConfig tioClientConfig) { - this.tioClientConfig = tioClientConfig; - } - - /** - * 定时任务:发心跳 - * @author tanyaowu - * - */ - private void startHeartbeatTask() { - final ClientGroupStat clientGroupStat = (ClientGroupStat) tioClientConfig.groupStat; - final TioClientHandler tioHandler = tioClientConfig.getTioClientHandler(); - - final String id = tioClientConfig.getId(); - new Thread(new Runnable() { - @Override - public void run() { - while (!tioClientConfig.isStopped()) { - // final long heartbeatTimeout = tioClientConfig.heartbeatTimeout; - if (tioClientConfig.heartbeatTimeout <= 0) { - log.warn("用户取消了框架层面的心跳定时发送功能,请用户自己去完成心跳机制"); - break; - } - SetWithLock setWithLock = tioClientConfig.connecteds; - ReadLock readLock = setWithLock.readLock(); - readLock.lock(); - try { - Set set = setWithLock.getObj(); - long currtime = SystemTimer.currTime; - for (ChannelContext entry : set) { - ClientChannelContext channelContext = (ClientChannelContext) entry; - if (channelContext.isClosed || channelContext.isRemoved) { - continue; - } - - ChannelStat stat = channelContext.stat; - long compareTime = Math.max(stat.latestTimeOfReceivedByte, stat.latestTimeOfSentPacket); - long interval = currtime - compareTime; - if (interval >= tioClientConfig.heartbeatTimeout / 2) { - Packet packet = tioHandler.heartbeatPacket(channelContext); - if (packet != null) { - if (log.isInfoEnabled()) { - log.info("{}发送心跳包", channelContext.toString()); - } - Tio.send(channelContext, packet); - } - } - } - if (log.isInfoEnabled()) { - log.info("[{}]: curr:{}, closed:{}, received:({}p)({}b), handled:{}, sent:({}p)({}b)", id, set.size(), clientGroupStat.closed.get(), - clientGroupStat.receivedPackets.get(), clientGroupStat.receivedBytes.get(), clientGroupStat.handledPackets.get(), - clientGroupStat.sentPackets.get(), clientGroupStat.sentBytes.get()); - } - - } catch (Throwable e) { - log.error("", e); - } finally { - try { - readLock.unlock(); - Thread.sleep(tioClientConfig.heartbeatTimeout / 4); - } catch (Throwable e) { - log.error(e.toString(), e); - } finally { - - } - } - } - } - }, "tio-timer-heartbeat" + id).start(); - } - - /** - * 启动重连任务 - * - * @author tanyaowu - * - */ - private void startReconnTask() { - final ReconnConf reconnConf = tioClientConfig.getReconnConf(); - if (reconnConf == null || reconnConf.getInterval() <= 0) { - return; - } - - final String id = tioClientConfig.getId(); - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - while (!tioClientConfig.isStopped()) { - if (tioClientConfig.closeds.size() > 0) { - // 有连接断开时才打印日志 - log.error("closeds:{}, connections:{}", tioClientConfig.closeds.size(), tioClientConfig.connections.size()); - } - //log.info("准备重连"); - LinkedBlockingQueue queue = reconnConf.getQueue(); - ClientChannelContext channelContext = null; - try { - channelContext = (ClientChannelContext) queue.take(); - } catch (InterruptedException e1) { - log.error(e1.toString(), e1); - } - if (channelContext == null) { - continue; - // return; - } - - if (channelContext.isRemoved) //已经删除的,不需要重新再连 - { - continue; - } - - SslFacadeContext sslFacadeContext = channelContext.sslFacadeContext; - if (sslFacadeContext != null) { - sslFacadeContext.setHandshakeCompleted(false); - } - - long sleeptime = reconnConf.getInterval() - (SystemTimer.currTime - channelContext.stat.timeInReconnQueue); - //log.info("sleeptime:{}, closetime:{}", sleeptime, timeInReconnQueue); - if (sleeptime > 0) { - try { - Thread.sleep(sleeptime); - } catch (InterruptedException e) { - log.error(e.toString(), e); - } - } - - if (channelContext.isRemoved || !channelContext.isClosed) //已经删除的和已经连上的,不需要重新再连 - { - continue; - } else { - ReconnRunnable runnable = channelContext.getReconnRunnable(); - if (runnable == null) { - synchronized (channelContext) { - runnable = channelContext.getReconnRunnable(); - if (runnable == null) { - runnable = new ReconnRunnable(channelContext, TioClient.this, reconnConf.getThreadPoolExecutor()); - channelContext.setReconnRunnable(runnable); - } - } - } - runnable.execute(); - // reconnConf.getThreadPoolExecutor().execute(runnable); - } - } - } - }); - thread.setName("tio-timer-reconnect-" + id); - thread.setDaemon(true); - thread.start(); - - } - - /** - * - * @return - * @author tanyaowu - */ - public boolean stop() { - boolean ret = true; - try { - tioClientConfig.groupExecutor.shutdown(); - } catch (Exception e1) { - log.error(e1.toString(), e1); - } - try { - tioClientConfig.tioExecutor.shutdown(); - } catch (Exception e1) { - log.error(e1.toString(), e1); - } - - tioClientConfig.setStopped(true); - try { - ret = ret && tioClientConfig.groupExecutor.awaitTermination(6000, TimeUnit.SECONDS); - ret = ret && tioClientConfig.tioExecutor.awaitTermination(6000, TimeUnit.SECONDS); - } catch (InterruptedException e) { - log.error(e.getLocalizedMessage(), e); - } - log.info("client resource has released"); - return ret; - } + private static Logger log = LoggerFactory.getLogger(TioClient.class); + + private AsynchronousChannelGroup channelGroup; + + private TioClientConfig tioClientConfig; + + /** + * @param serverIp 可以为空 + * @param serverPort + * @param aioDecoder + * @param aioEncoder + * @param tioHandler + * @throws IOException + * @author tanyaowu + */ + public TioClient(final TioClientConfig tioClientConfig) throws IOException { + super(); + this.tioClientConfig = tioClientConfig; + this.channelGroup = AsynchronousChannelGroup.withThreadPool(tioClientConfig.groupExecutor); + + startHeartbeatTask(); + startReconnTask(); + } + + /** + * @param serverNode + * @throws Exception + * @author tanyaowu + */ + public void asynConnect(Node serverNode) throws Exception { + asynConnect(serverNode, null); + } + + /** + * @param serverNode + * @param timeout + * @throws Exception + * @author tanyaowu + */ + public void asynConnect(Node serverNode, Integer timeout) throws Exception { + asynConnect(serverNode, null, null, timeout); + } + + /** + * @param serverNode + * @param bindIp + * @param bindPort + * @param timeout + * @throws Exception + * @author tanyaowu + */ + public void asynConnect(Node serverNode, String bindIp, Integer timeout) throws Exception { + connect(serverNode, bindIp, 0, null, timeout, false); + } + + /** + * @param serverNode + * @param bindIp + * @param bindPort + * @param timeout + * @throws Exception + * @author tanyaowu + */ + public void asynConnect(Node serverNode, String bindIp, Integer bindPort, Integer timeout) throws Exception { + connect(serverNode, bindIp, bindPort, null, timeout, false); + } + + /** + * @param serverNode + * @return + * @throws Exception + * @author tanyaowu + */ + public ClientChannelContext connect(Node serverNode) throws Exception { + return connect(serverNode, null); + } + + /** + * @param serverNode + * @param timeout + * @return + * @throws Exception + * @author tanyaowu + */ + public ClientChannelContext connect(Node serverNode, Integer timeout) throws Exception { + return connect(serverNode, null, 0, timeout); + } + + /** + * @param serverNode + * @param bindIp + * @param bindPort + * @param initClientChannelContext + * @param timeout 超时时间,单位秒 + * @return + * @throws Exception + * @author tanyaowu + */ + public ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout) throws Exception { + return connect(serverNode, bindIp, bindPort, initClientChannelContext, timeout, true); + } + + /** + * @param serverNode + * @param bindIp + * @param bindPort + * @param initClientChannelContext + * @param timeout 超时时间,单位秒 + * @param isSyn true: 同步, false: 异步 + * @return + * @throws Exception + * @author tanyaowu + */ + private ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout, boolean isSyn) + throws Exception { + + AsynchronousSocketChannel asynchronousSocketChannel = null; + ClientChannelContext channelContext = null; + boolean isReconnect = initClientChannelContext != null; + + long start = SystemTimer.currTime; + asynchronousSocketChannel = AsynchronousSocketChannel.open(channelGroup); + long end = SystemTimer.currTime; + long iv = end - start; + if (iv >= 100) { + log.error("{}, open 耗时:{} ms", channelContext, iv); + } + + asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); + asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + + // Client bind + InetSocketAddress bindClient = null; + if (bindIp != null && false == StrUtil.isBlank(bindIp)) { + if (bindPort != null && bindPort > 0) { + // 指定ip和端口 + bindClient = new InetSocketAddress(bindIp, bindPort); + } else { + // 指定ip,端口随机 + bindClient = new InetSocketAddress(bindIp, 0); + } + } else { + if (bindPort != null && bindPort > 0) { + // 未指定ip,指定端口 + bindClient = new InetSocketAddress((InetAddress) null, bindPort); + } else { + // 未指定ip和端口 + bindClient = new InetSocketAddress((InetAddress) null, 0); + } + } + + if (bindClient != null) { + asynchronousSocketChannel.bind(bindClient); + } + + channelContext = initClientChannelContext; + + start = SystemTimer.currTime; + + // Server + InetSocketAddress bindServer = new InetSocketAddress(serverNode.getIp(), serverNode.getPort()); + + ConnectionCompletionVo attachment = new ConnectionCompletionVo(channelContext, this, isReconnect, asynchronousSocketChannel, serverNode, bindIp, bindPort); + + if (isSyn) { + Integer realTimeout = timeout; + if (realTimeout == null) { + realTimeout = 5; + } + + CountDownLatch countDownLatch = new CountDownLatch(1); + attachment.setCountDownLatch(countDownLatch); + + try { + asynchronousSocketChannel.connect(bindServer, attachment, tioClientConfig.getConnectionCompletionHandler()); + } catch (Throwable e) { + tioClientConfig.getConnectionCompletionHandler().failed(e, attachment); + return attachment.getChannelContext(); + } + + @SuppressWarnings("unused") + boolean f = countDownLatch.await(realTimeout, TimeUnit.SECONDS); + return attachment.getChannelContext(); + } else { + try { + asynchronousSocketChannel.connect(bindServer, attachment, tioClientConfig.getConnectionCompletionHandler()); + } catch (Throwable e) { + tioClientConfig.getConnectionCompletionHandler().failed(e, attachment); + } + return null; + } + } + + /** + * @param serverNode + * @param bindIp + * @param bindPort + * @param timeout 超时时间,单位秒 + * @return + * @throws Exception + * @author tanyaowu + */ + public ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, Integer timeout) throws Exception { + return connect(serverNode, bindIp, bindPort, null, timeout); + } + + /** + * @return the channelGroup + */ + public AsynchronousChannelGroup getChannelGroup() { + return channelGroup; + } + + /** + * @return the tioClientConfig + */ + public TioClientConfig getTioClientConfig() { + return tioClientConfig; + } + + /** + * @param channelContext + * @param timeout 单位秒 + * @return + * @throws Exception + * @author tanyaowu + */ + public void reconnect(ClientChannelContext channelContext, Integer timeout) throws Exception { + connect(channelContext.getServerNode(), channelContext.getBindIp(), channelContext.getBindPort(), channelContext, timeout); + } + + /** + * @param tioClientConfig the tioClientConfig to set + */ + public void setTioClientConfig(TioClientConfig tioClientConfig) { + this.tioClientConfig = tioClientConfig; + } + + /** + * 定时任务:发心跳 + * + * @author tanyaowu + */ + private void startHeartbeatTask() { + final ClientGroupStat clientGroupStat = (ClientGroupStat) tioClientConfig.groupStat; + final TioClientHandler tioHandler = tioClientConfig.getTioClientHandler(); + + final String id = tioClientConfig.getId(); + new Thread(new Runnable() { + @Override + public void run() { + while (!tioClientConfig.isStopped()) { + if (tioClientConfig.heartbeatTimeout <= 0) { + // log.warn("用户取消了框架层面的心跳定时发送功能,请用户自己去完成心跳机制"); + break; + } + SetWithLock setWithLock = tioClientConfig.connecteds; + ReadLock readLock = setWithLock.readLock(); + readLock.lock(); + try { + Set set = setWithLock.getObj(); + long currtime = SystemTimer.currTime; + for (ChannelContext entry : set) { + ClientChannelContext channelContext = (ClientChannelContext) entry; + if (channelContext.isClosed || channelContext.isRemoved) { + continue; + } + + ChannelStat stat = channelContext.stat; + long compareTime = Math.max(stat.latestTimeOfReceivedByte, stat.latestTimeOfSentPacket); + long interval = currtime - compareTime; + if (interval >= tioClientConfig.heartbeatTimeout / 2) { + Packet packet = tioHandler.heartbeatPacket(channelContext); + if (packet != null) { + if (log.isInfoEnabled()) { + log.info("{}发送心跳包", channelContext.toString()); + } + Tio.send(channelContext, packet); + } + } + } + if (log.isInfoEnabled()) { + log.info("[{}]: curr:{}, closed:{}, received:({}p)({}b), handled:{}, sent:({}p)({}b)", id, set.size(), clientGroupStat.closed.get(), + clientGroupStat.receivedPackets.get(), clientGroupStat.receivedBytes.get(), clientGroupStat.handledPackets.get(), + clientGroupStat.sentPackets.get(), clientGroupStat.sentBytes.get()); + } + + } catch (Throwable e) { + log.error("", e); + } finally { + try { + readLock.unlock(); + Thread.sleep(tioClientConfig.heartbeatTimeout / 4); + } catch (Throwable e) { + log.error(e.toString(), e); + } finally { + + } + } + } + } + }, "tio-timer-heartbeat" + id).start(); + } + + /** + * 启动重连任务 + * + * @author tanyaowu + */ + private void startReconnTask() { + final ReconnConf reconnConf = tioClientConfig.getReconnConf(); + if (reconnConf == null || reconnConf.getInterval() <= 0) { + return; + } + + final String id = tioClientConfig.getId(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + while (!tioClientConfig.isStopped()) { + if (tioClientConfig.closeds.size() > 0) { + // 有连接断开时才打印日志 + log.error("closeds:{}, connections:{}", tioClientConfig.closeds.size(), tioClientConfig.connections.size()); + } + // log.info("准备重连"); + LinkedBlockingQueue queue = reconnConf.getQueue(); + ClientChannelContext channelContext = null; + try { + channelContext = (ClientChannelContext) queue.take(); + } catch (InterruptedException e1) { + log.error(e1.toString(), e1); + } + if (channelContext == null) { + continue; + } + + if (channelContext.isRemoved) { + // 已经删除的,不需要重新再连 + continue; + } + + SslFacadeContext sslFacadeContext = channelContext.sslFacadeContext; + if (sslFacadeContext != null) { + sslFacadeContext.setHandshakeCompleted(false); + } + + long sleeptime = reconnConf.getInterval() - (SystemTimer.currTime - channelContext.stat.timeInReconnQueue); + // log.info("sleeptime:{}, closetime:{}", sleeptime, timeInReconnQueue); + if (sleeptime > 0) { + try { + Thread.sleep(sleeptime); + } catch (InterruptedException e) { + log.error(e.toString(), e); + } + } + + if (channelContext.isRemoved || !channelContext.isClosed) { + // 已经删除的和已经连上的,不需要重新再连 + continue; + } else { + ReconnRunnable runnable = channelContext.getReconnRunnable(); + if (runnable == null) { + synchronized (channelContext) { + runnable = channelContext.getReconnRunnable(); + if (runnable == null) { + runnable = new ReconnRunnable(channelContext, TioClient.this, reconnConf.getThreadPoolExecutor()); + channelContext.setReconnRunnable(runnable); + } + } + } + runnable.execute(); + } + } + } + }); + thread.setName("tio-timer-reconnect-" + id); + thread.setDaemon(true); + thread.start(); + + } + + /** + * @return + * @author tanyaowu + */ + public boolean stop() { + boolean ret = true; + try { + tioClientConfig.groupExecutor.shutdown(); + } catch (Exception e1) { + log.error(e1.toString(), e1); + } + try { + tioClientConfig.tioExecutor.shutdown(); + } catch (Exception e1) { + log.error(e1.toString(), e1); + } + + tioClientConfig.setStopped(true); + try { + ret = ret && tioClientConfig.groupExecutor.awaitTermination(6000, TimeUnit.SECONDS); + ret = ret && tioClientConfig.tioExecutor.awaitTermination(6000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error(e.getLocalizedMessage(), e); + } + log.info("client resource has released"); + return ret; + } }