diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3f4e9864aaad..07aff292f831 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -460,13 +460,14 @@ int tscSendMsgToServer(SSqlObj *pSql) { } - if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) { - if(pSql->cmd.command < TSDB_SQL_HB) + if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid) != BOOL_FALSE) { + if(pSql->cmd.command == TSDB_SQL_SELECT) rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext); return TSDB_CODE_SUCCESS; } - return TSDB_CODE_SUCCESS; + tscError("0x%"PRIx64" rpc send data failed. msg=%s", pSql->self, taosMsg[pSql->cmd.msgType]); + return TSDB_CODE_TSC_SEND_DATA_FAILED; } // handle three situation diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index ab34d99bfc4b..09574e741fe7 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3296,7 +3296,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { // the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this // function while kill query by a user. if (param == NULL) { - assert(code != TSDB_CODE_SUCCESS); + if(code != TSDB_CODE_SUCCESS) + tscError("tscRetrieveDataRes param is NULL, error code=%d", code); return; } diff --git a/src/inc/taos.h b/src/inc/taos.h index 5cb0420fe2e8..44d83969a809 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -48,6 +48,12 @@ typedef void **TAOS_ROW; #define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes #define TSDB_DATA_TYPE_JSON 15 // json string +typedef enum { + BOOL_FALSE = 0, + BOOL_TRUE = 1, + BOOL_ASYNC = 2 //request is processed by async for another thread, not now true or false +} TBOOL; + typedef enum { TSDB_OPTION_LOCALE, TSDB_OPTION_CHARSET, diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 9f51952d28ab..fc387d331b07 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -118,6 +118,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_RES_TOO_MANY TAOS_DEF_ERROR_CODE(0, 0x0227) //"Result set too large to be output") #define TSDB_CODE_TSC_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0228) //"invalid table schema version") #define TSDB_CODE_TSC_TOO_MANY_SML_LINES TAOS_DEF_ERROR_CODE(0, 0x0229) //"too many lines in batch") +#define TSDB_CODE_TSC_SEND_DATA_FAILED TAOS_DEF_ERROR_CODE(0, 0x0230) //"Client send request data error" // mnode #define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed" diff --git a/src/inc/trpc.h b/src/inc/trpc.h index db2978569993..fe061eb4f20f 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -85,7 +85,7 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index dc69cb890557..6566bf9e79f3 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -202,7 +202,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv); -static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); +static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); @@ -394,7 +394,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { +TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -1384,7 +1384,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { return; } -static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { +static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); char *msg = (char *)pHead; int msgLen = rpcMsgLenFromCont(pContext->contLen); @@ -1394,8 +1394,9 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcConn *pConn = rpcSetupConnToServer(pContext); if (pConn == NULL) { pContext->code = terrno; + // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); - return false; + return BOOL_ASYNC; } pContext->pConn = pConn; @@ -1436,7 +1437,16 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcUnlockConn(pConn); - return ret; + + if(ret == BOOL_FALSE) { + // try next ip again + pContext->code = terrno; + // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query + taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); + return BOOL_ASYNC; + } + + return BOOL_TRUE; } static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { @@ -1478,8 +1488,6 @@ static void rpcProcessConnError(void *param, void *id) { return; } - tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); - if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) { rpcMsg.msgType = pContext->msgType+1; rpcMsg.ahandle = pContext->ahandle; @@ -1487,9 +1495,11 @@ static void rpcProcessConnError(void *param, void *id) { rpcMsg.pCont = NULL; rpcMsg.contLen = 0; + tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); rpcNotifyClient(pContext, &rpcMsg); } else { // move to next IP + tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType); pContext->epSet.inUse++; pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; rpcSendReqToServer(pRpc, pContext); diff --git a/src/util/src/terror.c b/src/util/src/terror.c index 640e842de340..f66152988ee3 100644 --- a/src/util/src/terror.c +++ b/src/util/src/terror.c @@ -125,6 +125,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE, "Invalid line protocol TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_PRECISION_TYPE, "Invalid timestamp precision type") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_RES_TOO_MANY, "Result set too large to be output") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_TOO_MANY_SML_LINES, "Too many lines in batch") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SEND_DATA_FAILED, "Client send request data failed") // mnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")