Skip to content

Commit

Permalink
Merge pull request #22819 from taosdata/fix/3_1_patch_liaohj
Browse files Browse the repository at this point in the history
other: merge fix from main.
  • Loading branch information
hjxilinx authored Sep 11, 2023
2 parents 867f85d + 44f88c1 commit 51c9656
Show file tree
Hide file tree
Showing 25 changed files with 554 additions and 300 deletions.
2 changes: 1 addition & 1 deletion cmake/cmake.define
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE ON)
set(CMAKE_VERBOSE_MAKEFILE FALSE)
set(TD_BUILD_TAOSA_INTERNAL FALSE)

#set output directory
Expand Down
20 changes: 18 additions & 2 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ typedef struct SStreamTaskId {
typedef struct SCheckpointInfo {
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t currentVer; // current offset in WAL, not serialize it
int64_t nextProcessVer; // current offset in WAL, not serialize it
} SCheckpointInfo;

typedef struct SStreamStatus {
Expand Down Expand Up @@ -312,12 +312,27 @@ typedef struct STaskSchedInfo {
void* pTimer;
} STaskSchedInfo;

typedef struct SSinkTaskRecorder {
int64_t numOfSubmit;
int64_t numOfBlocks;
int64_t numOfRows;
} SSinkTaskRecorder;

typedef struct {
int64_t created;
int64_t init;
int64_t step1Start;
int64_t step2Start;
int64_t sinkStart;
} STaskTimestamp;

typedef struct STokenBucket {
int32_t capacity; // total capacity
int64_t fillTimestamp;// fill timestamp
int32_t numOfToken; // total available tokens
int32_t rate; // number of token per second
} STokenBucket;

struct SStreamTask {
int64_t ver;
SStreamTaskId id;
Expand Down Expand Up @@ -345,6 +360,8 @@ struct SStreamTask {
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
};
SSinkTaskRecorder sinkRecorder;
STokenBucket tokenBucket;

void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle
Expand Down Expand Up @@ -683,7 +700,6 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);

// agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
Expand Down
2 changes: 1 addition & 1 deletion include/libs/wal/wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ int32_t walApplyVer(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*);

// wal reader
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
Expand Down
14 changes: 7 additions & 7 deletions source/dnode/snode/src/snode.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) {
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
} else {
if (pTask->chkInfo.currentVer == -1) {
pTask->chkInfo.currentVer = 0;
if (pTask->chkInfo.nextProcessVer == -1) {
pTask->chkInfo.nextProcessVer = 0;
}
}

qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->info.triggerParam);

Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
Expand Down
34 changes: 20 additions & 14 deletions source/dnode/vnode/src/tq/tq.c
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {

if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
}

// reset the task status from unfinished transaction
Expand All @@ -834,14 +834,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {

// checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) {
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}

tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->info.triggerParam);

Expand Down Expand Up @@ -890,9 +890,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
}

int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t vgId = pTq->pStreamMeta->vgId;

int32_t code;
SStreamTaskCheckRsp rsp;
Expand All @@ -901,7 +902,9 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
terrno = TSDB_CODE_INVALID_MSG;
tDecoderClear(&decoder);
tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
return -1;
}

Expand Down Expand Up @@ -1087,14 +1090,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}

// now we can stop the stream task execution
streamTaskHalt(pStreamTask);

int64_t latestVer = 0;
taosThreadMutexLock(&pStreamTask->lock);
streamTaskHalt(pStreamTask);
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
taosThreadMutexUnlock(&pStreamTask->lock);

// if it's an source task, extract the last version in wal.
pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);

if (done) {
Expand All @@ -1115,15 +1121,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {

int64_t dstVer = pTask->dataRange.range.minVer - 1;

pTask->chkInfo.currentVer = dstVer;
pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);

atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

// set the fill-history task to be normal
if (pTask->info.fillHistory == 1) {
if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) {
streamSetStatusNormal(pTask);
}

Expand All @@ -1148,7 +1154,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug(
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start "
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
}

code = streamTaskScanHistoryDataComplete(pTask);
Expand Down Expand Up @@ -1283,8 +1289,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// even in halt status, the data in inputQ must be processed
int8_t st = pTask->status.taskStatus;
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.currentVer);
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
streamProcessRunReq(pTask);
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
Expand Down Expand Up @@ -1423,10 +1429,10 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
}

if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory &&
Expand Down
4 changes: 2 additions & 2 deletions source/dnode/vnode/src/tq/tqMeta.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,14 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
return -1;
}
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
handle->execHandle.pTqReader = tqReaderOpen(pVnode);

buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
(SSnapContext**)(&reader.sContext));
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);

if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
Expand Down
21 changes: 11 additions & 10 deletions source/dnode/vnode/src/tq/tqRead.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,25 +187,26 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
int32_t code = -1;
int32_t vgId = TD_VID(pTq->pVnode);
int64_t id = pHandle->pWalReader->readerId;

int64_t offset = *fetchOffset;
int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);

wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64,
vgId, offset, lastVer, committedVer, appliedVer);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64", 0x%"PRIx64,
vgId, offset, lastVer, committedVer, appliedVer, id);

while (offset <= appliedVer) {
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
", no more log to return, reqId:0x%" PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
", no more log to return, reqId:0x%" PRIx64 " 0x%" PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
goto END;
}

tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId);
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64" 0x%"PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);

if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader);
Expand Down Expand Up @@ -250,7 +251,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
return NULL;
}

pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
if (pReader->pWalReader == NULL) {
taosMemoryFree(pReader);
return NULL;
Expand Down Expand Up @@ -491,11 +492,11 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {

void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
return true;
} else {
tqInfo("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
taosHashGetSize(pReader->tbIdHash), idstr);
tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
taosHashGetSize(pReader->tbIdHash), idstr);
}

pReader->nextBlk++;
Expand Down
Loading

0 comments on commit 51c9656

Please sign in to comment.