Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 添加限流模块 #1539 #1829

Merged
merged 68 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 66 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
7b4adf2
feat: 增加限流功能#1539
zacYL Aug 2, 2024
c804903
feat: 添加执行机器判断#1539
zacYL Aug 5, 2024
f9076b8
Merge remote-tracking branch 'github/master' into issue_1539
zacYL Aug 5, 2024
9ff12f9
feat: 代码调整#1539
zacYL Aug 5, 2024
9515001
feat: 代码调整#1539
zacYL Aug 5, 2024
c0449e7
feat: 针对带宽限流调整#1539
zacYL Aug 7, 2024
5f3b4bf
feat: 只针对上传流量进行判断#1539
zacYL Aug 8, 2024
6303a22
feat: 添加测试代码#1539
zacYL Aug 13, 2024
38a8abd
feat: 添加限流相关测试用例#1539
zacYL Aug 16, 2024
d5368ad
feat: 测试代码调整#1539
zacYL Aug 16, 2024
f0a8ca0
feat: 代码调整#1539
zacYL Aug 19, 2024
bb53c99
Merge remote-tracking branch 'github/master' into issue_1539
zacYL Aug 19, 2024
c6d1d41
feat: 上传限流调整#1539
zacYL Aug 20, 2024
f554afa
feat: 上传下载限流异常捕获#1539
zacYL Aug 22, 2024
9cb55d4
feat: 测试代码调整#1539
zacYL Aug 22, 2024
a52c199
feat: 测试代码调整#1539
zacYL Aug 22, 2024
2a311aa
feat: 配置参数调整#1539
zacYL Aug 22, 2024
aaf4d56
feat: 由于redis expire只支持秒为单位,所以周期最小单位为秒#1539
zacYL Aug 23, 2024
88b1de8
feat: contentLengthLong为-1的情况特殊处理#1539
zacYL Aug 27, 2024
a845b30
feat: 去掉多余引用#1539
zacYL Aug 27, 2024
d09a21f
feat: 测试用例调整#1539
zacYL Aug 29, 2024
1669f31
feat: 时间参数从redisserver读取#1539
zacYL Sep 6, 2024
e6d6cdd
feat: embeddedredis版本调整#1539
zacYL Sep 6, 2024
e717605
feat: 测试用例调整#1539
zacYL Sep 6, 2024
51a8f45
feat: 支持对url进行仓库维度限频#1539
zacYL Oct 10, 2024
47f2d22
Merge remote-tracking branch 'origin/master' into issue_1539
zacYL Oct 10, 2024
6040eaa
feat: 代码调整#1539
zacYL Oct 10, 2024
206442f
Merge remote-tracking branch 'origin/master' into issue_1539
zacYL Oct 22, 2024
0085706
Merge pull request #18 from zacYL/issue_1539
lannoy0523 Oct 22, 2024
db29091
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 21, 2024
53e0be4
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 23, 2024
e346f57
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 23, 2024
760830e
Merge pull request #8 from lannoy0523/issue_2655
zacYL Oct 23, 2024
38f64d0
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 23, 2024
c10696c
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 24, 2024
c9ef7a3
feat: 代码回滚,因redis版本问题,获取统一时间逻辑调整#1539
zacYL Oct 24, 2024
6fcb0ea
feat: 代码调整#1539
zacYL Oct 24, 2024
3bff4fd
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 24, 2024
46ee8da
feat: 异常显示调整#1539
zacYL Oct 24, 2024
bea9449
Merge pull request #9 from lannoy0523/issue_2655
zacYL Oct 24, 2024
200e0da
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 24, 2024
93db9ee
Merge pull request #19 from zacYL/issue_1539
lannoy0523 Oct 24, 2024
baf4e91
Merge pull request #10 from lannoy0523/issue_2655
zacYL Oct 24, 2024
0cda07f
feat: 正则匹配规则修改#1539
zacYL Oct 24, 2024
29b6c82
feat: 正则匹配规则修改#1539
zacYL Oct 24, 2024
4772e4e
feat: 下载容量限制添加#1539
zacYL Oct 25, 2024
5b5a6f5
Merge remote-tracking branch 'origin/master' into issue_1539
zacYL Oct 25, 2024
bbeea1a
Merge pull request #21 from zacYL/issue_1539
lannoy0523 Oct 25, 2024
b8a2725
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 25, 2024
3eeb1be
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 25, 2024
ca2ab03
feat: 代码调整#1539
zacYL Oct 25, 2024
5cbd93c
feat: 滑动窗口脚本调整#1539
zacYL Oct 28, 2024
afa0a00
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 28, 2024
77eb32e
Merge pull request #14 from lannoy0523/issue_2655
zacYL Oct 28, 2024
deab781
feat: targets过滤调整#1539
zacYL Oct 28, 2024
f588d4c
Merge branch 'issue_1539' of https://github.com/zacYL/bk-repo into is…
zacYL Oct 28, 2024
f8c5989
feat: 代码调整#1539
zacYL Oct 28, 2024
f8575f4
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 28, 2024
46f5522
feat: 读取项目仓库信息增加异常捕获#1539
zacYL Oct 28, 2024
a43c822
feat: 读取项目仓库信息增加异常捕获#1539
zacYL Oct 28, 2024
8e07e93
feat: 使用globalexceptionhandler处理#1539
zacYL Oct 28, 2024
a5e8724
feat: targets校验调整#1539
zacYL Oct 28, 2024
556139d
feat: 测试代码调整#1539
zacYL Oct 28, 2024
e0619f3
feat: 后台增加限流配置页面 #2655
lannoy0523 Oct 29, 2024
3334186
Merge pull request #15 from lannoy0523/issue_2655
zacYL Oct 30, 2024
4edfd18
feat: 代码调整#1539
zacYL Nov 1, 2024
1e8e153
feat: 代码调整#1539
zacYL Nov 4, 2024
1d0fbe2
feat: 代码调整#1539
zacYL Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.common.api.exception

import com.tencent.bkrepo.common.api.constant.HttpStatus
import com.tencent.bkrepo.common.api.message.CommonMessageCode

/**
* 超过限流配置异常
*/
class OverloadException(
val resource: String
) : ErrorCodeException(CommonMessageCode.RATE_LIMITER_OVERLOAD, resource, status = HttpStatus.TOO_MANY_REQUESTS)
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ enum class CommonMessageCode(private val key: String) : MessageCode {
MEDIA_TYPE_UNACCEPTABLE("system.media-type.unacceptable"),
TOO_MANY_REQUESTS("too.many.requests"),
PIPELINE_NOT_RUNNING("pipeline.not-running"),
INVALID_CONFIG("system.config.invalid"),
ACQUIRE_LOCK_FAILED("acquire.lock.failed"),
RATE_LIMITER_OVERLOAD("rate.limiter.overload")
;

override fun getBusinessCode() = ordinal + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ operation.cross-cluster.not-allowed=Cross location operation is not allowed
system.media-type.unacceptable=Unacceptable Media Type
too.many.requests=Too Many Requests: {0}
pipeline.not-running=Pipeline[{0}] is not running status
system.config.invalid=Config [{0}] is invalid
acquire.lock.failed=acquire lock failed:[{0}]
rate.limiter.overload=resource requests reached rate limit:[{0}]
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ operation.cross-cluster.not-allowed=不允许跨地点操作
system.media-type.unacceptable=不接受的Media Type
too.many.requests=请求过多: {0}
pipeline.not-running=流水线[{0}]不是运行状态
system.config.invalid=配置[{0}]无效
acquire.lock.failed=获取锁失败: [{0}]
rate.limiter.overload=资源请求量超过限流值: [{0}]
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ operation.cross-cluster.not-allowed=不允許跨地點操作
system.media-type.unacceptable=不接受的Media Type
too.many.requests=請求過多: {0}
pipeline.not-running=流水線[{0}]不是運行狀態
system.config.invalid=配置[{0}]無效
acquire.lock.failed=獲取鎖失敗: [{0}]
rate.limiter.overload=資源請求量超過限流值: [{0}]
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies {
api(project(":common:common-security"))
api(project(":common:common-artifact:artifact-api"))
api(project(":common:common-storage:storage-service"))
api(project(":common:common-ratelimiter"))
api(project(":common:common-stream"))
api(project(":common:common-metrics-push"))
api(project(":common:common-metadata:metadata-service"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import com.tencent.bkrepo.common.artifact.resolve.path.ResolverMap
import com.tencent.bkrepo.common.artifact.resolve.response.ArtifactResourceWriter
import com.tencent.bkrepo.common.artifact.resolve.response.DefaultArtifactResourceWriter
import com.tencent.bkrepo.common.storage.config.StorageProperties
import com.tencent.bkrepo.common.ratelimiter.service.RequestLimitCheckService
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand Down Expand Up @@ -84,8 +85,14 @@ class ArtifactResolverConfiguration {

@Bean
@ConditionalOnMissingBean(ArtifactResourceWriter::class)
fun artifactResourceWriter(storageProperties: StorageProperties): ArtifactResourceWriter {
return DefaultArtifactResourceWriter(storageProperties)
fun artifactResourceWriter(
storageProperties: StorageProperties,
requestLimitCheckService: RequestLimitCheckService
): ArtifactResourceWriter {
return DefaultArtifactResourceWriter(
storageProperties,
requestLimitCheckService
)
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@
package com.tencent.bkrepo.common.artifact.resolve.file

import com.tencent.bkrepo.common.api.constant.retry
import com.tencent.bkrepo.common.api.exception.OverloadException
import com.tencent.bkrepo.common.artifact.exception.ArtifactReceiveException
import com.tencent.bkrepo.common.artifact.hash.sha256
import com.tencent.bkrepo.common.artifact.metrics.ArtifactMetrics
import com.tencent.bkrepo.common.artifact.metrics.TrafficHandler
import com.tencent.bkrepo.common.artifact.stream.DigestCalculateListener
import com.tencent.bkrepo.common.artifact.stream.rateLimit
import com.tencent.bkrepo.common.artifact.util.http.IOExceptionUtils
import com.tencent.bkrepo.common.ratelimiter.service.RequestLimitCheckService
import com.tencent.bkrepo.common.ratelimiter.stream.CommonRateLimitInputStream
import com.tencent.bkrepo.common.storage.config.MonitorProperties
import com.tencent.bkrepo.common.storage.config.ReceiveProperties
import com.tencent.bkrepo.common.storage.core.locator.HashFileLocator
import com.tencent.bkrepo.common.storage.config.MonitorProperties
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor
import com.tencent.bkrepo.common.storage.monitor.Throughput
import com.tencent.bkrepo.common.storage.util.createFile
import com.tencent.bkrepo.common.storage.util.delete
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.IOException
Expand All @@ -57,6 +59,7 @@ import java.security.SecureRandom
import java.time.Duration
import kotlin.math.abs
import kotlin.system.measureTimeMillis
import org.slf4j.LoggerFactory

/**
* artifact数据接收类,作用:
Expand All @@ -75,6 +78,8 @@ class ArtifactDataReceiver(
private val filename: String = generateRandomName(),
private val randomPath: Boolean = false,
private val originPath: Path = path,
private val requestLimitCheckService: RequestLimitCheckService? = null,
private val contentLength: Long? = null,
) : StorageHealthMonitor.Observer, AutoCloseable {

/**
Expand Down Expand Up @@ -187,9 +192,15 @@ class ArtifactDataReceiver(
startTime = System.nanoTime()
}
try {
requestLimitCheckService?.uploadBandwidthCheck(
zacYL marked this conversation as resolved.
Show resolved Hide resolved
length.toLong(),
receiveProperties.circuitBreakerThreshold
)
writeData(chunk, offset, length)
} catch (exception: IOException) {
handleIOException(exception)
} catch (overloadEx: OverloadException) {
handleOverloadException(overloadEx)
}
}

Expand All @@ -203,13 +214,18 @@ class ArtifactDataReceiver(
startTime = System.nanoTime()
}
try {
requestLimitCheckService?.uploadBandwidthCheck(
1, receiveProperties.circuitBreakerThreshold
)
checkFallback()
outputStream.write(b)
listener.data(b)
received += 1
checkThreshold()
} catch (exception: IOException) {
handleIOException(exception)
} catch (overloadEx: OverloadException) {
handleOverloadException(overloadEx)
}
}

Expand All @@ -222,8 +238,13 @@ class ArtifactDataReceiver(
if (startTime == 0L) {
startTime = System.nanoTime()
}
var rateLimitFlag = false
var exp: Exception? = null
try {
val input = source.rateLimit(receiveProperties.rateLimit.toBytes())
val input = requestLimitCheckService?.bandwidthCheck(
source, receiveProperties.circuitBreakerThreshold, contentLength
) ?: source.rateLimit(receiveProperties.rateLimit.toBytes())
rateLimitFlag = input is CommonRateLimitInputStream
val buffer = ByteArray(bufferSize)
input.use {
var bytes = input.read(buffer)
Expand All @@ -233,7 +254,15 @@ class ArtifactDataReceiver(
}
}
} catch (exception: IOException) {
exp = exception
handleIOException(exception)
} catch (overloadEx: OverloadException) {
exp = overloadEx
handleOverloadException(overloadEx)
} finally {
if (rateLimitFlag) {
requestLimitCheckService?.bandwidthFinish(exp)
}
}
}

Expand Down Expand Up @@ -322,16 +351,28 @@ class ArtifactDataReceiver(
* 处理IO异常
*/
private fun handleIOException(exception: IOException) {
finished = true
endTime = System.nanoTime()
close()
finishWithException()
if (IOExceptionUtils.isClientBroken(exception)) {
throw ArtifactReceiveException(exception.message.orEmpty())
} else {
throw exception
}
}

/**
* 处理限流请求
*/
private fun handleOverloadException(exception: OverloadException) {
finishWithException()
throw exception
}

private fun finishWithException() {
finished = true
endTime = System.nanoTime()
close()
}

/**
* 检查是否需要fall back操作
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.tencent.bkrepo.common.artifact.resolve.file.multipart.MultipartArtifa
import com.tencent.bkrepo.common.artifact.resolve.file.stream.StreamArtifactFile
import com.tencent.bkrepo.common.bksync.BlockChannel
import com.tencent.bkrepo.common.storage.config.StorageProperties
import com.tencent.bkrepo.common.ratelimiter.service.RequestLimitCheckService
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitorHelper
Expand All @@ -54,17 +55,20 @@ import java.io.InputStream
class ArtifactFileFactory(
storageProperties: StorageProperties,
storageHealthMonitorHelper: StorageHealthMonitorHelper,
private val limitCheckService: RequestLimitCheckService
) {

init {
monitorHelper = storageHealthMonitorHelper
properties = storageProperties
requestLimitCheckService = limitCheckService
}

companion object {

private lateinit var monitorHelper: StorageHealthMonitorHelper
private lateinit var properties: StorageProperties
private lateinit var requestLimitCheckService: RequestLimitCheckService

const val ARTIFACT_FILES = "artifact.files"

Expand All @@ -89,34 +93,49 @@ class ArtifactFileFactory(
* 构造分块接收数据的artifact file
*/
fun buildChunked(): ChunkedArtifactFile {
return ChunkedArtifactFile(getMonitor(), properties, getStorageCredentials()).apply {
return ChunkedArtifactFile(
getMonitor(), properties, getStorageCredentials(),
).apply {
track(this)
}
}

fun buildChunked(storageCredentials: StorageCredentials): ChunkedArtifactFile {
return ChunkedArtifactFile(getMonitor(storageCredentials), properties, storageCredentials).apply {
return ChunkedArtifactFile(
getMonitor(storageCredentials), properties, storageCredentials,
).apply {
track(this)
}
}

fun buildDfsArtifactFile(): RandomAccessArtifactFile {
return RandomAccessArtifactFile(getMonitor(), getStorageCredentials(), properties).apply {
return RandomAccessArtifactFile(
getMonitor(), getStorageCredentials(), properties,
).apply {
track(this)
}
}

/**
* 通过输入流构造artifact file, 主要针对上传请求对其做限流操作
* @param inputStream 输入流
*/
fun buildWithRateLimiter(inputStream: InputStream, contentLength: Long? = null): ArtifactFile {
return StreamArtifactFile(
inputStream, getMonitor(), properties, getStorageCredentials(), contentLength,
requestLimitCheckService = requestLimitCheckService
).apply {
track(this)
}
}

/**
* 通过输入流构造artifact file
* 通过输入流构造artifact file,服务内部输入流转换成文件使用
* @param inputStream 输入流
*/
fun build(inputStream: InputStream, contentLength: Long? = null): ArtifactFile {
return StreamArtifactFile(
inputStream,
getMonitor(),
properties,
getStorageCredentials(),
contentLength,
inputStream, getMonitor(), properties, getStorageCredentials(), contentLength
).apply {
track(this)
}
Expand All @@ -137,10 +156,8 @@ class ArtifactFileFactory(
*/
fun build(multipartFile: MultipartFile, storageCredentials: StorageCredentials): ArtifactFile {
return MultipartArtifactFile(
multipartFile,
getMonitor(storageCredentials),
properties,
storageCredentials,
multipartFile, getMonitor(storageCredentials), properties, storageCredentials,
requestLimitCheckService = requestLimitCheckService
).apply {
track(this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.nio.file.NoSuchFileException
class RandomAccessArtifactFile(
private val monitor: StorageHealthMonitor,
private val storageCredentials: StorageCredentials,
storageProperties: StorageProperties
storageProperties: StorageProperties,
) : ArtifactFile {

/**
Expand All @@ -43,7 +43,9 @@ class RandomAccessArtifactFile(

init {
val path = storageCredentials.upload.location.toPath()
receiver = ArtifactDataReceiver(storageProperties.receive, storageProperties.monitor, path)
receiver = ArtifactDataReceiver(
storageProperties.receive, storageProperties.monitor, path,
)
monitor.add(receiver)
if (!monitor.healthy.get()) {
receiver.unhealthy(monitor.getFallbackPath(), monitor.fallBackReason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package com.tencent.bkrepo.common.artifact.resolve.file.multipart

import com.tencent.bkrepo.common.artifact.resolve.file.stream.StreamArtifactFile
import com.tencent.bkrepo.common.storage.config.StorageProperties
import com.tencent.bkrepo.common.ratelimiter.service.RequestLimitCheckService
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor
import org.springframework.web.multipart.MultipartFile
Expand All @@ -37,9 +38,11 @@ class MultipartArtifactFile(
private val multipartFile: MultipartFile,
monitor: StorageHealthMonitor,
storageProperties: StorageProperties,
storageCredentials: StorageCredentials
storageCredentials: StorageCredentials,
requestLimitCheckService: RequestLimitCheckService
) : StreamArtifactFile(
multipartFile.inputStream, monitor, storageProperties, storageCredentials, multipartFile.size
multipartFile.inputStream, monitor, storageProperties, storageCredentials, multipartFile.size,
requestLimitCheckService
) {
fun getOriginalFilename() = multipartFile.originalFilename.orEmpty()
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ class ArtifactFileMethodArgumentResolver : HandlerMethodArgumentResolver {
}

private fun resolveOctetStream(request: HttpServletRequest): ArtifactFile {
return ArtifactFileFactory.build(request.inputStream, request.contentLengthLong)
return ArtifactFileFactory.buildWithRateLimiter(request.inputStream, request.contentLengthLong)
}
}
Loading
Loading