diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c7563411642..a8cd62db094 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -771,6 +771,29 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { return 0; } +static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){ + // iter all vnode to delete handle + int32_t sz = taosArrayGetSize(pSub->unassignedVgs); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); + SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); + pReq->head.vgId = htonl(pVgEp->vgId); + pReq->vgId = pVgEp->vgId; + pReq->consumerId = -1; + memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); + STransAction action = {0}; + action.epSet = pVgEp->epSet; + action.pCont = pReq; + action.contLen = sizeof(SMqVDeleteReq); + action.msgType = TDMT_VND_TMQ_DELETE_SUB; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + } + return 0; +} + static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMDropCgroupReq dropReq = {0}; @@ -831,6 +854,11 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); + code = sendDeleteSubToVnode(pSub, pTrans); + if (code != 0) { + goto end; + } + if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); code = -1; @@ -1113,25 +1141,10 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) sdbCancelFetch(pSdb, pIter); return -1; } - int32_t sz = taosArrayGetSize(pSub->unassignedVgs); - for (int32_t i = 0; i < sz; i++) { - SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); - SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); - pReq->head.vgId = htonl(pVgEp->vgId); - pReq->vgId = pVgEp->vgId; - pReq->consumerId = -1; - memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); - STransAction action = {0}; - action.epSet = pVgEp->epSet; - action.pCont = pReq; - action.contLen = sizeof(SMqVDeleteReq); - action.msgType = TDMT_VND_TMQ_DELETE_SUB; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - sdbRelease(pSdb, pSub); - sdbCancelFetch(pSdb, pIter); - return -1; - } + if (sendDeleteSubToVnode(pSub, pTrans) != 0) { + sdbRelease(pSdb, pSub); + sdbCancelFetch(pSdb, pIter); + return -1; } if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {