Skip to content

Commit

Permalink
Merge pull request #388 from felixncheng/bug_387
Browse files Browse the repository at this point in the history
bug: 修复文件在远距离分发时,会出现卡住的情况。#387
  • Loading branch information
owenlxu authored Apr 10, 2023
2 parents f3414c5 + 61c4e4a commit 58fa953
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,20 @@ object HttpClientBuilderFactory {
fun create(
certificate: String? = null,
neverReadTimeout: Boolean = false,
beanFactory: BeanFactory? = null
beanFactory: BeanFactory? = null,
closeTimeout: Long = 0,
): OkHttpClient.Builder {
return defaultClient.newBuilder()
.apply {
certificate?.let {
val trustManager = CertTrustManager.createTrustManager(it)
val sslSocketFactory = CertTrustManager.createSSLSocketFactory(trustManager)
sslSocketFactory(sslSocketFactory, trustManager)
val ssf = if (closeTimeout > 0) {
UnsafeSslSocketFactoryImpl(sslSocketFactory, closeTimeout)
} else {
sslSocketFactory
}
sslSocketFactory(ssf, trustManager)
}

if (neverReadTimeout) {
Expand All @@ -88,8 +94,8 @@ object HttpClientBuilderFactory {
60L,
TimeUnit.SECONDS,
SynchronousQueue(),
threadFactory("OkHttp Dispatcher", false)
)
threadFactory("OkHttp Dispatcher", false),
),
)
dispatcher(Dispatcher(traceableExecutorService))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.common.artifact.util.okhttp

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.common.artifact.stream.closeQuietly
import java.io.InputStream
import java.io.OutputStream
import java.net.SocketAddress
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.function.BiFunction
import javax.net.ssl.HandshakeCompletedListener
import javax.net.ssl.SSLParameters
import javax.net.ssl.SSLSession
import javax.net.ssl.SSLSocket
import kotlin.system.measureNanoTime
import org.slf4j.LoggerFactory
import sun.security.ssl.SSLSocketImpl

/**
* 不安全的ssl socket。重写了SSLSocketImpl的close方法,允许强势关闭连接。
* 但是因此,可能会破坏tls的正确关闭。
* */
class UnsafeSSLSocketImpl(private val delegate: SSLSocket, private val closeTimeout: Long) : SSLSocket() {

init {
require(closeTimeout >= 0)
}

private val closeLock = Any()

override fun getSupportedCipherSuites(): Array<String> {
return delegate.supportedCipherSuites
}

override fun getEnabledCipherSuites(): Array<String> {
return delegate.enabledCipherSuites
}

override fun setEnabledCipherSuites(suites: Array<out String>?) {
delegate.enabledCipherSuites = suites
}

override fun getSupportedProtocols(): Array<String> {
return delegate.supportedProtocols
}

override fun getEnabledProtocols(): Array<String> {
return delegate.enabledProtocols
}

override fun setEnabledProtocols(protocols: Array<out String>?) {
delegate.enabledProtocols = protocols
}

override fun getSession(): SSLSession {
return delegate.session
}

override fun addHandshakeCompletedListener(listener: HandshakeCompletedListener?) {
delegate.addHandshakeCompletedListener(listener)
}

override fun removeHandshakeCompletedListener(listener: HandshakeCompletedListener?) {
delegate.removeHandshakeCompletedListener(listener)
}

override fun startHandshake() {
delegate.startHandshake()
}

override fun setUseClientMode(mode: Boolean) {
delegate.useClientMode = mode
}

override fun getUseClientMode(): Boolean {
return delegate.useClientMode
}

override fun setNeedClientAuth(need: Boolean) {
delegate.needClientAuth = need
}

override fun getNeedClientAuth(): Boolean {
return delegate.needClientAuth
}

override fun setWantClientAuth(want: Boolean) {
delegate.wantClientAuth = want
}

override fun getWantClientAuth(): Boolean {
return delegate.wantClientAuth
}

override fun setEnableSessionCreation(flag: Boolean) {
delegate.enableSessionCreation = flag
}

override fun getEnableSessionCreation(): Boolean {
return delegate.enableSessionCreation
}

override fun connect(endpoint: SocketAddress?, timeout: Int) {
delegate.connect(endpoint, timeout)
}

override fun isClosed(): Boolean {
return delegate.isClosed
}

override fun shutdownInput() {
delegate.shutdownInput()
}

override fun isInputShutdown(): Boolean {
return delegate.isInputShutdown
}

override fun shutdownOutput() {
delegate.shutdownOutput()
}

override fun isOutputShutdown(): Boolean {
return delegate.isOutputShutdown
}

override fun getInputStream(): InputStream {
return delegate.inputStream
}

override fun getOutputStream(): OutputStream {
return delegate.outputStream
}

override fun getSSLParameters(): SSLParameters {
return delegate.sslParameters
}

override fun setSSLParameters(params: SSLParameters?) {
delegate.sslParameters = params
}

override fun getApplicationProtocol(): String {
return delegate.applicationProtocol
}

override fun setHandshakeApplicationProtocolSelector(
selector: BiFunction<
SSLSocket,
MutableList<String>, String,
>?,
) {
delegate.handshakeApplicationProtocolSelector = selector
}

override fun getHandshakeApplicationProtocolSelector(): BiFunction<SSLSocket, MutableList<String>, String> {
return delegate.getHandshakeApplicationProtocolSelector()
}

override fun getHandshakeApplicationProtocol(): String {
return delegate.handshakeApplicationProtocol
}

/**
* ssl socket在write阻塞时,close也会阻塞。如果close长时间阻塞,则影响上层业务。
* */
override fun close() {
if (logger.isDebugEnabled) {
logger.debug("Close socket $delegate")
}
if (delegate.isClosed) {
return
}
synchronized(closeLock) {
if (delegate.isClosed) {
return
}
if (closeTimeout == 0L) {
delegate.close()
return
}
val timeoutFuture = threadPool.schedule({ closeImmediately() }, closeTimeout, TimeUnit.SECONDS)
val closeTime = measureNanoTime { delegate.closeQuietly() }
if (closeTime < Duration.ofSeconds(closeTimeout).toNanos()) {
timeoutFuture.cancel(false)
}
}
}

/**
* 立即关闭ssl socket,可能会破坏tls的正确关闭
* */
@Synchronized
fun closeImmediately() {
try {
val closeSocketMethod = SSLSocketImpl::class.java.getDeclaredMethod("closeSocket", Boolean::class.java)
closeSocketMethod.isAccessible = true
if (!delegate.isClosed) {
closeSocketMethod.invoke(delegate, false)
val tlsIsClosedField = SSLSocketImpl::class.java.getDeclaredField("tlsIsClosed")
tlsIsClosedField.isAccessible = true
tlsIsClosedField.set(delegate, true)
logger.info("Success close socket $delegate")
}
} catch (e: Exception) {
logger.warn("Unable close socket $delegate", e)
}
}

companion object {
private val logger = LoggerFactory.getLogger(UnsafeSSLSocketImpl::class.java)
private val defaultFactory =
ThreadFactoryBuilder().setNameFormat("unsafe-sslSocket-watchDog-%d").setDaemon(true).build()
private val threadPool = Executors.newSingleThreadScheduledExecutor(defaultFactory)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.common.artifact.util.okhttp

import java.net.InetAddress
import java.net.Socket
import javax.net.ssl.SSLSocket
import javax.net.ssl.SSLSocketFactory

/**
* 创建不安全的ssl socket。主要是处理希望可以快速关闭连接的情况。
* */
class UnsafeSslSocketFactoryImpl(private val delegate: SSLSocketFactory, private val closeTimeout: Long) :
SSLSocketFactory() {
init {
require(closeTimeout >= 0)
}

override fun createSocket(s: Socket, host: String, port: Int, autoClose: Boolean): Socket {
return (delegate.createSocket(s, host, port, autoClose) as SSLSocket).unsafe()
}

override fun createSocket(host: String, port: Int): Socket {
return (delegate.createSocket(host, port) as SSLSocket).unsafe()
}

override fun createSocket(host: String, port: Int, localHost: InetAddress?, localPort: Int): Socket {
return (delegate.createSocket(host, port, localHost, localPort) as SSLSocket).unsafe()
}

override fun createSocket(host: InetAddress, port: Int): Socket {
return (delegate.createSocket(host, port) as SSLSocket).unsafe()
}

override fun createSocket(address: InetAddress, port: Int, localAddress: InetAddress, localPort: Int): Socket {
return (delegate.createSocket(address, port, localAddress, localPort) as SSLSocket).unsafe()
}

override fun getDefaultCipherSuites(): Array<String> {
return delegate.defaultCipherSuites
}

override fun getSupportedCipherSuites(): Array<String> {
return delegate.supportedCipherSuites
}

private fun SSLSocket.unsafe() = UnsafeSSLSocketImpl(this, closeTimeout)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ object OkHttpClientPool {
clusterInfo: ClusterInfo,
readTimeout: Duration,
writeTimeout: Duration,
vararg interceptors: Interceptor
closeTimeout: Duration = Duration.ZERO,
vararg interceptors: Interceptor,
): OkHttpClient {
return clientCache.getOrPut(clusterInfo) {
val builder = HttpClientBuilderFactory.create(clusterInfo.certificate)
.protocols(listOf(Protocol.HTTP_1_1))
val builder = HttpClientBuilderFactory.create(
clusterInfo.certificate,
closeTimeout = closeTimeout.seconds,
).protocols(listOf(Protocol.HTTP_1_1))
.readTimeout(readTimeout)
.writeTimeout(writeTimeout)
interceptors.forEach {
builder.addInterceptor(
it
it,
)
}
builder.addNetworkInterceptor(ProgressInterceptor())
Expand Down
Loading

0 comments on commit 58fa953

Please sign in to comment.