diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a2f6a92c6c95..07aff292f831 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -460,7 +460,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { } - if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) { + if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid) != BOOL_FALSE) { if(pSql->cmd.command == TSDB_SQL_SELECT) rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext); return TSDB_CODE_SUCCESS; diff --git a/src/inc/taos.h b/src/inc/taos.h index 5cb0420fe2e8..44d83969a809 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -48,6 +48,12 @@ typedef void **TAOS_ROW; #define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes #define TSDB_DATA_TYPE_JSON 15 // json string +typedef enum { + BOOL_FALSE = 0, + BOOL_TRUE = 1, + BOOL_ASYNC = 2 //request is processed by async for another thread, not now true or false +} TBOOL; + typedef enum { TSDB_OPTION_LOCALE, TSDB_OPTION_CHARSET, diff --git a/src/inc/trpc.h b/src/inc/trpc.h index db2978569993..fe061eb4f20f 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -85,7 +85,7 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index dc69cb890557..54df0525960f 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -202,7 +202,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); -static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); +static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); @@ -394,7 +394,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { +TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -1384,7 +1384,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { return; } -static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { +static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); char *msg = (char *)pHead; int msgLen = rpcMsgLenFromCont(pContext->contLen); @@ -1394,8 +1394,9 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcConn *pConn = rpcSetupConnToServer(pContext); if (pConn == NULL) { pContext->code = terrno; + // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); - return false; + return BOOL_ASYNC; } pContext->pConn = pConn; @@ -1436,7 +1437,7 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcUnlockConn(pConn); - return ret; + return ret ? BOOL_TRUE : BOOL_FALSE; } static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { @@ -1478,8 +1479,6 @@ static void rpcProcessConnError(void *param, void *id) { return; } - tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); - if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) { rpcMsg.msgType = pContext->msgType+1; rpcMsg.ahandle = pContext->ahandle; @@ -1487,9 +1486,11 @@ static void rpcProcessConnError(void *param, void *id) { rpcMsg.pCont = NULL; rpcMsg.contLen = 0; + tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); rpcNotifyClient(pContext, &rpcMsg); } else { // move to next IP + tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); pContext->epSet.inUse++; pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; rpcSendReqToServer(pRpc, pContext);