Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Commit

Permalink
[#37]@Autowired的场景会dubbo会提前订阅consumer,导致订阅失败 (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
liubao68 authored Oct 16, 2020
1 parent d3d25bc commit 9addf76
Showing 1 changed file with 40 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.config.spring.context.*;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.servicecomb.http.client.common.HttpConfiguration.AKSKProperties;
import org.apache.servicecomb.http.client.common.HttpConfiguration.SSLProperties;
Expand All @@ -54,6 +44,13 @@
import org.apache.servicecomb.service.center.client.model.MicroserviceInstance;
import org.apache.servicecomb.service.center.client.model.MicroserviceInstancesResponse;
import org.apache.servicecomb.service.center.client.model.MicroservicesResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import com.google.common.eventbus.Subscribe;
import com.huaweicloud.dubbo.common.CommonConfiguration;
Expand Down Expand Up @@ -137,6 +134,8 @@ static class SubscriptionData {

private GovernanceData governanceData;

private List<NewSubscriberEvent> pendingSubscribeEvent = new ArrayList<>();

public RegistrationListener() {
}

Expand Down Expand Up @@ -200,31 +199,36 @@ public void onApplicationEvent(ApplicationEvent applicationEvent) {
// 第一次订阅, 按照 dubbo 的要求, 需要查询实例列表
if (newSubscriberEvent.getUrl().getProtocol().equals("consumer")) {
if (registrationInProgress) {
LOGGER.warn("registration is in progress, can not subscribe new consumers. ");
pendingSubscribeEvent.add(newSubscriberEvent);
return;
}

Microservice microservice = interfaceMap.get(newSubscriberEvent.getUrl().getPath());
if (microservice == null) {
// provider 后于 consumer 启动的场景, 再查询一次。
updateInterfaceMap();
microservice = interfaceMap.get(newSubscriberEvent.getUrl().getPath());
}
if (microservice == null) {
LOGGER.error("the subscribe url [{}] is not registered.", newSubscriberEvent.getUrl().getPath());
return;
}
MicroserviceInstancesResponse instancesResponse = client
.getMicroserviceInstanceList(microservice.getServiceId());
subscriptions.put(new SubscriptionKey(microservice.getAppId(), microservice.getServiceName(),
newSubscriberEvent.getUrl().getPath()),
new SubscriptionData(newSubscriberEvent.getNotifyListener(), new ArrayList<>()));
notify(microservice.getAppId(), microservice.getServiceName(), instancesResponse.getInstances());
serviceCenterDiscovery.register(microservice);
processNewSubscriberEvent(newSubscriberEvent);
}
}
}

private void processNewSubscriberEvent(NewSubscriberEvent newSubscriberEvent) {
Microservice microservice = interfaceMap.get(newSubscriberEvent.getUrl().getPath());
if (microservice == null) {
// provider 后于 consumer 启动的场景, 再查询一次。
updateInterfaceMap();
microservice = interfaceMap.get(newSubscriberEvent.getUrl().getPath());
}
if (microservice == null) {
LOGGER.error("the subscribe url [{}] is not registered.", newSubscriberEvent.getUrl().getPath());
return;
}
MicroserviceInstancesResponse instancesResponse = client
.getMicroserviceInstanceList(microservice.getServiceId());
subscriptions.put(new SubscriptionKey(microservice.getAppId(), microservice.getServiceName(),
newSubscriberEvent.getUrl().getPath()),
new SubscriptionData(newSubscriberEvent.getNotifyListener(), new ArrayList<>()));
// 第一次订阅, 按照 dubbo 的要求, 需要查询实例列表
notify(microservice.getAppId(), microservice.getServiceName(), instancesResponse.getInstances());
serviceCenterDiscovery.register(microservice);
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -257,9 +261,17 @@ private void updateInterfaceMap() {
public void onHeartBeatEvent(HeartBeatEvent event) {
if (event.isSuccess()) {
registrationInProgress = false;
processPendingEvent();
}
}

private void processPendingEvent() {
List<NewSubscriberEvent> events = new ArrayList<>(pendingSubscribeEvent.size());
events.addAll(pendingSubscribeEvent);
pendingSubscribeEvent.clear();
events.forEach(item -> processNewSubscriberEvent(item));
}

@Subscribe
public void onMicroserviceRegistrationEvent(MicroserviceRegistrationEvent event) {
registrationInProgress = true;
Expand Down

0 comments on commit 9addf76

Please sign in to comment.