Skip to content

Commit

Permalink
feat: 增加ddc服务批量操作接口 #2776
Browse files Browse the repository at this point in the history
* feat: 增加ddc服务批量操作接口 #2776

* feat: 增加ddc服务批量操作接口 #2776

* feat: 增加ddc服务批量操作接口 #2776

* feat: 增加ddc服务批量操作接口 #2776

* feat: 增加ddc服务批量操作接口 #2776

* feat: 增加ddc服务批量操作接口 #2776

* feat: 增加ddc服务批量操作接口 #2776

* feat: 增加ddc服务批量操作接口 #2776

* feat: 增加ddc服务批量操作接口 #2776

* feat: 增加ddc服务批量操作接口 #2776

* feat: 增加ddc服务批量操作接口 #2776
  • Loading branch information
cnlkl authored Dec 5, 2024
1 parent 9bb349d commit 794126e
Show file tree
Hide file tree
Showing 17 changed files with 851 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.common.artifact.resolve.file.memory

import com.tencent.bkrepo.common.artifact.api.ArtifactFile
import com.tencent.bkrepo.common.artifact.hash.md5
import com.tencent.bkrepo.common.artifact.hash.sha1
import com.tencent.bkrepo.common.artifact.hash.sha256
import java.io.File
import java.io.InputStream

class ByteArrayArtifactFile(
private val data: ByteArray,
) : ArtifactFile {

private val sha1: String by lazy { getInputStream().sha1() }
private val sha256: String by lazy { getInputStream().sha256() }
private val md5: String by lazy { getInputStream().md5() }

override fun getInputStream(): InputStream {
return data.inputStream()
}

override fun getSize(): Long {
return data.size.toLong()
}

override fun isInMemory(): Boolean {
return true
}

override fun getFile(): File? {
throw UnsupportedOperationException("not implemented")
}

override fun flushToFile(): File {
throw UnsupportedOperationException("not implemented")
}

override fun delete() {
throw UnsupportedOperationException("not implemented")
}

override fun hasInitialized(): Boolean {
return true
}

override fun isFallback(): Boolean {
return false
}

override fun isInLocalDisk(): Boolean {
return false
}

override fun getFileMd5(): String {
return md5
}

override fun getFileSha1(): String {
return sha1
}

override fun getFileSha256(): String {
return sha256
}

fun byteArray() = data
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import com.tencent.bkrepo.common.artifact.message.ArtifactMessageCode
import com.tencent.bkrepo.common.artifact.repository.context.ArtifactDownloadContext
import com.tencent.bkrepo.common.artifact.repository.context.ArtifactUploadContext
import com.tencent.bkrepo.common.artifact.repository.local.LocalRepository
import com.tencent.bkrepo.common.artifact.resolve.file.memory.ByteArrayArtifactFile
import com.tencent.bkrepo.common.artifact.resolve.response.ArtifactResource
import com.tencent.bkrepo.common.artifact.stream.ArtifactInputStream
import com.tencent.bkrepo.common.artifact.stream.Range
Expand All @@ -55,6 +56,7 @@ import com.tencent.bkrepo.ddc.exception.ReferenceIsMissingBlobsException
import com.tencent.bkrepo.ddc.metrics.DdcMeterBinder
import com.tencent.bkrepo.ddc.pojo.Blob
import com.tencent.bkrepo.ddc.pojo.ContentHash
import com.tencent.bkrepo.ddc.pojo.CreateRefResponse
import com.tencent.bkrepo.ddc.pojo.Reference
import com.tencent.bkrepo.ddc.pojo.UploadCompressedBlobResponse
import com.tencent.bkrepo.ddc.serialization.CbObject
Expand All @@ -73,6 +75,7 @@ import com.tencent.bkrepo.ddc.utils.isAttachment
import com.tencent.bkrepo.ddc.utils.isBinaryAttachment
import com.tencent.bkrepo.repository.pojo.metadata.MetadataModel
import com.tencent.bkrepo.repository.pojo.node.service.NodeCreateRequest
import com.tencent.bkrepo.repository.pojo.repo.RepositoryDetail
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.io.ByteArrayInputStream
Expand Down Expand Up @@ -207,21 +210,13 @@ class DdcLocalRepository(
}

private fun onUploadReference(context: ArtifactUploadContext) {
val contentType = context.request.contentType
val artifactInfo = context.artifactInfo as ReferenceArtifactInfo
when (contentType) {
when (val contentType = context.request.contentType) {
MEDIA_TYPE_UNREAL_COMPACT_BINARY -> {
val payload = context.getArtifactFile().getInputStream().use { it.readBytes() }
val ref = referenceService.create(Reference.from(artifactInfo, payload))
if (ref.inlineBlob == null) {
// inlineBlob为null时表示inlineBlob过大,需要存到文件中
val nodeCreateRequest = buildRefNodeCreateRequest(context)
storageManager.storeArtifactFile(
nodeCreateRequest, context.getArtifactFile(), context.storageCredentials
)
}
val res = referenceService.finalize(ref, payload)
HttpContextHolder.getResponse().writer.println(res.toJsonString())
val artifactInfo = context.artifactInfo as ReferenceArtifactInfo
val artifactFile = context.getArtifactFile()
val repoDetail = context.repositoryDetail
val res = uploadReference(repoDetail, artifactInfo, artifactFile, context.userId).toJsonString()
HttpContextHolder.getResponse().writer.println(res)
}

else -> throw BadRequestException(
Expand All @@ -230,8 +225,32 @@ class DdcLocalRepository(
}
}

private fun buildRefNodeCreateRequest(context: ArtifactUploadContext): NodeCreateRequest {
val artifactInfo = context.artifactInfo as ReferenceArtifactInfo
fun uploadReference(
repositoryDetail: RepositoryDetail,
artifactInfo: ReferenceArtifactInfo,
artifactFile: ArtifactFile,
operator: String,
): CreateRefResponse {
val payload = if (artifactFile is ByteArrayArtifactFile) {
artifactFile.byteArray()
} else {
artifactFile.getInputStream().use { it.readBytes() }
}
val ref = referenceService.create(Reference.from(artifactInfo, payload), operator)
if (ref.inlineBlob == null) {
// inlineBlob为null时表示inlineBlob过大,需要存到文件中
val nodeCreateRequest = buildRefNodeCreateRequest(repositoryDetail, artifactInfo, artifactFile, operator)
storageManager.storeArtifactFile(nodeCreateRequest, artifactFile, repositoryDetail.storageCredentials)
}
return referenceService.finalize(ref, payload)
}

private fun buildRefNodeCreateRequest(
repositoryDetail: RepositoryDetail,
artifactInfo: ReferenceArtifactInfo,
artifactFile: ArtifactFile,
operator: String,
): NodeCreateRequest {
val metadata = ArrayList<MetadataModel>()
metadata.add(
MetadataModel(
Expand All @@ -241,7 +260,15 @@ class DdcLocalRepository(
)
)

return buildNodeCreateRequest(context).copy(
return NodeCreateRequest(
projectId = repositoryDetail.projectId,
repoName = repositoryDetail.name,
folder = false,
fullPath = artifactInfo.getArtifactFullPath(),
size = artifactFile.getSize(),
sha256 = artifactFile.getFileSha256(),
md5 = artifactFile.getFileMd5(),
operator = operator,
overwrite = true,
nodeMetadata = metadata
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,46 @@

package com.tencent.bkrepo.ddc.config

import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.task.TaskExecutorBuilder
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Lazy
import org.springframework.context.annotation.Primary
import org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(DdcProperties::class)
class DdcConfiguration
class DdcConfiguration {
@Bean(BEAN_NAME_REF_BATCH_EXECUTOR)
fun refBatchExecutor(ddcProperties: DdcProperties): ThreadPoolTaskExecutor {
return ThreadPoolTaskExecutor().apply {
corePoolSize = ddcProperties.refBatchWorker
maxPoolSize = ddcProperties.refBatchWorker
setAllowCoreThreadTimeOut(true)
keepAliveSeconds = 60
queueCapacity = ddcProperties.refBatchQueueSize
threadNamePrefix = "ddc-ref-batch-%d"
setRejectedExecutionHandler(CallerRunsPolicy())
}
}

@Lazy
@Bean(
name = [
TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME
]
)
@Primary
fun applicationTaskExecutor(builder: TaskExecutorBuilder): ThreadPoolTaskExecutor {
return builder.build()
}

companion object {
const val BEAN_NAME_REF_BATCH_EXECUTOR = "refBatchExecutor"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,12 @@ import org.springframework.util.unit.DataUnit
data class DdcProperties(
var inlineBlobMaxSize: DataSize = DataSize.of(64L, DataUnit.KILOBYTES),
var updateRefLastAccessTime: Boolean = true,
/**
* 批量操作接口工作线程数
*/
var refBatchWorker: Int = Runtime.getRuntime().availableProcessors() * 2,
/**
* 批量操作接口线程池队列大小
*/
var refBatchQueueSize: Int = 16,
)
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ import com.tencent.bkrepo.common.api.message.CommonMessageCode.PARAMETER_INVALID
import com.tencent.bkrepo.common.artifact.api.ArtifactFile
import com.tencent.bkrepo.common.artifact.api.ArtifactPathVariable
import com.tencent.bkrepo.common.artifact.audit.ActionAuditContent
import com.tencent.bkrepo.common.artifact.audit.NODE_CREATE_ACTION
import com.tencent.bkrepo.common.artifact.audit.NODE_DOWNLOAD_ACTION
import com.tencent.bkrepo.common.artifact.audit.NODE_RESOURCE
import com.tencent.bkrepo.common.artifact.audit.NODE_CREATE_ACTION
import com.tencent.bkrepo.common.service.util.HttpContextHolder
import com.tencent.bkrepo.ddc.artifact.ReferenceArtifactInfo
import com.tencent.bkrepo.ddc.artifact.repository.DdcLocalRepository.Companion.HEADER_NAME_HASH
import com.tencent.bkrepo.ddc.component.PermissionHelper
import com.tencent.bkrepo.ddc.pojo.BatchOps
import com.tencent.bkrepo.ddc.pojo.Operation
import com.tencent.bkrepo.ddc.service.ReferenceArtifactService
import com.tencent.bkrepo.ddc.utils.MEDIA_TYPE_JUPITER_INLINED_PAYLOAD
import com.tencent.bkrepo.ddc.utils.MEDIA_TYPE_UNREAL_COMPACT_BINARY
Expand All @@ -58,6 +60,7 @@ import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.PutMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import java.nio.ByteBuffer

@RequestMapping("/{projectId}/api/v1/refs")
@RestController
Expand Down Expand Up @@ -153,6 +156,36 @@ class ReferencesController(
referenceArtifactService.finalize(artifactInfo)
}

@ApiOperation("批量读写")
@PostMapping(
"/{repoName}",
consumes = [MEDIA_TYPE_UNREAL_COMPACT_BINARY],
produces = [MEDIA_TYPE_UNREAL_COMPACT_BINARY],
)
fun batchOp(
@PathVariable projectId: String,
@PathVariable repoName: String,
): ByteArray {
// 检查权限
val ops = BatchOps.deserialize(HttpContextHolder.getRequest().inputStream.use { it.readBytes() })
var requiredPermissionAction = PermissionAction.READ
for (op in ops.ops) {
if (op.op == Operation.PUT.name) {
requiredPermissionAction = PermissionAction.WRITE
break
}
}
permissionHelper.checkPathPermission(requiredPermissionAction)

// 执行操作
val opsResponse = referenceArtifactService.batch(projectId, repoName, ops)
// 序列化,由于getView返回的是readOnly ByteBuffer,为了避免数组复制,通过反射获取内部数组返回
val data = opsResponse.serialize().getView()
val field = ByteBuffer::class.java.getDeclaredField("hb")
field.isAccessible = true
return field.get(data) as ByteArray
}

private fun getResponseType(format: String?, default: String): String {
if (!format.isNullOrEmpty()) {
return when (format.toLowerCase()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,15 @@

package com.tencent.bkrepo.ddc.exception

import com.tencent.bkrepo.common.api.constant.HttpStatus
import com.tencent.bkrepo.common.api.exception.ErrorCodeException
import com.tencent.bkrepo.common.api.message.CommonMessageCode
import com.tencent.bkrepo.ddc.pojo.ContentHash

class ReferenceIsMissingBlobsException(
val missingBlobs: List<ContentHash>
) : RuntimeException("References is missing these blobs: ${missingBlobs.joinToString(",")}")
) : ErrorCodeException(
CommonMessageCode.RESOURCE_NOT_FOUND,
missingBlobs.joinToString(","),
status = HttpStatus.NOT_FOUND
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@

package com.tencent.bkrepo.ddc.metrics

import com.tencent.bkrepo.ddc.config.DdcConfiguration.Companion.BEAN_NAME_REF_BATCH_EXECUTOR
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.binder.MeterBinder
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.stereotype.Component

@Component
class DdcMeterBinder : MeterBinder {
class DdcMeterBinder(
@Qualifier(BEAN_NAME_REF_BATCH_EXECUTOR)
val refBatchExecutor: ThreadPoolTaskExecutor
) : MeterBinder {
/**
* ref inline加载耗时
*/
Expand Down Expand Up @@ -103,6 +110,14 @@ class DdcMeterBinder : MeterBinder {
.tag("type", "compressed")
.tag("method", "load")
.register(registry)

Gauge.builder(DDC_REF_BATCH_EXECUTOR_ACTIVE, refBatchExecutor) { it.activeCount.toDouble() }
.description("ddc ref batch executor active thread")
.register(registry)

Gauge.builder(DDC_REF_BATCH_EXECUTOR_QUEUE_SIZE, refBatchExecutor) { it.queueSize.toDouble() }
.description("ddc ref batch executor queue size")
.register(registry)
}

/**
Expand Down Expand Up @@ -137,5 +152,7 @@ class DdcMeterBinder : MeterBinder {
private const val DDC_REF_LOAD = "ddc.ref.load"
private const val DDC_REF_STORE = "ddc.ref.store"
private const val DDC_BLOB = "ddc.blob"
private const val DDC_REF_BATCH_EXECUTOR_ACTIVE = "ddc.ref.batch.executor.active.count"
private const val DDC_REF_BATCH_EXECUTOR_QUEUE_SIZE = "ddc.ref.batch.executor.queue.size"
}
}
Loading

0 comments on commit 794126e

Please sign in to comment.