Skip to content

Commit

Permalink
support vtable's select
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon9997 committed Dec 27, 2024
1 parent b9cbd46 commit 81fc4c5
Show file tree
Hide file tree
Showing 38 changed files with 1,442 additions and 113 deletions.
7 changes: 5 additions & 2 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ typedef enum ENodeType {
QUERY_NODE_TSMA_OPTIONS,
QUERY_NODE_ANOMALY_WINDOW,
QUERY_NODE_RANGE_AROUND,
QUERY_NODE_VIRTUAL_TABLE,

// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
Expand Down Expand Up @@ -439,6 +440,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE,
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC,
QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN,

// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
Expand Down Expand Up @@ -494,6 +496,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY,
QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,
QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN,
} ENodeType;

typedef struct {
Expand Down Expand Up @@ -598,8 +601,8 @@ int32_t tPrintFixedSchemaSubmitReq(SSubmitReq* pReq, STSchema* pSchema);
typedef struct {
bool hasRef;
col_id_t id;
char *refTableName;
char *refColName;
char refTableName[TSDB_TABLE_NAME_LEN];
char refColName[TSDB_COL_NAME_LEN];
} SColRef;

typedef struct {
Expand Down
17 changes: 17 additions & 0 deletions include/libs/nodes/plannodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ typedef struct SJoinLogicNode {
SNode* pRightOnCond; // table onCond filter
} SJoinLogicNode;

typedef struct SVirtualScanLogicNode {
SLogicNode node;
SNodeList* pScanCols;
SNodeList* pScanPseudoCols;
int8_t tableType;
uint64_t tableId;
uint64_t stableId;
SVgroupsInfo* pVgroupList;
EScanType scanType;
SName tableName;
} SVirtualScanLogicNode;

typedef struct SAggLogicNode {
SLogicNode node;
SNodeList* pGroupKeys;
Expand Down Expand Up @@ -455,6 +467,11 @@ typedef struct STagScanPhysiNode {

typedef SScanPhysiNode SBlockDistScanPhysiNode;

typedef struct SVirtualScanPhysiNode {
SScanPhysiNode scan;
SNodeList* pTargets;
}SVirtualScanPhysiNode;

typedef struct SLastRowScanPhysiNode {
SScanPhysiNode scan;
SNodeList* pGroupTags;
Expand Down
11 changes: 11 additions & 0 deletions include/libs/nodes/querynodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ typedef struct SColumnNode {
bool isPk;
int32_t projRefIdx;
int32_t resIdx;
bool hasRef;
char refColName[TSDB_COL_NAME_LEN];
char refTableName[TSDB_TABLE_NAME_LEN];
} SColumnNode;

typedef struct SColumnRefNode {
Expand Down Expand Up @@ -235,6 +238,14 @@ typedef struct STempTableNode {
SNode* pSubquery;
} STempTableNode;

typedef struct SVirtualTableNode {
STableNode table; // QUERY_NODE_VIRTUAL_TABLE
struct STableMeta* pMeta;
SVgroupsInfo* pVgroupList;
char qualDbName[TSDB_DB_NAME_LEN]; // SHOW qualDbName.TABLES
SNodeList* refTables;
} SVirtualTableNode;

typedef struct SViewNode {
STableNode table; // QUERY_NODE_REAL_TABLE
struct STableMeta* pMeta;
Expand Down
5 changes: 4 additions & 1 deletion include/util/taoserror.h
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,6 @@ int32_t taosGetErrSize();
#define TSDB_CODE_PAR_INVALID_VGID_LIST TAOS_DEF_ERROR_CODE(0, 0x2686)
#define TSDB_CODE_PAR_INVALID_REF_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2687)
#define TSDB_CODE_PAR_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x2688)
#define TSDB_CODE_PAR_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x2688)
#define TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE TAOS_DEF_ERROR_CODE(0, 0x2689)
#define TSDB_CODE_PAR_COLUMN_HAS_REF TAOS_DEF_ERROR_CODE(0, 0x268A)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
Expand Down Expand Up @@ -1043,6 +1042,10 @@ int32_t taosGetErrSize();
#define TSDB_CODE_AUDIT_FAIL_SEND_AUDIT_RECORD TAOS_DEF_ERROR_CODE(0, 0x6101)
#define TSDB_CODE_AUDIT_FAIL_GENERATE_JSON TAOS_DEF_ERROR_CODE(0, 0x6102)

// VTABLE
#define TSDB_CODE_VTABLE_SCAN_UNMATCHED_COLUMN TAOS_DEF_ERROR_CODE(0, 0x6200)
#define TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM TAOS_DEF_ERROR_CODE(0, 0x6201)
#define TSDB_CODE_VTABLE_PRIMTS_HAS_REF TAOS_DEF_ERROR_CODE(0, 0x6201)
#ifdef __cplusplus
}
#endif
Expand Down
27 changes: 24 additions & 3 deletions source/common/src/msg/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -6040,6 +6040,13 @@ static int32_t tEncodeSTableMetaRsp(SEncoder *pEncoder, STableMetaRsp *pRsp) {
}
}

if (hasRefCol(pRsp->tableType)) {
for (int32_t i = 0; i < pRsp->numOfColumns; ++i) {
SColRef *pColRef = &pRsp->pColRefs[i];
TAOS_CHECK_RETURN(tEncodeSColRef(pEncoder, pColRef));
}
}

return 0;
}

Expand Down Expand Up @@ -6088,7 +6095,21 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) {
pRsp->pSchemaExt = NULL;
}
}
pRsp->pColRefs = NULL;
if (!tDecodeIsEnd(pDecoder)) {
if (hasRefCol(pRsp->tableType) && pRsp->numOfColumns > 0) {
pRsp->pColRefs = taosMemoryMalloc(sizeof(SColRef) * pRsp->numOfColumns);
if (pRsp->pColRefs == NULL) {
TAOS_CHECK_RETURN(terrno);
}

for (int32_t i = 0; i < pRsp->numOfColumns; ++i) {
SColRef *pColRef = &pRsp->pColRefs[i];
TAOS_CHECK_RETURN(tDecodeSColRef(pDecoder, pColRef));
}
} else {
pRsp->pColRefs = NULL;
}
}

return 0;
}
Expand Down Expand Up @@ -10265,8 +10286,8 @@ int32_t tDecodeSColRefWrapperEx(SDecoder *pDecoder, SColRefWrapper *pWrapper) {
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&p->hasRef));
if (p->hasRef) {
TAOS_CHECK_EXIT(tDecodeI16v(pDecoder, &p->id));
TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &p->refColName));
TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &p->refTableName));
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->refColName));
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->refTableName));
}
}

Expand Down
4 changes: 2 additions & 2 deletions source/dnode/vnode/src/meta/metaEntry.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ int meteDecodeColRefEntry(SDecoder *pDecoder, SMetaEntry *pME) {
TAOS_CHECK_RETURN(tDecodeI8(pDecoder, (int8_t *)&p->hasRef));
if (p->hasRef) {
TAOS_CHECK_RETURN(tDecodeI16v(pDecoder, &p->id));
TAOS_CHECK_RETURN(tDecodeCStr(pDecoder, &p->refTableName));
TAOS_CHECK_RETURN(tDecodeCStr(pDecoder, &p->refColName));
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, p->refTableName));
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, p->refColName));
}
}
return 0;
Expand Down
72 changes: 55 additions & 17 deletions source/dnode/vnode/src/vnd/vnodeQuery.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,27 @@ int32_t fillTableColCmpr(SMetaReader *reader, SSchemaExt *pExt, int32_t numOfCol
return 0;
}

int32_t fillTableColRef(SMetaReader *reader, SColRef *pRef, int32_t numOfCol) {
int8_t tblType = reader->me.type;
if (hasRefCol(tblType)) {
SColRefWrapper *p = &(reader->me.colRef);
if (numOfCol != p->nCols) {
vError("fillTableColRef table type:%d, col num:%d, col cmpr num:%d mismatch", tblType, numOfCol, p->nCols);
return TSDB_CODE_APP_ERROR;
}
for (int i = 0; i < p->nCols; i++) {
SColRef *pColRef = &p->pColRef[i];
pRef[i].hasRef = pColRef->hasRef;
if(pRef[i].hasRef) {
pRef[i].id = pColRef->id;
tstrncpy(pRef[i].refTableName, pColRef->refTableName, TSDB_TABLE_NAME_LEN);
tstrncpy(pRef[i].refColName, pColRef->refColName, TSDB_COL_NAME_LEN);
}
}
}
return 0;
}

int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
STableInfoReq infoReq = {0};
STableMetaRsp metaRsp = {0};
Expand Down Expand Up @@ -104,24 +125,34 @@ int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
metaRsp.vgId = TD_VID(pVnode);
metaRsp.tuid = mer1.me.uid;

if (mer1.me.type == TSDB_SUPER_TABLE) {
tstrncpy(metaRsp.stbName, mer1.me.name, TSDB_TABLE_NAME_LEN);
schema = mer1.me.stbEntry.schemaRow;
schemaTag = mer1.me.stbEntry.schemaTag;
metaRsp.suid = mer1.me.uid;
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2;
switch (mer1.me.type) {
case TSDB_SUPER_TABLE: {
(void)strcpy(metaRsp.stbName, mer1.me.name);
schema = mer1.me.stbEntry.schemaRow;
schemaTag = mer1.me.stbEntry.schemaTag;
metaRsp.suid = mer1.me.uid;
break;
}
case TSDB_CHILD_TABLE:
case TSDB_VIRTUAL_CHILD_TABLE:{
metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2;

tstrncpy(metaRsp.stbName, mer2.me.name, TSDB_TABLE_NAME_LEN);
metaRsp.suid = mer2.me.uid;
schema = mer2.me.stbEntry.schemaRow;
schemaTag = mer2.me.stbEntry.schemaTag;
} else if (mer1.me.type == TSDB_NORMAL_TABLE) {
schema = mer1.me.ntbEntry.schemaRow;
} else {
vError("vnodeGetTableMeta get invalid table type:%d", mer1.me.type);
goto _exit3;
(void)strcpy(metaRsp.stbName, mer2.me.name);
metaRsp.suid = mer2.me.uid;
schema = mer2.me.stbEntry.schemaRow;
schemaTag = mer2.me.stbEntry.schemaTag;
break;
}
case TSDB_NORMAL_TABLE:
case TSDB_VIRTUAL_TABLE: {
schema = mer1.me.ntbEntry.schemaRow;
break;
}
default: {
vError("vnodeGetTableMeta get invalid table type:%d", mer1.me.type);
goto _exit3;
}
}

metaRsp.numOfTags = schemaTag.nCols;
Expand Down Expand Up @@ -149,6 +180,13 @@ int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
metaRsp.pColRefs = (SColRef*)taosMemoryMalloc(sizeof(SColRef) * metaRsp.numOfColumns);
if (metaRsp.pColRefs) {
code = fillTableColRef(&mer1, metaRsp.pColRefs, metaRsp.numOfColumns);
if (code < 0) {
goto _exit;
}
}

// encode and send response
rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
Expand Down
41 changes: 31 additions & 10 deletions source/libs/catalog/src/ctgCache.c
Original file line number Diff line number Diff line change
Expand Up @@ -538,31 +538,41 @@ int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCt
ctx->tbInfo.suid = tbMeta->suid;
ctx->tbInfo.tbType = tbMeta->tableType;

if (tbMeta->tableType != TSDB_CHILD_TABLE) {
if (tbMeta->tableType != TSDB_CHILD_TABLE && tbMeta->tableType != TSDB_VIRTUAL_CHILD_TABLE) {
int32_t schemaExtSize = 0;
int32_t colRefSize = 0;
int32_t metaSize = CTG_META_SIZE(tbMeta);
if (tbMeta->schemaExt != NULL) {
if (useCompress(tbMeta->tableType) && tbMeta->schemaExt != NULL) {
schemaExtSize = tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt);
}
*pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize);
if (hasRefCol(tbMeta->tableType) && tbMeta->schemaExt != NULL) {
colRefSize += tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt);
}
*pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + colRefSize);
if (NULL == *pTableMeta) {
CTG_ERR_RET(terrno);
}

TAOS_MEMCPY(*pTableMeta, tbMeta, metaSize);
if (tbMeta->schemaExt != NULL) {
if (useCompress(tbMeta->tableType) && tbMeta->schemaExt != NULL) {
(*pTableMeta)->schemaExt = (SSchemaExt *)((char *)*pTableMeta + metaSize);
TAOS_MEMCPY((*pTableMeta)->schemaExt, tbMeta->schemaExt, schemaExtSize);
} else {
(*pTableMeta)->schemaExt = NULL;
}
if (hasRefCol(tbMeta->tableType) && tbMeta->schemaExt != NULL) {
(*pTableMeta)->colRef = (SColRef *)((char *)*pTableMeta + metaSize + schemaExtSize);
TAOS_MEMCPY((*pTableMeta)->colRef, tbMeta->colRef, colRefSize);
} else {
(*pTableMeta)->colRef = NULL;
}

ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
return TSDB_CODE_SUCCESS;
}

// PROCESS FOR CHILD TABLE

// TODO(smj): add virtual child table process
int32_t metaSize = sizeof(SCTableMeta);
*pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == *pTableMeta) {
Expand Down Expand Up @@ -3493,26 +3503,36 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe

SMetaRes res = {0};
STableMeta *pTableMeta = NULL;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
if (tbMeta->tableType != TSDB_CHILD_TABLE && tbMeta->tableType != TSDB_VIRTUAL_CHILD_TABLE) {
int32_t schemaExtSize = 0;
int32_t colRefSize = 0;
int32_t metaSize = CTG_META_SIZE(tbMeta);
if (tbMeta->schemaExt != NULL) {
if (useCompress(tbMeta->tableType) && tbMeta->schemaExt != NULL) {
schemaExtSize = tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt);
}
if (hasRefCol(tbMeta->tableType) && tbMeta->colRef) {
colRefSize = tbMeta->tableInfo.numOfColumns * sizeof(SColRef);
}

pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize);
pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + colRefSize);
if (NULL == pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(terrno);
}

TAOS_MEMCPY(pTableMeta, tbMeta, metaSize);
if (tbMeta->schemaExt != NULL) {
if (useCompress(tbMeta->tableType) && tbMeta->schemaExt != NULL) {
pTableMeta->schemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize);
TAOS_MEMCPY(pTableMeta->schemaExt, tbMeta->schemaExt, schemaExtSize);
} else {
pTableMeta->schemaExt = NULL;
}
if (hasRefCol(tbMeta->tableType) && tbMeta->colRef) {
pTableMeta->colRef = (SColRef *)((char *)pTableMeta + metaSize + schemaExtSize);
TAOS_MEMCPY(pTableMeta->colRef, tbMeta->colRef, colRefSize);
} else {
pTableMeta->colRef = NULL;
}

CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
Expand All @@ -3529,6 +3549,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe

// PROCESS FOR CHILD TABLE

// TODO(smj): add virtual child table process
if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
code = cloneTableMeta(lastTableMeta, &pTableMeta);
if (code) {
Expand Down Expand Up @@ -3646,7 +3667,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
}

TAOS_MEMCPY(&pTableMeta->sversion, &stbMeta->sversion, metaSize + schemaExtSize - sizeof(SCTableMeta));
if (stbMeta->schemaExt != NULL) {
if (useCompress(stbMeta->tableType) && stbMeta->schemaExt != NULL) {
pTableMeta->schemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize);
} else {
pTableMeta->schemaExt = NULL;
Expand Down
12 changes: 11 additions & 1 deletion source/libs/catalog/src/ctgUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -1650,11 +1650,15 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput)
if (output->tbMeta) {
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
int32_t schemaExtSize = 0;
int32_t colRefSize = 0;
if (useCompress(output->tbMeta->tableType) && (*pOutput)->tbMeta->schemaExt) {
schemaExtSize = output->tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt);
}
if (hasRefCol(output->tbMeta->tableType) && (*pOutput)->tbMeta->colRef) {
colRefSize = output->tbMeta->tableInfo.numOfColumns * sizeof(SColRef);
}

(*pOutput)->tbMeta = taosMemoryMalloc(metaSize + schemaExtSize);
(*pOutput)->tbMeta = taosMemoryMalloc(metaSize + schemaExtSize + colRefSize);
qDebug("tbMeta cloned, size:%d, p:%p", metaSize, (*pOutput)->tbMeta);
if (NULL == (*pOutput)->tbMeta) {
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
Expand All @@ -1669,6 +1673,12 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput)
} else {
(*pOutput)->tbMeta->schemaExt = NULL;
}
if (hasRefCol(output->tbMeta->tableType) && (*pOutput)->tbMeta->colRef) {
(*pOutput)->tbMeta->colRef = (SColRef*)((char*)(*pOutput)->tbMeta + metaSize + schemaExtSize);
TAOS_MEMCPY((*pOutput)->tbMeta->colRef, output->tbMeta->colRef, colRefSize);
} else {
(*pOutput)->tbMeta->colRef = NULL;
}
}

return TSDB_CODE_SUCCESS;
Expand Down
Loading

0 comments on commit 81fc4c5

Please sign in to comment.