From 9addf7641f6a9f6f08148043587646368cf6cb24 Mon Sep 17 00:00:00 2001 From: bao liu Date: Fri, 16 Oct 2020 12:02:17 +0800 Subject: [PATCH] =?UTF-8?q?[#37]@Autowired=E7=9A=84=E5=9C=BA=E6=99=AF?= =?UTF-8?q?=E4=BC=9Adubbo=E4=BC=9A=E6=8F=90=E5=89=8D=E8=AE=A2=E9=98=85cons?= =?UTF-8?q?umer=EF=BC=8C=E5=AF=BC=E8=87=B4=E8=AE=A2=E9=98=85=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=20(#39)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dubbo/discovery/RegistrationListener.java | 68 +++++++++++-------- 1 file changed, 40 insertions(+), 28 deletions(-) diff --git a/discovery/servicecomb-service-center/src/main/java/com/huaweicloud/dubbo/discovery/RegistrationListener.java b/discovery/servicecomb-service-center/src/main/java/com/huaweicloud/dubbo/discovery/RegistrationListener.java index ed69a0d..bf8d293 100644 --- a/discovery/servicecomb-service-center/src/main/java/com/huaweicloud/dubbo/discovery/RegistrationListener.java +++ b/discovery/servicecomb-service-center/src/main/java/com/huaweicloud/dubbo/discovery/RegistrationListener.java @@ -28,17 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.context.ApplicationListener; -import org.springframework.context.event.ContextRefreshedEvent; - import org.apache.dubbo.common.URL; -import org.apache.dubbo.config.spring.context.*; import org.apache.dubbo.registry.NotifyListener; import org.apache.servicecomb.http.client.common.HttpConfiguration.AKSKProperties; import org.apache.servicecomb.http.client.common.HttpConfiguration.SSLProperties; @@ -54,6 +44,13 @@ import org.apache.servicecomb.service.center.client.model.MicroserviceInstance; import org.apache.servicecomb.service.center.client.model.MicroserviceInstancesResponse; import org.apache.servicecomb.service.center.client.model.MicroservicesResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; import com.google.common.eventbus.Subscribe; import com.huaweicloud.dubbo.common.CommonConfiguration; @@ -137,6 +134,8 @@ static class SubscriptionData { private GovernanceData governanceData; + private List pendingSubscribeEvent = new ArrayList<>(); + public RegistrationListener() { } @@ -200,31 +199,36 @@ public void onApplicationEvent(ApplicationEvent applicationEvent) { // 第一次订阅, 按照 dubbo 的要求, 需要查询实例列表 if (newSubscriberEvent.getUrl().getProtocol().equals("consumer")) { if (registrationInProgress) { - LOGGER.warn("registration is in progress, can not subscribe new consumers. "); + pendingSubscribeEvent.add(newSubscriberEvent); return; } - Microservice microservice = interfaceMap.get(newSubscriberEvent.getUrl().getPath()); - if (microservice == null) { - // provider 后于 consumer 启动的场景, 再查询一次。 - updateInterfaceMap(); - microservice = interfaceMap.get(newSubscriberEvent.getUrl().getPath()); - } - if (microservice == null) { - LOGGER.error("the subscribe url [{}] is not registered.", newSubscriberEvent.getUrl().getPath()); - return; - } - MicroserviceInstancesResponse instancesResponse = client - .getMicroserviceInstanceList(microservice.getServiceId()); - subscriptions.put(new SubscriptionKey(microservice.getAppId(), microservice.getServiceName(), - newSubscriberEvent.getUrl().getPath()), - new SubscriptionData(newSubscriberEvent.getNotifyListener(), new ArrayList<>())); - notify(microservice.getAppId(), microservice.getServiceName(), instancesResponse.getInstances()); - serviceCenterDiscovery.register(microservice); + processNewSubscriberEvent(newSubscriberEvent); } } } + private void processNewSubscriberEvent(NewSubscriberEvent newSubscriberEvent) { + Microservice microservice = interfaceMap.get(newSubscriberEvent.getUrl().getPath()); + if (microservice == null) { + // provider 后于 consumer 启动的场景, 再查询一次。 + updateInterfaceMap(); + microservice = interfaceMap.get(newSubscriberEvent.getUrl().getPath()); + } + if (microservice == null) { + LOGGER.error("the subscribe url [{}] is not registered.", newSubscriberEvent.getUrl().getPath()); + return; + } + MicroserviceInstancesResponse instancesResponse = client + .getMicroserviceInstanceList(microservice.getServiceId()); + subscriptions.put(new SubscriptionKey(microservice.getAppId(), microservice.getServiceName(), + newSubscriberEvent.getUrl().getPath()), + new SubscriptionData(newSubscriberEvent.getNotifyListener(), new ArrayList<>())); + // 第一次订阅, 按照 dubbo 的要求, 需要查询实例列表 + notify(microservice.getAppId(), microservice.getServiceName(), instancesResponse.getInstances()); + serviceCenterDiscovery.register(microservice); + } + @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; @@ -257,9 +261,17 @@ private void updateInterfaceMap() { public void onHeartBeatEvent(HeartBeatEvent event) { if (event.isSuccess()) { registrationInProgress = false; + processPendingEvent(); } } + private void processPendingEvent() { + List events = new ArrayList<>(pendingSubscribeEvent.size()); + events.addAll(pendingSubscribeEvent); + pendingSubscribeEvent.clear(); + events.forEach(item -> processNewSubscriberEvent(item)); + } + @Subscribe public void onMicroserviceRegistrationEvent(MicroserviceRegistrationEvent event) { registrationInProgress = true;