From b62ea5064ed244aac17508aa7b6f437ffc546a41 Mon Sep 17 00:00:00 2001 From: kunlongli <16629885+cnlkl@users.noreply.github.com> Date: Tue, 8 Oct 2024 17:54:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E6=8C=89=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE=E7=BB=B4=E5=BA=A6=E7=BB=9F=E8=AE=A1=E4=BF=9D=E7=95=99?= =?UTF-8?q?=E7=9A=84=E7=BC=93=E5=AD=98=E5=A4=A7=E5=B0=8F=20#2612?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 输出count操作日志表耗时日志 #2612 * feat: 修复Milvus Collection不存在时查询相似路径失败 #2612 * feat: 修复搜素相似路径时路径拼接错误 #2612 * feat: 支持按项目维度统计保留的缓存大小 #2612 * feat: 支持按项目维度统计保留的缓存大小 #2612 --- .../cache/pojo/ArtifactPreloadPlan.kt | 3 +- .../filesystem/cleanup/CleanupFileVisitor.kt | 3 +- .../filesystem/cleanup/CleanupResult.kt | 20 ++---- .../filesystem/cleanup/FileRetainResolver.kt | 12 ---- ...t => BasedRepositoryNodeRetainResolver.kt} | 72 +++++++------------ .../batch/file/ExpireFileResolverConfig.kt | 2 +- .../job/batch/file/NodeRetainResolver.kt | 51 +++++++++++++ .../task/cache/ExpiredCacheFileCleanupJob.kt | 2 +- .../preload/ArtifactAccessLogEmbeddingJob.kt | 18 ++--- .../ArtifactSimilarityPreloadPlanGenerator.kt | 26 ++++--- .../task/cache/preload/SimilarityPathUtils.kt | 46 ++++++++++++ .../cache/preload/ai/milvus/MilvusClient.kt | 6 +- .../ai/milvus/MilvusRestApiException.kt | 30 ++++++++ .../bkrepo/job/metrics/StorageCacheMetrics.kt | 67 +++++++++++++++-- .../job/metrics/StorageCacheMetricsTest.kt | 52 +++++++++++--- 15 files changed, 292 insertions(+), 118 deletions(-) rename src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/{BasedRepositoryFileExpireResolver.kt => BasedRepositoryNodeRetainResolver.kt} (73%) create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/NodeRetainResolver.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/SimilarityPathUtils.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ai/milvus/MilvusRestApiException.kt diff --git a/src/backend/common/common-artifact/artifact-cache/src/main/kotlin/com/tencent/bkrepo/common/artifact/cache/pojo/ArtifactPreloadPlan.kt b/src/backend/common/common-artifact/artifact-cache/src/main/kotlin/com/tencent/bkrepo/common/artifact/cache/pojo/ArtifactPreloadPlan.kt index c8f8032517..65948f931d 100644 --- a/src/backend/common/common-artifact/artifact-cache/src/main/kotlin/com/tencent/bkrepo/common/artifact/cache/pojo/ArtifactPreloadPlan.kt +++ b/src/backend/common/common-artifact/artifact-cache/src/main/kotlin/com/tencent/bkrepo/common/artifact/cache/pojo/ArtifactPreloadPlan.kt @@ -71,7 +71,8 @@ data class ArtifactPreloadPlan( status = status, ) - fun artifactInfo() = "plan[$id] credentials[$credentialsKey] sha256[$sha256] size[$size]" + fun artifactInfo() = + "plan[$id] credentials[$credentialsKey] sha256[$sha256] size[$size] fullPath[$projectId/$repoName/$fullPath]" companion object { /** diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/CleanupFileVisitor.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/CleanupFileVisitor.kt index bd1ac6cec8..06e4ff970d 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/CleanupFileVisitor.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/CleanupFileVisitor.kt @@ -78,7 +78,7 @@ class CleanupFileVisitor( try { val file = filePath.toFile() val expired = fileExpireResolver.isExpired(file) - val retain = fileRetainResolver?.retain(file) ?: false + val retain = fileRetainResolver?.retain(file.name) ?: false var shouldDelete = expired && !isNFSTempFile(filePath) if (shouldDelete && !isTempFile) { @@ -101,6 +101,7 @@ class CleanupFileVisitor( if (shouldDelete && retain) { result.retainFile += 1 result.retainSize += size + result.retainSha256.add(file.name) onFileRetained(filePath, size) } } catch (ignored: Exception) { diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/CleanupResult.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/CleanupResult.kt index d882890970..217dd25817 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/CleanupResult.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/CleanupResult.kt @@ -53,23 +53,15 @@ data class CleanupResult( * 根据保留策略保留的文件大小 */ var retainSize: Long = 0, + /** + * 保留的文件sha256 + */ + var retainSha256: MutableSet = HashSet(), ) { - fun merge(vararg others: CleanupResult): CleanupResult { - others.forEach { - totalFile += it.totalFile - totalFolder += it.totalFolder - totalSize += it.totalSize - cleanupFile += it.cleanupFile - cleanupFolder += it.cleanupFolder - cleanupSize += it.cleanupSize - errorCount += it.errorCount - } - return this - } - override fun toString(): String { return "$cleanupFile/$totalFile[${HumanReadable.size(cleanupSize)}/${HumanReadable.size(totalSize)}] " + - "files deleted,errorCount[$errorCount], $cleanupFolder/$totalFolder dirs deleted." + "files deleted, errorCount[$errorCount], $cleanupFolder/$totalFolder dirs deleted, " + + "retainCount[$retainFile], retainSize[${HumanReadable.size(retainSize)}]" } } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/FileRetainResolver.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/FileRetainResolver.kt index d69b96b1c6..f4e2b7a7f6 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/FileRetainResolver.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/cleanup/FileRetainResolver.kt @@ -27,22 +27,10 @@ package com.tencent.bkrepo.common.storage.filesystem.cleanup -import java.io.File - /** * 文件是否保留解析器,用于在清理文件时判断是否保留 */ interface FileRetainResolver { - /** - * 文件是否保留 - * - * @param file 待解析文件 - * - * @return true表示保留,否则不保留 - */ - fun retain(file: File): Boolean - - /** * 文件是否保留 * diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/BasedRepositoryFileExpireResolver.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/BasedRepositoryNodeRetainResolver.kt similarity index 73% rename from src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/BasedRepositoryFileExpireResolver.kt rename to src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/BasedRepositoryNodeRetainResolver.kt index ea5cac826c..78ccdfd868 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/BasedRepositoryFileExpireResolver.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/BasedRepositoryNodeRetainResolver.kt @@ -3,7 +3,6 @@ package com.tencent.bkrepo.job.batch.file import com.tencent.bkrepo.common.mongo.constant.ID import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID import com.tencent.bkrepo.common.query.util.MongoEscapeUtils -import com.tencent.bkrepo.common.storage.filesystem.cleanup.FileRetainResolver import com.tencent.bkrepo.job.DELETED_DATE import com.tencent.bkrepo.job.FOLDER import com.tencent.bkrepo.job.FULL_PATH @@ -26,79 +25,56 @@ import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler import org.springframework.util.unit.DataSize -import java.io.File import java.time.LocalDateTime /** * 基于仓库配置判断文件是否过期 * */ -class BasedRepositoryFileExpireResolver( +class BasedRepositoryNodeRetainResolver( private val expireConfig: RepositoryExpireConfig, taskScheduler: ThreadPoolTaskScheduler, private val fileCacheService: FileCacheService, private val mongoTemplate: MongoTemplate, -) : FileRetainResolver { +) : NodeRetainResolver { - private var retainNodes = mutableSetOf() + private var retainNodes = HashMap() init { taskScheduler.scheduleWithFixedDelay(this::refreshRetainNode, expireConfig.cacheTime) } - override fun retain(file: File): Boolean { - return retainNodes.contains(file.name) - } - override fun retain(sha256: String): Boolean { return retainNodes.contains(sha256) } + override fun getRetainNode(sha256: String): RetainNode? { + return retainNodes[sha256] + } + private fun refreshRetainNode() { logger.info("Refresh retain nodes start. size of nodes ${retainNodes.size}") try { - val temp = mutableSetOf() - temp.addAll(getNodeFromConfig()) - temp.addAll(getNodeFromDataBase()) + val temp = HashMap() + val configs = expireConfig.repos.map { convertRepoConfigToFileCache(it) } + fileCacheService.list() + configs.forEach { config -> + getNodes(config).forEach { node -> + val retainNode = RetainNode( + projectId = config.projectId, + repoName = config.repoName, + fullPath = node[FULL_PATH].toString(), + sha256 = node[SHA256].toString(), + size = node[SIZE].toString().toLong() + ) + temp[retainNode.sha256] = retainNode + logger.info("Retain node[$retainNode]") + } + } retainNodes = temp } catch (e: Exception) { logger.warn("An error occurred while refreshing retain node $e") } logger.info("Refresh retain nodes finished. size of nodes ${retainNodes.size}") } - - private fun getNodeFromConfig(): Set { - val temp = mutableSetOf() - expireConfig.repos.map{ convertRepoConfigToFileCache(it) }.forEach { - val projectId = it.projectId - val repoName = it.repoName - val records = getNodes(it) - records.forEach { ret -> - // 获取每个的sha256 - val sha256 = ret[SHA256].toString() - val fullPath = ret[FULL_PATH].toString() - temp.add(sha256) - logger.info("Retain node $projectId/$repoName$fullPath, $sha256.") - } - } - return temp - } - - private fun getNodeFromDataBase(): Set { - val temp = mutableSetOf() - fileCacheService.list().forEach { - val projectId = it.projectId - val repoName = it.repoName - val records = getNodes(it) - records.forEach { ret -> - // 获取每个的sha256 - val sha256 = ret[SHA256].toString() - val fullPath = ret[FULL_PATH].toString() - temp.add(sha256) - logger.info("Retain node $projectId/$repoName$fullPath, $sha256.") - } - } - return temp - } private fun convertRepoConfigToFileCache(repoConfig: RepoConfig):TFileCache { return TFileCache( @@ -147,7 +123,7 @@ class BasedRepositoryFileExpireResolver( ) val fields = query.fields() - fields.include(SHA256, FULL_PATH) + fields.include(SHA256, FULL_PATH, SIZE) var querySize: Int var lastId = ObjectId(MIN_OBJECT_ID) do { @@ -171,7 +147,7 @@ class BasedRepositoryFileExpireResolver( companion object { - private val logger = LoggerFactory.getLogger(BasedRepositoryFileExpireResolver::class.java) + private val logger = LoggerFactory.getLogger(BasedRepositoryNodeRetainResolver::class.java) private const val COLLECTION_NODE_PREFIX = "node_" } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/ExpireFileResolverConfig.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/ExpireFileResolverConfig.kt index da80c2ec1d..a9e838ea17 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/ExpireFileResolverConfig.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/ExpireFileResolverConfig.kt @@ -17,7 +17,7 @@ class ExpireFileResolverConfig { fileCacheService: FileCacheService, mongoTemplate: MongoTemplate ): FileRetainResolver { - return BasedRepositoryFileExpireResolver( + return BasedRepositoryNodeRetainResolver( expiredCacheFileCleanupJobProperties.repoConfig, scheduler, fileCacheService, diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/NodeRetainResolver.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/NodeRetainResolver.kt new file mode 100644 index 0000000000..318a892737 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/file/NodeRetainResolver.kt @@ -0,0 +1,51 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.job.batch.file + +import com.tencent.bkrepo.common.storage.filesystem.cleanup.FileRetainResolver + +/** + * 可根据sha256查询保留的node的解析器 + */ +interface NodeRetainResolver : FileRetainResolver { + /** + * 根据[sha256]查询保留的node + * + * @param sha256 文件sha256 + * @return 保留的node + */ + fun getRetainNode(sha256: String): RetainNode? +} + +data class RetainNode( + val projectId: String, + val repoName: String, + val fullPath: String, + val sha256: String, + val size: Long, +) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/ExpiredCacheFileCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/ExpiredCacheFileCleanupJob.kt index ffeb121e8a..6b4b77d7d3 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/ExpiredCacheFileCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/ExpiredCacheFileCleanupJob.kt @@ -94,7 +94,7 @@ class ExpiredCacheFileCleanupJob( }.apply { first[storage.cache.path.toPath()]?.let { storageCacheMetrics.setCacheMetrics(key, it.rootDirNotDeletedSize, it.rootDirNotDeletedFile) - storageCacheMetrics.setRetainCacheMetrics(key, it.retainSize, it.retainFile) + storageCacheMetrics.setProjectRetainCacheMetrics(key, it.retainSha256) } logger.info("Clean up on storage[$key] completed, summary: $first, elapse [${second.seconds}] s.") } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ArtifactAccessLogEmbeddingJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ArtifactAccessLogEmbeddingJob.kt index d82fa8a7ad..1d1838805c 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ArtifactAccessLogEmbeddingJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ArtifactAccessLogEmbeddingJob.kt @@ -27,7 +27,7 @@ package com.tencent.bkrepo.job.batch.task.cache.preload -import com.tencent.bkrepo.auth.constant.PIPELINE +import com.tencent.bkrepo.common.api.util.executeAndMeasureTime import com.tencent.bkrepo.common.artifact.event.base.EventType import com.tencent.bkrepo.common.mongo.constant.ID import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID @@ -180,15 +180,7 @@ class ArtifactAccessLogEmbeddingJob( private fun HashMap>.addToBuffer(operateLog: OperateLog): Boolean { with(operateLog) { - val projectRepoFullPath = if (repoName == PIPELINE) { - // 流水线仓库路径/p-xxx/b-xxx/xxx中的构建id不参与相似度计算 - val secondSlashIndex = resourceKey.indexOf("/", 1) - val pipelinePath = resourceKey.substring(0, secondSlashIndex) - val artifactPath = resourceKey.substring(resourceKey.indexOf("/", secondSlashIndex + 1)) - "/$projectId/$repoName$pipelinePath$artifactPath" - } else { - "/$projectId/$repoName$resourceKey" - } + val projectRepoFullPath = projectRepoFullPath(projectId, repoName, resourceKey) val buffer = getOrPut(projectId) { HashMap() } val accessLog = buffer.getOrPut(projectRepoFullPath) { AccessLog( @@ -215,7 +207,11 @@ class ArtifactAccessLogEmbeddingJob( logger.warn("mongo collection[$collectionName] not exists") return } - val count = mongoTemplate.count(Query(), collectionName) + val result = executeAndMeasureTime { + mongoTemplate.count(Query(Criteria.where(ID).gt(startId)), collectionName) + } + logger.info("count $collectionName elapsed[${result.second}]") + val count = result.first var progress = 0 var records: List var lastId = startId diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ArtifactSimilarityPreloadPlanGenerator.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ArtifactSimilarityPreloadPlanGenerator.kt index ccfc2bfa79..bc14d96451 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ArtifactSimilarityPreloadPlanGenerator.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ArtifactSimilarityPreloadPlanGenerator.kt @@ -34,11 +34,12 @@ import com.tencent.bkrepo.common.artifact.cache.service.ArtifactPreloadPlanGener import com.tencent.bkrepo.common.mongo.dao.util.sharding.MonthRangeShardingUtils import com.tencent.bkrepo.job.batch.task.cache.preload.ai.AiProperties import com.tencent.bkrepo.job.batch.task.cache.preload.ai.EmbeddingModel -import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStore -import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStoreProperties import com.tencent.bkrepo.job.batch.task.cache.preload.ai.SearchRequest import com.tencent.bkrepo.job.batch.task.cache.preload.ai.VectorStore import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusClient +import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusRestApiException +import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStore +import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStoreProperties import org.slf4j.LoggerFactory import java.time.LocalDateTime import java.time.ZoneId @@ -78,14 +79,20 @@ class ArtifactSimilarityPreloadPlanGenerator( val preloadHourOfDay = preloadProperties.preloadHourOfDay.sorted().ifEmpty { return null } // 查询相似路径,没有相似路径时不执行预加载 - val projectPath = "/$projectId/$repoName$fullPath" + val projectPath = projectRepoFullPath(projectId, repoName, fullPath) val searchReq = SearchRequest( query = projectPath, topK = 10, similarityThreshold = aiProperties.defaultSimilarityThreshold ) - val docs = createVectorStore(0L).similaritySearch(searchReq).ifEmpty { - createVectorStore(1L).similaritySearch(searchReq) + + val docs = try { + createVectorStore(0L).similaritySearch(searchReq).ifEmpty { + createVectorStore(1L).similaritySearch(searchReq) + } + } catch (e: MilvusRestApiException) { + logger.error("search similar path failed: ${e.message}") + emptyList() } if (docs.isEmpty()) { logger.info("no similarity path found for [$projectPath]") @@ -95,17 +102,16 @@ class ArtifactSimilarityPreloadPlanGenerator( val now = LocalDateTime.now() val preloadHour = preloadHourOfDay.firstOrNull { it > now.hour } ?: (preloadHourOfDay.first { (it + 24) > now.hour } + 24) - val preloadTimestamp = now + val preloadDateTime = now // 设置预加载时间 .plusHours((preloadHour - now.hour).toLong()) .withMinute(0) // 减去随机时间,避免同时多文件触发加载 .minusSeconds(Random.nextLong(0, preloadProperties.maxRandomSeconds)) - // 转化为毫秒时间戳 - .atZone(ZoneId.systemDefault()) - .toEpochSecond() * 1000 + // 转化为毫秒时间戳 + val preloadTimestamp = preloadDateTime.atZone(ZoneId.systemDefault()).toEpochSecond() * 1000 logger.info( - "similarity path[${docs.first().content}] found for [$projectPath], will preload on $preloadTimestamp" + "similarity path[${docs.first().content}] found for [$projectPath], will preload on $preloadDateTime" ) return preloadTimestamp } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/SimilarityPathUtils.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/SimilarityPathUtils.kt new file mode 100644 index 0000000000..ed969c1656 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/SimilarityPathUtils.kt @@ -0,0 +1,46 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.job.batch.task.cache.preload + +import com.tencent.bkrepo.auth.constant.PIPELINE + +/** + * 拼接projectId、repoName、fullPath用于向量化 + * 流水线仓库路径/p-xxx/b-xxx/xxx中的构建id为随机生成,不参与相似度计算 + */ +fun projectRepoFullPath(projectId: String, repoName: String, fullPath: String): String { + return if (repoName == PIPELINE) { + // 流水线仓库路径/p-xxx/b-xxx/xxx中的构建id不参与相似度计算 + val secondSlashIndex = fullPath.indexOf("/", 1) + val pipelinePath = fullPath.substring(0, secondSlashIndex) + val artifactPath = fullPath.substring(fullPath.indexOf("/", secondSlashIndex + 1)) + "/$projectId/$repoName$pipelinePath$artifactPath" + } else { + "/$projectId/$repoName$fullPath" + } +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ai/milvus/MilvusClient.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ai/milvus/MilvusClient.kt index a3bf519364..9648872a15 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ai/milvus/MilvusClient.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ai/milvus/MilvusClient.kt @@ -117,7 +117,9 @@ class MilvusClient( object : TypeReference>() {} ) if (res.code != 0) { - throw RuntimeException("request milvus failed, code: $code, message: ${res.message}") + throw MilvusRestApiException( + res.code, "request milvus failed, code: $code, message: ${res.message}" + ) } return handler(res.data) } else { @@ -126,7 +128,7 @@ class MilvusClient( } else { "" } - throw RuntimeException("request milvus failed, code: $code, message: $message") + throw MilvusRestApiException(code, "request milvus failed, code: $code, message: $message") } } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ai/milvus/MilvusRestApiException.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ai/milvus/MilvusRestApiException.kt new file mode 100644 index 0000000000..9d6ccb748d --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/cache/preload/ai/milvus/MilvusRestApiException.kt @@ -0,0 +1,30 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus + +class MilvusRestApiException(val code: Int, msg: String) : RuntimeException(msg) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/metrics/StorageCacheMetrics.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/metrics/StorageCacheMetrics.kt index 8521a309ad..9e7217753d 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/metrics/StorageCacheMetrics.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/metrics/StorageCacheMetrics.kt @@ -27,6 +27,8 @@ package com.tencent.bkrepo.job.metrics +import com.tencent.bkrepo.common.storage.filesystem.cleanup.FileRetainResolver +import com.tencent.bkrepo.job.batch.file.NodeRetainResolver import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.binder.BaseUnits @@ -36,12 +38,13 @@ import java.util.concurrent.ConcurrentHashMap @Component class StorageCacheMetrics( private val registry: MeterRegistry, + private val retainResolver: FileRetainResolver? = null, ) { private val cacheSizeMap = ConcurrentHashMap() private val cacheCountMap = ConcurrentHashMap() - private val retainSizeMap = ConcurrentHashMap() - private val retainCountMap = ConcurrentHashMap() + private val storageProjectRetainSizeMap = ConcurrentHashMap>() + private val storageProjectRetainCountMap = ConcurrentHashMap>() /** * 设置当前缓存总大小及数量,由于目前只有Job服务在清理缓存时会统计,因此只有Job服务会调用该方法 @@ -56,11 +59,44 @@ class StorageCacheMetrics( /** * 设置根据策略保留的文件总大小及数量 */ - fun setRetainCacheMetrics(storageKey: String, size: Long, count: Long) { - retainSizeMap[storageKey] = size - gauge(CACHE_RETAIN_SIZE, storageKey, retainSizeMap, "storage cache retain size", BaseUnits.BYTES) - retainCountMap[storageKey] = count - gauge(CACHE_RETAIN_COUNT, storageKey, retainCountMap, "storage cache retain count") + fun setProjectRetainCacheMetrics(storageKey: String, sha256Set: Set) { + if (retainResolver !is NodeRetainResolver) { + return + } + + val projectRetainSize = ConcurrentHashMap() + val projectRetainCount = ConcurrentHashMap() + sha256Set.forEach { sha256 -> + retainResolver.getRetainNode(sha256)?.let { retainNode -> + val projectId = retainNode.projectId + val size = projectRetainSize[projectId] ?: 0L + val count = projectRetainCount[projectId] ?: 0L + projectRetainSize[projectId] = size + retainNode.size + projectRetainCount[projectId] = count + 1 + } + } + + storageProjectRetainSizeMap[storageKey] = projectRetainSize + projectRetainSize.forEach { (projectId, _) -> + projectGauge( + CACHE_RETAIN_SIZE, + storageKey, + projectId, + storageProjectRetainSizeMap, + "storage cache retain size", + BaseUnits.BYTES + ) + } + storageProjectRetainCountMap[storageKey] = projectRetainCount + projectRetainCount.forEach { (projectId, _) -> + projectGauge( + CACHE_RETAIN_COUNT, + storageKey, + projectId, + storageProjectRetainCountMap, + "storage cache retain count", + ) + } } private fun gauge(name: String, storageKey: String, data: Map, des: String, unit: String? = null) { @@ -71,11 +107,28 @@ class StorageCacheMetrics( .register(registry) } + private fun projectGauge( + name: String, + storageKey: String, + projectId: String, + data: Map>, + des: String, + unit: String? = null + ) { + Gauge.builder(name, data) { it[storageKey]?.get(projectId)?.toDouble() ?: 0.0 } + .baseUnit(unit) + .tag(TAG_STORAGE_KEY, storageKey) + .tag(TAG_PROJECT_ID, projectId) + .description(des) + .register(registry) + } + companion object { const val CACHE_SIZE = "storage.cache.size" const val CACHE_COUNT = "storage.cache.count" const val CACHE_RETAIN_SIZE = "storage.cache.retain.size" const val CACHE_RETAIN_COUNT = "storage.cache.retain.count" const val TAG_STORAGE_KEY = "storageKey" + const val TAG_PROJECT_ID = "projectId" } } diff --git a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/metrics/StorageCacheMetricsTest.kt b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/metrics/StorageCacheMetricsTest.kt index 78563ed887..310d286ecd 100644 --- a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/metrics/StorageCacheMetricsTest.kt +++ b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/metrics/StorageCacheMetricsTest.kt @@ -1,37 +1,69 @@ package com.tencent.bkrepo.job.metrics +import com.tencent.bkrepo.job.UT_PROJECT_ID +import com.tencent.bkrepo.job.UT_REPO_NAME +import com.tencent.bkrepo.job.UT_SHA256 +import com.tencent.bkrepo.job.batch.file.NodeRetainResolver +import com.tencent.bkrepo.job.batch.file.RetainNode +import com.tencent.bkrepo.job.metrics.StorageCacheMetrics.Companion.CACHE_RETAIN_COUNT +import com.tencent.bkrepo.job.metrics.StorageCacheMetrics.Companion.CACHE_RETAIN_SIZE +import com.tencent.bkrepo.job.metrics.StorageCacheMetrics.Companion.TAG_PROJECT_ID +import com.tencent.bkrepo.job.metrics.StorageCacheMetrics.Companion.TAG_STORAGE_KEY import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.mockito.ArgumentMatchers.anyString +import org.mockito.kotlin.mock +import org.mockito.kotlin.whenever +@TestInstance(TestInstance.Lifecycle.PER_CLASS) class StorageCacheMetricsTest { + + private lateinit var retainResolver: NodeRetainResolver + + @BeforeAll + fun beforeAll() { + retainResolver = mock() + whenever(retainResolver.getRetainNode(anyString())).thenReturn( + RetainNode( + UT_PROJECT_ID, UT_REPO_NAME, "/a/b/c.txt", UT_SHA256, 100 + ) + ) + } + @Test fun test() { val storageKey = "testKey" val registry = SimpleMeterRegistry() - val metrics = StorageCacheMetrics(registry) + val metrics = StorageCacheMetrics(registry, retainResolver) metrics.setCacheMetrics(storageKey, 1000L, 20L) - metrics.setRetainCacheMetrics(storageKey, 500L, 10L) + metrics.setProjectRetainCacheMetrics(storageKey, setOf(UT_SHA256)) val sizeMeter = registry.getMeter(StorageCacheMetrics.CACHE_SIZE, storageKey) - val retainSizeMeter = registry.getMeter(StorageCacheMetrics.CACHE_RETAIN_SIZE, storageKey) + val retainSizeMeter = registry.getProjectMeter(CACHE_RETAIN_SIZE, storageKey, UT_PROJECT_ID) val countMeter = registry.getMeter(StorageCacheMetrics.CACHE_COUNT, storageKey) - val retainCountMeter = registry.getMeter(StorageCacheMetrics.CACHE_RETAIN_COUNT, storageKey) + val retainCountMeter = registry.getProjectMeter(CACHE_RETAIN_COUNT, storageKey, UT_PROJECT_ID) Assertions.assertEquals(1000.0, sizeMeter.value()) - Assertions.assertEquals(500.0, retainSizeMeter.value()) + Assertions.assertEquals(100.0, retainSizeMeter.value()) Assertions.assertEquals(20.0, countMeter.value()) - Assertions.assertEquals(10.0, retainCountMeter.value()) + Assertions.assertEquals(1.0, retainCountMeter.value()) // 测试更新后的统计值 metrics.setCacheMetrics(storageKey, 2000L, 40L) - metrics.setRetainCacheMetrics(storageKey, 1000L, 30L) + metrics.setProjectRetainCacheMetrics(storageKey, setOf("sha256-1", "sha256-2")) Assertions.assertEquals(2000.0, sizeMeter.value()) - Assertions.assertEquals(1000.0, retainSizeMeter.value()) + Assertions.assertEquals(200.0, retainSizeMeter.value()) Assertions.assertEquals(40.0, countMeter.value()) - Assertions.assertEquals(30.0, retainCountMeter.value()) + Assertions.assertEquals(2.0, retainCountMeter.value()) } private fun MeterRegistry.getMeter(name: String, storageKey: String) = get(name) - .tags(StorageCacheMetrics.TAG_STORAGE_KEY, storageKey) + .tags(TAG_STORAGE_KEY, storageKey) + .gauge() + + private fun MeterRegistry.getProjectMeter(name: String, storageKey: String, projectId: String) = get(name) + .tags(TAG_STORAGE_KEY, storageKey, TAG_PROJECT_ID, projectId) .gauge() }