Skip to content

Commit

Permalink
Merge pull request #26322 from taosdata/fix/TS-4921
Browse files Browse the repository at this point in the history
fix:[TS-4921]errors in test & disable slow log monitor
  • Loading branch information
dapan1121 authored Jun 27, 2024
2 parents 0716a64 + ad6eb20 commit 1b17bec
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
8 changes: 8 additions & 0 deletions source/client/src/clientHb.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "scheduler.h"
#include "trpc.h"
#include "tglobal.h"
#include "clientMonitor.h"

typedef struct {
union {
Expand Down Expand Up @@ -546,7 +547,14 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
}

SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;
int32_t oldInterval = pInst->monitorParas.tsMonitorInterval;
pInst->monitorParas = pRsp.monitorParas;
if(oldInterval > pInst->monitorParas.tsMonitorInterval){
char* value = taosStrdup("");
if(monitorPutData2MonitorQueue(pInst->clusterId, value) < 0){
taosMemoryFree(value);
}
}
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);

Expand Down
49 changes: 36 additions & 13 deletions source/client/src/clientMonitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ static void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *
while(1){
int64_t readSize = taosReadFile(pFile, buf + offset, SLOW_LOG_SEND_SIZE - offset);
if (readSize <= 0) {
uError("failed to read len from file:%p since %s", pFile, terrstr());
if (readSize < 0)
uError("failed to read len from file:%p since %s", pFile, terrstr());
return;
}

Expand Down Expand Up @@ -220,8 +221,8 @@ static void reportSendProcess(void* param, void* tmrId) {

SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
taosRUnLockLatch(&monitorLock);
taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
taosRUnLockLatch(&monitorLock);
}

static void sendAllSlowLog(){
Expand Down Expand Up @@ -423,7 +424,7 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char**

MonitorClient* pMonitor = *ppMonitor;
taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
if (ppCounter == NULL || *ppCounter != NULL) {
if (ppCounter == NULL || *ppCounter == NULL) {
uError("monitorCounterInc not found pCounter %"PRIx64":%s.", clusterId, counterName);
goto end;
}
Expand All @@ -449,7 +450,7 @@ static void monitorFreeSlowLogData(MonitorSlowLogData* pData) {
static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); }

static void reportSlowLog(void* param, void* tmrId) {
taosRLockLatch(&monitorLock);
taosWLockLatch(&monitorLock);
if (atomic_load_32(&monitorFlag) == 1) {
taosRUnLockLatch(&monitorLock);
return;
Expand All @@ -470,9 +471,11 @@ static void reportSlowLog(void* param, void* tmrId) {

SEpSet ep = getEpSet_s(&pInst->mgmtEp);
monitorReadSendSlowLog((*(SlowLogClient**)tmp)->pFile, pInst->pTransporter, &ep);
taosRUnLockLatch(&monitorLock);

taosTmrReset(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
if((*(SlowLogClient**)tmp)->timer == tmrId){
taosTmrReset(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &(*(SlowLogClient**)tmp)->timer);
}
taosWUnLockLatch(&monitorLock);
}

static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){
Expand Down Expand Up @@ -534,6 +537,23 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
taosWUnLockLatch(&monitorLock);
}

static void restartReportTimer(int64_t clusterId){
taosWLockLatch(&monitorLock);

void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
if(tmp){
taosTmrStopA(&(*(SlowLogClient**)tmp)->timer);
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
if(pInst == NULL){
uError("failed to get app inst, clusterId:%"PRIx64, clusterId);
return;
}
(*(SlowLogClient**)tmp)->timer = taosTmrStart(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, (void*)clusterId, monitorTimer);

}
taosWUnLockLatch(&monitorLock);
}

static void* monitorThreadFunc(void *param){
setThreadName("client-monitor-slowlog");

Expand All @@ -543,10 +563,6 @@ static void* monitorThreadFunc(void *param){
}
#endif

if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
return NULL;
}

char tmpPath[PATH_MAX] = {0};
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0){
return NULL;
Expand All @@ -568,6 +584,10 @@ static void* monitorThreadFunc(void *param){
uError("open queue error since %s", terrstr());
return NULL;
}

if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
return NULL;
}
uDebug("monitorThreadFunc start");
while (1) {
if (slowLogFlag > 0) break;
Expand All @@ -578,7 +598,9 @@ static void* monitorThreadFunc(void *param){
uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
if (slowLogData->value == NULL){
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
}else{
} else if(strlen(slowLogData->value) == 0){
restartReportTimer(slowLogData->clusterId);
} else{
monitorWriteSlowLog2File(slowLogData, tmpPath);
}
}
Expand Down Expand Up @@ -658,6 +680,7 @@ void monitorClose() {
}

int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value){
return -1; // disable slow log monitor
MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0);
if (slowLogData == NULL) {
uError("[monitor] failed to allocate slow log data");
Expand All @@ -666,8 +689,8 @@ int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value){
slowLogData->clusterId = clusterId;
slowLogData->value = value;
uDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
while (monitorQueue == NULL) {
taosMsleep(100);
while (atomic_load_32(&slowLogFlag) == -1) {
taosMsleep(5);
}
if (taosWriteQitem(monitorQueue, slowLogData) == 0){
tsem2_post(&monitorSem);
Expand Down

0 comments on commit 1b17bec

Please sign in to comment.