Skip to content

Commit

Permalink
feat: 代码调整#1539
Browse files Browse the repository at this point in the history
  • Loading branch information
zacYL committed Aug 5, 2024
1 parent f9076b8 commit 9ff12f9
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class ArtifactDataReceiver(
private val filename: String = generateRandomName(),
private val randomPath: Boolean = false,
private val originPath: Path = path,
private val requestLimitCheckService: RequestLimitCheckService
private val requestLimitCheckService: RequestLimitCheckService? = null
) : StorageHealthMonitor.Observer, AutoCloseable {

/**
Expand Down Expand Up @@ -190,7 +190,7 @@ class ArtifactDataReceiver(
startTime = System.nanoTime()
}
try {
requestLimitCheckService.uploadBandwidthCheck(
requestLimitCheckService?.uploadBandwidthCheck(
HttpContextHolder.getRequest(), length.toLong()
)
writeData(chunk, offset, length)
Expand All @@ -209,7 +209,7 @@ class ArtifactDataReceiver(
startTime = System.nanoTime()
}
try {
requestLimitCheckService.uploadBandwidthCheck(
requestLimitCheckService?.uploadBandwidthCheck(
HttpContextHolder.getRequest(), 1
)
checkFallback()
Expand All @@ -233,7 +233,7 @@ class ArtifactDataReceiver(
}
try {

val input = requestLimitCheckService.bandwidthCheck(
val input = requestLimitCheckService?.bandwidthCheck(
HttpContextHolder.getRequest(), source
) ?: source.rateLimit(receiveProperties.rateLimit.toBytes())
val buffer = ByteArray(bufferSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ class RateLimiterAutoConfiguration {
)
}

@Bean
fun requestLimitCheckService(
rateLimiterProperties: RateLimiterProperties,
): RequestLimitCheckService {
return RequestLimitCheckService(rateLimiterProperties)
}

@Bean
@ConditionalOnWebApplication
fun rateLimitHandlerInterceptorRegister(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ data class RateLimiterProperties(
// 限流配置
var rules: List<ResourceLimit> = mutableListOf(),
// 等待时间,单位毫秒
var sleepTime: Long = 10
var sleepTime: Long = 10,
// 重试次数
var retryNum: Int = 10
)
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class MonitorRateLimiterInterceptorAdaptor(
resource: String, resourceLimit: ResourceLimit?,
result: Boolean, e: Exception?, applyPermits: Long
) {
if (resourceLimit == null) return
val startNano = startTime.get()
startTime.remove()
val duration = Duration.ofNanos(System.nanoTime() - startNano)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import org.springframework.beans.factory.annotation.Value
class TargetRateLimiterInterceptorAdaptor : RateLimiterInterceptorAdapter() {

@Value("\${spring.cloud.client.ip-address}")
private lateinit var host: String
private var host: String = "127.0.0.1"

override fun beforeLimitCheck(resource: String, resourceLimit: ResourceLimit) {
if (!resourceLimit.targets.contains(host)) {
if (resourceLimit.targets.isNotEmpty() && !resourceLimit.targets.contains(host)) {
throw InvalidResourceException("targets not contain $host")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,23 @@ abstract class AbstractBandwidthRateLimiterService(
val resLimitInfo = getBandwidthRateLimit(request) ?: return null
val rateLimiter = getAlgorithmOfRateLimiter(resLimitInfo.resource, resLimitInfo.resourceLimit)
return CommonRateLimitInputStream(
inputStream, rateLimiter, rateLimiterProperties.sleepTime, rateLimiterProperties.dryRun
inputStream, rateLimiter, rateLimiterProperties.sleepTime,
rateLimiterProperties.retryNum, rateLimiterProperties.dryRun
)
}

fun bandwidthRateLimit(request: HttpServletRequest, permits: Long) {
val resLimitInfo = getBandwidthRateLimit(request) ?: return
interceptorChain.doBeforeLimitCheck(resLimitInfo.resource, resLimitInfo.resourceLimit)
val rateLimiter = getAlgorithmOfRateLimiter(resLimitInfo.resource, resLimitInfo.resourceLimit)
var flag = false
var exception: Exception? = null
var retryNum = 0
try {
while (!flag) {
flag = rateLimiter.tryAcquire(permits)
if (!flag) {
if (!flag && retryNum < rateLimiterProperties.retryNum) {
retryNum++
Thread.sleep(rateLimiterProperties.sleepTime)
}
}
Expand All @@ -84,8 +89,11 @@ abstract class AbstractBandwidthRateLimiterService(
logger.warn("${request.requestURI} has exceeded max rate limit: ${resLimitInfo.resourceLimit}")
return
} else {
exception = e
throw e
}
} finally {
interceptorChain.doAfterLimitCheck(resLimitInfo.resource, resLimitInfo.resourceLimit, flag, exception, permits)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ abstract class AbstractRateLimiterService(
// 资源对应限限流算法缓存
private var rateLimiterCache: ConcurrentHashMap<String, RateLimiter> = ConcurrentHashMap(256)

private val interceptorChain: RateLimiterInterceptorChain =
val interceptorChain: RateLimiterInterceptorChain =
RateLimiterInterceptorChain(mutableListOf(
MonitorRateLimiterInterceptorAdaptor(rateLimiterMetrics),
TargetRateLimiterInterceptorAdaptor()
Expand Down Expand Up @@ -140,6 +140,7 @@ abstract class AbstractRateLimiterService(
exception = e
throw e
} catch (e: InvalidResourceException) {
logger.warn("$resourceLimit is invalid")
exception = e
throw e
} catch (e: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,53 @@ import com.tencent.bkrepo.common.ratelimiter.service.usage.UploadUsageRateLimite
import com.tencent.bkrepo.common.ratelimiter.service.usage.user.UserDownloadUsageRateLimiterService
import com.tencent.bkrepo.common.ratelimiter.service.usage.user.UserUploadUsageRateLimiterService
import com.tencent.bkrepo.common.ratelimiter.stream.CommonRateLimitInputStream
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Component
import java.io.InputStream
import javax.servlet.http.HttpServletRequest

@Component
class RequestLimitCheckService(
private val rateLimiterProperties: RateLimiterProperties,
) {

@Autowired
@Qualifier(RateLimiterAutoConfiguration.URL_RATELIMITER_SERVICE)
private val urlRateLimiterService: UrlRateLimiterService,
private lateinit var urlRateLimiterService: UrlRateLimiterService

@Autowired
@Qualifier(RateLimiterAutoConfiguration.UPLOAD_USAGE_RATELIMITER_SERVICE)
private val uploadUsageRateLimiterService: UploadUsageRateLimiterService,
private lateinit var uploadUsageRateLimiterService: UploadUsageRateLimiterService

@Autowired
@Qualifier(RateLimiterAutoConfiguration.USER_URL_RATELIMITER_SERVICE)
private val userUrlRateLimiterService: UserUrlRateLimiterService,
private lateinit var userUrlRateLimiterService: UserUrlRateLimiterService

@Autowired
@Qualifier(RateLimiterAutoConfiguration.USER_UPLOAD_USAGE_RATELIMITER_SERVICE)
private val userUploadUsageRateLimiterService: UserUploadUsageRateLimiterService,
private lateinit var userUploadUsageRateLimiterService: UserUploadUsageRateLimiterService

@Autowired
@Qualifier(RateLimiterAutoConfiguration.DOWNLOAD_USAGE_RATELIMITER_SERVICE)
private val downloadUsageRateLimiterService: DownloadUsageRateLimiterService,
private lateinit var downloadUsageRateLimiterService: DownloadUsageRateLimiterService

@Autowired
@Qualifier(RateLimiterAutoConfiguration.USER_DOWNLOAD_USAGE_RATELIMITER_SERVICE)
private val userDownloadUsageRateLimiterService: UserDownloadUsageRateLimiterService,
private lateinit var userDownloadUsageRateLimiterService: UserDownloadUsageRateLimiterService

@Autowired
@Qualifier(RateLimiterAutoConfiguration.DOWNLOAD_BANDWIDTH_RATELIMITER_SERVICE)
private val downloadBandwidthRateLimiterService: DownloadBandwidthRateLimiterService,
private lateinit var downloadBandwidthRateLimiterService: DownloadBandwidthRateLimiterService

@Autowired
@Qualifier(RateLimiterAutoConfiguration.UPLOAD_BANDWIDTH_RATELIMITER_ERVICE)
private val uploadBandwidthRateLimiterService: UploadBandwidthRateLimiterService,
) {
private lateinit var uploadBandwidthRateLimiterService: UploadBandwidthRateLimiterService

fun preLimitCheck(request: HttpServletRequest) {
if (!rateLimiterProperties.enabled) {
return
}
// TODO 可以优化
// TODO 不能全部遍历, 只有有的才查询
userUrlRateLimiterService.limit(request)
userUploadUsageRateLimiterService.limit(request)
urlRateLimiterService.limit(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ open class UserUploadUsageRateLimiterService(
val userId = HttpContextHolder.getRequestOrNull()?.getAttribute(USER_KEY) as? String ?: ANONYMOUS_USER
val result = mutableListOf<String>()
result.add("$userId:/$projectId/")
result.add(userId)
result.add("$userId:")
return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class CommonRateLimitInputStream(
delegate: InputStream,
private val rateLimiter: RateLimiter,
private val sleepTime: Long,
private val retryNum: Int,
private val dryRun: Boolean = false
) : DelegateInputStream(delegate) {

Expand All @@ -56,11 +57,13 @@ class CommonRateLimitInputStream(

private fun acquire(permits: Int) {
var flag = false
var failedNum = 0
try {
while (!flag) {
// TODO 当限制小于读取大小时,会进入死循环
flag = rateLimiter.tryAcquire(permits.toLong())
if (!flag) {
if (!flag && failedNum < retryNum) {
failedNum++
Thread.sleep(sleepTime)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,24 @@

package com.tencent.bkrepo.common.ratelimiter.interceptor

import com.tencent.bkrepo.common.ratelimiter.enums.Algorithms
import com.tencent.bkrepo.common.ratelimiter.enums.LimitDimension
import com.tencent.bkrepo.common.ratelimiter.enums.WorkScope
import com.tencent.bkrepo.common.ratelimiter.exception.InvalidResourceException
import com.tencent.bkrepo.common.ratelimiter.rule.common.ResourceLimit
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import java.util.concurrent.TimeUnit


class RateLimiterInterceptorChainTest {

open class InterceptorA : RateLimiterInterceptor {

override fun beforeLimitCheck(resource: String) {
override fun beforeLimitCheck(resource: String, resourceLimit: ResourceLimit) {
list.add(identity() + ":before")
}

Expand Down Expand Up @@ -78,13 +84,33 @@ class RateLimiterInterceptorChainTest {
chain.addInterceptor(InterceptorA())
chain.addInterceptor(InterceptorB())
chain.addInterceptor(InterceptorC())
chain.doBeforeLimitCheck("test1")
val resourceLimit = ResourceLimit(
algo = Algorithms.FIXED_WINDOW.name, resource = "/project1/",
limitDimension = LimitDimension.URL.name, limit = 52428800,
unit = TimeUnit.SECONDS.name, scope = WorkScope.LOCAL.name
)
chain.doBeforeLimitCheck("test1", resourceLimit)
assertEquals(Companion.list.size, 3)
assertEquals(Companion.list[0], "InterceptorA:before")
assertEquals(Companion.list[1], "InterceptorB:before")
assertEquals(Companion.list[2], "InterceptorC:before")
}

@Test
fun testTargetDoBeforeLimit() {
val chain = RateLimiterInterceptorChain()
chain.addInterceptor(TargetRateLimiterInterceptorAdaptor())
val resourceLimit = ResourceLimit(
algo = Algorithms.FIXED_WINDOW.name, resource = "/project1/",
limitDimension = LimitDimension.URL.name, limit = 52428800,
unit = TimeUnit.SECONDS.name, scope = WorkScope.LOCAL.name,
targets = listOf("127.0.0.2")
)
assertThrows<InvalidResourceException> {
chain.doBeforeLimitCheck("test1", resourceLimit)
}
}

@Test
fun testDoAfterLimit() {
val chain = RateLimiterInterceptorChain()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ import com.tencent.bkrepo.common.artifact.resolve.response.AbstractArtifactResou
import com.tencent.bkrepo.common.artifact.resolve.response.ArtifactResource
import com.tencent.bkrepo.common.artifact.stream.Range
import com.tencent.bkrepo.common.ratelimiter.service.RequestLimitCheckService
import com.tencent.bkrepo.common.ratelimiter.service.bandwidth.DownloadBandwidthRateLimiterService
import com.tencent.bkrepo.common.ratelimiter.service.usage.DownloadUsageRateLimiterService
import com.tencent.bkrepo.common.ratelimiter.service.usage.user.UserDownloadUsageRateLimiterService
import com.tencent.bkrepo.common.service.util.HttpContextHolder
import com.tencent.bkrepo.common.storage.core.StorageProperties
import com.tencent.bkrepo.common.storage.monitor.Throughput
Expand All @@ -55,10 +52,10 @@ import javax.servlet.http.HttpServletResponse
/**
* S3协议的响应输出
*/
class S3ArtifactResourceWriter (
class S3ArtifactResourceWriter(
storageProperties: StorageProperties,
requestLimitCheckService: RequestLimitCheckService
) : AbstractArtifactResourceHandler(
) : AbstractArtifactResourceHandler(
storageProperties, requestLimitCheckService
) {

Expand Down

0 comments on commit 9ff12f9

Please sign in to comment.