Skip to content

Commit

Permalink
Merge pull request #29219 from taosdata/fix/job.remain2
Browse files Browse the repository at this point in the history
fix: job remained issue cause of task dropped during pre-process phase
  • Loading branch information
guanshengliang authored Dec 20, 2024
2 parents ab58fdd + 956f9a5 commit 1ce6f4a
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 7 deletions.
4 changes: 2 additions & 2 deletions source/libs/catalog/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ target_link_libraries(
PRIVATE os util transport qcom nodes
)

# if(${BUILD_TEST})
#if(${BUILD_TEST})
# ADD_SUBDIRECTORY(test)
# endif(${BUILD_TEST})
#endif(${BUILD_TEST})
2 changes: 1 addition & 1 deletion source/libs/catalog/test/catalogTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void ctgTestInitLogFile() {
(void)ctgdEnableDebug("cache", true);
(void)ctgdEnableDebug("lock", true);

if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
if (taosInitLog(defaultLogFileNamePrefix, 1, false) < 0) {
(void)printf("failed to open log file in directory:%s\n", tsLogDir);
ASSERT(0);
}
Expand Down
10 changes: 6 additions & 4 deletions source/libs/qworker/src/qwUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
}
}

atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1);
(void)atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1);

if (acquire && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
Expand All @@ -283,7 +283,7 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) {
qDestroyTask(otaskHandle);
taosDisableMemPoolUsage();

atomic_add_fetch_64(&gQueryMgmt.stat.taskExecDestroyNum, 1);
(void)atomic_add_fetch_64(&gQueryMgmt.stat.taskExecDestroyNum, 1);

qDebug("task handle destroyed");
}
Expand All @@ -297,7 +297,7 @@ void qwFreeSinkHandle(SQWTaskCtx *ctx) {
dsDestroyDataSinker(osinkHandle);
QW_SINK_DISABLE_MEMPOOL();

atomic_add_fetch_64(&gQueryMgmt.stat.taskSinkDestroyNum, 1);
(void)atomic_add_fetch_64(&gQueryMgmt.stat.taskSinkDestroyNum, 1);

qDebug("sink handle destroyed");
}
Expand Down Expand Up @@ -409,6 +409,8 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {

if (ctx->pJobInfo && TSDB_CODE_SUCCESS != ctx->pJobInfo->errCode) {
QW_UPDATE_RSP_CODE(ctx, ctx->pJobInfo->errCode);
} else {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED);
}

atomic_store_ptr(&ctx->taskHandle, NULL);
Expand All @@ -428,7 +430,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {

QW_TASK_DLOG_E("task ctx dropped");

atomic_add_fetch_64(&gQueryMgmt.stat.taskDestroyNum, 1);
(void)atomic_add_fetch_64(&gQueryMgmt.stat.taskDestroyNum, 1);

return code;
}
Expand Down
5 changes: 5 additions & 0 deletions source/libs/qworker/src/qworker.c
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,11 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {

QW_LOCK(QW_WRITE, &ctx->lock);

if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task dropping or already dropped, drop event:%d", QW_GET_EVENT(ctx, QW_EVENT_DROP));
QW_ERR_JRET(ctx->rspCode);
}

ctx->ctrlConnInfo = qwMsg->connInfo;
ctx->sId = sId;
ctx->phase = -1;
Expand Down
18 changes: 18 additions & 0 deletions tests/script/api/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

make clean

make

pgrep taosd || taosd >> /dev/null 2>&1 &

sleep 10

./dbTableRoute localhost
./batchprepare localhost
./stmt-crash localhost
./insertSameTs localhost
./passwdTest localhost
./whiteListTest localhost
./tmqViewTest

0 comments on commit 1ce6f4a

Please sign in to comment.