diff --git a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/resolve/file/ArtifactDataReceiver.kt b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/resolve/file/ArtifactDataReceiver.kt index 10955f6718..f2e0f7c83e 100644 --- a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/resolve/file/ArtifactDataReceiver.kt +++ b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/resolve/file/ArtifactDataReceiver.kt @@ -77,7 +77,7 @@ class ArtifactDataReceiver( private val filename: String = generateRandomName(), private val randomPath: Boolean = false, private val originPath: Path = path, - private val requestLimitCheckService: RequestLimitCheckService + private val requestLimitCheckService: RequestLimitCheckService? = null ) : StorageHealthMonitor.Observer, AutoCloseable { /** @@ -190,7 +190,7 @@ class ArtifactDataReceiver( startTime = System.nanoTime() } try { - requestLimitCheckService.uploadBandwidthCheck( + requestLimitCheckService?.uploadBandwidthCheck( HttpContextHolder.getRequest(), length.toLong() ) writeData(chunk, offset, length) @@ -209,7 +209,7 @@ class ArtifactDataReceiver( startTime = System.nanoTime() } try { - requestLimitCheckService.uploadBandwidthCheck( + requestLimitCheckService?.uploadBandwidthCheck( HttpContextHolder.getRequest(), 1 ) checkFallback() @@ -233,7 +233,7 @@ class ArtifactDataReceiver( } try { - val input = requestLimitCheckService.bandwidthCheck( + val input = requestLimitCheckService?.bandwidthCheck( HttpContextHolder.getRequest(), source ) ?: source.rateLimit(receiveProperties.rateLimit.toBytes()) val buffer = ByteArray(bufferSize) diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/RateLimiterAutoConfiguration.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/RateLimiterAutoConfiguration.kt index e45f56252d..3f51f04f9e 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/RateLimiterAutoConfiguration.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/RateLimiterAutoConfiguration.kt @@ -150,6 +150,13 @@ class RateLimiterAutoConfiguration { ) } + @Bean + fun requestLimitCheckService( + rateLimiterProperties: RateLimiterProperties, + ): RequestLimitCheckService { + return RequestLimitCheckService(rateLimiterProperties) + } + @Bean @ConditionalOnWebApplication fun rateLimitHandlerInterceptorRegister( diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/config/RateLimiterProperties.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/config/RateLimiterProperties.kt index 22b35ed6bb..dab184081b 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/config/RateLimiterProperties.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/config/RateLimiterProperties.kt @@ -41,5 +41,7 @@ data class RateLimiterProperties( // 限流配置 var rules: List = mutableListOf(), // 等待时间,单位毫秒 - var sleepTime: Long = 10 + var sleepTime: Long = 10, + // 重试次数 + var retryNum: Int = 10 ) diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/MonitorRateLimiterInterceptorAdaptor.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/MonitorRateLimiterInterceptorAdaptor.kt index 3d98e04a7a..82ed67e83a 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/MonitorRateLimiterInterceptorAdaptor.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/MonitorRateLimiterInterceptorAdaptor.kt @@ -48,6 +48,7 @@ class MonitorRateLimiterInterceptorAdaptor( resource: String, resourceLimit: ResourceLimit?, result: Boolean, e: Exception?, applyPermits: Long ) { + if (resourceLimit == null) return val startNano = startTime.get() startTime.remove() val duration = Duration.ofNanos(System.nanoTime() - startNano) diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/TargetRateLimiterInterceptorAdaptor.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/TargetRateLimiterInterceptorAdaptor.kt index 76307994c9..4a293c0ad0 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/TargetRateLimiterInterceptorAdaptor.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/TargetRateLimiterInterceptorAdaptor.kt @@ -37,10 +37,10 @@ import org.springframework.beans.factory.annotation.Value class TargetRateLimiterInterceptorAdaptor : RateLimiterInterceptorAdapter() { @Value("\${spring.cloud.client.ip-address}") - private lateinit var host: String + private var host: String = "127.0.0.1" override fun beforeLimitCheck(resource: String, resourceLimit: ResourceLimit) { - if (!resourceLimit.targets.contains(host)) { + if (resourceLimit.targets.isNotEmpty() && !resourceLimit.targets.contains(host)) { throw InvalidResourceException("targets not contain $host") } } diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractBandwidthRateLimiterService.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractBandwidthRateLimiterService.kt index 2104e6a181..b08c4d9c14 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractBandwidthRateLimiterService.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractBandwidthRateLimiterService.kt @@ -64,18 +64,23 @@ abstract class AbstractBandwidthRateLimiterService( val resLimitInfo = getBandwidthRateLimit(request) ?: return null val rateLimiter = getAlgorithmOfRateLimiter(resLimitInfo.resource, resLimitInfo.resourceLimit) return CommonRateLimitInputStream( - inputStream, rateLimiter, rateLimiterProperties.sleepTime, rateLimiterProperties.dryRun + inputStream, rateLimiter, rateLimiterProperties.sleepTime, + rateLimiterProperties.retryNum, rateLimiterProperties.dryRun ) } fun bandwidthRateLimit(request: HttpServletRequest, permits: Long) { val resLimitInfo = getBandwidthRateLimit(request) ?: return + interceptorChain.doBeforeLimitCheck(resLimitInfo.resource, resLimitInfo.resourceLimit) val rateLimiter = getAlgorithmOfRateLimiter(resLimitInfo.resource, resLimitInfo.resourceLimit) var flag = false + var exception: Exception? = null + var retryNum = 0 try { while (!flag) { flag = rateLimiter.tryAcquire(permits) - if (!flag) { + if (!flag && retryNum < rateLimiterProperties.retryNum) { + retryNum++ Thread.sleep(rateLimiterProperties.sleepTime) } } @@ -84,8 +89,11 @@ abstract class AbstractBandwidthRateLimiterService( logger.warn("${request.requestURI} has exceeded max rate limit: ${resLimitInfo.resourceLimit}") return } else { + exception = e throw e } + } finally { + interceptorChain.doAfterLimitCheck(resLimitInfo.resource, resLimitInfo.resourceLimit, flag, exception, permits) } } diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractRateLimiterService.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractRateLimiterService.kt index 8cb44a35f1..783229caea 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractRateLimiterService.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/AbstractRateLimiterService.kt @@ -82,7 +82,7 @@ abstract class AbstractRateLimiterService( // 资源对应限限流算法缓存 private var rateLimiterCache: ConcurrentHashMap = ConcurrentHashMap(256) - private val interceptorChain: RateLimiterInterceptorChain = + val interceptorChain: RateLimiterInterceptorChain = RateLimiterInterceptorChain(mutableListOf( MonitorRateLimiterInterceptorAdaptor(rateLimiterMetrics), TargetRateLimiterInterceptorAdaptor() @@ -140,6 +140,7 @@ abstract class AbstractRateLimiterService( exception = e throw e } catch (e: InvalidResourceException) { + logger.warn("$resourceLimit is invalid") exception = e throw e } catch (e: Exception) { diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/RequestLimitCheckService.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/RequestLimitCheckService.kt index 4c60b99f91..d17ca1cfcc 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/RequestLimitCheckService.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/RequestLimitCheckService.kt @@ -38,37 +38,53 @@ import com.tencent.bkrepo.common.ratelimiter.service.usage.UploadUsageRateLimite import com.tencent.bkrepo.common.ratelimiter.service.usage.user.UserDownloadUsageRateLimiterService import com.tencent.bkrepo.common.ratelimiter.service.usage.user.UserUploadUsageRateLimiterService import com.tencent.bkrepo.common.ratelimiter.stream.CommonRateLimitInputStream +import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier -import org.springframework.stereotype.Component import java.io.InputStream import javax.servlet.http.HttpServletRequest -@Component class RequestLimitCheckService( private val rateLimiterProperties: RateLimiterProperties, +) { + + @Autowired @Qualifier(RateLimiterAutoConfiguration.URL_RATELIMITER_SERVICE) - private val urlRateLimiterService: UrlRateLimiterService, + private lateinit var urlRateLimiterService: UrlRateLimiterService + + @Autowired @Qualifier(RateLimiterAutoConfiguration.UPLOAD_USAGE_RATELIMITER_SERVICE) - private val uploadUsageRateLimiterService: UploadUsageRateLimiterService, + private lateinit var uploadUsageRateLimiterService: UploadUsageRateLimiterService + + @Autowired @Qualifier(RateLimiterAutoConfiguration.USER_URL_RATELIMITER_SERVICE) - private val userUrlRateLimiterService: UserUrlRateLimiterService, + private lateinit var userUrlRateLimiterService: UserUrlRateLimiterService + + @Autowired @Qualifier(RateLimiterAutoConfiguration.USER_UPLOAD_USAGE_RATELIMITER_SERVICE) - private val userUploadUsageRateLimiterService: UserUploadUsageRateLimiterService, + private lateinit var userUploadUsageRateLimiterService: UserUploadUsageRateLimiterService + + @Autowired @Qualifier(RateLimiterAutoConfiguration.DOWNLOAD_USAGE_RATELIMITER_SERVICE) - private val downloadUsageRateLimiterService: DownloadUsageRateLimiterService, + private lateinit var downloadUsageRateLimiterService: DownloadUsageRateLimiterService + + @Autowired @Qualifier(RateLimiterAutoConfiguration.USER_DOWNLOAD_USAGE_RATELIMITER_SERVICE) - private val userDownloadUsageRateLimiterService: UserDownloadUsageRateLimiterService, + private lateinit var userDownloadUsageRateLimiterService: UserDownloadUsageRateLimiterService + + @Autowired @Qualifier(RateLimiterAutoConfiguration.DOWNLOAD_BANDWIDTH_RATELIMITER_SERVICE) - private val downloadBandwidthRateLimiterService: DownloadBandwidthRateLimiterService, + private lateinit var downloadBandwidthRateLimiterService: DownloadBandwidthRateLimiterService + + @Autowired @Qualifier(RateLimiterAutoConfiguration.UPLOAD_BANDWIDTH_RATELIMITER_ERVICE) - private val uploadBandwidthRateLimiterService: UploadBandwidthRateLimiterService, -) { + private lateinit var uploadBandwidthRateLimiterService: UploadBandwidthRateLimiterService fun preLimitCheck(request: HttpServletRequest) { if (!rateLimiterProperties.enabled) { return } // TODO 可以优化 + // TODO 不能全部遍历, 只有有的才查询 userUrlRateLimiterService.limit(request) userUploadUsageRateLimiterService.limit(request) urlRateLimiterService.limit(request) diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/usage/user/UserUploadUsageRateLimiterService.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/usage/user/UserUploadUsageRateLimiterService.kt index 56a7d78296..c1ee58283e 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/usage/user/UserUploadUsageRateLimiterService.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/service/usage/user/UserUploadUsageRateLimiterService.kt @@ -63,7 +63,7 @@ open class UserUploadUsageRateLimiterService( val userId = HttpContextHolder.getRequestOrNull()?.getAttribute(USER_KEY) as? String ?: ANONYMOUS_USER val result = mutableListOf() result.add("$userId:/$projectId/") - result.add(userId) + result.add("$userId:") return result } diff --git a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/stream/CommonRateLimitInputStream.kt b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/stream/CommonRateLimitInputStream.kt index cb9e73dfbc..8c7cc885be 100644 --- a/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/stream/CommonRateLimitInputStream.kt +++ b/src/backend/common/common-ratelimiter/src/main/kotlin/com/tencent/bkrepo/common/ratelimiter/stream/CommonRateLimitInputStream.kt @@ -36,6 +36,7 @@ class CommonRateLimitInputStream( delegate: InputStream, private val rateLimiter: RateLimiter, private val sleepTime: Long, + private val retryNum: Int, private val dryRun: Boolean = false ) : DelegateInputStream(delegate) { @@ -56,11 +57,13 @@ class CommonRateLimitInputStream( private fun acquire(permits: Int) { var flag = false + var failedNum = 0 try { while (!flag) { // TODO 当限制小于读取大小时,会进入死循环 flag = rateLimiter.tryAcquire(permits.toLong()) - if (!flag) { + if (!flag && failedNum < retryNum) { + failedNum++ Thread.sleep(sleepTime) } } diff --git a/src/backend/common/common-ratelimiter/src/test/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/RateLimiterInterceptorChainTest.kt b/src/backend/common/common-ratelimiter/src/test/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/RateLimiterInterceptorChainTest.kt index 5b54d8c677..8ea4ee3cda 100644 --- a/src/backend/common/common-ratelimiter/src/test/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/RateLimiterInterceptorChainTest.kt +++ b/src/backend/common/common-ratelimiter/src/test/kotlin/com/tencent/bkrepo/common/ratelimiter/interceptor/RateLimiterInterceptorChainTest.kt @@ -27,18 +27,24 @@ package com.tencent.bkrepo.common.ratelimiter.interceptor +import com.tencent.bkrepo.common.ratelimiter.enums.Algorithms +import com.tencent.bkrepo.common.ratelimiter.enums.LimitDimension +import com.tencent.bkrepo.common.ratelimiter.enums.WorkScope +import com.tencent.bkrepo.common.ratelimiter.exception.InvalidResourceException import com.tencent.bkrepo.common.ratelimiter.rule.common.ResourceLimit import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import java.util.concurrent.TimeUnit class RateLimiterInterceptorChainTest { open class InterceptorA : RateLimiterInterceptor { - override fun beforeLimitCheck(resource: String) { + override fun beforeLimitCheck(resource: String, resourceLimit: ResourceLimit) { list.add(identity() + ":before") } @@ -78,13 +84,33 @@ class RateLimiterInterceptorChainTest { chain.addInterceptor(InterceptorA()) chain.addInterceptor(InterceptorB()) chain.addInterceptor(InterceptorC()) - chain.doBeforeLimitCheck("test1") + val resourceLimit = ResourceLimit( + algo = Algorithms.FIXED_WINDOW.name, resource = "/project1/", + limitDimension = LimitDimension.URL.name, limit = 52428800, + unit = TimeUnit.SECONDS.name, scope = WorkScope.LOCAL.name + ) + chain.doBeforeLimitCheck("test1", resourceLimit) assertEquals(Companion.list.size, 3) assertEquals(Companion.list[0], "InterceptorA:before") assertEquals(Companion.list[1], "InterceptorB:before") assertEquals(Companion.list[2], "InterceptorC:before") } + @Test + fun testTargetDoBeforeLimit() { + val chain = RateLimiterInterceptorChain() + chain.addInterceptor(TargetRateLimiterInterceptorAdaptor()) + val resourceLimit = ResourceLimit( + algo = Algorithms.FIXED_WINDOW.name, resource = "/project1/", + limitDimension = LimitDimension.URL.name, limit = 52428800, + unit = TimeUnit.SECONDS.name, scope = WorkScope.LOCAL.name, + targets = listOf("127.0.0.2") + ) + assertThrows { + chain.doBeforeLimitCheck("test1", resourceLimit) + } + } + @Test fun testDoAfterLimit() { val chain = RateLimiterInterceptorChain() diff --git a/src/backend/s3/biz-s3/src/main/kotlin/com/tencent/bkrepo/s3/artifact/response/S3ArtifactResourceWriter.kt b/src/backend/s3/biz-s3/src/main/kotlin/com/tencent/bkrepo/s3/artifact/response/S3ArtifactResourceWriter.kt index 9fe4d1be1c..96a68d2bef 100644 --- a/src/backend/s3/biz-s3/src/main/kotlin/com/tencent/bkrepo/s3/artifact/response/S3ArtifactResourceWriter.kt +++ b/src/backend/s3/biz-s3/src/main/kotlin/com/tencent/bkrepo/s3/artifact/response/S3ArtifactResourceWriter.kt @@ -39,9 +39,6 @@ import com.tencent.bkrepo.common.artifact.resolve.response.AbstractArtifactResou import com.tencent.bkrepo.common.artifact.resolve.response.ArtifactResource import com.tencent.bkrepo.common.artifact.stream.Range import com.tencent.bkrepo.common.ratelimiter.service.RequestLimitCheckService -import com.tencent.bkrepo.common.ratelimiter.service.bandwidth.DownloadBandwidthRateLimiterService -import com.tencent.bkrepo.common.ratelimiter.service.usage.DownloadUsageRateLimiterService -import com.tencent.bkrepo.common.ratelimiter.service.usage.user.UserDownloadUsageRateLimiterService import com.tencent.bkrepo.common.service.util.HttpContextHolder import com.tencent.bkrepo.common.storage.core.StorageProperties import com.tencent.bkrepo.common.storage.monitor.Throughput @@ -55,10 +52,10 @@ import javax.servlet.http.HttpServletResponse /** * S3协议的响应输出 */ -class S3ArtifactResourceWriter ( +class S3ArtifactResourceWriter( storageProperties: StorageProperties, requestLimitCheckService: RequestLimitCheckService - ) : AbstractArtifactResourceHandler( +) : AbstractArtifactResourceHandler( storageProperties, requestLimitCheckService ) {