Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 增加row级别并发执行的线程数限制#2815 #2821

Merged
merged 3 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const val SHARDING_COUNT = 256
*/
const val BATCH_SIZE = 1000

/**
* 最大并发线程限制数
*/
const val CONCURRENT_THREAD_LIMIT = 1024

/**
* 数据库字段
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ abstract class MongoDbBatchJob<Entity : Any, Context : JobContext>(
private val permitsPerSecond: Double
get() = properties.permitsPerSecond

private val concurrentThreadLimit: Int get() = properties.concurrentThreadLimit

@Autowired
private lateinit var lockingTaskExecutor: LockingTaskExecutor

Expand Down Expand Up @@ -195,7 +197,7 @@ abstract class MongoDbBatchJob<Entity : Any, Context : JobContext>(
hasAsyncTask = true
tasks.forEach {
val task = IdentityTask(taskId) { block(it) }
executor.executeWithId(task, produce, permitsPerSecond)
executor.executeWithId(task, concurrentThreadLimit, produce, permitsPerSecond)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package com.tencent.bkrepo.job.config.properties

import com.tencent.bkrepo.job.BATCH_SIZE
import com.tencent.bkrepo.job.CONCURRENT_THREAD_LIMIT
import com.tencent.bkrepo.job.batch.base.JobConcurrentLevel

open class MongodbJobProperties(
Expand All @@ -44,5 +45,9 @@ open class MongodbJobProperties(
/**
* 每次批处理作业大小
* */
var batchSize: Int = BATCH_SIZE
var batchSize: Int = BATCH_SIZE,
/**
* 并发执行下最大并发线程数
*/
var concurrentThreadLimit: Int = CONCURRENT_THREAD_LIMIT,
) : BatchJobProperties()
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package com.tencent.bkrepo.job.executor

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.job.CONCURRENT_THREAD_LIMIT
import org.slf4j.LoggerFactory
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.time.Duration
Expand Down Expand Up @@ -89,9 +90,24 @@ class BlockThreadPoolTaskExecutorDecorator(
/**
* 执行带Id的任务
* */
fun executeWithId(identityTask: IdentityTask, produce: Boolean = false, permitsPerSecond: Double = 0.0) {
fun executeWithId(
identityTask: IdentityTask,
concurrentThreadLimit: Int = CONCURRENT_THREAD_LIMIT,
produce: Boolean = false,
permitsPerSecond: Double = 0.0
) {
val id = identityTask.id
val taskInfo = taskInfos.getOrPut(id) { IdentityTaskInfo(id, permitsPerSecond = permitsPerSecond) }
while (true) {
if (taskInfo.count.get() < concurrentThreadLimit) {
break
} else {
try {
Thread.sleep(100)
} catch (ignore: InterruptedException) {
}
}
}
taskInfo.count.incrementAndGet()
val enterTime = System.currentTimeMillis()
val task = {
Expand Down
Loading