From bf27e16ef496a70a264f811857bb9ec59835e071 Mon Sep 17 00:00:00 2001 From: zacYL <100330102+zacYL@users.noreply.github.com> Date: Fri, 2 Aug 2024 17:24:41 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=AE=E5=A4=8D=E5=8D=87=E7=BA=A7?= =?UTF-8?q?=E6=A1=86=E6=9E=B6=E5=90=8E=E4=BD=BF=E7=94=A8consumer=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E6=B6=88=E8=B4=B9=E6=B6=88=E6=81=AF=E6=8A=A5=E9=94=99?= =?UTF-8?q?#2446?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 修复升级框架后使用consumer方式消费消息报错#2446 * feat: 代码调整#2446 * feat: 代码调整#2446 --- .../configuration/ScannerConfiguration.kt | 13 ++++ .../bkrepo/analyst/event/ScanEventConsumer.kt | 5 +- .../fs/server/config/FsConsumerConfig.kt | 52 +++++++++++++++ .../fs/server/listener/NodeModifyListener.kt | 7 +-- .../bkrepo/helm/config/HelmConsumerConfig.kt | 62 ++++++++++++++++++ .../PackageReplicationEventConsumer.kt | 7 +-- .../consumer/RemoteRepoEventConsumer.kt | 7 +-- .../tencent/bkrepo/job/config/JobConfig.kt | 13 ++++ .../SeparationRecoveryEventConsumer.kt | 7 +-- .../bkrepo/oci/config/OciConsumerConfig.kt | 63 +++++++++++++++++++ .../consumer/RemoteImageRepoEventConsumer.kt | 7 +-- .../ThirdPartyReplicationEventConsumer.kt | 7 +-- .../config/ReplicationConsumerConfig.kt | 52 +++++++++++++++ .../type/event/ArtifactEventConsumer.kt | 2 +- .../replica/type/event/EventConsumer.kt | 6 +- .../config/RepositoryConsumerConfig.kt | 53 ++++++++++++++++ .../NodeUpdateAccessDateEventListener.kt | 7 +-- .../webhook/config/WebHookConsumerConfig.kt | 52 +++++++++++++++ .../webhook/executor/ArtifactEventConsumer.kt | 7 +-- 19 files changed, 390 insertions(+), 39 deletions(-) create mode 100644 src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/config/FsConsumerConfig.kt create mode 100644 src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/config/HelmConsumerConfig.kt create mode 100644 src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/config/OciConsumerConfig.kt create mode 100644 src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/config/ReplicationConsumerConfig.kt create mode 100644 src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/config/RepositoryConsumerConfig.kt create mode 100644 src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/config/WebHookConsumerConfig.kt diff --git a/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/configuration/ScannerConfiguration.kt b/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/configuration/ScannerConfiguration.kt index a11a7ace61..902b98c4a2 100644 --- a/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/configuration/ScannerConfiguration.kt +++ b/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/configuration/ScannerConfiguration.kt @@ -30,10 +30,12 @@ package com.tencent.bkrepo.analyst.configuration import com.tencent.bkrepo.analysis.executor.api.ExecutorClient import com.tencent.bkrepo.analyst.dispatcher.SubtaskDispatcherFactory import com.tencent.bkrepo.analyst.dispatcher.SubtaskPoller +import com.tencent.bkrepo.analyst.event.ScanEventConsumer import com.tencent.bkrepo.analyst.service.ExecutionClusterService import com.tencent.bkrepo.analyst.service.ScannerService import com.tencent.bkrepo.analyst.service.impl.OperateLogServiceImpl import com.tencent.bkrepo.analyst.service.impl.ProjectUsageStatisticsServiceImpl +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent import com.tencent.bkrepo.common.operate.api.OperateLogService import com.tencent.bkrepo.common.operate.api.ProjectUsageStatisticsService import com.tencent.bkrepo.common.service.condition.ConditionalOnNotAssembly @@ -45,6 +47,7 @@ import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor +import java.util.function.Consumer @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties( @@ -84,4 +87,14 @@ class ScannerConfiguration { ): ProjectUsageStatisticsService { return ProjectUsageStatisticsServiceImpl(client) } + + + @Bean("scanEventConsumer") + fun scanEventConsumer( + scanEventConsumer: ScanEventConsumer + ): Consumer { + return Consumer { + scanEventConsumer.accept(it) + } + } } diff --git a/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/event/ScanEventConsumer.kt b/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/event/ScanEventConsumer.kt index 75246aac62..aea9aa3757 100644 --- a/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/event/ScanEventConsumer.kt +++ b/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/event/ScanEventConsumer.kt @@ -56,7 +56,6 @@ import com.tencent.bkrepo.repository.pojo.packages.PackageSummary import org.slf4j.LoggerFactory import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor import org.springframework.stereotype.Component -import java.util.function.Consumer /** * 构件事件消费者,用于触发制品更新扫描 @@ -71,7 +70,7 @@ class ScanEventConsumer( private val scanPlanDao: ScanPlanDao, private val projectScanConfigurationService: ProjectScanConfigurationService, private val executor: ThreadPoolTaskExecutor -) : Consumer { +) { /** * 允许接收的事件类型 @@ -82,7 +81,7 @@ class ScanEventConsumer( EventType.VERSION_UPDATED ) - override fun accept(event: ArtifactEvent) { + fun accept(event: ArtifactEvent) { if (!acceptTypes.contains(event.type)) { return } diff --git a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/config/FsConsumerConfig.kt b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/config/FsConsumerConfig.kt new file mode 100644 index 0000000000..ed63fa7623 --- /dev/null +++ b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/config/FsConsumerConfig.kt @@ -0,0 +1,52 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.fs.server.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.fs.server.listener.NodeModifyListener +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import java.util.function.Consumer + +@Configuration +class FsConsumerConfig { + + @Bean("artifactEventFs") + fun nodeModifyListener( + nodeModifyListener: NodeModifyListener + ): Consumer> { + return Consumer { + nodeModifyListener.accept(it) + } + } +} diff --git a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/listener/NodeModifyListener.kt b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/listener/NodeModifyListener.kt index a314c1bfad..3c3bff46f4 100644 --- a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/listener/NodeModifyListener.kt +++ b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/listener/NodeModifyListener.kt @@ -10,15 +10,14 @@ import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component import java.util.concurrent.Executors -import java.util.function.Consumer -@Component("artifactEventFs") +@Component class NodeModifyListener( private val rRepositoryClient: RRepositoryClient, private val fileNodeService: FileNodeService -) : Consumer> { +) { - override fun accept(message: Message) { + fun accept(message: Message) { val event = message.payload val type = event.type // 覆盖创建也会先删除,再创建。所以这里只需关注删除事件即可。 diff --git a/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/config/HelmConsumerConfig.kt b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/config/HelmConsumerConfig.kt new file mode 100644 index 0000000000..ccc4b6aa26 --- /dev/null +++ b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/config/HelmConsumerConfig.kt @@ -0,0 +1,62 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.helm.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.helm.listener.consumer.PackageReplicationEventConsumer +import com.tencent.bkrepo.helm.listener.consumer.RemoteRepoEventConsumer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import java.util.function.Consumer + +@Configuration +class HelmConsumerConfig { + + @Bean("packageReplication") + fun packageReplicationEventConsumer( + packageReplicationEventConsumer: PackageReplicationEventConsumer + ): Consumer> { + return Consumer { + packageReplicationEventConsumer.accept(it) + } + } + + @Bean("remoteRepo") + fun remoteRepoEventConsumer( + remoteRepoEventConsumer: RemoteRepoEventConsumer + ): Consumer> { + return Consumer { + remoteRepoEventConsumer.accept(it) + } + } +} diff --git a/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/PackageReplicationEventConsumer.kt b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/PackageReplicationEventConsumer.kt index 25a9487a51..3f1e95a12f 100644 --- a/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/PackageReplicationEventConsumer.kt +++ b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/PackageReplicationEventConsumer.kt @@ -33,16 +33,15 @@ import com.tencent.bkrepo.helm.listener.base.RemoteEventJobExecutor import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component -import java.util.function.Consumer /** * 消费基于MQ传递的事件 * 消费分发同步的Package, 用于更新index文件 */ -@Component("packageReplication") +@Component class PackageReplicationEventConsumer( private val remoteEventJobExecutor: RemoteEventJobExecutor -) : Consumer> { +) { /** * 允许接收的事件类型 @@ -52,7 +51,7 @@ class PackageReplicationEventConsumer( EventType.VERSION_UPDATED, ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!acceptTypes.contains(message.payload.type)) { return } diff --git a/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/RemoteRepoEventConsumer.kt b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/RemoteRepoEventConsumer.kt index cf42ef9e5b..97a9df2e1a 100644 --- a/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/RemoteRepoEventConsumer.kt +++ b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/RemoteRepoEventConsumer.kt @@ -30,7 +30,6 @@ package com.tencent.bkrepo.helm.listener.consumer import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent import com.tencent.bkrepo.common.artifact.event.base.EventType import com.tencent.bkrepo.helm.listener.base.RemoteEventJobExecutor -import java.util.function.Consumer import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component @@ -39,10 +38,10 @@ import org.springframework.stereotype.Component * 构件事件消费者,用于实时同步 * 对应destination为对应ArtifactEvent.topic */ -@Component("remoteRepo") +@Component class RemoteRepoEventConsumer( private val remoteEventJobExecutor: RemoteEventJobExecutor -) : Consumer> { +) { /** * 允许接收的事件类型 @@ -53,7 +52,7 @@ class RemoteRepoEventConsumer( EventType.REPO_REFRESHED ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!acceptTypes.contains(message.payload.type)) { return } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/JobConfig.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/JobConfig.kt index d8a1a0fef8..b0edcd330d 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/JobConfig.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/JobConfig.kt @@ -27,14 +27,18 @@ package com.tencent.bkrepo.job.config +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent import com.tencent.bkrepo.job.executor.BlockThreadPoolTaskExecutorDecorator import com.tencent.bkrepo.job.migrate.config.MigrateRepoStorageProperties import com.tencent.bkrepo.job.separation.config.DataSeparationConfig +import com.tencent.bkrepo.job.separation.listener.SeparationRecoveryEventConsumer import org.springframework.boot.autoconfigure.task.TaskExecutionProperties import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor +import java.util.function.Consumer /** * Job配置 @@ -57,4 +61,13 @@ class JobConfig { Runtime.getRuntime().availableProcessors() ) } + + @Bean("separationRecovery") + fun separationRecoveryEventConsumer( + separationRecoveryEventConsumer: SeparationRecoveryEventConsumer + ): Consumer> { + return Consumer { + separationRecoveryEventConsumer.accept(it) + } + } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/separation/listener/SeparationRecoveryEventConsumer.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/separation/listener/SeparationRecoveryEventConsumer.kt index e37c88fe9a..6882969ddc 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/separation/listener/SeparationRecoveryEventConsumer.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/separation/listener/SeparationRecoveryEventConsumer.kt @@ -49,19 +49,18 @@ import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component import java.time.format.DateTimeFormatter -import java.util.function.Consumer /** * 消费降冷自动恢复事件 */ -@Component("separationRecovery") +@Component class SeparationRecoveryEventConsumer( private val separationTaskService: SeparationTaskService, private val dataSeparationConfig: DataSeparationConfig, private val separationPackageDao: SeparationPackageDao, private val separationPackageVersionDao: SeparationPackageVersionDao, private val separationNodeDao: SeparationNodeDao -) : Consumer> { +) { /** * 允许接收的事件类型 @@ -70,7 +69,7 @@ class SeparationRecoveryEventConsumer( EventType.NODE_SEPARATION_RECOVERY, ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!dataSeparationConfig.enableAutoRecovery) return if (!acceptTypes.contains(message.payload.type)) { return diff --git a/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/config/OciConsumerConfig.kt b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/config/OciConsumerConfig.kt new file mode 100644 index 0000000000..2c9470cff8 --- /dev/null +++ b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/config/OciConsumerConfig.kt @@ -0,0 +1,63 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.oci.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.oci.listener.consumer.RemoteImageRepoEventConsumer +import com.tencent.bkrepo.oci.listener.consumer.ThirdPartyReplicationEventConsumer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import java.util.function.Consumer + +@Configuration +class OciConsumerConfig { + + // 之前继承Consumer方式框架升级后会报错,https://github.com/spring-cloud/spring-cloud-stream/issues/2704 + @Bean("remoteOciRepo") + fun remoteImageRepoEventConsumer( + remoteImageRepoEventConsumer: RemoteImageRepoEventConsumer + ): Consumer> { + return Consumer { + remoteImageRepoEventConsumer.accept(it) + } + } + + @Bean("thirdPartyReplication") + fun thirdPartyReplicationEventConsumer( + thirdPartyReplicationEventConsumer: ThirdPartyReplicationEventConsumer + ): Consumer> { + return Consumer { + thirdPartyReplicationEventConsumer.accept(it) + } + } +} diff --git a/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/RemoteImageRepoEventConsumer.kt b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/RemoteImageRepoEventConsumer.kt index 753b5fd69b..638da05dfb 100644 --- a/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/RemoteImageRepoEventConsumer.kt +++ b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/RemoteImageRepoEventConsumer.kt @@ -33,16 +33,15 @@ import com.tencent.bkrepo.oci.listener.base.EventExecutor import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component -import java.util.function.Consumer /** * 构件事件消费者,用于实时同步 * 对应destination为对应ArtifactEvent.topic */ -@Component("remoteOciRepo") +@Component class RemoteImageRepoEventConsumer( private val eventExecutor: EventExecutor -) : Consumer>{ +) { /** * 允许接收的事件类型 @@ -53,7 +52,7 @@ class RemoteImageRepoEventConsumer( EventType.REPO_REFRESHED ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!acceptTypes.contains(message.payload.type)) { return } diff --git a/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/ThirdPartyReplicationEventConsumer.kt b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/ThirdPartyReplicationEventConsumer.kt index bad477b59f..4820ce423f 100644 --- a/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/ThirdPartyReplicationEventConsumer.kt +++ b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/ThirdPartyReplicationEventConsumer.kt @@ -33,17 +33,16 @@ import com.tencent.bkrepo.oci.listener.base.EventExecutor import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component -import java.util.function.Consumer /** * 消费基于MQ传递的事件 * 对应destination为对应ArtifactEvent.topic */ -@Component("thirdPartyReplication") +@Component class ThirdPartyReplicationEventConsumer( private val eventExecutor: EventExecutor -) : Consumer> { +) { /** * 允许接收的事件类型 @@ -52,7 +51,7 @@ class ThirdPartyReplicationEventConsumer( EventType.REPLICATION_THIRD_PARTY ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!acceptTypes.contains(message.payload.type)) { return } diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/config/ReplicationConsumerConfig.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/config/ReplicationConsumerConfig.kt new file mode 100644 index 0000000000..b3efd2a1e3 --- /dev/null +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/config/ReplicationConsumerConfig.kt @@ -0,0 +1,52 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.replication.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.replication.replica.type.event.ArtifactEventConsumer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import java.util.function.Consumer + +@Configuration +class ReplicationConsumerConfig { + + // 之前继承Consumer方式框架升级后会报错,https://github.com/spring-cloud/spring-cloud-stream/issues/2704 + @Bean("artifactEventReplication") + fun artifactEventConsumer( + artifactEventConsumer: ArtifactEventConsumer + ): Consumer { + return Consumer { + artifactEventConsumer.accept(it) + } + } +} diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/ArtifactEventConsumer.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/ArtifactEventConsumer.kt index 8f834e5de2..a261f090dd 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/ArtifactEventConsumer.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/ArtifactEventConsumer.kt @@ -38,7 +38,7 @@ import org.springframework.stereotype.Component * 构件事件消费者,用于实时同步 * 对应binding name为artifactEvent-in-0 */ -@Component("artifactEventReplication") +@Component class ArtifactEventConsumer( private val replicaTaskService: ReplicaTaskService, private val eventBasedReplicaJobExecutor: EventBasedReplicaJobExecutor diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventConsumer.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventConsumer.kt index f5c63981b6..fcb1017687 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventConsumer.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventConsumer.kt @@ -29,19 +29,19 @@ package com.tencent.bkrepo.replication.replica.type.event import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent import com.tencent.bkrepo.common.artifact.event.base.EventType -import java.util.function.Consumer + /** * 构件事件消费者,用于实时同步 * 对应binding name为artifactEvent-in-0 */ -abstract class EventConsumer : Consumer { +open class EventConsumer { /** * 允许接收的事件类型 */ open fun getAcceptTypes(): Set = emptySet() - override fun accept(message: ArtifactEvent) { + fun accept(message: ArtifactEvent) { if (!getAcceptTypes().contains(message.type)) { return } diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/config/RepositoryConsumerConfig.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/config/RepositoryConsumerConfig.kt new file mode 100644 index 0000000000..e86704d91f --- /dev/null +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/config/RepositoryConsumerConfig.kt @@ -0,0 +1,53 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.repository.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.repository.listener.NodeUpdateAccessDateEventListener +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import java.util.function.Consumer + +@Configuration +class RepositoryConsumerConfig { + + // 之前继承Consumer方式框架升级后会报错,https://github.com/spring-cloud/spring-cloud-stream/issues/2704 + @Bean("nodeUpdateAccessDate") + fun nodeUpdateAccessDateEventConsumer( + nodeUpdateAccessDateEventListener: NodeUpdateAccessDateEventListener + ): Consumer> { + return Consumer { + nodeUpdateAccessDateEventListener.accept(it) + } + } +} diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeUpdateAccessDateEventListener.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeUpdateAccessDateEventListener.kt index 41ce9fd18f..8e1119a293 100644 --- a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeUpdateAccessDateEventListener.kt +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeUpdateAccessDateEventListener.kt @@ -42,17 +42,16 @@ import org.springframework.messaging.Message import org.springframework.stereotype.Component import java.time.LocalDateTime import java.time.format.DateTimeFormatter -import java.util.function.Consumer /** * 消费基于MQ传递的事件去更新对应access date */ -@Component("nodeUpdateAccessDate") +@Component class NodeUpdateAccessDateEventListener( private val nodeDao: NodeDao, private val artifactEventProperties: ArtifactEventProperties, -) : Consumer> { +) { /** * 允许接收的事件类型 @@ -61,7 +60,7 @@ class NodeUpdateAccessDateEventListener( EventType.NODE_UPDATE_ACCESS_DATE, ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!acceptTypes.contains(message.payload.type)) { return } diff --git a/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/config/WebHookConsumerConfig.kt b/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/config/WebHookConsumerConfig.kt new file mode 100644 index 0000000000..69c3fb482b --- /dev/null +++ b/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/config/WebHookConsumerConfig.kt @@ -0,0 +1,52 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.webhook.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.webhook.executor.ArtifactEventConsumer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import java.util.function.Consumer + +@Configuration +class WebHookConsumerConfig { + + @Bean("artifactEventWebhook") + fun artifactEventConsumer( + artifactEventConsumer: ArtifactEventConsumer + ): Consumer> { + return Consumer { + artifactEventConsumer.accept(it) + } + } +} \ No newline at end of file diff --git a/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/executor/ArtifactEventConsumer.kt b/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/executor/ArtifactEventConsumer.kt index cf1624a924..c452d61dc0 100644 --- a/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/executor/ArtifactEventConsumer.kt +++ b/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/executor/ArtifactEventConsumer.kt @@ -43,18 +43,17 @@ import org.springframework.stereotype.Component import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit -import java.util.function.Consumer import java.util.regex.Pattern /** * 事件消息消费者 */ -@Component("artifactEventWebhook") +@Component class ArtifactEventConsumer( private val webHookDao: WebHookDao, private val webHookExecutor: WebHookExecutor, private val webHookProperties: WebHookProperties -) : Consumer> { +) { private val executors = ThreadPoolExecutor( 100, @@ -74,7 +73,7 @@ class ArtifactEventConsumer( ) }) - override fun accept(message: Message) { + fun accept(message: Message) { logger.info("accept artifact event: ${message.payload}, header: ${message.headers}") val task = Runnable { triggerWebHooks(message.payload) }.trace() executors.execute(task)