Skip to content

Commit

Permalink
perf: 使用缓存机制更新目录修改信息 #756
Browse files Browse the repository at this point in the history
  • Loading branch information
scplsy committed Aug 17, 2023
1 parent 9f4700c commit 33fb580
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

package com.tencent.bkrepo.repository.service.node.impl

import com.google.common.cache.CacheBuilder
import com.google.common.cache.RemovalCause
import com.google.common.cache.RemovalListeners
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.common.api.exception.ErrorCodeException
import com.tencent.bkrepo.common.api.pojo.Page
import com.tencent.bkrepo.common.api.util.Preconditions
Expand Down Expand Up @@ -67,9 +71,14 @@ import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.Update
import org.springframework.data.mongodb.core.query.and
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.data.mongodb.core.query.where
import org.springframework.transaction.annotation.Transactional
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

/**
* 节点基础服务,实现了CRUD基本操作
Expand All @@ -85,6 +94,37 @@ abstract class NodeBaseService(
open val messageSupplier: MessageSupplier,
) : NodeService {

init {
// 定时清理过期缓存, 否则写入不频繁时可能很长时间也不触发数据库更新
scheduler.scheduleWithFixedDelay(
{ lastModifiedInfoUpdateCache.cleanUp() },
UPDATE_LAST_MODIFIED_INFO_INTERVAL,
UPDATE_LAST_MODIFIED_INFO_INTERVAL,
TimeUnit.MINUTES
)
}

// 目录最后修改信息更新缓存, 缓存过期后才可能写入数据库
private val lastModifiedInfoUpdateCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(UPDATE_LAST_MODIFIED_INFO_INTERVAL, TimeUnit.MINUTES)
.removalListener(
RemovalListeners.asynchronous<Triple<String, String, String>, Pair<String, LocalDateTime>> (
{
if (it.cause == RemovalCause.EXPIRED || it.cause == RemovalCause.SIZE) {
val (projectId, repoName, fullPath) = it.key!!
val (lastModifiedBy, lastModifiedDate) = it.value!!
val query = NodeQueryHelper.nodeQuery(projectId, repoName, fullPath)
query.addCriteria(where(TNode::createdDate).lt(lastModifiedDate))
val update = NodeQueryHelper.update(lastModifiedBy, lastModifiedDate)
nodeDao.updateFirst(query, update)
}
},
updateLastModifiedInfoExecutor
)
)
.build<Triple<String, String, String>, Pair<String, LocalDateTime>>()

override fun getNodeDetail(artifact: ArtifactInfo, repoType: String?): NodeDetail? {
with(artifact) {
val node = nodeDao.findNode(projectId, repoName, getArtifactFullPath())
Expand Down Expand Up @@ -388,12 +428,19 @@ abstract class NodeBaseService(
modifiedBy: String,
modifiedDate: LocalDateTime = LocalDateTime.now()
) {
if (PathUtils.isRoot(fullPath)) {
return
if (!PathUtils.isRoot(fullPath)) {
lastModifiedInfoUpdateCache.put(Triple(projectId, repoName, fullPath), Pair(modifiedBy, modifiedDate))
}
}

fun cancelUpdateModifiedInfo(projectId: String, repoName: String, fullPathList: List<String>) {
val keys = lastModifiedInfoUpdateCache.asMap().keys.filter { key ->
key.first == projectId && key.second == repoName &&
fullPathList.any { key.third == it || key.third.startsWith(PathUtils.toPath(it)) }
}
if (keys.isNotEmpty()) {
lastModifiedInfoUpdateCache.invalidateAll(keys)
}
val query = NodeQueryHelper.nodeQuery(projectId, repoName, fullPath)
val update = NodeQueryHelper.update(modifiedBy, modifiedDate)
nodeDao.updateFirst(query, update)
}

open fun checkConflictAndQuota(createRequest: NodeCreateRequest, fullPath: String): LocalDateTime? {
Expand Down Expand Up @@ -430,6 +477,19 @@ abstract class NodeBaseService(
companion object {
private val logger = LoggerFactory.getLogger(NodeBaseService::class.java)
private const val TOPIC = "bkbase_bkrepo_artifact_node_created"
private const val UPDATE_LAST_MODIFIED_INFO_INTERVAL = 5L
private val scheduler = Executors.newSingleThreadScheduledExecutor()

// 更新目录最后修改信息任务线程池
private val updateLastModifiedInfoExecutor = ThreadPoolExecutor(
4,
8,
60,
TimeUnit.SECONDS,
ArrayBlockingQueue(1024),
ThreadFactoryBuilder().setNameFormat("update-lastModified-info-%d").build(),
ThreadPoolExecutor.CallerRunsPolicy()
)

private fun convert(tNode: TNode?): NodeInfo? {
return tNode?.let {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,15 @@ open class NodeDeleteSupport(
}
// 获取被删除节点的父目录并更新修改信息
val parentFullPaths = if (fullPaths?.size == 1) {
nodeBaseService.cancelUpdateModifiedInfo(projectId, repoName, fullPaths)
listOf(PathUtils.toFullPath(PathUtils.resolveParent(fullPaths[0])))
} else {
existFullPaths?.map { PathUtils.toFullPath(PathUtils.resolveParent(it)) }
?.distinct()?.filterNot { PathUtils.isRoot(it) }
?.also { nodeBaseService.cancelUpdateModifiedInfo(projectId, repoName, it) }
}
if (parentFullPaths?.size == 1) {
nodeBaseService.updateModifiedInfo(projectId, repoName, parentFullPaths[0], operator, deleteTime)
} else if (parentFullPaths != null && parentFullPaths.size > 1) {
val parentNodeQuery = NodeQueryHelper.nodeQuery(projectId, repoName, parentFullPaths)
val parentNodeUpdate = NodeQueryHelper.update(operator)
nodeDao.updateMulti(parentNodeQuery, parentNodeUpdate)
parentFullPaths?.forEach {
nodeBaseService.updateModifiedInfo(projectId, repoName, it, operator, deleteTime)
}
deletedSize = nodeBaseService.aggregateComputeSize(criteria.and(TNode::deleted).isEqualTo(deleteTime))
quotaService.decreaseUsedVolume(projectId, repoName, deletedSize)
Expand Down

0 comments on commit 33fb580

Please sign in to comment.