diff --git a/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/LazyInitManagementProtocolSupports.java b/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/LazyInitManagementProtocolSupports.java index 815684659..daabc1db6 100755 --- a/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/LazyInitManagementProtocolSupports.java +++ b/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/LazyInitManagementProtocolSupports.java @@ -1,96 +1,37 @@ package org.jetlinks.community.protocol; -import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.jetlinks.core.ProtocolSupport; import org.jetlinks.core.cluster.ClusterManager; -import org.jetlinks.supports.protocol.StaticProtocolSupports; -import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition; +import org.jetlinks.core.event.EventBus; +import org.jetlinks.supports.protocol.management.DefaultProtocolSupportManager; import org.jetlinks.supports.protocol.management.ProtocolSupportLoader; -import org.jetlinks.supports.protocol.management.ProtocolSupportManager; import org.springframework.boot.CommandLineRunner; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; -import reactor.core.publisher.Mono; - -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; @Slf4j @Getter @Setter @Order(Ordered.HIGHEST_PRECEDENCE) -public class LazyInitManagementProtocolSupports extends StaticProtocolSupports implements CommandLineRunner { - - private ProtocolSupportManager manager; - - private ProtocolSupportLoader loader; - - private ClusterManager clusterManager; - - @Setter(AccessLevel.PRIVATE) - private Map configProtocolIdMapping = new ConcurrentHashMap<>(); - - private Duration loadTimeOut = Duration.ofSeconds(30); - - public void init() { - - clusterManager.getTopic("_protocol_changed") - .subscribe() - .subscribe(protocol -> this.init(protocol).subscribe()); - - try { - manager - .loadAll() - .filter(de -> de.getState() == 1) - .flatMap(this::init) - .blockLast(loadTimeOut); - } catch (Throwable e) { - log.error("load protocol error", e); - } +public class LazyInitManagementProtocolSupports extends DefaultProtocolSupportManager implements CommandLineRunner { + public LazyInitManagementProtocolSupports(EventBus eventBus, + ClusterManager clusterManager, + ProtocolSupportLoader loader) { + super(eventBus, clusterManager.getCache("__protocol_supports"), loader); } - public Mono init(ProtocolSupportDefinition definition) { - try { - if (definition.getState() != 1) { - String protocol = configProtocolIdMapping.get(definition.getId()); - if (protocol != null) { - log.debug("uninstall protocol:{}", definition); - unRegister(protocol); - return Mono.empty(); - } - } - String operation = definition.getState() != 1 ? "uninstall" : "install"; - Consumer consumer = definition.getState() != 1 ? this::unRegister : this::register; - - log.debug("{} protocol:{}", operation, definition); - return loader - .load(definition) - .doOnNext(e -> { - e.init(definition.getConfiguration()); - log.debug("{} protocol[{}] success: {}", operation, definition.getId(), e); - configProtocolIdMapping.put(definition.getId(), e.getId()); - consumer.accept(e); - }) - .onErrorResume((e) -> { - log.error("{} protocol[{}] error: {}", operation, definition.getId(), e.getLocalizedMessage()); - return Mono.empty(); - }) - .then(); - } catch (Throwable err) { - log.error("init protocol error", err); - } - return Mono.empty(); + public void init() { + super.init(); } @Override public void run(String... args) { init(); } + + } diff --git a/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/configuration/ProtocolAutoConfiguration.java b/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/configuration/ProtocolAutoConfiguration.java index 953920faf..f385cd97e 100644 --- a/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/configuration/ProtocolAutoConfiguration.java +++ b/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/configuration/ProtocolAutoConfiguration.java @@ -28,10 +28,10 @@ @AutoConfigureBefore(DeviceClusterConfiguration.class) public class ProtocolAutoConfiguration { - @Bean - public ProtocolSupportManager protocolSupportManager(ClusterManager clusterManager) { - return new ClusterProtocolSupportManager(clusterManager); - } +// @Bean +// public ProtocolSupportManager protocolSupportManager(ClusterManager clusterManager) { +// return new ClusterProtocolSupportManager(clusterManager); +// } @Bean public ServiceContext serviceContext(ApplicationContext applicationContext) { @@ -39,14 +39,10 @@ public ServiceContext serviceContext(ApplicationContext applicationContext) { } @Bean - public LazyInitManagementProtocolSupports managementProtocolSupports(ProtocolSupportManager supportManager, - ProtocolSupportLoader loader, - ClusterManager clusterManager) { - LazyInitManagementProtocolSupports supports = new LazyInitManagementProtocolSupports(); - supports.setClusterManager(clusterManager); - supports.setManager(supportManager); - supports.setLoader(loader); - return supports; + public LazyInitManagementProtocolSupports managementProtocolSupports(EventBus eventBus, + ClusterManager clusterManager, + ProtocolSupportLoader loader) { + return new LazyInitManagementProtocolSupports(eventBus, clusterManager, loader); } @Bean @@ -57,7 +53,7 @@ public LazyProtocolSupports protocolSupports() { @Bean public AutoDownloadJarProtocolSupportLoader autoDownloadJarProtocolSupportLoader(WebClient.Builder builder, FileManager fileManager) { - return new AutoDownloadJarProtocolSupportLoader(builder,fileManager); + return new AutoDownloadJarProtocolSupportLoader(builder, fileManager); } @Bean