Skip to content

Commit

Permalink
Merge pull request #16147 from taosdata/fix/TS-1770-V26
Browse files Browse the repository at this point in the history
fix(rpc): insert data send failed
  • Loading branch information
DuanKuanJun authored Aug 18, 2022
2 parents 9d7ef2c + e204332 commit d9c580a
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 12 deletions.
7 changes: 4 additions & 3 deletions src/client/src/tscServer.c
Original file line number Diff line number Diff line change
Expand Up @@ -460,13 +460,14 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}


if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) {
if(pSql->cmd.command < TSDB_SQL_HB)
if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid) != BOOL_FALSE) {
if(pSql->cmd.command == TSDB_SQL_SELECT)
rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext);
return TSDB_CODE_SUCCESS;
}

return TSDB_CODE_SUCCESS;
tscError("0x%"PRIx64" rpc send data failed. msg=%s", pSql->self, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_TSC_SEND_DATA_FAILED;
}

// handle three situation
Expand Down
3 changes: 2 additions & 1 deletion src/client/src/tscSubquery.c
Original file line number Diff line number Diff line change
Expand Up @@ -3296,7 +3296,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
// the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this
// function while kill query by a user.
if (param == NULL) {
assert(code != TSDB_CODE_SUCCESS);
if(code != TSDB_CODE_SUCCESS)
tscError("tscRetrieveDataRes param is NULL, error code=%d", code);
return;
}

Expand Down
6 changes: 6 additions & 0 deletions src/inc/taos.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ typedef void **TAOS_ROW;
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_JSON 15 // json string

typedef enum {
BOOL_FALSE = 0,
BOOL_TRUE = 1,
BOOL_ASYNC = 2 //request is processed by async for another thread, not now true or false
} TBOOL;

typedef enum {
TSDB_OPTION_LOCALE,
TSDB_OPTION_CHARSET,
Expand Down
1 change: 1 addition & 0 deletions src/inc/taoserror.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_RES_TOO_MANY TAOS_DEF_ERROR_CODE(0, 0x0227) //"Result set too large to be output")
#define TSDB_CODE_TSC_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0228) //"invalid table schema version")
#define TSDB_CODE_TSC_TOO_MANY_SML_LINES TAOS_DEF_ERROR_CODE(0, 0x0229) //"too many lines in batch")
#define TSDB_CODE_TSC_SEND_DATA_FAILED TAOS_DEF_ERROR_CODE(0, 0x0230) //"Client send request data error"

// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed"
Expand Down
2 changes: 1 addition & 1 deletion src/inc/trpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
Expand Down
24 changes: 17 additions & 7 deletions src/rpc/src/rpcMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);

static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static bool rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
Expand Down Expand Up @@ -394,7 +394,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}

bool rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
TBOOL rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext;

Expand Down Expand Up @@ -1384,7 +1384,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
return;
}

static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
char *msg = (char *)pHead;
int msgLen = rpcMsgLenFromCont(pContext->contLen);
Expand All @@ -1394,8 +1394,9 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcConn *pConn = rpcSetupConnToServer(pContext);
if (pConn == NULL) {
pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
return false;
return BOOL_ASYNC;
}

pContext->pConn = pConn;
Expand Down Expand Up @@ -1436,7 +1437,16 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);

rpcUnlockConn(pConn);
return ret;

if(ret == BOOL_FALSE) {
// try next ip again
pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
return BOOL_ASYNC;
}

return BOOL_TRUE;
}

static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
Expand Down Expand Up @@ -1478,18 +1488,18 @@ static void rpcProcessConnError(void *param, void *id) {
return;
}

tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);

if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) {
rpcMsg.msgType = pContext->msgType+1;
rpcMsg.ahandle = pContext->ahandle;
rpcMsg.code = pContext->code;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;

tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType);
rpcNotifyClient(pContext, &rpcMsg);
} else {
// move to next IP
tWarn("%s %p, connection error. retry to send request again. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType);
pContext->epSet.inUse++;
pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
rpcSendReqToServer(pRpc, pContext);
Expand Down
1 change: 1 addition & 0 deletions src/util/src/terror.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE, "Invalid line protocol
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_PRECISION_TYPE, "Invalid timestamp precision type")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_RES_TOO_MANY, "Result set too large to be output")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_TOO_MANY_SML_LINES, "Too many lines in batch")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SEND_DATA_FAILED, "Client send request data failed")

// mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
Expand Down

0 comments on commit d9c580a

Please sign in to comment.