Skip to content

Commit

Permalink
feat: 修复升级框架后使用consumer方式消费消息报错#2446
Browse files Browse the repository at this point in the history
* feat: 修复升级框架后使用consumer方式消费消息报错#2446

* feat: 代码调整#2446

* feat: 代码调整#2446
  • Loading branch information
zacYL authored Aug 2, 2024
1 parent 36c6ff9 commit bf27e16
Show file tree
Hide file tree
Showing 19 changed files with 390 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ package com.tencent.bkrepo.analyst.configuration
import com.tencent.bkrepo.analysis.executor.api.ExecutorClient
import com.tencent.bkrepo.analyst.dispatcher.SubtaskDispatcherFactory
import com.tencent.bkrepo.analyst.dispatcher.SubtaskPoller
import com.tencent.bkrepo.analyst.event.ScanEventConsumer
import com.tencent.bkrepo.analyst.service.ExecutionClusterService
import com.tencent.bkrepo.analyst.service.ScannerService
import com.tencent.bkrepo.analyst.service.impl.OperateLogServiceImpl
import com.tencent.bkrepo.analyst.service.impl.ProjectUsageStatisticsServiceImpl
import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.common.operate.api.OperateLogService
import com.tencent.bkrepo.common.operate.api.ProjectUsageStatisticsService
import com.tencent.bkrepo.common.service.condition.ConditionalOnNotAssembly
Expand All @@ -45,6 +47,7 @@ import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.function.Consumer

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(
Expand Down Expand Up @@ -84,4 +87,14 @@ class ScannerConfiguration {
): ProjectUsageStatisticsService {
return ProjectUsageStatisticsServiceImpl(client)
}


@Bean("scanEventConsumer")
fun scanEventConsumer(
scanEventConsumer: ScanEventConsumer
): Consumer<ArtifactEvent> {
return Consumer {
scanEventConsumer.accept(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import com.tencent.bkrepo.repository.pojo.packages.PackageSummary
import org.slf4j.LoggerFactory
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.stereotype.Component
import java.util.function.Consumer

/**
* 构件事件消费者,用于触发制品更新扫描
Expand All @@ -71,7 +70,7 @@ class ScanEventConsumer(
private val scanPlanDao: ScanPlanDao,
private val projectScanConfigurationService: ProjectScanConfigurationService,
private val executor: ThreadPoolTaskExecutor
) : Consumer<ArtifactEvent> {
) {

/**
* 允许接收的事件类型
Expand All @@ -82,7 +81,7 @@ class ScanEventConsumer(
EventType.VERSION_UPDATED
)

override fun accept(event: ArtifactEvent) {
fun accept(event: ArtifactEvent) {
if (!acceptTypes.contains(event.type)) {
return
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.tencent.bkrepo.fs.server.config

import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.fs.server.listener.NodeModifyListener
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import java.util.function.Consumer

@Configuration
class FsConsumerConfig {

@Bean("artifactEventFs")
fun nodeModifyListener(
nodeModifyListener: NodeModifyListener
): Consumer<Message<ArtifactEvent>> {
return Consumer {
nodeModifyListener.accept(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import org.slf4j.LoggerFactory
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.concurrent.Executors
import java.util.function.Consumer

@Component("artifactEventFs")
@Component
class NodeModifyListener(
private val rRepositoryClient: RRepositoryClient,
private val fileNodeService: FileNodeService
) : Consumer<Message<ArtifactEvent>> {
) {

override fun accept(message: Message<ArtifactEvent>) {
fun accept(message: Message<ArtifactEvent>) {
val event = message.payload
val type = event.type
// 覆盖创建也会先删除,再创建。所以这里只需关注删除事件即可。
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.tencent.bkrepo.helm.config

import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.helm.listener.consumer.PackageReplicationEventConsumer
import com.tencent.bkrepo.helm.listener.consumer.RemoteRepoEventConsumer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import java.util.function.Consumer

@Configuration
class HelmConsumerConfig {

@Bean("packageReplication")
fun packageReplicationEventConsumer(
packageReplicationEventConsumer: PackageReplicationEventConsumer
): Consumer<Message<ArtifactEvent>> {
return Consumer {
packageReplicationEventConsumer.accept(it)
}
}

@Bean("remoteRepo")
fun remoteRepoEventConsumer(
remoteRepoEventConsumer: RemoteRepoEventConsumer
): Consumer<Message<ArtifactEvent>> {
return Consumer {
remoteRepoEventConsumer.accept(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,15 @@ import com.tencent.bkrepo.helm.listener.base.RemoteEventJobExecutor
import org.slf4j.LoggerFactory
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

/**
* 消费基于MQ传递的事件
* 消费分发同步的Package, 用于更新index文件
*/
@Component("packageReplication")
@Component
class PackageReplicationEventConsumer(
private val remoteEventJobExecutor: RemoteEventJobExecutor
) : Consumer<Message<ArtifactEvent>> {
) {

/**
* 允许接收的事件类型
Expand All @@ -52,7 +51,7 @@ class PackageReplicationEventConsumer(
EventType.VERSION_UPDATED,
)

override fun accept(message: Message<ArtifactEvent>) {
fun accept(message: Message<ArtifactEvent>) {
if (!acceptTypes.contains(message.payload.type)) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ package com.tencent.bkrepo.helm.listener.consumer
import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.common.artifact.event.base.EventType
import com.tencent.bkrepo.helm.listener.base.RemoteEventJobExecutor
import java.util.function.Consumer
import org.slf4j.LoggerFactory
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
Expand All @@ -39,10 +38,10 @@ import org.springframework.stereotype.Component
* 构件事件消费者,用于实时同步
* 对应destination为对应ArtifactEvent.topic
*/
@Component("remoteRepo")
@Component
class RemoteRepoEventConsumer(
private val remoteEventJobExecutor: RemoteEventJobExecutor
) : Consumer<Message<ArtifactEvent>> {
) {

/**
* 允许接收的事件类型
Expand All @@ -53,7 +52,7 @@ class RemoteRepoEventConsumer(
EventType.REPO_REFRESHED
)

override fun accept(message: Message<ArtifactEvent>) {
fun accept(message: Message<ArtifactEvent>) {
if (!acceptTypes.contains(message.payload.type)) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@

package com.tencent.bkrepo.job.config

import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.job.executor.BlockThreadPoolTaskExecutorDecorator
import com.tencent.bkrepo.job.migrate.config.MigrateRepoStorageProperties
import com.tencent.bkrepo.job.separation.config.DataSeparationConfig
import com.tencent.bkrepo.job.separation.listener.SeparationRecoveryEventConsumer
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.function.Consumer

/**
* Job配置
Expand All @@ -57,4 +61,13 @@ class JobConfig {
Runtime.getRuntime().availableProcessors()
)
}

@Bean("separationRecovery")
fun separationRecoveryEventConsumer(
separationRecoveryEventConsumer: SeparationRecoveryEventConsumer
): Consumer<Message<ArtifactEvent>> {
return Consumer {
separationRecoveryEventConsumer.accept(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,18 @@ import org.slf4j.LoggerFactory
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.time.format.DateTimeFormatter
import java.util.function.Consumer

/**
* 消费降冷自动恢复事件
*/
@Component("separationRecovery")
@Component
class SeparationRecoveryEventConsumer(
private val separationTaskService: SeparationTaskService,
private val dataSeparationConfig: DataSeparationConfig,
private val separationPackageDao: SeparationPackageDao,
private val separationPackageVersionDao: SeparationPackageVersionDao,
private val separationNodeDao: SeparationNodeDao
) : Consumer<Message<ArtifactEvent>> {
) {

/**
* 允许接收的事件类型
Expand All @@ -70,7 +69,7 @@ class SeparationRecoveryEventConsumer(
EventType.NODE_SEPARATION_RECOVERY,
)

override fun accept(message: Message<ArtifactEvent>) {
fun accept(message: Message<ArtifactEvent>) {
if (!dataSeparationConfig.enableAutoRecovery) return
if (!acceptTypes.contains(message.payload.type)) {
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.tencent.bkrepo.oci.config

import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent
import com.tencent.bkrepo.oci.listener.consumer.RemoteImageRepoEventConsumer
import com.tencent.bkrepo.oci.listener.consumer.ThirdPartyReplicationEventConsumer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import java.util.function.Consumer

@Configuration
class OciConsumerConfig {

// 之前继承Consumer方式框架升级后会报错,https://github.com/spring-cloud/spring-cloud-stream/issues/2704
@Bean("remoteOciRepo")
fun remoteImageRepoEventConsumer(
remoteImageRepoEventConsumer: RemoteImageRepoEventConsumer
): Consumer<Message<ArtifactEvent>> {
return Consumer {
remoteImageRepoEventConsumer.accept(it)
}
}

@Bean("thirdPartyReplication")
fun thirdPartyReplicationEventConsumer(
thirdPartyReplicationEventConsumer: ThirdPartyReplicationEventConsumer
): Consumer<Message<ArtifactEvent>> {
return Consumer {
thirdPartyReplicationEventConsumer.accept(it)
}
}
}
Loading

0 comments on commit bf27e16

Please sign in to comment.