From 3494f6aeefb5951f1b27fa7abf6455d6b4c7b237 Mon Sep 17 00:00:00 2001 From: Yuan Zi Date: Mon, 26 Jul 2021 20:47:04 +0800 Subject: [PATCH 1/7] dev cloud edge policy --- conf/config.properties | 13 + .../cn/edu/tsinghua/iginx/conf/Config.java | 30 ++ .../tsinghua/iginx/conf/ConfigDescriptor.java | 4 + .../context/InsertColumnRecordsContext.java | 9 + .../core/context/InsertRowRecordsContext.java | 13 + .../iginx/metadata/DefaultMetaManager.java | 7 +- ...geCloudCollaborationFragmentGenerator.java | 158 +++++++ .../EdgeCloudCollaborationPlanSplitter.java | 420 ++++++++++++++++++ .../cloud/EdgeCloudCollaborationPolicy.java | 123 +++++ 9 files changed, 776 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java create mode 100644 core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationPlanSplitter.java create mode 100644 core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationPolicy.java diff --git a/conf/config.properties b/conf/config.properties index be782c781..e9a2a097d 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -90,3 +90,16 @@ mqtt_handler_pool_size=1 mqtt_payload_formatter=cn.edu.tsinghua.iginx.mqtt.JsonPayloadFormatter mqtt_max_message_size=1048576 + +#################### +### 边云协同 配置 +#################### + +# 是否开启边云协同功能,默认关闭 +enable_edge_cloud_collaboration=false + +# iginx 是否为边缘端,默认为非边缘端 +is_edge=false + +# 边缘端名字,所有由边缘端写入的序列均包含该前缀;非边缘端不需要填写 +edge_name= diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java b/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java index 129990cff..79a9b835a 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java @@ -78,6 +78,12 @@ public class Config { private int mqttMaxMessageSize = 1048576; + private boolean enableEdgeCloudCollaboration = false; + + private boolean isEdge = false; + + private String edgeName = ""; + public int getMaxTimeseriesLength() { return maxTimeseriesLength; } @@ -309,4 +315,28 @@ public int getMqttMaxMessageSize() { public void setMqttMaxMessageSize(int mqttMaxMessageSize) { this.mqttMaxMessageSize = mqttMaxMessageSize; } + + public boolean isEnableEdgeCloudCollaboration() { + return enableEdgeCloudCollaboration; + } + + public void setEnableEdgeCloudCollaboration(boolean enableEdgeCloudCollaboration) { + this.enableEdgeCloudCollaboration = enableEdgeCloudCollaboration; + } + + public boolean isEdge() { + return isEdge; + } + + public void setEdge(boolean edge) { + isEdge = edge; + } + + public String getEdgeName() { + return edgeName; + } + + public void setEdgeName(String edgeName) { + this.edgeName = edgeName; + } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java index a145e45e9..c8f9beb25 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java @@ -89,6 +89,10 @@ private void loadPropsFromFile() { config.setMqttHandlerPoolSize(Integer.parseInt(properties.getProperty("mqtt_handler_pool_size", "1"))); config.setMqttPayloadFormatter(properties.getProperty("mqtt_payload_formatter", "cn.edu.tsinghua.iginx.mqtt.JsonPayloadFormatter")); config.setMqttMaxMessageSize(Integer.parseInt(properties.getProperty("mqtt_max_message_size", "1048576"))); + + config.setEnableEdgeCloudCollaboration(Boolean.parseBoolean(properties.getProperty("enable_edge_cloud_collaboration", "false"))); + config.setEdge(Boolean.parseBoolean(properties.getProperty("is_edge", "false"))); + config.setEdgeName(properties.getProperty("edge_name", "")); } catch (IOException e) { logger.error("Fail to load properties: ", e); } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertColumnRecordsContext.java b/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertColumnRecordsContext.java index 05f17a392..27e3f0ec9 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertColumnRecordsContext.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertColumnRecordsContext.java @@ -18,10 +18,14 @@ */ package cn.edu.tsinghua.iginx.core.context; +import cn.edu.tsinghua.iginx.conf.Config; +import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; import cn.edu.tsinghua.iginx.thrift.InsertColumnRecordsReq; import lombok.Data; import lombok.EqualsAndHashCode; +import java.util.stream.Collectors; + @EqualsAndHashCode(callSuper = true) @Data public class InsertColumnRecordsContext extends RequestContext { @@ -31,6 +35,11 @@ public class InsertColumnRecordsContext extends RequestContext { public InsertColumnRecordsContext(InsertColumnRecordsReq req) { super(req.sessionId, ContextType.InsertColumnRecords); this.req = req; + Config config = ConfigDescriptor.getInstance().getConfig(); + if (config.isEnableEdgeCloudCollaboration() && config.isEdge() && !config.getEdgeName().equals("")) { + String prefix = config.getEdgeName() + "."; + this.req.setPaths(this.req.getPaths().stream().map(e -> prefix + e).collect(Collectors.toList())); + } } public InsertColumnRecordsReq getReq() { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertRowRecordsContext.java b/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertRowRecordsContext.java index a9c74bdc8..0ece98618 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertRowRecordsContext.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertRowRecordsContext.java @@ -18,10 +18,14 @@ */ package cn.edu.tsinghua.iginx.core.context; +import cn.edu.tsinghua.iginx.conf.Config; +import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; import cn.edu.tsinghua.iginx.thrift.InsertRowRecordsReq; import lombok.Data; import lombok.EqualsAndHashCode; +import java.util.stream.Collectors; + @EqualsAndHashCode(callSuper = true) @Data public class InsertRowRecordsContext extends RequestContext { @@ -31,5 +35,14 @@ public class InsertRowRecordsContext extends RequestContext { public InsertRowRecordsContext(InsertRowRecordsReq req) { super(req.sessionId, ContextType.InsertRowRecords); this.req = req; + Config config = ConfigDescriptor.getInstance().getConfig(); + if (config.isEnableEdgeCloudCollaboration() && config.isEdge() && !config.getEdgeName().equals("")) { + String prefix = config.getEdgeName() + "."; + this.req.setPaths(this.req.getPaths().stream().map(e -> prefix + e).collect(Collectors.toList())); + } + } + + public InsertRowRecordsReq getReq() { + return req; } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java index 98e338431..ffcc6edc7 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java @@ -18,6 +18,7 @@ */ package cn.edu.tsinghua.iginx.metadata; +import cn.edu.tsinghua.iginx.conf.Config; import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; import cn.edu.tsinghua.iginx.db.StorageEngine; import cn.edu.tsinghua.iginx.exceptions.MetaStorageException; @@ -126,8 +127,12 @@ private void initIginx() throws MetaStorageException { for (IginxMeta iginx: storage.loadIginx().values()) { cache.addIginx(iginx); } + Map extraParams = new HashMap<>(); + extraParams.put("enable_edge_cloud_collaboration", Boolean.toString(ConfigDescriptor.getInstance().getConfig().isEnableEdgeCloudCollaboration())); + extraParams.put("is_edge", Boolean.toString(ConfigDescriptor.getInstance().getConfig().isEdge())); + extraParams.put("edge_name", ConfigDescriptor.getInstance().getConfig().getEdgeName()); IginxMeta iginx = new IginxMeta(0L, ConfigDescriptor.getInstance().getConfig().getIp(), - ConfigDescriptor.getInstance().getConfig().getPort(), null); + ConfigDescriptor.getInstance().getConfig().getPort(), extraParams); id = storage.registerIginx(iginx); SnowFlakeUtils.init(id); } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java new file mode 100644 index 000000000..4b1622050 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package cn.edu.tsinghua.iginx.policy.cloud; + +import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; +import cn.edu.tsinghua.iginx.metadata.IMetaManager; +import cn.edu.tsinghua.iginx.metadata.entity.FragmentMeta; +import cn.edu.tsinghua.iginx.metadata.entity.IginxMeta; +import cn.edu.tsinghua.iginx.metadata.entity.StorageEngineMeta; +import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitMeta; +import cn.edu.tsinghua.iginx.utils.Pair; +import org.apache.commons.lang3.RandomStringUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +class EdgeCloudCollaborationFragmentGenerator { + + private final IMetaManager iMetaManager; + + public EdgeCloudCollaborationFragmentGenerator(IMetaManager iMetaManager) { + this.iMetaManager = iMetaManager; + } + + public Pair, List> generateFragmentsAndStorageUnits(long startTime) { + List iginxList = iMetaManager.getIginxList(); + List storageEngineList = iMetaManager.getStorageEngineList(); + + List edges = getEdges(iginxList, storageEngineList); + Map> groupedStorageEngineLists = storageEngineList.stream().collect(Collectors.groupingBy(e -> e.getExtraParams().getOrDefault("edgeName", ""))); + int replicaNum = Math.min(1 + ConfigDescriptor.getInstance().getConfig().getReplicaNum(), groupedStorageEngineLists.getOrDefault("", Collections.emptyList()).size() + 1); // 最多备份数 = 云端服务节点树 + 1 + List> storageEngineFragmentCounts = storageEngineList.stream().filter(e -> !e.getExtraParams().getOrDefault("edgeName", "").equals("")) + .map(e -> new Pair<>(e.getId(), e.getStorageUnitList().size())).collect(Collectors.toList()); // 记录每个云端存储单元已经分配的分片的个数 + + List fragmentList = new ArrayList<>(); + List storageUnitList = new ArrayList<>(); + for (String edge : edges) { + List edgeStorageEngineList = groupedStorageEngineLists.get(edge); + if (edgeStorageEngineList == null) { // 边缘端部署了 iginx,没有部署 iotdb,所有的备份均存储在云端 + List storageEngineIds = selectStorageEngines(storageEngineFragmentCounts, replicaNum); + Pair pair = generateFragmentAndStorageUnit(edge + '.', edge + (char) ('.' + 1), startTime, storageEngineIds); + fragmentList.add(pair.k); + storageUnitList.add(pair.v); + } else { // 边缘端部署了 iginx 也部署了 iotdb,每个分片在本地存储一个备份 + List partitions = partitionWithinEdge(edge, edgeStorageEngineList.size()); + for (int i = 0; i < partitions.size() - 1; i++) { + List storageEngineIds = new ArrayList<>(Collections.singletonList(edgeStorageEngineList.get(i).getId())); + storageEngineIds.addAll(selectStorageEngines(storageEngineFragmentCounts, replicaNum - 1)); // 主本已经有了 + Pair pair = generateFragmentAndStorageUnit(partitions.get(i), partitions.get(i + 1), startTime, storageEngineIds); + fragmentList.add(pair.k); + storageUnitList.add(pair.v); + } + } + + } + // 对于边缘端形成的空隙 + if (edges.size() == 0) { + // 只有一个大空隙 + List storageEngineIds = selectStorageEngines(storageEngineFragmentCounts, replicaNum); + Pair pair = generateFragmentAndStorageUnit(null, null, startTime, storageEngineIds); + fragmentList.add(pair.k); + storageUnitList.add(pair.v); + } else { + // 上空隙 + List storageEngineIds = selectStorageEngines(storageEngineFragmentCounts, replicaNum); + Pair pair = generateFragmentAndStorageUnit(null, edges.get(0) + '.', startTime, storageEngineIds); + fragmentList.add(pair.k); + storageUnitList.add(pair.v); + + // 下空隙 + storageEngineIds = selectStorageEngines(storageEngineFragmentCounts, replicaNum); + pair = generateFragmentAndStorageUnit(edges.get(edges.size() - 1) + (char) ('.' + 1), null, startTime, storageEngineIds); + fragmentList.add(pair.k); + storageUnitList.add(pair.v); + + // 中间的空隙 + for (int i = 0; i < edges.size() - 1; i++) { + storageEngineIds = selectStorageEngines(storageEngineFragmentCounts, replicaNum); + pair = generateFragmentAndStorageUnit(edges.get(i) + (char) ('.' + 1), edges.get(i + 1) + '.', startTime, storageEngineIds); + fragmentList.add(pair.k); + storageUnitList.add(pair.v); + } + } + return new Pair<>(fragmentList, storageUnitList); + } + + private List getEdges(List iginxList, List storageEngineList) { + Set edges = new HashSet<>(); + for (IginxMeta iginx: iginxList) { + String edge = iginx.getExtraParams().get("edge_name"); + if (edge != null) { + edges.add(edge); + } + } + for (StorageEngineMeta storageEngine: storageEngineList) { + String edge = storageEngine.getExtraParams().get("edgeName"); + if (edge != null) { + edges.add(edge); + } + } + return edges.stream().sorted().collect(Collectors.toList()); + } + + private List selectStorageEngines(List> storageEngineFragmentCounts, int count) { + storageEngineFragmentCounts = storageEngineFragmentCounts.stream().sorted(Comparator.comparingInt(o -> o.v)).collect(Collectors.toList()); + List storageEngines = new ArrayList<>(); + for (int i = 0; i < count; i++) { + if (i > storageEngineFragmentCounts.size()) { + break; + } + storageEngines.add(storageEngineFragmentCounts.get(i).k); + storageEngineFragmentCounts.get(i).v++; + } + return storageEngines; + } + + private List partitionWithinEdge(String edge, int partition) { + if (partition < 1) { + return Collections.emptyList(); + } + return Arrays.asList(edge + '.', edge + (char) ('.' + 1)); + } + + // 根据时间和序列区间以及一组 storageEngineList 来生成分片和存储单元 + private Pair generateFragmentAndStorageUnit(String startPath, String endPath, long startTime, List storageEngineList) { + String masterId = RandomStringUtils.randomAlphanumeric(16); + StorageUnitMeta storageUnit = new StorageUnitMeta(masterId, storageEngineList.get(0), masterId, true); + FragmentMeta fragment = new FragmentMeta(startPath, endPath, startTime, Long.MAX_VALUE, masterId); + for (int i = 1; i < storageEngineList.size(); i++) { + storageUnit.addReplica(new StorageUnitMeta(RandomStringUtils.randomAlphanumeric(16), storageEngineList.get(i), masterId, false)); + } + return new Pair<>(fragment, storageUnit); + } + +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationPlanSplitter.java b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationPlanSplitter.java new file mode 100644 index 000000000..b92162d52 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationPlanSplitter.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package cn.edu.tsinghua.iginx.policy.cloud; + +import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; +import cn.edu.tsinghua.iginx.metadata.IMetaManager; +import cn.edu.tsinghua.iginx.metadata.entity.FragmentMeta; +import cn.edu.tsinghua.iginx.metadata.entity.StorageEngineMeta; +import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitMeta; +import cn.edu.tsinghua.iginx.metadata.entity.TimeInterval; +import cn.edu.tsinghua.iginx.metadata.entity.TimeSeriesInterval; +import cn.edu.tsinghua.iginx.plan.AvgQueryPlan; +import cn.edu.tsinghua.iginx.plan.CountQueryPlan; +import cn.edu.tsinghua.iginx.plan.DeleteColumnsPlan; +import cn.edu.tsinghua.iginx.plan.DeleteDataInColumnsPlan; +import cn.edu.tsinghua.iginx.plan.FirstQueryPlan; +import cn.edu.tsinghua.iginx.plan.IginxPlan; +import cn.edu.tsinghua.iginx.plan.InsertColumnRecordsPlan; +import cn.edu.tsinghua.iginx.plan.InsertRowRecordsPlan; +import cn.edu.tsinghua.iginx.plan.LastQueryPlan; +import cn.edu.tsinghua.iginx.plan.MaxQueryPlan; +import cn.edu.tsinghua.iginx.plan.MinQueryPlan; +import cn.edu.tsinghua.iginx.plan.QueryDataPlan; +import cn.edu.tsinghua.iginx.plan.SumQueryPlan; +import cn.edu.tsinghua.iginx.plan.ValueFilterQueryPlan; +import cn.edu.tsinghua.iginx.plan.downsample.DownsampleAvgQueryPlan; +import cn.edu.tsinghua.iginx.plan.downsample.DownsampleCountQueryPlan; +import cn.edu.tsinghua.iginx.plan.downsample.DownsampleFirstQueryPlan; +import cn.edu.tsinghua.iginx.plan.downsample.DownsampleLastQueryPlan; +import cn.edu.tsinghua.iginx.plan.downsample.DownsampleMaxQueryPlan; +import cn.edu.tsinghua.iginx.plan.downsample.DownsampleMinQueryPlan; +import cn.edu.tsinghua.iginx.plan.downsample.DownsampleQueryPlan; +import cn.edu.tsinghua.iginx.plan.downsample.DownsampleSumQueryPlan; +import cn.edu.tsinghua.iginx.policy.IPlanSplitter; +import cn.edu.tsinghua.iginx.split.SplitInfo; +import cn.edu.tsinghua.iginx.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +class EdgeCloudCollaborationPlanSplitter implements IPlanSplitter { + + private static final Logger logger = LoggerFactory.getLogger(EdgeCloudCollaborationPlanSplitter.class); + + private final EdgeCloudCollaborationPolicy policy; + + private final IMetaManager iMetaManager; + + public EdgeCloudCollaborationPlanSplitter(EdgeCloudCollaborationPolicy policy, IMetaManager iMetaManager) { + this.policy = policy; + this.iMetaManager = iMetaManager; + } + + public static List splitTimeIntervalForDownsampleQuery(List timeIntervals, + long beginTime, long endTime, long precision) { + List resultList = new ArrayList<>(); + for (TimeInterval timeInterval : timeIntervals) { + long midIntervalBeginTime = Math.max(timeInterval.getStartTime(), beginTime); + long midIntervalEndTime = Math.min(timeInterval.getEndTime(), endTime); + if (timeInterval.getStartTime() > beginTime && (timeInterval.getStartTime() - beginTime) % precision != 0) { // 只有非第一个分片才有可能创建前缀分片 + long prefixIntervalEndTime = Math.min(midIntervalBeginTime + precision - (timeInterval.getStartTime() - beginTime) % precision, midIntervalEndTime); + resultList.add(new TimeInterval(midIntervalBeginTime, prefixIntervalEndTime)); + midIntervalBeginTime = prefixIntervalEndTime; + } + if ((midIntervalEndTime - midIntervalBeginTime) >= precision) { + midIntervalEndTime -= (midIntervalEndTime - midIntervalBeginTime) % precision; + resultList.add(new TimeInterval(midIntervalBeginTime, midIntervalEndTime)); + } else { + midIntervalEndTime = midIntervalBeginTime; + } + if (midIntervalEndTime != Math.min(timeInterval.getEndTime(), endTime)) { + resultList.add(new TimeInterval(midIntervalEndTime, Math.min(timeInterval.getEndTime(), endTime))); + } + } + return resultList; + } + + public List getSplitDeleteColumnsPlanResults(DeleteColumnsPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesInterval(plan.getTsInterval()); + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, false); + for (StorageUnitMeta storageUnit : storageUnitList) { + logger.info("add storage unit id {} to duplicate remove set.", storageUnit.getId()); + infoList.add(new SplitInfo(new TimeInterval(0L, Long.MAX_VALUE), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + public List getSplitInsertColumnRecordsPlanResults(InsertColumnRecordsPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval( + plan.getTsInterval(), plan.getTimeInterval()); + if (fragmentMap.isEmpty()) { + //on startup + policy.setNeedReAllocate(false); + Pair, List> fragmentsAndStorageUnits = policy.getFragmentGenerator().generateFragmentsAndStorageUnits(0L); + iMetaManager.createInitialFragmentsAndStorageUnits(fragmentsAndStorageUnits.v, fragmentsAndStorageUnits.k); + fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval(plan.getTsInterval(), plan.getTimeInterval()); + } else if (policy.isNeedReAllocate()) { + //on scale-out or any events requiring reallocation + Pair, List> fragmentsAndStorageUnits = policy.getFragmentGenerator().generateFragmentsAndStorageUnits(plan.getEndTime() + TimeUnit.SECONDS.toMillis(ConfigDescriptor.getInstance().getConfig().getDisorderMargin()) * 2 + 1); + iMetaManager.createFragmentsAndStorageUnits(fragmentsAndStorageUnits.v, fragmentsAndStorageUnits.k); + } + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, false); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + @Override + public List getSplitInsertRowRecordsPlanResults(InsertRowRecordsPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval( + plan.getTsInterval(), plan.getTimeInterval()); + if (fragmentMap.isEmpty()) { + policy.setNeedReAllocate(false); + Pair, List> fragmentsAndStorageUnits = policy.getFragmentGenerator().generateFragmentsAndStorageUnits(0L); + iMetaManager.createInitialFragmentsAndStorageUnits(fragmentsAndStorageUnits.v, fragmentsAndStorageUnits.k); + fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval(plan.getTsInterval(), plan.getTimeInterval()); + } else if (policy.isNeedReAllocate()) { + Pair, List> fragmentsAndStorageUnits = policy.getFragmentGenerator().generateFragmentsAndStorageUnits(plan.getEndTime() + TimeUnit.SECONDS.toMillis(ConfigDescriptor.getInstance().getConfig().getDisorderMargin()) * 2 + 1); + iMetaManager.createFragmentsAndStorageUnits(fragmentsAndStorageUnits.v, fragmentsAndStorageUnits.k); + } + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, false); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + @Override + public List getSplitDeleteDataInColumnsPlanResults(DeleteDataInColumnsPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval( + plan.getTsInterval(), plan.getTimeInterval()); + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, false); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + public List getSplitQueryDataPlanResults(QueryDataPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval( + plan.getTsInterval(), plan.getTimeInterval()); + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + private List getSplitResultsForDownsamplePlan(DownsampleQueryPlan plan, IginxPlan.IginxPlanType intervalQueryPlan) { + List infoList = new ArrayList<>(); + // 对查出来的分片结果按照开始时间进行排序、分组 + List> fragmentMetasList = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval( + plan.getTsInterval(), plan.getTimeInterval()).values().stream().flatMap(List::stream) + .sorted(Comparator.comparingLong(e -> e.getTimeInterval().getStartTime())) + .collect(Collectors.groupingBy(e -> e.getTimeInterval().getStartTime())).entrySet().stream() + .sorted(Comparator.comparingLong(Map.Entry::getKey)).map(Map.Entry::getValue).collect(Collectors.toList()); + // 聚合精度,后续会多次用到 + long precision = plan.getPrecision(); + // 计算子查询时间片列表 + List planTimeIntervals = splitTimeIntervalForDownsampleQuery(fragmentMetasList.stream() + .map(e -> e.get(0).getTimeInterval()).collect(Collectors.toList()), plan.getStartTime(), plan.getEndTime(), plan.getPrecision()); + // 所属的合并组的组号 + int combineGroup = 0; + int index = 0; + long timespan = 0L; + for (List fragmentMetas : fragmentMetasList) { + long endTime = fragmentMetas.get(0).getTimeInterval().getEndTime(); + while (index < planTimeIntervals.size() && planTimeIntervals.get(index).getEndTime() <= endTime) { + TimeInterval timeInterval = planTimeIntervals.get(index++); + if (timeInterval.getSpan() >= precision) { + // 对于聚合子查询,清空 timespan,并且在计划全部加入之后增加组号 + for (FragmentMeta fragment : fragmentMetas) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(timeInterval, fragment.getTsInterval(), storageUnit, plan.getIginxPlanType(), combineGroup)); + } + } + timespan = 0L; + combineGroup += 1; + } else { + for (FragmentMeta fragment : fragmentMetas) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(timeInterval, fragment.getTsInterval(), storageUnit, intervalQueryPlan, combineGroup)); + } + } + timespan += timeInterval.getSpan(); + if (timespan >= precision) { + timespan = 0L; + combineGroup += 1; + } + } + } + } + return infoList; + } + + @Override + public List getSplitDownsampleMaxQueryPlanResults(DownsampleMaxQueryPlan plan) { + return getSplitResultsForDownsamplePlan(plan, IginxPlan.IginxPlanType.MAX); + } + + @Override + public List getSplitDownsampleMinQueryPlanResults(DownsampleMinQueryPlan plan) { + return getSplitResultsForDownsamplePlan(plan, IginxPlan.IginxPlanType.MIN); + } + + @Override + public List getSplitDownsampleSumQueryPlanResults(DownsampleSumQueryPlan plan) { + return getSplitResultsForDownsamplePlan(plan, IginxPlan.IginxPlanType.SUM); + } + + @Override + public List getSplitDownsampleCountQueryPlanResults(DownsampleCountQueryPlan plan) { + return getSplitResultsForDownsamplePlan(plan, IginxPlan.IginxPlanType.COUNT); + } + + @Override + public List getSplitDownsampleAvgQueryPlanResults(DownsampleAvgQueryPlan plan) { + return getSplitResultsForDownsamplePlan(plan, IginxPlan.IginxPlanType.AVG); + } + + @Override + public List getSplitDownsampleFirstQueryPlanResults(DownsampleFirstQueryPlan plan) { + return getSplitResultsForDownsamplePlan(plan, IginxPlan.IginxPlanType.FIRST); + } + + @Override + public List getSplitDownsampleLastQueryPlanResults(DownsampleLastQueryPlan plan) { + return getSplitResultsForDownsamplePlan(plan, IginxPlan.IginxPlanType.LAST); + } + + @Override + public List getValueFilterQueryPlanResults(ValueFilterQueryPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval(plan.getTsInterval(), plan.getTimeInterval()); + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + @Override + public List getSplitMaxQueryPlanResults(MaxQueryPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval( + plan.getTsInterval(), plan.getTimeInterval()); + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + @Override + public List getSplitMinQueryPlanResults(MinQueryPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval( + plan.getTsInterval(), plan.getTimeInterval()); + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + @Override + public List getSplitSumQueryPlanResults(SumQueryPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval( + plan.getTsInterval(), plan.getTimeInterval()); + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + @Override + public List getSplitCountQueryPlanResults(CountQueryPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval( + plan.getTsInterval(), plan.getTimeInterval()); + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + @Override + public List getSplitAvgQueryPlanResults(AvgQueryPlan plan) { + List infoList = new ArrayList<>(); + Map> fragmentMap = iMetaManager.getFragmentMapByTimeSeriesIntervalAndTimeInterval( + plan.getTsInterval(), plan.getTimeInterval()); + for (Map.Entry> entry : fragmentMap.entrySet()) { + for (FragmentMeta fragment : entry.getValue()) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), entry.getKey(), storageUnit)); + } + } + } + return infoList; + } + + @Override + public List getSplitFirstQueryPlanResults(FirstQueryPlan plan) { + List infoList = new ArrayList<>(); + for (String path : plan.getPaths()) { + List fragmentList = iMetaManager.getFragmentListByTimeSeriesNameAndTimeInterval(path, plan.getTimeInterval()); + for (FragmentMeta fragment : fragmentList) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), new TimeSeriesInterval(path, path, true), storageUnit)); + } + } + } + return infoList; + } + + @Override + public List getSplitLastQueryPlanResults(LastQueryPlan plan) { + List infoList = new ArrayList<>(); + for (String path : plan.getPaths()) { + List fragmentList = iMetaManager.getFragmentListByTimeSeriesNameAndTimeInterval(path, plan.getTimeInterval()); + for (FragmentMeta fragment : fragmentList) { + List storageUnitList = selectStorageUnitList(fragment, true); + for (StorageUnitMeta storageUnit : storageUnitList) { + infoList.add(new SplitInfo(fragment.getTimeInterval(), new TimeSeriesInterval(path, path, true), storageUnit)); + } + } + } + return infoList; + } + + @Override + public List getSplitShowColumnsPlanResult() { + return iMetaManager.getStorageEngineList().stream().map(StorageEngineMeta::getId).collect(Collectors.toList()); + } + + @Override + public List selectStorageUnitList(FragmentMeta fragment, boolean isQuery) { + List storageUnitList = new ArrayList<>(); + storageUnitList.add(fragment.getMasterStorageUnit()); + if (!isQuery) { + storageUnitList.addAll(fragment.getMasterStorageUnit().getReplicas()); + } + return storageUnitList; + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationPolicy.java b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationPolicy.java new file mode 100644 index 000000000..53c3c4960 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationPolicy.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package cn.edu.tsinghua.iginx.policy.cloud; + +import cn.edu.tsinghua.iginx.core.processor.PostQueryExecuteProcessor; +import cn.edu.tsinghua.iginx.core.processor.PostQueryPlanProcessor; +import cn.edu.tsinghua.iginx.core.processor.PostQueryProcessor; +import cn.edu.tsinghua.iginx.core.processor.PostQueryResultCombineProcessor; +import cn.edu.tsinghua.iginx.core.processor.PreQueryExecuteProcessor; +import cn.edu.tsinghua.iginx.core.processor.PreQueryPlanProcessor; +import cn.edu.tsinghua.iginx.core.processor.PreQueryResultCombineProcessor; +import cn.edu.tsinghua.iginx.metadata.IMetaManager; +import cn.edu.tsinghua.iginx.metadata.hook.StorageEngineChangeHook; +import cn.edu.tsinghua.iginx.policy.IFragmentGenerator; +import cn.edu.tsinghua.iginx.policy.IPlanSplitter; +import cn.edu.tsinghua.iginx.policy.IPolicy; +import java.util.concurrent.atomic.AtomicBoolean; + +public class EdgeCloudCollaborationPolicy implements IPolicy { + + protected AtomicBoolean needReAllocate = new AtomicBoolean(false); + + private EdgeCloudCollaborationPlanSplitter iPlanSplitter; + + private IMetaManager iMetaManager; + + private EdgeCloudCollaborationFragmentGenerator iFragmentGenerator; + + @Override + public PostQueryExecuteProcessor getPostQueryExecuteProcessor() { + return null; + } + + @Override + public PostQueryPlanProcessor getPostQueryPlanProcessor() { + return null; + } + + @Override + public PostQueryProcessor getPostQueryProcessor() { + return null; + } + + @Override + public PostQueryResultCombineProcessor getPostQueryResultCombineProcessor() { + return null; + } + + @Override + public PreQueryExecuteProcessor getPreQueryExecuteProcessor() { + return null; + } + + @Override + public PreQueryPlanProcessor getPreQueryPlanProcessor() { + return null; + } + + @Override + public PreQueryResultCombineProcessor getPreQueryResultCombineProcessor() { + return null; + } + + @Override + public IPlanSplitter getIPlanSplitter() { + return iPlanSplitter; + } + + @Override + public IFragmentGenerator getIFragmentGenerator() { + return null; + } + + public EdgeCloudCollaborationFragmentGenerator getFragmentGenerator() { + return iFragmentGenerator; + } + + @Override + public void init(IMetaManager iMetaManager) { + this.iMetaManager = iMetaManager; + this.iPlanSplitter = new EdgeCloudCollaborationPlanSplitter(this, this.iMetaManager); + this.iFragmentGenerator = new EdgeCloudCollaborationFragmentGenerator(this.iMetaManager); + StorageEngineChangeHook hook = getStorageEngineChangeHook(); + if (hook != null) { + iMetaManager.registerStorageEngineChangeHook(hook); + } + } + + @Override + public StorageEngineChangeHook getStorageEngineChangeHook() { + return (before, after) -> { + // 哪台机器加了分片,哪台机器初始化,并且在批量添加的时候只有最后一个存储引擎才会导致扩容发生 + if (before == null && after != null && after.getCreatedBy() == iMetaManager.getIginxId() && after.isLastOfBatch()) { + needReAllocate.set(true); + } + }; + } + + public boolean isNeedReAllocate() { + return needReAllocate.getAndSet(false); + } + + public void setNeedReAllocate(boolean needReAllocate) { + this.needReAllocate.set(needReAllocate); + } + +} From 64dca5183f970fa52c114c32c5a621faf81cf640 Mon Sep 17 00:00:00 2001 From: Yuan Zi Date: Tue, 27 Jul 2021 11:06:47 +0800 Subject: [PATCH 2/7] fix bugs in edge cloud collaboration --- .../cn/edu/tsinghua/iginx/metadata/entity/IginxMeta.java | 3 +++ .../tsinghua/iginx/metadata/entity/StorageEngineMeta.java | 3 +++ .../cloud/EdgeCloudCollaborationFragmentGenerator.java | 6 +++--- .../main/java/cn/edu/tsinghua/iginx/policy/cloud/Test.java | 6 ++++++ 4 files changed, 15 insertions(+), 3 deletions(-) create mode 100644 core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/Test.java diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/IginxMeta.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/IginxMeta.java index 06297cef7..1d1a6a200 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/IginxMeta.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/IginxMeta.java @@ -18,6 +18,7 @@ */ package cn.edu.tsinghua.iginx.metadata.entity; +import java.util.HashMap; import java.util.Map; public final class IginxMeta { @@ -62,6 +63,8 @@ public int getPort() { } public Map getExtraParams() { + if (extraParams == null) + return new HashMap<>(); return extraParams; } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java index ce759fc3d..251598f5e 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java @@ -21,6 +21,7 @@ import cn.edu.tsinghua.iginx.db.StorageEngine; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -99,6 +100,8 @@ public void setPort(int port) { } public Map getExtraParams() { + if (extraParams == null) + return new HashMap<>(); return extraParams; } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java index 4b1622050..9ce2e4cf2 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java @@ -52,7 +52,7 @@ public Pair, List> generateFragmentsAndStora List edges = getEdges(iginxList, storageEngineList); Map> groupedStorageEngineLists = storageEngineList.stream().collect(Collectors.groupingBy(e -> e.getExtraParams().getOrDefault("edgeName", ""))); int replicaNum = Math.min(1 + ConfigDescriptor.getInstance().getConfig().getReplicaNum(), groupedStorageEngineLists.getOrDefault("", Collections.emptyList()).size() + 1); // 最多备份数 = 云端服务节点树 + 1 - List> storageEngineFragmentCounts = storageEngineList.stream().filter(e -> !e.getExtraParams().getOrDefault("edgeName", "").equals("")) + List> storageEngineFragmentCounts = storageEngineList.stream().filter(e -> e.getExtraParams().getOrDefault("edgeName", "").equals("")) .map(e -> new Pair<>(e.getId(), e.getStorageUnitList().size())).collect(Collectors.toList()); // 记录每个云端存储单元已经分配的分片的个数 List fragmentList = new ArrayList<>(); @@ -111,13 +111,13 @@ private List getEdges(List iginxList, List Set edges = new HashSet<>(); for (IginxMeta iginx: iginxList) { String edge = iginx.getExtraParams().get("edge_name"); - if (edge != null) { + if (edge != null && !edge.equals("")) { edges.add(edge); } } for (StorageEngineMeta storageEngine: storageEngineList) { String edge = storageEngine.getExtraParams().get("edgeName"); - if (edge != null) { + if (edge != null && !edge.equals("")) { edges.add(edge); } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/Test.java b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/Test.java new file mode 100644 index 000000000..2cd1c3355 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/Test.java @@ -0,0 +1,6 @@ +package cn.edu.tsinghua.iginx.policy.cloud;/** + * Created on 27/07/2021. + * Description: + * @author ziyuan + */public class Test { +} From cfe9ff22c8afe119251925995c3b37f4d1508b2e Mon Sep 17 00:00:00 2001 From: Yuan Zi Date: Tue, 27 Jul 2021 11:09:28 +0800 Subject: [PATCH 3/7] remove auto prefux --- .../iginx/core/context/InsertColumnRecordsContext.java | 5 ----- .../tsinghua/iginx/core/context/InsertRowRecordsContext.java | 5 ----- 2 files changed, 10 deletions(-) diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertColumnRecordsContext.java b/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertColumnRecordsContext.java index 27e3f0ec9..87283748e 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertColumnRecordsContext.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertColumnRecordsContext.java @@ -35,11 +35,6 @@ public class InsertColumnRecordsContext extends RequestContext { public InsertColumnRecordsContext(InsertColumnRecordsReq req) { super(req.sessionId, ContextType.InsertColumnRecords); this.req = req; - Config config = ConfigDescriptor.getInstance().getConfig(); - if (config.isEnableEdgeCloudCollaboration() && config.isEdge() && !config.getEdgeName().equals("")) { - String prefix = config.getEdgeName() + "."; - this.req.setPaths(this.req.getPaths().stream().map(e -> prefix + e).collect(Collectors.toList())); - } } public InsertColumnRecordsReq getReq() { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertRowRecordsContext.java b/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertRowRecordsContext.java index 0ece98618..8aaed5a7d 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertRowRecordsContext.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/core/context/InsertRowRecordsContext.java @@ -35,11 +35,6 @@ public class InsertRowRecordsContext extends RequestContext { public InsertRowRecordsContext(InsertRowRecordsReq req) { super(req.sessionId, ContextType.InsertRowRecords); this.req = req; - Config config = ConfigDescriptor.getInstance().getConfig(); - if (config.isEnableEdgeCloudCollaboration() && config.isEdge() && !config.getEdgeName().equals("")) { - String prefix = config.getEdgeName() + "."; - this.req.setPaths(this.req.getPaths().stream().map(e -> prefix + e).collect(Collectors.toList())); - } } public InsertRowRecordsReq getReq() { From 12d5fb87d8f54d65041862ee3c176b7f0a2b60d6 Mon Sep 17 00:00:00 2001 From: Yuan Zi Date: Tue, 27 Jul 2021 14:19:42 +0800 Subject: [PATCH 4/7] fix bug --- .../iginx/metadata/storage/zk/ZooKeeperMetaStorage.java | 3 +++ .../main/java/cn/edu/tsinghua/iginx/policy/cloud/Test.java | 6 ------ 2 files changed, 3 insertions(+), 6 deletions(-) delete mode 100644 core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/Test.java diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java index 4fe611c07..c83a4fdad 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java @@ -434,6 +434,9 @@ private void registerStorageEngineListener() throws Exception { break; } data = event.getData().getData(); + if (event.getData().getPath().equals(STORAGE_ENGINE_NODE_PREFIX)) { + break; + } logger.info("storage engine meta updated " + event.getData().getPath()); logger.info("storage engine: " + new String(data)); storageEngineMeta = JsonUtils.fromJson(data, StorageEngineMeta.class); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/Test.java b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/Test.java deleted file mode 100644 index 2cd1c3355..000000000 --- a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/Test.java +++ /dev/null @@ -1,6 +0,0 @@ -package cn.edu.tsinghua.iginx.policy.cloud;/** - * Created on 27/07/2021. - * Description: - * @author ziyuan - */public class Test { -} From c3d8ea0b7e6fa0ac8d3b9f8a88f70f90c25cc430 Mon Sep 17 00:00:00 2001 From: Yuan Zi Date: Tue, 27 Jul 2021 15:02:48 +0800 Subject: [PATCH 5/7] fix bug --- ...dgeCloudCollaborationFragmentGenerator.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java index 9ce2e4cf2..5c84f3de8 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java @@ -61,7 +61,7 @@ public Pair, List> generateFragmentsAndStora List edgeStorageEngineList = groupedStorageEngineLists.get(edge); if (edgeStorageEngineList == null) { // 边缘端部署了 iginx,没有部署 iotdb,所有的备份均存储在云端 List storageEngineIds = selectStorageEngines(storageEngineFragmentCounts, replicaNum); - Pair pair = generateFragmentAndStorageUnit(edge + '.', edge + (char) ('.' + 1), startTime, storageEngineIds); + Pair pair = generateFragmentAndStorageUnit(lowerBound(edge), upperBound(edge), startTime, storageEngineIds); fragmentList.add(pair.k); storageUnitList.add(pair.v); } else { // 边缘端部署了 iginx 也部署了 iotdb,每个分片在本地存储一个备份 @@ -86,20 +86,20 @@ public Pair, List> generateFragmentsAndStora } else { // 上空隙 List storageEngineIds = selectStorageEngines(storageEngineFragmentCounts, replicaNum); - Pair pair = generateFragmentAndStorageUnit(null, edges.get(0) + '.', startTime, storageEngineIds); + Pair pair = generateFragmentAndStorageUnit(null, lowerBound(edges.get(0)), startTime, storageEngineIds); fragmentList.add(pair.k); storageUnitList.add(pair.v); // 下空隙 storageEngineIds = selectStorageEngines(storageEngineFragmentCounts, replicaNum); - pair = generateFragmentAndStorageUnit(edges.get(edges.size() - 1) + (char) ('.' + 1), null, startTime, storageEngineIds); + pair = generateFragmentAndStorageUnit(upperBound(edges.get(edges.size() - 1)), null, startTime, storageEngineIds); fragmentList.add(pair.k); storageUnitList.add(pair.v); // 中间的空隙 for (int i = 0; i < edges.size() - 1; i++) { storageEngineIds = selectStorageEngines(storageEngineFragmentCounts, replicaNum); - pair = generateFragmentAndStorageUnit(edges.get(i) + (char) ('.' + 1), edges.get(i + 1) + '.', startTime, storageEngineIds); + pair = generateFragmentAndStorageUnit(upperBound(edges.get(i)), lowerBound(edges.get(i + 1)), startTime, storageEngineIds); fragmentList.add(pair.k); storageUnitList.add(pair.v); } @@ -141,7 +141,7 @@ private List partitionWithinEdge(String edge, int partition) { if (partition < 1) { return Collections.emptyList(); } - return Arrays.asList(edge + '.', edge + (char) ('.' + 1)); + return Arrays.asList(lowerBound(edge), upperBound(edge)); } // 根据时间和序列区间以及一组 storageEngineList 来生成分片和存储单元 @@ -155,4 +155,12 @@ private Pair generateFragmentAndStorageUnit(Strin return new Pair<>(fragment, storageUnit); } + private static String lowerBound(String string) { + return string + '.' + (char)('A' - 1); + } + + private static String upperBound(String string) { + return string + '.' + (char)('z' + 1); + } + } From c88689405c5b0f80020f3e1fd9df593b86a62b02 Mon Sep 17 00:00:00 2001 From: Yuan Zi Date: Tue, 27 Jul 2021 15:45:51 +0800 Subject: [PATCH 6/7] temp storage --- conf/config.properties | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/conf/config.properties b/conf/config.properties index e9a2a097d..15531f685 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -15,8 +15,8 @@ username=root password=root # 时序数据库列表,使用','分隔不同实例 -storageEngineList=127.0.0.1#6667#iotdb#username=root#password=root#sessionPoolSize=100#dataDir=/path/to/your/data/ -#storageEngineList=127.0.0.1#8086#influxdb#url=http://localhost:8086/#token=your-token#organization=your-organization +storageEngineList=127.0.0.1#6667#iotdb#username=root#password=root#sessionPoolSize=100#edgeName=edge1,127.0.0.1#6668#iotdb#username=root#password=root#sessionPoolSize=100#edgeName=,127.0.0.1#6669#iotdb#username=root#password=root#sessionPoolSize=100#edgeName= +#storageEngineList=127.0.0.1#8086#influxdb#url=http://localhost:8086/ # 异步请求最大重复次数 maxAsyncRetryTimes=3 @@ -34,7 +34,7 @@ replicaNum=1 databaseClassNames=iotdb=cn.edu.tsinghua.iginx.iotdb.IoTDBPlanExecutor,influxdb=cn.edu.tsinghua.iginx.influxdb.InfluxDBPlanExecutor # 策略类名 -policyClassName=cn.edu.tsinghua.iginx.policy.naive.NativePolicy +policyClassName=cn.edu.tsinghua.iginx.policy.cloud.EdgeCloudCollaborationPolicy # 统计信息收集类 # statisticsCollectorClassName=cn.edu.tsinghua.iginx.statistics.StatisticsCollector @@ -53,7 +53,7 @@ restIp=0.0.0.0 restPort=6666 # 是否启用 rest 服务 -enableRestService=true +enableRestService=false # 乱序数据 margin, 单位是秒 disorderMargin=10 @@ -96,10 +96,14 @@ mqtt_max_message_size=1048576 #################### # 是否开启边云协同功能,默认关闭 -enable_edge_cloud_collaboration=false +enable_edge_cloud_collaboration=true # iginx 是否为边缘端,默认为非边缘端 is_edge=false # 边缘端名字,所有由边缘端写入的序列均包含该前缀;非边缘端不需要填写 +<<<<<<< HEAD edge_name= +======= +edge_name=edge2 +>>>>>>> temp storage From 987d674b8b2a0f5a89a152498068cad600b35c80 Mon Sep 17 00:00:00 2001 From: Yuan Zi Date: Wed, 28 Jul 2021 08:37:32 +0800 Subject: [PATCH 7/7] for shangfei --- conf/config.properties | 13 +-- .../iginx/mqtt/ShangFeiPayloadFormatter.java | 92 +++++++++++++++++++ ...geCloudCollaborationFragmentGenerator.java | 2 +- 3 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 core/src/main/java/cn/edu/tsinghua/iginx/mqtt/ShangFeiPayloadFormatter.java diff --git a/conf/config.properties b/conf/config.properties index 15531f685..b445cf7c4 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -15,8 +15,7 @@ username=root password=root # 时序数据库列表,使用','分隔不同实例 -storageEngineList=127.0.0.1#6667#iotdb#username=root#password=root#sessionPoolSize=100#edgeName=edge1,127.0.0.1#6668#iotdb#username=root#password=root#sessionPoolSize=100#edgeName=,127.0.0.1#6669#iotdb#username=root#password=root#sessionPoolSize=100#edgeName= -#storageEngineList=127.0.0.1#8086#influxdb#url=http://localhost:8086/ +storageEngineList=127.0.0.1#6667#iotdb#username=root#password=root#sessionPoolSize=30#edgeName=edge1 # 异步请求最大重复次数 maxAsyncRetryTimes=3 @@ -36,12 +35,6 @@ databaseClassNames=iotdb=cn.edu.tsinghua.iginx.iotdb.IoTDBPlanExecutor,influxdb= # 策略类名 policyClassName=cn.edu.tsinghua.iginx.policy.cloud.EdgeCloudCollaborationPolicy -# 统计信息收集类 -# statisticsCollectorClassName=cn.edu.tsinghua.iginx.statistics.StatisticsCollector - -# 统计信息打印间隔,单位毫秒 -# statisticsLogInterval=1000 - #################### ### Rest 服务配置 #################### @@ -87,7 +80,7 @@ mqtt_port=1883 mqtt_handler_pool_size=1 -mqtt_payload_formatter=cn.edu.tsinghua.iginx.mqtt.JsonPayloadFormatter +mqtt_payload_formatter=cn.edu.tsinghua.iginx.mqtt.ShangFeiPayloadFormatter mqtt_max_message_size=1048576 @@ -99,7 +92,7 @@ mqtt_max_message_size=1048576 enable_edge_cloud_collaboration=true # iginx 是否为边缘端,默认为非边缘端 -is_edge=false +is_edge=true # 边缘端名字,所有由边缘端写入的序列均包含该前缀;非边缘端不需要填写 <<<<<<< HEAD diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/mqtt/ShangFeiPayloadFormatter.java b/core/src/main/java/cn/edu/tsinghua/iginx/mqtt/ShangFeiPayloadFormatter.java new file mode 100644 index 000000000..cf3faa01b --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/mqtt/ShangFeiPayloadFormatter.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package cn.edu.tsinghua.iginx.mqtt; + +import cn.edu.tsinghua.iginx.conf.Config; +import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; +import cn.edu.tsinghua.iginx.thrift.DataType; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.netty.buffer.ByteBuf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class ShangFeiPayloadFormatter implements IPayloadFormatter { + + private static final Gson gson = new GsonBuilder().create(); + + private static final Logger logger = LoggerFactory.getLogger(ShangFeiPayloadFormatter.class); + + private final Config config = ConfigDescriptor.getInstance().getConfig(); + + @Override + public List format(ByteBuf payload) { + if (payload == null) { + return null; + } + String txt = payload.toString(StandardCharsets.UTF_8); + JsonArray jsonArray = gson.fromJson(txt, JsonArray.class); + List messages = new ArrayList<>(); + + for (int i = 0; i < jsonArray.size(); i++) { + JsonObject jsonObject = jsonArray.get(i).getAsJsonObject(); + JsonObject metadata = jsonObject.get("metadata").getAsJsonObject(); + JsonElement value = jsonObject.get("value"); + + String displayName = metadata.get("displayName").getAsString(); + String path = metadata.get("path").getAsString().replace('/', '.').substring(1) + jsonObject.get("name").getAsString() + "@" + displayName; + if (config.isEnableEdgeCloudCollaboration() && config.isEdge() && !config.getEdgeName().equals("")) { + path = config.getEdgeName() + "." + path; + } + long timestamp = jsonObject.get("timestamp").getAsLong(); + + Message message = new Message(); + message.setPath(path); + message.setTimestamp(timestamp); + + String dataType = metadata.get("dataType").getAsString(); + switch (dataType) { + case "Boolean": + message.setValue(value.getAsBoolean()); + message.setDataType(DataType.BOOLEAN); + break; + case "Char": + message.setValue(value.getAsString().getBytes(StandardCharsets.UTF_8)); + message.setDataType(DataType.BINARY); + break; + case "Byte": + message.setValue(value.getAsInt()); + message.setDataType(DataType.INTEGER); + break; + default: + logger.warn("unknown datatype of mqtt: " + dataType); + } + messages.add(message); + } + return messages; + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java index 5c84f3de8..4096169ff 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/policy/cloud/EdgeCloudCollaborationFragmentGenerator.java @@ -128,7 +128,7 @@ private List selectStorageEngines(List> storageEngineF storageEngineFragmentCounts = storageEngineFragmentCounts.stream().sorted(Comparator.comparingInt(o -> o.v)).collect(Collectors.toList()); List storageEngines = new ArrayList<>(); for (int i = 0; i < count; i++) { - if (i > storageEngineFragmentCounts.size()) { + if (i >= storageEngineFragmentCounts.size()) { break; } storageEngines.add(storageEngineFragmentCounts.get(i).k);