diff --git a/conf/config.properties b/conf/config.properties
index 910cd6c0f..554ae4489 100644
--- a/conf/config.properties
+++ b/conf/config.properties
@@ -14,7 +14,7 @@ username=root
password=root
# 时序数据库列表,使用','分隔不同实例
-storageEngineList=127.0.0.1#6667#iotdb11#username=root#password=root#sessionPoolSize=20#has_data=false#is_read_only=false
+storageEngineList=127.0.0.1#6667#iotdb11#username=root#password=root#sessionPoolSize=50#has_data=false#is_read_only=false,127.0.0.1#6668#iotdb11#username=root#password=root#sessionPoolSize=50#has_data=false#is_read_only=false
#storageEngineList=127.0.0.1#8086#influxdb#url=http://localhost:8086/#token=your-token#organization=your-organization#has_data=false
#storageEngineList=127.0.0.1#4242#opentsdb#url=http://127.0.0.1
#storageEngineList=11.101.17.21#5432#timescaledb#username=postgres#password=123456
@@ -75,6 +75,8 @@ migrationPolicyClassName=cn.edu.tsinghua.iginx.migration.GreedyMigrationPolicy
# parquet是否为本地存储
isLocalParquetStorage=true
+migration_thread_pool_size=20
+
##########################
### simple policy 策略配置
##########################
@@ -184,3 +186,28 @@ batchSize=50
transformTaskThreadPoolSize=10
# Transform最大重试次数
transformMaxRetryTimes=3
+
+##########################
+### 容错相关
+##########################
+
+# 是否开启容错
+enable_storage_heartbeat=false
+
+# 存储节点心跳包间隔
+storage_heartbeat_interval=10s
+
+# 存储节点单个心跳包最大重试次数
+storage_heartbeat_max_retry_times=2
+
+# 存储节点心跳包超时时间
+storage_heartbeat_timeout=1s
+
+# 存储节点宕机后重连的时间间隔
+storage_retry_connect_interval=30s
+
+# 存储心跳包检测调度线程池大小
+storage_heartbeat_threshold_pool_size=20
+
+# 尝试重连概率
+storage_restore_heartbeat_probability=0.2
\ No newline at end of file
diff --git a/core/pom.xml b/core/pom.xml
index d86940453..fca006cd1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -46,17 +46,22 @@
cn.edu.tsinghua
iginx-thrift
- ${project.version}
+ ${parent.version}
cn.edu.tsinghua
iginx-shared
- ${project.version}
+ ${parent.version}
cn.edu.tsinghua
iginx-session
- ${project.version}
+ ${parent.version}
+
+
+ cn.edu.tsinghua
+ iginx-sync
+ ${parent.version}
javax.ws.rs
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java b/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
index b302591bf..c96100ac9 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
@@ -28,9 +28,12 @@
import cn.edu.tsinghua.iginx.engine.physical.PhysicalEngineImpl;
import cn.edu.tsinghua.iginx.engine.physical.storage.StorageManager;
import cn.edu.tsinghua.iginx.engine.shared.RequestContext;
+import cn.edu.tsinghua.iginx.exceptions.StatusCode;
import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
import cn.edu.tsinghua.iginx.metadata.IMetaManager;
import cn.edu.tsinghua.iginx.metadata.entity.*;
+import cn.edu.tsinghua.iginx.migration.MigrationManager;
+import cn.edu.tsinghua.iginx.migration.storage.StorageMigrationExecutor;
import cn.edu.tsinghua.iginx.utils.JsonUtils;
import cn.edu.tsinghua.iginx.resource.QueryResourceManager;
import cn.edu.tsinghua.iginx.thrift.*;
@@ -44,7 +47,6 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@@ -182,6 +184,33 @@ public QueryDataResp queryData(QueryDataReq req) {
return ctx.getResult().getQueryDataResp();
}
+ @Override
+ public Status removeStorageEngine(RemoveStorageEngineReq req) {
+ if (!sessionManager.checkSession(req.getSessionId(), AuthType.Cluster)) {
+ return RpcUtils.ACCESS_DENY;
+ }
+ long storageId = req.getStorageId();
+ StorageEngineMeta storageEngine = metaManager.getStorageEngine(storageId);
+ if (storageEngine == null) {
+ Status status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
+ status.setMessage("storage engine is not exists.");
+ return status;
+ }
+ try {
+ if (StorageMigrationExecutor.getInstance().migration(storageId, req.sync)) {
+ return RpcUtils.SUCCESS;
+ }
+ Status status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
+ status.setMessage("unexpected error during storage migration");
+ return status;
+ } catch (Exception e) {
+ logger.error("unexpected error during storage migration: ", e);
+ Status status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
+ status.setMessage("unexpected error during storage migration: " + e.getMessage());
+ return status;
+ }
+ }
+
@Override
public Status addStorageEngines(AddStorageEnginesReq req) {
if (!sessionManager.checkSession(req.getSessionId(), AuthType.Cluster)) {
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java b/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java
index ac06fd0ce..6083237d3 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java
@@ -173,6 +173,47 @@ public class Config {
private boolean isLocalParquetStorage = true;
+ ////////////////////
+ // 容错相关配置 //
+ ////////////////////
+
+ /**
+ * 是否开启存储活性检测
+ */
+ private boolean enableStorageHeartbeat = false;
+
+ /**
+ * 存储节点心跳包间隔,单位是 ms,如果为 0 表示不检测存储节点活性,默认为 10s
+ */
+ private long storageHeartbeatInterval = 10000;
+
+ /**
+ * 存储节点单个心跳包尝试发送的最大重试次数
+ */
+ private int storageHeartbeatMaxRetryTimes = 5;
+
+ /**
+ * 存储节点单个心跳包超时时间,单位 ms,默认为 1s
+ */
+ private long storageHeartbeatTimeout = 1000;
+
+ /**
+ * 存储节点宕机后重连的时间间隔,单位 ms,默认为 50s
+ */
+ private long storageRetryConnectInterval = 50000;
+
+ /**
+ * 存储节点心跳包线程池大小,默认为 10
+ */
+ private int storageHeartbeatThresholdPoolSize = 10;
+
+ /**
+ * 存储宕机后尝试重连概率,默认为0.05
+ */
+ private double storageRestoreHeartbeatProbability = 0.05;
+
+ private int migrationThreadPoolSize = 20;
+
public int getMaxTimeseriesLength() {
return maxTimeseriesLength;
}
@@ -712,4 +753,68 @@ public boolean isLocalParquetStorage() {
public void setLocalParquetStorage(boolean localParquetStorage) {
isLocalParquetStorage = localParquetStorage;
}
+
+ public boolean isEnableStorageHeartbeat() {
+ return enableStorageHeartbeat;
+ }
+
+ public void setEnableStorageHeartbeat(boolean enableStorageHeartbeat) {
+ this.enableStorageHeartbeat = enableStorageHeartbeat;
+ }
+
+ public long getStorageHeartbeatInterval() {
+ return storageHeartbeatInterval;
+ }
+
+ public void setStorageHeartbeatInterval(long storageHeartbeatInterval) {
+ this.storageHeartbeatInterval = storageHeartbeatInterval;
+ }
+
+ public int getStorageHeartbeatMaxRetryTimes() {
+ return storageHeartbeatMaxRetryTimes;
+ }
+
+ public void setStorageHeartbeatMaxRetryTimes(int storageHeartbeatMaxRetryTimes) {
+ this.storageHeartbeatMaxRetryTimes = storageHeartbeatMaxRetryTimes;
+ }
+
+ public long getStorageHeartbeatTimeout() {
+ return storageHeartbeatTimeout;
+ }
+
+ public void setStorageHeartbeatTimeout(long storageHeartbeatTimeout) {
+ this.storageHeartbeatTimeout = storageHeartbeatTimeout;
+ }
+
+ public long getStorageRetryConnectInterval() {
+ return storageRetryConnectInterval;
+ }
+
+ public void setStorageRetryConnectInterval(long storageRetryConnectInterval) {
+ this.storageRetryConnectInterval = storageRetryConnectInterval;
+ }
+
+ public int getStorageHeartbeatThresholdPoolSize() {
+ return storageHeartbeatThresholdPoolSize;
+ }
+
+ public void setStorageHeartbeatThresholdPoolSize(int storageHeartbeatThresholdPoolSize) {
+ this.storageHeartbeatThresholdPoolSize = storageHeartbeatThresholdPoolSize;
+ }
+
+ public double getStorageRestoreHeartbeatProbability() {
+ return storageRestoreHeartbeatProbability;
+ }
+
+ public void setStorageRestoreHeartbeatProbability(double storageRestoreHeartbeatProbability) {
+ this.storageRestoreHeartbeatProbability = storageRestoreHeartbeatProbability;
+ }
+
+ public int getMigrationThreadPoolSize() {
+ return migrationThreadPoolSize;
+ }
+
+ public void setMigrationThreadPoolSize(int migrationThreadPoolSize) {
+ this.migrationThreadPoolSize = migrationThreadPoolSize;
+ }
}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java
index 1beebe533..74728cdf1 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java
@@ -141,6 +141,16 @@ private void loadPropsFromFile() {
config.setHistoricalPrefixList(properties.getProperty("historicalPrefixList", ""));
config.setExpectedStorageUnitNum(Integer.parseInt(properties.getProperty("expectedStorageUnitNum", "0")));
config.setLocalParquetStorage(Boolean.parseBoolean(properties.getProperty("isLocalParquetStorage", "true")));
+
+ // 容错相关
+ config.setEnableStorageHeartbeat(Boolean.parseBoolean(properties.getProperty("enable_storage_heartbeat", "false")));
+ config.setStorageHeartbeatInterval(ConfigUtils.parseTime(properties.getProperty("storage_heartbeat_interval", "10s")));
+ config.setStorageHeartbeatMaxRetryTimes(Integer.parseInt(properties.getProperty("storage_heartbeat_max_retry_times", "5")));
+ config.setStorageHeartbeatTimeout(ConfigUtils.parseTime(properties.getProperty("storage_heartbeat_timeout", "1s")));
+ config.setStorageRetryConnectInterval(ConfigUtils.parseTime(properties.getProperty("storage_retry_connect_interval", "50s")));
+ config.setStorageHeartbeatThresholdPoolSize(Integer.parseInt(properties.getProperty("storage_heartbeat_threshold_pool_size", "10")));
+ config.setStorageRestoreHeartbeatProbability(Double.parseDouble(properties.getProperty("storage_restore_heartbeat_probability", "0.05")));
+ config.setMigrationThreadPoolSize(Integer.parseInt(properties.getProperty("migration_thread_pool_size", "20")));
} catch (IOException e) {
logger.error("Fail to load properties: ", e);
}
@@ -205,6 +215,15 @@ private void loadPropsFromEnv() {
config.setHistoricalPrefixList(EnvUtils.loadEnv("historicalPrefixList", config.getHistoricalPrefixList()));
config.setExpectedStorageUnitNum(EnvUtils.loadEnv("expectedStorageUnitNum", config.getExpectedStorageUnitNum()));
config.setLocalParquetStorage(EnvUtils.loadEnv("isLocalParquetStorage", config.isLocalParquetStorage()));
+
+ // 容错相关
+ config.setEnableStorageHeartbeat(EnvUtils.loadEnv("enable_storage_heartbeat", config.isEnableStorageHeartbeat()));
+ config.setStorageHeartbeatInterval(ConfigUtils.parseTime(EnvUtils.loadEnv("storage_heartbeat_interval", ConfigUtils.toTimeString(config.getStorageHeartbeatInterval()))));
+ config.setStorageHeartbeatMaxRetryTimes(EnvUtils.loadEnv("storage_heartbeat_max_retry_times", config.getStorageHeartbeatMaxRetryTimes()));
+ config.setStorageHeartbeatTimeout(ConfigUtils.parseTime(EnvUtils.loadEnv("storage_heartbeat_timeout", ConfigUtils.toTimeString(config.getStorageHeartbeatTimeout()))));
+ config.setStorageRetryConnectInterval(ConfigUtils.parseTime(EnvUtils.loadEnv("storage_retry_connect_interval", ConfigUtils.toTimeString(config.getStorageRetryConnectInterval()))));
+ config.setStorageHeartbeatThresholdPoolSize(EnvUtils.loadEnv("storageHeartbeatThresholdPoolSize", config.getStorageHeartbeatThresholdPoolSize()));
+ config.setStorageRestoreHeartbeatProbability(EnvUtils.loadEnv("storageRestoreHeartbeatProbability", config.getStorageRestoreHeartbeatProbability()));
}
private void loadUDFListFromFile() {
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigUtils.java
new file mode 100644
index 000000000..eb1aa1a8e
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigUtils.java
@@ -0,0 +1,38 @@
+package cn.edu.tsinghua.iginx.conf;
+
+public class ConfigUtils {
+
+ private static final String[] suffixList = new String[] {
+ "ms", "s", "m", "h", "min", "hour", "day"
+ };
+
+ private static final long[] factors = new long[] {
+ 1, 1000, 1000 * 60, 1000 * 60 * 60, 1000 * 60, 1000 * 60 * 60, 1000 * 60 * 60 * 24
+ };
+
+ /**
+ * 将表示时间的字符串以毫秒的形式解析。
+ * 支持使用 ms,s,m,h 作为后缀,不含后缀的情况下默认为 ms
+ * @param s 表示时间的字符串
+ * @return 字符串表示的
+ */
+ public static long parseTime(String s) {
+ long factor = 1;
+ int suffixLen = 0;
+ for (int i = 0; i < suffixList.length; i++) {
+ if (s.endsWith(suffixList[i])) {
+ factor = factors[i];
+ suffixLen = suffixList[i].length();
+ break;
+ }
+ }
+ s = s.substring(0, s.length() - suffixLen);
+ long value = Long.parseLong(s);
+ return value * factor;
+ }
+
+ public static String toTimeString(long time) {
+ return Long.toString(time) + "ms";
+ }
+
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/optimizer/naive/NaiveReplicaDispatcher.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/optimizer/naive/NaiveReplicaDispatcher.java
index cb88d044f..807d94483 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/optimizer/naive/NaiveReplicaDispatcher.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/optimizer/naive/NaiveReplicaDispatcher.java
@@ -20,6 +20,9 @@
import cn.edu.tsinghua.iginx.engine.physical.optimizer.ReplicaDispatcher;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
+import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
+import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitMeta;
+import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitState;
public class NaiveReplicaDispatcher implements ReplicaDispatcher {
@@ -33,7 +36,12 @@ public String chooseReplica(StoragePhysicalTask task) {
if (task == null) {
return null;
}
- return task.getTargetFragment().getMasterStorageUnitId();
+ String masterStorageUnitId = task.getTargetFragment().getMasterStorageUnitId();
+ StorageUnitMeta masterStorageUnit = DefaultMetaManager.getInstance().getStorageUnit(masterStorageUnitId);
+ if (masterStorageUnit.getState() == StorageUnitState.DISCARD) {
+ return masterStorageUnit.getMigrationTo();
+ }
+ return masterStorageUnitId;
}
public static NaiveReplicaDispatcher getInstance() {
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java
index 2dd1df3b3..332ab7847 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java
@@ -20,6 +20,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Timeseries;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.Connector;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.metadata.entity.TimeInterval;
@@ -31,6 +32,8 @@
public interface IStorage {
+ Connector getConnector();
+
TaskExecuteResult execute(StoragePhysicalTask task);
List getTimeSeries() throws PhysicalException;
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/StorageManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/StorageManager.java
index 1fe37242d..309fd0891 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/StorageManager.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/StorageManager.java
@@ -19,6 +19,8 @@
package cn.edu.tsinghua.iginx.engine.physical.storage;
import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.ConnectionManager;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.IStorageWrapper;
import cn.edu.tsinghua.iginx.metadata.entity.StorageEngineMeta;
import cn.edu.tsinghua.iginx.metadata.entity.TimeInterval;
import cn.edu.tsinghua.iginx.metadata.entity.TimeSeriesInterval;
@@ -47,6 +49,8 @@ public class StorageManager {
private static final Map> storageMap = new ConcurrentHashMap<>();
+ private static final ConnectionManager connectionManager = ConnectionManager.getInstance();
+
public StorageManager(List metaList) {
initClassLoaderAndDrivers();
for (StorageEngineMeta meta : metaList) {
@@ -93,18 +97,6 @@ public static Pair getBoundaryOfStorage(StorageEn
return new Pair<>(new TimeSeriesInterval(null, null), new TimeInterval(0, Long.MAX_VALUE));
}
- private static String getDriverClassName(String storageEngine) {
- String[] parts = ConfigDescriptor.getInstance().getConfig().getDatabaseClassNames().split(",");
- for (String part : parts) {
- String[] kAndV = part.split("=");
- if (!kAndV[0].equals(storageEngine)) {
- continue;
- }
- return kAndV[1];
- }
- throw new RuntimeException("cannot find driver for " + storageEngine + ", please check config.properties ");
- }
-
private boolean initStorage(StorageEngineMeta meta) {
String engine = meta.getStorageEngine();
String driver = drivers.get(engine);
@@ -114,11 +106,15 @@ private boolean initStorage(StorageEngineMeta meta) {
ClassLoader loader = classLoaders.get(engine);
IStorage storage = (IStorage) loader.loadClass(driver)
.getConstructor(StorageEngineMeta.class).newInstance(meta);
+ storage = new IStorageWrapper(storage);
// 启动一个派发线程池
ThreadPoolExecutor dispatcher = new ThreadPoolExecutor(ConfigDescriptor.getInstance().getConfig().getPhysicalTaskThreadPoolSizePerStorage(),
Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new SynchronousQueue<>());
storageMap.put(meta.getId(), new Pair<>(storage, dispatcher));
+ if (ConfigDescriptor.getInstance().getConfig().isEnableStorageHeartbeat()) {
+ connectionManager.registerConnector(id, storage.getConnector());
+ }
}
} catch (ClassNotFoundException e) {
logger.error("load class {} for engine {} failure: {}", driver, engine, e);
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java
index 70bc76336..426efcaa3 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java
@@ -39,6 +39,7 @@
import cn.edu.tsinghua.iginx.metadata.IMetaManager;
import cn.edu.tsinghua.iginx.metadata.entity.StorageEngineMeta;
import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitMeta;
+import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitState;
import cn.edu.tsinghua.iginx.metadata.hook.StorageEngineChangeHook;
import cn.edu.tsinghua.iginx.metadata.hook.StorageUnitHook;
import cn.edu.tsinghua.iginx.utils.Pair;
@@ -116,6 +117,7 @@ private StoragePhysicalTaskExecutor() {
result = pair.k.execute(task);
} catch (Exception e) {
logger.error("execute task error: " + e);
+ e.printStackTrace();
result = new TaskExecuteResult(new PhysicalException(e));
}
task.setResult(result);
@@ -131,12 +133,15 @@ private StoragePhysicalTaskExecutor() {
logger.error("task " + task + " will not broadcasting to replicas for the sake of exception: " + result.getException());
task.setResult(new TaskExecuteResult(result.getException()));
} else {
- StorageUnitMeta masterStorageUnit = task.getTargetFragment().getMasterStorageUnit();
+ StorageUnitMeta masterStorageUnit = metaManager.getStorageUnit(id);
List replicaIds = masterStorageUnit.getReplicas()
.stream().map(StorageUnitMeta::getId).collect(Collectors.toList());
replicaIds.add(masterStorageUnit.getId());
+ if (masterStorageUnit.getState() == StorageUnitState.MIGRATION) { // 迁移过程需要双写
+ replicaIds.add(masterStorageUnit.getMigrationTo());
+ }
for (String replicaId : replicaIds) {
- if (replicaId.equals(task.getStorageUnit())) {
+ if (replicaId.equals(id)) {
continue;
}
StoragePhysicalTask replicaTask = new StoragePhysicalTask(task.getOperators(), false, false);
@@ -255,7 +260,12 @@ public TaskExecuteResult executeGlobalTask(GlobalPhysicalTask task) {
public void commit(List tasks) {
for (StoragePhysicalTask task : tasks) {
if (replicaDispatcher == null) {
- storageTaskQueues.get(task.getTargetFragment().getMasterStorageUnitId()).addTask(task); // 默认情况下,异步写备,查询只查主
+ String masterStorageUnitId = task.getTargetFragment().getMasterStorageUnitId();
+ StorageUnitMeta masterStorageUnit = DefaultMetaManager.getInstance().getStorageUnit(masterStorageUnitId);
+ if (masterStorageUnit.getState() == StorageUnitState.DISCARD) {
+ masterStorageUnitId = masterStorageUnit.getMigrationTo();
+ }
+ storageTaskQueues.get(masterStorageUnitId).addTask(task); // 默认情况下,异步写备,查询只查主
} else {
storageTaskQueues.get(replicaDispatcher.chooseReplica(task)).addTask(task); // 在优化策略提供了选择器的情况下,利用选择器提供的结果
}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/ConnectionManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/ConnectionManager.java
new file mode 100644
index 000000000..4bb50be3c
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/ConnectionManager.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance;
+
+import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
+import cn.edu.tsinghua.iginx.engine.physical.storage.execute.StoragePhysicalTaskExecutor;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.proposal.content.LossConnectionProposalContent;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.proposal.content.RestoreConnectionProposalContent;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.content.LossConnectionVoteContent;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.content.RestoreConnectionVoteContent;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.listener.LossConnectionVoteListener;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.listener.RestoreConnectionVoteListener;
+import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
+import cn.edu.tsinghua.iginx.metadata.IMetaManager;
+import cn.edu.tsinghua.iginx.proposal.ProposalListener;
+import cn.edu.tsinghua.iginx.proposal.SyncProposal;
+import cn.edu.tsinghua.iginx.proposal.SyncVote;
+import cn.edu.tsinghua.iginx.protocol.NetworkException;
+import cn.edu.tsinghua.iginx.protocol.SyncProtocol;
+import cn.edu.tsinghua.iginx.protocol.VoteExpiredException;
+import cn.edu.tsinghua.iginx.utils.JsonUtils;
+import cn.hutool.core.collection.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class ConnectionManager {
+
+ private static final String LOSS_CONNECTION = "loss_connection";
+
+ private static final String RESTORE_CONNECTION = "restore_connection";
+
+ private static final String PROPOSAL_KEY = "storage_%d";
+
+ private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
+
+ private static final Random random = new Random();
+
+ private final IMetaManager iMetaManager;
+
+ private final ReadWriteLock lock;
+
+ private final Map connectors;
+
+ private final Set blockedStorages;
+
+ private final Set inVotes;
+
+ private final Set updatedConnectors;
+
+ private final ScheduledExecutorService scheduledService;
+
+ private final ProposalListener lossConnectionListener = new ProposalListener() {
+
+ @Override
+ public void onCreate(String key, SyncProposal proposal) {
+ logger.info("receive proposal(key = " + key + ") create for loss connection: " + new String(JsonUtils.toJson(proposal)));
+ LossConnectionProposalContent content = JsonUtils.fromJson(proposal.getContent(), LossConnectionProposalContent.class);
+
+ long id = content.getId();
+ inVotes.add(id);
+
+ // test connection access able:
+ checkAndVoteForLossConnection(key, id);
+ }
+
+ @Override
+ public void onUpdate(String key, SyncProposal before, SyncProposal after) {
+ logger.info("receive proposal(key = " + key + ") update for loss connection: " + new String(JsonUtils.toJson(after)));
+ LossConnectionProposalContent content = JsonUtils.fromJson(after.getContent(), LossConnectionProposalContent.class);
+ long id = content.getId();
+ if (!content.isAlive()) {
+ blockedStorages.add(id);
+ }
+ inVotes.remove(id);
+ if (content.isAlive()) {
+ return;
+ }
+ // TODO: storage engine will be remove from cluster
+ logger.info("remove storage " + id + " from cluster");
+ IStorageWrapper wrapper = (IStorageWrapper) StoragePhysicalTaskExecutor.getInstance().getStorageManager().getStorage(id).k;
+ wrapper.setBlocked(true);
+ }
+ };
+
+ private final ProposalListener restoreConnectionListener = new ProposalListener() {
+ @Override
+ public void onCreate(String key, SyncProposal proposal) {
+ logger.info("receive proposal(key = " + key + ") create for restore connection: " + new String(JsonUtils.toJson(proposal)));
+ RestoreConnectionProposalContent content = JsonUtils.fromJson(proposal.getContent(), RestoreConnectionProposalContent.class);
+
+ long id = content.getId();
+ inVotes.add(id);
+
+ // test connection access able:
+ checkAndVoteForRestoreConnection(key, id);
+ }
+
+ @Override
+ public void onUpdate(String key, SyncProposal before, SyncProposal after) {
+ logger.info("receive proposal(key = " + key + ") update for loss connection: " + new String(JsonUtils.toJson(after)));
+ RestoreConnectionProposalContent content = JsonUtils.fromJson(after.getContent(), RestoreConnectionProposalContent.class);
+ long id = content.getId();
+ inVotes.remove(id);
+ if (!content.isAlive()) {
+ logger.info("storage " + id + " still not alive!");
+ return;
+ }
+ blockedStorages.remove(id);
+ // TODO: storage engine will be remove from cluster
+ logger.info("resume storage " + id + " from cluster");
+ IStorageWrapper wrapper = (IStorageWrapper) StoragePhysicalTaskExecutor.getInstance().getStorageManager().getStorage(id).k;
+ wrapper.setBlocked(false);
+ }
+ };
+
+ private SyncProtocol lossConnectionProtocol;
+
+ private SyncProtocol restoreConnectionProtocol;
+
+ private ConnectionManager() {
+ iMetaManager = DefaultMetaManager.getInstance();
+ this.initProtocols();
+ lock = new ReentrantReadWriteLock();
+ blockedStorages = new ConcurrentHashSet<>();
+ inVotes = new ConcurrentHashSet<>();
+ connectors = new HashMap<>();
+ updatedConnectors = new HashSet<>();
+ scheduledService = new ScheduledThreadPoolExecutor(ConfigDescriptor.getInstance().getConfig().getStorageHeartbeatThresholdPoolSize());
+ }
+
+ private void initProtocols() {
+ try {
+ iMetaManager.initProtocol(LOSS_CONNECTION);
+ lossConnectionProtocol = iMetaManager.getProtocol(LOSS_CONNECTION);
+ lossConnectionProtocol.registerProposalListener(lossConnectionListener);
+ iMetaManager.initProtocol(RESTORE_CONNECTION);
+ restoreConnectionProtocol = iMetaManager.getProtocol(RESTORE_CONNECTION);
+ restoreConnectionProtocol.registerProposalListener(restoreConnectionListener);
+ } catch (Exception e) {
+ logger.error("init protocol failure: ", e);
+ System.exit(-1);
+ }
+ }
+
+ public void registerConnector(long id, Connector connector) {
+ if (connector == null) {
+ throw new IllegalArgumentException("connector for storage{id=" + id + "} shouldn't be null");
+ }
+ lock.writeLock().lock();
+ boolean alreadyExists = connectors.containsKey(id);
+ if (alreadyExists) {
+ updatedConnectors.add(id);
+ } else {
+ scheduledService.scheduleWithFixedDelay(new HeartbeatTask(id, connector),
+ 0, ConfigDescriptor.getInstance().getConfig().getStorageHeartbeatInterval(), TimeUnit.MILLISECONDS);
+ }
+ connectors.put(id, connector);
+ lock.writeLock().unlock();
+ }
+
+ private void checkAndVoteForLossConnection(String key, long id) {
+ scheduledService.submit(() -> {
+ LossConnectionVoteContent content = new LossConnectionVoteContent(checkConnection(id));
+ logger.info("[checkAndVoteForLossConnection] async check connection for " + id + ", is alive? " + content.isAlive());
+ try {
+ lossConnectionProtocol.voteFor(key, new SyncVote(iMetaManager.getIginxId(), JsonUtils.toJson(content)));
+ } catch (NetworkException e) {
+ logger.error("[checkAndVoteForLossConnection] vote for " + id + " failure: ", e);
+ } catch (VoteExpiredException e) {
+ logger.error("[checkAndVoteForLossConnection] vote for " + id + " expired: ", e);
+ }
+ });
+ }
+
+ private void checkAndVoteForRestoreConnection(String key, long id) {
+ scheduledService.submit(() -> {
+ RestoreConnectionVoteContent content = new RestoreConnectionVoteContent(checkConnection(id));
+ logger.info("[checkAndVoteForRestoreConnection] async check connection for " + id + ", is alive? " + content.isAlive());
+ try {
+ restoreConnectionProtocol.voteFor(key, new SyncVote(iMetaManager.getIginxId(), JsonUtils.toJson(content)));
+ } catch (NetworkException e) {
+ logger.error("[checkAndVoteForRestoreConnection] vote for " + id + " failure: ", e);
+ } catch (VoteExpiredException e) {
+ logger.error("[checkAndVoteForRestoreConnection] vote for " + id + " expired: ", e);
+ }
+ });
+ }
+
+ private boolean checkConnection(long id) {
+ this.lock.readLock().lock();
+ Connector connector = connectors.get(id);
+ this.lock.readLock().unlock();
+ if (connector == null) {
+ return false;
+ }
+ return connector.echo(ConfigDescriptor.getInstance().getConfig().getStorageHeartbeatTimeout(), TimeUnit.MILLISECONDS);
+ }
+
+ private class HeartbeatTask implements Runnable {
+
+ private final long id;
+
+ private Connector connector;
+
+ private final double restoreConnectionProbability = ConfigDescriptor.getInstance().getConfig().getStorageRestoreHeartbeatProbability();
+
+ public HeartbeatTask(long id, Connector connector) {
+ this.id = id;
+ this.connector = connector;
+ }
+
+ @Override
+ public void run() {
+ logger.info("scheduled test connection for " + id);
+ if (inVotes.contains(id)) {
+ logger.info("don't need to check connection for " + id + ", because it is in vote!");
+ return;
+ }
+ boolean block = blockedStorages.contains(id);
+ if (block) {
+ if (random.nextDouble() > restoreConnectionProbability) {
+ logger.info("don't need to check connection for " + id);
+ return;
+ }
+ logger.info("try restore connection for " + id + " timely");
+ }
+ ConnectionManager manager = ConnectionManager.this;
+ int maxRetryTimes = ConfigDescriptor.getInstance().getConfig().getStorageHeartbeatMaxRetryTimes();
+ long heartbeatTimeout = ConfigDescriptor.getInstance().getConfig().getStorageHeartbeatTimeout();
+
+ boolean updated;
+ manager.lock.readLock().lock();
+ updated = manager.updatedConnectors.contains(id);
+ manager.lock.readLock().unlock();
+ if (updated) {
+ Connector newConnector;
+ manager.lock.writeLock().lock();
+ manager.updatedConnectors.remove(id);
+ newConnector = manager.connectors.get(id);
+ manager.lock.writeLock().unlock();
+ connector.reset();
+ connector = newConnector;
+ }
+ for (int i = 0; i < maxRetryTimes; i++) {
+ if (connector.echo(heartbeatTimeout, TimeUnit.MILLISECONDS)) {
+ logger.info("not loss connection for " + id + ", curr timestamp = " + System.currentTimeMillis());
+ if (block) {
+ // start proposal for restore storage status
+ SyncProposal proposal = new SyncProposal(iMetaManager.getIginxId(), JsonUtils.toJson(new RestoreConnectionProposalContent(id)));
+ boolean success;
+ try {
+ success = restoreConnectionProtocol.startProposal(String.format(PROPOSAL_KEY, id), proposal, new RestoreConnectionVoteListener(iMetaManager.getIginxClusterSize(), proposal, restoreConnectionProtocol));
+ } catch (NetworkException e) {
+ logger.error("start restore connection proposal for " + id + " failure: ", e);
+ return;
+ }
+ if (!success) {
+ logger.warn("start restore connection proposal for " + id + " failure, due to race!");
+ return;
+ }
+ }
+ return;
+ }
+ logger.info("connection for " + id + " failure, retry cnt = " + i);
+ }
+ if (block) {
+ return;
+ }
+ logger.error("loss connection for " + id + ", curr timestamp = " + System.currentTimeMillis());
+
+ // start proposal for check storage status
+ SyncProposal proposal = new SyncProposal(iMetaManager.getIginxId(), JsonUtils.toJson(new LossConnectionProposalContent(id)));
+ boolean success;
+ try {
+ success = lossConnectionProtocol.startProposal(String.format(PROPOSAL_KEY, id), proposal, new LossConnectionVoteListener(iMetaManager.getIginxClusterSize(), proposal, lossConnectionProtocol));
+ } catch (NetworkException e) {
+ logger.error("start loss connection proposal for " + id + " failure: ", e);
+ return;
+ }
+ if (!success) {
+ logger.warn("start loss connection proposal for " + id + " failure, due to race!");
+ return;
+ }
+ logger.info("start loss proposal for " + id);
+ }
+ }
+
+ public static ConnectionManager getInstance() {
+ return ConnectionManagerInstanceHolder.INSTANCE;
+ }
+
+ private static class ConnectionManagerInstanceHolder {
+
+ private static final ConnectionManager INSTANCE = new ConnectionManager();
+
+ }
+
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/Connector.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/Connector.java
new file mode 100644
index 000000000..228153e06
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/Connector.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance;
+
+import java.util.concurrent.TimeUnit;
+
+public interface Connector {
+
+ boolean echo(long timeout, TimeUnit unit);
+
+ void reset();
+
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/IStorageWrapper.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/IStorageWrapper.java
new file mode 100644
index 000000000..3932cec47
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/IStorageWrapper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance;
+
+import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
+import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalTaskExecuteFailureException;
+import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage;
+import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Timeseries;
+import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
+import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
+import cn.edu.tsinghua.iginx.metadata.entity.TimeInterval;
+import cn.edu.tsinghua.iginx.metadata.entity.TimeSeriesRange;
+import cn.edu.tsinghua.iginx.utils.Pair;
+
+import java.util.List;
+
+public class IStorageWrapper implements IStorage {
+
+ private static final String ERROR_MESSAGE = "storage is blocked due to loss connection";
+
+ private final IStorage storage;
+
+ private volatile boolean blocked;
+
+ public IStorageWrapper(IStorage storage) {
+ this(storage, false);
+ }
+
+ public IStorageWrapper(IStorage storage, boolean blocked) {
+ this.storage = storage;
+ this.blocked = blocked;
+ }
+
+ @Override
+ public Connector getConnector() {
+ return storage.getConnector();
+ }
+
+ @Override
+ public TaskExecuteResult execute(StoragePhysicalTask task) {
+ if (blocked) {
+ return new TaskExecuteResult(new PhysicalTaskExecuteFailureException(ERROR_MESSAGE));
+ }
+ return storage.execute(task);
+ }
+
+ @Override
+ public List getTimeSeries() throws PhysicalException {
+ if (blocked) {
+ throw new PhysicalTaskExecuteFailureException(ERROR_MESSAGE);
+ }
+ return storage.getTimeSeries();
+ }
+
+ @Override
+ public Pair getBoundaryOfStorage(String prefix) throws PhysicalException {
+ if (blocked) {
+ throw new PhysicalTaskExecuteFailureException(ERROR_MESSAGE);
+ }
+ return storage.getBoundaryOfStorage(prefix);
+ }
+
+ @Override
+ public void release() throws PhysicalException {
+ if (blocked) {
+ throw new PhysicalTaskExecuteFailureException(ERROR_MESSAGE);
+ }
+ storage.release();
+ }
+
+ public void setBlocked(boolean blocked) {
+ this.blocked = blocked;
+ }
+
+ public boolean isBlocked() {
+ return blocked;
+ }
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/proposal/content/LossConnectionProposalContent.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/proposal/content/LossConnectionProposalContent.java
new file mode 100644
index 000000000..a6c372d0c
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/proposal/content/LossConnectionProposalContent.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.proposal.content;
+
+public class LossConnectionProposalContent {
+
+ private static final int NOT_SET = 0;
+
+ private static final int NOT_ALIVE = 1;
+
+ private static final int ALIVE = 2;
+
+ private final long id;
+
+ private int isAlive;
+
+ public LossConnectionProposalContent(long id) {
+ this.id = id;
+ this.isAlive = NOT_SET;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public boolean isAlive() {
+ return isAlive == ALIVE;
+ }
+
+ public void setAlive(boolean alive) {
+ isAlive = alive ? ALIVE : NOT_ALIVE;
+ }
+
+ public boolean isSetAlive() {
+ return isAlive != NOT_SET;
+ }
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/proposal/content/RestoreConnectionProposalContent.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/proposal/content/RestoreConnectionProposalContent.java
new file mode 100644
index 000000000..10e03fd60
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/proposal/content/RestoreConnectionProposalContent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.proposal.content;
+
+public class RestoreConnectionProposalContent {
+
+ private static final int NOT_SET = 0;
+
+ private static final int NOT_ALIVE = 1;
+
+ private static final int ALIVE = 2;
+
+ private final long id;
+
+ private int isAlive;
+
+ public RestoreConnectionProposalContent(long id) {
+ this.id = id;
+ this.isAlive = NOT_SET;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public boolean isAlive() {
+ return isAlive == ALIVE;
+ }
+
+ public void setAlive(boolean alive) {
+ isAlive = alive ? ALIVE : NOT_ALIVE;
+ }
+
+ public boolean isSetAlive() {
+ return isAlive != NOT_SET;
+ }
+
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/content/LossConnectionVoteContent.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/content/LossConnectionVoteContent.java
new file mode 100644
index 000000000..5ca5d8706
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/content/LossConnectionVoteContent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.content;
+
+public class LossConnectionVoteContent {
+
+ private final boolean isAlive;
+
+ public LossConnectionVoteContent(boolean isAlive) {
+ this.isAlive = isAlive;
+ }
+
+ public boolean isAlive() {
+ return isAlive;
+ }
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/content/RestoreConnectionVoteContent.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/content/RestoreConnectionVoteContent.java
new file mode 100644
index 000000000..637a48d80
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/content/RestoreConnectionVoteContent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.content;
+
+public class RestoreConnectionVoteContent {
+
+ private final boolean isAlive;
+
+ public RestoreConnectionVoteContent(boolean isAlive) {
+ this.isAlive = isAlive;
+ }
+
+ public boolean isAlive() {
+ return isAlive;
+ }
+
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/listener/LossConnectionVoteListener.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/listener/LossConnectionVoteListener.java
new file mode 100644
index 000000000..30c9777de
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/listener/LossConnectionVoteListener.java
@@ -0,0 +1,88 @@
+package cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.listener;
+
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.proposal.content.LossConnectionProposalContent;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.content.LossConnectionVoteContent;
+import cn.edu.tsinghua.iginx.proposal.SyncVote;
+import cn.edu.tsinghua.iginx.proposal.VoteListener;
+import cn.edu.tsinghua.iginx.protocol.ExecutionException;
+import cn.edu.tsinghua.iginx.protocol.NetworkException;
+import cn.edu.tsinghua.iginx.protocol.SyncProtocol;
+import cn.edu.tsinghua.iginx.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import cn.edu.tsinghua.iginx.proposal.SyncProposal;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LossConnectionVoteListener implements VoteListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(LossConnectionVoteListener.class);
+
+ private final int targetVote;
+
+ private final Map votes = new HashMap<>();
+
+ private final SyncProposal proposal;
+
+ private final SyncProtocol protocol;
+
+ public LossConnectionVoteListener(int targetVote, SyncProposal proposal, SyncProtocol protocol) {
+ this.targetVote = targetVote;
+ this.protocol = protocol;
+ this.proposal = proposal;
+ }
+
+ @Override
+ public synchronized void receive(String key, SyncVote vote) {
+ long voter = vote.getVoter();
+ votes.put(voter, vote);
+ if (votes.size() != targetVote) {
+ return;
+ }
+ logger.info("receive enough vote for " + key);
+ int supportCount = 0;
+ for (SyncVote v: votes.values()) {
+ LossConnectionVoteContent content = JsonUtils.fromJson(v.getContent(), LossConnectionVoteContent.class);
+ if (!content.isAlive()) {
+ supportCount++;
+ }
+ }
+ boolean alive = true;
+ if (supportCount * 2 > targetVote) {
+ alive = false;
+ }
+ LossConnectionProposalContent content = JsonUtils.fromJson(proposal.getContent(), LossConnectionProposalContent.class);
+ content.setAlive(alive);
+ proposal.setContent(JsonUtils.toJson(content));
+ try {
+ protocol.endProposal(key, proposal);
+ } catch (NetworkException | ExecutionException e) {
+ logger.error("end proposal failure: ", e);
+ }
+ logger.info("end proposal success for " + key);
+ }
+
+ @Override
+ public void end(String key) {
+ logger.info("current timestamp: " + System.currentTimeMillis() + ", end loss connection vote for " + key);
+ }
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/listener/RestoreConnectionVoteListener.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/listener/RestoreConnectionVoteListener.java
new file mode 100644
index 000000000..7b260ac9d
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/fault_tolerance/vote/listener/RestoreConnectionVoteListener.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.listener;
+
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.proposal.content.LossConnectionProposalContent;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.proposal.content.RestoreConnectionProposalContent;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.content.LossConnectionVoteContent;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.vote.content.RestoreConnectionVoteContent;
+import cn.edu.tsinghua.iginx.proposal.SyncProposal;
+import cn.edu.tsinghua.iginx.proposal.SyncVote;
+import cn.edu.tsinghua.iginx.proposal.VoteListener;
+import cn.edu.tsinghua.iginx.protocol.ExecutionException;
+import cn.edu.tsinghua.iginx.protocol.NetworkException;
+import cn.edu.tsinghua.iginx.protocol.SyncProtocol;
+import cn.edu.tsinghua.iginx.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RestoreConnectionVoteListener implements VoteListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(RestoreConnectionVoteListener.class);
+
+ private final int targetVote;
+
+ private final Map votes = new HashMap<>();
+
+ private final SyncProposal proposal;
+
+ private final SyncProtocol protocol;
+
+ public RestoreConnectionVoteListener(int targetVote, SyncProposal proposal, SyncProtocol protocol) {
+ this.targetVote = targetVote;
+ this.proposal = proposal;
+ this.protocol = protocol;
+ }
+
+ @Override
+ public synchronized void receive(String key, SyncVote vote) {
+ long voter = vote.getVoter();
+ votes.put(voter, vote);
+ if (votes.size() != targetVote) {
+ return;
+ }
+ int supportCount = 0;
+ for (SyncVote v: votes.values()) {
+ RestoreConnectionVoteContent content = JsonUtils.fromJson(v.getContent(), RestoreConnectionVoteContent.class);
+ if (!content.isAlive()) {
+ supportCount++;
+ }
+ }
+ boolean alive = true;
+ if (supportCount * 2 > targetVote) {
+ alive = false;
+ }
+ RestoreConnectionProposalContent content = JsonUtils.fromJson(proposal.getContent(), RestoreConnectionProposalContent.class);
+ content.setAlive(alive);
+ proposal.setContent(JsonUtils.toJson(content));
+ try {
+ protocol.endProposal(key, proposal);
+ } catch (NetworkException | ExecutionException e) {
+ logger.error("end proposal failure: ", e);
+ }
+ }
+
+ @Override
+ public void end(String key) {
+ logger.info("current timestamp: " + System.currentTimeMillis() + ", end restore connection vote for " + key);
+ }
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/type/OperatorType.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/type/OperatorType.java
index 1dcda294f..5601c82af 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/type/OperatorType.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/type/OperatorType.java
@@ -63,7 +63,7 @@ public static boolean isMultipleOperator(OperatorType op) {
}
public static boolean isGlobalOperator(OperatorType op) {
- return op == ShowTimeSeries;
+ return op == ShowTimeSeries || op == Migration;
}
public static boolean isNeedBroadcasting(OperatorType op) {
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java
index f73feca3a..0aa57d762 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/DefaultMetaManager.java
@@ -31,6 +31,8 @@
import cn.edu.tsinghua.iginx.metadata.storage.etcd.ETCDMetaStorage;
import cn.edu.tsinghua.iginx.metadata.storage.zk.ZooKeeperMetaStorage;
import cn.edu.tsinghua.iginx.policy.simple.TimeSeriesCalDO;
+import cn.edu.tsinghua.iginx.protocol.NetworkException;
+import cn.edu.tsinghua.iginx.protocol.SyncProtocol;
import cn.edu.tsinghua.iginx.sql.statement.InsertStatement;
import cn.edu.tsinghua.iginx.thrift.AuthType;
import cn.edu.tsinghua.iginx.thrift.UserType;
@@ -333,6 +335,80 @@ public boolean addStorageEngines(List storageEngineMetas) {
return false;
}
+ @Override
+ public Map startMigrationStorageUnits(Map migrationMap) {
+ try {
+ Map migrationStorageUnitMap = new HashMap<>();
+ storage.lockStorageUnit();
+ for (String storageUnitId: migrationMap.keySet()) {
+ String newStorageUnitId = storage.addStorageUnit();
+ StorageUnitMeta storageUnit = getStorageUnit(storageUnitId);
+ StorageUnitMeta clonedStorageUnit = storageUnit.clone();
+ StorageUnitMeta newStorageUnit = clonedStorageUnit.migrationStorageUnitMeta(newStorageUnitId, id, migrationMap.get(storageUnitId));
+ // 更新新的 storage unit
+ cache.updateStorageUnit(newStorageUnit);
+ for (StorageUnitHook hook : storageUnitHooks) {
+ hook.onChange(null, newStorageUnit);
+ }
+ storage.updateStorageUnit(newStorageUnit);
+ // 更新旧的 storage unit
+ cache.updateStorageUnit(clonedStorageUnit);
+ for (StorageUnitHook hook : storageUnitHooks) {
+ hook.onChange(storageUnit, clonedStorageUnit);
+ }
+ storage.updateStorageUnit(clonedStorageUnit);
+ migrationStorageUnitMap.put(storageUnitId, newStorageUnitId);
+ }
+ return migrationStorageUnitMap;
+ } catch (MetaStorageException e) {
+ logger.error("migration storage unit error: ", e);
+ } finally {
+ try {
+ storage.releaseStorageUnit();
+ } catch (MetaStorageException e) {
+ logger.error("release storage unit lock error: ", e);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean finishMigrationStorageUnit(String storageUnitId) {
+ try {
+ storage.lockStorageUnit();
+ StorageUnitMeta sourceStorageUnit = getStorageUnit(storageUnitId);
+ StorageUnitMeta clonedSourceStorageUnit = sourceStorageUnit.clone();
+ clonedSourceStorageUnit.setState(StorageUnitState.DISCARD);
+
+ StorageUnitMeta targetStorageUnit = getStorageUnit(sourceStorageUnit.getMigrationTo());
+ StorageUnitMeta clonedTargetStorageUnit = targetStorageUnit.clone();
+ clonedTargetStorageUnit.setState(StorageUnitState.NORMAL);
+
+ cache.updateStorageUnit(clonedTargetStorageUnit);
+ for (StorageUnitHook hook : storageUnitHooks) {
+ hook.onChange(targetStorageUnit, clonedTargetStorageUnit);
+ }
+ storage.updateStorageUnit(clonedTargetStorageUnit);
+ // 更新旧的 storage unit
+ cache.updateStorageUnit(clonedSourceStorageUnit);
+ for (StorageUnitHook hook : storageUnitHooks) {
+ hook.onChange(sourceStorageUnit, clonedSourceStorageUnit);
+ }
+ storage.updateStorageUnit(clonedSourceStorageUnit);
+
+ return true;
+ } catch (MetaStorageException e) {
+ logger.error("migration storage unit error: ", e);
+ } finally {
+ try {
+ storage.releaseStorageUnit();
+ } catch (MetaStorageException e) {
+ logger.error("release storage unit lock error: ", e);
+ }
+ }
+ return false;
+ }
+
@Override
public List getStorageEngineList() {
return new ArrayList<>(cache.getStorageEngineList());
@@ -373,6 +449,11 @@ public List getIginxList() {
return new ArrayList<>(cache.getIginxList());
}
+ @Override
+ public int getIginxClusterSize() {
+ return cache.getIginxList().size();
+ }
+
@Override
public long getIginxId() {
return id;
@@ -383,6 +464,11 @@ public List getFragments() {
return cache.getFragments();
}
+ @Override
+ public List getFragmentsByStorageUnit(String storageUnitId) {
+ return cache.getFragmentListByStorageUnitId(storageUnitId);
+ }
+
@Override
public Pair getBoundaryOfStorageUnit(String storageUnitId) {
List fragmentMetaList = cache.getFragmentListByStorageUnitId(storageUnitId);
@@ -1255,4 +1341,13 @@ public void submitMaxActiveEndTime() {
logger.error("encounter error when submitting max active time: ", e);
}
}
+
+ public void initProtocol(String category) throws NetworkException {
+ storage.initProtocol(category);
+ }
+
+ @Override
+ public SyncProtocol getProtocol(String category) {
+ return storage.getProtocol(category);
+ }
}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/IMetaManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/IMetaManager.java
index 2d0e4543b..a2769b674 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/IMetaManager.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/IMetaManager.java
@@ -25,7 +25,8 @@
import cn.edu.tsinghua.iginx.policy.simple.TimeSeriesCalDO;
import cn.edu.tsinghua.iginx.sql.statement.InsertStatement;
import cn.edu.tsinghua.iginx.thrift.AuthType;
-import cn.edu.tsinghua.iginx.thrift.StorageEngine;
+import cn.edu.tsinghua.iginx.protocol.NetworkException;
+import cn.edu.tsinghua.iginx.protocol.SyncProtocol;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.List;
@@ -39,6 +40,10 @@ public interface IMetaManager {
*/
boolean addStorageEngines(List storageEngineMetas);
+ Map startMigrationStorageUnits(Map migrationMap);
+
+ boolean finishMigrationStorageUnit(String storageUnitId);
+
/**
* 获取所有的存储引擎实例的原信息(包括每个存储引擎的存储单元列表)
*/
@@ -67,6 +72,8 @@ public interface IMetaManager {
*/
List getIginxList();
+ int getIginxClusterSize();
+
/**
* 获取当前 iginx 节点的 ID
*/
@@ -77,6 +84,8 @@ public interface IMetaManager {
*/
List getFragments();
+ List getFragmentsByStorageUnit(String storageUnitId);
+
/**
* 获取某个du的时空范围
* */
@@ -252,4 +261,9 @@ StorageUnitMeta generateNewStorageUnitMetaByFragment(FragmentMeta fragmentMeta,
long getMaxActiveEndTime();
void submitMaxActiveEndTime();
+
+ void initProtocol(String category) throws NetworkException;
+
+ SyncProtocol getProtocol(String category);
+
}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitMeta.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitMeta.java
index 3ca54cc1b..73e424a2f 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitMeta.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitMeta.java
@@ -23,7 +23,7 @@
import java.util.List;
import java.util.Objects;
-public final class StorageUnitMeta {
+public final class StorageUnitMeta implements Cloneable {
private String id;
@@ -39,6 +39,10 @@ public final class StorageUnitMeta {
private boolean dummy = false;
+ private StorageUnitState state = StorageUnitState.NORMAL;
+
+ private String migrationTo = null;
+
private transient List replicas = new ArrayList<>();
public StorageUnitMeta(String id, long storageEngineId, String masterId, boolean isMaster) {
@@ -76,6 +80,19 @@ public StorageUnitMeta(String id, long storageEngineId, String masterId, boolean
this.replicas = replicas;
}
+ public StorageUnitMeta(String id, long storageEngineId, String masterId, boolean isMaster, long createdBy, boolean initialStorageUnit, boolean dummy, StorageUnitState state, String migrationTo, List replicas) {
+ this.id = id;
+ this.storageEngineId = storageEngineId;
+ this.masterId = masterId;
+ this.isMaster = isMaster;
+ this.createdBy = createdBy;
+ this.initialStorageUnit = initialStorageUnit;
+ this.dummy = dummy;
+ this.state = state;
+ this.migrationTo = migrationTo;
+ this.replicas = replicas;
+ }
+
public void addReplica(StorageUnitMeta storageUnit) {
if (replicas == null)
replicas = new ArrayList<>();
@@ -129,6 +146,22 @@ public StorageUnitMeta renameStorageUnitMeta(String id, String masterId) {
return storageUnitMeta;
}
+ public StorageUnitMeta migrationStorageUnitMeta(String id, long migrationBy, long storageEngineId) {
+ String masterId = getMasterId();
+ if (isMaster) {
+ masterId = id;
+ }
+ StorageUnitMeta storageUnitMeta = new StorageUnitMeta(id, storageEngineId, masterId, isMaster);
+ storageUnitMeta.setCreatedBy(migrationBy);
+ storageUnitMeta.setInitialStorageUnit(initialStorageUnit);
+ storageUnitMeta.setState(StorageUnitState.CREATING);
+ storageUnitMeta.setReplicas(replicas);
+
+ this.setMigrationTo(id);
+ this.setState(StorageUnitState.MIGRATION);
+ return storageUnitMeta;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -189,4 +222,30 @@ public void setStorageEngineId(long storageEngineId) {
public boolean isDummy() {
return dummy;
}
+
+ public StorageUnitState getState() {
+ return state;
+ }
+
+ public void setState(StorageUnitState state) {
+ this.state = state;
+ }
+
+ public String getMigrationTo() {
+ return migrationTo;
+ }
+
+ public void setMigrationTo(String migrationTo) {
+ this.migrationTo = migrationTo;
+ }
+
+ @Override
+ public StorageUnitMeta clone() {
+ try {
+ return (StorageUnitMeta) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new AssertionError();
+ }
+ }
+
}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitState.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitState.java
new file mode 100644
index 000000000..b145b9157
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageUnitState.java
@@ -0,0 +1,10 @@
+package cn.edu.tsinghua.iginx.metadata.entity;
+
+public enum StorageUnitState {
+
+ NORMAL,
+ MIGRATION,
+ CREATING,
+ DISCARD
+
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/IMetaStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/IMetaStorage.java
index e5ba2e330..dd11d2fd3 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/IMetaStorage.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/IMetaStorage.java
@@ -21,6 +21,8 @@
import cn.edu.tsinghua.iginx.exceptions.MetaStorageException;
import cn.edu.tsinghua.iginx.metadata.entity.*;
import cn.edu.tsinghua.iginx.metadata.hook.*;
+import cn.edu.tsinghua.iginx.protocol.NetworkException;
+import cn.edu.tsinghua.iginx.protocol.SyncProtocol;
import java.util.List;
import java.util.Map;
@@ -120,4 +122,9 @@ public interface IMetaStorage {
void releaseMaxActiveEndTimeStatistics() throws MetaStorageException;
void registerMaxActiveEndTimeStatisticsChangeHook(MaxActiveEndTimeStatisticsChangeHook hook) throws MetaStorageException;
+
+ void initProtocol(String category) throws NetworkException;
+
+ SyncProtocol getProtocol(String category);
+
}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/etcd/ETCDMetaStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/etcd/ETCDMetaStorage.java
index f0114c48b..6de083e7d 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/etcd/ETCDMetaStorage.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/etcd/ETCDMetaStorage.java
@@ -23,6 +23,7 @@
import cn.edu.tsinghua.iginx.metadata.entity.*;
import cn.edu.tsinghua.iginx.metadata.hook.*;
import cn.edu.tsinghua.iginx.metadata.storage.IMetaStorage;
+import cn.edu.tsinghua.iginx.protocol.SyncProtocol;
import cn.edu.tsinghua.iginx.utils.JsonUtils;
import io.etcd.jetcd.*;
import io.etcd.jetcd.kv.GetResponse;
@@ -1072,4 +1073,14 @@ public void close() throws MetaStorageException {
this.client = null;
}
+ @Override
+ public void initProtocol(String category) {
+
+ }
+
+ @Override
+ public SyncProtocol getProtocol(String category) {
+ return null;
+ }
+
}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java
index 2431b44d4..a8eb44580 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/storage/zk/ZooKeeperMetaStorage.java
@@ -23,6 +23,9 @@
import cn.edu.tsinghua.iginx.metadata.entity.*;
import cn.edu.tsinghua.iginx.metadata.hook.*;
import cn.edu.tsinghua.iginx.metadata.storage.IMetaStorage;
+import cn.edu.tsinghua.iginx.protocol.NetworkException;
+import cn.edu.tsinghua.iginx.protocol.SyncProtocol;
+import cn.edu.tsinghua.iginx.protocol.zk.ZooKeeperSyncProtocolImpl;
import cn.edu.tsinghua.iginx.utils.JsonUtils;
import java.util.Map.Entry;
import org.apache.curator.framework.CuratorFramework;
@@ -40,7 +43,9 @@
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class ZooKeeperMetaStorage implements IMetaStorage {
@@ -155,6 +160,10 @@ public class ZooKeeperMetaStorage implements IMetaStorage {
private TreeCache transformCache;
+ private Map protocols = new HashMap<>();
+
+ private ReadWriteLock protocolLock = new ReentrantReadWriteLock();
+
public ZooKeeperMetaStorage() {
client = CuratorFrameworkFactory.builder()
.connectString(ConfigDescriptor.getInstance().getConfig().getZookeeperConnectionString())
@@ -1417,11 +1426,32 @@ public void releaseMaxActiveEndTimeStatistics() throws MetaStorageException {
}
@Override
- public void registerMaxActiveEndTimeStatisticsChangeHook(
- MaxActiveEndTimeStatisticsChangeHook hook) throws MetaStorageException {
+ public void initProtocol(String category) throws NetworkException {
+ protocolLock.writeLock().lock();
+ try {
+ if (protocols.containsKey(category)) {
+ return;
+ }
+ SyncProtocol protocol = new ZooKeeperSyncProtocolImpl(category, client, null);
+ protocols.put(category, protocol);
+ } finally {
+ protocolLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void registerMaxActiveEndTimeStatisticsChangeHook(MaxActiveEndTimeStatisticsChangeHook hook) throws MetaStorageException {
this.maxActiveEndTimeStatisticsChangeHook = hook;
}
+ public SyncProtocol getProtocol(String category) {
+ SyncProtocol protocol;
+ protocolLock.readLock().lock();
+ protocol = protocols.get(category);
+ protocolLock.readLock().unlock();
+ return protocol;
+ }
+
public static boolean isNumeric(String str) {
String bigStr;
try {
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/migration/MigrationManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/migration/MigrationManager.java
index 9247f690f..d63bef69f 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/migration/MigrationManager.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/migration/MigrationManager.java
@@ -3,6 +3,10 @@
import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
import java.util.HashMap;
import java.util.Map;
+
+import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
+import cn.edu.tsinghua.iginx.migration.storage.GreedyStorageMigrationPolicy;
+import cn.edu.tsinghua.iginx.migration.storage.StorageMigrationPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,4 +45,8 @@ public MigrationPolicy getMigration() {
}
return policy;
}
+
+ public StorageMigrationPolicy getStorageMigration() {
+ return new GreedyStorageMigrationPolicy(DefaultMetaManager.getInstance());
+ }
}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/migration/MigrationPolicy.java b/core/src/main/java/cn/edu/tsinghua/iginx/migration/MigrationPolicy.java
index fe156e04f..8a44ee47f 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/migration/MigrationPolicy.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/migration/MigrationPolicy.java
@@ -450,6 +450,45 @@ private void migrateData(long sourceStorageId, long targetStorageId,
}
}
+ public boolean migrationData(String sourceStorageUnitId, String targetStorageUnitId) {
+ try {
+ List fragmentMetas = DefaultMetaManager.getInstance().getFragmentsByStorageUnit(sourceStorageUnitId);
+
+ Set pathRegexSet = new HashSet<>();
+ ShowTimeSeries showTimeSeries = new ShowTimeSeries(new GlobalSource(),
+ pathRegexSet, null, Integer.MAX_VALUE, 0);
+ RowStream rowStream = physicalEngine.execute(showTimeSeries);
+ SortedSet pathSet = new TreeSet<>();
+ while (rowStream.hasNext()) {
+ Row row = rowStream.next();
+ String timeSeries = new String((byte[]) row.getValue(0));
+ if (timeSeries.contains("{") && timeSeries.contains("}")) {
+ timeSeries = timeSeries.split("\\{")[0];
+ }
+ logger.info("[migrationData] need migration path: {}", timeSeries);
+ for (FragmentMeta fragmentMeta: fragmentMetas) {
+ if (fragmentMeta.getTsInterval().isContain(timeSeries)) {
+ pathSet.add(timeSeries);
+ logger.info("[migrationData] path {} belong to {}", timeSeries, fragmentMeta);
+ }
+ }
+ }
+ StorageUnitMeta sourceStorageUnit = DefaultMetaManager.getInstance().getStorageUnit(sourceStorageUnitId);
+ StorageUnitMeta targetStorageUnit = DefaultMetaManager.getInstance().getStorageUnit(targetStorageUnitId);
+ // 开始迁移数据
+ for (FragmentMeta fragmentMeta: fragmentMetas) {
+ Migration migration = new Migration(new GlobalSource(), sourceStorageUnit.getStorageEngineId(), targetStorageUnit.getStorageEngineId(),
+ fragmentMeta, new ArrayList<>(pathSet), targetStorageUnit);
+ physicalEngine.execute(migration);
+ }
+ return true;
+ } catch (Exception e) {
+ logger.error("encounter error when migrate data from {} to {} ", sourceStorageUnitId,
+ targetStorageUnitId, e);
+ }
+ return false;
+ }
+
private FragmentMeta reshardFragment(long sourceStorageId, long targetStorageId,
FragmentMeta fragmentMeta) {
try {
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/migration/storage/GreedyStorageMigrationPolicy.java b/core/src/main/java/cn/edu/tsinghua/iginx/migration/storage/GreedyStorageMigrationPolicy.java
new file mode 100644
index 000000000..6a16c52c7
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/migration/storage/GreedyStorageMigrationPolicy.java
@@ -0,0 +1,62 @@
+package cn.edu.tsinghua.iginx.migration.storage;
+
+import cn.edu.tsinghua.iginx.metadata.IMetaManager;
+import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+public class GreedyStorageMigrationPolicy extends StorageMigrationPolicy {
+
+ private static final Logger logger = LoggerFactory.getLogger(GreedyStorageMigrationPolicy.class);
+
+ public GreedyStorageMigrationPolicy(IMetaManager metaManager) {
+ super(metaManager);
+ }
+
+ static class StoragePriority implements Comparable {
+
+ long storageId;
+
+ int weight;
+
+ StoragePriority(long storageId, int weight) {
+ this.storageId = storageId;
+ this.weight = weight;
+ }
+
+ @Override
+ public int compareTo(StoragePriority o) {
+ return weight - o.weight;
+ }
+ }
+
+ @Override
+ public Map generateMigrationPlans(long sourceStorageId) {
+ logger.info("[storage migration] decide storage migration for " + sourceStorageId);
+ Map> storageUnitsMap = metaManager.getStorageUnits().stream().filter(e -> !e.isDummy()).collect(Collectors.groupingBy(StorageUnitMeta::getStorageEngineId));
+ List storageUnits = storageUnitsMap.get(sourceStorageId);
+
+ PriorityQueue storagePriorities = new PriorityQueue<>();
+ for (long storageId: storageUnitsMap.keySet()) {
+ if (storageId == sourceStorageId) {
+ continue;
+ }
+ storagePriorities.add(new StoragePriority(storageId, storageUnitsMap.get(storageId).size()));
+ }
+
+ Map migrationMap = new HashMap<>();
+ for (StorageUnitMeta storageUnit: storageUnits) {
+ StoragePriority priority = storagePriorities.remove();
+ migrationMap.put(storageUnit.getId(), priority.storageId);
+ priority.weight++;
+ storagePriorities.add(priority);
+ }
+ return migrationMap;
+ }
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/migration/storage/StorageMigrationExecutor.java b/core/src/main/java/cn/edu/tsinghua/iginx/migration/storage/StorageMigrationExecutor.java
new file mode 100644
index 000000000..03b1128a3
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/migration/storage/StorageMigrationExecutor.java
@@ -0,0 +1,100 @@
+package cn.edu.tsinghua.iginx.migration.storage;
+
+import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
+import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
+import cn.edu.tsinghua.iginx.metadata.IMetaManager;
+import cn.edu.tsinghua.iginx.migration.MigrationManager;
+import cn.edu.tsinghua.iginx.migration.MigrationPolicy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+public class StorageMigrationExecutor {
+
+ private final IMetaManager metaManager;
+
+ private final ThreadPoolExecutor executor;
+
+ private static class MigrationTask implements Callable, Runnable {
+
+ private final ThreadPoolExecutor executor;
+
+ private final IMetaManager metaManager;
+
+ private final long storageId;
+
+ public MigrationTask(ThreadPoolExecutor executor, IMetaManager metaManager, long storageId) {
+ this.executor = executor;
+ this.metaManager = metaManager;
+ this.storageId = storageId;
+ }
+
+ @Override
+ public Boolean call() {
+ Map migrationMap = MigrationManager.getInstance().getStorageMigration().generateMigrationPlans(storageId);
+ Map storageUnitMigrationMap = metaManager.startMigrationStorageUnits(migrationMap);
+
+ List> tasks = new ArrayList<>();
+
+ for (String sourceStorageUnit: storageUnitMigrationMap.keySet()) {
+ String targetStorageUnit = storageUnitMigrationMap.get(sourceStorageUnit);
+ MigrationPolicy migrationPolicy = MigrationManager.getInstance().getMigration();
+ tasks.add(() -> migrationPolicy.migrationData(sourceStorageUnit, targetStorageUnit));
+ }
+
+ try {
+ List> futures = executor.invokeAll(tasks);
+ for(Future future: futures) {
+ Boolean ret = future.get();
+ if (ret == null || !ret) {
+ return false;
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ return false;
+ }
+ for (String storageUnitId: migrationMap.keySet()) {
+ metaManager.finishMigrationStorageUnit(storageUnitId);
+ }
+ return true;
+ }
+
+ @Override
+ public void run() {
+ call();
+ }
+ }
+
+ private StorageMigrationExecutor() {
+ metaManager = DefaultMetaManager.getInstance();
+ executor = new ThreadPoolExecutor(ConfigDescriptor.getInstance().getConfig().getMigrationThreadPoolSize(),
+ Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+ }
+
+ public static StorageMigrationExecutor getInstance() {
+ return StorageMigrationExecutorHolder.INSTANCE;
+ }
+
+ public boolean migration(long storageId, boolean sync) {
+ MigrationTask task = new MigrationTask(executor, metaManager, storageId);
+ if (sync) {
+ return Boolean.TRUE.equals(task.call());
+ }
+ executor.execute(task);
+ return true;
+ }
+
+ private static class StorageMigrationExecutorHolder {
+
+ private static final StorageMigrationExecutor INSTANCE = new StorageMigrationExecutor();
+
+ private StorageMigrationExecutorHolder() {
+ }
+
+ }
+
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/migration/storage/StorageMigrationPolicy.java b/core/src/main/java/cn/edu/tsinghua/iginx/migration/storage/StorageMigrationPolicy.java
new file mode 100644
index 000000000..1573728f1
--- /dev/null
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/migration/storage/StorageMigrationPolicy.java
@@ -0,0 +1,16 @@
+package cn.edu.tsinghua.iginx.migration.storage;
+
+import cn.edu.tsinghua.iginx.metadata.IMetaManager;
+
+import java.util.Map;
+
+public abstract class StorageMigrationPolicy {
+
+ protected final IMetaManager metaManager;
+
+ public StorageMigrationPolicy(IMetaManager metaManager) {
+ this.metaManager = metaManager;
+ }
+
+ public abstract Map generateMigrationPlans(long storageId);
+}
diff --git a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java
index 42c7df27f..b97f455c4 100644
--- a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java
+++ b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java
@@ -24,6 +24,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.StorageInitializationException;
import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Timeseries;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.Connector;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.TimeRange;
@@ -144,6 +145,10 @@ private void reloadHistoryData() {
}
}
+ public Connector getConnector() {
+ return null;
+ }
+
@Override
public Pair getBoundaryOfStorage(String dataPrefix) throws PhysicalException {
List bucketNames = new ArrayList<>(historyBucketMap.keySet());
diff --git a/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBConnector.java b/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBConnector.java
new file mode 100644
index 000000000..e22491bb1
--- /dev/null
+++ b/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBConnector.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.iotdb;
+
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.Connector;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class IoTDBConnector implements Connector {
+
+ private static final Logger logger = LoggerFactory.getLogger(IoTDBConnector.class);
+
+ private static final ExecutorService service = Executors.newSingleThreadExecutor();
+
+ private Session session;
+
+ public IoTDBConnector(String ip, int port, String username, String password) {
+ this.session = new Session(ip, port, username, password);
+ }
+
+ @Override
+ public boolean echo(long timeout, TimeUnit unit) {
+ Future future = service.submit(() -> {
+ try {
+ session.open();
+ SessionDataSet dataSet = session.executeQueryStatement("show version");
+ dataSet.closeOperationHandle();
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ logger.error("connect to iotdb error: " + e.getMessage());
+ return false;
+ } catch (StatementExecutionException e) {
+ logger.error("execute statement error: " + e.getMessage());
+ }
+ return true;
+ });
+ try {
+ return future.get(timeout, unit);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ future.cancel(true);
+ logger.error("connection timeout: ", e);
+ }
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ this.session = null;
+ }
+
+}
diff --git a/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java b/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java
index 2a8692358..0df43cdc3 100644
--- a/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java
+++ b/dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java
@@ -24,6 +24,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.StorageInitializationException;
import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Timeseries;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.Connector;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.TimeRange;
@@ -159,6 +160,13 @@ private SessionPool createSessionPool() {
return new SessionPool(meta.getIp(), meta.getPort(), username, password, sessionPoolSize);
}
+ @Override
+ public Connector getConnector() {
+ return new IoTDBConnector(this.meta.getIp(), this.meta.getPort(),
+ this.meta.getExtraParams().getOrDefault(USERNAME, DEFAULT_USERNAME),
+ this.meta.getExtraParams().getOrDefault(PASSWORD, DEFAULT_PASSWORD));
+ }
+
@Override
public TaskExecuteResult execute(StoragePhysicalTask task) {
List operators = task.getOperators();
diff --git a/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java b/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java
index 8e7b1b70a..dde7a9022 100644
--- a/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java
+++ b/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java
@@ -24,6 +24,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.StorageInitializationException;
import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Timeseries;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.Connector;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.TimeRange;
@@ -151,6 +152,11 @@ private SessionPool createSessionPool() {
return new SessionPool(meta.getIp(), meta.getPort(), username, password, sessionPoolSize);
}
+ @Override
+ public Connector getConnector() {
+ return null;
+ }
+
@Override
public TaskExecuteResult execute(StoragePhysicalTask task) {
List operators = task.getOperators();
diff --git a/dataSources/opentsdb/src/main/java/cn/edu/tsinghua/iginx/opentsdb/OpenTSDBStorage.java b/dataSources/opentsdb/src/main/java/cn/edu/tsinghua/iginx/opentsdb/OpenTSDBStorage.java
index c9026d272..6bf9d3105 100644
--- a/dataSources/opentsdb/src/main/java/cn/edu/tsinghua/iginx/opentsdb/OpenTSDBStorage.java
+++ b/dataSources/opentsdb/src/main/java/cn/edu/tsinghua/iginx/opentsdb/OpenTSDBStorage.java
@@ -6,6 +6,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.StorageInitializationException;
import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Timeseries;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.Connector;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.TimeRange;
@@ -101,6 +102,11 @@ private boolean testConnection() {
return true;
}
+ @Override
+ public Connector getConnector() {
+ return null;
+ }
+
@Override
public TaskExecuteResult execute(StoragePhysicalTask task) {
List operators = task.getOperators();
diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java
index c2e2f721c..50241db54 100644
--- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java
+++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java
@@ -7,6 +7,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.StorageInitializationException;
import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Timeseries;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.Connector;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.operator.Delete;
@@ -169,4 +170,10 @@ public Pair getBoundaryOfStorage(String prefix) t
public void release() throws PhysicalException {
executor.close();
}
+
+ @Override
+ public Connector getConnector() {
+ return null;
+ }
+
}
diff --git a/dataSources/postgresql/src/main/java/cn/edu/tsinghua/iginx/postgresql/PostgreSQLStorage.java b/dataSources/postgresql/src/main/java/cn/edu/tsinghua/iginx/postgresql/PostgreSQLStorage.java
index d4fc420ab..df6c756c9 100644
--- a/dataSources/postgresql/src/main/java/cn/edu/tsinghua/iginx/postgresql/PostgreSQLStorage.java
+++ b/dataSources/postgresql/src/main/java/cn/edu/tsinghua/iginx/postgresql/PostgreSQLStorage.java
@@ -24,6 +24,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.StorageInitializationException;
import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Timeseries;
+import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.Connector;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.TimeRange;
@@ -141,6 +142,11 @@ private boolean testConnection() {
}
}
+ @Override
+ public Connector getConnector() {
+ return null;
+ }
+
@Override
public TaskExecuteResult execute(StoragePhysicalTask task) {
List operators = task.getOperators();
diff --git a/example/src/main/java/cn/edu/tsinghua/iginx/session/RemoveExample.java b/example/src/main/java/cn/edu/tsinghua/iginx/session/RemoveExample.java
new file mode 100644
index 000000000..30395ffc7
--- /dev/null
+++ b/example/src/main/java/cn/edu/tsinghua/iginx/session/RemoveExample.java
@@ -0,0 +1,2 @@
+package cn.edu.tsinghua.iginx.session;public class RemoveExample {
+}
diff --git a/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java b/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java
index 9d401a877..d3aa56e09 100644
--- a/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java
+++ b/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java
@@ -1259,4 +1259,22 @@ public CurveMatchResult curveMatch(List paths, long startTime, long endT
}
return new CurveMatchResult(resp.getMatchedTimestamp(), resp.getMatchedPath());
}
+
+ public void removeStorage(long id) throws SessionException, ExecutionException {
+ RemoveStorageEngineReq req = new RemoveStorageEngineReq(sessionId, id, true);
+ try {
+ Status status;
+ do {
+ lock.readLock().lock();
+ try {
+ status = client.removeStorageEngine(req);
+ } finally {
+ lock.readLock().unlock();
+ }
+ } while(checkRedirect(status));
+ RpcUtils.verifySuccess(status);
+ } catch (TException e) {
+ throw new SessionException(e);
+ }
+ }
}
diff --git a/sync/src/main/java/cn/edu/tsinghua/iginx/proposal/ProposalListener.java b/sync/src/main/java/cn/edu/tsinghua/iginx/proposal/ProposalListener.java
index dd430d70a..37999187e 100644
--- a/sync/src/main/java/cn/edu/tsinghua/iginx/proposal/ProposalListener.java
+++ b/sync/src/main/java/cn/edu/tsinghua/iginx/proposal/ProposalListener.java
@@ -17,11 +17,4 @@ public interface ProposalListener {
*/
void onUpdate(String key, SyncProposal beforeSyncProposal, SyncProposal afterSyncProposal);
- /**
- * when proposal deleted, this method will be called.
- * @param key proposal key
- * @param syncProposal proposal content
- */
- void onDelete(String key, SyncProposal syncProposal);
-
}
diff --git a/thrift/src/main/proto/rpc.thrift b/thrift/src/main/proto/rpc.thrift
index 5c535cef1..a2a812700 100644
--- a/thrift/src/main/proto/rpc.thrift
+++ b/thrift/src/main/proto/rpc.thrift
@@ -581,6 +581,12 @@ struct DebugInfoResp {
2: optional binary payload
}
+struct RemoveStorageEngineReq {
+ 1: required i64 sessionId
+ 2: required i64 storageId
+ 3: required bool sync
+}
+
service IService {
OpenSessionResp openSession(1: OpenSessionReq req);
@@ -603,6 +609,8 @@ service IService {
Status addStorageEngines(1: AddStorageEnginesReq req);
+ Status removeStorageEngine(1: RemoveStorageEngineReq req);
+
AggregateQueryResp aggregateQuery(1: AggregateQueryReq req);
LastQueryResp lastQuery(1: LastQueryReq req);