From 868d603e68f58afe13edcad6aea24512e94c3789 Mon Sep 17 00:00:00 2001 From: felixncheng Date: Thu, 6 Apr 2023 20:40:48 +0800 Subject: [PATCH 1/3] =?UTF-8?q?bug:=20=E4=BF=AE=E5=A4=8D=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=9C=A8=E8=BF=9C=E8=B7=9D=E7=A6=BB=E5=88=86=E5=8F=91=E6=97=B6?= =?UTF-8?q?=EF=BC=8C=E4=BC=9A=E5=87=BA=E7=8E=B0=E5=8D=A1=E4=BD=8F=E7=9A=84?= =?UTF-8?q?=E6=83=85=E5=86=B5=E3=80=82#387?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../util/okhttp/HttpClientBuilderFactory.kt | 14 +- .../util/okhttp/UnsafeSSLSocketImpl.kt | 254 ++++++++++++++++++ .../util/okhttp/UnsafeSslSocketFactoryImpl.kt | 73 +++++ .../replica/base/OkHttpClientPool.kt | 11 +- .../replica/base/context/ReplicaContext.kt | 12 +- 5 files changed, 352 insertions(+), 12 deletions(-) create mode 100644 src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt create mode 100644 src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSslSocketFactoryImpl.kt diff --git a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/HttpClientBuilderFactory.kt b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/HttpClientBuilderFactory.kt index e1baf4190d..83e0c952b1 100644 --- a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/HttpClientBuilderFactory.kt +++ b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/HttpClientBuilderFactory.kt @@ -63,14 +63,20 @@ object HttpClientBuilderFactory { fun create( certificate: String? = null, neverReadTimeout: Boolean = false, - beanFactory: BeanFactory? = null + beanFactory: BeanFactory? = null, + closeTimeout: Long = 0, ): OkHttpClient.Builder { return defaultClient.newBuilder() .apply { certificate?.let { val trustManager = CertTrustManager.createTrustManager(it) val sslSocketFactory = CertTrustManager.createSSLSocketFactory(trustManager) - sslSocketFactory(sslSocketFactory, trustManager) + val ssf = if (closeTimeout > 0) { + UnsafeSslSocketFactoryImpl(sslSocketFactory, closeTimeout) + } else { + sslSocketFactory + } + sslSocketFactory(ssf, trustManager) } if (neverReadTimeout) { @@ -88,8 +94,8 @@ object HttpClientBuilderFactory { 60L, TimeUnit.SECONDS, SynchronousQueue(), - threadFactory("OkHttp Dispatcher", false) - ) + threadFactory("OkHttp Dispatcher", false), + ), ) dispatcher(Dispatcher(traceableExecutorService)) } diff --git a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt new file mode 100644 index 0000000000..3b0e6012b3 --- /dev/null +++ b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt @@ -0,0 +1,254 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.common.artifact.util.okhttp + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import java.io.InputStream +import java.io.OutputStream +import java.net.SocketAddress +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit +import java.util.function.BiFunction +import javax.net.ssl.HandshakeCompletedListener +import javax.net.ssl.SSLParameters +import javax.net.ssl.SSLSession +import javax.net.ssl.SSLSocket +import okio.AsyncTimeout +import org.slf4j.LoggerFactory +import sun.security.ssl.SSLSocketImpl + +/** + * 不安全的ssl socket。重写了SSLSocketImpl的close方法,允许强势关闭连接。 + * 但是因此,可能会破坏tls的正确关闭。 + * */ +class UnsafeSSLSocketImpl(private val delegate: SSLSocket, private val closeTimeout: Long) : SSLSocket() { + + init { + require(closeTimeout >= 0) + } + + override fun getSupportedCipherSuites(): Array { + return delegate.supportedCipherSuites + } + + override fun getEnabledCipherSuites(): Array { + return delegate.enabledCipherSuites + } + + override fun setEnabledCipherSuites(suites: Array?) { + delegate.enabledCipherSuites = suites + } + + override fun getSupportedProtocols(): Array { + return delegate.supportedProtocols + } + + override fun getEnabledProtocols(): Array { + return delegate.enabledProtocols + } + + override fun setEnabledProtocols(protocols: Array?) { + delegate.enabledProtocols = protocols + } + + override fun getSession(): SSLSession { + return delegate.session + } + + override fun addHandshakeCompletedListener(listener: HandshakeCompletedListener?) { + delegate.addHandshakeCompletedListener(listener) + } + + override fun removeHandshakeCompletedListener(listener: HandshakeCompletedListener?) { + delegate.removeHandshakeCompletedListener(listener) + } + + override fun startHandshake() { + delegate.startHandshake() + } + + override fun setUseClientMode(mode: Boolean) { + delegate.useClientMode = mode + } + + override fun getUseClientMode(): Boolean { + return delegate.useClientMode + } + + override fun setNeedClientAuth(need: Boolean) { + delegate.needClientAuth = need + } + + override fun getNeedClientAuth(): Boolean { + return delegate.needClientAuth + } + + override fun setWantClientAuth(want: Boolean) { + delegate.wantClientAuth = want + } + + override fun getWantClientAuth(): Boolean { + return delegate.wantClientAuth + } + + override fun setEnableSessionCreation(flag: Boolean) { + delegate.enableSessionCreation = flag + } + + override fun getEnableSessionCreation(): Boolean { + return delegate.enableSessionCreation + } + + override fun connect(endpoint: SocketAddress?, timeout: Int) { + delegate.connect(endpoint, timeout) + } + + override fun isClosed(): Boolean { + return delegate.isClosed + } + + override fun shutdownInput() { + delegate.shutdownInput() + } + + override fun isInputShutdown(): Boolean { + return delegate.isInputShutdown + } + + override fun shutdownOutput() { + delegate.shutdownOutput() + } + + override fun isOutputShutdown(): Boolean { + return delegate.isOutputShutdown + } + + override fun getInputStream(): InputStream { + return delegate.inputStream + } + + override fun getOutputStream(): OutputStream { + return delegate.outputStream + } + + override fun getSSLParameters(): SSLParameters { + return delegate.sslParameters + } + + override fun setSSLParameters(params: SSLParameters?) { + delegate.sslParameters = params + } + + override fun getApplicationProtocol(): String { + return delegate.applicationProtocol + } + + override fun setHandshakeApplicationProtocolSelector(selector: BiFunction, String>?) { + delegate.handshakeApplicationProtocolSelector = selector + } + + override fun getHandshakeApplicationProtocolSelector(): BiFunction, String> { + return delegate.getHandshakeApplicationProtocolSelector() + } + + override fun getHandshakeApplicationProtocol(): String { + return delegate.handshakeApplicationProtocol + } + + /** + * ssl socket在write阻塞时,close也会阻塞。如果close长时间阻塞,则影响上层业务。 + * */ + override fun close() { + if (logger.isDebugEnabled) { + logger.debug("Close socket $delegate") + } + if (closeTimeout == 0L) { + delegate.close() + return + } + try { + threadPool.execute { asyncClose() } + } catch (e: RejectedExecutionException) { + closeImmediately() + } + } + + /** + * 异步关闭,超时后会强制关闭socket + * */ + private fun asyncClose() { + try { + object : AsyncTimeout() { + override fun timedOut() { + closeImmediately() + } + }.apply { + timeout(closeTimeout, TimeUnit.SECONDS) + withTimeout { delegate.close() } + } + } catch (ignore: Exception) { + // close quietly + } + } + + /** + * 立即关闭ssl socket,可能会破坏tls的正确关闭 + * */ + @Synchronized + fun closeImmediately() { + try { + val closeSocketMethod = SSLSocketImpl::class.java.getDeclaredMethod("closeSocket", Boolean::class.java) + closeSocketMethod.isAccessible = true + if (!delegate.isClosed) { + closeSocketMethod.invoke(delegate, false) + val tlsIsClosedField = SSLSocketImpl::class.java.getDeclaredField("tlsIsClosed") + tlsIsClosedField.isAccessible = true + tlsIsClosedField.set(delegate, true) + logger.info("Success close socket $delegate") + } else { + logger.info("Already closed $delegate") + } + } catch (e: Exception) { + logger.warn("Unable close socket $delegate", e) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(UnsafeSSLSocketImpl::class.java) + private val threadPool = ThreadPoolExecutor( + 1, + 1, + 0L, + TimeUnit.MILLISECONDS, + ArrayBlockingQueue(8192), + ThreadFactoryBuilder().setNameFormat("unsafe-sslSocket-watchDog-%d").setDaemon(true).build(), + ) + } +} diff --git a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSslSocketFactoryImpl.kt b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSslSocketFactoryImpl.kt new file mode 100644 index 0000000000..feb3b302b0 --- /dev/null +++ b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSslSocketFactoryImpl.kt @@ -0,0 +1,73 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.common.artifact.util.okhttp + +import java.net.InetAddress +import java.net.Socket +import javax.net.ssl.SSLSocket +import javax.net.ssl.SSLSocketFactory + +/** + * 创建不安全的ssl socket。主要是处理希望可以快速关闭连接的情况。 + * */ +class UnsafeSslSocketFactoryImpl(private val delegate: SSLSocketFactory, private val closeTimeout: Long) : + SSLSocketFactory() { + init { + require(closeTimeout >= 0) + } + + override fun createSocket(s: Socket, host: String, port: Int, autoClose: Boolean): Socket { + return (delegate.createSocket(s, host, port, autoClose) as SSLSocket).unsafe() + } + + override fun createSocket(host: String, port: Int): Socket { + return (delegate.createSocket(host, port) as SSLSocket).unsafe() + } + + override fun createSocket(host: String, port: Int, localHost: InetAddress?, localPort: Int): Socket { + return (delegate.createSocket(host, port, localHost, localPort) as SSLSocket).unsafe() + } + + override fun createSocket(host: InetAddress, port: Int): Socket { + return (delegate.createSocket(host, port) as SSLSocket).unsafe() + } + + override fun createSocket(address: InetAddress, port: Int, localAddress: InetAddress, localPort: Int): Socket { + return (delegate.createSocket(address, port, localAddress, localPort) as SSLSocket).unsafe() + } + + override fun getDefaultCipherSuites(): Array { + return delegate.defaultCipherSuites + } + + override fun getSupportedCipherSuites(): Array { + return delegate.supportedCipherSuites + } + + private fun SSLSocket.unsafe() = UnsafeSSLSocketImpl(this, closeTimeout) +} diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/base/OkHttpClientPool.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/base/OkHttpClientPool.kt index bc1269bb54..d6070989eb 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/base/OkHttpClientPool.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/base/OkHttpClientPool.kt @@ -18,16 +18,19 @@ object OkHttpClientPool { clusterInfo: ClusterInfo, readTimeout: Duration, writeTimeout: Duration, - vararg interceptors: Interceptor + closeTimeout: Duration = Duration.ZERO, + vararg interceptors: Interceptor, ): OkHttpClient { return clientCache.getOrPut(clusterInfo) { - val builder = HttpClientBuilderFactory.create(clusterInfo.certificate) - .protocols(listOf(Protocol.HTTP_1_1)) + val builder = HttpClientBuilderFactory.create( + clusterInfo.certificate, + closeTimeout = closeTimeout.seconds, + ).protocols(listOf(Protocol.HTTP_1_1)) .readTimeout(readTimeout) .writeTimeout(writeTimeout) interceptors.forEach { builder.addInterceptor( - it + it, ) } builder.addNetworkInterceptor(ProgressInterceptor()) diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/base/context/ReplicaContext.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/base/context/ReplicaContext.kt index bef2457235..2c5d3f3cef 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/base/context/ReplicaContext.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/base/context/ReplicaContext.kt @@ -65,7 +65,7 @@ class ReplicaContext( val taskObject: ReplicaObjectInfo, val taskRecord: ReplicaRecordInfo, val localRepo: RepositoryDetail, - val remoteCluster: ClusterNodeInfo + val remoteCluster: ClusterNodeInfo, ) { // 任务信息 val task = taskDetail.task @@ -108,7 +108,7 @@ class ReplicaContext( certificate = remoteCluster.certificate, appId = remoteCluster.appId, accessKey = remoteCluster.accessKey, - secretKey = remoteCluster.secretKey + secretKey = remoteCluster.secretKey, ) // 远端集群仓库特殊处理, 远端集群走对应制品类型协议传输 @@ -126,19 +126,22 @@ class ReplicaContext( targetVersions = initImageTargetTag() val readTimeout = Duration.ofMillis(READ_TIMEOUT) val writeTimeout = Duration.ofMillis(WRITE_TIMEOUT) + val closeTimeout = Duration.ofMillis(CLOSE_TIMEOUT) httpClient = if (cluster.username != null) { OkHttpClientPool.getHttpClient( cluster, readTimeout, writeTimeout, - BasicAuthInterceptor(cluster.username!!, cluster.password!!) + closeTimeout, + BasicAuthInterceptor(cluster.username!!, cluster.password!!), ) } else { OkHttpClientPool.getHttpClient( cluster, readTimeout, writeTimeout, - SignInterceptor(cluster) + closeTimeout, + SignInterceptor(cluster), ) } } @@ -181,5 +184,6 @@ class ReplicaContext( private val logger = LoggerFactory.getLogger(ReplicaContext::class.java) private const val READ_TIMEOUT = 60 * 60 * 1000L private const val WRITE_TIMEOUT = 30 * 1000L + private const val CLOSE_TIMEOUT = 10 * 1000L } } From 2d3cd61bfc06025529da7422845d132ddb14701a Mon Sep 17 00:00:00 2001 From: felixncheng Date: Thu, 6 Apr 2023 21:18:39 +0800 Subject: [PATCH 2/3] bug: code style format #387 --- .../common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt index 3b0e6012b3..6a6e707357 100644 --- a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt +++ b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt @@ -170,7 +170,12 @@ class UnsafeSSLSocketImpl(private val delegate: SSLSocket, private val closeTime return delegate.applicationProtocol } - override fun setHandshakeApplicationProtocolSelector(selector: BiFunction, String>?) { + override fun setHandshakeApplicationProtocolSelector( + selector: BiFunction< + SSLSocket, + MutableList, String, + >?, + ) { delegate.handshakeApplicationProtocolSelector = selector } From 61c4e4a41d837d6a961aacf8994659591b3589f4 Mon Sep 17 00:00:00 2001 From: felixncheng Date: Fri, 7 Apr 2023 12:01:58 +0800 Subject: [PATCH 3/3] =?UTF-8?q?bug:=20=E5=AE=9E=E7=8E=B0close=20timeout?= =?UTF-8?q?=E7=9A=84=E6=AD=A3=E7=A1=AE=E8=AF=AD=E4=B9=89=20#387?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../util/okhttp/UnsafeSSLSocketImpl.kt | 59 +++++++------------ 1 file changed, 22 insertions(+), 37 deletions(-) diff --git a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt index 6a6e707357..f704ce09ea 100644 --- a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt +++ b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/util/okhttp/UnsafeSSLSocketImpl.kt @@ -28,19 +28,19 @@ package com.tencent.bkrepo.common.artifact.util.okhttp import com.google.common.util.concurrent.ThreadFactoryBuilder +import com.tencent.bkrepo.common.artifact.stream.closeQuietly import java.io.InputStream import java.io.OutputStream import java.net.SocketAddress -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.RejectedExecutionException -import java.util.concurrent.ThreadPoolExecutor +import java.time.Duration +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.function.BiFunction import javax.net.ssl.HandshakeCompletedListener import javax.net.ssl.SSLParameters import javax.net.ssl.SSLSession import javax.net.ssl.SSLSocket -import okio.AsyncTimeout +import kotlin.system.measureNanoTime import org.slf4j.LoggerFactory import sun.security.ssl.SSLSocketImpl @@ -54,6 +54,8 @@ class UnsafeSSLSocketImpl(private val delegate: SSLSocket, private val closeTime require(closeTimeout >= 0) } + private val closeLock = Any() + override fun getSupportedCipherSuites(): Array { return delegate.supportedCipherSuites } @@ -194,32 +196,22 @@ class UnsafeSSLSocketImpl(private val delegate: SSLSocket, private val closeTime if (logger.isDebugEnabled) { logger.debug("Close socket $delegate") } - if (closeTimeout == 0L) { - delegate.close() + if (delegate.isClosed) { return } - try { - threadPool.execute { asyncClose() } - } catch (e: RejectedExecutionException) { - closeImmediately() - } - } - - /** - * 异步关闭,超时后会强制关闭socket - * */ - private fun asyncClose() { - try { - object : AsyncTimeout() { - override fun timedOut() { - closeImmediately() - } - }.apply { - timeout(closeTimeout, TimeUnit.SECONDS) - withTimeout { delegate.close() } + synchronized(closeLock) { + if (delegate.isClosed) { + return + } + if (closeTimeout == 0L) { + delegate.close() + return + } + val timeoutFuture = threadPool.schedule({ closeImmediately() }, closeTimeout, TimeUnit.SECONDS) + val closeTime = measureNanoTime { delegate.closeQuietly() } + if (closeTime < Duration.ofSeconds(closeTimeout).toNanos()) { + timeoutFuture.cancel(false) } - } catch (ignore: Exception) { - // close quietly } } @@ -237,8 +229,6 @@ class UnsafeSSLSocketImpl(private val delegate: SSLSocket, private val closeTime tlsIsClosedField.isAccessible = true tlsIsClosedField.set(delegate, true) logger.info("Success close socket $delegate") - } else { - logger.info("Already closed $delegate") } } catch (e: Exception) { logger.warn("Unable close socket $delegate", e) @@ -247,13 +237,8 @@ class UnsafeSSLSocketImpl(private val delegate: SSLSocket, private val closeTime companion object { private val logger = LoggerFactory.getLogger(UnsafeSSLSocketImpl::class.java) - private val threadPool = ThreadPoolExecutor( - 1, - 1, - 0L, - TimeUnit.MILLISECONDS, - ArrayBlockingQueue(8192), - ThreadFactoryBuilder().setNameFormat("unsafe-sslSocket-watchDog-%d").setDaemon(true).build(), - ) + private val defaultFactory = + ThreadFactoryBuilder().setNameFormat("unsafe-sslSocket-watchDog-%d").setDaemon(true).build() + private val threadPool = Executors.newSingleThreadScheduledExecutor(defaultFactory) } }