diff --git a/src/backend/booster/bk_dist/booster/pkg/booster.go b/src/backend/booster/bk_dist/booster/pkg/booster.go
index ddeae022..804ac539 100644
--- a/src/backend/booster/bk_dist/booster/pkg/booster.go
+++ b/src/backend/booster/bk_dist/booster/pkg/booster.go
@@ -24,7 +24,6 @@ import (
     "time"

     "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/env"
-    dcFile "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/file"
     dcProtocol "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol"
     dcPump "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/pump"
     dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk"
@@ -634,7 +633,7 @@ func (b *Booster) unregisterWork() error {
     return nil
 }

-func (b *Booster) sendAdditionFile() {
+/*func (b *Booster) sendAdditionFile() {
     if b.config.Works.Local || b.config.Works.Degraded {
         return
     }
@@ -683,7 +682,7 @@ func (b *Booster) sendAdditionFile() {
         return
     }
     blog.Infof("booster: finish send addition files: %v", b.config.Works.AdditionFiles)
-}
+}*/

 func (b *Booster) runWorks(
     ctx context.Context,
diff --git a/src/backend/booster/bk_dist/common/sdk/worker.go b/src/backend/booster/bk_dist/common/sdk/worker.go
index 6c0c73c5..15ef9fb8 100644
--- a/src/backend/booster/bk_dist/common/sdk/worker.go
+++ b/src/backend/booster/bk_dist/common/sdk/worker.go
@@ -96,6 +96,7 @@ type FileDesc struct {
     NoDuplicated   bool             `json:"no_duplicated"`
     AllDistributed bool             `json:"all_distributed"`
     Priority       FileDescPriority `json:"priority"`
+    Retry          bool             `json:"retry"`
 }

 // UniqueKey define the file unique key
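The new `Retry` flag above drives a two-cache design in the remote manager below: first-send bookkeeping stays in `fileSendMap`, while failures are recorded in a dedicated `failFileSendMap` and retried from there. A minimal sketch of the two status-update guards that enforce this split (all types here are simplified stand-ins for the real bk-dist types, not the actual implementation):

```go
package main

import "fmt"

// Simplified stand-ins for the bk-dist types touched by this diff.
type FileSendStatus int

const (
	FileSending FileSendStatus = iota
	FileSendSucceed
	FileSendFailed
)

type FileDesc struct {
	FilePath string
	Retry    bool // new field: true when this desc comes from the fail cache
}

// The normal cache ignores non-success results for retried files...
func updateStatus(cache map[string]FileSendStatus, d FileDesc, s FileSendStatus) {
	if s != FileSendSucceed && d.Retry {
		return // retry failures are tracked in the fail cache only
	}
	cache[d.FilePath] = s
}

// ...while the fail cache ignores first-send successes.
func updateFailStatus(failCache map[string]FileSendStatus, d FileDesc, s FileSendStatus) {
	if s == FileSendSucceed && !d.Retry {
		return // a file that succeeded on first send never enters the fail cache
	}
	failCache[d.FilePath] = s
}

func main() {
	cache := map[string]FileSendStatus{}
	failCache := map[string]FileSendStatus{}

	first := FileDesc{FilePath: "/tmp/a.o"}
	updateStatus(cache, first, FileSendFailed)
	updateFailStatus(failCache, first, FileSendFailed) // recorded for retry

	retry := FileDesc{FilePath: "/tmp/a.o", Retry: true}
	updateStatus(cache, retry, FileSendSucceed)
	updateFailStatus(failCache, retry, FileSendSucceed)

	fmt.Println(cache, failCache)
}
```

The effect is that a retry can never regress the normal cache to a failed state, and the fail cache records the retried send's outcome without touching first-send bookkeeping. The corresponding mgr.go changes follow.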
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index 7d84ff15..f4587d96 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -39,8 +39,6 @@ const (
     corkMaxSize = 1024 * 1024 * 10
     // corkMaxSize = 1024 * 1024 * 1024

     largeFileSize = 1024 * 1024 * 100 // 100MB
-
-    fileMaxFailCount = 5
 )

 // NewMgr get a new Remote Mgr
@@ -68,12 +66,13 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
         ),
         checkSendFileTick:     100 * time.Millisecond,
         fileSendMap:           make(map[string]*fileSendMap),
+        failFileSendMap:       make(map[string]*fileSendMap),
         fileCollectionSendMap: make(map[string]*[]*types.FileCollectionInfo),
         fileMessageBank:       newFileMessageBank(),
         conf:                  work.Config(),
         resourceCheckTick:     5 * time.Second,
         workerCheckTick:       5 * time.Second,
-        toolChainRetryTick:    10 * time.Second,
+        retryCheckTick:        10 * time.Second,
         sendCorkTick:          10 * time.Millisecond,
         corkSize:              corkSize,
         corkMaxSize:           corkMaxSize,
@@ -84,8 +83,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
 }

 const (
-    syncHostTimeTimes   = 3
-    toolChainRetryTimes = 10
+    syncHostTimeTimes = 3
 )

 // Mgr describe the remote manager
@@ -105,6 +103,9 @@ type Mgr struct {
     fileSendMutex sync.RWMutex
     fileSendMap   map[string]*fileSendMap

+    failFileSendMutex sync.RWMutex
+    failFileSendMap   map[string]*fileSendMap
+
     fileCollectionSendMutex sync.RWMutex
     fileCollectionSendMap   map[string]*[]*types.FileCollectionInfo

@@ -114,12 +115,12 @@ type Mgr struct {

     conf *config.ServerConfig

-    resourceCheckTick time.Duration
-    workerCheckTick   time.Duration
-    toolChainRetryTick time.Duration
-    lastUsed          uint64 // only accurate to second now
-    lastApplied       uint64 // only accurate to second now
-    remotejobs        int64  // save job number which using remote worker
+    resourceCheckTick time.Duration
+    workerCheckTick   time.Duration
+    retryCheckTick    time.Duration
+    lastUsed          uint64 // only accurate to second now
+    lastApplied       uint64 // only accurate to second now
+    remotejobs        int64  // number of jobs currently using remote workers

     sendCorkTick time.Duration
     sendCorkChan chan bool
@@ -136,7 +137,7 @@ type fileSendMap struct {
     cache map[string]*[]*types.FileInfo
 }

-func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.FileInfo, bool) {
+func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc) (*types.FileInfo, bool) {
     fsm.Lock()
     defer fsm.Unlock()

@@ -164,12 +165,6 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F
     for _, ci := range *c {
         if ci.Match(desc) {
-            //if worker is send failed before, try to send it again
-            if ci.SendStatus == types.FileSendFailed && !query {
-                blog.Debugf("file: retry send file %s, fail count %d", desc.FilePath, ci.FailCount)
-                ci.SendStatus = types.FileSending
-                return ci, false
-            }
             return ci, true
         }
     }
@@ -178,6 +173,72 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F
     return info, false
 }

+// matchFail only matches previously failed files; it never inserts new entries
+func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileInfo, bool, error) {
+    fsm.Lock()
+    defer fsm.Unlock()
+
+    if fsm.cache == nil {
+        return nil, false, errors.New("file cache not found")
+    }
+
+    c, ok := fsm.cache[desc.FilePath]
+    if !ok || c == nil || len(*c) == 0 {
+        return nil, false, fmt.Errorf("file %s not found, file cache is nil", desc.FilePath)
+    }
+
+    for _, ci := range *c {
+        if ci.Match(desc) {
+            if ci.SendStatus == types.FileSendFailed && !query {
+                ci.SendStatus = types.FileSending
+                return ci, false, nil
+            }
+            return ci, true, nil
+        }
+    }
+    return nil, false, fmt.Errorf("file %s not found", desc.FilePath)
+}
+
+// matchFails only matches previously failed files; it never inserts new entries
+func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult {
+    fsm.Lock()
+    defer fsm.Unlock()
+
+    if fsm.cache == nil {
+        blog.Warnf("file cache not found")
+        return []matchResult{}
+    }
+
+    result := make([]matchResult, 0, len(descs))
+    for _, desc := range descs {
+        c, ok := fsm.cache[desc.FilePath]
+        if !ok || c == nil || len(*c) == 0 {
+            blog.Warnf("file %s not found", desc.FilePath)
+            continue
+        }
+        matched := false
+        for _, ci := range *c {
+            if ci.Match(*desc) {
+                fileMatched := true
+                if ci.SendStatus == types.FileSendFailed {
+                    ci.SendStatus = types.FileSending
+                    fileMatched = false
+                }
+                result = append(result, matchResult{
+                    info:  ci,
+                    match: fileMatched,
+                })
+                matched = true
+                break
+            }
+        }
+        if matched {
+            continue
+        }
+    }
+    return result
+}
+
 func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
     fsm.Lock()
     defer fsm.Unlock()
@@ -213,16 +274,9 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
         matched := false
         for _, ci := range *c {
             if ci.Match(*desc) {
-                fileMatched := true
-                //if file is send failed before, try to send it again
-                if ci.SendStatus == types.FileSendFailed {
-                    blog.Debugf("file: retry send file %s, fail count %d", desc.FilePath, ci.FailCount)
-                    fileMatched = false
-                    ci.SendStatus = types.FileSending
-                }
                 result = append(result, matchResult{
                     info:  ci,
-                    match: fileMatched,
+                    match: true,
                 })
                 matched = true
                 break
@@ -242,7 +296,48 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
     return result
 }

+func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus) {
+    if status == types.FileSendSucceed && !desc.Retry {
+        return
+    }
+    fsm.Lock()
+    defer fsm.Unlock()
+
+    info := &types.FileInfo{
+        FullPath:           desc.FilePath,
+        Size:               desc.FileSize,
+        LastModifyTime:     desc.Lastmodifytime,
+        Md5:                desc.Md5,
+        TargetRelativePath: desc.Targetrelativepath,
+        FileMode:           desc.Filemode,
+        LinkTarget:         desc.LinkTarget,
+        SendStatus:         status,
+    }
+    if fsm.cache == nil {
+        fsm.cache = make(map[string]*[]*types.FileInfo)
+    }
+    fc, ok := fsm.cache[info.FullPath]
+    if !ok || fc == nil || len(*fc) == 0 {
+        infoList := []*types.FileInfo{info}
+        fsm.cache[info.FullPath] = &infoList
+        blog.Debugf("file: update failed files with add:%v", info)
+        return
+    }
+    for _, ci := range *fc {
+        if ci.Match(desc) {
+            blog.Debugf("file: update failed files with refresh before:%v", ci)
+            ci.SendStatus = status
+            blog.Debugf("file: update failed files with refresh:%v", ci)
+            return
+        }
+    }
+    *fc = append(*fc, info)
+}
+
 func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) {
+    if status != types.FileSendSucceed && desc.Retry {
+        return
+    }
     fsm.Lock()
     defer fsm.Unlock()

@@ -259,7 +354,6 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
         FileMode:       desc.Filemode,
         LinkTarget:     desc.LinkTarget,
         SendStatus:     status,
-        FailCount:      0,
     }

     c, ok := fsm.cache[desc.FilePath]
@@ -272,21 +366,14 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS
     for _, ci := range *c {
         if ci.Match(desc) {
             ci.SendStatus = status
-            if status == types.FileSendFailed {
-                ci.FailCount++
-            }
-            if status == types.FileSendSucceed {
-                ci.FailCount = 0
-            }
             return
         }
     }

     *c = append(*c, info)
-    return
 }

-func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool {
+func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
     fsm.RLock()
     defer fsm.RUnlock()

@@ -298,11 +385,9 @@ func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
         if !ok || c == nil || len(*c) == 0 {
             continue
         }
-        for _, ci := range *fsm.cache[desc.FilePath] {
+        for _, ci := range *c {
             if ci.Match(desc) {
-                if ci.FailCount > fileMaxFailCount {
-                    return true
-                }
+                return ci.SendStatus != types.FileSendSucceed
             }
         }
     }
@@ -310,6 +395,36 @@ func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
     return false
 }

+func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc {
+    fsm.RLock()
+    defer fsm.RUnlock()
+
+    failFiles := make([]dcSDK.FileDesc, 0)
+    for _, v := range fsm.cache {
+        if v == nil || len(*v) == 0 {
+            continue
+        }
+        for _, ci := range *v {
+            if ci.SendStatus != types.FileSendFailed {
+                continue
+            }
+            failFiles = append(failFiles, dcSDK.FileDesc{
+                FilePath:           ci.FullPath,
+                Compresstype:       dcProtocol.CompressLZ4,
+                FileSize:           ci.Size,
+                Lastmodifytime:     ci.LastModifyTime,
+                Md5:                ci.Md5,
+                Targetrelativepath: ci.TargetRelativePath,
+                Filemode:           ci.FileMode,
+                LinkTarget:         ci.LinkTarget,
+                NoDuplicated:       true,
+                Retry:              true,
+            })
+        }
+    }
+    return failFiles
+}
+
 // Init do the initialization for remote manager
 // !! only call once !!
 func (m *Mgr) Init() {
@@ -335,8 +450,11 @@ func (m *Mgr) Init() {
     if m.conf.AutoResourceMgr {
         go m.resourceCheck(ctx)
     }
-
-    go m.workerCheck(ctx)
+    if m.work.ID() != "" {
+        go m.workerCheck(ctx)
+        go m.retryFailFiles(ctx)
+        go m.retrySendToolChains(ctx)
+    }

     if m.conf.SendCork {
         m.sendCorkChan = make(chan bool, 1000)
@@ -479,6 +597,7 @@ func (m *Mgr) resourceCheck(ctx context.Context) {
 func (m *Mgr) workerCheck(ctx context.Context) {
     blog.Infof("remote: run worker check tick for work: %s", m.work.ID())
     ticker := time.NewTicker(m.workerCheckTick)
+    defer ticker.Stop()

     for {
@@ -486,7 +605,7 @@ func (m *Mgr) workerCheck(ctx context.Context) {
         case <-ctx.Done():
             blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
             return
-
+        // recover dead workers
         case <-ticker.C:
             handler := m.remoteWorker.Handler(0, nil, nil, nil)
             for _, w := range m.resource.GetDeadWorkers() {
@@ -501,7 +620,107 @@ func (m *Mgr) workerCheck(ctx context.Context) {
                 }(w)
             }
+        }
+    }
+}
+
+func (m *Mgr) retrySendToolChains(ctx context.Context) {
+    ticker := time.NewTicker(m.workerCheckTick)
+    defer ticker.Stop()
+
+    var workerStatus sync.Map
+    for {
+        select {
+        case <-ctx.Done():
+            blog.Infof("remote: run toolchain check for work(%s) canceled by context", m.work.ID())
+            return
+        case <-ticker.C:
+            if m.failFileSendMap == nil || len(m.failFileSendMap) == 0 {
+                continue
+            }
+            handler := m.remoteWorker.Handler(0, nil, nil, nil)
+            hosts := m.work.Resource().GetHosts()
+            count := 0
+            wg := make(chan string, len(hosts))
+            for _, h := range hosts {
+                workerNeedRetry := true
+                if v, ok := workerStatus.Load(h.Server); ok {
+                    workerNeedRetry = v.(bool)
+                }
+                if !workerNeedRetry {
+                    continue
+                }
+                fileCollections := m.getFailedFileCollectionByHost(h.Server)
+                if len(fileCollections) == 0 {
+                    continue
+                }
+                workerStatus.Store(h.Server, false)
+                count++
+                go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
+                    Pid:     0,
+                    Server:  h,
+                    Sandbox: &dcSyscall.Sandbox{Dir: ""},
+                    Stats:   &dcSDK.ControllerJobStats{},
+                }, fileCollections, wg)
+            }
+            go func() {
+                for i := 0; i < count; i++ {
+                    host := <-wg
+                    workerStatus.Store(host, true)
+                }
+            }()
+        }
+    }
+}
+
+func (m *Mgr) retryFailFiles(ctx context.Context) {
+    ticker := time.NewTicker(m.retryCheckTick)
+    defer ticker.Stop()
+
+    var workerStatus sync.Map
+    for {
+        select {
+        case <-ctx.Done():
+            blog.Infof("remote: run failfiles check for work(%s) canceled by context", m.work.ID())
+            return
+        case <-ticker.C:
+            if m.failFileSendMap == nil || len(m.failFileSendMap) == 0 {
+                continue
+            }
+            hosts := m.work.Resource().GetHosts()
+            wg := make(chan string, len(hosts))
+            count := 0
+            for _, h := range hosts {
+                workerNeedRetry := true
+                if v, ok := workerStatus.Load(h.Server); ok {
+                    workerNeedRetry = v.(bool)
+                }
+                if !workerNeedRetry {
+                    continue
+                }
+                m.failFileSendMutex.Lock()
+                sendMap := m.failFileSendMap[h.Server]
+                if sendMap == nil {
+                    m.failFileSendMutex.Unlock()
+                    blog.Infof("remote: send file for work(%s) with no send map", m.work.ID())
+                    continue
+                }
+                m.failFileSendMutex.Unlock()
+
+                failFiles := sendMap.getFailFiles()
+                if len(failFiles) == 0 {
+                    continue
+                }
+                workerStatus.Store(h.Server, false)
+                count++
+                go m.retrySendFiles(h, failFiles, wg)
+            }
+            go func() {
+                for i := 0; i < count; i++ {
+                    host := <-wg
+                    workerStatus.Store(host, true)
+                }
+            }()
         }
     }
 }
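Both goroutines added above share one scheduling idea: a ticker fires, hosts with a previous retry still in flight are skipped via a `sync.Map`, and a buffered channel re-arms each host when its retry goroutine reports back. A compact, runnable sketch of just that gating pattern (names such as `retryLoop`/`retryOne` are illustrative, not part of this diff):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// retryLoop runs at most one retry goroutine per host at a time. Hosts are
// re-armed only when their goroutine reports completion on the done channel.
func retryLoop(ctx context.Context, tick time.Duration, hosts []string,
	retryOne func(host string, done chan<- string)) {
	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	var idle sync.Map // host -> bool: true when no retry is in flight
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			done := make(chan string, len(hosts))
			count := 0
			for _, h := range hosts {
				ok := true
				if v, loaded := idle.Load(h); loaded {
					ok = v.(bool)
				}
				if !ok {
					continue // a retry for this host is still running
				}
				idle.Store(h, false)
				count++
				go retryOne(h, done)
			}
			// re-arm hosts as their retries finish
			go func() {
				for i := 0; i < count; i++ {
					idle.Store(<-done, true)
				}
			}()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	retryLoop(ctx, 50*time.Millisecond, []string{"w1", "w2"},
		func(h string, done chan<- string) {
			fmt.Println("retrying", h)
			done <- h
		})
}
```

Note a property this design inherits: the channel send is the only re-arm path, so a retry goroutine that never reports back leaves its host permanently skipped. The mgr.go diff continues below with the ExecuteTask changes.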
@@ -534,6 +753,34 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
     defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLeaveTime)
     m.work.Basic().UpdateJobStats(req.Stats)

+    hosts := m.work.Resource().GetHosts()
+    for _, c := range req.Req.Commands {
+        for _, s := range hosts {
+            m.failFileSendMutex.Lock()
+            f := m.failFileSendMap[s.Server]
+            m.failFileSendMutex.Unlock()
+            if f == nil {
+                continue
+            }
+            if f.isFilesSendFailed(c.Inputfiles) {
+                matched := false
+                for _, h := range req.BanWorkerList {
+                    if h.Equal(s) {
+                        matched = true
+                        break
+                    }
+                }
+                if !matched {
+                    req.BanWorkerList = append(req.BanWorkerList, s)
+                }
+            }
+        }
+    }
+    if len(req.BanWorkerList) == len(hosts) {
+        return nil, errors.New("no available worker: all workers are banned")
+    }
+
+    blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %d, %v",
+        m.work.ID(), req.Pid, len(req.BanWorkerList), req.BanWorkerList)
     // 如果有超过100MB的大文件,则在选择host时,作为选择条件
     fpath, _ := getMaxSizeFile(req, m.largeFileSize)
     req.Server = m.lockSlots(dcSDK.JobUsageRemoteExe, fpath, req.BanWorkerList)
@@ -573,14 +820,9 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
             m.work.ID(), req.Pid, req.Server.Server, err, req.Server.Server)

         m.resource.DisableWorker(req.Server)
-        m.retrySendToolChain(handler, req)
         return nil, err
     }

-    if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) {
-        return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
-    }
-
     ret, err = checkHttpConn(req)
     if err != nil {
         return ret, err
@@ -588,7 +830,16 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
     remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req))
     if err != nil {
-        req.BanWorkerList = append(req.BanWorkerList, req.Server)
+        matched := false
+        for _, h := range req.BanWorkerList {
+            if h.Equal(req.Server) {
+                matched = true
+                break
+            }
+        }
+        if !matched {
+            req.BanWorkerList = append(req.BanWorkerList, req.Server)
+        }
         var banlistStr string
         for _, s := range req.BanWorkerList {
             banlistStr = banlistStr + s.Server + ","
@@ -666,22 +917,21 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
     )
 }

-// check if files send to remote worker failed and no need to send again
-func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand) bool {
-    m.fileSendMutex.Lock()
-    target, ok := m.fileSendMap[server]
-    if !ok {
-        m.fileSendMutex.Unlock()
-        return false
-    }
-    m.fileSendMutex.Unlock()
-
-    for _, c := range commands {
-        if target.hasReachedFailCount(c.Inputfiles) {
-            return true
-        }
+func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, host chan string) {
+    blog.Infof("remote: try to retry send fail files for work(%s) from pid(%d) to server %s, fail file count %d", m.work.ID(), 1, h.Server, len(failFiles))
+    _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
+        Pid:     1,
+        Req:     failFiles,
+        Server:  h,
+        Sandbox: &dcSyscall.Sandbox{Dir: ""},
+        Stats:   &dcSDK.ControllerJobStats{},
+    })
+    if err != nil {
+        blog.Errorf("remote: retry send fail files for work(%s) from pid(%d) to server %s failed: %v", m.work.ID(), 1, h.Server, err)
+    } else {
+        blog.Infof("remote: success to retry send fail files for work(%s) from pid(%d) to server %s, fail file count %d", m.work.ID(), 1, h.Server, len(failFiles))
     }
-    return false
+    host <- h.Server
 }

 func (m *Mgr) ensureFilesWithPriority(
@@ -756,8 +1006,8 @@ func (m *Mgr) ensureFiles(
     settings := m.work.Basic().Settings()
     blog.Infof("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server",
         len(fileDetails), m.work.ID(), pid, sandbox.Dir)
-    blog.Debugf("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server: %v",
-        len(fileDetails), m.work.ID(), pid, sandbox.Dir, fileDetails)
+    //blog.Debugf("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server: %v",
+    //    len(fileDetails), m.work.ID(), pid, sandbox.Dir, fileDetails)
     rules := settings.FilterRules

     // pump模式下,一次编译依赖的可能有上千个文件,现在的流程会随机的添加到cork发送队列
@@ -784,7 +1034,7 @@ func (m *Mgr) ensureFiles(
         _, t, _ := rules.Satisfy(fd.File.FilePath)

-        blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
+        //blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
         if f.AllDistributed {
             t = dcSDK.FilterRuleHandleAllDistribution
         }
@@ -794,7 +1044,7 @@ func (m *Mgr) ensureFiles(
         if f.CompressedSize == -1 || f.FileSize == -1 {
             t = dcSDK.FilterRuleHandleDefault
         }
-
+        blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
         servers := make([]*dcProtocol.Host, 0, 0)
         switch t {
         case dcSDK.FilterRuleHandleDefault:
@@ -827,7 +1077,7 @@ func (m *Mgr) ensureFiles(
             }
             r = append(r, f.Targetrelativepath)

-            blog.Debugf("remote: debug ensure into fd.Servers")
+            //blog.Debugf("remote: debug ensure into fd.Servers")
             for _, s := range fd.Servers {
                 if s == nil {
                     continue
@@ -986,11 +1236,19 @@ func (m *Mgr) ensureSingleFile(
     }
     req.Files = req.Files[:1]
     desc := req.Files[0]
-    blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s)",
-        desc.FilePath, m.work.ID(), host.Server)
-
-    status, ok := m.checkOrLockSendFile(host.Server, desc, false)
-
+    blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s) with retry %v",
+        desc.FilePath, m.work.ID(), host.Server, desc.Retry)
+    var status types.FileSendStatus
+    var ok bool
+    if desc.Retry {
+        status, ok, err = m.checkOrLockSendFailFile(host.Server, desc, false)
+        if err != nil { // file not found in the fail cache: return directly so other failed files are not affected
+            blog.Warnf("remote: checkOrLockSendFailFile(%s) failed: %v", host.Server, err)
+            return err
+        }
+    } else {
+        status, ok = m.checkOrLockSendFile(host.Server, desc)
+    }
     // 已经有人发送了文件, 等待文件就绪
     if ok {
         blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), "+
@@ -1002,14 +1260,18 @@ func (m *Mgr) ensureSingleFile(
         select {
         case <-tick.C:
             // 不是发送文件的goroutine,不需要修改状态,仅查询状态
-            status, _ = m.checkOrLockSendFile(host.Server, desc, true)
+            if desc.Retry {
+                status, _, _ = m.checkOrLockSendFailFile(host.Server, desc, true)
+            } else {
+                status, _ = m.checkOrLockSendFile(host.Server, desc)
+            }
         }
     }

     switch status {
     case types.FileSendFailed:
         blog.Errorf("remote: failed to ensure single file(%s) for work(%s) to server(%s), "+
-            "file already sent and failed", desc.FilePath, m.work.ID(), host.Server)
+            "file already sent and failed with retry %v", desc.FilePath, m.work.ID(), host.Server, desc.Retry)
         return types.ErrSendFileFailed
     case types.FileSendSucceed:
         blog.Debugf("remote: success to ensure single file(%s) for work(%s) to server(%s)",
@@ -1038,8 +1300,8 @@ func (m *Mgr) ensureSingleFile(
     //     }
     // }

-    blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file",
-        desc.FilePath, m.work.ID(), host.Server)
+    blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file with retry %v",
+        desc.FilePath, m.work.ID(), host.Server, desc.Retry)

     req.Messages = m.fileMessageBank.get(desc)

     // 同步发送文件
@@ -1074,8 +1336,8 @@ func (m *Mgr) ensureSingleFile(
         desc.FilePath, m.work.ID(), host.Server, retCode)
     }

-    blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s)",
-        desc.FilePath, m.work.ID(), host.Server)
+    blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s) with retry %v",
+        desc.FilePath, m.work.ID(), host.Server, desc.Retry)
     return nil
 }

@@ -1099,7 +1361,11 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) {
         select {
         case <-tick.C:
             // 不是发送文件的goroutine,不能修改状态
-            status, _ = m.checkOrLockSendFile(host.Server, *desc, true)
+            if desc.Retry {
+                status, _, _ = m.checkOrLockSendFailFile(host.Server, *desc, true)
+            } else {
+                status, _ = m.checkOrLockSendFile(host.Server, *desc)
+            }
         }
     }

@@ -1210,10 +1476,9 @@ func (m *Mgr) checkBatchCache(
 }

 // checkOrLockFile 检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false
-func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, query bool) (types.FileSendStatus, bool) {
+func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc) (types.FileSendStatus, bool) {
     t1 := time.Now().Local()
     m.fileSendMutex.Lock()
-
     t2 := time.Now().Local()
     if d1 := t2.Sub(t1); d1 > 50*time.Millisecond {
         // blog.Debugf("check cache lock wait too long server(%s): %s", server, d1.String())
@@ -1232,10 +1497,29 @@ func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, query bool
     }
     m.fileSendMutex.Unlock()

-    info, match := target.matchOrInsert(desc, query)
+    info, match := target.matchOrInsert(desc)
     return info.SendStatus, match
 }

+func (m *Mgr) checkOrLockSendFailFile(server string, desc dcSDK.FileDesc, query bool) (types.FileSendStatus, bool, error) {
+    m.failFileSendMutex.Lock()
+    target, ok := m.failFileSendMap[server]
+    if !ok {
+        target = &fileSendMap{}
+        m.failFileSendMap[server] = target
+    }
+    m.failFileSendMutex.Unlock()
+
+    info, match, err := target.matchFail(desc, query)
+    if err != nil {
+        return types.FileSendUnknown, false, err
+    }
+    if info == nil {
+        return types.FileSendUnknown, false, errors.New("file info is nil")
+    }
+    return info.SendStatus, match, nil
+}
+
 type matchResult struct {
     info  *types.FileInfo
     match bool
@@ -1243,15 +1527,27 @@ type matchResult struct {
 }

 // checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false
 func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult {
-    m.fileSendMutex.Lock()
-    target, ok := m.fileSendMap[server]
-    if !ok {
-        target = &fileSendMap{}
-        m.fileSendMap[server] = target
-    }
-    m.fileSendMutex.Unlock()
+    if len(descs) == 0 || !descs[0].Retry { // files sent for the first time
+        m.fileSendMutex.Lock()
+        target, ok := m.fileSendMap[server]
+        if !ok {
+            target = &fileSendMap{}
+            m.fileSendMap[server] = target
+        }
+        m.fileSendMutex.Unlock()
+
+        return target.matchOrInserts(descs)
+    } else { // files retried after failure
+        m.failFileSendMutex.Lock()
+        target, ok := m.failFileSendMap[server]
+        if !ok {
+            target = &fileSendMap{}
+            m.failFileSendMap[server] = target
+        }
+        m.failFileSendMutex.Unlock()

-    return target.matchOrInserts(descs)
+        return target.matchFails(descs)
+    }
 }

 func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) {
@@ -1262,9 +1558,17 @@ func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.Fi
         m.fileSendMap[server] = target
     }
     m.fileSendMutex.Unlock()
-
     target.updateStatus(desc, status)
-    return
+
+    m.failFileSendMutex.Lock()
+    failTarget, ok := m.failFileSendMap[server]
+    if !ok {
+        failTarget = &fileSendMap{}
+        m.failFileSendMap[server] = failTarget
+    }
+    m.failFileSendMutex.Unlock()
+
+    failTarget.updateFailStatus(desc, status)
 }

 func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) error {
@@ -1310,71 +1614,41 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote
     return nil
 }

-// getFailedFileCollectionByHost 返回文件集合,如果文件集没有全部完成,返回false,否则返回true
-func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, bool, error) {
+// getFailedFileCollectionByHost returns the failed file collections of the given host
+func (m *Mgr) getFailedFileCollectionByHost(server string) []*types.FileCollectionInfo {
     m.fileCollectionSendMutex.RLock()
     defer m.fileCollectionSendMutex.RUnlock()

     target, ok := m.fileCollectionSendMap[server]
     if !ok {
-        return nil, true, fmt.Errorf("remote: no found host(%s) in file send cache", server)
+        blog.Infof("remote: host(%s) not found in file send cache", server)
+        return nil
     }
     fcs := make([]*types.FileCollectionInfo, 0)
     for _, re := range *target {
-        //如果有fc未到终结状态,则直接返回
-        if !re.SendStatus.IsFinished() {
-            blog.Infof("remote: found file collection(%s) in file send cache, but not finished, status:%s", re.UniqID, re.SendStatus)
-            return nil, false, nil
-        }
+        re.Retry = true
         if re.SendStatus == types.FileSendFailed {
             fcs = append(fcs, re)
         }
     }
-    return fcs, true, nil
+    return fcs
 }

-func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) {
-    if m.resource.CanWorkerRetry(req.Server) {
-        go func(handler dcSDK.RemoteWorkerHandler, req types.RemoteTaskExecuteRequest) {
-            for i := 0; i < toolChainRetryTimes; {
-                fileCollections, isNotTeminated, err := m.getFailedFileCollectionByHost(req.Server.Server)
-                if !isNotTeminated {
-                    time.Sleep(m.toolChainRetryTick)
-                    continue
-                }
-                i++
-                blog.Infof("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s)",
-                    m.work.ID(), i, req.Pid, req.Server.Server)
-                if err != nil || len(fileCollections) == 0 {
-                    if err != nil {
-                        blog.Errorf("remote: get failed file collection by host(%s) failed: %v", req.Server.Server, err)
-                    } else {
-                        blog.Errorf("remote: retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
-                    }
-                    time.Sleep(m.toolChainRetryTick)
-                    continue
-                }
-
-                if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil {
-                    blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s), "+
-                        "send tool chain files failed: %v", m.work.ID(), i, req.Pid, req.Server.Server, err)
-                    time.Sleep(m.toolChainRetryTick)
-                } else {
-                    // enable worker
-                    m.resource.EnableWorker(req.Server)
-                    blog.Infof("remote: success to retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
-                    return
-                }
-            }
-            m.resource.SetWorkerStatus(req.Server, RetryFailed)
-            blog.Errorf("remote: already retry to send tool chain for work(%s) for %d times from pid(%d) to server(%s) failed, keep worker disable", m.work.ID(), toolChainRetryTimes, req.Pid, req.Server.Server)
-        }(handler, *req)
-    } else {
-        blog.Infof("remote: worker(%s) is alreay retry to send tool chain for work(%s) from pid(%d) to server(%s)",
-            req.Server.Server, m.work.ID(), req.Pid, req.Server.Server)
+// retrySendToolChain retries sending failed tool chain files to the given worker
+func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, host chan string) {
+    blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)",
+        m.work.ID(), req.Pid, req.Server.Server)
+    err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections)
+    if err != nil {
+        blog.Errorf("remote: retry send tool chain for work(%s) from pid(%d) to server(%s), "+
+            "send tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
+    } else {
+        // enable worker
+        m.resource.EnableWorker(req.Server)
+        blog.Infof("remote: success to retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
     }
-
+    host <- req.Server.Server
 }

 func (m *Mgr) sendFileCollectionOnce(
@@ -1506,8 +1780,33 @@ func (m *Mgr) ensureOneFileCollection(
     Servers = append(Servers, host)

     fileDetails := make([]*types.FilesDetails, 0, len(fc.Files))
+
+    var failsendMap *fileSendMap
+    if fc.Retry {
+        m.failFileSendMutex.Lock()
+        failsendMap = m.failFileSendMap[host.Server]
+        if failsendMap == nil {
+            m.failFileSendMutex.Unlock()
+            blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID())
+            return errors.New("remote: send file with no send map")
+        }
+        m.failFileSendMutex.Unlock()
+    }
     for _, f := range fc.Files {
         f.NoDuplicated = true
+        if fc.Retry {
+            for _, c := range failsendMap.cache {
+                for _, d := range *c {
+                    if d.Match(f) {
+                        f.Retry = fc.Retry
+                        break
+                    }
+                }
+                if f.Retry {
+                    break
+                }
+            }
+        }
         fileDetails = append(fileDetails, &types.FilesDetails{
             Servers: Servers,
             File:    f,
@@ -1549,7 +1848,7 @@ func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionI
     for _, f := range *target {
         if f.UniqID == fc.UniqID {
             // set status to sending if fc send failed
-            if f.SendStatus == types.FileSendFailed {
+            if f.SendStatus == types.FileSendFailed && fc.Retry {
                 f.SendStatus = types.FileSending
                 return f.SendStatus, false
             }
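ExecuteTask above now repeats the same append-if-absent loop over `req.BanWorkerList` in two places. A hypothetical helper that both sites could share — not part of this diff; `Host`/`Equal` are simplified stand-ins for `dcProtocol.Host`:

```go
package main

import "fmt"

// Host is a stand-in for dcProtocol.Host.
type Host struct{ Server string }

func (h *Host) Equal(o *Host) bool { return h.Server == o.Server }

// appendIfMissing adds h to list only when no entry compares Equal to it,
// which is exactly the inline "matched" loop repeated in ExecuteTask.
func appendIfMissing(list []*Host, h *Host) []*Host {
	for _, x := range list {
		if x.Equal(h) {
			return list
		}
	}
	return append(list, h)
}

func main() {
	ban := []*Host{{Server: "1.2.3.4:8080"}}
	ban = appendIfMissing(ban, &Host{Server: "1.2.3.4:8080"})
	fmt.Println(len(ban)) // 1: duplicates are not re-added
}
```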
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go
index 7bcbb79d..17b94f80 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go
@@ -282,36 +282,6 @@ func (wo *workerOffer) EnableWorker(host *dcProtocol.Host) {
     blog.Infof("remote slot: total slot:%d after enable host:%v", wo.validWorkerNum, *host)
 }

-func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool {
-    if host == nil {
-        return false
-    }
-
-    wo.workerLock.Lock()
-    defer wo.workerLock.Unlock()
-
-    for _, wk := range wo.worker {
-        if !wk.host.Equal(host) {
-            continue
-        }
-
-        if wk.dead {
-            blog.Infof("remote slot: host:%v is already dead,do nothing now", host)
-            return false
-        }
-
-        if wk.status == Retrying {
-            blog.Infof("remote slot: host:%v is retrying,do nothing now", host)
-            return true
-        }
-        blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying)
-        wk.status = Retrying
-        return false
-    }
-
-    return false
-}
-
 func (wo *workerOffer) SetWorkerStatus(host *dcProtocol.Host, s Status) {
     wo.workerLock.Lock()
     defer wo.workerLock.Unlock()
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
index 5b48381e..b4ad8406 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
@@ -28,7 +28,6 @@ type RemoteSlotMgr interface {
     RecoverDeadWorker(w *worker)
     DisableWorker(host *dcProtocol.Host)
     EnableWorker(host *dcProtocol.Host)
-    CanWorkerRetry(host *dcProtocol.Host) bool // check if worker can retry, if can set worker status to retrying
     SetWorkerStatus(host *dcProtocol.Host, status Status)
     Lock(usage dcSDK.JobUsage, f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host
     Unlock(usage dcSDK.JobUsage, host *dcProtocol.Host)
@@ -299,37 +298,6 @@ func (wr *resource) EnableWorker(host *dcProtocol.Host) {
     blog.Infof("remote slot: total slot:%d after enable host:%v", wr.totalSlots, *host)
 }

-func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool {
-    if host == nil {
-        return false
-    }
-
-    wr.workerLock.Lock()
-    defer wr.workerLock.Unlock()
-    for _, wk := range wr.worker {
-        if !wk.host.Equal(host) {
-            continue
-        }
-
-        if wk.dead {
-            blog.Infof("remote slot: host:%v is already dead, do nothing now", host)
-            return false
-        }
-        if !wk.disabled {
-            return false
-        }
-        if wk.status == Retrying {
-            blog.Infof("remote slot: host:%v is retrying, do nothing now", host)
-            return false
-        }
-        blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying)
-        wk.status = Retrying
-        return true
-    }
-
-    return false
-}
-
 func (wr *resource) SetWorkerStatus(host *dcProtocol.Host, s Status) {
     wr.workerLock.Lock()
     defer wr.workerLock.Unlock()
@@ -538,19 +506,26 @@ func (wr *resource) addWorker(host *dcProtocol.Host) {
     return
 }

-func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host) *worker {
+func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host) (*worker, bool) {
     var w *worker
     max := 0
+    hasAvailableWorker := false
     for _, worker := range wr.worker {
         if worker.disabled || worker.dead {
             continue
         }

+        matched := false
         for _, host := range banWorkerList {
             if worker.host.Equal(host) {
-                continue
+                matched = true
+                break
             }
         }
+        if matched {
+            continue
+        }

+        hasAvailableWorker = true
         free := worker.totalSlots - worker.occupiedSlots
         if free >= max {
             max = free
@@ -562,19 +537,32 @@ func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host)
     //     w = wr.worker[0]
     // }

-    return w
+    return w, hasAvailableWorker
 }

 // 大文件优先
-func (wr *resource) getWorkerLargeFileFirst(f string) *worker {
+func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtocol.Host) (*worker, bool) {
     var w *worker
     max := 0
     inlargequeue := false
+    hasAvailableWorker := false
     for _, worker := range wr.worker {
         if worker.disabled || worker.dead {
             continue
         }

+        matched := false
+        for _, host := range banWorkerList {
+            if worker.host.Equal(host) {
+                matched = true
+                break
+            }
+        }
+        if matched {
+            continue
+        }
+
+        hasAvailableWorker = true
         free := worker.totalSlots - worker.occupiedSlots

         // 在资源空闲时,大文件优先
@@ -600,33 +588,34 @@ func (wr *resource) getWorkerLargeFileFirst(f string) *worker {
     if w == nil {
         // w = wr.worker[0]
-        return w
+        return w, hasAvailableWorker
     }

     if f != "" && !w.hasFile(f) {
         w.largefiles = append(w.largefiles, f)
     }

-    return w
+    return w, hasAvailableWorker
 }

-func (wr *resource) occupyWorkerSlots(f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host {
+func (wr *resource) occupyWorkerSlots(f string, banWorkerList []*dcProtocol.Host) (*dcProtocol.Host, bool) {
     wr.workerLock.Lock()
     defer wr.workerLock.Unlock()

     var w *worker
+    var hasWorkerAvailable bool
     if f == "" {
-        w = wr.getWorkerWithMostFreeSlots(banWorkerList)
+        w, hasWorkerAvailable = wr.getWorkerWithMostFreeSlots(banWorkerList)
     } else {
-        w = wr.getWorkerLargeFileFirst(f)
+        w, hasWorkerAvailable = wr.getWorkerLargeFileFirst(f, banWorkerList)
     }

     if w == nil {
-        return nil
+        return nil, hasWorkerAvailable
     }

     _ = w.occupySlot()
-    return w.host
+    return w.host, hasWorkerAvailable
 }

 func (wr *resource) freeWorkerSlots(host *dcProtocol.Host) {
@@ -644,6 +633,9 @@ func (wr *resource) freeWorkerSlots(host *dcProtocol.Host) {
 }

 func (wr *resource) handleLock(ctx context.Context) {
+    ticker := time.NewTicker(time.Second * 20)
+    defer ticker.Stop()
+
     wr.ctx = ctx

     for {
@@ -656,6 +648,8 @@ func (wr *resource) handleLock(ctx context.Context) {
             wr.getSlot(msg)
         case <-wr.emptyChan:
             wr.onSlotEmpty()
+        case <-ticker.C:
+            wr.occupyWaitList()
         }
     }
 }
@@ -693,13 +687,14 @@ func (wr *resource) getSlot(msg lockWorkerMessage) {
     if wr.occupiedSlots < wr.totalSlots || wr.totalSlots <= 0 {
         set := wr.getUsageSet(usage)
         if wr.isIdle(set) {
-            set.occupied++
-            wr.occupiedSlots++
-            blog.Infof("remote slot: total slots:%d occupied slots:%d, remote slot available",
-                wr.totalSlots, wr.occupiedSlots)
-
-            msg.result <- wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
-            satisfied = true
+            if h, _ := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
+                set.occupied++
+                wr.occupiedSlots++
+                blog.Infof("remote slot: total slots:%d occupied slots:%d, remote slot available",
+                    wr.totalSlots, wr.occupiedSlots)
+                msg.result <- h
+                satisfied = true
+            }
         }
     }

@@ -715,24 +710,31 @@ func (wr *resource) getSlot(msg lockWorkerMessage) {
 func (wr *resource) putSlot(msg lockWorkerMessage) {
     wr.freeWorkerSlots(msg.toward)
     wr.occupiedSlots--
+    blog.Debugf("remote slot: freed one slot, occupied %d, total %d", wr.occupiedSlots, wr.totalSlots)
     usage := msg.jobUsage
     set := wr.getUsageSet(usage)
     set.occupied--

     // check whether other waiting is satisfied now
     if wr.waitingList.Len() > 0 {
+        blog.Debugf("remote slot: slot freed, occupied %d, waiting %d", wr.occupiedSlots, wr.waitingList.Len())
         for e := wr.waitingList.Front(); e != nil; e = e.Next() {
             msg := e.Value.(*lockWorkerMessage)
             set := wr.getUsageSet(msg.jobUsage)
             if wr.isIdle(set) {
-                set.occupied++
-                wr.occupiedSlots++
-
-                msg.result <- wr.occupyWorkerSlots(msg.largeFile, []*dcProtocol.Host{})
-
-                wr.waitingList.Remove(e)
-
-                break
+                if h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
+                    set.occupied++
+                    wr.occupiedSlots++
+                    msg.result <- h
+                    wr.waitingList.Remove(e)
+                    break
+                } else if !hasAvailableWorker {
+                    msg.result <- nil
+                    wr.waitingList.Remove(e)
+                    blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
+                } else {
+                    blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
+                }
             }
         }
     }
@@ -748,3 +750,28 @@ func (wr *resource) onSlotEmpty() {
         wr.waitingList.Remove(e)
     }
 }
+
+func (wr *resource) occupyWaitList() {
+    if wr.waitingList.Len() > 0 {
+        for e := wr.waitingList.Front(); e != nil; e = e.Next() {
+            msg := e.Value.(*lockWorkerMessage)
+            set := wr.getUsageSet(msg.jobUsage)
+            if wr.isIdle(set) {
+                h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
+                if h != nil {
+                    set.occupied++
+                    wr.occupiedSlots++
+                    msg.result <- h
+                    wr.waitingList.Remove(e)
+                    blog.Debugf("remote slot: occupy waiting list")
+                } else if !hasAvailableWorker { // no slot available for ban worker list, turn it local
+                    blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
+                    msg.result <- nil
+                    wr.waitingList.Remove(e)
+                } else {
+                    blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
+                }
+            }
+        }
+    }
+}
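A contract change worth noting in the slot manager above: a waiter can now be resolved with a nil host when every remaining worker is on its ban list, and the log says the job should "turn it local". A schematic of the assumed caller-side handling (simplified types; the real caller is `lockSlots` in mgr.go, whose nil check is not shown in this diff):

```go
package main

import "fmt"

// Host is a stand-in for dcProtocol.Host.
type Host struct{ Server string }

// runJob blocks on the result channel a waiter holds. A nil host means the
// slot manager gave up (all candidate workers banned), so the job should
// degrade to local execution instead of waiting forever.
func runJob(result <-chan *Host, runLocal, runRemote func(h *Host)) {
	if h := <-result; h == nil {
		runLocal(nil)
	} else {
		runRemote(h)
	}
}

func main() {
	result := make(chan *Host, 1)
	result <- nil // what occupyWaitList/putSlot send when no worker is available
	runJob(result,
		func(*Host) { fmt.Println("degraded to local execution") },
		func(h *Host) { fmt.Println("got remote slot on", h.Server) })
}
```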
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go
index 496b612c..2523d70e 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go
@@ -80,11 +80,13 @@ type worker struct {

 func (wr *worker) occupySlot() error {
     wr.occupiedSlots++
+    blog.Debugf("worker: occupy slot for worker %s, occupied %d, total %d", wr.host.Server, wr.occupiedSlots, wr.totalSlots)
     return nil
 }

 func (wr *worker) freeSlot() error {
     wr.occupiedSlots--
+    blog.Debugf("worker: free slot for worker %s, occupied %d, total %d", wr.host.Server, wr.occupiedSlots, wr.totalSlots)
     return nil
 }
diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go
index e264a5f7..cc06b055 100644
--- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go
+++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go
@@ -485,6 +485,7 @@ type FileCollectionInfo struct {
     SendStatus FileSendStatus   `json:"send_status"`
     Files      []dcSDK.FileDesc `json:"files"`
     Timestamp  int64            `json:"timestamp"`
+    Retry      bool             `json:"retry"`
 }

 // FileInfo record file info
@@ -497,7 +498,6 @@ type FileInfo struct {
     FileMode           uint32         `json:"file_mode"`
     LinkTarget         string         `json:"link_target"`
     SendStatus         FileSendStatus `json:"send_status"`
-    FailCount          int            `json:"fail_count"`
 }

 // Match check if the FileDesc is point to some file as this FileInfo
diff --git a/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler.go b/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler.go
index 957df5ae..5c129625 100644
--- a/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler.go
+++ b/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler.go
@@ -424,9 +424,9 @@ func (r *CommonRemoteHandler) ExecuteSendFile(
     // 加本地资源锁
     locallocked := false
     if mgr != nil {
-        if mgr.LockSlots(dcSDK.JobUsageLocalExe, 1) {
+        if mgr.LockSlots(dcSDK.JobUsageDefault, 1) {
             locallocked = true
-            blog.Debugf("remotehandle: succeed to get one local lock")
+            blog.Debugf("remotehandle: succeed to get one default lock")
         }
     }

@@ -466,8 +466,8 @@ func (r *CommonRemoteHandler) ExecuteSendFile(
     t1 = t2

     if locallocked {
-        mgr.UnlockSlots(dcSDK.JobUsageLocalExe, 1)
-        blog.Debugf("remotehandle: succeed to release one local lock")
+        mgr.UnlockSlots(dcSDK.JobUsageDefault, 1)
+        blog.Debugf("remotehandle: succeed to release one default lock")
     }

     if err != nil {
diff --git a/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler_long_tcp.go b/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler_long_tcp.go
index 36ccd415..d8828ab5 100644
--- a/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler_long_tcp.go
+++ b/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler_long_tcp.go
@@ -212,9 +212,9 @@ func (r *CommonRemoteHandler) ExecuteSendFileLongTCP(
     // 加本地资源锁
     locallocked := false
     if mgr != nil {
-        if mgr.LockSlots(dcSDK.JobUsageLocalExe, 1) {
+        if mgr.LockSlots(dcSDK.JobUsageDefault, 1) {
             locallocked = true
-            blog.Infof("remotehandle: succeed to get one local lock")
+            blog.Infof("remotehandle: succeed to get one default lock")
         }
     }
@@ -261,8 +261,8 @@ func (r *CommonRemoteHandler) ExecuteSendFileLongTCP(
     t1 = t2

     if locallocked {
-        mgr.UnlockSlots(dcSDK.JobUsageLocalExe, 1)
-        blog.Infof("remotehandle: succeed to release one local lock")
+        mgr.UnlockSlots(dcSDK.JobUsageDefault, 1)
+        blog.Infof("remotehandle: succeed to release one default lock")
     }

     if err != nil {
diff --git a/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/dao/mongotemplate/TurboSummaryDao.kt b/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/dao/mongotemplate/TurboSummaryDao.kt
index 23c6b25c..bba6d030 100644
--- a/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/dao/mongotemplate/TurboSummaryDao.kt
+++ b/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/dao/mongotemplate/TurboSummaryDao.kt
@@ -5,6 +5,7 @@ import com.tencent.devops.turbo.model.TTurboDaySummaryEntity
 import com.tencent.devops.turbo.pojo.TurboDaySummaryOverviewModel
 import org.bson.Document
 import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.data.domain.Sort
 import org.springframework.data.mongodb.core.FindAndModifyOptions
 import org.springframework.data.mongodb.core.MongoTemplate
 import org.springframework.data.mongodb.core.aggregation.Aggregation
@@ -144,8 +145,9 @@ class TurboSummaryDao @Autowired constructor(
         val skip = Aggregation.skip((pageNum * pageSize).toLong())
         val limit = Aggregation.limit(pageSize.toLong())
+        val sort = Aggregation.sort(Sort.Direction.ASC, "_id")

-        val aggregation = Aggregation.newAggregation(match, group, skip, limit)
+        val aggregation = Aggregation.newAggregation(match, group, sort, skip, limit)
         val queryResults: AggregationResults<TurboDaySummaryOverviewModel> =
             mongoTemplate.aggregate(aggregation, "t_turbo_day_summary_entity", TurboDaySummaryOverviewModel::class.java)
         return queryResults.mappedResults
diff --git a/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/job/BkMetricsDailyJob.kt b/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/job/BkMetricsDailyJob.kt
index 7513c099..b811e177 100644
--- a/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/job/BkMetricsDailyJob.kt
+++ b/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/job/BkMetricsDailyJob.kt
@@ -85,13 +85,6 @@ class BkMetricsDailyJob @Autowired constructor(
             turboSaveTime = saveTime
         )

-        bkMetricsRabbitTemplate.convertAndSend(EXCHANGE_METRICS_STATISTIC_TURBO_DAILY, "",
-            JsonUtil.toJson(bkMetricsMessage)) { message: Message ->
-            val messageProperties = message.messageProperties
-            messageProperties.setHeader("contentType", "application/json")
-            messageProperties.setHeader("contentEncoding", "UTF-8")
-            messageProperties.deliveryMode = MessageDeliveryMode.PERSISTENT
-            message
-        }
+        bkMetricsRabbitTemplate.convertAndSend(EXCHANGE_METRICS_STATISTIC_TURBO_DAILY, "", bkMetricsMessage)
     }
 }
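The `TurboSummaryDao` change above is a correctness fix: paginating `$group` output with `$skip`/`$limit` but no `$sort` is non-deterministic in MongoDB, so pages could overlap or drop documents between requests; sorting on the group key `_id` makes the pages stable. The same pipeline shape, sketched with the official Go driver purely for illustration (the `$match`/`$group` fields here are placeholders, not the real query):

```go
package main

import (
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// buildPipeline mirrors the Kotlin aggregation: match, group, then a stable
// sort on the group key before paginating with skip/limit.
func buildPipeline(pageNum, pageSize int64) mongo.Pipeline {
	return mongo.Pipeline{
		bson.D{{Key: "$match", Value: bson.D{{Key: "status", Value: "ENABLED"}}}},   // placeholder filter
		bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: "$project_id"}}}}, // placeholder group key
		// without this stage, $skip/$limit pages are not guaranteed to be disjoint
		bson.D{{Key: "$sort", Value: bson.D{{Key: "_id", Value: 1}}}},
		bson.D{{Key: "$skip", Value: pageNum * pageSize}},
		bson.D{{Key: "$limit", Value: pageSize}},
	}
}

func main() {
	_ = buildPipeline(0, 100)
}
```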
= "metrics.statistic.turbo.daily" /** * 蓝盾项目停用广播通知