Skip to content

Commit

Permalink
fix: 修复重复增加ddc blob引用数 #2825
Browse files Browse the repository at this point in the history
* fix: 修复重复增加blob引用数 #2825

* fix: 修复重复增加ddc blob引用数 #2825

* fix: 修复重复增加ddc blob引用数 #2825
  • Loading branch information
cnlkl authored Dec 9, 2024
1 parent 785742b commit 31e9afc
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,18 @@ import org.springframework.stereotype.Repository

@Repository
class BlobRefRepository : SimpleMongoDao<TDdcBlobRef>() {
fun addRefToBlob(projectId: String, repoName: String, bucket: String, refKey: String, blobIds: Set<String>) {

/**
* 添加blob与ref关系,返回插入成功的blobId列表
*/
fun addRefToBlob(
projectId: String,
repoName: String,
bucket: String,
refKey: String,
blobIds: Set<String>
): Set<String> {
val addedBlobIds = HashSet<String>()
if (blobIds.size > DEFAULT_BLOB_SIZE_LIMIT) {
val ref = "$projectId/$repoName/${buildRef(bucket, refKey)}"
logger.error("blobs of ref[$ref] exceed size limit, size[${blobIds.size}]]")
Expand All @@ -27,9 +38,11 @@ class BlobRefRepository : SimpleMongoDao<TDdcBlobRef>() {
ref = buildRef(bucket, refKey)
)
)
addedBlobIds.add(it)
} catch (ignore: DuplicateKeyException) {
}
}
return addedBlobIds
}

fun removeRefFromBlob(projectId: String, repoName: String, bucket: String, refKey: String): List<TDdcBlobRef> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,20 @@ class BlobService(
}

fun addRefToBlobs(ref: Reference, blobIds: Set<String>) {
blobRefRepository.addRefToBlob(ref.projectId, ref.repoName, ref.bucket, ref.key.toString(), blobIds)
blobRepository.incRefCount(ref.projectId, ref.repoName, blobIds)
with(ref) {
val addedBlobIds = blobRefRepository.addRefToBlob(projectId, repoName, bucket, key.toString(), blobIds)
if (addedBlobIds.isNotEmpty()) {
blobRepository.incRefCount(projectId, repoName, addedBlobIds)
}
}
}

fun removeRefFromBlobs(projectId: String, repoName: String, bucket: String, key: String) {
val blobIds = HashSet<String>()
blobRefRepository.removeRefFromBlob(projectId, repoName, bucket, key).mapTo(blobIds) { it.blobId }
blobRepository.incRefCount(projectId, repoName, blobIds, -1L)
if (blobIds.isNotEmpty()) {
blobRepository.incRefCount(projectId, repoName, blobIds, -1L)
}
// 兼容旧逻辑,所有blob的references字段为空后可以移除该逻辑
blobRepository.removeRefFromBlob(projectId, repoName, bucket, key)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.batch.context

import com.google.common.hash.BloomFilter
import com.tencent.bkrepo.job.batch.base.JobContext

@Suppress("UnstableApiUsage")
class DdcBlobRefCountCorrectJobContext(
val bf: BloomFilter<CharSequence>,
) : JobContext()
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.batch.task.ddc

import com.google.common.hash.BloomFilter
import com.google.common.hash.Funnels
import com.tencent.bkrepo.common.mongo.constant.ID
import com.tencent.bkrepo.job.BATCH_SIZE
import com.tencent.bkrepo.job.batch.base.MongoDbBatchJob
import com.tencent.bkrepo.job.batch.context.DdcBlobRefCountCorrectJobContext
import com.tencent.bkrepo.job.batch.task.ddc.DdcBlobCleanupJob.Blob
import com.tencent.bkrepo.job.batch.task.ddc.DdcBlobCleanupJob.Companion.COLLECTION_NAME
import com.tencent.bkrepo.job.batch.task.ddc.ExpiredDdcRefCleanupJob.Companion.COLLECTION_NAME_BLOB_REF
import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils
import com.tencent.bkrepo.job.config.properties.DdcBlobRefCountCorrectJobProperties
import org.slf4j.LoggerFactory
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.Update
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.stereotype.Component
import java.nio.charset.StandardCharsets

/**
* DDC blob 引用数校正任务
* 仅处理blob-ref关系已删除,但是blob.refCount不为0的情况
*/
@Component
@EnableConfigurationProperties(DdcBlobRefCountCorrectJobProperties::class)
@Suppress("UnstableApiUsage")
class DdcBlobRefCountCorrectJob(
private val properties: DdcBlobRefCountCorrectJobProperties,
) : MongoDbBatchJob<Blob, DdcBlobRefCountCorrectJobContext>(properties) {

override fun createJobContext(): DdcBlobRefCountCorrectJobContext {
val bf = BloomFilter.create(
Funnels.stringFunnel(StandardCharsets.UTF_8),
properties.expectedRefs,
properties.fpp,
)
logger.info("Start to build ref bloom filter")
val query = Query()
query.fields().include(ExpiredDdcRefCleanupJob.BlobRef::blobId.name)
NodeCommonUtils.findByCollection(Query(), BATCH_SIZE, COLLECTION_NAME_BLOB_REF) {
bf.put(it[ExpiredDdcRefCleanupJob.BlobRef::blobId.name]!!.toString())
}
val count = "approximate(${bf.approximateElementCount()})/expected(${properties.expectedRefs})"
logger.info("Build ref bloom filter successful, count: $count, fpp: ${bf.expectedFpp()}")
return DdcBlobRefCountCorrectJobContext(bf)
}

override fun run(row: Blob, collectionName: String, context: DdcBlobRefCountCorrectJobContext) {
if (row.refCount == 0L || context.bf.mightContain(row.blobId)) {
// blob引用已经为0,或者blob引用大概率存在时不处理直接返回
return
}

// blob未被引用,更新refCount为0
val criteria = Criteria
.where(Blob::projectId.name).isEqualTo(row.projectId)
.and(Blob::repoName.name).isEqualTo(row.repoName)
.and(Blob::blobId.name).isEqualTo(row.blobId)
val update = Update().set(Blob::refCount.name, 0L)
mongoTemplate.updateFirst(Query(criteria), update, COLLECTION_NAME)
}

override fun collectionNames() = listOf(COLLECTION_NAME)

override fun buildQuery() = Query()

override fun mapToEntity(row: Map<String, Any?>): Blob {
return Blob(
id = row[ID]!!.toString(),
projectId = row[Blob::projectId.name]!!.toString(),
repoName = row[Blob::repoName.name]!!.toString(),
blobId = row[Blob::blobId.name]!!.toString(),
references = (row[Blob::references.name] as? List<String>)?.toSet() ?: emptySet(),
refCount = row[Blob::refCount.name]?.toString()?.toLong() ?: 0L,
)
}

override fun entityClass() = Blob::class

companion object {
private val logger = LoggerFactory.getLogger(DdcBlobRefCountCorrectJobContext::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.config.properties

import org.springframework.boot.context.properties.ConfigurationProperties

@ConfigurationProperties(value = "job.ddc-blob-ref-count-correct")
class DdcBlobRefCountCorrectJobProperties(
override var cron: String = "0 0 1 * * ?",
/**
* 预期blob与ref关系总数
*/
var expectedRefs: Long = 10_000_000,
/**
* 布隆过滤器的误报率。
* 误报率较高,会导致更多的数据库查询,但不影响节点清理的正确性,误报率越低,消耗的内存越大。
* */
var fpp: Double = 0.0001,
) : MongodbJobProperties()
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.tencent.bkrepo.job.batch.task.ddc

import com.tencent.bkrepo.common.metadata.service.log.OperateLogService
import com.tencent.bkrepo.common.metadata.service.node.NodeService
import com.tencent.bkrepo.job.batch.JobBaseTest
import com.tencent.bkrepo.job.batch.task.ddc.DdcBlobCleanupJob.Companion.COLLECTION_NAME
import com.tencent.bkrepo.job.batch.task.ddc.DdcTestUtils.insertBlob
import com.tencent.bkrepo.job.batch.task.ddc.DdcTestUtils.insertBlobRef
import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils
import com.tencent.bkrepo.job.migrate.MigrateRepoStorageService
import com.tencent.bkrepo.job.separation.service.SeparationTaskService
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.findOne
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.isEqualTo

@DisplayName("DDC Blob引用数量校正测试")
@DataMongoTest
class DdcBlobRefCountCorrectJobTest @Autowired constructor(
private val blobRefCountCorrectJob: DdcBlobRefCountCorrectJob,
private val mongoTemplate: MongoTemplate,
) : JobBaseTest() {
@MockBean
lateinit var operateLogService: OperateLogService

@MockBean
private lateinit var nodeService: NodeService

@MockBean
private lateinit var migrateService: MigrateRepoStorageService

@MockBean
private lateinit var separateTaskService: SeparationTaskService

@Autowired
lateinit var nodeCommonUtils: NodeCommonUtils

@Test
fun test() {
generateData()
blobRefCountCorrectJob.start()
assertRefCount("0", 0)
assertRefCount("1", 0)
assertRefCount("2", 1)
}

private fun assertRefCount(blobId: String, expectedRefCount: Long) {
val criteria = DdcBlobCleanupJob.Blob::blobId.isEqualTo(blobId)
val blob = mongoTemplate.findOne<DdcBlobCleanupJob.Blob>(Query(criteria), COLLECTION_NAME)!!
assertEquals(expectedRefCount, blob.refCount)
}

private fun generateData() {
val ref = "ref/legacytexture/366f955a863296d36ce99868d015752ad0e29f81"
mongoTemplate.insertBlob(generateObjectId(0), "0", setOf(ref), 0L)
mongoTemplate.insertBlob(generateObjectId(1), "1", emptySet(), 1L)
mongoTemplate.insertBlob(generateObjectId(2), "2", emptySet(), 1L)
mongoTemplate.insertBlobRef(generateObjectId(2), "2", "2")
}

private fun generateObjectId(index: Int) = "11bb03c690c9fab0c5e3ed9$index"
}

0 comments on commit 31e9afc

Please sign in to comment.