Skip to content

Commit

Permalink
fix(query): add TBOOL type
Browse files Browse the repository at this point in the history
  • Loading branch information
DuanKuanJun committed Aug 17, 2022
1 parent e99f512 commit de5a515
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/client/src/tscServer.c
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}


if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) {
if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid) != BOOL_FALSE) {
if(pSql->cmd.command == TSDB_SQL_SELECT)
rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext);
return TSDB_CODE_SUCCESS;
Expand Down
6 changes: 6 additions & 0 deletions src/inc/taos.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ typedef void **TAOS_ROW;
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_JSON 15 // json string

typedef enum {
BOOL_FALSE = 0,
BOOL_TRUE = 1,
BOOL_ASYNC = 2 //request is processed by async for another thread, not now true or false
} TBOOL;

typedef enum {
TSDB_OPTION_LOCALE,
TSDB_OPTION_CHARSET,
Expand Down
2 changes: 1 addition & 1 deletion src/inc/trpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
Expand Down
15 changes: 8 additions & 7 deletions src/rpc/src/rpcMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);

static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
Expand Down Expand Up @@ -394,7 +394,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}

bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext;

Expand Down Expand Up @@ -1384,7 +1384,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
return;
}

static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
char *msg = (char *)pHead;
int msgLen = rpcMsgLenFromCont(pContext->contLen);
Expand All @@ -1394,8 +1394,9 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcConn *pConn = rpcSetupConnToServer(pContext);
if (pConn == NULL) {
pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
return false;
return BOOL_ASYNC;
}

pContext->pConn = pConn;
Expand Down Expand Up @@ -1436,7 +1437,7 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);

rpcUnlockConn(pConn);
return ret;
return ret ? BOOL_TRUE : BOOL_FALSE;
}

static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
Expand Down Expand Up @@ -1478,18 +1479,18 @@ static void rpcProcessConnError(void *param, void *id) {
return;
}

tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);

if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) {
rpcMsg.msgType = pContext->msgType+1;
rpcMsg.ahandle = pContext->ahandle;
rpcMsg.code = pContext->code;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;

tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType);
rpcNotifyClient(pContext, &rpcMsg);
} else {
// move to next IP
tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType);
pContext->epSet.inUse++;
pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
rpcSendReqToServer(pRpc, pContext);
Expand Down

0 comments on commit de5a515

Please sign in to comment.