Skip to content

Commit

Permalink
feat: 增加row级别并发执行的线程数限制 #2815
Browse files Browse the repository at this point in the history
* feat: 增加row级别并发执行的线程数限制#2815

* feat: 增加row级别并发执行的线程数限制#2815

* feat: 调整默认大小#2815
  • Loading branch information
zacYL authored Dec 9, 2024
1 parent 53198cd commit b9cdfe1
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const val SHARDING_COUNT = 256
*/
const val BATCH_SIZE = 1000

/**
* 最大并发线程限制数
*/
const val CONCURRENT_THREAD_LIMIT = 1024

/**
* 数据库字段
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ abstract class MongoDbBatchJob<Entity : Any, Context : JobContext>(
private val permitsPerSecond: Double
get() = properties.permitsPerSecond

private val concurrentThreadLimit: Int get() = properties.concurrentThreadLimit

@Autowired
private lateinit var lockingTaskExecutor: LockingTaskExecutor

Expand Down Expand Up @@ -195,7 +197,7 @@ abstract class MongoDbBatchJob<Entity : Any, Context : JobContext>(
hasAsyncTask = true
tasks.forEach {
val task = IdentityTask(taskId) { block(it) }
executor.executeWithId(task, produce, permitsPerSecond)
executor.executeWithId(task, concurrentThreadLimit, produce, permitsPerSecond)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package com.tencent.bkrepo.job.config.properties

import com.tencent.bkrepo.job.BATCH_SIZE
import com.tencent.bkrepo.job.CONCURRENT_THREAD_LIMIT
import com.tencent.bkrepo.job.batch.base.JobConcurrentLevel

open class MongodbJobProperties(
Expand All @@ -44,5 +45,9 @@ open class MongodbJobProperties(
/**
* 每次批处理作业大小
* */
var batchSize: Int = BATCH_SIZE
var batchSize: Int = BATCH_SIZE,
/**
* 并发执行下最大并发线程数
*/
var concurrentThreadLimit: Int = CONCURRENT_THREAD_LIMIT,
) : BatchJobProperties()
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package com.tencent.bkrepo.job.executor

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.job.CONCURRENT_THREAD_LIMIT
import org.slf4j.LoggerFactory
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.time.Duration
Expand Down Expand Up @@ -89,9 +90,24 @@ class BlockThreadPoolTaskExecutorDecorator(
/**
* 执行带Id的任务
* */
fun executeWithId(identityTask: IdentityTask, produce: Boolean = false, permitsPerSecond: Double = 0.0) {
fun executeWithId(
identityTask: IdentityTask,
concurrentThreadLimit: Int = CONCURRENT_THREAD_LIMIT,
produce: Boolean = false,
permitsPerSecond: Double = 0.0
) {
val id = identityTask.id
val taskInfo = taskInfos.getOrPut(id) { IdentityTaskInfo(id, permitsPerSecond = permitsPerSecond) }
while (true) {
if (taskInfo.count.get() < concurrentThreadLimit) {
break
} else {
try {
Thread.sleep(100)
} catch (ignore: InterruptedException) {
}
}
}
taskInfo.count.incrementAndGet()
val enterTime = System.currentTimeMillis()
val task = {
Expand Down

0 comments on commit b9cdfe1

Please sign in to comment.