diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt index e552aed048..d5d4e75a2c 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt @@ -40,6 +40,11 @@ const val SHARDING_COUNT = 256 */ const val BATCH_SIZE = 1000 +/** + * 最大并发线程限制数 + */ +const val CONCURRENT_THREAD_LIMIT = 1024 + /** * 数据库字段 */ diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt index 30d93b724e..24c0076986 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt @@ -94,6 +94,8 @@ abstract class MongoDbBatchJob( private val permitsPerSecond: Double get() = properties.permitsPerSecond + private val concurrentThreadLimit: Int get() = properties.concurrentThreadLimit + @Autowired private lateinit var lockingTaskExecutor: LockingTaskExecutor @@ -195,7 +197,7 @@ abstract class MongoDbBatchJob( hasAsyncTask = true tasks.forEach { val task = IdentityTask(taskId) { block(it) } - executor.executeWithId(task, produce, permitsPerSecond) + executor.executeWithId(task, concurrentThreadLimit, produce, permitsPerSecond) } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/MongodbJobProperties.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/MongodbJobProperties.kt index dea3ea1ab7..f1f2f50c43 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/MongodbJobProperties.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/MongodbJobProperties.kt @@ -28,6 +28,7 @@ package com.tencent.bkrepo.job.config.properties import com.tencent.bkrepo.job.BATCH_SIZE +import com.tencent.bkrepo.job.CONCURRENT_THREAD_LIMIT import com.tencent.bkrepo.job.batch.base.JobConcurrentLevel open class MongodbJobProperties( @@ -44,5 +45,9 @@ open class MongodbJobProperties( /** * 每次批处理作业大小 * */ - var batchSize: Int = BATCH_SIZE + var batchSize: Int = BATCH_SIZE, + /** + * 并发执行下最大并发线程数 + */ + var concurrentThreadLimit: Int = CONCURRENT_THREAD_LIMIT, ) : BatchJobProperties() diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/executor/BlockThreadPoolTaskExecutorDecorator.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/executor/BlockThreadPoolTaskExecutorDecorator.kt index 34265f988b..38172faa93 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/executor/BlockThreadPoolTaskExecutorDecorator.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/executor/BlockThreadPoolTaskExecutorDecorator.kt @@ -28,6 +28,7 @@ package com.tencent.bkrepo.job.executor import com.google.common.util.concurrent.ThreadFactoryBuilder +import com.tencent.bkrepo.job.CONCURRENT_THREAD_LIMIT import org.slf4j.LoggerFactory import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor import java.time.Duration @@ -89,9 +90,24 @@ class BlockThreadPoolTaskExecutorDecorator( /** * 执行带Id的任务 * */ - fun executeWithId(identityTask: IdentityTask, produce: Boolean = false, permitsPerSecond: Double = 0.0) { + fun executeWithId( + identityTask: IdentityTask, + concurrentThreadLimit: Int = CONCURRENT_THREAD_LIMIT, + produce: Boolean = false, + permitsPerSecond: Double = 0.0 + ) { val id = identityTask.id val taskInfo = taskInfos.getOrPut(id) { IdentityTaskInfo(id, permitsPerSecond = permitsPerSecond) } + while (true) { + if (taskInfo.count.get() < concurrentThreadLimit) { + break + } else { + try { + Thread.sleep(100) + } catch (ignore: InterruptedException) { + } + } + } taskInfo.count.incrementAndGet() val enterTime = System.currentTimeMillis() val task = {