Skip to content

Commit

Permalink
feat: 支持按项目维度统计保留的缓存大小 #2612
Browse files Browse the repository at this point in the history
* feat: 输出count操作日志表耗时日志 #2612

* feat: 修复Milvus Collection不存在时查询相似路径失败 #2612

* feat: 修复搜素相似路径时路径拼接错误 #2612

* feat: 支持按项目维度统计保留的缓存大小 #2612

* feat: 支持按项目维度统计保留的缓存大小 #2612
  • Loading branch information
cnlkl authored Oct 8, 2024
1 parent 8b438d9 commit b62ea50
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ data class ArtifactPreloadPlan(
status = status,
)

fun artifactInfo() = "plan[$id] credentials[$credentialsKey] sha256[$sha256] size[$size]"
fun artifactInfo() =
"plan[$id] credentials[$credentialsKey] sha256[$sha256] size[$size] fullPath[$projectId/$repoName/$fullPath]"

companion object {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class CleanupFileVisitor(
try {
val file = filePath.toFile()
val expired = fileExpireResolver.isExpired(file)
val retain = fileRetainResolver?.retain(file) ?: false
val retain = fileRetainResolver?.retain(file.name) ?: false

var shouldDelete = expired && !isNFSTempFile(filePath)
if (shouldDelete && !isTempFile) {
Expand All @@ -101,6 +101,7 @@ class CleanupFileVisitor(
if (shouldDelete && retain) {
result.retainFile += 1
result.retainSize += size
result.retainSha256.add(file.name)
onFileRetained(filePath, size)
}
} catch (ignored: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,15 @@ data class CleanupResult(
* 根据保留策略保留的文件大小
*/
var retainSize: Long = 0,
/**
* 保留的文件sha256
*/
var retainSha256: MutableSet<String> = HashSet(),
) {

fun merge(vararg others: CleanupResult): CleanupResult {
others.forEach {
totalFile += it.totalFile
totalFolder += it.totalFolder
totalSize += it.totalSize
cleanupFile += it.cleanupFile
cleanupFolder += it.cleanupFolder
cleanupSize += it.cleanupSize
errorCount += it.errorCount
}
return this
}

override fun toString(): String {
return "$cleanupFile/$totalFile[${HumanReadable.size(cleanupSize)}/${HumanReadable.size(totalSize)}] " +
"files deleted,errorCount[$errorCount], $cleanupFolder/$totalFolder dirs deleted."
"files deleted, errorCount[$errorCount], $cleanupFolder/$totalFolder dirs deleted, " +
"retainCount[$retainFile], retainSize[${HumanReadable.size(retainSize)}]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,10 @@

package com.tencent.bkrepo.common.storage.filesystem.cleanup

import java.io.File

/**
* 文件是否保留解析器,用于在清理文件时判断是否保留
*/
interface FileRetainResolver {
/**
* 文件是否保留
*
* @param file 待解析文件
*
* @return true表示保留,否则不保留
*/
fun retain(file: File): Boolean


/**
* 文件是否保留
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.tencent.bkrepo.job.batch.file
import com.tencent.bkrepo.common.mongo.constant.ID
import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID
import com.tencent.bkrepo.common.query.util.MongoEscapeUtils
import com.tencent.bkrepo.common.storage.filesystem.cleanup.FileRetainResolver
import com.tencent.bkrepo.job.DELETED_DATE
import com.tencent.bkrepo.job.FOLDER
import com.tencent.bkrepo.job.FULL_PATH
Expand All @@ -26,79 +25,56 @@ import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
import org.springframework.util.unit.DataSize
import java.io.File
import java.time.LocalDateTime

/**
* 基于仓库配置判断文件是否过期
* */
class BasedRepositoryFileExpireResolver(
class BasedRepositoryNodeRetainResolver(
private val expireConfig: RepositoryExpireConfig,
taskScheduler: ThreadPoolTaskScheduler,
private val fileCacheService: FileCacheService,
private val mongoTemplate: MongoTemplate,
) : FileRetainResolver {
) : NodeRetainResolver {

private var retainNodes = mutableSetOf<String>()
private var retainNodes = HashMap<String, RetainNode>()

init {
taskScheduler.scheduleWithFixedDelay(this::refreshRetainNode, expireConfig.cacheTime)
}

override fun retain(file: File): Boolean {
return retainNodes.contains(file.name)
}

override fun retain(sha256: String): Boolean {
return retainNodes.contains(sha256)
}

override fun getRetainNode(sha256: String): RetainNode? {
return retainNodes[sha256]
}

private fun refreshRetainNode() {
logger.info("Refresh retain nodes start. size of nodes ${retainNodes.size}")
try {
val temp = mutableSetOf<String>()
temp.addAll(getNodeFromConfig())
temp.addAll(getNodeFromDataBase())
val temp = HashMap<String, RetainNode>()
val configs = expireConfig.repos.map { convertRepoConfigToFileCache(it) } + fileCacheService.list()
configs.forEach { config ->
getNodes(config).forEach { node ->
val retainNode = RetainNode(
projectId = config.projectId,
repoName = config.repoName,
fullPath = node[FULL_PATH].toString(),
sha256 = node[SHA256].toString(),
size = node[SIZE].toString().toLong()
)
temp[retainNode.sha256] = retainNode
logger.info("Retain node[$retainNode]")
}
}
retainNodes = temp
} catch (e: Exception) {
logger.warn("An error occurred while refreshing retain node $e")
}
logger.info("Refresh retain nodes finished. size of nodes ${retainNodes.size}")
}

private fun getNodeFromConfig(): Set<String> {
val temp = mutableSetOf<String>()
expireConfig.repos.map{ convertRepoConfigToFileCache(it) }.forEach {
val projectId = it.projectId
val repoName = it.repoName
val records = getNodes(it)
records.forEach { ret ->
// 获取每个的sha256
val sha256 = ret[SHA256].toString()
val fullPath = ret[FULL_PATH].toString()
temp.add(sha256)
logger.info("Retain node $projectId/$repoName$fullPath, $sha256.")
}
}
return temp
}

private fun getNodeFromDataBase(): Set<String> {
val temp = mutableSetOf<String>()
fileCacheService.list().forEach {
val projectId = it.projectId
val repoName = it.repoName
val records = getNodes(it)
records.forEach { ret ->
// 获取每个的sha256
val sha256 = ret[SHA256].toString()
val fullPath = ret[FULL_PATH].toString()
temp.add(sha256)
logger.info("Retain node $projectId/$repoName$fullPath, $sha256.")
}
}
return temp
}

private fun convertRepoConfigToFileCache(repoConfig: RepoConfig):TFileCache {
return TFileCache(
Expand Down Expand Up @@ -147,7 +123,7 @@ class BasedRepositoryFileExpireResolver(
)

val fields = query.fields()
fields.include(SHA256, FULL_PATH)
fields.include(SHA256, FULL_PATH, SIZE)
var querySize: Int
var lastId = ObjectId(MIN_OBJECT_ID)
do {
Expand All @@ -171,7 +147,7 @@ class BasedRepositoryFileExpireResolver(


companion object {
private val logger = LoggerFactory.getLogger(BasedRepositoryFileExpireResolver::class.java)
private val logger = LoggerFactory.getLogger(BasedRepositoryNodeRetainResolver::class.java)
private const val COLLECTION_NODE_PREFIX = "node_"

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ExpireFileResolverConfig {
fileCacheService: FileCacheService,
mongoTemplate: MongoTemplate
): FileRetainResolver {
return BasedRepositoryFileExpireResolver(
return BasedRepositoryNodeRetainResolver(
expiredCacheFileCleanupJobProperties.repoConfig,
scheduler,
fileCacheService,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.batch.file

import com.tencent.bkrepo.common.storage.filesystem.cleanup.FileRetainResolver

/**
* 可根据sha256查询保留的node的解析器
*/
interface NodeRetainResolver : FileRetainResolver {
/**
* 根据[sha256]查询保留的node
*
* @param sha256 文件sha256
* @return 保留的node
*/
fun getRetainNode(sha256: String): RetainNode?
}

data class RetainNode(
val projectId: String,
val repoName: String,
val fullPath: String,
val sha256: String,
val size: Long,
)
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ExpiredCacheFileCleanupJob(
}.apply {
first[storage.cache.path.toPath()]?.let {
storageCacheMetrics.setCacheMetrics(key, it.rootDirNotDeletedSize, it.rootDirNotDeletedFile)
storageCacheMetrics.setRetainCacheMetrics(key, it.retainSize, it.retainFile)
storageCacheMetrics.setProjectRetainCacheMetrics(key, it.retainSha256)
}
logger.info("Clean up on storage[$key] completed, summary: $first, elapse [${second.seconds}] s.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

package com.tencent.bkrepo.job.batch.task.cache.preload

import com.tencent.bkrepo.auth.constant.PIPELINE
import com.tencent.bkrepo.common.api.util.executeAndMeasureTime
import com.tencent.bkrepo.common.artifact.event.base.EventType
import com.tencent.bkrepo.common.mongo.constant.ID
import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID
Expand Down Expand Up @@ -180,15 +180,7 @@ class ArtifactAccessLogEmbeddingJob(

private fun HashMap<String, MutableMap<String, AccessLog>>.addToBuffer(operateLog: OperateLog): Boolean {
with(operateLog) {
val projectRepoFullPath = if (repoName == PIPELINE) {
// 流水线仓库路径/p-xxx/b-xxx/xxx中的构建id不参与相似度计算
val secondSlashIndex = resourceKey.indexOf("/", 1)
val pipelinePath = resourceKey.substring(0, secondSlashIndex)
val artifactPath = resourceKey.substring(resourceKey.indexOf("/", secondSlashIndex + 1))
"/$projectId/$repoName$pipelinePath$artifactPath"
} else {
"/$projectId/$repoName$resourceKey"
}
val projectRepoFullPath = projectRepoFullPath(projectId, repoName, resourceKey)
val buffer = getOrPut(projectId) { HashMap() }
val accessLog = buffer.getOrPut(projectRepoFullPath) {
AccessLog(
Expand All @@ -215,7 +207,11 @@ class ArtifactAccessLogEmbeddingJob(
logger.warn("mongo collection[$collectionName] not exists")
return
}
val count = mongoTemplate.count(Query(), collectionName)
val result = executeAndMeasureTime {
mongoTemplate.count(Query(Criteria.where(ID).gt(startId)), collectionName)
}
logger.info("count $collectionName elapsed[${result.second}]")
val count = result.first
var progress = 0
var records: List<OperateLog>
var lastId = startId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ import com.tencent.bkrepo.common.artifact.cache.service.ArtifactPreloadPlanGener
import com.tencent.bkrepo.common.mongo.dao.util.sharding.MonthRangeShardingUtils
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.AiProperties
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.EmbeddingModel
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStore
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStoreProperties
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.SearchRequest
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.VectorStore
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusClient
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusRestApiException
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStore
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStoreProperties
import org.slf4j.LoggerFactory
import java.time.LocalDateTime
import java.time.ZoneId
Expand Down Expand Up @@ -78,14 +79,20 @@ class ArtifactSimilarityPreloadPlanGenerator(
val preloadHourOfDay = preloadProperties.preloadHourOfDay.sorted().ifEmpty { return null }

// 查询相似路径,没有相似路径时不执行预加载
val projectPath = "/$projectId/$repoName$fullPath"
val projectPath = projectRepoFullPath(projectId, repoName, fullPath)
val searchReq = SearchRequest(
query = projectPath,
topK = 10,
similarityThreshold = aiProperties.defaultSimilarityThreshold
)
val docs = createVectorStore(0L).similaritySearch(searchReq).ifEmpty {
createVectorStore(1L).similaritySearch(searchReq)

val docs = try {
createVectorStore(0L).similaritySearch(searchReq).ifEmpty {
createVectorStore(1L).similaritySearch(searchReq)
}
} catch (e: MilvusRestApiException) {
logger.error("search similar path failed: ${e.message}")
emptyList()
}
if (docs.isEmpty()) {
logger.info("no similarity path found for [$projectPath]")
Expand All @@ -95,17 +102,16 @@ class ArtifactSimilarityPreloadPlanGenerator(
val now = LocalDateTime.now()
val preloadHour = preloadHourOfDay.firstOrNull { it > now.hour }
?: (preloadHourOfDay.first { (it + 24) > now.hour } + 24)
val preloadTimestamp = now
val preloadDateTime = now
// 设置预加载时间
.plusHours((preloadHour - now.hour).toLong())
.withMinute(0)
// 减去随机时间,避免同时多文件触发加载
.minusSeconds(Random.nextLong(0, preloadProperties.maxRandomSeconds))
// 转化为毫秒时间戳
.atZone(ZoneId.systemDefault())
.toEpochSecond() * 1000
// 转化为毫秒时间戳
val preloadTimestamp = preloadDateTime.atZone(ZoneId.systemDefault()).toEpochSecond() * 1000
logger.info(
"similarity path[${docs.first().content}] found for [$projectPath], will preload on $preloadTimestamp"
"similarity path[${docs.first().content}] found for [$projectPath], will preload on $preloadDateTime"
)
return preloadTimestamp
}
Expand Down
Loading

0 comments on commit b62ea50

Please sign in to comment.