diff --git a/antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4 b/antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4 index 5de41ee2f..56779f276 100644 --- a/antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4 +++ b/antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4 @@ -22,6 +22,7 @@ statement | SHOW TRANSFORM JOB STATUS jobId=INT #showJobStatusStatement | CANCEL TRANSFORM JOB jobId=INT #cancelJobStatement | SHOW jobStatus TRANSFORM JOB #showEligibleJobStatement + | REMOVE HISTORYDATARESOURCE storageEngineID (COMMA storageEngineID)* #removeHistoryDataResourceStatement ; queryClause @@ -339,6 +340,7 @@ keyWords | JOIN | ON | USING + | REMOVE ; dateFormat @@ -372,6 +374,10 @@ realLiteral | EXPONENT ; +storageEngineID + : (INT) + ; + //============================ // Start of the keywords list //============================ @@ -702,6 +708,14 @@ ON USING : U S I N G ; + +REMOVE + : R E M O V E + ; + +HISTORYDATARESOURCE + : H I S T O R Y D A T A R E S O U R C E + ; //============================ // End of the keywords list //============================ diff --git a/client/src/main/java/cn/edu/tsinghua/iginx/client/IginxClient.java b/client/src/main/java/cn/edu/tsinghua/iginx/client/IginxClient.java index 789daf97e..78dd233b9 100644 --- a/client/src/main/java/cn/edu/tsinghua/iginx/client/IginxClient.java +++ b/client/src/main/java/cn/edu/tsinghua/iginx/client/IginxClient.java @@ -402,7 +402,8 @@ private static Completer buildIginxCompleter() { Arrays.asList("clear", "data"), Arrays.asList("show", "time", "series"), Arrays.asList("show", "cluster", "info"), - Arrays.asList("show", "register", "python", "task") + Arrays.asList("show", "register", "python", "task"), + Arrays.asList("remove", "historyDataResource") ); addArgumentCompleters(iginxCompleters, withoutNullCompleters, false); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java b/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java index d867a6c43..272283df7 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java @@ -190,49 +190,52 @@ public Status removeHistoryDataSource(RemoveHistoryDataSourceReq req) { return RpcUtils.ACCESS_DENY; } Status status = RpcUtils.SUCCESS; - long dummyStorageId = req.getDummyStorageId(); - StorageEngineMeta meta = metaManager.getStorageEngine(dummyStorageId); - if (meta == null || meta.getDummyFragment() == null || meta.getDummyStorageUnit() == null) { - status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode()); - status.setMessage("storage engine is not exists."); - return status; - } - try { - // 设置对应的 dummyFragament 为 invalid 状态 - meta.getDummyFragment().setIfValid(false); - meta.getDummyStorageUnit().setIfValid(false); - - // 修改需要更新的元数据信息 extraParams中的 has_data属性需要修改 - StorageEngineMeta newMeta = new StorageEngineMeta( - meta.getId(), - meta.getIp(), - meta.getPort(), - false, - null, - null, - meta.isReadOnly(), - null, - null, - meta.getExtraParams(), - meta.getStorageEngine(), - meta.getStorageUnitList(), - meta.getCreatedBy(), - meta.isNeedReAllocate() - ); - - // 更新 zk 上元数据信息,以及 iginx 上元数据信息 - if (!metaManager.updateStorageEngine(dummyStorageId, newMeta)) { + List dummyStorageIdList = req.getDummyStorageId(); + for (long dummyStorageId : dummyStorageIdList) { + StorageEngineMeta meta = metaManager.getStorageEngine(dummyStorageId); + if (meta == null || meta.getDummyFragment() == null || meta.getDummyStorageUnit() == null) { status = RpcUtils.FAILURE; - status.setMessage("unexpected error during storage update"); + status.setMessage("dummy storage engine is not exists."); + return status; } + try { + // 设置对应的 dummyFragament 为 invalid 状态 + meta.getDummyFragment().setIfValid(false); + meta.getDummyStorageUnit().setIfValid(false); + + // 修改需要更新的元数据信息 extraParams中的 has_data属性需要修改 + StorageEngineMeta newMeta = new StorageEngineMeta( + meta.getId(), + meta.getIp(), + meta.getPort(), + false, + null, + null, + meta.isReadOnly(), + null, + null, + meta.getExtraParams(), + meta.getStorageEngine(), + meta.getStorageUnitList(), + meta.getCreatedBy(), + meta.isNeedReAllocate() + ); + + // 更新 zk 上元数据信息,以及 iginx 上元数据信息 + if (!metaManager.updateStorageEngine(dummyStorageId, newMeta)) { + status = RpcUtils.FAILURE; + status.setMessage("unexpected error during storage update"); + return status; + } - return status; - } catch (Exception e) { - logger.error("unexpected error during storage migration: ", e); - status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode()); - status.setMessage("unexpected error during removing history data source: " + e.getMessage()); - return status; + } catch (Exception e) { + logger.error("unexpected error during storage migration: ", e); + status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode()); + status.setMessage("unexpected error during removing history data source: " + e.getMessage()); + return status; + } } + return status; } @Override @@ -465,8 +468,11 @@ public GetClusterInfoResp getClusterInfo(GetClusterInfoReq req) { // 数据库信息 List storageEngineInfos = new ArrayList<>(); for (StorageEngineMeta storageEngineMeta : metaManager.getStorageEngineList()) { - storageEngineInfos.add(new StorageEngineInfo(storageEngineMeta.getId(), storageEngineMeta.getIp(), - storageEngineMeta.getPort(), storageEngineMeta.getStorageEngine())); + StorageEngineInfo info = new StorageEngineInfo(storageEngineMeta.getId(), storageEngineMeta.getIp(), + storageEngineMeta.getPort(), storageEngineMeta.getStorageEngine()); + info.setSchemaPrefix(storageEngineMeta.getSchemaPrefix() == null ? "null" : storageEngineMeta.getSchemaPrefix()); + info.setDataPrefix(storageEngineMeta.getDataPrefix() == null ? "null" : storageEngineMeta.getDataPrefix()); + storageEngineInfos.add(info); } storageEngineInfos.sort(Comparator.comparingLong(StorageEngineInfo::getId)); resp.setStorageEngineInfos(storageEngineInfos); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/StatementBuilder.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/StatementBuilder.java index 5bcd239b0..0ca563ea2 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/StatementBuilder.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/StatementBuilder.java @@ -25,6 +25,7 @@ public class StatementBuilder { typeMap.put(StatementType.SELECT, SqlType.Query); typeMap.put(StatementType.INSERT_FROM_SELECT, SqlType.Insert); typeMap.put(StatementType.ADD_STORAGE_ENGINE, SqlType.AddStorageEngines); + typeMap.put(StatementType.REMOVE_HISTORY_DATA_RESOURCE, SqlType.RemoveHistoryDataResource); typeMap.put(StatementType.SHOW_REPLICATION, SqlType.GetReplicaNum); typeMap.put(StatementType.COUNT_POINTS, SqlType.CountPoints); typeMap.put(StatementType.CLEAR_DATA, SqlType.ClearData); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java index 6368c6221..7e7289168 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/metadata/entity/StorageEngineMeta.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Map; -public final class StorageEngineMeta implements Cloneable { +public final class StorageEngineMeta { /** * 数据库的 id @@ -213,11 +213,6 @@ public void setNeedReAllocate(boolean needReAllocate) { this.needReAllocate = needReAllocate; } - @Override - public StorageEngineMeta clone() throws CloneNotSupportedException { - return (StorageEngineMeta)super.clone(); - } - @Override public String toString() { return "StorageEngineMeta {" + diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/sql/IginXSqlVisitor.java b/core/src/main/java/cn/edu/tsinghua/iginx/sql/IginXSqlVisitor.java index 790f434e4..612b11d70 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/sql/IginXSqlVisitor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/sql/IginXSqlVisitor.java @@ -229,6 +229,13 @@ public Statement visitShowClusterInfoStatement(ShowClusterInfoStatementContext c return new ShowClusterInfoStatement(); } + @Override + public Statement visitRemoveHistoryDataResourceStatement(RemoveHistoryDataResourceStatementContext ctx) { + RemoveHsitoryDataSourceStatement statement = new RemoveHsitoryDataSourceStatement(); + ctx.storageEngineID().forEach(id -> statement.addStorageID(Long.parseLong(id.getText()))); + return statement; + } + private void parseFromPaths(FromClauseContext ctx, SelectStatement selectStatement) { if (ctx.queryClause() != null) { // parse sub query diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/RemoveHsitoryDataSourceStatement.java b/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/RemoveHsitoryDataSourceStatement.java new file mode 100644 index 000000000..6ba392891 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/RemoveHsitoryDataSourceStatement.java @@ -0,0 +1,40 @@ +package cn.edu.tsinghua.iginx.sql.statement; + +import cn.edu.tsinghua.iginx.IginxWorker; +import cn.edu.tsinghua.iginx.engine.shared.RequestContext; +import cn.edu.tsinghua.iginx.engine.shared.Result; +import cn.edu.tsinghua.iginx.exceptions.ExecutionException; +import cn.edu.tsinghua.iginx.thrift.AddStorageEnginesReq; +import cn.edu.tsinghua.iginx.thrift.RemoveHistoryDataSourceReq; + +import java.util.ArrayList; +import java.util.List; + +public class RemoveHsitoryDataSourceStatement extends SystemStatement { + + private List storageIDList; + + public List getStorageID() { + return storageIDList; + } + + public void setStorageID(List storageID) { + this.storageIDList = storageID; + } + + public void addStorageID(Long storageID) { + this.storageIDList.add(storageID); + } + + public RemoveHsitoryDataSourceStatement() { + storageIDList = new ArrayList<>(); + this.statementType = StatementType.REMOVE_HISTORY_DATA_RESOURCE; + } + + @Override + public void execute(RequestContext ctx) throws ExecutionException { + IginxWorker worker = IginxWorker.getInstance(); + RemoveHistoryDataSourceReq req = new RemoveHistoryDataSourceReq(ctx.getSessionId(), storageIDList); + ctx.setResult(new Result(worker.removeHistoryDataSource(req))); + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/StatementType.java b/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/StatementType.java index a95639c2d..3d01faed1 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/StatementType.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/StatementType.java @@ -19,5 +19,6 @@ public enum StatementType { COMMIT_TRANSFORM_JOB, SHOW_JOB_STATUS, CANCEL_JOB, - SHOW_ELIGIBLE_JOB + SHOW_ELIGIBLE_JOB, + REMOVE_HISTORY_DATA_RESOURCE } diff --git a/session/src/main/java/cn/edu/tsinghua/iginx/pool/SessionPool.java b/session/src/main/java/cn/edu/tsinghua/iginx/pool/SessionPool.java index 61b098fc5..22f2b0dee 100644 --- a/session/src/main/java/cn/edu/tsinghua/iginx/pool/SessionPool.java +++ b/session/src/main/java/cn/edu/tsinghua/iginx/pool/SessionPool.java @@ -319,6 +319,24 @@ public void addStorageEngines(List storageEngines) throws Session } } + public void removeHistoryDataSource(List idList) throws SessionException, ExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.removeHistoryDataSource(idList); + putBack(session); + return; + } catch (SessionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("remove history data source failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (ExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + private void cleanSessionAndMayThrowConnectionException( Session session, int times, SessionException e) throws SessionException { closeSession(session); diff --git a/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java b/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java index 7c1c44f73..54d288847 100644 --- a/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java +++ b/session/src/main/java/cn/edu/tsinghua/iginx/session/Session.java @@ -1260,8 +1260,8 @@ public CurveMatchResult curveMatch(List paths, long startTime, long endT return new CurveMatchResult(resp.getMatchedTimestamp(), resp.getMatchedPath()); } - public void removeHistoryDataSource(long id) throws SessionException, ExecutionException { - RemoveHistoryDataSourceReq req = new RemoveHistoryDataSourceReq(sessionId, id); + public void removeHistoryDataSource(List idList) throws SessionException, ExecutionException { + RemoveHistoryDataSourceReq req = new RemoveHistoryDataSourceReq(sessionId, idList); try { Status status; do { diff --git a/session/src/main/java/cn/edu/tsinghua/iginx/session/SessionExecuteSqlResult.java b/session/src/main/java/cn/edu/tsinghua/iginx/session/SessionExecuteSqlResult.java index 7f432bb17..00d70ae8c 100644 --- a/session/src/main/java/cn/edu/tsinghua/iginx/session/SessionExecuteSqlResult.java +++ b/session/src/main/java/cn/edu/tsinghua/iginx/session/SessionExecuteSqlResult.java @@ -257,13 +257,15 @@ private String buildShowClusterInfoResult() { if (storageEngineInfos != null && !storageEngineInfos.isEmpty()) { builder.append("Storage engine infos:").append("\n"); List> cache = new ArrayList<>(); - cache.add(new ArrayList<>(Arrays.asList("ID", "IP", "PORT", "TYPE"))); + cache.add(new ArrayList<>(Arrays.asList("ID", "IP", "PORT", "TYPE", "SCHEMA_PREFIX", "DATAPREFIX"))); for (StorageEngineInfo info : storageEngineInfos) { cache.add(new ArrayList<>(Arrays.asList( String.valueOf(info.getId()), info.getIp(), String.valueOf(info.getPort()), - info.getType() + info.getType(), + info.getSchemaPrefix(), + info.getDataPrefix() ))); } builder.append(FormatUtils.formatResult(cache)); diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/iotdb/IoTDBHistoryDataCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/iotdb/IoTDBHistoryDataCapacityExpansionIT.java index d379eefa9..f4beb1120 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/iotdb/IoTDBHistoryDataCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/iotdb/IoTDBHistoryDataCapacityExpansionIT.java @@ -4,6 +4,7 @@ import cn.edu.tsinghua.iginx.integration.SQLSessionIT; import cn.edu.tsinghua.iginx.integration.expansion.BaseCapacityExpansionIT; import cn.edu.tsinghua.iginx.integration.expansion.unit.SQLTestTools; +import cn.edu.tsinghua.iginx.pool.SessionPool; import cn.edu.tsinghua.iginx.session.Session; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -11,12 +12,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + public class IoTDBHistoryDataCapacityExpansionIT implements BaseCapacityExpansionIT { private static final Logger logger = LoggerFactory.getLogger(SQLSessionIT.class); private static Session session; + private static SessionPool sessionPool; + private String ENGINE_TYPE; public IoTDBHistoryDataCapacityExpansionIT(String engineType) { @@ -26,6 +32,14 @@ public IoTDBHistoryDataCapacityExpansionIT(String engineType) { @BeforeClass public static void setUp() { session = new Session("127.0.0.1", 6888, "root", "root"); + sessionPool = + new SessionPool.Builder() + .host("127.0.0.1") + .port(6888) + .user("root") + .password("root") + .maxSize(3) + .build(); try { session.openSession(); } catch (SessionException e) { @@ -37,6 +51,7 @@ public static void setUp() { public static void tearDown() { try { session.closeSession(); + sessionPool.close(); } catch (SessionException e) { logger.error(e.getMessage()); } @@ -540,7 +555,9 @@ public void testAddSameDataPrefixWithDiffSchemaPrefix_AND_testRemoveHistoryDataS "+---+---------------------+--------------------------+\n" + "Total line number = 2\n"; SQLTestTools.executeAndCompare(session, statement, expect); - session.removeHistoryDataSource(3); + List idList = new ArrayList<>(); + idList.add(3L); + session.removeHistoryDataSource(idList); statement = "select * from test"; expect = "ResultSets:\n" + "+---+\n" + @@ -549,5 +566,26 @@ public void testAddSameDataPrefixWithDiffSchemaPrefix_AND_testRemoveHistoryDataS "+---+\n" + "Empty set.\n"; SQLTestTools.executeAndCompare(session, statement, expect); + + idList.set(0, 2L); + sessionPool.removeHistoryDataSource(idList); + statement = "select * from p2.test"; + expect = "ResultSets:\n" + + "+---+\n" + + "|key|\n" + + "+---+\n" + + "+---+\n" + + "Empty set.\n"; + SQLTestTools.executeAndCompare(session, statement, expect); + + session.executeSql("remove historydataresource 1"); + statement = "select * from p1.test"; + expect = "ResultSets:\n" + + "+---+\n" + + "|key|\n" + + "+---+\n" + + "+---+\n" + + "Empty set.\n"; + SQLTestTools.executeAndCompare(session, statement, expect); } } diff --git a/thrift/src/main/proto/rpc.thrift b/thrift/src/main/proto/rpc.thrift index 5bafd8f44..595352152 100644 --- a/thrift/src/main/proto/rpc.thrift +++ b/thrift/src/main/proto/rpc.thrift @@ -58,7 +58,8 @@ enum SqlType { CommitTransformJob, ShowJobStatus, CancelJob, - ShowEligibleJob + ShowEligibleJob, + RemoveHistoryDataResource } enum AuthType { @@ -386,6 +387,8 @@ struct StorageEngineInfo { 2: required string ip 3: required i32 port 4: required string type + 5: optional string schemaPrefix + 6: optional string dataPrefix } struct MetaStorageInfo { @@ -583,7 +586,7 @@ struct DebugInfoResp { struct RemoveHistoryDataSourceReq { 1: required i64 sessionId - 2: required i64 dummyStorageId + 2: required list dummyStorageId } service IService {