diff --git a/cmake/cmake.define b/cmake/cmake.define index edc5dd601a6a..53026df4efe6 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE ON) +set(CMAKE_VERBOSE_MAKEFILE FALSE) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 99160f151917..4c3bec670459 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -260,7 +260,7 @@ typedef struct SStreamTaskId { typedef struct SCheckpointInfo { int64_t checkpointId; int64_t checkpointVer; // latest checkpointId version - int64_t currentVer; // current offset in WAL, not serialize it + int64_t nextProcessVer; // current offset in WAL, not serialize it } SCheckpointInfo; typedef struct SStreamStatus { @@ -312,12 +312,27 @@ typedef struct STaskSchedInfo { void* pTimer; } STaskSchedInfo; +typedef struct SSinkTaskRecorder { + int64_t numOfSubmit; + int64_t numOfBlocks; + int64_t numOfRows; +} SSinkTaskRecorder; + typedef struct { + int64_t created; int64_t init; int64_t step1Start; int64_t step2Start; + int64_t sinkStart; } STaskTimestamp; +typedef struct STokenBucket { + int32_t capacity; // total capacity + int64_t fillTimestamp;// fill timestamp + int32_t numOfToken; // total available tokens + int32_t rate; // number of token per second +} STokenBucket; + struct SStreamTask { int64_t ver; SStreamTaskId id; @@ -345,6 +360,8 @@ struct SStreamTask { STaskSinkSma smaSink; STaskSinkFetch fetchSink; }; + SSinkTaskRecorder sinkRecorder; + STokenBucket tokenBucket; void* launchTaskTimer; SMsgCb* pMsgCb; // msg handle @@ -683,7 +700,6 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); // agg level -int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, 
SRpcHandleInfo* pRpcInfo); int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index cfe70a186cf1..a56a5567eb54 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -192,7 +192,7 @@ int32_t walApplyVer(SWal *, int64_t ver); // int32_t walDataCorrupted(SWal*); // wal reader -SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); +SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id); void walCloseReader(SWalReader *pRead); void walReadReset(SWalReader *pReader); int32_t walReadVer(SWalReader *pRead, int64_t ver); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index c1a59416f6f5..2b1885fb0e8f 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -86,18 +86,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { SCheckpointInfo* pChkInfo = &pTask->chkInfo; // checkpoint ver is the kept version, handled data should be the next version. 
if (pTask->chkInfo.checkpointId != 0) { - pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1; - qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, - pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer); + pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; + qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, + pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } else { - if (pTask->chkInfo.currentVer == -1) { - pTask->chkInfo.currentVer = 0; + if (pTask->chkInfo.nextProcessVer == -1) { + pTask->chkInfo.nextProcessVer = 0; } } - qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 + qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", - SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, + SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->info.fillHistory, pTask->info.triggerParam); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3355e771e2da..536273c044db 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -250,7 +250,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); -int32_t 
tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg); +int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 447a7e2d90f9..45ab424610f6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -819,7 +819,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files - pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); + pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId); } // reset the task status from unfinished transaction @@ -834,14 +834,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { // checkpoint ver is the kept version, handled data should be the next version. 
if (pTask->chkInfo.checkpointId != 0) { - pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1; + pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, - pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer); + pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", - vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, + vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->info.fillHistory, pTask->info.triggerParam); @@ -890,9 +890,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); } -int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { +int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t vgId = pTq->pStreamMeta->vgId; int32_t code; SStreamTaskCheckRsp rsp; @@ -901,7 +902,9 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { tDecoderInit(&decoder, (uint8_t*)pReq, len); code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); if (code < 0) { + terrno = TSDB_CODE_INVALID_MSG; tDecoderClear(&decoder); + tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno)); return -1; } @@ -1087,14 +1090,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // now we can stop the stream task execution - 
streamTaskHalt(pStreamTask); + int64_t latestVer = 0; + taosThreadMutexLock(&pStreamTask->lock); + streamTaskHalt(pStreamTask); tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); + latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + taosThreadMutexUnlock(&pStreamTask->lock); // if it's an source task, extract the last version in wal. pRange = &pTask->dataRange.range; - int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); if (done) { @@ -1115,7 +1121,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int64_t dstVer = pTask->dataRange.range.minVer - 1; - pTask->chkInfo.currentVer = dstVer; + pTask->chkInfo.nextProcessVer = dstVer; walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); @@ -1123,7 +1129,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); // set the fill-history task to be normal - if (pTask->info.fillHistory == 1) { + if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) { streamSetStatusNormal(pTask); } @@ -1148,7 +1154,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug( "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start " "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, - id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); + id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); } code = streamTaskScanHistoryDataComplete(pTask); @@ -1283,8 +1289,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // even in halt status, the data in inputQ must be 
processed int8_t st = pTask->status.taskStatus; if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) { - tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.currentVer); + tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.nextProcessVer); streamProcessRunReq(pTask); } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); @@ -1423,10 +1429,10 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 ", schedStatus:%d", - vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); + vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus); } else { // from the previous paused version and go on tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", - vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); + vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus); } if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 85151c6e1944..154ac1e8c188 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -312,14 +312,14 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ return -1; } } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) { - handle->pWalReader = walOpenReader(pVnode->pWal, NULL); + handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0); handle->execHandle.pTqReader = tqReaderOpen(pVnode); buildSnapContext(reader.vnode, 
reader.version, 0, handle->execHandle.subType, handle->fetchMeta, (SSnapContext**)(&reader.sContext)); handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - handle->pWalReader = walOpenReader(pVnode->pWal, NULL); + handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0); if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 0839a2bfa344..cadbc70c6f70 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -187,25 +187,26 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) { int32_t code = -1; int32_t vgId = TD_VID(pTq->pVnode); + int64_t id = pHandle->pWalReader->readerId; int64_t offset = *fetchOffset; int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal); int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal); int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal); - wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64, - vgId, offset, lastVer, committedVer, appliedVer); + wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64", 0x%"PRIx64, + vgId, offset, lastVer, committedVer, appliedVer, id); while (offset <= appliedVer) { if (walFetchHead(pHandle->pWalReader, offset) < 0) { tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 - ", no more log to return, reqId:0x%" PRIx64, - pHandle->consumerId, pHandle->epoch, vgId, offset, reqId); + ", no more log to return, reqId:0x%" PRIx64 " 0x%" 
PRIx64, + pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id); goto END; } - tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId, - pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId); + tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64" 0x%"PRIx64, vgId, + pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id); if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) { code = walFetchBody(pHandle->pWalReader); @@ -250,7 +251,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) { return NULL; } - pReader->pWalReader = walOpenReader(pVnode->pWal, NULL); + pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0); if (pReader->pWalReader == NULL) { taosMemoryFree(pReader); return NULL; @@ -491,11 +492,11 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); if (ret != NULL) { - tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr); + tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr); return true; } else { - tqInfo("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, - taosHashGetSize(pReader->tbIdHash), idstr); + tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, + taosHashGetSize(pReader->tbIdHash), idstr); } pReader->nextBlk++; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f7132ff6c4ea..88bdc8a4d9ec 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -24,10 +24,13 @@ typedef struct STableSinkInfo { tstr name; } STableSinkInfo; -static int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* 
stbFullName, int64_t suid, - SSDataBlock* pDataBlock, SStreamTask* pTask); -static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, +static int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, + SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData); +static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid); +static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); +static void fillBucket(STokenBucket* pBucket); +static bool hasAvailableToken(STokenBucket* pBucket); int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -133,6 +136,148 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { return TSDB_CODE_SUCCESS; } +static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, + int64_t suid) { + tqDebug("s-task:%s build create table msg", pTask->id.idStr); + + STSchema* pTSchema = pTask->tbSink.pTSchema; + int32_t rows = pDataBlock->info.rows; + SArray* tagArray = NULL; + int32_t code = 0; + + SVCreateTbBatchReq reqs = {0}; + + SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); + if (NULL == reqs.pArray) { + goto _end; + } + + for (int32_t rowId = 0; rowId < rows; rowId++) { + SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0}); + + // set const + pCreateTbReq->flags = 0; + pCreateTbReq->type = TSDB_CHILD_TABLE; + pCreateTbReq->ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); + + // set tag content + int32_t size = 
taosArrayGetSize(pDataBlock->pDataBlock); + if (size == 2) { + tagArray = taosArrayInit(1, sizeof(STagVal)); + if (!tagArray) { + tdDestroySVCreateTbReq(pCreateTbReq); + goto _end; + } + + STagVal tagVal = { + .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; + + taosArrayPush(tagArray, &tagVal); + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = "group_id"; + taosArrayPush(tagName, tagNameStr); + pCreateTbReq->ctb.tagName = tagName; + } else { + tagArray = taosArrayInit(size - 1, sizeof(STagVal)); + if (!tagArray) { + tdDestroySVCreateTbReq(pCreateTbReq); + goto _end; + } + + for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { + SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); + + STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type}; + void* pData = colDataGetData(pTagData, rowId); + if (colDataIsNull_s(pTagData, rowId)) { + continue; + } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) { + tagVal.nData = varDataLen(pData); + tagVal.pData = (uint8_t*)varDataVal(pData); + } else { + memcpy(&tagVal.i64, pData, pTagData->info.bytes); + } + taosArrayPush(tagArray, &tagVal); + } + } + pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + tagArray = taosArrayDestroy(tagArray); + if (pTag == NULL) { + tdDestroySVCreateTbReq(pCreateTbReq); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + + pCreateTbReq->ctb.pTag = (uint8_t*)pTag; + + // set table name + if (!pDataBlock->info.parTbName[0]) { + SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); + + void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); + pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); + } else { + pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName); + 
} + + taosArrayPush(reqs.pArray, pCreateTbReq); + tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); + } + + reqs.nReqs = taosArrayGetSize(reqs.pArray); + code = tqPutReqToQueue(pVnode, &reqs); + if (code != TSDB_CODE_SUCCESS) { + tqError("s-task:%s failed to send create table msg", pTask->id.idStr); + } + + _end: + taosArrayDestroy(tagArray); + taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); + return code; +} + +static int32_t doBuildSubmitAndSendMsg(SVnode* pVnode, SStreamTask* pTask, int32_t numOfBlocks, SSubmitReq2* pReq) { + const char* id = pTask->id.idStr; + int32_t vgId = TD_VID(pVnode); + int32_t len = 0; + void* pBuf = NULL; + + int32_t code = tqBuildSubmitReq(pReq, vgId, &pBuf, &len); + if (code != TSDB_CODE_SUCCESS) { + tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code)); + return code; + } + + SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len}; + code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); + if (code == TSDB_CODE_SUCCESS) { + tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks); + } else { + tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); + } + + pTask->sinkRecorder.numOfSubmit += 1; + + if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) { + SSinkTaskRecorder* pRec = &pTask->sinkRecorder; + double el = (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0; + tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 + " submit into dst table, duration:%.2f Sec.", + pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el); + } + + return code; // propagate the tmsgPutToQueue result instead of always reporting success +} void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { const SArray* pBlocks = (const SArray*)data; @@ -143,136 +288,147 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { int32_t vgId = 
TD_VID(pVnode); int32_t numOfBlocks = taosArrayGetSize(pBlocks); int32_t code = TSDB_CODE_SUCCESS; + const char* id = pTask->id.idStr; - tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks); - - SArray* tagArray = NULL; - SArray* pVals = NULL; - SArray* crTblArray = NULL; - - for (int32_t i = 0; i < numOfBlocks; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - int32_t rows = pDataBlock->info.rows; + if (pTask->tsInfo.sinkStart == 0) { + pTask->tsInfo.sinkStart = taosGetTimestampMs(); + } - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - code = doSinkDeleteBlock(pVnode, stbFullName, pDataBlock, pTask, suid); - } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { - tqDebug("s-task:%s build create table msg", pTask->id.idStr); + bool isMixBlocks = false; // set to true by the scan below only when a delete or create-table block is present + for(int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* p = taosArrayGet(pBlocks, i); + if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { + isMixBlocks = true; + break; + } + } - SVCreateTbBatchReq reqs = {0}; - crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); - if (NULL == reqs.pArray) { - goto _end; - } + if (isMixBlocks) { + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, + numOfBlocks); + + for(int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { + continue; + } else { + pTask->sinkRecorder.numOfBlocks += 1; - for (int32_t rowId = 0; rowId < rows; rowId++) { - SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0}); + SSubmitReq2 
submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); + return; + } - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; + code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); + taosArrayPush(submitReq.aSubmitTbData, &tbData); - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); + code = doBuildSubmitAndSendMsg(pVnode, pTask, 1, &submitReq); + } + } + } else { + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks); + SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + + SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); + taosHashCleanup(pTableIndexMap); + return; + } - // set tag content - int32_t size = taosArrayGetSize(pDataBlock->pDataBlock); - if (size == 2) { - tagArray = taosArrayInit(1, sizeof(STagVal)); + bool hasSubmit = false; - if (!tagArray) { - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; - } + for (int32_t i = 0; i < numOfBlocks; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if 
(pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { + continue; + } else { + hasSubmit = true; + pTask->sinkRecorder.numOfBlocks += 1; - STagVal tagVal = { - .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; + code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); - taosArrayPush(tagArray, &tagVal); + int32_t* index = taosHashGet(pTableIndexMap, &tbData.uid, sizeof(tbData.uid)); + if (index == NULL) { // no data yet, append it + taosArrayPush(submitReq.aSubmitTbData, &tbData); - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = "group_id"; - taosArrayPush(tagName, tagNameStr); - pCreateTbReq->ctb.tagName = tagName; + int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; + taosHashPut(pTableIndexMap, &tbData.uid, sizeof(tbData.uid), &size, sizeof(size)); } else { - tagArray = taosArrayInit(size - 1, sizeof(STagVal)); - if (!tagArray) { - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; - } - - for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { - SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); - - STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type}; - void* pData = colDataGetData(pTagData, rowId); - if (colDataIsNull_s(pTagData, rowId)) { - continue; - } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) { - tagVal.nData = varDataLen(pData); - tagVal.pData = (uint8_t*) varDataVal(pData); + SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index); + // merge the new submit table block with the existed blocks + // if ts in the new data block overlap with existed one, replace it + 
int32_t oldLen = taosArrayGetSize(pExisted->aRowP); + int32_t newLen = taosArrayGetSize(tbData.aRowP); + + int32_t j = 0, k = 0; + SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES); + while (j < newLen && k < oldLen) { + SRow* pNewRow = taosArrayGetP(tbData.aRowP, j); + SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k); + if (pNewRow->ts <= pOldRow->ts) { + taosArrayPush(pFinal, &pNewRow); + if (pNewRow->ts < pOldRow->ts) { + j += 1; + } else { + j += 1; + k += 1; + } } else { - memcpy(&tagVal.i64, pData, pTagData->info.bytes); + taosArrayPush(pFinal, &pOldRow); + k += 1; } - taosArrayPush(tagArray, &tagVal); } - } - pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1); - - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - tagArray = taosArrayDestroy(tagArray); - if (pTag == NULL) { - tdDestroySVCreateTbReq(pCreateTbReq); - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } + while (j < newLen) { + SRow* pRow = taosArrayGetP(tbData.aRowP, j++); + taosArrayPush(pFinal, &pRow); + } - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; + while (k < oldLen) { + SRow* pRow = taosArrayGetP(pExisted->aRowP, k++); + taosArrayPush(pFinal, &pRow); + } - // set table name - if (!pDataBlock->info.parTbName[0]) { - SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); - void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); - pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); - } else { - pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName); + taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pExisted->aRowP); + pExisted->aRowP = pFinal; + + tqDebug("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id, + (int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (tbData.pCreateTbReq != NULL)); } - taosArrayPush(reqs.pArray, pCreateTbReq); - tqDebug("s-task:%s build create table:%s msg complete", 
pTask->id.idStr, pCreateTbReq->name); + pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; } + } - reqs.nReqs = taosArrayGetSize(reqs.pArray); - if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) { - goto _end; - } + taosHashCleanup(pTableIndexMap); - tagArray = taosArrayDestroy(tagArray); - taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); - crTblArray = NULL; - } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { - continue; + if (hasSubmit) { + doBuildSubmitAndSendMsg(pVnode, pTask, numOfBlocks, &submitReq); } else { - code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask); + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + tqDebug("vgId:%d, s-task:%s write results completed", vgId, id); } } - tqDebug("vgId:%d, s-task:%s write results completed", vgId, pTask->id.idStr); - -_end: - taosArrayDestroy(tagArray); - taosArrayDestroy(pVals); - taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); // TODO: change } -int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, +int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid) { SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; @@ -390,7 +546,6 @@ static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSi int32_t code = tqPutTableInfo(pSinkTableMap, groupId, pTableSinkInfo); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pTableSinkInfo); - tqError("s-task:%s failed to put tableSinkInfo in to cache, code:%s", id, tstrerror(code)); } else { tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data, pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap)); @@ -399,26 +554,75 @@ static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSi return code; } -int32_t doSinkResultBlock(SVnode* 
pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, - SStreamTask* pTask) { +int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) { + int32_t code = 0; + void* pBuf = NULL; + *msgLen = 0; + + // encode + int32_t len = 0; + tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code); + + SEncoder encoder; + len += sizeof(SSubmitReq2Msg); + + pBuf = rpcMallocCont(len); + if (NULL == pBuf) { + tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE); + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId; + ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); + + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); + if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to encode submit req, code:%s, ignore and continue", terrstr()); + tEncoderClear(&encoder); + rpcFreeCont(pBuf); + tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE); + return code; + } + + tEncoderClear(&encoder); + tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE); + + *msgLen = len; + *pMsg = pBuf; + return TSDB_CODE_SUCCESS; +} + +static int32_t tsAscendingSortFn(const void* p1, const void* p2) { + SRow* pRow1 = *(SRow**) p1; + SRow* pRow2 = *(SRow**) p2; + + if (pRow1->ts == pRow2->ts) { + return 0; + } else { + return pRow1->ts > pRow2->ts? 
1:-1; + } +} + +int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, + SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData) { int32_t numOfRows = pDataBlock->info.rows; int32_t vgId = TD_VID(pVnode); uint64_t groupId = pDataBlock->info.id.groupId; STSchema* pTSchema = pTask->tbSink.pTSchema; int32_t code = TSDB_CODE_SUCCESS; - void* pBuf = NULL; SArray* pVals = NULL; const char* id = pTask->id.idStr; - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; - tqDebug("s-task:%s sink data pipeline, build submit msg from %d-th resBlock, including %d rows, dst suid:%" PRId64, + tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id, blockIndex + 1, numOfRows, suid); - tbData.aRowP = taosArrayInit(numOfRows, sizeof(SRow*)); + pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*)); pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); - if (tbData.aRowP == NULL || pVals == NULL) { - taosArrayDestroy(tbData.aRowP); + if (pTableData->aRowP == NULL || pVals == NULL) { + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); code = TSDB_CODE_OUT_OF_MEMORY; @@ -459,13 +663,21 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, } if (exist) { - tbData.uid = pTableSinkInfo->uid; + pTableData->uid = pTableSinkInfo->uid; - if (tbData.uid == 0) { + if (pTableData->uid == 0) { tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); } while (pTableSinkInfo->uid == 0) { + if (streamTaskShouldStop(&pTask->status)) { + tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); + taosArrayDestroy(pVals); + + return TSDB_CODE_SUCCESS; + } + // wait for the table to be created SMetaReader mr = {0}; metaReaderDoInit(&mr, pVnode->pMeta, 0); @@ -476,21 
+688,21 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, if (!isValid) { // not valid table, ignore it metaReaderClear(&mr); - taosArrayDestroy(tbData.aRowP); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return TSDB_CODE_SUCCESS; } else { tqDebug("s-task:%s set uid:%"PRIu64" for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data); - tbData.uid = mr.me.uid; + pTableData->uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid; metaReaderClear(&mr); } - } else { // not exist, wait and retry + } else { // not exist, wait and retry metaReaderClear(&mr); taosMsleep(100); - tqDebug("s-task:%s wait for the table:%s ready before insert data", id, dstTableName); + tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName); } } @@ -510,12 +722,12 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); - tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); - if (tbData.pCreateTbReq == NULL) { + pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); + if (pTableData->pCreateTbReq == NULL) { tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); - taosArrayDestroy(tbData.aRowP); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return terrno; @@ -527,14 +739,14 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, if (!isValid) { metaReaderClear(&mr); taosMemoryFree(pTableSinkInfo); - taosArrayDestroy(tbData.aRowP); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return TSDB_CODE_SUCCESS; } else { - tbData.uid = 
mr.me.uid; + pTableData->uid = mr.me.uid; metaReaderClear(&mr); - doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, tbData.uid, id); + doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, pTableData->uid, id); } } } @@ -565,7 +777,7 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, void* colData = colDataGetData(pColData, j); if (IS_STR_DATA_TYPE(pCol->type)) { // address copy, no value - SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; + SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*) varDataVal(colData)}; SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); taosArrayPush(pVals, &cv); } else { @@ -582,69 +794,20 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, SRow* pRow = NULL; code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); if (code != TSDB_CODE_SUCCESS) { - tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); + tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); - taosArrayDestroy(tbData.aRowP); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return code; } ASSERT(pRow); - taosArrayPush(tbData.aRowP, &pRow); + taosArrayPush(pTableData->aRowP, &pRow); } - SSubmitReq2 submitReq = {0}; - if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { - tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); - - taosArrayDestroy(tbData.aRowP); - taosArrayDestroy(pVals); - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush(submitReq.aSubmitTbData, &tbData); - - // encode - int32_t len = 0; - tEncodeSize(tEncodeSubmitReq, &submitReq, len, code); - - SEncoder encoder; - len += sizeof(SSubmitReq2Msg); - - pBuf = rpcMallocCont(len); - if (NULL == pBuf) { - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - taosArrayDestroy(tbData.aRowP); - taosArrayDestroy(pVals); - } - - ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId; - ((SSubmitReq2Msg*)pBuf)->header.contLen = 
htonl(len); - ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); - - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); - if (tEncodeSubmitReq(&encoder, &submitReq) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to encode submit req, code:%s, ignore and continue", terrstr()); - tEncoderClear(&encoder); - rpcFreeCont(pBuf); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - - return code; - } - - tEncoderClear(&encoder); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - - SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len }; - code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); - - if(code == TSDB_CODE_SUCCESS) { - tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, pTableSinkInfo->name.data, numOfRows); - } else { - tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); - } + taosArraySort(pTableData->aRowP, tsAscendingSortFn); + tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows); taosArrayDestroy(pVals); return code; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 58c7686aeb19..44474d26294a 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -112,7 +112,7 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { - tqInfo("vgId:%d no stream tasks exist", vgId); + tqDebug("vgId:%d no stream tasks existed to run", vgId); taosWUnLockLatch(&pMeta->lock); return 0; } @@ -150,7 +150,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { - tqInfo("vgId:%d no stream tasks exist", vgId); + tqDebug("vgId:%d no stream tasks existed to run", vgId); taosWUnLockLatch(&pMeta->lock); return 0; } @@ -227,36 
+227,36 @@ int32_t tqStopStreamTasks(STQ* pTq) { int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); - if (pTask->chkInfo.currentVer < firstVer) { + if (pTask->chkInfo.nextProcessVer < firstVer) { tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, - vgId, pTask->id.idStr, pTask->chkInfo.currentVer, firstVer, firstVer); + vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer); - pTask->chkInfo.currentVer = firstVer; + pTask->chkInfo.nextProcessVer = firstVer; // todo need retry if failed - int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); if (code != TSDB_CODE_SUCCESS) { return code; } // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer); } else { int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); if (currentVer == -1) { // we only seek the read for the first time - int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit return code; } // append the data for the stream tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.currentVer); + pTask->chkInfo.nextProcessVer); } } int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader); - if (skipToVer != 0 && skipToVer > pTask->chkInfo.currentVer) { + if (skipToVer != 0 && skipToVer > pTask->chkInfo.nextProcessVer) { 
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit return code; @@ -275,7 +275,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { if (!pTask->status.appendTranstateBlock) { - qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 + qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal anymore, add transfer-state block into inputQ", id, ver, maxVer); @@ -284,7 +284,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); /*int32_t code = */ streamSchedExec(pTask); } else { - qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", + qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", id, ver, maxVer); } } @@ -367,25 +367,23 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t numOfItems = streamTaskGetInputQItems(pTask); int64_t maxVer = (pTask->info.fillHistory == 1) ? 
pTask->dataRange.range.maxVer : INT64_MAX; - SStreamQueueItem* pItem = NULL; - code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr); + taosThreadMutexLock(&pTask->lock); - if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue - handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); + pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { + tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); + taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pStreamMeta, pTask); continue; } - taosThreadMutexLock(&pTask->lock); - pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + SStreamQueueItem* pItem = NULL; + code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr); - if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { - tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); - taosThreadMutexUnlock(&pTask->lock); + if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue + handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); streamMetaReleaseTask(pStreamMeta, pTask); - if (pItem != NULL) { - streamFreeQitem(pItem); - } + taosThreadMutexUnlock(&pTask->lock); continue; } @@ -394,12 +392,12 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { code = streamTaskPutDataIntoInputQ(pTask, pItem); if (code == TSDB_CODE_SUCCESS) { int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); - pTask->chkInfo.currentVer = ver; + pTask->chkInfo.nextProcessVer = ver; handleFillhistoryScanComplete(pTask, ver); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); } else { tqError("s-task:%s append input queue failed, too many in inputQ, 
ver:%" PRId64, pTask->id.idStr, - pTask->chkInfo.currentVer); + pTask->chkInfo.nextProcessVer); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 22d5d4087788..2f82d1bde352 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -756,7 +756,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) case TDMT_VND_STREAM_TASK_CHECK: return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: - return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg); + return tqProcessStreamTaskCheckRsp(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE: return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_RSP: @@ -1443,8 +1443,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0); TSKEY *aKey = (TSKEY *)(pColData->pData); + vDebug("vgId:%d submit %d rows data, uid:%"PRId64, TD_VID(pVnode), pColData->nVal, pSubmitTbData->uid); for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) { + vDebug("vgId:%d uid:%"PRId64" ts:%"PRId64, TD_VID(pVnode), pSubmitTbData->uid, aKey[iRow]); + if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) { code = TSDB_CODE_INVALID_MSG; vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver); @@ -1456,10 +1459,13 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP); SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP); + vDebug("vgId:%d submit %d rows data, uid:%"PRId64, TD_VID(pVnode), nRow, pSubmitTbData->uid); for (int32_t iRow = 0; iRow < nRow; ++iRow) { + vDebug("vgId:%d uid:%"PRId64" ts:%"PRId64, TD_VID(pVnode), pSubmitTbData->uid, aRow[iRow]->ts); + if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 
0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) { code = TSDB_CODE_INVALID_MSG; - vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver); + vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); goto _exit; } } @@ -1564,6 +1570,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in } else { // create table failed if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { code = terrno; + vError("vgId:%d failed to create table:%s, code:%s", TD_VID(pVnode), pSubmitTbData->pCreateTbReq->name, + tstrerror(terrno)); goto _exit; } terrno = 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 12ec63b4cb4c..474128007a62 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2257,7 +2257,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { int32_t current = pInfo->validBlockIndex++; SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); - qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id); + qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id); if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id); continue; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index bb81582a2dbc..bbb7595e5ada 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -77,6 +77,8 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate); + #ifdef 
__cplusplus } #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 82fa21ea40d5..8a80d74c639a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1591,7 +1591,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t return 0; } int streamStateOpenBackend(void* backend, SStreamState* pState) { - // qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendWrapper* handle = backend; SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index baf319d014bd..361602fac900 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -287,7 +287,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", - pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer, + pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, streamGetTaskStatusStr(prev)); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 39becca78123..da4c3ecbcf12 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -994,9 +994,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // so the TASK_INPUT_STATUS_BLOCKED is rsp if (pRsp->inputStatus == 
TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream - pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data", - id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); + double el = 0; + if (pTask->msgInfo.blockingTs == 0) { + pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time + } else { + el = (taosGetTimestampMs() - pTask->msgInfo.blockingTs) / 1000.0; + } + + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 + " wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d", + id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one. diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ff667fa77811..8d282696c145 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -84,7 +84,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i } if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { - qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr); + qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr); taosMsleep(1000); continue; } @@ -563,11 +563,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { SIZE_IN_MB(resSize), totalBlocks); // update the currentVer if processing the submit blocks. 
- ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver >= pTask->chkInfo.checkpointVer); + ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer); if (ver != pTask->chkInfo.checkpointVer) { - qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr, - pTask->chkInfo.checkpointVer, ver); + qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 " , currentVer:%" PRId64, + pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer); pTask->chkInfo.checkpointVer = ver; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 34b0a0063938..a667ec237139 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -29,6 +29,8 @@ typedef struct SQueueReader { int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms } SQueueReader; +static bool streamTaskHasAvailableToken(STokenBucket* pBucket); + static void streamQueueCleanup(SStreamQueue* pQueue) { void* qItem = NULL; while ((qItem = streamQueueNextItem(pQueue)) != NULL) { @@ -175,6 +177,14 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu return TSDB_CODE_SUCCESS; } + STokenBucket* pBucket = &pTask->tokenBucket; + bool has = streamTaskHasAvailableToken(pBucket); + if (!has) { // no available token in th bucket, ignore this execution +// qInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr, +// pBucket->capacity, pBucket->rate); + return TSDB_CODE_SUCCESS; + } + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); if (qItem == NULL) { qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); @@ -264,9 +274,9 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) if (type == STREAM_INPUT__DATA_SUBMIT) { 
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) { - qError( - "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); +// qError( +// "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", +// pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); taosFreeQitem(pItem); return -1; @@ -288,8 +298,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { if (streamQueueIsFull(pQueue)) { - qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); +// qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", +// pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*)pItem); return -1; } @@ -320,3 +330,45 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return 0; } + +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) { + if (cap < 100 || rate < 50 || pBucket == NULL) { + qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate); + return TSDB_CODE_INVALID_PARA; + } + + pBucket->capacity = cap; + pBucket->rate = rate; + pBucket->numOfToken = cap; + pBucket->fillTimestamp = taosGetTimestampMs(); + return TSDB_CODE_SUCCESS; +} + +static void fillBucket(STokenBucket* pBucket) { + 
int64_t now = taosGetTimestampMs(); + int64_t delta = now - pBucket->fillTimestamp; + ASSERT(pBucket->numOfToken >= 0); + + int32_t inc = (delta / 1000.0) * pBucket->rate; + if (inc > 0) { + if ((pBucket->numOfToken + inc) < pBucket->capacity) { + pBucket->numOfToken += inc; + } else { + pBucket->numOfToken = pBucket->capacity; + } + + pBucket->fillTimestamp = now; + qDebug("new token available, current:%d, inc:%d ts:%"PRId64, pBucket->numOfToken, inc, now); + } +} + +bool streamTaskHasAvailableToken(STokenBucket* pBucket) { + fillBucket(pBucket); + if (pBucket->numOfToken > 0) { +// qDebug("current token:%d", pBucket->numOfToken); + --pBucket->numOfToken; + return true; + } else { + return false; + } +} \ No newline at end of file diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 4b86b9713c12..fe836de3a22c 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -30,6 +30,13 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { + if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); + qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", + pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, + streamGetTaskStatusStr(pTask->status.taskStatus)); + } + ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; @@ -97,11 +104,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { streamSetParamForScanHistory(pTask); streamTaskEnablePause(pTask); } - - streamTaskScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { qDebug("s-task:%s sink task do 
nothing to handle scan-history", pTask->id.idStr); - streamTaskScanHistoryPrepare(pTask); } return 0; } @@ -396,15 +400,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -// agg -int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) { - pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); - qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", - pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, - streamGetTaskStatusStr(pTask->status.taskStatus)); - return 0; -} - int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) { @@ -503,7 +498,8 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { pHTask->dataRange.range.minVer = 0; - pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; + // the query version range should be limited to the already processed data + pHTask->dataRange.range.maxVer = pTask->chkInfo.nextProcessVer - 1; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 711dbf65e752..ea4c2e71bc70 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -375,16 +375,17 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i return -1; } - pTask->tsInfo.init = taosGetTimestampMs(); + pTask->tsInfo.created = taosGetTimestampMs(); pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; - pTask->chkInfo.currentVer = ver; + pTask->chkInfo.nextProcessVer = ver; pTask->dataRange.range.maxVer 
= ver; pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; + streamTaskInitTokenBucket(&pTask->tokenBucket, 150, 100); taosThreadMutexInit(&pTask->lock, NULL); streamTaskOpenAllUpstreamInput(pTask); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 929965199916..b167f2ecb69a 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -59,7 +59,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { ASSERT(pData->pWal != NULL); taosThreadMutexInit(&(pData->mutex), NULL); - pData->pWalHandle = walOpenReader(pData->pWal, NULL); + pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0); ASSERT(pData->pWalHandle != NULL); pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a53830723c3f..3117bbf00e70 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -395,7 +395,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { if (status == 0) { tTrace("success to dispatch conn to work thread"); } else { - tError("fail to dispatch conn to work thread"); + tError("fail to dispatch conn to work thread, code:%s", uv_strerror(status)); } if (!uv_is_closing((uv_handle_t*)req->data)) { uv_close((uv_handle_t*)req->data, uvFreeCb); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index d9e43e432422..2eee04a27a71 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -16,7 +16,7 @@ #include "taoserror.h" #include "walInt.h" -SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { +SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) { SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader)); if (pReader == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -24,7 +24,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { } pReader->pWal = pWal; - 
pReader->readerId = tGenIdPI64(); + pReader->readerId = (id != 0)? id:tGenIdPI64(); pReader->pIdxFile = NULL; pReader->pLogFile = NULL; pReader->curVersion = -1; @@ -75,6 +75,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } + while (fetchVer <= appliedVer) { if (walFetchHead(pReader, fetchVer) < 0) { return -1; @@ -257,9 +258,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { bool seeked = false; wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 - ", applied ver:%" PRId64, + ", applied ver:%" PRId64", 0x%"PRIx64, pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, - pRead->pWal->vers.appliedVer); + pRead->pWal->vers.appliedVer, pRead->readerId); // TODO: valid ver if (ver > pRead->pWal->vers.commitVer) { @@ -297,7 +298,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { code = walValidHeadCksum(pRead->pHead); if (code != 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); + wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%"PRIx64, pRead->pWal->cfg.vgId, ver, + pRead->readerId); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -307,9 +309,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { int32_t walSkipFetchBody(SWalReader *pRead) { wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 - ", applied ver:%" PRId64, + ", applied ver:%" PRId64", 0x%"PRIx64, pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, - pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); + pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId); int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR); if (code < 0) { 
@@ -324,11 +326,13 @@ int32_t walSkipFetchBody(SWalReader *pRead) { int32_t walFetchBody(SWalReader *pRead) { SWalCont *pReadHead = &pRead->pHead->head; int64_t ver = pReadHead->version; + int32_t vgId = pRead->pWal->cfg.vgId; + int64_t id = pRead->readerId; wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 - ", applied ver:%" PRId64, - pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, - pRead->pWal->vers.appliedVer); + ", applied ver:%" PRId64 ", 0x%" PRIx64, + vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, + pRead->pWal->vers.appliedVer, id); if (pRead->capacity < pReadHead->bodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); @@ -344,26 +348,25 @@ int32_t walFetchBody(SWalReader *pRead) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", - pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno)); + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%"PRIx64, + vgId, pReadHead->version, ver, tstrerror(terrno), id); } else { - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted", - pRead->pWal->cfg.vgId, pReadHead->version, ver); + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, 0x%"PRIx64, + vgId, pReadHead->version, ver, id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } return -1; } if (pReadHead->version != ver) { - wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, - pReadHead->version, ver); + wError("vgId:%d, wal 
fetch body error, index:%" PRId64 ", read request index:%" PRId64", 0x%"PRIx64, vgId, + pReadHead->version, ver, id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } if (walValidBodyCksum(pRead->pHead) != 0) { - wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, - ver); + wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver, id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 0784db917a47..70d8921be32a 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -326,7 +326,7 @@ TEST_F(WalCleanDeleteEnv, roll) { TEST_F(WalKeepEnv, readHandleRead) { walResetEnv(); int code; - SWalReader* pRead = walOpenReader(pWal, NULL); + SWalReader* pRead = walOpenReader(pWal, NULL, 0); ASSERT(pRead != NULL); int i; @@ -387,7 +387,7 @@ TEST_F(WalRetentionEnv, repairMeta1) { ASSERT_EQ(pWal->vers.lastVer, 99); - SWalReader* pRead = walOpenReader(pWal, NULL); + SWalReader* pRead = walOpenReader(pWal, NULL, 0); ASSERT(pRead != NULL); for (int i = 0; i < 1000; i++) { diff --git a/tests/system-test/8-stream/scalar_function.py b/tests/system-test/8-stream/scalar_function.py index 56537e2f5481..3bc44a7dc75f 100644 --- a/tests/system-test/8-stream/scalar_function.py +++ b/tests/system-test/8-stream/scalar_function.py @@ -6,7 +6,8 @@ from util.common import * class TDTestCase: - updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135, + 'asynclog': 0} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__)