diff --git a/discovery/servicecomb-service-center/src/main/java/com/huaweicloud/dubbo/discovery/RegistrationListener.java b/discovery/servicecomb-service-center/src/main/java/com/huaweicloud/dubbo/discovery/RegistrationListener.java index b99b191..a05b528 100644 --- a/discovery/servicecomb-service-center/src/main/java/com/huaweicloud/dubbo/discovery/RegistrationListener.java +++ b/discovery/servicecomb-service-center/src/main/java/com/huaweicloud/dubbo/discovery/RegistrationListener.java @@ -24,9 +24,11 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -60,7 +62,9 @@ import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; +import com.google.common.base.Charsets; import com.google.common.eventbus.Subscribe; +import com.google.common.hash.Hashing; import com.huaweicloud.dubbo.common.AuthHeaderProviders; import com.huaweicloud.dubbo.common.CommonConfiguration; import com.huaweicloud.dubbo.common.EventManager; @@ -147,7 +151,7 @@ static class SubscriptionData { private GovernanceData governanceData; - private List pendingSubscribeEvent = new ArrayList<>(); + private final List pendingSubscribeEvent = new ArrayList<>(); private ServiceCenterConfigurationManager serviceCenterConfigurationManager; @@ -208,13 +212,7 @@ public void onApplicationEvent(ApplicationEvent applicationEvent) { } instance = serviceCenterConfigurationManager.createMicroserviceInstance(); - List endpoints = new ArrayList<>(); - if (registry != null) { - endpoints.addAll(registry.getRegisters().stream() - .map(URL::toString) - .collect(Collectors.toList())); - } - instance.setEndpoints(endpoints); + addEndpoints(); instance.setHostName(InetAddress.getLocalHost().getHostName()); EventManager.register(this); @@ -222,6 +220,7 @@ public void onApplicationEvent(ApplicationEvent applicationEvent) { EventManager.getEventBus()); serviceCenterRegistration.setMicroservice(microservice); serviceCenterRegistration.setMicroserviceInstance(instance); + addSchemaInfo(serviceCenterRegistration); serviceCenterRegistration.startRegistration(); EventManager.post(new RegistrationReadyEvent()); @@ -241,6 +240,41 @@ public void onApplicationEvent(ApplicationEvent applicationEvent) { } } + private void addEndpoints() { + Set endpoints = new HashSet<>(); + if (registry != null) { + for (URL url : registry.getRegisters()) { + URL newUrl = new URL(url.getProtocol(), url.getHost(), url.getPort()); + endpoints.add(newUrl.toString()); + } + } + instance.setEndpoints(new ArrayList<>(endpoints)); + } + + + private void addSchemaInfo(ServiceCenterRegistration registration) { + if (registry != null) { + // consumer: 如果没有 provider 接口, dubbo 启动的时候, 不会初始化 Registry。 调用接口的时候,才会初始化。 + microservice.setSchemas(registry.getRegisters().stream().map(URL::getPath).collect(Collectors.toList())); + registration.setSchemaInfos( + registry.getRegisters().stream().map(this::createSchemaInfo).collect(Collectors.toList())); + } + } + + private org.apache.servicecomb.service.center.client.model.SchemaInfo createSchemaInfo(URL url) { + URL newUrl = url.setHost(microservice.getServiceName()); + org.apache.servicecomb.service.center.client.model.SchemaInfo info + = new org.apache.servicecomb.service.center.client.model.SchemaInfo(); + info.setSchemaId(newUrl.getPath()); + info.setSchema(newUrl.toString()); + info.setSummary(calcSchemaSummary(info.getSchema())); + return info; + } + + private static String calcSchemaSummary(String schemaContent) { + return Hashing.sha256().newHasher().putString(schemaContent, Charsets.UTF_8).hash().toString(); + } + private void processNewSubscriberEvent(NewSubscriberEvent newSubscriberEvent) { Microservice microservice = interfaceMap.get(newSubscriberEvent.getUrl().getPath()); if (microservice == null) { @@ -314,7 +348,7 @@ private void processPendingEvent() { List events = new ArrayList<>(pendingSubscribeEvent.size()); events.addAll(pendingSubscribeEvent); pendingSubscribeEvent.clear(); - events.forEach(item -> processNewSubscriberEvent(item)); + events.forEach(this::processNewSubscriberEvent); } @Subscribe @@ -340,8 +374,6 @@ public void onMicroserviceInstanceRegistrationEvent(MicroserviceInstanceRegistra } updateInterfaceMap(); firstRegistrationWaiter.countDown(); - LOGGER.info("register microservice successfully, serviceId={}, instanceId={}.", microservice.getServiceId(), - instance.getInstanceId()); } } // --- END ---- // @@ -349,8 +381,7 @@ public void onMicroserviceInstanceRegistrationEvent(MicroserviceInstanceRegistra // --- 实例发现事件处理 ---- // @Subscribe public void onInstanceChangedEvent(InstanceChangedEvent event) { - boolean watchFlag = Boolean.parseBoolean(ConfigUtils.getProperty(KEY_REGISTRY_WATCH, "")); - applicationEventPublisher.publishEvent(new HeartBeatEvent(watchFlag)); + notify(event.getAppName(), event.getServiceName(), event.getInstances()); } // --- END ---- // @@ -383,11 +414,28 @@ private void notify(String appId, String serviceName, List private Map> instancesToURLs(List instances) { Map> notifyUrls = new HashMap<>(); - instances.forEach(instance -> instance.getEndpoints().forEach(e -> { - URL url = URL.valueOf(e); - notifyUrls.putIfAbsent(url.getPath(), new ArrayList<>()); - notifyUrls.get(url.getPath()).add(url); - })); + instances.forEach(instance -> { + List schemaInfos = client + .getServiceSchemasList(instance.getServiceId(), true); + instance.getEndpoints().forEach(e -> { + URL url = URL.valueOf(e); + if (schemaInfos.isEmpty()) { + // old version new schema info + notifyUrls.putIfAbsent(url.getPath(), new ArrayList<>()); + notifyUrls.get(url.getPath()).add(url); + return; + } + // parameters are in schema info + schemaInfos.forEach(schema -> { + URL newUrl = URL.valueOf(schema.getSchema()); + if (!newUrl.getProtocol().equals(url.getProtocol())) { + return; + } + notifyUrls.putIfAbsent(newUrl.getPath(), new ArrayList<>()); + notifyUrls.get(newUrl.getPath()).add(newUrl.setHost(url.getHost()).setPort(url.getPort())); + }); + }); + }); return notifyUrls; } diff --git a/integration-tests/cse-v2/cse-v2-consumer/src/main/resources/dubbo.properties b/integration-tests/cse-v2/cse-v2-consumer/src/main/resources/dubbo.properties index fa7853d..1d95c5e 100644 --- a/integration-tests/cse-v2/cse-v2-consumer/src/main/resources/dubbo.properties +++ b/integration-tests/cse-v2/cse-v2-consumer/src/main/resources/dubbo.properties @@ -24,7 +24,7 @@ dubbo.servicecomb.registry.address=${CSE_V2_SC} #### config center address #### dubbo.servicecomb.config.address=${CSE_V2_CC} dubbo.servicecomb.config.type=kie -##### enabled SSL #### +##### when service center or config center enables SSL, must set to true. #### dubbo.servicecomb.ssl.enabled=true ##### RBAC configuration #### #dubbo.servicecomb.credentials.account.name=用户名 diff --git a/integration-tests/cse-v2/cse-v2-provider/src/main/resources/dubbo.properties b/integration-tests/cse-v2/cse-v2-provider/src/main/resources/dubbo.properties index 2f3e5f7..bf6af4e 100644 --- a/integration-tests/cse-v2/cse-v2-provider/src/main/resources/dubbo.properties +++ b/integration-tests/cse-v2/cse-v2-provider/src/main/resources/dubbo.properties @@ -22,7 +22,7 @@ dubbo.servicecomb.registry.address=${CSE_V2_SC} #### config center address #### dubbo.servicecomb.config.address=${CSE_V2_CC} dubbo.servicecomb.config.type=kie -##### enabled SSL #### +##### when service center or config center enables SSL, must set to true. #### dubbo.servicecomb.ssl.enabled=true ##### RBAC configuration #### #dubbo.servicecomb.credentials.account.name=用户名