Skip to content

Commit

Permalink
Merge pull request #210 from nacos-group/develop
Browse files Browse the repository at this point in the history
Fix delete sync task issue #168
  • Loading branch information
paderlol authored Feb 22, 2021
2 parents 1713fd8 + 1b39a45 commit 8188868
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 32 deletions.
2 changes: 1 addition & 1 deletion nacossync-console/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.4.2</version>
<version>0.4.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion nacossync-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.4.2</version>
<version>0.4.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
Expand Down
4 changes: 2 additions & 2 deletions nacossync-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.4.2</version>
<version>0.4.3</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand All @@ -39,7 +39,7 @@
<dependency>
<groupId>com.alibaba.nacossync</groupId>
<artifactId>nacossync-worker</artifactId>
<version>0.4.2</version>
<version>0.4.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
4 changes: 2 additions & 2 deletions nacossync-worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.4.2</version>
<version>0.4.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nacossync-worker</artifactId>
<version>0.4.2</version>
<version>0.4.3</version>
<properties>
<zookeeper.version>3.4.9</zookeeper.version>
<curator.version>4.1.0</curator.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
*/
package com.alibaba.nacossync.extension.impl;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.constant.MetricsStatisticsType;
Expand All @@ -25,10 +27,14 @@
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.Collections;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -43,7 +49,9 @@
@NacosSyncService(sourceCluster = ClusterTypeEnum.NACOS, destinationCluster = ClusterTypeEnum.NACOS)
public class NacosSyncToNacosServiceImpl implements SyncService {

private Map<String, EventListener> nacosListenerMap = new ConcurrentHashMap<>();
private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>();

private final Map<String, Set<String>> sourceInstanceSnapshot = new ConcurrentHashMap<>();

@Autowired
private MetricsManager metricsManager;
Expand All @@ -61,13 +69,15 @@ public boolean delete(TaskDO taskDO) {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());

sourceNamingService.unsubscribe(taskDO.getServiceName(), nacosListenerMap.remove(taskDO.getTaskId()));
//移除订阅
sourceNamingService.unsubscribe(taskDO.getServiceName(), listenerMap.remove(taskDO.getTaskId()));
sourceInstanceSnapshot.remove(taskDO.getTaskId());

// 删除目标集群中同步的实例列表
List<Instance> instances = destNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : instances) {
if (needDelete(instance.getMetadata(), taskDO)) {
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(),
new ArrayList<>(), false);
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort());
}
}
Expand All @@ -81,54 +91,71 @@ public boolean delete(TaskDO taskDO) {

@Override
public boolean sync(TaskDO taskDO) {
String taskId = taskDO.getTaskId();
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());

nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> {
this.listenerMap.putIfAbsent(taskId, event -> {
if (event instanceof NamingEvent) {
try {
List<Instance> sourceInstances = ((NamingEvent) event).getInstances();
log.info("任务Id:{},迁入实例数量:{}", taskDO.getTaskId(), sourceInstances.size());

List<Instance> destInstances = destNamingService.getAllInstances(taskDO.getServiceName(),
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(),
new ArrayList<>(), false);
log.info("任务Id:{},迁入实例数量:{}", taskId, sourceInstances.size());
// 先删除不存在的
List<String> instanceKeys = sourceInstances.stream().map(this::composeInstanceKey)
.collect(Collectors.toList());
for (Instance instance : destInstances) {
if (needDelete(instance.getMetadata(), taskDO)
&& !instanceKeys.contains(composeInstanceKey(instance))) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(),
instance.getPort());
}
}
this.removeInvalidInstance(taskDO, taskId, destNamingService, sourceInstances);

//再次添加新实例
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
destNamingService.registerInstance(taskDO.getServiceName(),
buildSyncInstance(instance, taskDO));
this.sourceInstanceSnapshot.compute(taskId, (key, value) -> {
if (CollectionUtils.isEmpty(value)) {
value = new TreeSet<>();
}
log.info("任务Id:{},已同步实例:{}", taskId, composeInstanceKey(instance));
value.add(composeInstanceKey(instance));
return value;
});
}
}

} catch (Exception e) {
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
log.error("event process fail, taskId:{}", taskId, e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
}
}
});

sourceNamingService.subscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
sourceNamingService.subscribe(taskDO.getServiceName(), listenerMap.get(taskId));
} catch (Exception e) {
log.error("sync task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
log.error("sync task from nacos to nacos was failed, taskId:{}", taskId, e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
return false;
}
return true;
}

private void removeInvalidInstance(TaskDO taskDO, String taskId, NamingService destNamingService,
List<Instance> sourceInstances) throws NacosException {
if (this.sourceInstanceSnapshot.containsKey(taskId)) {
Set<String> oldInstanceKeys = this.sourceInstanceSnapshot.get(taskId);
List<String> newInstanceKeys = sourceInstances.stream().map(this::composeInstanceKey)
.collect(Collectors.toList());
Collection<String> instanceKeys = Collections.subtract(oldInstanceKeys, newInstanceKeys);
for (String instanceKey : instanceKeys) {
log.info("任务Id:{},移除无效同步实例:{}", taskId, instanceKey);
String[] split = instanceKey.split(":", -1);
destNamingService.deregisterInstance(taskDO.getServiceName(), split[0],
Integer.parseInt(split[1]));

}
}
}

private String composeInstanceKey(Instance instance) {
return instance.getIp() + ":" + instance.getPort();
}
Expand All @@ -143,7 +170,7 @@ public Instance buildSyncInstance(Instance instance, TaskDO taskDO) {
temp.setEnabled(instance.isEnabled());
temp.setHealthy(instance.isHealthy());
temp.setWeight(instance.getWeight());

temp.setEphemeral(instance.isEphemeral());
Map<String, String> metaData = new HashMap<>();
metaData.putAll(instance.getMetadata());
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
Expand All @@ -153,4 +180,6 @@ public Instance buildSyncInstance(Instance instance, TaskDO taskDO) {
temp.setMetadata(metaData);
return temp;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.alibaba.nacossync.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;

public abstract class Collections {

public static Collection subtract(final Collection a, final Collection b) {
ArrayList list = new ArrayList( a );
for (Iterator it = b.iterator(); it.hasNext();) {
list.remove(it.next());
}
return list;
}



}
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<groupId>com.alibaba.nacossync</groupId>
<artifactId>nacossync-parent</artifactId>
<version>0.4.2</version>
<version>0.4.3</version>
<modules>
<module>nacossync-console</module>
<module>nacossync-worker</module>
Expand Down Expand Up @@ -73,7 +73,7 @@
<dependency>
<groupId>com.alibaba.nacossync</groupId>
<artifactId>nacossync-worker</artifactId>
<version>0.4.2</version>
<version>0.4.3</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down

0 comments on commit 8188868

Please sign in to comment.