diff --git a/src/backend/ddc/biz-ddc/src/main/kotlin/com/tencent/bkrepo/ddc/repository/BlobRefRepository.kt b/src/backend/ddc/biz-ddc/src/main/kotlin/com/tencent/bkrepo/ddc/repository/BlobRefRepository.kt index 2b4e6808b0..9a21311458 100644 --- a/src/backend/ddc/biz-ddc/src/main/kotlin/com/tencent/bkrepo/ddc/repository/BlobRefRepository.kt +++ b/src/backend/ddc/biz-ddc/src/main/kotlin/com/tencent/bkrepo/ddc/repository/BlobRefRepository.kt @@ -11,7 +11,18 @@ import org.springframework.stereotype.Repository @Repository class BlobRefRepository : SimpleMongoDao() { - fun addRefToBlob(projectId: String, repoName: String, bucket: String, refKey: String, blobIds: Set) { + + /** + * Adds blob-to-ref relations and returns the blobIds whose relation was actually inserted + */ + fun addRefToBlob( + projectId: String, + repoName: String, + bucket: String, + refKey: String, + blobIds: Set + ): Set { + val addedBlobIds = HashSet() if (blobIds.size > DEFAULT_BLOB_SIZE_LIMIT) { val ref = "$projectId/$repoName/${buildRef(bucket, refKey)}" logger.error("blobs of ref[$ref] exceed size limit, size[${blobIds.size}]]") @@ -27,9 +38,11 @@ class BlobRefRepository : SimpleMongoDao() { ref = buildRef(bucket, refKey) ) ) + addedBlobIds.add(it) } catch (ignore: DuplicateKeyException) { } } + return addedBlobIds } fun removeRefFromBlob(projectId: String, repoName: String, bucket: String, refKey: String): List { diff --git a/src/backend/ddc/biz-ddc/src/main/kotlin/com/tencent/bkrepo/ddc/service/BlobService.kt b/src/backend/ddc/biz-ddc/src/main/kotlin/com/tencent/bkrepo/ddc/service/BlobService.kt index 111465478b..d69d313128 100644 --- a/src/backend/ddc/biz-ddc/src/main/kotlin/com/tencent/bkrepo/ddc/service/BlobService.kt +++ b/src/backend/ddc/biz-ddc/src/main/kotlin/com/tencent/bkrepo/ddc/service/BlobService.kt @@ -109,14 +109,20 @@ class BlobService( } fun addRefToBlobs(ref: Reference, blobIds: Set) { - blobRefRepository.addRefToBlob(ref.projectId, ref.repoName, ref.bucket, ref.key.toString(), blobIds) - blobRepository.incRefCount(ref.projectId, 
ref.repoName, blobIds) + with(ref) { + val addedBlobIds = blobRefRepository.addRefToBlob(projectId, repoName, bucket, key.toString(), blobIds) + if (addedBlobIds.isNotEmpty()) { + blobRepository.incRefCount(projectId, repoName, addedBlobIds) + } + } } fun removeRefFromBlobs(projectId: String, repoName: String, bucket: String, key: String) { val blobIds = HashSet() blobRefRepository.removeRefFromBlob(projectId, repoName, bucket, key).mapTo(blobIds) { it.blobId } - blobRepository.incRefCount(projectId, repoName, blobIds, -1L) + if (blobIds.isNotEmpty()) { + blobRepository.incRefCount(projectId, repoName, blobIds, -1L) + } // Kept for compatibility with the legacy logic; removable once the references field of every blob is empty blobRepository.removeRefFromBlob(projectId, repoName, bucket, key) } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/DdcBlobRefCountCorrectJobContext.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/DdcBlobRefCountCorrectJobContext.kt new file mode 100644 index 0000000000..967def4fbf --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/DdcBlobRefCountCorrectJobContext.kt @@ -0,0 +1,36 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. 
+ *
+ *
+ * Terms of the MIT License:
+ * ---------------------------------------------------
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions of
+ * the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
+ * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+ * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package com.tencent.bkrepo.job.batch.context
+
+import com.google.common.hash.BloomFilter
+import com.tencent.bkrepo.job.batch.base.JobContext
+
+@Suppress("UnstableApiUsage")
+class DdcBlobRefCountCorrectJobContext(
+    val bf: BloomFilter<CharSequence>, // bloom filter holding the blobIds read from the blob-ref collection
+) : JobContext()
diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/ddc/DdcBlobRefCountCorrectJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/ddc/DdcBlobRefCountCorrectJob.kt
new file mode 100644
index 0000000000..f0dcc69fb5
--- /dev/null
+++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/ddc/DdcBlobRefCountCorrectJob.kt
@@ -0,0 +1,113 @@
+/*
+ * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
+ * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */
+
+package com.tencent.bkrepo.job.batch.task.ddc
+
+import com.google.common.hash.BloomFilter
+import com.google.common.hash.Funnels
+import com.tencent.bkrepo.common.mongo.constant.ID
+import com.tencent.bkrepo.job.BATCH_SIZE
+import com.tencent.bkrepo.job.batch.base.MongoDbBatchJob
+import com.tencent.bkrepo.job.batch.context.DdcBlobRefCountCorrectJobContext
+import com.tencent.bkrepo.job.batch.task.ddc.DdcBlobCleanupJob.Blob
+import com.tencent.bkrepo.job.batch.task.ddc.DdcBlobCleanupJob.Companion.COLLECTION_NAME
+import com.tencent.bkrepo.job.batch.task.ddc.ExpiredDdcRefCleanupJob.Companion.COLLECTION_NAME_BLOB_REF
+import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils
+import com.tencent.bkrepo.job.config.properties.DdcBlobRefCountCorrectJobProperties
+import org.slf4j.LoggerFactory
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.data.mongodb.core.query.Criteria
+import org.springframework.data.mongodb.core.query.Query
+import org.springframework.data.mongodb.core.query.Update
+import org.springframework.data.mongodb.core.query.isEqualTo
+import org.springframework.stereotype.Component
+import java.nio.charset.StandardCharsets
+
+/**
+ * Job that corrects the reference count of DDC blobs.
+ * Only handles blobs whose blob-ref relations have been deleted while blob.refCount is still non-zero.
+ */
+@Component
+@EnableConfigurationProperties(DdcBlobRefCountCorrectJobProperties::class)
+@Suppress("UnstableApiUsage")
+class DdcBlobRefCountCorrectJob(
+    private val properties: DdcBlobRefCountCorrectJobProperties,
+) : MongoDbBatchJob<Blob, DdcBlobRefCountCorrectJobContext>(properties) {
+
+    override fun createJobContext(): DdcBlobRefCountCorrectJobContext {
+        val bf = BloomFilter.create(
+            Funnels.stringFunnel(StandardCharsets.UTF_8),
+            properties.expectedRefs,
+            properties.fpp,
+        )
+        logger.info("Start to build ref bloom filter")
+        val query = Query()
+        query.fields().include(ExpiredDdcRefCleanupJob.BlobRef::blobId.name)
+        NodeCommonUtils.findByCollection(query, BATCH_SIZE, COLLECTION_NAME_BLOB_REF) {
+            bf.put(it[ExpiredDdcRefCleanupJob.BlobRef::blobId.name]!!.toString())
+        }
+        val count = "approximate(${bf.approximateElementCount()})/expected(${properties.expectedRefs})"
+        logger.info("Build ref bloom filter successful, count: $count, fpp: ${bf.expectedFpp()}")
+        return DdcBlobRefCountCorrectJobContext(bf)
+    }
+
+    override fun run(row: Blob, collectionName: String, context: DdcBlobRefCountCorrectJobContext) {
+        if (row.refCount == 0L || context.bf.mightContain(row.blobId)) {
+            // Skip: refCount is already 0, or the blob is probably still referenced (bloom filter hit)
+            return
+        }
+
+        // The blob's id is not in the blob-ref bloom filter: reset its stale refCount to 0
+        val criteria = Criteria
+            .where(Blob::projectId.name).isEqualTo(row.projectId)
+            .and(Blob::repoName.name).isEqualTo(row.repoName)
+            .and(Blob::blobId.name).isEqualTo(row.blobId)
+        val update = Update().set(Blob::refCount.name, 0L)
+        mongoTemplate.updateFirst(Query(criteria), update, COLLECTION_NAME)
+    }
+
+    override fun collectionNames() = listOf(COLLECTION_NAME)
+
+    override fun buildQuery() = Query()
+
+    override fun mapToEntity(row: Map<String, Any?>): Blob {
+        return Blob(
+            id = row[ID]!!.toString(),
+            projectId = row[Blob::projectId.name]!!.toString(),
+            repoName = row[Blob::repoName.name]!!.toString(),
+            blobId = row[Blob::blobId.name]!!.toString(),
+            references = (row[Blob::references.name] as? List<String>)?.toSet() ?: emptySet(),
+            refCount = row[Blob::refCount.name]?.toString()?.toLong() ?: 0L,
+        )
+    }
+
+    override fun entityClass() = Blob::class
+
+    companion object {
+        private val logger = LoggerFactory.getLogger(DdcBlobRefCountCorrectJob::class.java)
+    }
+}
diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/DdcBlobRefCountCorrectJobProperties.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/DdcBlobRefCountCorrectJobProperties.kt
new file mode 100644
index 0000000000..ccd5f0b0a2
--- /dev/null
+++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/DdcBlobRefCountCorrectJobProperties.kt
@@ -0,0 +1,44 @@
+/*
+ * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
+ *
+ * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
+ *
+ * A copy of the MIT License is included in this file.
+ *
+ *
+ * Terms of the MIT License:
+ * ---------------------------------------------------
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions of
+ * the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
+ * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN
+ * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package com.tencent.bkrepo.job.config.properties
+
+import org.springframework.boot.context.properties.ConfigurationProperties
+
+@ConfigurationProperties(value = "job.ddc-blob-ref-count-correct")
+class DdcBlobRefCountCorrectJobProperties(
+    override var cron: String = "0 0 1 * * ?",
+    /**
+     * Expected total number of blob-ref relations; used as the expected insertion count of the bloom filter.
+     */
+    var expectedRefs: Long = 10_000_000,
+    /**
+     * False-positive probability of the bloom filter.
+     * A higher value makes the job skip more blobs whose refCount is stale; a lower value uses more memory.
+     */
+    var fpp: Double = 0.0001,
+) : MongodbJobProperties()
diff --git a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/ddc/DdcBlobRefCountCorrectJobTest.kt b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/ddc/DdcBlobRefCountCorrectJobTest.kt
new file mode 100644
index 0000000000..d405f07078
--- /dev/null
+++ b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/ddc/DdcBlobRefCountCorrectJobTest.kt
@@ -0,0 +1,68 @@
+package com.tencent.bkrepo.job.batch.task.ddc
+
+import com.tencent.bkrepo.common.metadata.service.log.OperateLogService
+import com.tencent.bkrepo.common.metadata.service.node.NodeService
+import com.tencent.bkrepo.job.batch.JobBaseTest
+import com.tencent.bkrepo.job.batch.task.ddc.DdcBlobCleanupJob.Companion.COLLECTION_NAME
+import com.tencent.bkrepo.job.batch.task.ddc.DdcTestUtils.insertBlob
+import com.tencent.bkrepo.job.batch.task.ddc.DdcTestUtils.insertBlobRef
+import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils
+import com.tencent.bkrepo.job.migrate.MigrateRepoStorageService
+import com.tencent.bkrepo.job.separation.service.SeparationTaskService
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.DisplayName
+import org.junit.jupiter.api.Test
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest
+import org.springframework.boot.test.mock.mockito.MockBean
+import org.springframework.data.mongodb.core.MongoTemplate
+import org.springframework.data.mongodb.core.findOne
+import org.springframework.data.mongodb.core.query.Query
+import org.springframework.data.mongodb.core.query.isEqualTo
+
+@DisplayName("DDC Blob引用数量校正测试")
+@DataMongoTest
+class DdcBlobRefCountCorrectJobTest @Autowired constructor(
+    private val blobRefCountCorrectJob: DdcBlobRefCountCorrectJob,
+    private val mongoTemplate: MongoTemplate,
+) : JobBaseTest() {
+    @MockBean
+    lateinit var operateLogService: OperateLogService
+
+    @MockBean
+    private lateinit var nodeService: NodeService
+
+    @MockBean
+    private lateinit var migrateService: MigrateRepoStorageService
+
+    @MockBean
+    private lateinit var separateTaskService: SeparationTaskService
+
+    @Autowired
+    lateinit var nodeCommonUtils: NodeCommonUtils
+
+    @Test
+    fun test() {
+        generateData()
+        blobRefCountCorrectJob.start()
+        assertRefCount("0", 0)
+        assertRefCount("1", 0)
+        assertRefCount("2", 1)
+    }
+
+    private fun assertRefCount(blobId: String, expectedRefCount: Long) {
+        val criteria = DdcBlobCleanupJob.Blob::blobId.isEqualTo(blobId)
+        val blob = mongoTemplate.findOne<DdcBlobCleanupJob.Blob>(Query(criteria), COLLECTION_NAME)!!
+        assertEquals(expectedRefCount, blob.refCount)
+    }
+
+    private fun generateData() {
+        val ref = "ref/legacytexture/366f955a863296d36ce99868d015752ad0e29f81"
+        mongoTemplate.insertBlob(generateObjectId(0), "0", setOf(ref), 0L)
+        mongoTemplate.insertBlob(generateObjectId(1), "1", emptySet(), 1L)
+        mongoTemplate.insertBlob(generateObjectId(2), "2", emptySet(), 1L)
+        mongoTemplate.insertBlobRef(generateObjectId(2), "2", "2")
+    }
+
+    private fun generateObjectId(index: Int) = "11bb03c690c9fab0c5e3ed9$index"
+}