Skip to content

Commit

Permalink
Merge pull request #29229 from taosdata/enh/TS-5749-3.0
Browse files Browse the repository at this point in the history
enh: seperate tsdb async tasks to different thread pools
  • Loading branch information
guanshengliang authored Dec 20, 2024
2 parents 99377ae + 8b77ac9 commit 1a176e6
Show file tree
Hide file tree
Showing 21 changed files with 240 additions and 176 deletions.
1 change: 1 addition & 0 deletions include/common/tglobal.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ extern int32_t tsRetentionSpeedLimitMB;

extern const char *tsAlterCompactTaskKeywords;
extern int32_t tsNumOfCompactThreads;
extern int32_t tsNumOfRetentionThreads;

// sync raft
extern int32_t tsElectInterval;
Expand Down
3 changes: 2 additions & 1 deletion source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
*/

#define _DEFAULT_SOURCE
#include "tglobal.h"
#include "cJSON.h"
#include "defines.h"
#include "os.h"
#include "osString.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tgrant.h"
#include "tjson.h"
#include "tlog.h"
Expand Down Expand Up @@ -104,6 +104,7 @@ int32_t tsRetentionSpeedLimitMB = 0; // unlimited

const char *tsAlterCompactTaskKeywords = "max_compact_tasks";
int32_t tsNumOfCompactThreads = 2;
int32_t tsNumOfRetentionThreads = 1;

// sync raft
int32_t tsElectInterval = 25 * 1000;
Expand Down
4 changes: 2 additions & 2 deletions source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}

extern void tsdbAlterMaxCompactTasks();
extern void tsdbAlterNumCompactThreads();
static int32_t dmAlterMaxCompactTask(const char *value) {
int32_t max_compact_tasks;
char *endptr = NULL;
Expand All @@ -489,7 +489,7 @@ static int32_t dmAlterMaxCompactTask(const char *value) {
dInfo("alter max compact tasks from %d to %d", tsNumOfCompactThreads, max_compact_tasks);
tsNumOfCompactThreads = max_compact_tasks;
#ifdef TD_ENTERPRISE
tsdbAlterMaxCompactTasks();
(void)tsdbAlterNumCompactThreads();
#endif
}

Expand Down
2 changes: 1 addition & 1 deletion source/dnode/mgmt/mgmt_vnode/src/vmInt.c
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
}
tmsgReportStartup("vnode-sync", "initialized");

if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) {
if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
dError("failed to init vnode since %s", tstrerror(code));
goto _OVER;
}
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/inc/vnode.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ extern const SVnodeCfg vnodeCfgDefault;

typedef void (*StopDnodeFp)();

int32_t vnodeInit(int32_t nthreads, StopDnodeFp stopDnodeFp);
int32_t vnodeInit(StopDnodeFp stopDnodeFp);
void vnodeCleanup();
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs);
bool vnodeShouldRemoveWal(SVnode *pVnode);
Expand Down
3 changes: 0 additions & 3 deletions source/dnode/vnode/src/inc/tsdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -1083,9 +1083,6 @@ void tsdbRemoveFile(const char *path);
} \
} while (0)

int32_t tsdbInit();
void tsdbCleanUp();

#ifdef __cplusplus
}
#endif
Expand Down
22 changes: 19 additions & 3 deletions source/dnode/vnode/src/inc/vnd.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,32 @@ typedef enum {
EVA_PRIORITY_LOW,
} EVAPriority;

int32_t vnodeAsyncOpen(int32_t numOfThreads);
typedef enum {
EVA_TASK_COMMIT = 1,
EVA_TASK_MERGE,
EVA_TASK_COMPACT,
EVA_TASK_RETENTION,
} EVATaskT;

#define COMMIT_TASK_ASYNC 1
#define MERGE_TASK_ASYNC 2
#define COMPACT_TASK_ASYNC 3
#define RETENTION_TASK_ASYNC 4

int32_t vnodeAsyncOpen();
void vnodeAsyncClose();
int32_t vnodeAChannelInit(int64_t async, SVAChannelID* channelID);
int32_t vnodeAChannelDestroy(SVAChannelID* channelID, bool waitRunning);
int32_t vnodeAsync(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*),
void* arg, SVATaskID* taskID);
int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg,
SVATaskID* taskID);
int32_t vnodeAsyncC(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*),
void* arg, SVATaskID* taskID);
void vnodeAWait(SVATaskID* taskID);
int32_t vnodeACancel(SVATaskID* taskID);
int32_t vnodeAsyncSetWorkers(int64_t async, int32_t numWorkers);

const char* vnodeGetATaskName(EVATaskT task);

// vnodeBufPool.c
typedef struct SVBufPoolNode SVBufPoolNode;
struct SVBufPoolNode {
Expand Down
3 changes: 1 addition & 2 deletions source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,7 @@ struct SVnode {
SVBufPool* onRecycle;

// commit variables
SVAChannelID commitChannel;
SVATaskID commitTask;
SVATaskID commitTask;

SMeta* pMeta;
SSma* pSma;
Expand Down
6 changes: 3 additions & 3 deletions source/dnode/vnode/src/tsdb/tsdbCommit2.c
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
// begin tasks on file set
for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset);
tsdbBeginTaskOnFileSet(tsdb, info->fid, EVA_TASK_COMMIT, &fset);
if (fset) {
code = tsdbTFileSetInitCopy(tsdb, fset, &info->fset);
if (code) {
Expand Down Expand Up @@ -712,7 +712,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
if (info->fset) {
tsdbFinishTaskOnFileSet(tsdb, info->fid);
tsdbFinishTaskOnFileSet(tsdb, info->fid, EVA_TASK_COMMIT);
}
}

Expand Down Expand Up @@ -743,7 +743,7 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
if (info->fset) {
tsdbFinishTaskOnFileSet(pTsdb, info->fid);
tsdbFinishTaskOnFileSet(pTsdb, info->fid, EVA_TASK_COMMIT);
}
}
(void)taosThreadMutexUnlock(&pTsdb->mutex);
Expand Down
111 changes: 64 additions & 47 deletions source/dnode/vnode/src/tsdb/tsdbFS2.c
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,8 @@ extern void tsdbStopAllCompTask(STsdb *tsdb);

int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
STFileSystem *fs = pTsdb->pFS;
SArray *channelArray = taosArrayInit(0, sizeof(SVAChannelID));
if (channelArray == NULL) {
SArray *asyncTasks = taosArrayInit(0, sizeof(SVATaskID));
if (asyncTasks == NULL) {
return terrno;
}

Expand All @@ -783,30 +783,31 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
// collect channel
STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) {
if (fset->channelOpened) {
if (taosArrayPush(channelArray, &fset->channel) == NULL) {
taosArrayDestroy(channelArray);
(void)taosThreadMutexUnlock(&pTsdb->mutex);
return terrno;
}
fset->channel = (SVAChannelID){0};
fset->mergeScheduled = false;
tsdbFSSetBlockCommit(fset, false);
fset->channelOpened = false;
if (taosArrayPush(asyncTasks, &fset->mergeTask) == NULL //
|| taosArrayPush(asyncTasks, &fset->compactTask) == NULL //
|| taosArrayPush(asyncTasks, &fset->retentionTask) == NULL) {
taosArrayDestroy(asyncTasks);
(void)taosThreadMutexUnlock(&pTsdb->mutex);
return terrno;
}
fset->mergeScheduled = false;
tsdbFSSetBlockCommit(fset, false);
}

(void)taosThreadMutexUnlock(&pTsdb->mutex);

// destroy all channels
for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) {
SVAChannelID *channel = taosArrayGet(channelArray, i);
int32_t code = vnodeAChannelDestroy(channel, true);
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
for (int32_t k = 0; k < 2; k++) {
for (int32_t i = 0; i < taosArrayGetSize(asyncTasks); i++) {
SVATaskID *task = taosArrayGet(asyncTasks, i);
if (k == 0) {
(void)vnodeACancel(task);
} else {
(void)vnodeAWait(task);
}
}
}
taosArrayDestroy(channelArray);
taosArrayDestroy(asyncTasks);

#ifdef TD_ENTERPRISE
tsdbStopAllCompTask(pTsdb);
Expand Down Expand Up @@ -934,9 +935,6 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
// bool skipMerge = false;
int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
if (numFile >= sttTrigger && (!fset->mergeScheduled)) {
code = tsdbTFileSetOpenChannel(fset);
TSDB_CHECK_CODE(code, lino, _exit);

SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
code = terrno;
Expand All @@ -946,7 +944,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
arg->tsdb = fs->tsdb;
arg->fid = fset->fid;

code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, NULL);
code = vnodeAsync(MERGE_TASK_ASYNC, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, &fset->mergeTask);
TSDB_CHECK_CODE(code, lino, _exit);
fset->mergeScheduled = true;
}
Expand Down Expand Up @@ -1202,42 +1200,61 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev

void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { tsdbTFileSetRangeArrayDestroy(fsrArr); }

void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) {
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset) {
// Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;

tsdbFSGetFSet(tsdb->pFS, fid, fset);
if (sttTrigger == 1 && (*fset)) {
for (;;) {
if ((*fset)->taskRunning) {
(*fset)->numWaitTask++;

(void)taosThreadCondWait(&(*fset)->beginTask, &tsdb->mutex);
if (*fset == NULL) {
return;
}

tsdbFSGetFSet(tsdb->pFS, fid, fset);
struct STFileSetCond *cond = NULL;
if (sttTrigger == 1 || task == EVA_TASK_COMMIT) {
cond = &(*fset)->conds[0];
} else {
cond = &(*fset)->conds[1];
}

(*fset)->numWaitTask--;
} else {
(*fset)->taskRunning = true;
break;
}
while (1) {
if (cond->running) {
cond->numWait++;
(void)taosThreadCondWait(&cond->cond, &tsdb->mutex);
cond->numWait--;
} else {
cond->running = true;
break;
}
tsdbInfo("vgId:%d begin task on file set:%d", TD_VID(tsdb->pVnode), fid);
}

tsdbInfo("vgId:%d begin %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
return;
}

void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) {
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task) {
// Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
if (sttTrigger == 1) {
STFileSet *fset = NULL;
tsdbFSGetFSet(tsdb->pFS, fid, &fset);
if (fset != NULL && fset->taskRunning) {
fset->taskRunning = false;
if (fset->numWaitTask > 0) {
(void)taosThreadCondSignal(&fset->beginTask);
}
tsdbInfo("vgId:%d finish task on file set:%d", TD_VID(tsdb->pVnode), fid);
}

STFileSet *fset = NULL;
tsdbFSGetFSet(tsdb->pFS, fid, &fset);
if (fset == NULL) {
return;
}

struct STFileSetCond *cond = NULL;
if (sttTrigger == 1 || task == EVA_TASK_COMMIT) {
cond = &fset->conds[0];
} else {
cond = &fset->conds[1];
}

cond->running = false;
if (cond->numWait > 0) {
(void)taosThreadCondSignal(&cond->cond);
}

tsdbInfo("vgId:%d finish %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
return;
}

struct SFileSetReader {
Expand Down
5 changes: 3 additions & 2 deletions source/dnode/vnode/src/tsdb/tsdbFS2.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

#include "tsdbFSet2.h"
#include "vnd.h"

#ifndef _TSDB_FILE_SYSTEM_H
#define _TSDB_FILE_SYSTEM_H
Expand Down Expand Up @@ -61,8 +62,8 @@ int32_t tsdbFSEditAbort(STFileSystem *fs);
// other
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid);
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset);
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid);
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset);
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task);
// utils
int32_t save_fs(const TFileSetArray *arr, const char *fname);
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
Expand Down
27 changes: 10 additions & 17 deletions source/dnode/vnode/src/tsdb/tsdbFSet2.c
Original file line number Diff line number Diff line change
Expand Up @@ -480,16 +480,18 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
fset[0]->maxVerValid = VERSION_MAX;
TARRAY2_INIT(fset[0]->lvlArr);

// background task queue
(void)taosThreadCondInit(&(*fset)->beginTask, NULL);
(*fset)->taskRunning = false;
(*fset)->numWaitTask = 0;

// block commit variables
(void)taosThreadCondInit(&fset[0]->canCommit, NULL);
(*fset)->numWaitCommit = 0;
(*fset)->blockCommit = false;

for (int32_t i = 0; i < sizeof((*fset)->conds) / sizeof((*fset)->conds[0]); ++i) {
struct STFileSetCond *cond = &(*fset)->conds[i];
cond->running = false;
cond->numWait = 0;
(void)taosThreadCondInit(&cond->cond, NULL);
}

return 0;
}

Expand Down Expand Up @@ -648,8 +650,10 @@ void tsdbTFileSetClear(STFileSet **fset) {

TARRAY2_DESTROY((*fset)->lvlArr, tsdbSttLvlClear);

(void)taosThreadCondDestroy(&(*fset)->beginTask);
(void)taosThreadCondDestroy(&(*fset)->canCommit);
for (int32_t i = 0; i < sizeof((*fset)->conds) / sizeof((*fset)->conds[0]); ++i) {
(void)taosThreadCondDestroy(&(*fset)->conds[i].cond);
}
taosMemoryFreeClear(*fset);
}
}
Expand Down Expand Up @@ -703,14 +707,3 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) {
}
return TARRAY2_SIZE(fset->lvlArr) == 0;
}

int32_t tsdbTFileSetOpenChannel(STFileSet *fset) {
int32_t code;
if (!fset->channelOpened) {
if ((code = vnodeAChannelInit(2, &fset->channel))) {
return code;
}
fset->channelOpened = true;
}
return 0;
}
Loading

0 comments on commit 1a176e6

Please sign in to comment.