diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8992845c6404..50a6ae10f25a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -274,6 +274,7 @@ typedef enum ENodeType { QUERY_NODE_TSMA_OPTIONS, QUERY_NODE_ANOMALY_WINDOW, QUERY_NODE_RANGE_AROUND, + QUERY_NODE_VIRTUAL_TABLE, // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR = 100, @@ -439,6 +440,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN_GROUP_CACHE, QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL, QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC, + QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN, // physical plan node QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100, @@ -494,6 +496,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY, QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC, + QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, } ENodeType; typedef struct { @@ -598,8 +601,8 @@ int32_t tPrintFixedSchemaSubmitReq(SSubmitReq* pReq, STSchema* pSchema); typedef struct { bool hasRef; col_id_t id; - char *refTableName; - char *refColName; + char refTableName[TSDB_TABLE_NAME_LEN]; + char refColName[TSDB_COL_NAME_LEN]; } SColRef; typedef struct { diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3e95f1e28667..c6647ec0e27a 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -162,6 +162,18 @@ typedef struct SJoinLogicNode { SNode* pRightOnCond; // table onCond filter } SJoinLogicNode; +typedef struct SVirtualScanLogicNode { + SLogicNode node; + SNodeList* pScanCols; + SNodeList* pScanPseudoCols; + int8_t tableType; + uint64_t tableId; + uint64_t stableId; + SVgroupsInfo* pVgroupList; + EScanType scanType; + SName tableName; +} SVirtualScanLogicNode; + typedef struct SAggLogicNode { SLogicNode node; SNodeList* pGroupKeys; @@ -455,6 +467,11 @@ typedef struct STagScanPhysiNode { typedef SScanPhysiNode SBlockDistScanPhysiNode; +typedef struct SVirtualScanPhysiNode { + SScanPhysiNode scan; + SNodeList* pTargets; +}SVirtualScanPhysiNode; + typedef struct SLastRowScanPhysiNode { SScanPhysiNode scan; SNodeList* pGroupTags; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index c10feddd591a..15eab2fc2882 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -93,6 +93,9 @@ typedef struct SColumnNode { bool isPk; int32_t projRefIdx; int32_t resIdx; + bool hasRef; + char refColName[TSDB_COL_NAME_LEN]; + char refTableName[TSDB_TABLE_NAME_LEN]; } SColumnNode; typedef struct SColumnRefNode { @@ -235,6 +238,14 @@ typedef struct STempTableNode { SNode* pSubquery; } STempTableNode; +typedef struct SVirtualTableNode { + STableNode table; // QUERY_NODE_VIRTUAL_TABLE + struct STableMeta* pMeta; + SVgroupsInfo* pVgroupList; + char qualDbName[TSDB_DB_NAME_LEN]; // SHOW qualDbName.TABLES + SNodeList* refTables; +} SVirtualTableNode; + typedef struct SViewNode { STableNode table; // QUERY_NODE_REAL_TABLE struct STableMeta* pMeta; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index f6fdef0ffc1a..d755dc57e1e9 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -909,7 +909,6 @@ int32_t taosGetErrSize(); #define TSDB_CODE_PAR_INVALID_VGID_LIST TAOS_DEF_ERROR_CODE(0, 0x2686) #define TSDB_CODE_PAR_INVALID_REF_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2687) #define TSDB_CODE_PAR_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x2688) -#define TSDB_CODE_PAR_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x2688) #define TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE TAOS_DEF_ERROR_CODE(0, 0x2689) #define TSDB_CODE_PAR_COLUMN_HAS_REF TAOS_DEF_ERROR_CODE(0, 0x268A) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) @@ -1043,6 +1042,10 @@ int32_t taosGetErrSize(); #define TSDB_CODE_AUDIT_FAIL_SEND_AUDIT_RECORD TAOS_DEF_ERROR_CODE(0, 0x6101) #define TSDB_CODE_AUDIT_FAIL_GENERATE_JSON TAOS_DEF_ERROR_CODE(0, 0x6102) +// VTABLE +#define TSDB_CODE_VTABLE_SCAN_UNMATCHED_COLUMN TAOS_DEF_ERROR_CODE(0, 0x6200) +#define TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM TAOS_DEF_ERROR_CODE(0, 0x6201) +#define TSDB_CODE_VTABLE_PRIMTS_HAS_REF TAOS_DEF_ERROR_CODE(0, 0x6201) #ifdef __cplusplus } #endif diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index ef8ad1058579..71b6c27ba600 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -6040,6 +6040,13 @@ static int32_t tEncodeSTableMetaRsp(SEncoder *pEncoder, STableMetaRsp *pRsp) { } } + if (hasRefCol(pRsp->tableType)) { + for (int32_t i = 0; i < pRsp->numOfColumns; ++i) { + SColRef *pColRef = &pRsp->pColRefs[i]; + TAOS_CHECK_RETURN(tEncodeSColRef(pEncoder, pColRef)); + } + } + return 0; } @@ -6088,7 +6095,21 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) { pRsp->pSchemaExt = NULL; } } - pRsp->pColRefs = NULL; + if (!tDecodeIsEnd(pDecoder)) { + if (hasRefCol(pRsp->tableType) && pRsp->numOfColumns > 0) { + pRsp->pColRefs = taosMemoryMalloc(sizeof(SColRef) * pRsp->numOfColumns); + if (pRsp->pColRefs == NULL) { + TAOS_CHECK_RETURN(terrno); + } + + for (int32_t i = 0; i < pRsp->numOfColumns; ++i) { + SColRef *pColRef = &pRsp->pColRefs[i]; + TAOS_CHECK_RETURN(tDecodeSColRef(pDecoder, pColRef)); + } + } else { + pRsp->pColRefs = NULL; + } + } return 0; } @@ -10265,8 +10286,8 @@ int32_t tDecodeSColRefWrapperEx(SDecoder *pDecoder, SColRefWrapper *pWrapper) { TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&p->hasRef)); if (p->hasRef) { TAOS_CHECK_EXIT(tDecodeI16v(pDecoder, &p->id)); - TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &p->refColName)); - TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &p->refTableName)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->refColName)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->refTableName)); } } diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index 162e7355a136..9351d9ab8413 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -52,8 +52,8 @@ int meteDecodeColRefEntry(SDecoder *pDecoder, SMetaEntry *pME) { TAOS_CHECK_RETURN(tDecodeI8(pDecoder, (int8_t *)&p->hasRef)); if (p->hasRef) { TAOS_CHECK_RETURN(tDecodeI16v(pDecoder, &p->id)); - TAOS_CHECK_RETURN(tDecodeCStr(pDecoder, &p->refTableName)); - TAOS_CHECK_RETURN(tDecodeCStr(pDecoder, &p->refColName)); + TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, p->refTableName)); + TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, p->refColName)); } } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index c0f335ee49d3..7ade5cc39e3f 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -49,6 +49,27 @@ int32_t fillTableColCmpr(SMetaReader *reader, SSchemaExt *pExt, int32_t numOfCol return 0; } +int32_t fillTableColRef(SMetaReader *reader, SColRef *pRef, int32_t numOfCol) { + int8_t tblType = reader->me.type; + if (hasRefCol(tblType)) { + SColRefWrapper *p = &(reader->me.colRef); + if (numOfCol != p->nCols) { + vError("fillTableColRef table type:%d, col num:%d, col cmpr num:%d mismatch", tblType, numOfCol, p->nCols); + return TSDB_CODE_APP_ERROR; + } + for (int i = 0; i < p->nCols; i++) { + SColRef *pColRef = &p->pColRef[i]; + pRef[i].hasRef = pColRef->hasRef; + if(pRef[i].hasRef) { + pRef[i].id = pColRef->id; + tstrncpy(pRef[i].refTableName, pColRef->refTableName, TSDB_TABLE_NAME_LEN); + tstrncpy(pRef[i].refColName, pColRef->refColName, TSDB_COL_NAME_LEN); + } + } + } + return 0; +} + int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { STableInfoReq infoReq = {0}; STableMetaRsp metaRsp = {0}; @@ -104,24 +125,34 @@ int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { metaRsp.vgId = TD_VID(pVnode); metaRsp.tuid = mer1.me.uid; - if (mer1.me.type == TSDB_SUPER_TABLE) { - tstrncpy(metaRsp.stbName, mer1.me.name, TSDB_TABLE_NAME_LEN); - schema = mer1.me.stbEntry.schemaRow; - schemaTag = mer1.me.stbEntry.schemaTag; - metaRsp.suid = mer1.me.uid; - } else if (mer1.me.type == TSDB_CHILD_TABLE) { - metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK); - if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2; + switch (mer1.me.type) { + case TSDB_SUPER_TABLE: { + (void)strcpy(metaRsp.stbName, mer1.me.name); + schema = mer1.me.stbEntry.schemaRow; + schemaTag = mer1.me.stbEntry.schemaTag; + metaRsp.suid = mer1.me.uid; + break; + } + case TSDB_CHILD_TABLE: + case TSDB_VIRTUAL_CHILD_TABLE:{ + metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK); + if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2; - tstrncpy(metaRsp.stbName, mer2.me.name, TSDB_TABLE_NAME_LEN); - metaRsp.suid = mer2.me.uid; - schema = mer2.me.stbEntry.schemaRow; - schemaTag = mer2.me.stbEntry.schemaTag; - } else if (mer1.me.type == TSDB_NORMAL_TABLE) { - schema = mer1.me.ntbEntry.schemaRow; - } else { - vError("vnodeGetTableMeta get invalid table type:%d", mer1.me.type); - goto _exit3; + (void)strcpy(metaRsp.stbName, mer2.me.name); + metaRsp.suid = mer2.me.uid; + schema = mer2.me.stbEntry.schemaRow; + schemaTag = mer2.me.stbEntry.schemaTag; + break; + } + case TSDB_NORMAL_TABLE: + case TSDB_VIRTUAL_TABLE: { + schema = mer1.me.ntbEntry.schemaRow; + break; + } + default: { + vError("vnodeGetTableMeta get invalid table type:%d", mer1.me.type); + goto _exit3; + } } metaRsp.numOfTags = schemaTag.nCols; @@ -149,6 +180,13 @@ int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } + metaRsp.pColRefs = (SColRef*)taosMemoryMalloc(sizeof(SColRef) * metaRsp.numOfColumns); + if (metaRsp.pColRefs) { + code = fillTableColRef(&mer1, metaRsp.pColRefs, metaRsp.numOfColumns); + if (code < 0) { + goto _exit; + } + } // encode and send response rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index ef5da66bfe5d..22cfe4210a67 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -538,31 +538,41 @@ int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCt ctx->tbInfo.suid = tbMeta->suid; ctx->tbInfo.tbType = tbMeta->tableType; - if (tbMeta->tableType != TSDB_CHILD_TABLE) { + if (tbMeta->tableType != TSDB_CHILD_TABLE && tbMeta->tableType != TSDB_VIRTUAL_CHILD_TABLE) { int32_t schemaExtSize = 0; + int32_t colRefSize = 0; int32_t metaSize = CTG_META_SIZE(tbMeta); - if (tbMeta->schemaExt != NULL) { + if (useCompress(tbMeta->tableType) && tbMeta->schemaExt != NULL) { schemaExtSize = tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt); } - *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize); + if (hasRefCol(tbMeta->tableType) && tbMeta->schemaExt != NULL) { + colRefSize += tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt); + } + *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + colRefSize); if (NULL == *pTableMeta) { CTG_ERR_RET(terrno); } TAOS_MEMCPY(*pTableMeta, tbMeta, metaSize); - if (tbMeta->schemaExt != NULL) { + if (useCompress(tbMeta->tableType) && tbMeta->schemaExt != NULL) { (*pTableMeta)->schemaExt = (SSchemaExt *)((char *)*pTableMeta + metaSize); TAOS_MEMCPY((*pTableMeta)->schemaExt, tbMeta->schemaExt, schemaExtSize); } else { (*pTableMeta)->schemaExt = NULL; } + if (hasRefCol(tbMeta->tableType) && tbMeta->schemaExt != NULL) { + (*pTableMeta)->colRef = (SColRef *)((char *)*pTableMeta + metaSize + schemaExtSize); + TAOS_MEMCPY((*pTableMeta)->colRef, tbMeta->colRef, colRefSize); + } else { + (*pTableMeta)->colRef = NULL; + } ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName); return TSDB_CODE_SUCCESS; } // PROCESS FOR CHILD TABLE - + // TODO(smj): add virtual child table process int32_t metaSize = sizeof(SCTableMeta); *pTableMeta = taosMemoryCalloc(1, metaSize); if (NULL == *pTableMeta) { @@ -3493,26 +3503,36 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe SMetaRes res = {0}; STableMeta *pTableMeta = NULL; - if (tbMeta->tableType != TSDB_CHILD_TABLE) { + if (tbMeta->tableType != TSDB_CHILD_TABLE && tbMeta->tableType != TSDB_VIRTUAL_CHILD_TABLE) { int32_t schemaExtSize = 0; + int32_t colRefSize = 0; int32_t metaSize = CTG_META_SIZE(tbMeta); - if (tbMeta->schemaExt != NULL) { + if (useCompress(tbMeta->tableType) && tbMeta->schemaExt != NULL) { schemaExtSize = tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt); } + if (hasRefCol(tbMeta->tableType) && tbMeta->colRef) { + colRefSize = tbMeta->tableInfo.numOfColumns * sizeof(SColRef); + } - pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize); + pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + colRefSize); if (NULL == pTableMeta) { ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); CTG_ERR_RET(terrno); } TAOS_MEMCPY(pTableMeta, tbMeta, metaSize); - if (tbMeta->schemaExt != NULL) { + if (useCompress(tbMeta->tableType) && tbMeta->schemaExt != NULL) { pTableMeta->schemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize); TAOS_MEMCPY(pTableMeta->schemaExt, tbMeta->schemaExt, schemaExtSize); } else { pTableMeta->schemaExt = NULL; } + if (hasRefCol(tbMeta->tableType) && tbMeta->colRef) { + pTableMeta->colRef = (SColRef *)((char *)pTableMeta + metaSize + schemaExtSize); + TAOS_MEMCPY(pTableMeta->colRef, tbMeta->colRef, colRefSize); + } else { + pTableMeta->colRef = NULL; + } CTG_UNLOCK(CTG_READ, &pCache->metaLock); taosHashRelease(dbCache->tbCache, pCache); @@ -3529,6 +3549,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe // PROCESS FOR CHILD TABLE + // TODO(smj): add virtual child table process if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) { code = cloneTableMeta(lastTableMeta, &pTableMeta); if (code) { @@ -3646,7 +3667,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe } TAOS_MEMCPY(&pTableMeta->sversion, &stbMeta->sversion, metaSize + schemaExtSize - sizeof(SCTableMeta)); - if (stbMeta->schemaExt != NULL) { + if (useCompress(stbMeta->tableType) && stbMeta->schemaExt != NULL) { pTableMeta->schemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize); } else { pTableMeta->schemaExt = NULL; diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 5ab19c4419d3..a037ee3cd06b 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -1650,11 +1650,15 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput) if (output->tbMeta) { int32_t metaSize = CTG_META_SIZE(output->tbMeta); int32_t schemaExtSize = 0; + int32_t colRefSize = 0; if (useCompress(output->tbMeta->tableType) && (*pOutput)->tbMeta->schemaExt) { schemaExtSize = output->tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt); } + if (hasRefCol(output->tbMeta->tableType) && (*pOutput)->tbMeta->colRef) { + colRefSize = output->tbMeta->tableInfo.numOfColumns * sizeof(SColRef); + } - (*pOutput)->tbMeta = taosMemoryMalloc(metaSize + schemaExtSize); + (*pOutput)->tbMeta = taosMemoryMalloc(metaSize + schemaExtSize + colRefSize); qDebug("tbMeta cloned, size:%d, p:%p", metaSize, (*pOutput)->tbMeta); if (NULL == (*pOutput)->tbMeta) { qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); @@ -1669,6 +1673,12 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput) } else { (*pOutput)->tbMeta->schemaExt = NULL; } + if (hasRefCol(output->tbMeta->tableType) && (*pOutput)->tbMeta->colRef) { + (*pOutput)->tbMeta->colRef = (SColRef*)((char*)(*pOutput)->tbMeta + metaSize + schemaExtSize); + TAOS_MEMCPY((*pOutput)->tbMeta->colRef, output->tbMeta->colRef, colRefSize); + } else { + (*pOutput)->tbMeta->colRef = NULL; + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 4fee909d9404..c1b7785774cc 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -219,6 +219,9 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, COL_DATA_SET_VAL_AND_CHECK(pCol5, pBlock->info.rows, buf, false); } } + if (hasRefCol(pMeta->tableType) && pMeta->colRef) { + //TODO(smj) : add ref column info + } fillTagCol = 0; @@ -265,6 +268,9 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** } else { code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS, pRsp); } + if (pDesc->pMeta && hasRefCol(pDesc->pMeta->tableType) && pDesc->pMeta->colRef) { + //TODO(smj) : add ref column info + } } (void)blockDataDestroy(pBlock); return code; diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 05eb3df0c4fa..4ae591413eb2 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -170,6 +170,7 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** ppOptInfo); +int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SVirtualScanPhysiNode * pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo); // clang-format on SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 6f6384af5b3d..1a254daca2e2 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -166,6 +166,14 @@ void tsortGetValue(STupleHandle* pVHandle, int32_t colId, void** pVal); */ uint64_t tsortGetGroupId(STupleHandle* pVHandle); void tsortGetBlockInfo(STupleHandle* pVHandle, SDataBlockInfo* pInfo); + +/** + * return the number of columns in the tuple + * @param pVHandle + * @return + */ +size_t tsortGetColNum(STupleHandle* pVHandle); + /** * * @param pSortHandle diff --git a/source/libs/executor/inc/virtualtablescan.h b/source/libs/executor/inc/virtualtablescan.h new file mode 100644 index 000000000000..c92a9fdd18c5 --- /dev/null +++ b/source/libs/executor/inc/virtualtablescan.h @@ -0,0 +1,47 @@ +/* +* Copyright (c) 2019 TAOS Data, Inc. +* +* This program is free software: you can use, redistribute, and/or modify +* it under the terms of the GNU Affero General Public License, version 3 +* or later ("AGPL"), as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +*/ + +#ifndef TDENGINE_VIRTUALTABLESCAN_H +#define TDENGINE_VIRTUALTABLESCAN_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "executorInt.h" +#include "operator.h" + +#define VTS_ERR_RET(c) \ + do { \ + int32_t _code = (c); \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + return _code; \ + } \ + } while (0) + +#define VTS_ERR_JRET(c) \ + do { \ + code = (c); \ + if (code != TSDB_CODE_SUCCESS) { \ + terrno = code; \ + goto _return; \ + } \ + } while (0) +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_VIRTUALTABLESCAN_H diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 5d25a81f6fcb..286661808179 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -633,6 +633,8 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand code = createStreamTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY == type) { code = createAnomalywindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr); + } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type) { + code = createVirtualTableMergeOperatorInfo(ops, size, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOptr); } else { code = TSDB_CODE_INVALID_PARA; pTaskInfo->code = code; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 17c390e239c3..d482f7d16992 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -2929,6 +2929,7 @@ void tsortGetValue(STupleHandle* pVHandle, int32_t colIndex, void** pVal) { } } +size_t tsortGetColNum(STupleHandle* pVHandle) { return blockDataGetNumOfCols(pVHandle->pBlock); } uint64_t tsortGetGroupId(STupleHandle* pVHandle) { return pVHandle->pBlock->info.id.groupId; } void tsortGetBlockInfo(STupleHandle* pVHandle, SDataBlockInfo* pBlockInfo) { *pBlockInfo = pVHandle->pBlock->info; } diff --git a/source/libs/executor/src/virtualtablescanoperator.c b/source/libs/executor/src/virtualtablescanoperator.c new file mode 100644 index 000000000000..839d9502dd88 --- /dev/null +++ b/source/libs/executor/src/virtualtablescanoperator.c @@ -0,0 +1,341 @@ +/* +* Copyright (c) 2019 TAOS Data, Inc. +* +* This program is free software: you can use, redistribute, and/or modify +* it under the terms of the GNU Affero General Public License, version 3 +* or later ("AGPL"), as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +*/ + +#include "executorInt.h" +#include "filter.h" +#include "operator.h" +#include "querytask.h" +#include "tdatablock.h" +#include "virtualtablescan.h" +#include "tsort.h" + +typedef struct SVirtualTableScanInfo { + SArray* pSortInfo; + SSortHandle* pSortHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SSDataBlock* pIntermediateBlock; // to hold the intermediate result + SSDataBlock* pInputBlock; + SHashObj* dataSlotMap; + int32_t tsSlotId; +} SVirtualTableScanInfo; + +typedef struct SVirtualScanMergeOperatorInfo { + SOptrBasicInfo binfo; + EMergeType type; + SVirtualTableScanInfo virtualScanInfo; + SLimitInfo limitInfo; + bool ignoreGroupId; + uint64_t groupId; +} SVirtualScanMergeOperatorInfo; + +int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) { + SOperatorInfo* pOperator = (SOperatorInfo*)param; + int32_t code = TSDB_CODE_SUCCESS; + + VTS_ERR_JRET(pOperator->fpSet.getNextFn(pOperator, ppBlock)); + VTS_ERR_JRET(blockDataCheck(*ppBlock)); + + return code; +_return: + qError("failed to check data block got from upstream, %s code:%s", __func__, tstrerror(code)); + return code; +} + +int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) { + SVirtualScanMergeOperatorInfo * pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SVirtualTableScanInfo* pSortMergeInfo = &pInfo->virtualScanInfo; + + int32_t numOfBufPage = pSortMergeInfo->sortBufSize / pSortMergeInfo->bufPageSize; + + pSortMergeInfo->pSortHandle = NULL; + VTS_ERR_RET(tsortCreateSortHandle(pSortMergeInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pSortMergeInfo->bufPageSize, + numOfBufPage, pSortMergeInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pSortMergeInfo->pSortHandle)); + + tsortSetForceUsePQSort(pSortMergeInfo->pSortHandle); + tsortSetFetchRawDataFp(pSortMergeInfo->pSortHandle, sortMergeloadNextDataBlock, NULL, NULL); + + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + SOperatorInfo* pDownstream = pOperator->pDownstream[i]; + if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { + VTS_ERR_RET(pDownstream->fpSet._openFn(pDownstream)); + } else { + VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM); + } + + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + if (ps == NULL) { + return terrno; + } + + ps->param = pDownstream; + ps->onlyRef = true; + + VTS_ERR_RET(tsortAddSource(pSortMergeInfo->pSortHandle, ps)); + } + + return tsortOpen(pSortMergeInfo->pSortHandle); +} + +int32_t openVirtualTableScanOperator(SOperatorInfo* pOperator) { + int32_t code = 0; + + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; + } + + int64_t startTs = taosGetTimestampUs(); + + code = openVirtualTableScanOperatorImpl(pOperator); + + pOperator->cost.openCost = (taosGetTimestampUs() - startTs) / 1000.0; + pOperator->status = OP_RES_TO_RETURN; + + VTS_ERR_RET(code); + + OPTR_SET_OPENED(pOperator); + return code; +} + +static int32_t doGetVtableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, + SSDataBlock* p) { + int32_t code = 0; + int64_t lastTs = 0; + int64_t rowNums = -1; + while (1) { + STupleHandle* pTupleHandle = NULL; + code = tsortNextTuple(pHandle, &pTupleHandle); + + SSDataBlock *tmpblock = NULL; + tsortGetSortedDataBlock(pHandle, &tmpblock); + if (pTupleHandle == NULL || (code != 0)) { + break; + } + SDataBlockInfo info = {0}; + tsortGetBlockInfo(pTupleHandle, &info); + int32_t blockId = (int32_t)info.id.blockId; + + for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) { + bool isNull = tsortIsNullVal(pTupleHandle, i); + if (isNull) { + colDataSetNULL(taosArrayGet(p->pDataBlock, i), rowNums); + } else { + char* pData = NULL; + tsortGetValue(pTupleHandle, i, (void**)&pData); + + if (pData != NULL) { + if (i == 0) { + if (lastTs != *(int64_t*)pData) { + rowNums++; + for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) { + colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums); + } + if (pInfo->virtualScanInfo.tsSlotId != -1) { + VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false)); + } + lastTs = *(int64_t*)pData; + } + continue; + } + int32_t slotKey = blockId << 16 | i; + void *slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey)); + if (slotId == NULL) { + VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_UNMATCHED_COLUMN); + } + VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums, pData, false)); + } + + if (rowNums >= capacity) { + break; + } + } + } + } + p->info.rows = rowNums + 1; + p->info.dataLoad = 1; + p->info.scanFlag = MAIN_SCAN; + return code; +} + +int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { + int32_t code = 0; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SVirtualScanMergeOperatorInfo* pInfo = pOperator->info; + SVirtualTableScanInfo* pSortMergeInfo = &pInfo->virtualScanInfo; + SSortHandle* pHandle = pSortMergeInfo->pSortHandle; + SSDataBlock* pDataBlock = pInfo->binfo.pRes; + int32_t capacity = pOperator->resultInfo.capacity; + + qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo)); + blockDataCleanup(pDataBlock); + + if (pSortMergeInfo->pIntermediateBlock == NULL) { + pSortMergeInfo->pIntermediateBlock = NULL; + VTS_ERR_RET(tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock)); + if (pSortMergeInfo->pIntermediateBlock == NULL) { + return TSDB_CODE_SUCCESS; + } + + VTS_ERR_RET(blockDataEnsureCapacity(pSortMergeInfo->pIntermediateBlock, capacity)); + } else { + blockDataCleanup(pSortMergeInfo->pIntermediateBlock); + } + + SSDataBlock* p = pSortMergeInfo->pIntermediateBlock; + VTS_ERR_RET(doGetVtableMergedBlockData(pInfo, pHandle, capacity, p)); + + VTS_ERR_RET(copyDataBlock(pDataBlock, p)); + qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId, + pDataBlock->info.rows); + + *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL; + return code; +} + +int32_t virtualTableGetNext(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { + VTS_ERR_RET(openVirtualTableScanOperator(pOperator)); + VTS_ERR_RET(doVirtualTableMerge(pOperator, pResBlock)); + return TSDB_CODE_SUCCESS; +} + +void destroyVirtualTableScanOperatorInfo(void* param) { + SVirtualScanMergeOperatorInfo* pInfo = (SVirtualScanMergeOperatorInfo*)param; + blockDataDestroy(pInfo->binfo.pRes); + pInfo->binfo.pRes = NULL; + + taosMemoryFreeClear(param); +} + +int32_t getVirtualTableScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + int32_t code = 0; + return code; +} + +int32_t makeTSMergeKey(SNodeList** pMergeKeys) { + int32_t code = TSDB_CODE_SUCCESS; + SNodeList *pNodeList = NULL; + SColumnNode *pColumnNode = NULL; + SOrderByExprNode *pOrderByExprNode = NULL; + + VTS_ERR_JRET(nodesMakeList(&pNodeList)); + + VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pColumnNode)); + pColumnNode->slotId = 0; + + VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderByExprNode)); + pOrderByExprNode->pExpr = (SNode*)pColumnNode; + pOrderByExprNode->order = ORDER_ASC; + pOrderByExprNode->nullOrder = NULL_ORDER_FIRST; + + VTS_ERR_JRET(nodesListAppend(pNodeList, (SNode*)pOrderByExprNode)); + + *pMergeKeys = pNodeList; + return code; +_return: + nodesDestroyNode((SNode*)pColumnNode); + nodesDestroyNode((SNode*)pOrderByExprNode); + nodesDestroyList(pNodeList); + return code; +} + +int32_t extractColMap(SNodeList* pNodeList, SHashObj** pSlotMap, int32_t *tsSlotId) { + size_t numOfCols = LIST_LENGTH(pNodeList); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + *tsSlotId = -1; + + *pSlotMap = taosHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + TSDB_CHECK_NULL(*pSlotMap, code, lino, _return, terrno); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i); + TSDB_CHECK_NULL(pColNode, code, lino, _return, terrno); + + if (pColNode->isPrimTs) { + *tsSlotId = i; + } else if (pColNode->hasRef) { + int32_t slotKey = pColNode->dataBlockId << 16 | pColNode->slotId; + VTS_ERR_JRET(taosHashPut(*pSlotMap, &slotKey, sizeof(slotKey), &i, sizeof(i))); + } + } + + return code; +_return: + taosHashCleanup(*pSlotMap); + *pSlotMap = NULL; + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SVirtualScanPhysiNode* pVirtualScanPhyNode, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { + SPhysiNode* pPhyNode = (SPhysiNode*)pVirtualScanPhyNode; + int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; + if (pInfo == NULL || pOperator == NULL) { + VTS_ERR_JRET(terrno); + } + + pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder; + pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder; + + SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo; + pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno); + + SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno); + pVirtualScanInfo->pInputBlock = pInputBlock; + + initResultSizeInfo(&pOperator->resultInfo, 1024); + TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return); + + size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock); + int32_t rowSize = pInfo->binfo.pRes->info.rowSize; + SNodeList *pMergeKeys = NULL; + + TSDB_CHECK_CODE(makeTSMergeKey(&pMergeKeys), lino, _return); + pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys); + pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols); + pVirtualScanInfo->sortBufSize = + pVirtualScanInfo->bufPageSize * (numOfDownstream + 1); // one additional is reserved for merged result. + VTS_ERR_JRET(extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId)); + + setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false, + OP_NOT_OPENED, pInfo, pTaskInfo); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo, + optrDefaultBufFn, getVirtualTableScanExplainExecInfo, optrDefaultGetNextExtFn, NULL); + + VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream)); + + *pOptrInfo = pOperator; + return TSDB_CODE_SUCCESS; + +_return: + if (pInfo != NULL) { + destroyVirtualTableScanOperatorInfo(pInfo); + } + pTaskInfo->code = code; + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); + return code; +} diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 22f6dc741845..0d91b25dcf98 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -126,6 +126,9 @@ static int32_t columnNodeCopy(const SColumnNode* pSrc, SColumnNode* pDst) { COPY_SCALAR_FIELD(numOfPKs); COPY_SCALAR_FIELD(projRefIdx); COPY_SCALAR_FIELD(resIdx); + COPY_SCALAR_FIELD(hasRef); + COPY_CHAR_ARRAY_FIELD(refColName); + COPY_CHAR_ARRAY_FIELD(refTableName); return TSDB_CODE_SUCCESS; } @@ -445,6 +448,14 @@ static int32_t windowOffsetCopy(const SWindowOffsetNode* pSrc, SWindowOffsetNode return TSDB_CODE_SUCCESS; } +static int32_t virtualTableNodeCopy(const SVirtualTableNode * pSrc, SVirtualTableNode* pDst) { + COPY_BASE_OBJECT_FIELD(table, tableNodeCopy); + CLONE_OBJECT_FIELD(pMeta, tableMetaClone); + CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone); + COPY_CHAR_ARRAY_FIELD(qualDbName); + CLONE_NODE_LIST_FIELD(refTables); + return TSDB_CODE_SUCCESS; +} static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) { CLONE_NODE_LIST_FIELD(pTargets); @@ -510,6 +521,19 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { return TSDB_CODE_SUCCESS; } +static int32_t logicVirtualScanCopy(const SVirtualScanLogicNode * pSrc, SVirtualScanLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + CLONE_NODE_LIST_FIELD(pScanCols); + CLONE_NODE_LIST_FIELD(pScanPseudoCols); + COPY_SCALAR_FIELD(tableType); + COPY_SCALAR_FIELD(tableId); + COPY_SCALAR_FIELD(stableId); + CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone); + COPY_SCALAR_FIELD(scanType); + COPY_OBJECT_FIELD(tableName, sizeof(SName)); + return TSDB_CODE_SUCCESS; +} + static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(joinType); @@ -749,6 +773,12 @@ static int32_t physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) { return TSDB_CODE_SUCCESS; } +static int32_t physiVirtualTableScanCopy(const SVirtualScanPhysiNode* pSrc, SVirtualScanPhysiNode* pDst) { + COPY_BASE_OBJECT_FIELD(scan, physiScanCopy); + CLONE_NODE_LIST_FIELD(pTargets); + return TSDB_CODE_SUCCESS; +} + static int32_t physiTagScanCopy(const STagScanPhysiNode* pSrc, STagScanPhysiNode* pDst) { COPY_BASE_OBJECT_FIELD(scan, physiScanCopy); COPY_SCALAR_FIELD(onlyMetaCtbIdx); @@ -1042,6 +1072,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) { case QUERY_NODE_WINDOW_OFFSET: code = windowOffsetCopy((const SWindowOffsetNode*)pNode, (SWindowOffsetNode*)pDst); break; + case QUERY_NODE_VIRTUAL_TABLE: + code = virtualTableNodeCopy((const SVirtualTableNode*)pNode, (SVirtualTableNode*)pDst); + break; case QUERY_NODE_SET_OPERATOR: code = setOperatorCopy((const SSetOperator*)pNode, (SSetOperator*)pDst); break; @@ -1096,6 +1129,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) { case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: code = logicDynQueryCtrlCopy((const SDynQueryCtrlLogicNode*)pNode, (SDynQueryCtrlLogicNode*)pDst); break; + case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN: + code = logicVirtualScanCopy((const SVirtualScanLogicNode *)pNode, (SVirtualScanLogicNode*)pDst); + break; case QUERY_NODE_LOGIC_SUBPLAN: code = logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst); break; @@ -1126,6 +1162,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: code = physiPartitionCopy((const SPartitionPhysiNode*)pNode, (SPartitionPhysiNode*)pDst); break; + case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN: + code = physiVirtualTableScanCopy((const SVirtualScanPhysiNode*)pNode, (SVirtualScanPhysiNode*)pDst); + break; case QUERY_NODE_PHYSICAL_PLAN_PROJECT: code = physiProjectCopy((const SProjectPhysiNode*)pNode, (SProjectPhysiNode*)pDst); break; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index b00c6325c46a..890bb511c036 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -41,6 +41,8 @@ const char* nodesNodeName(ENodeType type) { return "Function"; case QUERY_NODE_REAL_TABLE: return "RealTable"; + case QUERY_NODE_VIRTUAL_TABLE: + return "VirtualTable"; case QUERY_NODE_TEMP_TABLE: return "TempTable"; case QUERY_NODE_JOIN_TABLE: @@ -365,6 +367,8 @@ const char* nodesNodeName(ENodeType type) { return "LogicGroupCache"; case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: return "LogicDynamicQueryCtrl"; + case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN: + return "LogicVirtualTableScan"; case QUERY_NODE_LOGIC_SUBPLAN: return "LogicSubplan"; case QUERY_NODE_LOGIC_PLAN: @@ -466,6 +470,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiGroupCache"; case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: return "PhysiDynamicQueryCtrl"; + case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN: + return "PhysiVirtualTableScan"; case QUERY_NODE_PHYSICAL_SUBPLAN: return "PhysiSubplan"; case QUERY_NODE_PHYSICAL_PLAN: @@ -1849,6 +1855,65 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkVirtualTableScanLogicPlanScanCols = "ScanCols"; +static const char* jkVirtualTableScanLogicPlanScanPseudoCols = "ScanPseudoCols"; +static const char* jkVirtualTableScanLogicPlanTableType = "TableType"; +static const char* jkVirtualTableScanLogicPlanTableId = "TableId"; +static const char* jkVirtualTableScanLogicPlanStableId = "StableId"; +static const char* jkVirtualTableScanLogicPlanScanType = "ScanType"; + +static int32_t logicVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) { + const SVirtualScanLogicNode* pNode = (const SVirtualScanLogicNode*)pObj; + + int32_t code = logicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkVirtualTableScanLogicPlanScanCols, pNode->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkVirtualTableScanLogicPlanScanPseudoCols, pNode->pScanPseudoCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVirtualTableScanLogicPlanTableType, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVirtualTableScanLogicPlanTableId, pNode->tableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVirtualTableScanLogicPlanStableId, pNode->stableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVirtualTableScanLogicPlanScanType, pNode->scanType); + } + return code; +} + +static int32_t jsonToLogicVirtualTableScanNode(const SJson* pJson, void* pObj) { + SVirtualScanLogicNode* pNode = (SVirtualScanLogicNode *)pObj; + + int32_t objSize = 0; + int32_t code = jsonToLogicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkVirtualTableScanLogicPlanScanCols, &pNode->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkVirtualTableScanLogicPlanScanPseudoCols, &pNode->pScanPseudoCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkVirtualTableScanLogicPlanTableType, &pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkVirtualTableScanLogicPlanTableId, &pNode->tableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkVirtualTableScanLogicPlanStableId, &pNode->stableId); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkVirtualTableScanLogicPlanScanType, pNode->scanType, code); + } + return code; +} + + static const char* jkPhysiPlanOutputDataBlockDesc = "OutputDataBlockDesc"; static const char* jkPhysiPlanConditions = "Conditions"; static const char* jkPhysiPlanChildren = "Children"; @@ -2274,6 +2339,31 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkVirtualTableScanPhysiPlanTargets = "Targets"; + +static int32_t physiVirtualTableScanNodeToJson(const void* pObj, SJson* pJson) { + const SVirtualScanPhysiNode* pNode = (const SVirtualScanPhysiNode*)pObj; + + int32_t code = physiScanNodeToJson(pObj, pJson); + + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkVirtualTableScanPhysiPlanTargets, pNode->pTargets); + } + + return code; +} + +static int32_t jsonToPhysiVirtualTableScanNode(const SJson* pJson, void* pObj) { + SVirtualScanPhysiNode* pNode = (SVirtualScanPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkVirtualTableScanPhysiPlanTargets, &pNode->pTargets); + } + + return code; +} + static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet"; static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite"; static const char* jkSysTableScanPhysiPlanAccountId = "AccountId"; @@ -4884,6 +4974,61 @@ static int32_t jsonToJoinTableNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkVirtualTableMetaSize = "MetaSize"; +static const char* jkVirtuaTableMeta = "Meta"; +static const char* jkVirtuaTableVgroupsInfoSize = "VgroupsInfoSize"; +static const char* jkVirtuaTableVgroupsInfo = "VgroupsInfo"; +static const char* jkVirtuaTableRefTables = "RefTables"; + +static int32_t virtualTableNodeToJson(const void* pObj, SJson* pJson) { + const SVirtualTableNode* pNode = (const SVirtualTableNode*)pObj; + + int32_t code = tableNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVirtualTableMetaSize, TABLE_META_SIZE(pNode->pMeta)); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkVirtuaTableMeta, tableMetaToJson, pNode->pMeta); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVirtuaTableVgroupsInfoSize, VGROUPS_INFO_SIZE(pNode->pVgroupList)); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkVirtuaTableVgroupsInfo, vgroupsInfoToJson, pNode->pVgroupList); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkVirtuaTableRefTables, pNode->refTables); + } + + + return code; +} + +static int32_t jsonToVirtualTableNode(const SJson* pJson, void* pObj) { + SVirtualTableNode* pNode = (SVirtualTableNode*)pObj; + + int32_t objSize = 0; + int32_t code = jsonToTableNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkVirtualTableMetaSize, &objSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonMakeObject(pJson, jkVirtuaTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkVirtuaTableVgroupsInfoSize, &objSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonMakeObject(pJson, jkVirtuaTableVgroupsInfo, jsonToVgroupsInfo, (void**)&pNode->pVgroupList, objSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkVirtuaTableRefTables, &pNode->refTables); + } + + return code; +} + + static const char* jkGroupingSetType = "GroupingSetType"; static const char* jkGroupingSetParameter = "Parameters"; @@ -8138,6 +8283,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return tempTableNodeToJson(pObj, pJson); case QUERY_NODE_JOIN_TABLE: return joinTableNodeToJson(pObj, pJson); + case QUERY_NODE_VIRTUAL_TABLE: + return virtualTableNodeToJson(pObj, pJson); case QUERY_NODE_GROUPING_SET: return groupingSetNodeToJson(pObj, pJson); case QUERY_NODE_ORDER_BY_EXPR: @@ -8385,6 +8532,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return logicScanNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_JOIN: return logicJoinNodeToJson(pObj, pJson); + case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN: + return logicVirtualTableScanNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_AGG: return logicAggNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_PROJECT: @@ -8428,6 +8577,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: return physiTableScanNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN: + return physiVirtualTableScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: return physiSysTableScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: @@ -8523,6 +8674,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToTempTableNode(pJson, pObj); case QUERY_NODE_JOIN_TABLE: return jsonToJoinTableNode(pJson, pObj); + case QUERY_NODE_VIRTUAL_TABLE: + return jsonToVirtualTableNode(pJson, pObj); case QUERY_NODE_GROUPING_SET: return jsonToGroupingSetNode(pJson, pObj); case QUERY_NODE_ORDER_BY_EXPR: @@ -8770,6 +8923,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToLogicScanNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_JOIN: return jsonToLogicJoinNode(pJson, pObj); + case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN: + return jsonToLogicVirtualTableScanNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_AGG: return jsonToLogicAggNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_PROJECT: @@ -8813,6 +8968,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: return jsonToPhysiTableScanNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN: + return jsonToPhysiVirtualTableScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: return jsonToPhysiSysTableScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: diff --git a/source/libs/nodes/src/nodesEqualFuncs.c b/source/libs/nodes/src/nodesEqualFuncs.c index 891843761a03..eb3a621b8b9e 100644 --- a/source/libs/nodes/src/nodesEqualFuncs.c +++ b/source/libs/nodes/src/nodesEqualFuncs.c @@ -190,6 +190,7 @@ bool nodesEqualNode(const SNode* a, const SNode* b) { case QUERY_NODE_GROUPING_SET: return groupingSetNodeEqual((const SGroupingSetNode*)a, (const SGroupingSetNode*)b); case QUERY_NODE_REAL_TABLE: + case QUERY_NODE_VIRTUAL_TABLE: case QUERY_NODE_TEMP_TABLE: case QUERY_NODE_JOIN_TABLE: case QUERY_NODE_ORDER_BY_EXPR: diff --git a/source/libs/nodes/src/nodesMatchFuncs.c b/source/libs/nodes/src/nodesMatchFuncs.c index 401c7aad283c..0db04d6ee956 100755 --- a/source/libs/nodes/src/nodesMatchFuncs.c +++ b/source/libs/nodes/src/nodesMatchFuncs.c @@ -168,6 +168,7 @@ bool nodesMatchNode(const SNode* pSub, const SNode* p) { case QUERY_NODE_REAL_TABLE: case QUERY_NODE_TEMP_TABLE: case QUERY_NODE_JOIN_TABLE: + case QUERY_NODE_VIRTUAL_TABLE: case QUERY_NODE_GROUPING_SET: case QUERY_NODE_ORDER_BY_EXPR: case QUERY_NODE_LIMIT: diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 930a88aea089..91e1fe53da8c 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -733,7 +733,19 @@ static int32_t columnNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) { } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeValueI16(pEncoder, pNode->numOfPKs); - } + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueBool(pEncoder, pNode->isPrimTs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueBool(pEncoder, pNode->hasRef); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueCStr(pEncoder, pNode->refColName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueCStr(pEncoder, pNode->refTableName); + } return code; } @@ -783,7 +795,20 @@ static int32_t msgToColumnNodeInline(STlvDecoder* pDecoder, void* pObj) { } if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueI16(pDecoder, &pNode->numOfPKs); - } + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeValueBool(pDecoder, &pNode->isPrimTs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeValueBool(pDecoder, &pNode->hasRef); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeValueCStr(pDecoder, pNode->refColName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeValueCStr(pDecoder, pNode->refTableName); + } + return code; } @@ -2107,6 +2132,43 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { + PHY_VIRTUAL_TABLE_SCAN_CODE_SCAN = 1, + PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS, +}; + +static int32_t physiVirtualTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SVirtualScanPhysiNode* pNode = (const SVirtualScanPhysiNode *)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan); + + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); + } + return code; +} + +static int32_t msgToPhysiVirtualTableScanNode(STlvDecoder* pDecoder, void* pObj) { + SVirtualScanPhysiNode* pNode = (SVirtualScanPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_VIRTUAL_TABLE_SCAN_CODE_SCAN: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan); + break; + case PHY_VIRTUAL_TABLE_SCAN_CODE_TARGETS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); + break; + default: + break; + } + } + + return code; +} + enum { PHY_TAG_SCAN_CODE_SCAN = 1, PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX @@ -4686,6 +4748,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: code = physiDynQueryCtrlNodeToMsg(pObj, pEncoder); break; + case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN: + code = physiVirtualTableScanNodeToMsg(pObj, pEncoder); + break; case QUERY_NODE_PHYSICAL_SUBPLAN: code = subplanToMsg(pObj, pEncoder); break; @@ -4855,6 +4920,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: code = msgToPhysiDynQueryCtrlNode(pDecoder, pObj); break; + case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN: + code = msgToPhysiVirtualTableScanNode(pDecoder, pObj); + break; case QUERY_NODE_PHYSICAL_SUBPLAN: code = msgToSubplan(pDecoder, pObj); break; diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index f3f7395a3786..0e8a27d60c4e 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -78,6 +78,7 @@ static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker wa break; case QUERY_NODE_REAL_TABLE: case QUERY_NODE_TEMP_TABLE: + case QUERY_NODE_VIRTUAL_TABLE: break; // todo case QUERY_NODE_JOIN_TABLE: { SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode; @@ -289,6 +290,7 @@ static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewrit } case QUERY_NODE_REAL_TABLE: case QUERY_NODE_TEMP_TABLE: + case QUERY_NODE_VIRTUAL_TABLE: break; // todo case QUERY_NODE_JOIN_TABLE: { SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 18703f3c4ed3..18f8571738ce 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -371,6 +371,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_REAL_TABLE: code = makeNode(type, sizeof(SRealTableNode), &pNode); break; + case QUERY_NODE_VIRTUAL_TABLE: + code = makeNode(type, sizeof(SVirtualTableNode), &pNode); + break; case QUERY_NODE_TEMP_TABLE: code = makeNode(type, sizeof(STempTableNode), &pNode); break; @@ -816,6 +819,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: code = makeNode(type, sizeof(SDynQueryCtrlLogicNode), &pNode); break; + case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN: + code = makeNode(type, sizeof(SVirtualScanLogicNode), &pNode); + break; case QUERY_NODE_LOGIC_SUBPLAN: code = makeNode(type, sizeof(SLogicSubplan), &pNode); break; @@ -973,6 +979,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: code = makeNode(type, sizeof(SStreamInterpFuncPhysiNode), &pNode); break; + case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN: + code = makeNode(type, sizeof(SVirtualScanPhysiNode), &pNode); + break; default: break; } @@ -1103,6 +1112,13 @@ void nodesDestroyNode(SNode* pNode) { taosArrayDestroy(pReal->tsmaTargetTbInfo); break; } + case QUERY_NODE_VIRTUAL_TABLE: { + SVirtualTableNode *pVirtual = (SVirtualTableNode*)pNode; + taosMemoryFreeClear(pVirtual->pMeta); + taosMemoryFreeClear(pVirtual->pVgroupList); + nodesDestroyList(pVirtual->refTables); + break; + } case QUERY_NODE_TEMP_TABLE: nodesDestroyNode(((STempTableNode*)pNode)->pSubquery); break; @@ -1708,6 +1724,14 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pLogicNode->pRightOnCond); break; } + case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN: { + SVirtualScanLogicNode * pLogicNode = (SVirtualScanLogicNode*)pNode; + destroyLogicNode((SLogicNode*)pLogicNode); + nodesDestroyList(pLogicNode->pScanCols); + nodesDestroyList(pLogicNode->pScanPseudoCols); + taosMemoryFreeClear(pLogicNode->pVgroupList); + break; + } case QUERY_NODE_LOGIC_PLAN_AGG: { SAggLogicNode* pLogicNode = (SAggLogicNode*)pNode; destroyLogicNode((SLogicNode*)pLogicNode); @@ -1826,6 +1850,12 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: destroyScanPhysiNode((SScanPhysiNode*)pNode); break; + case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN: { + SVirtualScanPhysiNode* pPhyNode = (SVirtualScanPhysiNode*)pNode; + destroyScanPhysiNode((SScanPhysiNode*)pNode); + nodesDestroyList(pPhyNode->pTargets); + break; + } case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: { SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode; diff --git a/source/libs/parser/CMakeLists.txt b/source/libs/parser/CMakeLists.txt index bd2dd95ee0f3..33f0aff93c90 100644 --- a/source/libs/parser/CMakeLists.txt +++ b/source/libs/parser/CMakeLists.txt @@ -2,6 +2,7 @@ aux_source_directory(src PARSER_SRC) IF(TD_ENTERPRISE) LIST(APPEND PARSER_SRC ${TD_ENTERPRISE_DIR}/src/plugins/view/src/parserView.c) + LIST(APPEND PARSER_SRC ${TD_ENTERPRISE_DIR}/src/plugins/vtable/src/parserVtable.c) ENDIF() add_custom_command( diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index 5999ada70f16..9f52ab9e2511 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -48,6 +48,7 @@ void tfreeSParseQueryRes(void* p); #ifdef TD_ENTERPRISE int32_t translateView(STranslateContext* pCxt, SNode** pTable, SName* pName); +int32_t translateVirtualTable(STranslateContext* pCxt, SNode** pTable, SName* pName); int32_t getViewMetaFromMetaCache(STranslateContext* pCxt, SName* pName, SViewMeta** ppViewMeta); #endif #ifdef __cplusplus diff --git a/source/libs/parser/inc/parTranslater.h b/source/libs/parser/inc/parTranslater.h index 43e2af2e917e..4b38bce82c47 100644 --- a/source/libs/parser/inc/parTranslater.h +++ b/source/libs/parser/inc/parTranslater.h @@ -48,6 +48,7 @@ typedef struct STranslateContext { SNode* pPostRoot; bool dual; // whether select stmt without from stmt, true for without. bool skipCheck; + bool refTable; } STranslateContext; int32_t biRewriteToTbnameFunc(STranslateContext* pCxt, SNode** ppNode, bool* pRet); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 7ed5029532b6..cc7a0aeed970 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -615,7 +615,8 @@ static int32_t rewriteDropTableWithMetaCache(STranslateContext* pCxt) { sizeof(STableMeta) + sizeof(SSchema) * (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags); int32_t schemaExtSize = (useCompress(pMeta->tableType) && pMeta->schemaExt) ? sizeof(SSchemaExt) * pMeta->tableInfo.numOfColumns : 0; - const char* pTbName = (const char*)pMeta + metaSize + schemaExtSize; + int32_t colRefSize = (hasRefCol(pMeta->tableType) && pMeta->colRef) ? sizeof(SColRef) * pMeta->tableInfo.numOfColumns : 0; + const char* pTbName = (const char*)pMeta + metaSize + schemaExtSize + colRefSize; SName name = {0}; toName(pParCxt->acctId, dbName, pTbName, &name); @@ -1345,6 +1346,33 @@ static bool hasPkInTable(const STableMeta* pTableMeta) { return hasPK; } +static void setVtbColumnInfoBySchema(const SVirtualTableNode* pTable, const SSchema* pColSchema, int32_t tagFlag, + SColumnNode* pCol) { + strcpy(pCol->dbName, pTable->table.dbName); + strcpy(pCol->tableAlias, pTable->table.tableAlias); + strcpy(pCol->tableName, pTable->table.tableName); + strcpy(pCol->colName, pColSchema->name); + if ('\0' == pCol->node.aliasName[0]) { + strcpy(pCol->node.aliasName, pColSchema->name); + } + if ('\0' == pCol->node.userAlias[0]) { + strcpy(pCol->node.userAlias, pColSchema->name); + } + pCol->tableId = pTable->pMeta->uid; + pCol->tableType = pTable->pMeta->tableType; + pCol->colId = pColSchema->colId; + pCol->colType = (tagFlag >= 0 ? COLUMN_TYPE_TAG : COLUMN_TYPE_COLUMN); + pCol->hasIndex = false; + pCol->node.resType.type = pColSchema->type; + pCol->node.resType.bytes = pColSchema->bytes; + if (TSDB_DATA_TYPE_TIMESTAMP == pCol->node.resType.type) { + pCol->node.resType.precision = pTable->pMeta->tableInfo.precision; + } + pCol->tableHasPk = false; + pCol->isPk = false; + pCol->numOfPKs = 0; +} + static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* pColSchema, int32_t tagFlag, SColumnNode* pCol) { tstrncpy(pCol->dbName, pTable->table.dbName, TSDB_DB_NAME_LEN); @@ -1470,7 +1498,7 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p setColumnPrimTs(pCxt, pCol, pTable); code = nodesListStrictAppend(pList, (SNode*)pCol); } - } else { + } else if (QUERY_NODE_TEMP_TABLE == nodeType(pTable)) { STempTableNode* pTempTable = (STempTableNode*)pTable; SNodeList* pProjectList = getProjectList(pTempTable->pSubquery); SNode* pNode; @@ -1492,6 +1520,19 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p break; } } + } else if (QUERY_NODE_VIRTUAL_TABLE == nodeType(pTable)) { + const STableMeta* pMeta = ((SVirtualTableNode *)pTable)->pMeta; + int32_t nums = pMeta->tableInfo.numOfColumns; + for (int32_t i = 0; i < nums; ++i) { + SColumnNode* pCol = NULL; + code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol); + if (TSDB_CODE_SUCCESS != code) { + return generateSyntaxErrMsg(&pCxt->msgBuf, code); + } + setVtbColumnInfoBySchema((SVirtualTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol); + setColumnPrimTs(pCxt, pCol, pTable); + code = nodesListStrictAppend(pList, (SNode*)pCol); + } } return code; } @@ -1501,6 +1542,98 @@ static bool isInternalPrimaryKey(const SColumnNode* pCol) { (0 == strcmp(pCol->colName, ROWTS_PSEUDO_COLUMN_NAME) || 0 == strcmp(pCol->colName, C0_PSEUDO_COLUMN_NAME)); } +static int32_t findAndSetRealTableColumn(STranslateContext* pCxt, SColumnNode** pColRef, STableNode* pTable, bool* pFound) { + int32_t code = TSDB_CODE_SUCCESS; + SColumnNode* pCol = *pColRef; + *pFound = false; + + const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta; + if (isInternalPrimaryKey(pCol)) { + if (TSDB_SYSTEM_TABLE == pMeta->tableType) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pCol->colName); + } + + setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema, -1, pCol); + pCol->isPrimTs = true; + *pFound = true; + return TSDB_CODE_SUCCESS; + } + int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns; + for (int32_t i = 0; i < nums; ++i) { + if (0 == strcmp(pCol->colName, pMeta->schema[i].name) && + !invisibleColumn(pCxt->pParseCxt->enableSysInfo, pMeta->tableType, pMeta->schema[i].flags)) { + setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol); + setColumnPrimTs(pCxt, pCol, pTable); + *pFound = true; + break; + } + } + + if (pCxt->showRewrite && pMeta->tableType == TSDB_SYSTEM_TABLE) { + if (strncmp(pCol->dbName, TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB)) == 0 && + strncmp(pCol->tableName, TSDB_INS_DISK_USAGE, strlen(TSDB_INS_DISK_USAGE)) == 0 && + strncmp(pCol->colName, "db_name", strlen("db_name")) == 0) { + pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; + pCol->node.resType.bytes = 8; + pCxt->skipCheck = true; + ((SSelectStmt*)pCxt->pCurrStmt)->mixSysTableAndActualTable = true; + } + } + return code; +} + +static int32_t findAndSetTempTableColumn(STranslateContext* pCxt, SColumnNode** pColRef, STableNode* pTable, bool* pFound) { + int32_t code = TSDB_CODE_SUCCESS; + SColumnNode* pCol = *pColRef; + STempTableNode* pTempTable = (STempTableNode*)pTable; + SNodeList* pProjectList = getProjectList(pTempTable->pSubquery); + SNode* pNode; + FOREACH(pNode, pProjectList) { + SExprNode* pExpr = (SExprNode*)pNode; + if (0 == strcmp(pCol->colName, pExpr->aliasName)) { + if (*pFound) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_AMBIGUOUS_COLUMN, pCol->colName); + } + code = setColumnInfoByExpr(pTempTable, pExpr, pColRef); + if (TSDB_CODE_SUCCESS != code) { + break; + } + *pFound = true; + } else if (isPrimaryKeyImpl(pNode) && isInternalPrimaryKey(pCol)) { + code = setColumnInfoByExpr(pTempTable, pExpr, pColRef); + if (TSDB_CODE_SUCCESS != code) break; + pCol->isPrimTs = true; + *pFound = true; + } + } + return code; +} + +static int32_t findAndSetVirtualTableColumn(STranslateContext* pCxt, SColumnNode** pColRef, STableNode* pTable, bool* pFound) { + int32_t code = TSDB_CODE_SUCCESS; + SColumnNode* pCol = *pColRef; + + const STableMeta* pMeta = ((SVirtualTableNode*)pTable)->pMeta; + if (isInternalPrimaryKey(pCol)) { + setVtbColumnInfoBySchema((SVirtualTableNode*)pTable, pMeta->schema, -1, pCol); + pCol->isPrimTs = true; + *pFound = true; + return TSDB_CODE_SUCCESS; + } + + int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns; + for (int32_t i = 0; i < nums; ++i) { + if (0 == strcmp(pCol->colName, pMeta->schema[i].name)) { + setVtbColumnInfoBySchema((SVirtualTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol); + setColumnPrimTs(pCxt, pCol, pTable); + *pFound = true; + break; + } + } + + return code; +} + static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef, STableNode* pTable, bool* pFound, bool keepOriginTable) { SColumnNode* pCol = *pColRef; @@ -1527,62 +1660,21 @@ static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef, } int32_t code = 0; - if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) { - const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta; - if (isInternalPrimaryKey(pCol)) { - if (TSDB_SYSTEM_TABLE == pMeta->tableType) { - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pCol->colName); - } - - setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema, -1, pCol); - pCol->isPrimTs = true; - *pFound = true; - return TSDB_CODE_SUCCESS; - } - int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns; - for (int32_t i = 0; i < nums; ++i) { - if (0 == strcmp(pCol->colName, pMeta->schema[i].name) && - !invisibleColumn(pCxt->pParseCxt->enableSysInfo, pMeta->tableType, pMeta->schema[i].flags)) { - setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol); - setColumnPrimTs(pCxt, pCol, pTable); - *pFound = true; - break; - } - } - - if (pCxt->showRewrite && pMeta->tableType == TSDB_SYSTEM_TABLE) { - if (strncmp(pCol->dbName, TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB)) == 0 && - strncmp(pCol->tableName, TSDB_INS_DISK_USAGE, strlen(TSDB_INS_DISK_USAGE)) == 0 && - strncmp(pCol->colName, "db_name", strlen("db_name")) == 0) { - pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; - pCol->node.resType.bytes = 8; - pCxt->skipCheck = true; - ((SSelectStmt*)pCxt->pCurrStmt)->mixSysTableAndActualTable = true; - } - } - } else { - STempTableNode* pTempTable = (STempTableNode*)pTable; - SNodeList* pProjectList = getProjectList(pTempTable->pSubquery); - SNode* pNode; - FOREACH(pNode, pProjectList) { - SExprNode* pExpr = (SExprNode*)pNode; - if (0 == strcmp(pCol->colName, pExpr->aliasName)) { - if (*pFound) { - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_AMBIGUOUS_COLUMN, pCol->colName); - } - code = setColumnInfoByExpr(pTempTable, pExpr, pColRef); - if (TSDB_CODE_SUCCESS != code) { - break; - } - *pFound = true; - } else if (isPrimaryKeyImpl(pNode) && isInternalPrimaryKey(pCol)) { - code = setColumnInfoByExpr(pTempTable, pExpr, pColRef); - if (TSDB_CODE_SUCCESS != code) break; - pCol->isPrimTs = true; - *pFound = true; - } - } + switch (nodeType(pTable)) { + case QUERY_NODE_REAL_TABLE: + code = findAndSetRealTableColumn(pCxt, pColRef, pTable, pFound); + break; + case QUERY_NODE_TEMP_TABLE: + code = findAndSetTempTableColumn(pCxt, pColRef, pTable, pFound); + break; + case QUERY_NODE_VIRTUAL_TABLE: + code = findAndSetVirtualTableColumn(pCxt, pColRef, pTable, pFound); + break; + default: + code = TSDB_CODE_PAR_INVALID_TABLE_TYPE; + break; } + return code; } @@ -5058,6 +5150,17 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare code = translateAudit(pCxt, pRealTable, &name); #endif if (TSDB_CODE_SUCCESS == code) code = setTableVgroupList(pCxt, &name, pRealTable); +#ifdef TD_ENTERPRISE + if (TSDB_VIRTUAL_TABLE == pRealTable->pMeta->tableType || TSDB_VIRTUAL_CHILD_TABLE == pRealTable->pMeta->tableType) { + if (!isSelectStmt(pCxt->pCurrStmt)) { + // virtual table only support select operation + code = TSDB_CODE_TSC_INVALID_OPERATION; + break; + } + PAR_ERR_RET(translateVirtualTable(pCxt, pTable, &name)); + PAR_RET(addNamespace(pCxt, (SVirtualTableNode*)*pTable)); + } +#endif if (TSDB_CODE_SUCCESS == code) { code = setTableIndex(pCxt, &name, pRealTable); } @@ -5080,7 +5183,9 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare break; } } - code = addNamespace(pCxt, pRealTable); + if (!pCxt->refTable) { + code = addNamespace(pCxt, pRealTable); + } } break; } @@ -14520,11 +14625,8 @@ typedef struct SVgroupCreateTableBatch { static int32_t setColRef(SColRef* colRef, col_id_t index, col_id_t colId, char* refColName, char* refTableName) { colRef[index].id = colId; colRef[index].hasRef = true; - colRef[index].refColName = taosStrdup(refColName); - colRef[index].refTableName = taosStrdup(refTableName); - if (NULL == colRef[index].refTableName || NULL == colRef[index].refColName) { - return terrno; - } + tstrncpy(colRef[index].refColName, refColName, TSDB_COL_NAME_LEN); + tstrncpy(colRef[index].refTableName, refTableName, TSDB_TABLE_NAME_LEN); return TSDB_CODE_SUCCESS; } @@ -14550,6 +14652,8 @@ static int32_t buildVirtualTableBatchReq(const SCreateVTableStmt* pStmt, const S FOREACH(pCol, pStmt->pCols) { SColumnDefNode* pColDef = (SColumnDefNode*)pCol; + SSchema* pSchema = req.ntb.schemaRow.pSchema + index; + toSchema(pColDef, index + 1, pSchema); if (pColDef->pOptions && ((SColumnOptions*)pColDef->pOptions)->hasRef) { PAR_ERR_JRET(setColRef(req.colRef.pColRef, index, index + 1, ((SColumnOptions*)pColDef->pOptions)->refColumn, ((SColumnOptions*)pColDef->pOptions)->refTable)); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 2a1fffae4492..59ae7a79e3d4 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -327,17 +327,24 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta) { bool hasSchemaExt = pTableMeta->schemaExt == NULL ? false : true; size_t schemaExtSize = hasSchemaExt ? pTableMeta->tableInfo.numOfColumns * sizeof(SSchemaExt) : 0; + bool hasColRef = pTableMeta->colRef == NULL ? false : true; + size_t colRefSize = hasColRef ? pTableMeta->tableInfo.numOfColumns * sizeof(SColRef) : 0; size_t size = sizeof(STableMeta) + numOfFields * sizeof(SSchema); - STableMeta* p = taosMemoryMalloc(size + schemaExtSize); + STableMeta* p = taosMemoryMalloc(size + schemaExtSize + colRefSize); if (NULL == p) return NULL; - memcpy(p, pTableMeta, schemaExtSize + size); + memcpy(p, pTableMeta, colRefSize + schemaExtSize + size); if (hasSchemaExt) { p->schemaExt = (SSchemaExt*)(((char*)p) + size); } else { p->schemaExt = NULL; } + if (hasColRef) { + p->colRef = (SColRef*)(((char*)p) + size + schemaExtSize); + } else { + p->colRef = NULL; + } return p; } @@ -1067,7 +1074,8 @@ int32_t getTableNameFromCache(SParseMetaCache* pMetaCache, const SName* pName, c sizeof(STableMeta) + sizeof(SSchema) * (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags); int32_t schemaExtSize = (useCompress(pMeta->tableType) && pMeta->schemaExt) ? sizeof(SSchemaExt) * pMeta->tableInfo.numOfColumns : 0; - const char* pTableName = (const char*)pMeta + metaSize + schemaExtSize; + int32_t colRefSize = (hasRefCol(pMeta->tableType) && pMeta->colRef) ? sizeof(SColRef) * pMeta->tableInfo.numOfColumns : 0; + const char* pTableName = (const char*)pMeta + metaSize + schemaExtSize + colRefSize; tstrncpy(pTbName, pTableName, TSDB_TABLE_NAME_LEN); } diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index 59e771454c68..b287ce273853 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -44,6 +44,32 @@ typedef struct SPhysiPlanContext { #define planDebugL(param, ...) qDebugL("PLAN: " param, ##__VA_ARGS__) #define planTrace(param, ...) qTrace("PLAN: " param, ##__VA_ARGS__) +#define PLAN_ERR_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + return _code; \ + } \ + } while (0) +#define PLAN_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + } \ + return _code; \ + } while (0) +#define PLAN_ERR_JRET(c) \ + do { \ + code = c; \ + if (code != TSDB_CODE_SUCCESS) { \ + terrno = code; \ + goto _return; \ + } \ + } while (0) + + int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...); int32_t createColumnByRewriteExprs(SNodeList* pExprs, SNodeList** pList); int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 66f3b2e0845b..bc1f3bfcebb2 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -27,6 +27,7 @@ typedef struct SLogicPlanContext { SLogicNode* pCurrRoot; SSHashObj* pChildTables; bool hasScan; + bool refScan; } SLogicPlanContext; typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, void*, SLogicNode**); @@ -441,7 +442,6 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect // rewrite the expression in subsequent clauses if (TSDB_CODE_SUCCESS == code) { - SNodeList* pNewScanPseudoCols = NULL; code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM, NULL); /* if (TSDB_CODE_SUCCESS == code && NULL != pScan->pScanPseudoCols) { @@ -666,6 +666,102 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return code; } +static int32_t findRefTableNode(SNodeList *refTableList, const char *tableName, SNode **pRefTable) { + SNode *pRef = NULL; + FOREACH(pRef, refTableList) { + if (0 == strcasecmp(((SRealTableNode*)pRef)->table.tableName, tableName)) { + *pRefTable = pRef; + return TSDB_CODE_SUCCESS; + } + } + return TSDB_CODE_NOT_FOUND; +} + +static int32_t findRefColId(SNode *pRefTable, const char *colName, col_id_t *colId) { + SRealTableNode *pRealTable = (SRealTableNode*)pRefTable; + for (int32_t i = 0; i < pRealTable->pMeta->tableInfo.numOfColumns; ++i) { + if (0 == strcasecmp(pRealTable->pMeta->schema[i].name, colName)) { + *colId = pRealTable->pMeta->schema[i].colId; + return TSDB_CODE_SUCCESS; + } + } + return TSDB_CODE_NOT_FOUND; +} + +static int32_t scanAddCol(SLogicNode* pLogicNode, SColRef* colRef, const SSchema* pSchema, col_id_t colId) { + int32_t code = TSDB_CODE_SUCCESS; + SColumnNode *pRefTableScanCol = NULL; + SScanLogicNode *pLogicScan = (SScanLogicNode*)pLogicNode; + PLAN_ERR_JRET(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pRefTableScanCol)); + tstrncpy(pRefTableScanCol->tableAlias, colRef->refTableName, sizeof(pRefTableScanCol->tableAlias)); + tstrncpy(pRefTableScanCol->tableName, colRef->refTableName, sizeof(pRefTableScanCol->tableName)); + tstrncpy(pRefTableScanCol->colName, colRef->refColName, sizeof(pRefTableScanCol->colName)); + pRefTableScanCol->colId = colId; + pRefTableScanCol->tableId = pLogicScan->tableId; + pRefTableScanCol->tableType = pLogicScan->tableType; + pRefTableScanCol->node.resType.type = pSchema->type; + pRefTableScanCol->node.resType.bytes = pSchema->bytes; + pRefTableScanCol->colType = COLUMN_TYPE_COLUMN; + pRefTableScanCol->isPk = false; + pRefTableScanCol->tableHasPk = false; + pRefTableScanCol->numOfPKs = 0; + PLAN_ERR_JRET(nodesListAppend(pLogicScan->pScanCols, (SNode*)pRefTableScanCol)); + nodesDestroyList(pLogicScan->node.pTargets); + PLAN_ERR_JRET(createColumnByRewriteExprs(pLogicScan->pScanCols, &pLogicScan->node.pTargets)); + return code; +_return: + nodesDestroyNode((SNode*)pRefTableScanCol); + return code; +} + +static int32_t createVirtualTableLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, + SVirtualTableNode* pVirtualTable, SLogicNode** pLogicNode) { + int32_t code = TSDB_CODE_SUCCESS; + SVirtualScanLogicNode *pVtableScan = NULL; + SLogicNode *pRefScan = NULL; + SNode *pRefTable = NULL; + PLAN_ERR_JRET(nodesMakeNode(QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN, (SNode**)&pVtableScan)); + PLAN_ERR_JRET(nodesMakeList(&pVtableScan->node.pChildren)); + + PLAN_ERR_JRET(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pVirtualTable->table.tableAlias, COLLECT_COL_TYPE_COL, + &pVtableScan->pScanCols)); + + pCxt->refScan = true; + SNode *pNode = NULL; + int32_t slotId = 0; + FOREACH(pNode, pVtableScan->pScanCols) { + SColumnNode *pCol = (SColumnNode*)pNode; + if (pVirtualTable->pMeta->colRef[pCol->colId - 1].hasRef) { + if (pCol->isPrimTs) { + PLAN_ERR_JRET(TSDB_CODE_VTABLE_PRIMTS_HAS_REF); + } + SColRef *pColRef = &pVirtualTable->pMeta->colRef[pCol->colId - 1]; + tstrncpy(pCol->refTableName, pColRef->refTableName, TSDB_TABLE_NAME_LEN); + tstrncpy(pCol->refColName, pColRef->refColName, TSDB_COL_NAME_LEN); + pCol->hasRef = true; + col_id_t colId = 0; + PLAN_ERR_JRET(findRefTableNode(pVirtualTable->refTables, pColRef->refTableName, &pRefTable)); + PLAN_ERR_JRET(findRefColId(pRefTable, pColRef->refColName, &colId)); + PLAN_ERR_JRET(doCreateLogicNodeByTable(pCxt, pSelect, pRefTable, &pRefScan)); + PLAN_ERR_JRET(scanAddCol(pRefScan, pColRef, &pVirtualTable->pMeta->schema[pCol->colId - 1], colId)); + PLAN_ERR_JRET(nodesListStrictAppend(pVtableScan->node.pChildren, (SNode*)pRefScan)); + } else { + pCol->hasRef = false; + } + } + pCxt->refScan = false; + // set output + PLAN_ERR_JRET(createColumnByRewriteExprs(pVtableScan->pScanCols, &pVtableScan->node.pTargets)); + PLAN_ERR_JRET(createColumnByRewriteExprs(pVtableScan->pScanPseudoCols, &pVtableScan->node.pTargets)); + + *pLogicNode = (SLogicNode*)pVtableScan; + return code; +_return: + nodesDestroyNode((SNode*)pVtableScan); + nodesDestroyNode((SNode*)pRefScan); + return code; +} + static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable, SLogicNode** pLogicNode) { switch (nodeType(pTable)) { @@ -675,6 +771,8 @@ static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pS return createSubqueryLogicNode(pCxt, pSelect, (STempTableNode*)pTable, pLogicNode); case QUERY_NODE_JOIN_TABLE: return createJoinLogicNode(pCxt, pSelect, (SJoinTableNode*)pTable, pLogicNode); + case QUERY_NODE_VIRTUAL_TABLE: + return createVirtualTableLogicNode(pCxt, pSelect, (SVirtualTableNode*)pTable, pLogicNode); default: break; } @@ -2248,7 +2346,7 @@ static void setLogicSubplanType(bool hasScan, SLogicSubplan* pSubplan) { } int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) { - SLogicPlanContext cxt = {.pPlanCxt = pCxt, .pCurrRoot = NULL, .hasScan = false}; + SLogicPlanContext cxt = {.pPlanCxt = pCxt, .pCurrRoot = NULL, .hasScan = false, .refScan = false}; SLogicSubplan* pSubplan = NULL; int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pSubplan); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 7085c8dc7ca5..fb9cd5636cde 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3393,7 +3393,8 @@ static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode, void* pCtx) { if (NULL != pNode->pParent && (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) || - QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode->pParent))) { + QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode->pParent) || + QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN != nodeType(pNode->pParent))) { return false; } @@ -3462,6 +3463,9 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild, if (!cxt.canUse) return false; } + if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pChild)) { + return false; + } if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->joinAlgo != JOIN_ALGO_UNKNOWN) { return false; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index c60024b3231c..852d7e7bab22 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -125,6 +125,23 @@ static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char** ppKey, int return code; } +static int32_t getVirtualSlotKey(SNode* pNode, char** ppKey, int32_t* pLen, uint16_t extraBufLen) { + int32_t code = 0; + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + SColumnNode* pCol = (SColumnNode*)pNode; + *ppKey = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen); + if (!*ppKey) { + return terrno; + } + strcat(*ppKey, pCol->refTableName); + strcat(*ppKey, "."); + strcat(*ppKey, pCol->refColName); + *pLen = taosHashBinary(*ppKey, strlen(*ppKey)); + return code; + } + return code; +} + static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const char* pName, const SNode* pNode, int16_t slotId, bool output, bool reserve) { SSlotDescNode* pSlot = NULL; @@ -369,6 +386,13 @@ typedef struct SSetSlotIdCxt { SHashObj* pRightProdIdxHash; } SSetSlotIdCxt; +typedef struct SVTableSetSlotIdCxt { + int32_t errCode; + SArray* hashArray; + SArray* projIdxHashArray; + SNodeList* pChild; +} SVTableSetSlotIdCxt; + static void dumpSlots(const char* pName, SHashObj* pHash) { if (NULL == pHash) { return; @@ -424,6 +448,61 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } +static int32_t getVTableScanDataBlockDescNode(SNodeList* pChildren, int32_t idx, SDataBlockDescNode** ppDesc) { + *ppDesc = ((SPhysiNode*)nodesListGetNode(pChildren, idx))->pOutputDataBlockDesc; + return TSDB_CODE_SUCCESS; +} + +static EDealRes doSetVtableSlotId(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) { + if (!((SColumnNode*)pNode)->hasRef) { + return DEAL_RES_CONTINUE; + } + SVTableSetSlotIdCxt* pCxt = (SVTableSetSlotIdCxt*)pContext; + char* name = NULL; + int32_t len = 0; + pCxt->errCode = getVirtualSlotKey(pNode, &name, &len, 16); + if (TSDB_CODE_SUCCESS != pCxt->errCode) { + return DEAL_RES_ERROR; + } + SSlotIndex* pIndex = NULL; + int32_t idx = 0; + if (((SColumnNode*)pNode)->projRefIdx > 0) { + sprintf(name + strlen(name), "_%d", ((SColumnNode*)pNode)->projRefIdx); + while (!pIndex && idx < LIST_LENGTH(pCxt->pChild)) { + SHashObj *tmpHash = + taosArrayGetP(pCxt->projIdxHashArray, + ((SPhysiNode*)nodesListGetNode(pCxt->pChild, idx))->pOutputDataBlockDesc->dataBlockId); + pIndex = taosHashGet(tmpHash, name, strlen(name)); + idx++; + } + } else { + while (!pIndex && idx < LIST_LENGTH(pCxt->pChild)) { + SHashObj *tmpHash = + taosArrayGetP(pCxt->hashArray, + ((SPhysiNode*)nodesListGetNode(pCxt->pChild, idx))->pOutputDataBlockDesc->dataBlockId); + pIndex = taosHashGet(tmpHash, name, len); + idx++; + } + } + // pIndex is definitely not NULL, otherwise it is a bug + if (NULL == pIndex) { + planError("doSetSlotId failed, invalid slot name %s", name); + for (int32_t i = 0; i < taosArrayGetSize(pCxt->hashArray); i++) { + //dumpSlots("vtable datablock desc", taosArrayGetP(pCxt->hashArray, i)); + } + pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR; + taosMemoryFree(name); + return DEAL_RES_ERROR; + } + ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId; + ((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId; + taosMemoryFree(name); + return DEAL_RES_IGNORE_CHILD; + } + return DEAL_RES_CONTINUE; +} + static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode, SNode** pOutput) { if (NULL == pNode) { @@ -452,6 +531,7 @@ static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i return TSDB_CODE_SUCCESS; } + static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, const SNodeList* pList, SNodeList** pOutput) { if (NULL == pList) { @@ -479,6 +559,33 @@ static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i return TSDB_CODE_SUCCESS; } +static int32_t setVtableListSlotId(SPhysiPlanContext* pCxt, SNodeList* pChild, const SNodeList* pList, + SNodeList** pOutput) { + int32_t code = TSDB_CODE_SUCCESS; + if (NULL == pList) { + PLAN_RET(code); + } + + SNodeList* pRes = NULL; + PLAN_ERR_JRET(nodesCloneList(pList, &pRes)); + + SVTableSetSlotIdCxt cxt = { + .errCode = TSDB_CODE_SUCCESS, + .hashArray = pCxt->pLocationHelper, + .projIdxHashArray = pCxt->pProjIdxLocHelper, + .pChild = pChild + }; + + nodesWalkExprs(pRes, doSetVtableSlotId, &cxt); + PLAN_ERR_JRET(cxt.errCode); + + *pOutput = pRes; + return code; +_return: + nodesDestroyList(pRes); + return code; +} + static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) { SPhysiNode* pPhysiNode = NULL; int32_t code = nodesMakeNode(type, (SNode**)&pPhysiNode); @@ -1575,6 +1682,62 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren return TSDB_CODE_FAILED; } +static int32_t createVirtualScanCols(SPhysiPlanContext* pCxt, SVirtualScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) { + if (NULL == pScanCols) { + return TSDB_CODE_SUCCESS; + } + + pScanPhysiNode->scan.pScanCols = NULL; + int32_t code = nodesCloneList(pScanCols, &pScanPhysiNode->scan.pScanCols); + if (NULL == pScanPhysiNode->scan.pScanCols) { + return code; + } + return sortScanCols(pScanPhysiNode->scan.pScanCols); +} + +static int32_t createVirtualTableScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, + SVirtualScanLogicNode* pScanLogicNode, + SVirtualScanPhysiNode* pScanPhysiNode, + SPhysiNode** pPhyNode) { + int32_t code = TSDB_CODE_SUCCESS; + + PLAN_ERR_JRET(createVirtualScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols)); + PLAN_ERR_JRET(addDataBlockSlots(pCxt, pScanPhysiNode->scan.pScanCols, pScanPhysiNode->scan.node.pOutputDataBlockDesc)); + if (NULL != pScanLogicNode->pScanPseudoCols) { + pScanPhysiNode->scan.pScanPseudoCols = NULL; + PLAN_ERR_JRET(nodesCloneList(pScanLogicNode->pScanPseudoCols, &pScanPhysiNode->scan.pScanPseudoCols)); + PLAN_ERR_JRET(addDataBlockSlots(pCxt, pScanPhysiNode->scan.pScanPseudoCols, pScanPhysiNode->scan.node.pOutputDataBlockDesc)); + } + + PLAN_ERR_JRET(setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode)); + + pScanPhysiNode->scan.uid = pScanLogicNode->tableId; + pScanPhysiNode->scan.suid = pScanLogicNode->stableId; + pScanPhysiNode->scan.tableType = pScanLogicNode->tableType; + memcpy(&pScanPhysiNode->scan.tableName, &pScanLogicNode->tableName, sizeof(SName)); + + *pPhyNode = (SPhysiNode*)pScanPhysiNode; + return code; + +_return: + nodesDestroyNode((SNode*)pScanPhysiNode); + return code; +} + +static int32_t createVirtualTableScanPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, + SVirtualScanLogicNode * pScanLogicNode, SPhysiNode** pPhyNode) { + int32_t code = TSDB_CODE_SUCCESS; + SVirtualScanPhysiNode * pVirtualScan = + (SVirtualScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN); + if (NULL == pVirtualScan) { + return terrno; + } + + PLAN_ERR_RET(createVirtualTableScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SVirtualScanPhysiNode*)pVirtualScan, pPhyNode)); + PLAN_ERR_RET(setVtableListSlotId(pCxt, pChildren, pScanLogicNode->node.pTargets, &pVirtualScan->pTargets)); + return code; +} + static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SGroupCacheLogicNode* pLogicNode, SPhysiNode** pPhyNode) { SGroupCachePhysiNode* pGrpCache = @@ -2767,6 +2930,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode return createGroupCachePhysiNode(pCxt, pChildren, (SGroupCacheLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: return createDynQueryCtrlPhysiNode(pCxt, pChildren, (SDynQueryCtrlLogicNode*)pLogicNode, pPhyNode); + case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN: + return createVirtualTableScanPhysiNode(pCxt, pChildren, (SVirtualScanLogicNode*)pLogicNode, pPhyNode); default: break; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 5c2d1efd7bcb..995c85274e2b 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1998,6 +1998,41 @@ static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { return code; } +typedef struct SVirtualTableSplitInfo { + SVirtualScanLogicNode *pVirtual; + SLogicSubplan *pSubplan; +} SVirtualTableSplitInfo; + +static bool virtualTableFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, + SVirtualTableSplitInfo* pInfo) { + if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode) && 0 != LIST_LENGTH(pNode->pChildren) && + QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pNode->pChildren, 0))) { + pInfo->pVirtual = (SVirtualScanLogicNode*)pNode; + pInfo->pSubplan = pSubplan; + return true; + } + return false; +} + +static int32_t virtualTableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { + int32_t code = TSDB_CODE_SUCCESS; + SVirtualTableSplitInfo info = {0}; + if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)virtualTableFindSplitNode, &info)) { + return TSDB_CODE_SUCCESS; + } + int32_t startGroupId = pCxt->groupId; + SNode* pChild = NULL; + FOREACH(pChild, info.pVirtual->node.pChildren) { + PLAN_ERR_JRET(splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, (SLogicNode*)pChild, info.pSubplan->subplanType)); + PLAN_ERR_JRET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)pChild, 0))); + ++(pCxt->groupId); + } + pSubplan->subplanType = SUBPLAN_TYPE_MERGE; +_return: + pCxt->split = true; + return code; +} + typedef struct SQnodeSplitInfo { SLogicNode* pSplitNode; SLogicSubplan* pSubplan; @@ -2054,7 +2089,8 @@ static const SSplitRule splitRuleSet[] = { {.pName = "UnionAllSplit", .splitFunc = unionAllSplit}, {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}, {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, // not used yet - {.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit} + {.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit}, + {.pName = "VirtualtableSplit", .splitFunc = virtualTableSplit} }; // clang-format on diff --git a/source/libs/planner/src/planValidator.c b/source/libs/planner/src/planValidator.c index a3b09dff228b..278b8022dec7 100755 --- a/source/libs/planner/src/planValidator.c +++ b/source/libs/planner/src/planValidator.c @@ -78,6 +78,7 @@ int32_t doValidatePhysiNode(SValidatePlanContext* pCxt, SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: + case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN: break; case QUERY_NODE_PHYSICAL_PLAN_MERGE: return validateMergePhysiNode(pCxt, (SMergePhysiNode*)pNode); diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index ee460e26105f..f64fa48260d1 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -43,6 +43,19 @@ static int32_t dumpQueryPlan(SQueryPlan* pPlan) { return code; } +static int32_t printPlanNode(SLogicNode *pNode, int32_t level) { + // print pnode and it's child for each level + const char *nodename = nodesNodeName(nodeType((pNode))); + char *blanks = taosMemoryMalloc(level * 4); + memset(blanks, ' ', level * 4); + qInfo("%s%s", blanks, nodename); + SNode *tmp = NULL; + FOREACH(tmp, pNode->pChildren) { + printPlanNode((SLogicNode *)tmp, level + 1); + } + return TSDB_CODE_SUCCESS; +} + int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) { SLogicSubplan* pLogicSubplan = NULL; SQueryLogicPlan* pLogicPlan = NULL; @@ -51,18 +64,28 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo if (TSDB_CODE_SUCCESS == code) { code = createLogicPlan(pCxt, &pLogicSubplan); } + qInfo("After logic plan"); + printPlanNode(pLogicSubplan->pNode, 0); if (TSDB_CODE_SUCCESS == code) { code = optimizeLogicPlan(pCxt, pLogicSubplan); } + qInfo("After optimize plan"); + printPlanNode(pLogicSubplan->pNode, 0); if (TSDB_CODE_SUCCESS == code) { code = splitLogicPlan(pCxt, pLogicSubplan); } + qInfo("After split plan"); + printPlanNode(pLogicSubplan->pNode, 0); if (TSDB_CODE_SUCCESS == code) { code = scaleOutLogicPlan(pCxt, pLogicSubplan, &pLogicPlan); } + qInfo("After scale out plan"); + printPlanNode(pLogicSubplan->pNode, 0); if (TSDB_CODE_SUCCESS == code) { code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList); } + qInfo("After physic plan"); + printPlanNode(pLogicSubplan->pNode, 0); if (TSDB_CODE_SUCCESS == code) { code = validateQueryPlan(pCxt, *pPlan); } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 6d637bee983c..f613ce74d4c5 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -553,10 +553,14 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema); int32_t schemaExtSize = 0; + int32_t colRefSize = 0; if (useCompress(pSrc->tableType) && pSrc->schemaExt) { schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt); } - *pDst = taosMemoryMalloc(metaSize + schemaExtSize); + if (hasRefCol(pSrc->tableType) && pSrc->colRef) { + colRefSize = pSrc->tableInfo.numOfColumns * sizeof(SColRef); + } + *pDst = taosMemoryMalloc(metaSize + schemaExtSize + colRefSize); if (NULL == *pDst) { return terrno; } @@ -567,6 +571,12 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { } else { (*pDst)->schemaExt = NULL; } + if (hasRefCol(pSrc->tableType) && pSrc->colRef) { + (*pDst)->colRef = (SColRef*)((char*)*pDst + metaSize + schemaExtSize); + memcpy((*pDst)->colRef, pSrc->colRef, colRefSize); + } else { + (*pDst)->colRef = NULL; + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 0ee61726e305..ae14eb453b3e 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -515,7 +515,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaRsp *pMetaMsg) { } if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE && - pMetaMsg->tableType != TSDB_NORMAL_TABLE && pMetaMsg->tableType != TSDB_SYSTEM_TABLE) { + pMetaMsg->tableType != TSDB_NORMAL_TABLE && pMetaMsg->tableType != TSDB_SYSTEM_TABLE && + pMetaMsg->tableType != TSDB_VIRTUAL_TABLE && pMetaMsg->tableType != TSDB_VIRTUAL_CHILD_TABLE) { qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -558,13 +559,15 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta * int32_t total = msg->numOfColumns + msg->numOfTags; int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total; int32_t schemaExtSize = (useCompress(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0; + int32_t pColRefSize = (hasRefCol(msg->tableType) && msg->pColRefs) ? sizeof(SColRef) * msg->numOfColumns : 0; - STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize); + STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + pColRefSize); if (NULL == pTableMeta) { qError("calloc size[%d] failed", metaSize); return terrno; } SSchemaExt *pSchemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize); + SColRef *pColRef = (SColRef *)((char *)pTableMeta + metaSize + schemaExtSize); pTableMeta->vgId = isStb ? 0 : msg->vgId; pTableMeta->tableType = isStb ? TSDB_SUPER_TABLE : msg->tableType; @@ -585,6 +588,13 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta * pTableMeta->schemaExt = NULL; } + if (hasRefCol(msg->tableType) && msg->pColRefs) { + pTableMeta->colRef = (SColRef *)((char *)pTableMeta + metaSize + schemaExtSize); + memcpy(pTableMeta->colRef, msg->pColRefs, pColRefSize); + } else { + pTableMeta->colRef = NULL; + } + bool hasPK = (msg->numOfColumns > 1) && (pTableMeta->schema[1].flags & COL_IS_KEY); for (int32_t i = 0; i < msg->numOfColumns; ++i) { pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; @@ -614,14 +624,16 @@ int32_t queryCreateTableMetaExFromMsg(STableMetaRsp *msg, bool isStb, STableMeta int32_t total = msg->numOfColumns + msg->numOfTags; int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total; int32_t schemaExtSize = (useCompress(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0; + int32_t pColRefSize = (hasRefCol(msg->tableType) && msg->pColRefs) ? sizeof(SColRef) * msg->numOfColumns : 0; int32_t tbNameSize = strlen(msg->tbName) + 1; - STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + tbNameSize); + STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + pColRefSize + tbNameSize); if (NULL == pTableMeta) { qError("calloc size[%d] failed", metaSize); return terrno; } SSchemaExt *pSchemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize); + SColRef *pColRef = (SColRef *)((char *)pTableMeta + metaSize + schemaExtSize); pTableMeta->vgId = isStb ? 0 : msg->vgId; pTableMeta->tableType = isStb ? TSDB_SUPER_TABLE : msg->tableType; @@ -642,6 +654,13 @@ int32_t queryCreateTableMetaExFromMsg(STableMetaRsp *msg, bool isStb, STableMeta pTableMeta->schemaExt = NULL; } + if (hasRefCol(msg->tableType) && msg->pColRefs) { + pTableMeta->colRef = (SColRef *)((char *)pTableMeta + metaSize + schemaExtSize); + memcpy(pTableMeta->colRef, msg->pColRefs, pColRefSize); + } else { + pTableMeta->colRef = NULL; + } + bool hasPK = (msg->numOfColumns > 1) && (pTableMeta->schema[1].flags & COL_IS_KEY); for (int32_t i = 0; i < msg->numOfColumns; ++i) { pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; @@ -654,7 +673,7 @@ int32_t queryCreateTableMetaExFromMsg(STableMetaRsp *msg, bool isStb, STableMeta } } - char *pTbName = (char *)pTableMeta + metaSize + schemaExtSize; + char *pTbName = (char *)pTableMeta + metaSize + schemaExtSize + pColRefSize; tstrncpy(pTbName, msg->tbName, tbNameSize); qDebug("table %s uid %" PRIx64 " meta returned, type %d vgId:%d db %s stb %s suid %" PRIx64 diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 16357d58c6b2..3a67a614d4af 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -886,6 +886,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_NOT_FORMAT_TO_JSON, "can't format to jso TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_FAIL_SEND_AUDIT_RECORD, "Failed to send out audit record") TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_FAIL_GENERATE_JSON, "Failed to generate json") +// VTABLE +TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_SCAN_UNMATCHED_COLUMN, "Virtual table scan unmatched column") +TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM, "Virtual table scan invalid downstream operator type") +TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_PRIMTS_HAS_REF, "Virtual table prim timestamp column should not has ref column") #ifdef TAOS_ERROR_C }; #endif