Skip to content

Commit

Permalink
Fix memory leak while inserting using sql
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 authored Jan 21, 2025
1 parent 41a49e7 commit 9f8bf85
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public boolean fetchResults() throws StatementExecutionException, IoTDBConnectio
throw new IoTDBConnectionException("This DataSet is already closed");
}
TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
req.setStatementId(statementId);
req.setTimeout(timeout);
try {
TSFetchResultsResp resp = client.fetchResultsV2(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,23 @@ public void addQueryId(Long statementId, long queryId) {

@Override
public void removeQueryId(Long statementId, Long queryId) {
Set<Long> queryIds = statementIdToQueryId.get(statementId);
if (queryIds != null) {
queryIds.remove(queryId);
removeQueryId(statementIdToQueryId, statementId, queryId);
}

public static void removeQueryId(
Map<Long, Set<Long>> statementIdToQueryId, Long statementId, Long queryId) {
if (statementId == null) {
statementIdToQueryId.forEach(
(k, v) -> {
if (v != null) {
v.remove(queryId);
}
});
} else {
Set<Long> queryIds = statementIdToQueryId.get(statementId);
if (queryIds != null) {
queryIds.remove(queryId);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public TSConnectionInfo convertToTSConnectionInfo() {

public abstract void addQueryId(Long statementId, long queryId);

// statementId could be null
public abstract void removeQueryId(Long statementId, Long queryId);

public SqlDialect getSqlDialect() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ public void addQueryId(Long statementId, long queryId) {

@Override
public void removeQueryId(Long statementId, Long queryId) {
Set<Long> queryIds = statementIdToQueryId.get(statementId);
if (queryIds != null) {
queryIds.remove(queryId);
}
ClientSession.removeQueryId(statementIdToQueryId, statementId, queryId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ public ClientRPCServiceImpl() {
private TSExecuteStatementResp executeStatementInternal(
TSExecuteStatementReq req, SelectResult setResult) {
boolean finished = false;
Long statementId = req.getStatementId();
long queryId = Long.MIN_VALUE;
String statement = req.getStatement();
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
Expand Down Expand Up @@ -362,10 +363,6 @@ private TSExecuteStatementResp executeStatementInternal(
TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
}

// TODO: permission check

// TODO audit log, quota, StatementType

queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);

result =
Expand Down Expand Up @@ -439,7 +436,7 @@ private TSExecuteStatementResp executeStatementInternal(
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
CommonUtils.addQueryLatency(
statementType, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(queryId, req, t);
clearUp(clientSession, statementId, queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
Expand All @@ -448,9 +445,21 @@ private TSExecuteStatementResp executeStatementInternal(
}
}

private void clearUp(
IClientSession clientSession,
Long statementId,
Long queryId,
org.apache.thrift.TBase<?, ?> req,
Throwable t) {
COORDINATOR.cleanupQueryExecution(queryId, req, t);
// clear up queryId Map in clientSession
clientSession.removeQueryId(statementId, queryId);
}

private TSExecuteStatementResp executeRawDataQueryInternal(
TSRawDataQueryReq req, SelectResult setResult) {
boolean finished = false;
Long statementId = req.getStatementId();
long queryId = Long.MIN_VALUE;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
OperationQuota quota = null;
Expand Down Expand Up @@ -530,7 +539,7 @@ private TSExecuteStatementResp executeRawDataQueryInternal(
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(queryId, req, t);
clearUp(clientSession, statementId, queryId, req, t);
}

SESSION_MANAGER.updateIdleTime();
Expand All @@ -543,6 +552,7 @@ private TSExecuteStatementResp executeRawDataQueryInternal(
private TSExecuteStatementResp executeLastDataQueryInternal(
TSLastDataQueryReq req, SelectResult setResult) {
boolean finished = false;
Long statementId = req.getStatementId();
long queryId = Long.MIN_VALUE;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
OperationQuota quota = null;
Expand Down Expand Up @@ -623,7 +633,7 @@ private TSExecuteStatementResp executeLastDataQueryInternal(
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(queryId, req, t);
clearUp(clientSession, statementId, queryId, req, t);
}

SESSION_MANAGER.updateIdleTime();
Expand All @@ -636,6 +646,7 @@ private TSExecuteStatementResp executeLastDataQueryInternal(
private TSExecuteStatementResp executeAggregationQueryInternal(
TSAggregationQueryReq req, SelectResult setResult) {
boolean finished = false;
Long statementId = req.getStatementId();
long queryId = Long.MIN_VALUE;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
OperationQuota quota = null;
Expand Down Expand Up @@ -713,7 +724,7 @@ private TSExecuteStatementResp executeAggregationQueryInternal(
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(queryId, req, t);
clearUp(clientSession, statementId, queryId, req, t);
}

SESSION_MANAGER.updateIdleTime();
Expand Down Expand Up @@ -870,6 +881,7 @@ public TSExecuteStatementResp executeLastDataQueryV2(TSLastDataQueryReq req) {
public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
TSFastLastDataQueryForOneDeviceReq req) {
boolean finished = false;
Long statementId = req.getStatementId();
long queryId = Long.MIN_VALUE;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
OperationQuota quota = null;
Expand Down Expand Up @@ -1061,7 +1073,7 @@ public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(queryId, req, t);
clearUp(clientSession, statementId, queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
Expand Down Expand Up @@ -1178,8 +1190,9 @@ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
String statementType = null;
Throwable t = null;
IQueryExecution queryExecution = null;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
Long statementId = req.isSetStatementId() ? req.getStatementId() : null;
try {
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
if (!SESSION_MANAGER.checkLogin(clientSession)) {
finished = true;
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
Expand Down Expand Up @@ -1230,7 +1243,7 @@ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
clearUp(clientSession, statementId, req.queryId, req, t);
}

SESSION_MANAGER.updateIdleTime();
Expand Down Expand Up @@ -1786,8 +1799,9 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
String statementType = null;
Throwable t = null;
IQueryExecution queryExecution = null;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
Long statementId = req.isSetStatementId() ? req.getStatementId() : null;
try {
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
if (!SESSION_MANAGER.checkLogin(clientSession)) {
finished = true;
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
Expand Down Expand Up @@ -1838,7 +1852,7 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
clearUp(clientSession, statementId, req.queryId, req, t);
}

SESSION_MANAGER.updateIdleTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,6 @@ public TSStatus executeCQ(TExecuteCQ req) {
return result.status;
}
} catch (Exception e) {
// TODO call the coordinator to release query resource
return onQueryException(e, "\"" + executedSQL + "\". " + OperationType.EXECUTE_STATEMENT);
} finally {
SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ public int getQueryExecutionMapSize() {
return queryExecutionMap.size();
}

// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ExecutorService getQueryExecutor() {
int coordinatorReadExecutorSize = CONFIG.getCoordinatorReadExecutorSize();
return IoTDBThreadPoolFactory.newFixedThreadPool(
Expand All @@ -472,7 +471,6 @@ private ExecutorService getWriteExecutor() {
coordinatorWriteExecutorSize, ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName());
}

// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ScheduledExecutorService getScheduledExecutor() {
return IoTDBThreadPoolFactory.newScheduledThreadPool(
COORDINATOR_SCHEDULED_EXECUTOR_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ public QueryExecution(IPlanner planner, MPPQueryContext context, ExecutorService
if (!state.isDone()) {
return;
}
// TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be
// invoked
if (state == QueryState.FAILED
|| state == QueryState.ABORTED
|| state == QueryState.CANCELED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,16 @@ public void start() {
// QueryState to Running
stateMachine.transitionToRunning();

// TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
this.stateTracker.start();
logger.debug("state tracker starts");
}

@Override
public void stop(Throwable t) {
// TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best
// practice ?
dispatcher.abort();
if (stateTracker != null) {
stateTracker.abort();
}
// TODO: (xingtanzjr) handle the exception when the termination cannot succeed
if (queryTerminator != null) {
queryTerminator.terminate(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ struct TSFetchResultsReq{
4: required i64 queryId
5: required bool isAlign
6: optional i64 timeout
7: optional i64 statementId
}

struct TSFetchResultsResp{
Expand Down

0 comments on commit 9f8bf85

Please sign in to comment.