Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove history data resource sql #501

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ statement
| SHOW TRANSFORM JOB STATUS jobId=INT #showJobStatusStatement
| CANCEL TRANSFORM JOB jobId=INT #cancelJobStatement
| SHOW jobStatus TRANSFORM JOB #showEligibleJobStatement
| REMOVE HISTORYDATARESOURCE storageEngineID (COMMA storageEngineID)* #removeHistoryDataResourceStatement
;

queryClause
Expand Down Expand Up @@ -339,6 +340,7 @@ keyWords
| JOIN
| ON
| USING
| REMOVE
;

dateFormat
Expand Down Expand Up @@ -372,6 +374,10 @@ realLiteral
| EXPONENT
;

storageEngineID
: (INT)
;

//============================
// Start of the keywords list
//============================
Expand Down Expand Up @@ -702,6 +708,14 @@ ON
USING
: U S I N G
;

REMOVE
: R E M O V E
;

HISTORYDATARESOURCE
: H I S T O R Y D A T A R E S O U R C E
;
//============================
// End of the keywords list
//============================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ private static Completer buildIginxCompleter() {
Arrays.asList("clear", "data"),
Arrays.asList("show", "time", "series"),
Arrays.asList("show", "cluster", "info"),
Arrays.asList("show", "register", "python", "task")
Arrays.asList("show", "register", "python", "task"),
Arrays.asList("remove", "historyDataResource")
);
addArgumentCompleters(iginxCompleters, withoutNullCompleters, false);

Expand Down
88 changes: 47 additions & 41 deletions core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,49 +190,52 @@ public Status removeHistoryDataSource(RemoveHistoryDataSourceReq req) {
return RpcUtils.ACCESS_DENY;
}
Status status = RpcUtils.SUCCESS;
long dummyStorageId = req.getDummyStorageId();
StorageEngineMeta meta = metaManager.getStorageEngine(dummyStorageId);
if (meta == null || meta.getDummyFragment() == null || meta.getDummyStorageUnit() == null) {
status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
status.setMessage("storage engine is not exists.");
return status;
}
try {
// 设置对应的 dummyFragament 为 invalid 状态
meta.getDummyFragment().setIfValid(false);
meta.getDummyStorageUnit().setIfValid(false);

// 修改需要更新的元数据信息 extraParams中的 has_data属性需要修改
StorageEngineMeta newMeta = new StorageEngineMeta(
meta.getId(),
meta.getIp(),
meta.getPort(),
false,
null,
null,
meta.isReadOnly(),
null,
null,
meta.getExtraParams(),
meta.getStorageEngine(),
meta.getStorageUnitList(),
meta.getCreatedBy(),
meta.isNeedReAllocate()
);

// 更新 zk 上元数据信息,以及 iginx 上元数据信息
if (!metaManager.updateStorageEngine(dummyStorageId, newMeta)) {
List<Long> dummyStorageIdList = req.getDummyStorageId();
for (long dummyStorageId : dummyStorageIdList) {
StorageEngineMeta meta = metaManager.getStorageEngine(dummyStorageId);
if (meta == null || meta.getDummyFragment() == null || meta.getDummyStorageUnit() == null) {
status = RpcUtils.FAILURE;
status.setMessage("unexpected error during storage update");
status.setMessage("dummy storage engine is not exists.");
return status;
}
try {
// 设置对应的 dummyFragament 为 invalid 状态
meta.getDummyFragment().setIfValid(false);
meta.getDummyStorageUnit().setIfValid(false);

// 修改需要更新的元数据信息 extraParams中的 has_data属性需要修改
StorageEngineMeta newMeta = new StorageEngineMeta(
meta.getId(),
meta.getIp(),
meta.getPort(),
false,
null,
null,
meta.isReadOnly(),
null,
null,
meta.getExtraParams(),
meta.getStorageEngine(),
meta.getStorageUnitList(),
meta.getCreatedBy(),
meta.isNeedReAllocate()
);

// 更新 zk 上元数据信息,以及 iginx 上元数据信息
if (!metaManager.updateStorageEngine(dummyStorageId, newMeta)) {
status = RpcUtils.FAILURE;
status.setMessage("unexpected error during storage update");
return status;
}

return status;
} catch (Exception e) {
logger.error("unexpected error during storage migration: ", e);
status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
status.setMessage("unexpected error during removing history data source: " + e.getMessage());
return status;
} catch (Exception e) {
logger.error("unexpected error during storage migration: ", e);
status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
status.setMessage("unexpected error during removing history data source: " + e.getMessage());
return status;
}
}
return status;
}

@Override
Expand Down Expand Up @@ -465,8 +468,11 @@ public GetClusterInfoResp getClusterInfo(GetClusterInfoReq req) {
// 数据库信息
List<StorageEngineInfo> storageEngineInfos = new ArrayList<>();
for (StorageEngineMeta storageEngineMeta : metaManager.getStorageEngineList()) {
storageEngineInfos.add(new StorageEngineInfo(storageEngineMeta.getId(), storageEngineMeta.getIp(),
storageEngineMeta.getPort(), storageEngineMeta.getStorageEngine()));
StorageEngineInfo info = new StorageEngineInfo(storageEngineMeta.getId(), storageEngineMeta.getIp(),
storageEngineMeta.getPort(), storageEngineMeta.getStorageEngine());
info.setSchemaPrefix(storageEngineMeta.getSchemaPrefix() == null ? "null" : storageEngineMeta.getSchemaPrefix());
info.setDataPrefix(storageEngineMeta.getDataPrefix() == null ? "null" : storageEngineMeta.getDataPrefix());
storageEngineInfos.add(info);
}
storageEngineInfos.sort(Comparator.comparingLong(StorageEngineInfo::getId));
resp.setStorageEngineInfos(storageEngineInfos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class StatementBuilder {
typeMap.put(StatementType.SELECT, SqlType.Query);
typeMap.put(StatementType.INSERT_FROM_SELECT, SqlType.Insert);
typeMap.put(StatementType.ADD_STORAGE_ENGINE, SqlType.AddStorageEngines);
typeMap.put(StatementType.REMOVE_HISTORY_DATA_RESOURCE, SqlType.RemoveHistoryDataResource);
typeMap.put(StatementType.SHOW_REPLICATION, SqlType.GetReplicaNum);
typeMap.put(StatementType.COUNT_POINTS, SqlType.CountPoints);
typeMap.put(StatementType.CLEAR_DATA, SqlType.ClearData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.List;
import java.util.Map;

public final class StorageEngineMeta implements Cloneable {
public final class StorageEngineMeta {

/**
* 数据库的 id
Expand Down Expand Up @@ -213,11 +213,6 @@ public void setNeedReAllocate(boolean needReAllocate) {
this.needReAllocate = needReAllocate;
}

@Override
public StorageEngineMeta clone() throws CloneNotSupportedException {
return (StorageEngineMeta)super.clone();
}

@Override
public String toString() {
return "StorageEngineMeta {" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ public Statement visitShowClusterInfoStatement(ShowClusterInfoStatementContext c
return new ShowClusterInfoStatement();
}

@Override
public Statement visitRemoveHistoryDataResourceStatement(RemoveHistoryDataResourceStatementContext ctx) {
RemoveHsitoryDataSourceStatement statement = new RemoveHsitoryDataSourceStatement();
ctx.storageEngineID().forEach(id -> statement.addStorageID(Long.parseLong(id.getText())));
return statement;
}

private void parseFromPaths(FromClauseContext ctx, SelectStatement selectStatement) {
if (ctx.queryClause() != null) {
// parse sub query
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cn.edu.tsinghua.iginx.sql.statement;

import cn.edu.tsinghua.iginx.IginxWorker;
import cn.edu.tsinghua.iginx.engine.shared.RequestContext;
import cn.edu.tsinghua.iginx.engine.shared.Result;
import cn.edu.tsinghua.iginx.exceptions.ExecutionException;
import cn.edu.tsinghua.iginx.thrift.AddStorageEnginesReq;
import cn.edu.tsinghua.iginx.thrift.RemoveHistoryDataSourceReq;

import java.util.ArrayList;
import java.util.List;

public class RemoveHsitoryDataSourceStatement extends SystemStatement {

private List<Long> storageIDList;

public List<Long> getStorageID() {
return storageIDList;
}

public void setStorageID(List<Long> storageID) {
this.storageIDList = storageID;
}

public void addStorageID(Long storageID) {
this.storageIDList.add(storageID);
}

public RemoveHsitoryDataSourceStatement() {
storageIDList = new ArrayList<>();
this.statementType = StatementType.REMOVE_HISTORY_DATA_RESOURCE;
}

@Override
public void execute(RequestContext ctx) throws ExecutionException {
IginxWorker worker = IginxWorker.getInstance();
RemoveHistoryDataSourceReq req = new RemoveHistoryDataSourceReq(ctx.getSessionId(), storageIDList);
ctx.setResult(new Result(worker.removeHistoryDataSource(req)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ public enum StatementType {
COMMIT_TRANSFORM_JOB,
SHOW_JOB_STATUS,
CANCEL_JOB,
SHOW_ELIGIBLE_JOB
SHOW_ELIGIBLE_JOB,
REMOVE_HISTORY_DATA_RESOURCE
}
18 changes: 18 additions & 0 deletions session/src/main/java/cn/edu/tsinghua/iginx/pool/SessionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,24 @@ public void addStorageEngines(List<StorageEngine> storageEngines) throws Session
}
}

public void removeHistoryDataSource(List<Long> idList) throws SessionException, ExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
session.removeHistoryDataSource(idList);
putBack(session);
return;
} catch (SessionException e) {
// TException means the connection is broken, remove it and get a new one.
logger.warn("remove history data source failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (ExecutionException | RuntimeException e) {
putBack(session);
throw e;
}
}
}

private void cleanSessionAndMayThrowConnectionException(
Session session, int times, SessionException e) throws SessionException {
closeSession(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1260,8 +1260,8 @@ public CurveMatchResult curveMatch(List<String> paths, long startTime, long endT
return new CurveMatchResult(resp.getMatchedTimestamp(), resp.getMatchedPath());
}

public void removeHistoryDataSource(long id) throws SessionException, ExecutionException {
RemoveHistoryDataSourceReq req = new RemoveHistoryDataSourceReq(sessionId, id);
public void removeHistoryDataSource(List<Long> idList) throws SessionException, ExecutionException {
RemoveHistoryDataSourceReq req = new RemoveHistoryDataSourceReq(sessionId, idList);
try {
Status status;
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,15 @@ private String buildShowClusterInfoResult() {
if (storageEngineInfos != null && !storageEngineInfos.isEmpty()) {
builder.append("Storage engine infos:").append("\n");
List<List<String>> cache = new ArrayList<>();
cache.add(new ArrayList<>(Arrays.asList("ID", "IP", "PORT", "TYPE")));
cache.add(new ArrayList<>(Arrays.asList("ID", "IP", "PORT", "TYPE", "SCHEMA_PREFIX", "DATAPREFIX")));
for (StorageEngineInfo info : storageEngineInfos) {
cache.add(new ArrayList<>(Arrays.asList(
String.valueOf(info.getId()),
info.getIp(),
String.valueOf(info.getPort()),
info.getType()
info.getType(),
info.getSchemaPrefix(),
info.getDataPrefix()
)));
}
builder.append(FormatUtils.formatResult(cache));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@
import cn.edu.tsinghua.iginx.integration.SQLSessionIT;
import cn.edu.tsinghua.iginx.integration.expansion.BaseCapacityExpansionIT;
import cn.edu.tsinghua.iginx.integration.expansion.unit.SQLTestTools;
import cn.edu.tsinghua.iginx.pool.SessionPool;
import cn.edu.tsinghua.iginx.session.Session;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class IoTDBHistoryDataCapacityExpansionIT implements BaseCapacityExpansionIT {

private static final Logger logger = LoggerFactory.getLogger(SQLSessionIT.class);

private static Session session;

private static SessionPool sessionPool;

private String ENGINE_TYPE;

public IoTDBHistoryDataCapacityExpansionIT(String engineType) {
Expand All @@ -26,6 +32,14 @@ public IoTDBHistoryDataCapacityExpansionIT(String engineType) {
@BeforeClass
public static void setUp() {
session = new Session("127.0.0.1", 6888, "root", "root");
sessionPool =
new SessionPool.Builder()
.host("127.0.0.1")
.port(6888)
.user("root")
.password("root")
.maxSize(3)
.build();
try {
session.openSession();
} catch (SessionException e) {
Expand All @@ -37,6 +51,7 @@ public static void setUp() {
public static void tearDown() {
try {
session.closeSession();
sessionPool.close();
} catch (SessionException e) {
logger.error(e.getMessage());
}
Expand Down Expand Up @@ -540,7 +555,9 @@ public void testAddSameDataPrefixWithDiffSchemaPrefix_AND_testRemoveHistoryDataS
"+---+---------------------+--------------------------+\n" +
"Total line number = 2\n";
SQLTestTools.executeAndCompare(session, statement, expect);
session.removeHistoryDataSource(3);
List<Long> idList = new ArrayList<>();
idList.add(3L);
session.removeHistoryDataSource(idList);
statement = "select * from test";
expect = "ResultSets:\n" +
"+---+\n" +
Expand All @@ -549,5 +566,26 @@ public void testAddSameDataPrefixWithDiffSchemaPrefix_AND_testRemoveHistoryDataS
"+---+\n" +
"Empty set.\n";
SQLTestTools.executeAndCompare(session, statement, expect);

idList.set(0, 2L);
sessionPool.removeHistoryDataSource(idList);
statement = "select * from p2.test";
expect = "ResultSets:\n" +
"+---+\n" +
"|key|\n" +
"+---+\n" +
"+---+\n" +
"Empty set.\n";
SQLTestTools.executeAndCompare(session, statement, expect);

session.executeSql("remove historydataresource 1");
statement = "select * from p1.test";
expect = "ResultSets:\n" +
"+---+\n" +
"|key|\n" +
"+---+\n" +
"+---+\n" +
"Empty set.\n";
SQLTestTools.executeAndCompare(session, statement, expect);
}
}
Loading