From 86888eaf1a329aa36b12fe071aba00884c17aee1 Mon Sep 17 00:00:00 2001 From: yanafu Date: Wed, 23 Oct 2024 17:12:09 +0800 Subject: [PATCH 01/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../booster/bk_dist/booster/pkg/booster.go | 5 +- .../controller/pkg/manager/remote/mgr.go | 214 +++++++++++++----- .../pkg/manager/remote/slotbyworkeroffer.go | 4 +- .../controller/pkg/manager/remote/slots.go | 15 +- .../bk_dist/controller/pkg/types/manager.go | 1 - 5 files changed, 168 insertions(+), 71 deletions(-) diff --git a/src/backend/booster/bk_dist/booster/pkg/booster.go b/src/backend/booster/bk_dist/booster/pkg/booster.go index d6492e98..4aef6524 100644 --- a/src/backend/booster/bk_dist/booster/pkg/booster.go +++ b/src/backend/booster/bk_dist/booster/pkg/booster.go @@ -24,7 +24,6 @@ import ( "time" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/env" - dcFile "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/file" dcProtocol "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol" dcPump "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/pump" dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk" @@ -630,7 +629,7 @@ func (b *Booster) unregisterWork() error { return nil } -func (b *Booster) sendAdditionFile() { +/*func (b *Booster) sendAdditionFile() { if b.config.Works.Local || b.config.Works.Degraded { return } @@ -679,7 +678,7 @@ func (b *Booster) sendAdditionFile() { return } blog.Infof("booster: finish send addition files: %v", b.config.Works.AdditionFiles) -} +}*/ func (b *Booster) runWorks( ctx context.Context, diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 2c51bca2..6a0566b8 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -40,7 +40,7 @@ const ( // corkMaxSize = 1024 * 1024 * 1024 largeFileSize = 1024 * 1024 * 100 // 100MB - fileMaxFailCount = 5 + //fileMaxFailCount = 5 ) // NewMgr get a new Remote Mgr @@ -73,7 +73,8 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr { conf: work.Config(), resourceCheckTick: 5 * time.Second, workerCheckTick: 5 * time.Second, - toolChainRetryTick: 10 * time.Second, + toolChainRetryTick: 6 * time.Second, + fileRetryCheckTick: 7 * time.Second, sendCorkTick: 10 * time.Millisecond, corkSize: corkSize, corkMaxSize: corkMaxSize, @@ -116,6 +117,7 @@ type Mgr struct { resourceCheckTick time.Duration workerCheckTick time.Duration + fileRetryCheckTick time.Duration toolChainRetryTick time.Duration lastUsed uint64 // only accurate to second now lastApplied uint64 // only accurate to second now @@ -166,7 +168,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F if ci.Match(desc) { //if worker is send failed before, try to send it again if ci.SendStatus == types.FileSendFailed && !query { - blog.Debugf("file: retry send file %s, fail count %d", desc.FilePath, ci.FailCount) + blog.Debugf("file: retry send file %s", desc.FilePath) ci.SendStatus = types.FileSending return ci, false } @@ -216,7 +218,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { fileMatched := true //if file is send 
failed before, try to send it again if ci.SendStatus == types.FileSendFailed { - blog.Debugf("file: retry send file %s, fail count %d", desc.FilePath, ci.FailCount) + blog.Debugf("file: retry send file %s", desc.FilePath) fileMatched = false ci.SendStatus = types.FileSending } @@ -259,7 +261,6 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS FileMode: desc.Filemode, LinkTarget: desc.LinkTarget, SendStatus: status, - FailCount: 0, } c, ok := fsm.cache[desc.FilePath] @@ -272,12 +273,6 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS for _, ci := range *c { if ci.Match(desc) { ci.SendStatus = status - if status == types.FileSendFailed { - ci.FailCount++ - } - if status == types.FileSendSucceed { - ci.FailCount = 0 - } return } } @@ -286,7 +281,7 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS return } -func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool { +/*func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool { fsm.RLock() defer fsm.RUnlock() @@ -307,8 +302,57 @@ func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool { } } + return false +}*/ + +func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { + fsm.RLock() + defer fsm.RUnlock() + + if fsm.cache == nil { + return false + } + for _, desc := range descs { + c, ok := fsm.cache[desc.FilePath] + if !ok || c == nil || len(*c) == 0 { + continue + } + for _, ci := range *fsm.cache[desc.FilePath] { + if ci.Match(desc) { + if ci.SendStatus == types.FileSendFailed { + return true + } + } + } + } + return false } +func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc { + fsm.RLock() + defer fsm.RUnlock() + + failFiles := make([]dcSDK.FileDesc, 0) + for _, v := range fsm.cache { + for _, ci := range *v { + if ci.SendStatus == types.FileSendFailed { + failFiles = append(failFiles, dcSDK.FileDesc{ + FilePath: ci.FullPath, + Compresstype: dcProtocol.CompressLZ4, + FileSize: ci.Size, + Lastmodifytime: ci.LastModifyTime, + Md5: "", + Targetrelativepath: ci.TargetRelativePath, + Filemode: ci.FileMode, + LinkTarget: ci.LinkTarget, + NoDuplicated: true, + //Priority: dcSDK.GetPriority(ci), + }) + } + } + } + return failFiles +} // Init do the initialization for remote manager // !! only call once !! 
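// A minimal, self-contained model of the failure bookkeeping the hunks above
// introduce, assuming only the send states named in this series; sendMap,
// fileInfo and the status constants are illustrative stand-ins for the
// controller's real types, not the patch's own code.
package main

import (
	"fmt"
	"sync"
)

type sendStatus int

const (
	fileSending sendStatus = iota
	fileSendSucceed
	fileSendFailed
)

type fileInfo struct {
	fullPath string
	status   sendStatus
}

type sendMap struct {
	sync.RWMutex
	cache map[string][]*fileInfo
}

// anyFailed mirrors isFilesSendFailed: a read-locked scan that tells the
// dispatcher whether any input of a command already failed on this worker,
// so the worker can be banned before slot selection.
func (m *sendMap) anyFailed(paths []string) bool {
	m.RLock()
	defer m.RUnlock()
	for _, p := range paths {
		for _, fi := range m.cache[p] {
			if fi.status == fileSendFailed {
				return true
			}
		}
	}
	return false
}

// failedSnapshot mirrors getFailFiles: failed entries are copied out under
// the read lock so the retry path can resend them without holding the lock.
func (m *sendMap) failedSnapshot() []*fileInfo {
	m.RLock()
	defer m.RUnlock()
	var out []*fileInfo
	for _, list := range m.cache {
		for _, fi := range list {
			if fi.status == fileSendFailed {
				out = append(out, fi)
			}
		}
	}
	return out
}

func main() {
	m := &sendMap{cache: map[string][]*fileInfo{
		"a.o": {{fullPath: "a.o", status: fileSendSucceed}},
		"b.o": {{fullPath: "b.o", status: fileSendFailed}},
	}}
	fmt.Println(m.anyFailed([]string{"a.o", "b.o"})) // true: b.o failed earlier
	fmt.Println(len(m.failedSnapshot()))             // 1
}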
@@ -335,8 +379,9 @@ func (m *Mgr) Init() { if m.conf.AutoResourceMgr { go m.resourceCheck(ctx) } - - go m.workerCheck(ctx) + if m.work.ID() != "" { + go m.workerCheck(ctx) + } if m.conf.SendCork { m.sendCorkChan = make(chan bool, 1000) @@ -479,14 +524,19 @@ func (m *Mgr) resourceCheck(ctx context.Context) { func (m *Mgr) workerCheck(ctx context.Context) { blog.Infof("remote: run worker check tick for work: %s", m.work.ID()) ticker := time.NewTicker(m.workerCheckTick) + fileRetryTicker := time.NewTicker(m.fileRetryCheckTick) + toolChainRetryTicker := time.NewTicker(m.toolChainRetryTick) + defer ticker.Stop() + defer fileRetryTicker.Stop() + defer toolChainRetryTicker.Stop() for { select { case <-ctx.Done(): blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID()) return - + //recover dead worker case <-ticker.C: handler := m.remoteWorker.Handler(0, nil, nil, nil) for _, w := range m.resource.GetDeadWorkers() { @@ -501,7 +551,24 @@ func (m *Mgr) workerCheck(ctx context.Context) { }(w) } - + // retry send fail file for all workers + case <-fileRetryTicker.C: + blog.Debugf("remote: try to retry send fail file for work(%s)", m.work.ID()) + for _, h := range m.work.Resource().GetHosts() { + go m.retrySendFiles(h) + } + //retry failed tool chain for all workers, recover disable worker + case <-toolChainRetryTicker.C: + blog.Debugf("remote: try to retry send tool chain for work(%s)", m.work.ID()) + handler := m.remoteWorker.Handler(0, nil, nil, nil) + for _, h := range m.work.Resource().GetHosts() { + go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{ + Pid: 0, + Server: h, + Sandbox: &dcSyscall.Sandbox{Dir: ""}, + Stats: &dcSDK.ControllerJobStats{}, + }) + } } } } @@ -524,6 +591,22 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLeaveTime) m.work.Basic().UpdateJobStats(req.Stats) + servers := m.work.Resource().GetHosts() + m.fileSendMutex.Lock() + for _, c := range req.Req.Commands { + for server, f := range m.fileSendMap { + if f.isFilesSendFailed(c.Inputfiles) { + for _, s := range servers { + if s.Server == server { + req.BanWorkerList = append(req.BanWorkerList, s) + break + } + } + } + } + } + m.fileSendMutex.Unlock() + blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %v", m.work.ID(), req.Pid, req.BanWorkerList) // 如果有超过100MB的大文件,则在选择host时,作为选择条件 fpath, _ := getMaxSizeFile(req, m.largeFileSize) req.Server = m.lockSlots(dcSDK.JobUsageRemoteExe, fpath, req.BanWorkerList) @@ -558,13 +641,13 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas m.work.ID(), req.Pid, req.Server.Server, err, req.Server.Server) m.resource.DisableWorker(req.Server) - m.retrySendToolChain(handler, req) + //m.retrySendToolChain(handler, req) return nil, err } - if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) { + /*if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) { return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) - } + }*/ remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req)) if err != nil { req.BanWorkerList = append(req.BanWorkerList, req.Server) @@ -640,8 +723,34 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error) ) } +func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { + m.fileSendMutex.Lock() + sendMap 
:= m.fileSendMap[h.Server] + if sendMap == nil { + m.fileSendMutex.Unlock() + return + } + m.fileSendMutex.Unlock() + failFiles := sendMap.getFailFiles() + if len(failFiles) == 0 { + return + } + + blog.Debugf("remote: try to retry send fail file for work(%s) with fail files %v", m.work.ID(), failFiles) + if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{ + Pid: 0, + Req: failFiles, + Server: h, + Sandbox: &dcSyscall.Sandbox{Dir: ""}, + Stats: &dcSDK.ControllerJobStats{}, + }); err != nil { + blog.Errorf("mgr: send remote file failed: %v", err) + } + blog.Debugf("remote: try to retry send fail file for work(%s) succeed", m.work.ID()) +} + // check if files send to remote worker failed and no need to send again -func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand) bool { +/*func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand) bool { m.fileSendMutex.Lock() target, ok := m.fileSendMap[server] if !ok { @@ -656,7 +765,7 @@ func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand } } return false -} +}*/ func (m *Mgr) ensureFilesWithPriority( handler dcSDK.RemoteWorkerHandler, @@ -768,7 +877,7 @@ func (m *Mgr) ensureFiles( if f.CompressedSize == -1 || f.FileSize == -1 { t = dcSDK.FilterRuleHandleDefault } - + blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t) servers := make([]*dcProtocol.Host, 0, 0) switch t { case dcSDK.FilterRuleHandleDefault: @@ -1307,47 +1416,32 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect return fcs, true, nil } +// retry send failed tool chain func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) { - if m.resource.CanWorkerRetry(req.Server) { - go func(handler dcSDK.RemoteWorkerHandler, req types.RemoteTaskExecuteRequest) { - for i := 0; i < toolChainRetryTimes; { - fileCollections, isNotTeminated, err := m.getFailedFileCollectionByHost(req.Server.Server) - if !isNotTeminated { - time.Sleep(m.toolChainRetryTick) - continue - } - i++ - blog.Infof("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s)", - m.work.ID(), i, req.Pid, req.Server.Server) - if err != nil || len(fileCollections) == 0 { - if err != nil { - blog.Errorf("remote: get failed file collection by host(%s) failed: %v", req.Server.Server, err) - } else { - blog.Errorf("remote: retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) - } - time.Sleep(m.toolChainRetryTick) - continue - } - - if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil { - blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s), "+ - "send tool chain files failed: %v", m.work.ID(), i, req.Pid, req.Server.Server, err) - time.Sleep(m.toolChainRetryTick) - } else { - // enable worker - m.resource.EnableWorker(req.Server) - blog.Infof("remote: success to retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) - return - } + fileCollections, isNotTeminated, err := m.getFailedFileCollectionByHost(req.Server.Server) + if err != nil { + blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) + return + } + if !isNotTeminated { + blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) all file collection in 
host(%s) is not finished", m.work.ID(), req.Pid, req.Server.Server) + return + } + if len(fileCollections) == 0 { + //blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: no filecollection found", m.work.ID(), req.Pid, req.Server.Server) + return + } - } - m.resource.SetWorkerStatus(req.Server, RetryFailed) - blog.Errorf("remote: already retry to send tool chain for work(%s) for %d times from pid(%d) to server(%s) failed, keep worker disable", m.work.ID(), toolChainRetryTimes, req.Pid, req.Server.Server) - }(handler, *req) - } else { - blog.Infof("remote: worker(%s) is alreay retry to send tool chain for work(%s) from pid(%d) to server(%s)", - req.Server.Server, m.work.ID(), req.Pid, req.Server.Server) + blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)", + m.work.ID(), req.Pid, req.Server.Server) + if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil { + blog.Errorf("remote: retry send tool chain for work(%s) from pid(%d) to server(%s), "+ + "send tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) + return } + // enable worker + m.resource.EnableWorker(req.Server) + blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) succeed", m.work.ID(), req.Pid, req.Server.Server) } diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go index 7bcbb79d..bd3b545d 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go @@ -282,7 +282,7 @@ func (wo *workerOffer) EnableWorker(host *dcProtocol.Host) { blog.Infof("remote slot: total slot:%d after enable host:%v", wo.validWorkerNum, *host) } -func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool { +/*func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool { if host == nil { return false } @@ -310,7 +310,7 @@ func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool { } return false -} +}*/ func (wo *workerOffer) SetWorkerStatus(host *dcProtocol.Host, s Status) { wo.workerLock.Lock() diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go index 5b48381e..1e309d32 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go @@ -28,7 +28,7 @@ type RemoteSlotMgr interface { RecoverDeadWorker(w *worker) DisableWorker(host *dcProtocol.Host) EnableWorker(host *dcProtocol.Host) - CanWorkerRetry(host *dcProtocol.Host) bool // check if worker can retry, if can set worker status to retrying + //CanWorkerRetry(host *dcProtocol.Host) bool // check if worker can retry, if can set worker status to retrying SetWorkerStatus(host *dcProtocol.Host, status Status) Lock(usage dcSDK.JobUsage, f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host Unlock(usage dcSDK.JobUsage, host *dcProtocol.Host) @@ -299,7 +299,7 @@ func (wr *resource) EnableWorker(host *dcProtocol.Host) { blog.Infof("remote slot: total slot:%d after enable host:%v", wr.totalSlots, *host) } -func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool { +/*func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool { if host == nil { return false } @@ -328,7 +328,7 @@ func (wr 
*resource) CanWorkerRetry(host *dcProtocol.Host) bool { } return false -} +}*/ func (wr *resource) SetWorkerStatus(host *dcProtocol.Host, s Status) { wr.workerLock.Lock() @@ -566,7 +566,7 @@ func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host) } // 大文件优先 -func (wr *resource) getWorkerLargeFileFirst(f string) *worker { +func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtocol.Host) *worker { var w *worker max := 0 inlargequeue := false @@ -574,6 +574,11 @@ func (wr *resource) getWorkerLargeFileFirst(f string) *worker { if worker.disabled || worker.dead { continue } + for _, host := range banWorkerList { + if worker.host.Equal(host) { + continue + } + } free := worker.totalSlots - worker.occupiedSlots @@ -618,7 +623,7 @@ func (wr *resource) occupyWorkerSlots(f string, banWorkerList []*dcProtocol.Host if f == "" { w = wr.getWorkerWithMostFreeSlots(banWorkerList) } else { - w = wr.getWorkerLargeFileFirst(f) + w = wr.getWorkerLargeFileFirst(f, banWorkerList) } if w == nil { diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go index f37cb0c3..c35d4f3f 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go @@ -280,7 +280,6 @@ type FileInfo struct { FileMode uint32 `json:"file_mode"` LinkTarget string `json:"link_target"` SendStatus FileSendStatus `json:"send_status"` - FailCount int `json:"fail_count"` } // Match check if the FileDesc is point to some file as this FileInfo From 27ea2f5eba2611baaac82e78db5673d1c5024554 Mon Sep 17 00:00:00 2001 From: yanafu Date: Thu, 24 Oct 2024 09:42:46 +0800 Subject: [PATCH 02/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 48 +++++-------------- 1 file changed, 11 insertions(+), 37 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 6a0566b8..f7e4cd44 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -281,30 +281,6 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS return } -/*func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool { - fsm.RLock() - defer fsm.RUnlock() - - if fsm.cache == nil { - return false - } - for _, desc := range descs { - c, ok := fsm.cache[desc.FilePath] - if !ok || c == nil || len(*c) == 0 { - continue - } - for _, ci := range *fsm.cache[desc.FilePath] { - if ci.Match(desc) { - if ci.FailCount > fileMaxFailCount { - return true - } - } - } - } - - return false -}*/ - func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { fsm.RLock() defer fsm.RUnlock() @@ -317,7 +293,7 @@ func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { if !ok || c == nil || len(*c) == 0 { continue } - for _, ci := range *fsm.cache[desc.FilePath] { + for _, ci := range *c { if ci.Match(desc) { if ci.SendStatus == types.FileSendFailed { return true @@ -341,12 +317,11 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc { Compresstype: dcProtocol.CompressLZ4, FileSize: ci.Size, Lastmodifytime: ci.LastModifyTime, - Md5: "", + Md5: 
ci.Md5, Targetrelativepath: ci.TargetRelativePath, Filemode: ci.FileMode, LinkTarget: ci.LinkTarget, NoDuplicated: true, - //Priority: dcSDK.GetPriority(ci), }) } } @@ -591,21 +566,19 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLeaveTime) m.work.Basic().UpdateJobStats(req.Stats) - servers := m.work.Resource().GetHosts() - m.fileSendMutex.Lock() for _, c := range req.Req.Commands { - for server, f := range m.fileSendMap { + for _, s := range m.work.Resource().GetHosts() { + m.fileSendMutex.Lock() + f := m.fileSendMap[s.Server] + m.fileSendMutex.Unlock() + if f == nil { + continue + } if f.isFilesSendFailed(c.Inputfiles) { - for _, s := range servers { - if s.Server == server { - req.BanWorkerList = append(req.BanWorkerList, s) - break - } - } + req.BanWorkerList = append(req.BanWorkerList, s) } } } - m.fileSendMutex.Unlock() blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %v", m.work.ID(), req.Pid, req.BanWorkerList) // 如果有超过100MB的大文件,则在选择host时,作为选择条件 fpath, _ := getMaxSizeFile(req, m.largeFileSize) @@ -728,6 +701,7 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { sendMap := m.fileSendMap[h.Server] if sendMap == nil { m.fileSendMutex.Unlock() + blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID()) return } m.fileSendMutex.Unlock() From b591ee77a7a8999f9675fc10958375b9025466f6 Mon Sep 17 00:00:00 2001 From: yanafu Date: Mon, 28 Oct 2024 17:13:10 +0800 Subject: [PATCH 03/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 50 +++++++++++++------ 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index ea9a534c..604243b6 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -135,7 +135,8 @@ type Mgr struct { type fileSendMap struct { sync.RWMutex - cache map[string]*[]*types.FileInfo + cache map[string]*[]*types.FileInfo + failedFiles map[string]*[]*types.FileInfo } func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.FileInfo, bool) { @@ -244,6 +245,23 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { return result } +func (fsm *fileSendMap) updateFailStatus(info *types.FileInfo) { + blog.Debugf("file: update failed files with:%v", fsm.failedFiles) + if fsm.failedFiles == nil { + fsm.failedFiles = make(map[string]*[]*types.FileInfo) + } + fc, ok := fsm.failedFiles[info.FullPath] + if !ok || fc == nil { + infoList := []*types.FileInfo{info} + fsm.failedFiles[info.FullPath] = &infoList + blog.Debugf("file: update failed files with:%v", fsm.failedFiles) + return + } + + blog.Debugf("file: update failed files with:%v", fsm.failedFiles) + *fc = append(*fc, info) +} + func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) { fsm.Lock() defer fsm.Unlock() @@ -267,37 +285,44 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS if !ok || c == nil || len(*c) == 0 { infoList := []*types.FileInfo{info} fsm.cache[desc.FilePath] = &infoList + if status == types.FileSendFailed { + 
fsm.updateFailStatus(info) + } return } for _, ci := range *c { if ci.Match(desc) { ci.SendStatus = status + //update failed files + if status == types.FileSendFailed { + fsm.updateFailStatus(info) + } return } } *c = append(*c, info) - return + if status == types.FileSendFailed { + fsm.updateFailStatus(info) + } } func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { fsm.RLock() defer fsm.RUnlock() - if fsm.cache == nil { + if fsm.failedFiles == nil { return false } for _, desc := range descs { - c, ok := fsm.cache[desc.FilePath] + c, ok := fsm.failedFiles[desc.FilePath] if !ok || c == nil || len(*c) == 0 { continue } for _, ci := range *c { if ci.Match(desc) { - if ci.SendStatus == types.FileSendFailed { - return true - } + return true } } } @@ -629,14 +654,9 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas m.work.ID(), req.Pid, req.Server.Server, err, req.Server.Server) m.resource.DisableWorker(req.Server) - //m.retrySendToolChain(handler, req) return nil, err } - /*if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) { - return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) - }*/ - ret, err = checkHttpConn(req) if err != nil { return ret, err @@ -727,7 +747,7 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { sendMap := m.fileSendMap[h.Server] if sendMap == nil { m.fileSendMutex.Unlock() - blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID()) + blog.Infof("remote: send file for work(%s) with no send map", m.work.ID()) return } m.fileSendMutex.Unlock() @@ -839,8 +859,8 @@ func (m *Mgr) ensureFiles( settings := m.work.Basic().Settings() blog.Infof("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server", len(fileDetails), m.work.ID(), pid, sandbox.Dir) - blog.Debugf("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server: %v", - len(fileDetails), m.work.ID(), pid, sandbox.Dir, fileDetails) + //blog.Debugf("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server: %v", + // len(fileDetails), m.work.ID(), pid, sandbox.Dir, fileDetails) rules := settings.FilterRules // pump模式下,一次编译依赖的可能有上千个文件,现在的流程会随机的添加到cork发送队列 From 7828ae1cbf2cbba074d9e9cdd0749bd2c2a7778f Mon Sep 17 00:00:00 2001 From: yanafu Date: Mon, 28 Oct 2024 17:19:58 +0800 Subject: [PATCH 04/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 604243b6..3c72d120 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -334,21 +334,19 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc { defer fsm.RUnlock() failFiles := make([]dcSDK.FileDesc, 0) - for _, v := range fsm.cache { + for _, v := range fsm.failedFiles { for _, ci := range *v { - if ci.SendStatus == types.FileSendFailed { - failFiles = append(failFiles, dcSDK.FileDesc{ - FilePath: ci.FullPath, - Compresstype: dcProtocol.CompressLZ4, - FileSize: ci.Size, - Lastmodifytime: ci.LastModifyTime, - 
Md5: ci.Md5, - Targetrelativepath: ci.TargetRelativePath, - Filemode: ci.FileMode, - LinkTarget: ci.LinkTarget, - NoDuplicated: true, - }) - } + failFiles = append(failFiles, dcSDK.FileDesc{ + FilePath: ci.FullPath, + Compresstype: dcProtocol.CompressLZ4, + FileSize: ci.Size, + Lastmodifytime: ci.LastModifyTime, + Md5: ci.Md5, + Targetrelativepath: ci.TargetRelativePath, + Filemode: ci.FileMode, + LinkTarget: ci.LinkTarget, + NoDuplicated: true, + }) } } return failFiles From 9656ca0f9c3390eeedcff5048eeed11ea7344086 Mon Sep 17 00:00:00 2001 From: yanafu Date: Wed, 30 Oct 2024 09:39:49 +0800 Subject: [PATCH 05/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 70 +++++++++++-------- 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 3c72d120..4432321f 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -162,6 +162,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F if !ok || c == nil || len(*c) == 0 { infoList := []*types.FileInfo{info} fsm.cache[desc.FilePath] = &infoList + fsm.updateFailStatus(info) return info, false } @@ -171,6 +172,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F if ci.SendStatus == types.FileSendFailed && !query { blog.Debugf("file: retry send file %s", desc.FilePath) ci.SendStatus = types.FileSending + fsm.updateFailStatus(ci) return ci, false } return ci, true @@ -178,6 +180,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F } *c = append(*c, info) + fsm.updateFailStatus(info) return info, false } @@ -210,6 +213,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { info: info, match: false, }) + fsm.updateFailStatus(info) continue } @@ -222,6 +226,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { blog.Debugf("file: retry send file %s", desc.FilePath) fileMatched = false ci.SendStatus = types.FileSending + fsm.updateFailStatus(ci) } result = append(result, matchResult{ info: ci, @@ -240,26 +245,37 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { info: info, match: false, }) + fsm.updateFailStatus(info) } return result } func (fsm *fileSendMap) updateFailStatus(info *types.FileInfo) { - blog.Debugf("file: update failed files with:%v", fsm.failedFiles) - if fsm.failedFiles == nil { - fsm.failedFiles = make(map[string]*[]*types.FileInfo) - } - fc, ok := fsm.failedFiles[info.FullPath] - if !ok || fc == nil { - infoList := []*types.FileInfo{info} - fsm.failedFiles[info.FullPath] = &infoList + //blog.Debugf("file: update failed files in status %s with:%v", info.SendStatus, fsm.failedFiles) + if info.SendStatus == types.FileSendFailed { //if file send failed + if fsm.failedFiles == nil { + fsm.failedFiles = make(map[string]*[]*types.FileInfo) + } + fc, ok := fsm.failedFiles[info.FullPath] + if !ok || fc == nil { + infoList := []*types.FileInfo{info} + fsm.failedFiles[info.FullPath] = &infoList + blog.Debugf("file: update failed files with:%v", *fsm.failedFiles[info.FullPath]) + return + } + blog.Debugf("file: 
update failed files with:%v", *fsm.failedFiles[info.FullPath]) + *fc = append(*fc, info) + } else { //if file send succeed or sending + if fsm.failedFiles == nil { + return //not update + } + if _, ok := fsm.failedFiles[info.FullPath]; !ok { + return //not update + } + delete(fsm.failedFiles, info.FullPath) blog.Debugf("file: update failed files with:%v", fsm.failedFiles) - return } - - blog.Debugf("file: update failed files with:%v", fsm.failedFiles) - *fc = append(*fc, info) } func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) { @@ -285,9 +301,7 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS if !ok || c == nil || len(*c) == 0 { infoList := []*types.FileInfo{info} fsm.cache[desc.FilePath] = &infoList - if status == types.FileSendFailed { - fsm.updateFailStatus(info) - } + fsm.updateFailStatus(info) return } @@ -295,17 +309,13 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS if ci.Match(desc) { ci.SendStatus = status //update failed files - if status == types.FileSendFailed { - fsm.updateFailStatus(info) - } + fsm.updateFailStatus(info) return } } *c = append(*c, info) - if status == types.FileSendFailed { - fsm.updateFailStatus(info) - } + fsm.updateFailStatus(info) } func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { @@ -551,8 +561,8 @@ func (m *Mgr) workerCheck(ctx context.Context) { } // retry send fail file for all workers case <-fileRetryTicker.C: - blog.Debugf("remote: try to retry send fail file for work(%s)", m.work.ID()) for _, h := range m.work.Resource().GetHosts() { + blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server) go m.retrySendFiles(h) } //retry failed tool chain for all workers, recover disable worker @@ -753,18 +763,22 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { if len(failFiles) == 0 { return } - - blog.Debugf("remote: try to retry send fail file for work(%s) with fail files %v", m.work.ID(), failFiles) + var testStr []string + for _, r := range failFiles { + testStr = append(testStr, r.FilePath) + } + blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, strings.Join(testStr, ",")) if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{ - Pid: 0, + Pid: 1, Req: failFiles, Server: h, Sandbox: &dcSyscall.Sandbox{Dir: ""}, Stats: &dcSDK.ControllerJobStats{}, }); err != nil { - blog.Errorf("mgr: send remote file failed: %v", err) + blog.Errorf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s failed: %v", m.work.ID(), 1, h.Server, err) + return } - blog.Debugf("remote: try to retry send fail file for work(%s) succeed", m.work.ID()) + blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(sendMap.getFailFiles())) } // check if files send to remote worker failed and no need to send again @@ -1039,7 +1053,7 @@ func (m *Mgr) ensureFiles( _ = m.appendCorkFiles(server, needSendCorkFiles) // notify send - m.sendCorkChan <- true + //m.sendCorkChan <- true } } else { // 单个文件发送模式 From e2a0dfb1e3496163bf768453c1171758c8de25a1 Mon Sep 17 00:00:00 2001 From: RJ Date: Mon, 4 Nov 2024 16:23:37 +0800 Subject: [PATCH 06/18] =?UTF-8?q?feat:=20Turbo=E7=BC=96=E8=AF=91=E5=8A=A0?= =?UTF-8?q?=E9=80=9F=E5=8F=98=E6=9B=B4MQ=E7=9A=84exchange=20#313?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit --- .../devops/turbo/dao/mongotemplate/TurboSummaryDao.kt | 4 +++- .../com/tencent/devops/turbo/job/BkMetricsDailyJob.kt | 9 +-------- .../com/tencent/devops/common/util/constants/TurboMQ.kt | 2 +- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/dao/mongotemplate/TurboSummaryDao.kt b/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/dao/mongotemplate/TurboSummaryDao.kt index 23c6b25c..bba6d030 100644 --- a/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/dao/mongotemplate/TurboSummaryDao.kt +++ b/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/dao/mongotemplate/TurboSummaryDao.kt @@ -5,6 +5,7 @@ import com.tencent.devops.turbo.model.TTurboDaySummaryEntity import com.tencent.devops.turbo.pojo.TurboDaySummaryOverviewModel import org.bson.Document import org.springframework.beans.factory.annotation.Autowired +import org.springframework.data.domain.Sort import org.springframework.data.mongodb.core.FindAndModifyOptions import org.springframework.data.mongodb.core.MongoTemplate import org.springframework.data.mongodb.core.aggregation.Aggregation @@ -144,8 +145,9 @@ class TurboSummaryDao @Autowired constructor( val skip = Aggregation.skip((pageNum * pageSize).toLong()) val limit = Aggregation.limit(pageSize.toLong()) + val sort = Aggregation.sort(Sort.Direction.ASC, "_id") - val aggregation = Aggregation.newAggregation(match, group, skip, limit) + val aggregation = Aggregation.newAggregation(match, group, sort, skip, limit) val queryResults: AggregationResults = mongoTemplate.aggregate(aggregation, "t_turbo_day_summary_entity", TurboDaySummaryOverviewModel::class.java) return queryResults.mappedResults diff --git a/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/job/BkMetricsDailyJob.kt b/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/job/BkMetricsDailyJob.kt index 7513c099..b811e177 100644 --- a/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/job/BkMetricsDailyJob.kt +++ b/src/backend/turbo/biz-turbo/src/main/kotlin/com/tencent/devops/turbo/job/BkMetricsDailyJob.kt @@ -85,13 +85,6 @@ class BkMetricsDailyJob @Autowired constructor( turboSaveTime = saveTime ) - bkMetricsRabbitTemplate.convertAndSend(EXCHANGE_METRICS_STATISTIC_TURBO_DAILY, "", - JsonUtil.toJson(bkMetricsMessage)) { message: Message -> - val messageProperties = message.messageProperties - messageProperties.setHeader("contentType", "application/json") - messageProperties.setHeader("contentEncoding", "UTF-8") - messageProperties.deliveryMode = MessageDeliveryMode.PERSISTENT - message - } + bkMetricsRabbitTemplate.convertAndSend(EXCHANGE_METRICS_STATISTIC_TURBO_DAILY, "", bkMetricsMessage) } } diff --git a/src/backend/turbo/common-turbo/common-turbo-util/src/main/kotlin/com/tencent/devops/common/util/constants/TurboMQ.kt b/src/backend/turbo/common-turbo/common-turbo-util/src/main/kotlin/com/tencent/devops/common/util/constants/TurboMQ.kt index 1114f983..6ab476fa 100644 --- a/src/backend/turbo/common-turbo/common-turbo-util/src/main/kotlin/com/tencent/devops/common/util/constants/TurboMQ.kt +++ b/src/backend/turbo/common-turbo/common-turbo-util/src/main/kotlin/com/tencent/devops/common/util/constants/TurboMQ.kt @@ -12,7 +12,7 @@ const val ROUTE_TURBO_PLUGIN_DATA = "route.turbo.plugin.data.new" /** * 蓝盾度量数据上报 */ -const val EXCHANGE_METRICS_STATISTIC_TURBO_DAILY = "e.metrics.statistic.turbo.daily" 
+const val EXCHANGE_METRICS_STATISTIC_TURBO_DAILY = "metrics.statistic.turbo.daily" /** * 蓝盾项目停用广播通知 From 8bd2aef1c9ee992d7eb2c525d2c65dc5a1e16317 Mon Sep 17 00:00:00 2001 From: yanafu Date: Mon, 4 Nov 2024 17:39:44 +0800 Subject: [PATCH 07/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../booster/bk_dist/common/sdk/worker.go | 1 + .../controller/pkg/manager/remote/mgr.go | 343 +++++++++++------- .../bk_dist/controller/pkg/types/manager.go | 1 + 3 files changed, 220 insertions(+), 125 deletions(-) diff --git a/src/backend/booster/bk_dist/common/sdk/worker.go b/src/backend/booster/bk_dist/common/sdk/worker.go index 6c0c73c5..15ef9fb8 100644 --- a/src/backend/booster/bk_dist/common/sdk/worker.go +++ b/src/backend/booster/bk_dist/common/sdk/worker.go @@ -96,6 +96,7 @@ type FileDesc struct { NoDuplicated bool `json:"no_duplicated"` AllDistributed bool `json:"all_distributed"` Priority FileDescPriority `json:"priority"` + Retry bool `json:"retry"` } // UniqueKey define the file unique key diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 4432321f..cf4535a8 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -68,6 +68,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr { ), checkSendFileTick: 100 * time.Millisecond, fileSendMap: make(map[string]*fileSendMap), + failFileSendMap: make(map[string]*fileSendMap), fileCollectionSendMap: make(map[string]*[]*types.FileCollectionInfo), fileMessageBank: newFileMessageBank(), conf: work.Config(), @@ -106,6 +107,9 @@ type Mgr struct { fileSendMutex sync.RWMutex fileSendMap map[string]*fileSendMap + failFileSendMutex sync.RWMutex + failFileSendMap map[string]*fileSendMap + fileCollectionSendMutex sync.RWMutex fileCollectionSendMap map[string]*[]*types.FileCollectionInfo @@ -135,11 +139,10 @@ type Mgr struct { type fileSendMap struct { sync.RWMutex - cache map[string]*[]*types.FileInfo - failedFiles map[string]*[]*types.FileInfo + cache map[string]*[]*types.FileInfo } -func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.FileInfo, bool) { +func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc) (*types.FileInfo, bool) { fsm.Lock() defer fsm.Unlock() @@ -162,28 +165,85 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.F if !ok || c == nil || len(*c) == 0 { infoList := []*types.FileInfo{info} fsm.cache[desc.FilePath] = &infoList - fsm.updateFailStatus(info) return info, false } for _, ci := range *c { if ci.Match(desc) { - //if worker is send failed before, try to send it again - if ci.SendStatus == types.FileSendFailed && !query { - blog.Debugf("file: retry send file %s", desc.FilePath) - ci.SendStatus = types.FileSending - fsm.updateFailStatus(ci) - return ci, false - } return ci, true } } *c = append(*c, info) - fsm.updateFailStatus(info) return info, false } +// 仅匹配失败文件,不执行插入 +func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileInfo, bool, error) { + fsm.Lock() + defer fsm.Unlock() + + if fsm.cache == nil { + return nil, false, errors.New("file cache not found") + } + + c, ok := fsm.cache[desc.FilePath] + if !ok || c == nil || len(*c) == 0 { + return nil, false, 
fmt.Errorf("file %s not found", desc.FilePath) + } + + for _, ci := range *c { + if ci.Match(desc) { + if ci.SendStatus == types.FileSendFailed && !query { + ci.SendStatus = types.FileSending + return ci, false, nil + } + return ci, true, nil + } + } + return nil, false, fmt.Errorf("file %s not found", desc.FilePath) +} + +// 仅匹配失败文件,不执行插入 +func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult { + fsm.Lock() + defer fsm.Unlock() + + if fsm.cache == nil { + blog.Warnf("file cache not found") + return []matchResult{} + } + + result := make([]matchResult, 0, len(descs)) + for _, desc := range descs { + c, ok := fsm.cache[desc.FilePath] + if !ok || c == nil || len(*c) == 0 { + blog.Warnf("file %s not found", desc.FilePath) + continue + } + matched := false + for _, ci := range *c { + if ci.Match(*desc) { + fileMatched := true + if ci.SendStatus == types.FileSendFailed { + ci.SendStatus = types.FileSending + fileMatched = false + } + result = append(result, matchResult{ + info: ci, + match: fileMatched, + }) + matched = true + break + } + } + if matched { + continue + } + } + return result +} + func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { fsm.Lock() defer fsm.Unlock() @@ -213,24 +273,15 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { info: info, match: false, }) - fsm.updateFailStatus(info) continue } matched := false for _, ci := range *c { if ci.Match(*desc) { - fileMatched := true - //if file is send failed before, try to send it again - if ci.SendStatus == types.FileSendFailed { - blog.Debugf("file: retry send file %s", desc.FilePath) - fileMatched = false - ci.SendStatus = types.FileSending - fsm.updateFailStatus(ci) - } result = append(result, matchResult{ info: ci, - match: fileMatched, + match: true, }) matched = true break @@ -245,40 +296,53 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { info: info, match: false, }) - fsm.updateFailStatus(info) } return result } -func (fsm *fileSendMap) updateFailStatus(info *types.FileInfo) { - //blog.Debugf("file: update failed files in status %s with:%v", info.SendStatus, fsm.failedFiles) - if info.SendStatus == types.FileSendFailed { //if file send failed - if fsm.failedFiles == nil { - fsm.failedFiles = make(map[string]*[]*types.FileInfo) - } - fc, ok := fsm.failedFiles[info.FullPath] - if !ok || fc == nil { - infoList := []*types.FileInfo{info} - fsm.failedFiles[info.FullPath] = &infoList - blog.Debugf("file: update failed files with:%v", *fsm.failedFiles[info.FullPath]) +func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus, server string) { + if status == types.FileSendSucceed && !desc.Retry { + return + } + fsm.Lock() + defer fsm.Unlock() + + info := &types.FileInfo{ + FullPath: desc.FilePath, + Size: desc.FileSize, + LastModifyTime: desc.Lastmodifytime, + Md5: desc.Md5, + TargetRelativePath: desc.Targetrelativepath, + FileMode: desc.Filemode, + LinkTarget: desc.LinkTarget, + SendStatus: status, + } + if fsm.cache == nil { + fsm.cache = make(map[string]*[]*types.FileInfo) + } + fc, ok := fsm.cache[info.FullPath] + if !ok || fc == nil || len(*fc) == 0 { + infoList := []*types.FileInfo{info} + fsm.cache[info.FullPath] = &infoList + blog.Debugf("file: update failed files with add:%v", info) + return + } + for _, ci := range *fc { + if ci.Match(desc) { + blog.Debugf("file: update failed files with refresh before:%v", ci) + ci.SendStatus = status + blog.Debugf("file: update failed files 
with refresh:%v", ci) return } - blog.Debugf("file: update failed files with:%v", *fsm.failedFiles[info.FullPath]) - *fc = append(*fc, info) - } else { //if file send succeed or sending - if fsm.failedFiles == nil { - return //not update - } - if _, ok := fsm.failedFiles[info.FullPath]; !ok { - return //not update - } - delete(fsm.failedFiles, info.FullPath) - blog.Debugf("file: update failed files with:%v", fsm.failedFiles) } + *fc = append(*fc, info) } func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) { + if status != types.FileSendSucceed && desc.Retry { + return + } fsm.Lock() defer fsm.Unlock() @@ -301,50 +365,47 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS if !ok || c == nil || len(*c) == 0 { infoList := []*types.FileInfo{info} fsm.cache[desc.FilePath] = &infoList - fsm.updateFailStatus(info) return } for _, ci := range *c { if ci.Match(desc) { ci.SendStatus = status - //update failed files - fsm.updateFailStatus(info) return } } *c = append(*c, info) - fsm.updateFailStatus(info) } func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool { fsm.RLock() defer fsm.RUnlock() - if fsm.failedFiles == nil { + if fsm.cache == nil { return false } for _, desc := range descs { - c, ok := fsm.failedFiles[desc.FilePath] + c, ok := fsm.cache[desc.FilePath] if !ok || c == nil || len(*c) == 0 { continue } for _, ci := range *c { if ci.Match(desc) { - return true + return ci.SendStatus == types.FileSendFailed } } } return false } + func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc { fsm.RLock() defer fsm.RUnlock() failFiles := make([]dcSDK.FileDesc, 0) - for _, v := range fsm.failedFiles { + for _, v := range fsm.cache { for _, ci := range *v { failFiles = append(failFiles, dcSDK.FileDesc{ FilePath: ci.FullPath, @@ -356,6 +417,7 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc { Filemode: ci.FileMode, LinkTarget: ci.LinkTarget, NoDuplicated: true, + Retry: true, }) } } @@ -611,9 +673,9 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas for _, c := range req.Req.Commands { for _, s := range m.work.Resource().GetHosts() { - m.fileSendMutex.Lock() - f := m.fileSendMap[s.Server] - m.fileSendMutex.Unlock() + m.failFileSendMutex.Lock() + f := m.failFileSendMap[s.Server] + m.failFileSendMutex.Unlock() if f == nil { continue } @@ -751,23 +813,20 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error) } func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { - m.fileSendMutex.Lock() - sendMap := m.fileSendMap[h.Server] + m.failFileSendMutex.Lock() + sendMap := m.failFileSendMap[h.Server] if sendMap == nil { - m.fileSendMutex.Unlock() + m.failFileSendMutex.Unlock() blog.Infof("remote: send file for work(%s) with no send map", m.work.ID()) return } - m.fileSendMutex.Unlock() + m.failFileSendMutex.Unlock() + failFiles := sendMap.getFailFiles() if len(failFiles) == 0 { return } - var testStr []string - for _, r := range failFiles { - testStr = append(testStr, r.FilePath) - } - blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, strings.Join(testStr, ",")) + blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles)) if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{ Pid: 1, Req: failFiles, @@ -778,27 +837,9 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { 
blog.Errorf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s failed: %v", m.work.ID(), 1, h.Server, err) return } - blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(sendMap.getFailFiles())) + blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles)) } -// check if files send to remote worker failed and no need to send again -/*func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand) bool { - m.fileSendMutex.Lock() - target, ok := m.fileSendMap[server] - if !ok { - m.fileSendMutex.Unlock() - return false - } - m.fileSendMutex.Unlock() - - for _, c := range commands { - if target.hasReachedFailCount(c.Inputfiles) { - return true - } - } - return false -}*/ - func (m *Mgr) ensureFilesWithPriority( handler dcSDK.RemoteWorkerHandler, pid int, @@ -1029,7 +1070,8 @@ func (m *Mgr) ensureFiles( if v.info.SendStatus == types.FileSendSucceed { wg <- nil continue - } else if v.info.SendStatus == types.FileSendFailed { + } else if v.info.SendStatus == types.FileSendFailed || + v.info.SendStatus == types.FileSendUnknown { wg <- types.ErrSendFileFailed continue } @@ -1101,11 +1143,19 @@ func (m *Mgr) ensureSingleFile( } req.Files = req.Files[:1] desc := req.Files[0] - blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s)", - desc.FilePath, m.work.ID(), host.Server) - - status, ok := m.checkOrLockSendFile(host.Server, desc, false) - + blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s) with retry %v", + desc.FilePath, m.work.ID(), host.Server, desc.Retry) + var status types.FileSendStatus + var ok bool + if desc.Retry { + status, ok, err = m.checkOrLockSendFailFile(host.Server, desc, false) + if err != nil { // 没找到文件不处理,直接返回不影响其他失败文件发送 + blog.Warnf("remote: checkOrLockSendFailFile(%s) failed: %v", host.Server, err) + return err + } + } else { + status, ok = m.checkOrLockSendFile(host.Server, desc) + } // 已经有人发送了文件, 等待文件就绪 if ok { blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), "+ @@ -1117,14 +1167,18 @@ func (m *Mgr) ensureSingleFile( select { case <-tick.C: // 不是发送文件的goroutine,不需要修改状态,仅查询状态 - status, _ = m.checkOrLockSendFile(host.Server, desc, true) + if desc.Retry { + status, _, _ = m.checkOrLockSendFailFile(host.Server, desc, true) + } else { + status, _ = m.checkOrLockSendFile(host.Server, desc) + } } } switch status { case types.FileSendFailed: blog.Errorf("remote: failed to ensure single file(%s) for work(%s) to server(%s), "+ - "file already sent and failed", desc.FilePath, m.work.ID(), host.Server) + "file already sent and failed with retry %v", desc.FilePath, m.work.ID(), host.Server, desc.Retry) return types.ErrSendFileFailed case types.FileSendSucceed: blog.Debugf("remote: success to ensure single file(%s) for work(%s) to server(%s)", @@ -1153,8 +1207,8 @@ func (m *Mgr) ensureSingleFile( // } // } - blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file", - desc.FilePath, m.work.ID(), host.Server) + blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file with retry %v", + desc.FilePath, m.work.ID(), host.Server, desc.Retry) req.Messages = m.fileMessageBank.get(desc) // 同步发送文件 @@ -1189,8 +1243,8 @@ func (m *Mgr) ensureSingleFile( desc.FilePath, m.work.ID(), host.Server, retCode) } - 
blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s)", - desc.FilePath, m.work.ID(), host.Server) + blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s) with retry %v", + desc.FilePath, m.work.ID(), host.Server, desc.Retry) return nil } @@ -1214,7 +1268,11 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { select { case <-tick.C: // 不是发送文件的goroutine,不能修改状态 - status, _ = m.checkOrLockSendFile(host.Server, *desc, true) + if desc.Retry { + status, _, _ = m.checkOrLockSendFailFile(host.Server, *desc, true) + } else { + status, _ = m.checkOrLockSendFile(host.Server, *desc) + } } } @@ -1325,8 +1383,9 @@ func (m *Mgr) checkBatchCache( } // checkOrLockFile 检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, query bool) (types.FileSendStatus, bool) { +func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc) (types.FileSendStatus, bool) { t1 := time.Now().Local() + m.fileSendMutex.Lock() t2 := time.Now().Local() @@ -1347,10 +1406,29 @@ func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, query bool } m.fileSendMutex.Unlock() - info, match := target.matchOrInsert(desc, query) + info, match := target.matchOrInsert(desc) return info.SendStatus, match } +func (m *Mgr) checkOrLockSendFailFile(server string, desc dcSDK.FileDesc, query bool) (types.FileSendStatus, bool, error) { + m.failFileSendMutex.Lock() + target, ok := m.failFileSendMap[server] + if !ok { + target = &fileSendMap{} + m.failFileSendMap[server] = target + } + m.failFileSendMutex.Unlock() + + info, match, err := target.matchFail(desc, query) + if err != nil { + return types.FileSendUnknown, false, errors.New("file not found") + } + if info == nil { + return types.FileSendUnknown, false, errors.New("file is nil") + } + return info.SendStatus, match, nil +} + type matchResult struct { info *types.FileInfo match bool @@ -1358,15 +1436,27 @@ type matchResult struct { // checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult { - m.fileSendMutex.Lock() - target, ok := m.fileSendMap[server] - if !ok { - target = &fileSendMap{} - m.fileSendMap[server] = target - } - m.fileSendMutex.Unlock() + if len(descs) == 0 || !descs[0].Retry { // 普通的文件 + m.fileSendMutex.Lock() + target, ok := m.fileSendMap[server] + if !ok { + target = &fileSendMap{} + m.fileSendMap[server] = target + } + m.fileSendMutex.Unlock() + + return target.matchOrInserts(descs) + } else { // 失败的文件 + m.fileSendMutex.Lock() + target, ok := m.failFileSendMap[server] + if !ok { + target = &fileSendMap{} + m.failFileSendMap[server] = target + } + m.fileSendMutex.Unlock() - return target.matchOrInserts(descs) + return target.matchFails(descs) + } } func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) { @@ -1377,9 +1467,17 @@ func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.Fi m.fileSendMap[server] = target } m.fileSendMutex.Unlock() - target.updateStatus(desc, status) - return + + m.failFileSendMutex.Lock() + failTarget, ok := m.failFileSendMap[server] + if !ok { + failTarget = &fileSendMap{} + m.failFileSendMap[server] = failTarget + } + m.failFileSendMutex.Unlock() + + failTarget.updateFailStatus(desc, status, server) } func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, 
req *types.RemoteTaskExecuteRequest) error { @@ -1425,40 +1523,32 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote return nil } -// getFailedFileCollectionByHost 返回文件集合,如果文件集没有全部完成,返回false,否则返回true -func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, bool, error) { +// getFailedFileCollectionByHost 返回失败文件集合 +func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, error) { m.fileCollectionSendMutex.RLock() defer m.fileCollectionSendMutex.RUnlock() target, ok := m.fileCollectionSendMap[server] if !ok { - return nil, true, fmt.Errorf("remote: no found host(%s) in file send cache", server) + return nil, fmt.Errorf("remote: no found host(%s) in file send cache", server) } fcs := make([]*types.FileCollectionInfo, 0) for _, re := range *target { - //如果有fc未到终结状态,则直接返回 - if !re.SendStatus.IsFinished() { - blog.Infof("remote: found file collection(%s) in file send cache, but not finished, status:%s", re.UniqID, re.SendStatus) - return nil, false, nil - } + re.Retry = true if re.SendStatus == types.FileSendFailed { fcs = append(fcs, re) } } - return fcs, true, nil + return fcs, nil } // retry send failed tool chain func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) { - fileCollections, isNotTeminated, err := m.getFailedFileCollectionByHost(req.Server.Server) + fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server) if err != nil { blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) return } - if !isNotTeminated { - blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) all file collection in host(%s) is not finished", m.work.ID(), req.Pid, req.Server.Server) - return - } if len(fileCollections) == 0 { //blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: no filecollection found", m.work.ID(), req.Pid, req.Server.Server) return @@ -1473,7 +1563,7 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R } // enable worker m.resource.EnableWorker(req.Server) - blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) succeed", m.work.ID(), req.Pid, req.Server.Server) + blog.Infof("remote: success to retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) } @@ -1608,6 +1698,9 @@ func (m *Mgr) ensureOneFileCollection( fileDetails := make([]*types.FilesDetails, 0, len(fc.Files)) for _, f := range fc.Files { f.NoDuplicated = true + /*if fc.Retry { + f.Retry = true + }*/ fileDetails = append(fileDetails, &types.FilesDetails{ Servers: Servers, File: f, @@ -1649,7 +1742,7 @@ func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionI for _, f := range *target { if f.UniqID == fc.UniqID { // set status to sending if fc send failed - if f.SendStatus == types.FileSendFailed { + if f.SendStatus == types.FileSendFailed && fc.Retry { f.SendStatus = types.FileSending return f.SendStatus, false } diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go index 8f14c607..cc06b055 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go @@ -485,6 +485,7 @@ type FileCollectionInfo struct { SendStatus FileSendStatus `json:"send_status"` 
 	Files          []dcSDK.FileDesc `json:"files"`
 	Timestamp      int64            `json:"timestamp"`
+	Retry          bool             `json:"retry"`
 }
 
 // FileInfo record file info

From 94b4c2343ec4a1f170ca6fc1556603605f7ef68f Mon Sep 17 00:00:00 2001
From: yanafu
Date: Mon, 4 Nov 2024 19:53:13 +0800
Subject: [PATCH 08/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?=
 =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../controller/pkg/manager/remote/mgr.go      |  9 ++----
 .../pkg/manager/remote/slotbyworkeroffer.go   | 30 -----------------
 .../controller/pkg/manager/remote/slots.go    | 32 -------------------
 3 files changed, 3 insertions(+), 68 deletions(-)

diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index cf4535a8..5239775b 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -39,8 +39,6 @@ const (
 	corkMaxSize = 1024 * 1024 * 10
 	// corkMaxSize = 1024 * 1024 * 1024
 	largeFileSize = 1024 * 1024 * 100 // 100MB
-
-	//fileMaxFailCount = 5
 )
 
 // NewMgr get a new Remote Mgr
@@ -1070,8 +1068,7 @@ func (m *Mgr) ensureFiles(
 			if v.info.SendStatus == types.FileSendSucceed {
 				wg <- nil
 				continue
-			} else if v.info.SendStatus == types.FileSendFailed ||
-				v.info.SendStatus == types.FileSendUnknown {
+			} else if v.info.SendStatus == types.FileSendFailed {
 				wg <- types.ErrSendFileFailed
 				continue
 			}
@@ -1436,7 +1433,7 @@ type matchResult struct {
 
 // checkOrLockCorkFiles checks the sendStatus of the target files in batch; if a file has been sent, it returns the current status and true; if not, it marks the file as sending and returns false
 func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult {
-	if len(descs) == 0 || !descs[0].Retry { // ordinary files
+	if len(descs) == 0 || !descs[0].Retry { // files sent for the first time
 		m.fileSendMutex.Lock()
 		target, ok := m.fileSendMap[server]
 		if !ok {
@@ -1446,7 +1443,7 @@ func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []mat
 		m.fileSendMutex.Unlock()
 
 		return target.matchOrInserts(descs)
-	} else { // failed files
+	} else { // failed files being retried
 		m.fileSendMutex.Lock()
 		target, ok := m.failFileSendMap[server]
 		if !ok {
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go
index bd3b545d..17b94f80 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go
@@ -282,36 +282,6 @@ func (wo *workerOffer) EnableWorker(host *dcProtocol.Host) {
 	blog.Infof("remote slot: total slot:%d after enable host:%v", wo.validWorkerNum, *host)
 }
 
-/*func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool {
-	if host == nil {
-		return false
-	}
-
-	wo.workerLock.Lock()
-	defer wo.workerLock.Unlock()
-
-	for _, wk := range wo.worker {
-		if !wk.host.Equal(host) {
-			continue
-		}
-
-		if wk.dead {
-			blog.Infof("remote slot: host:%v is already dead,do nothing now", host)
-			return false
-		}
-
-		if wk.status == Retrying {
-			blog.Infof("remote slot: host:%v is retrying,do nothing now", host)
-			return true
-		}
-		blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying)
-		wk.status = Retrying
-		return false
-	}
-
-	return false
-}*/
-
 func (wo *workerOffer) SetWorkerStatus(host *dcProtocol.Host, s Status) {
 	wo.workerLock.Lock()
 	defer wo.workerLock.Unlock()
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
index 1e309d32..6273946e 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
@@ -28,7 +28,6 @@ type RemoteSlotMgr interface {
 	RecoverDeadWorker(w *worker)
 	DisableWorker(host *dcProtocol.Host)
 	EnableWorker(host *dcProtocol.Host)
-	//CanWorkerRetry(host *dcProtocol.Host) bool // check if worker can retry, if can set worker status to retrying
 	SetWorkerStatus(host *dcProtocol.Host, status Status)
 	Lock(usage dcSDK.JobUsage, f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host
 	Unlock(usage dcSDK.JobUsage, host *dcProtocol.Host)
@@ -299,37 +298,6 @@ func (wr *resource) EnableWorker(host *dcProtocol.Host) {
 	blog.Infof("remote slot: total slot:%d after enable host:%v", wr.totalSlots, *host)
 }
 
-/*func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool {
-	if host == nil {
-		return false
-	}
-
-	wr.workerLock.Lock()
-	defer wr.workerLock.Unlock()
-	for _, wk := range wr.worker {
-		if !wk.host.Equal(host) {
-			continue
-		}
-
-		if wk.dead {
-			blog.Infof("remote slot: host:%v is already dead, do nothing now", host)
-			return false
-		}
-		if !wk.disabled {
-			return false
-		}
-		if wk.status == Retrying {
-			blog.Infof("remote slot: host:%v is retrying, do nothing now", host)
-			return false
-		}
-		blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying)
-		wk.status = Retrying
-		return true
-	}
-
-	return false
-}*/
-
 func (wr *resource) SetWorkerStatus(host *dcProtocol.Host, s Status) {
 	wr.workerLock.Lock()
 	defer wr.workerLock.Unlock()

From 293515c51f1c790749f80e49eba14c54e9c38d4c Mon Sep 17 00:00:00 2001
From: yanafu
Date: Mon, 4 Nov 2024 19:54:23 +0800
Subject: [PATCH 09/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?=
 =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../booster/bk_dist/controller/pkg/manager/remote/mgr.go | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index 5239775b..6b681f4b 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -1695,9 +1695,6 @@ func (m *Mgr) ensureOneFileCollection(
 	fileDetails := make([]*types.FilesDetails, 0, len(fc.Files))
 	for _, f := range fc.Files {
 		f.NoDuplicated = true
-		/*if fc.Retry {
-			f.Retry = true
-		}*/
 		fileDetails = append(fileDetails, &types.FilesDetails{
 			Servers: Servers,
 			File:    f,

From b9bd1d16d0920caf0354674f8b4f0b6c9aa2050f Mon Sep 17 00:00:00 2001
From: yanafu
Date: Tue, 5 Nov 2024 15:44:31 +0800
Subject: [PATCH 10/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?=
 =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../controller/pkg/manager/remote/mgr.go      | 88 +++++++++++++------
 1 file changed, 60 insertions(+), 28 deletions(-)

diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index 6b681f4b..a3a391a4 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -72,8 +72,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
 		conf:               work.Config(),
 		resourceCheckTick:  5 * time.Second,
 		workerCheckTick:    5 * time.Second,
-		toolChainRetryTick: 6 * time.Second,
-		fileRetryCheckTick: 7 * time.Second,
+		retryCheckTick:     3 * time.Second,
 		sendCorkTick:       10 * time.Millisecond,
 		corkSize:           corkSize,
 		corkMaxSize:        corkMaxSize,
@@ -117,13 +116,12 @@ type Mgr struct {
 
 	conf *config.ServerConfig
 
-	resourceCheckTick  time.Duration
-	workerCheckTick    time.Duration
-	fileRetryCheckTick time.Duration
-	toolChainRetryTick time.Duration
-	lastUsed           uint64 // only accurate to second now
-	lastApplied        uint64 // only accurate to second now
-	remotejobs         int64  // save job number which using remote worker
+	resourceCheckTick time.Duration
+	workerCheckTick   time.Duration
+	retryCheckTick    time.Duration
+	lastUsed          uint64 // only accurate to second now
+	lastApplied       uint64 // only accurate to second now
+	remotejobs        int64  // save job number which using remote worker
 
 	sendCorkTick time.Duration
 	sendCorkChan chan bool
@@ -299,7 +297,7 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
 	return result
 }
 
-func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus, server string) {
+func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus) {
 	if status == types.FileSendSucceed && !desc.Retry {
 		return
 	}
@@ -404,7 +402,13 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc {
 
 	failFiles := make([]dcSDK.FileDesc, 0)
 	for _, v := range fsm.cache {
+		if v == nil || len(*v) == 0 {
+			continue
+		}
 		for _, ci := range *v {
+			if ci.SendStatus != types.FileSendFailed {
+				continue
+			}
 			failFiles = append(failFiles, dcSDK.FileDesc{
 				FilePath:     ci.FullPath,
 				Compresstype: dcProtocol.CompressLZ4,
@@ -449,6 +453,8 @@ func (m *Mgr) Init() {
 	}
 	if m.work.ID() != "" {
 		go m.workerCheck(ctx)
+		go m.retryFailFiles(ctx)
+		go m.retrySendToolChains(ctx)
 	}
 
 	if m.conf.SendCork {
@@ -592,12 +598,8 @@ func (m *Mgr) resourceCheck(ctx context.Context) {
 func (m *Mgr) workerCheck(ctx context.Context) {
 	blog.Infof("remote: run worker check tick for work: %s", m.work.ID())
 	ticker := time.NewTicker(m.workerCheckTick)
-	fileRetryTicker := time.NewTicker(m.fileRetryCheckTick)
-	toolChainRetryTicker := time.NewTicker(m.toolChainRetryTick)
 	defer ticker.Stop()
-	defer fileRetryTicker.Stop()
-	defer toolChainRetryTicker.Stop()
 
 	for {
 		select {
@@ -619,24 +621,51 @@ func (m *Mgr) workerCheck(ctx context.Context) {
 				}(w)
 			}
 
-		// retry send fail file for all workers
-		case <-fileRetryTicker.C:
-			for _, h := range m.work.Resource().GetHosts() {
-				blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
-				go m.retrySendFiles(h)
-			}
-		//retry failed tool chain for all workers, recover disable worker
-		case <-toolChainRetryTicker.C:
-			blog.Debugf("remote: try to retry send tool chain for work(%s)", m.work.ID())
+		}
+	}
+}
+
+func (m *Mgr) retrySendToolChains(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
+			return
+		default:
+			var wg sync.WaitGroup
 			handler := m.remoteWorker.Handler(0, nil, nil, nil)
-			for _, h := range m.work.Resource().GetHosts() {
+			hosts := m.work.Resource().GetHosts()
+			wg.Add(len(hosts))
+			for _, h := range hosts {
blog.Debugf("remote: try to retry send tool chain for work(%s) to server %s", m.work.ID(), h.Server) go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{ Pid: 0, Server: h, Sandbox: &dcSyscall.Sandbox{Dir: ""}, Stats: &dcSDK.ControllerJobStats{}, - }) + }, &wg) + } + wg.Wait() // 等待所有 goroutine 完成 + time.Sleep(m.workerCheckTick) + } + } +} + +func (m *Mgr) retryFailFiles(ctx context.Context) { + for { + select { + case <-ctx.Done(): + blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID()) + default: + var wg sync.WaitGroup + hosts := m.work.Resource().GetHosts() + wg.Add(len(hosts)) + for _, h := range hosts { + blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server) + go m.retrySendFiles(h, &wg) } + wg.Wait() // 等待所有 goroutine 完成 + time.Sleep(m.retryCheckTick) } } } @@ -810,7 +839,9 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error) ) } -func (m *Mgr) retrySendFiles(h *dcProtocol.Host) { +func (m *Mgr) retrySendFiles(h *dcProtocol.Host, wg *sync.WaitGroup) { + defer wg.Done() // 在函数结束时调用 Done + m.failFileSendMutex.Lock() sendMap := m.failFileSendMap[h.Server] if sendMap == nil { @@ -1474,7 +1505,7 @@ func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.Fi } m.failFileSendMutex.Unlock() - failTarget.updateFailStatus(desc, status, server) + failTarget.updateFailStatus(desc, status) } func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) error { @@ -1540,7 +1571,8 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect } // retry send failed tool chain -func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) { +func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, wg *sync.WaitGroup) { + defer wg.Done() // 在函数结束时调用 Done fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server) if err != nil { blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) From 28c8b16b7f01d45a03779099f62bb23e24b031d8 Mon Sep 17 00:00:00 2001 From: yanafu Date: Wed, 6 Nov 2024 14:19:35 +0800 Subject: [PATCH 11/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 81 ++++++++++++------- 1 file changed, 50 insertions(+), 31 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index a3a391a4..a1697d3f 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -635,15 +635,23 @@ func (m *Mgr) retrySendToolChains(ctx context.Context) { var wg sync.WaitGroup handler := m.remoteWorker.Handler(0, nil, nil, nil) hosts := m.work.Resource().GetHosts() - wg.Add(len(hosts)) for _, h := range hosts { blog.Debugf("remote: try to retry send tool chain for work(%s) to server %s", m.work.ID(), h.Server) + fileCollections, err := m.getFailedFileCollectionByHost(h.Server) + if err != nil { + blog.Infof("remote: retry send tool chain for work(%s) to server(%s) failed: %v", m.work.ID(), h.Server, err) + continue + } + if 
+					continue
+				}
+				wg.Add(1)
 				go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
 					Pid:     0,
 					Server:  h,
 					Sandbox: &dcSyscall.Sandbox{Dir: ""},
 					Stats:   &dcSDK.ControllerJobStats{},
-				}, &wg)
+				}, fileCollections, &wg)
 			}
 			wg.Wait() // wait for all goroutines to finish
 			time.Sleep(m.workerCheckTick)
@@ -656,13 +664,28 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
 		select {
 		case <-ctx.Done():
 			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
+			return
 		default:
 			var wg sync.WaitGroup
 			hosts := m.work.Resource().GetHosts()
-			wg.Add(len(hosts))
+			//wg.Add(len(hosts))
 			for _, h := range hosts {
 				blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
-				go m.retrySendFiles(h, &wg)
+				m.failFileSendMutex.Lock()
+				sendMap := m.failFileSendMap[h.Server]
+				if sendMap == nil {
+					m.failFileSendMutex.Unlock()
+					blog.Infof("remote: send file for work(%s) with no send map", m.work.ID())
+					continue
+				}
+				m.failFileSendMutex.Unlock()
+
+				failFiles := sendMap.getFailFiles()
+				if len(failFiles) == 0 {
+					continue
+				}
+				wg.Add(1)
+				go m.retrySendFiles(h, failFiles, &wg)
 			}
 			wg.Wait() // wait for all goroutines to finish
 			time.Sleep(m.retryCheckTick)
@@ -707,11 +730,20 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
 				continue
 			}
 			if f.isFilesSendFailed(c.Inputfiles) {
-				req.BanWorkerList = append(req.BanWorkerList, s)
+				matched := false
+				for _, h := range req.BanWorkerList {
+					if h.Equal(s) {
+						matched = true
+						break
+					}
+				}
+				if !matched {
+					req.BanWorkerList = append(req.BanWorkerList, s)
+				}
 			}
 		}
 	}
-	blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %v", m.work.ID(), req.Pid, req.BanWorkerList)
+	blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %d, %v", m.work.ID(), req.Pid, len(req.BanWorkerList), req.BanWorkerList)
 	// if there is a large file over 100MB, use it as a condition when choosing the host
 	fpath, _ := getMaxSizeFile(req, m.largeFileSize)
 	req.Server = m.lockSlots(dcSDK.JobUsageRemoteExe, fpath, req.BanWorkerList)
@@ -761,7 +793,16 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
 	remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req))
 	if err != nil {
-		req.BanWorkerList = append(req.BanWorkerList, req.Server)
+		matched := false
+		for _, h := range req.BanWorkerList {
+			if h.Equal(req.Server) {
+				matched = true
+				break
+			}
+		}
+		if !matched {
+			req.BanWorkerList = append(req.BanWorkerList, req.Server)
+		}
 		var banlistStr string
 		for _, s := range req.BanWorkerList {
 			banlistStr = banlistStr + s.Server + ","
@@ -839,22 +880,9 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
 	)
 }
 
-func (m *Mgr) retrySendFiles(h *dcProtocol.Host, wg *sync.WaitGroup) {
+func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, wg *sync.WaitGroup) {
 	defer wg.Done() // call Done when the function returns
 
-	m.failFileSendMutex.Lock()
-	sendMap := m.failFileSendMap[h.Server]
-	if sendMap == nil {
-		m.failFileSendMutex.Unlock()
-		blog.Infof("remote: send file for work(%s) with no send map", m.work.ID())
-		return
-	}
-	m.failFileSendMutex.Unlock()
-
-	failFiles := sendMap.getFailFiles()
-	if len(failFiles) == 0 {
-		return
-	}
 	blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
 	if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
 		Pid:     1,
 		Req:     failFiles,
 		Server:  h,
 		Sandbox: &dcSyscall.Sandbox{Dir: ""},
 		Stats:   &dcSDK.ControllerJobStats{},
@@ -1571,7 +1599,8 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect
 }
 
 // retry send failed tool chain
-func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, wg *sync.WaitGroup) {
+func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, wg *sync.WaitGroup) {
 	defer wg.Done() // call Done when the function returns
-	fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server)
-	if err != nil {
-		blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
-		return
-	}
-	if len(fileCollections) == 0 {
-		//blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: no filecollection found", m.work.ID(), req.Pid, req.Server.Server)
-		return
-	}
 	blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)

From ec99ec225d56b91920df6e693e65f431f7cf06ca Mon Sep 17 00:00:00 2001
From: yanafu
Date: Mon, 18 Nov 2024 16:21:30 +0800
Subject: [PATCH 12/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?=
 =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../controller/pkg/manager/remote/mgr.go      | 101 +++++++++++-------
 .../controller/pkg/manager/remote/slots.go    |  47 ++++----
 .../controller/pkg/manager/remote/worker.go   |   2 +
 3 files changed, 94 insertions(+), 56 deletions(-)

diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index a1697d3f..a441ba24 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -72,7 +72,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
 		conf:              work.Config(),
 		resourceCheckTick: 5 * time.Second,
 		workerCheckTick:   5 * time.Second,
-		retryCheckTick:    3 * time.Second,
+		retryCheckTick:    10 * time.Second,
 		sendCorkTick:      10 * time.Millisecond,
 		corkSize:          corkSize,
 		corkMaxSize:       corkMaxSize,
@@ -388,7 +388,7 @@ func (fsm *fileSendMap) isFilesSendFailed(descs []dcSDK.FileDesc) bool {
 		}
 		for _, ci := range *c {
 			if ci.Match(desc) {
-				return ci.SendStatus == types.FileSendFailed
+				return ci.SendStatus != types.FileSendSucceed
 			}
 		}
 	}
@@ -626,51 +626,73 @@ func (m *Mgr) workerCheck(ctx context.Context) {
 }
 
 func (m *Mgr) retrySendToolChains(ctx context.Context) {
+	ticker := time.NewTicker(m.workerCheckTick)
+	defer ticker.Stop()
+
+	var workerStatus sync.Map
 	for {
 		select {
 		case <-ctx.Done():
 			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
 			return
-		default:
-			var wg sync.WaitGroup
+		case <-ticker.C:
 			handler := m.remoteWorker.Handler(0, nil, nil, nil)
 			hosts := m.work.Resource().GetHosts()
+			count := 0
+			wg := make(chan string, len(hosts))
 			for _, h := range hosts {
+				workerNeedRetry := true
+				if v, ok := workerStatus.Load(h.Server); ok {
+					workerNeedRetry = v.(bool)
+				}
+				if !workerNeedRetry {
+					continue
+				}
+				fileCollections := m.getFailedFileCollectionByHost(h.Server)
 				if len(fileCollections) == 0 {
 					continue
 				}
-				wg.Add(1)
+				workerStatus.Store(h.Server, false)
+				count++
 				go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
 					Pid:     0,
 					Server:  h,
 					Sandbox: &dcSyscall.Sandbox{Dir: ""},
 					Stats:   &dcSDK.ControllerJobStats{},
-				}, fileCollections, &wg)
+				}, fileCollections, wg)
 			}
-			wg.Wait() // wait for all goroutines to finish
-			time.Sleep(m.workerCheckTick)
+			go func() {
+				for i := 0; i < count; i++ {
+					host := <-wg
+					workerStatus.Store(host, true)
+				}
+			}()
 		}
 	}
 }
 
 func (m *Mgr) retryFailFiles(ctx context.Context) {
+	ticker := time.NewTicker(m.retryCheckTick)
+	defer ticker.Stop()
+
+	var workerStatus sync.Map
 	for {
 		select {
 		case <-ctx.Done():
 			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
 			return
-		default:
-			var wg sync.WaitGroup
+		case <-ticker.C:
 			hosts := m.work.Resource().GetHosts()
-			//wg.Add(len(hosts))
+			wg := make(chan string, len(hosts))
+			count := 0
 			for _, h := range hosts {
-				blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
+				workerNeedRetry := true
+				if v, ok := workerStatus.Load(h.Server); ok {
+					workerNeedRetry = v.(bool)
+				}
+				if !workerNeedRetry {
+					continue
+				}
 				m.failFileSendMutex.Lock()
 				sendMap := m.failFileSendMap[h.Server]
 				if sendMap == nil {
@@ -684,11 +706,16 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
 				if len(failFiles) == 0 {
 					continue
 				}
-				wg.Add(1)
-				go m.retrySendFiles(h, failFiles, &wg)
+				workerStatus.Store(h.Server, false)
+				count++
+				go m.retrySendFiles(h, failFiles, wg)
 			}
-			wg.Wait() // wait for all goroutines to finish
-			time.Sleep(m.retryCheckTick)
+			go func() {
+				for i := 0; i < count; i++ {
+					host := <-wg
+					workerStatus.Store(host, true)
+				}
+			}()
 		}
 	}
 }
@@ -880,21 +907,21 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
 	)
 }
 
-func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, wg *sync.WaitGroup) {
-	defer wg.Done() // call Done when the function returns
-
+func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, host chan string) {
 	blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
-	if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
+	_, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
 		Pid:     1,
 		Req:     failFiles,
 		Server:  h,
 		Sandbox: &dcSyscall.Sandbox{Dir: ""},
 		Stats:   &dcSDK.ControllerJobStats{},
-	}); err != nil {
+	})
+	if err != nil {
 		blog.Errorf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s failed: %v", m.work.ID(), 1, h.Server, err)
-		return
+	} else {
+		blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
 	}
-	blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
+	host <- h.Server
 }
 
 func (m *Mgr) ensureFilesWithPriority(
@@ -997,7 +1024,7 @@ func (m *Mgr) ensureFiles(
 
 			_, t, _ := rules.Satisfy(fd.File.FilePath)
 
-			blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
+			//blog.Debugf("remote: ensure file %s and match rule %d", fd.File.FilePath, t)
 			if f.AllDistributed {
 				t = dcSDK.FilterRuleHandleAllDistribution
 			}
@@ -1040,7 +1067,7 @@ func (m *Mgr) ensureFiles(
 			}
 			r = append(r, f.Targetrelativepath)
 
-			blog.Debugf("remote: debug ensure into fd.Servers")
+			//blog.Debugf("remote: debug ensure into fd.Servers")
fd.Servers") for _, s := range fd.Servers { if s == nil { continue @@ -1151,7 +1178,7 @@ func (m *Mgr) ensureFiles( _ = m.appendCorkFiles(server, needSendCorkFiles) // notify send - //m.sendCorkChan <- true + m.sendCorkChan <- true } } else { // 单个文件发送模式 @@ -1441,9 +1468,7 @@ func (m *Mgr) checkBatchCache( // checkOrLockFile 检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc) (types.FileSendStatus, bool) { t1 := time.Now().Local() - m.fileSendMutex.Lock() - t2 := time.Now().Local() if d1 := t2.Sub(t1); d1 > 50*time.Millisecond { // blog.Debugf("check cache lock wait too long server(%s): %s", server, d1.String()) @@ -1580,13 +1605,14 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote } // getFailedFileCollectionByHost 返回失败文件集合 -func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, error) { +func (m *Mgr) getFailedFileCollectionByHost(server string) []*types.FileCollectionInfo { m.fileCollectionSendMutex.RLock() defer m.fileCollectionSendMutex.RUnlock() target, ok := m.fileCollectionSendMap[server] if !ok { - return nil, fmt.Errorf("remote: no found host(%s) in file send cache", server) + blog.Infof("remote: no found host(%s) in file send cache") + return nil } fcs := make([]*types.FileCollectionInfo, 0) for _, re := range *target { @@ -1595,13 +1621,11 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect fcs = append(fcs, re) } } - return fcs, nil + return fcs } // retry send failed tool chain -func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, wg *sync.WaitGroup) { - defer wg.Done() // 在函数结束时调用 Done - +func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, host chan string) { blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil { @@ -1611,6 +1635,7 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R } // enable worker m.resource.EnableWorker(req.Server) + host <- req.Server.Server blog.Infof("remote: success to retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) } diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go index 6273946e..127441a8 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go @@ -513,12 +513,16 @@ func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host) if worker.disabled || worker.dead { continue } + matched := false for _, host := range banWorkerList { if worker.host.Equal(host) { - continue + matched = true + break } } - + if matched { + continue + } free := worker.totalSlots - worker.occupiedSlots if free >= max { max = free @@ -542,12 +546,17 @@ func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtoco if worker.disabled || worker.dead { continue } + matched := false for _, host := range banWorkerList { if worker.host.Equal(host) { - continue + matched = true + break } } + if matched { + 
+			continue
+		}
 		free := worker.totalSlots - worker.occupiedSlots
 
 		// when resources are idle, large files take priority
@@ -666,13 +675,14 @@ func (wr *resource) getSlot(msg lockWorkerMessage) {
 	if wr.occupiedSlots < wr.totalSlots || wr.totalSlots <= 0 {
 		set := wr.getUsageSet(usage)
 		if wr.isIdle(set) {
-			set.occupied++
-			wr.occupiedSlots++
-			blog.Infof("remote slot: total slots:%d occupied slots:%d, remote slot available",
-				wr.totalSlots, wr.occupiedSlots)
-
-			msg.result <- wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
-			satisfied = true
+			if h := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
+				set.occupied++
+				wr.occupiedSlots++
+				blog.Infof("remote slot: total slots:%d occupied slots:%d, remote slot available",
+					wr.totalSlots, wr.occupiedSlots)
+				msg.result <- h
+				satisfied = true
+			}
 		}
 	}
 
@@ -688,24 +698,25 @@ func (wr *resource) putSlot(msg lockWorkerMessage) {
 	wr.freeWorkerSlots(msg.toward)
 	wr.occupiedSlots--
+	blog.Debugf("remote slot: free slot for worker %v, %v", wr.occupiedSlots, wr.totalSlots)
 	usage := msg.jobUsage
 	set := wr.getUsageSet(usage)
 	set.occupied--
 
 	// check whether other waiting is satisfied now
 	if wr.waitingList.Len() > 0 {
+		blog.Debugf("remote slot: free slot for worker %v, %v", wr.occupiedSlots, wr.waitingList.Len())
 		for e := wr.waitingList.Front(); e != nil; e = e.Next() {
 			msg := e.Value.(*lockWorkerMessage)
 			set := wr.getUsageSet(msg.jobUsage)
 			if wr.isIdle(set) {
-				set.occupied++
-				wr.occupiedSlots++
-
-				msg.result <- wr.occupyWorkerSlots(msg.largeFile, []*dcProtocol.Host{})
-
-				wr.waitingList.Remove(e)
-
-				break
+				if h := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
+					set.occupied++
+					wr.occupiedSlots++
+					msg.result <- h
+					wr.waitingList.Remove(e)
+					break
+				}
 			}
 		}
 	}
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go
index 496b612c..2523d70e 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go
@@ -80,11 +80,13 @@ type worker struct {
 
 func (wr *worker) occupySlot() error {
 	wr.occupiedSlots++
+	blog.Debugf("worker: occupy slot for worker %s, %v, %v", wr.host.Server, wr.occupiedSlots, wr.totalSlots)
 	return nil
 }
 
 func (wr *worker) freeSlot() error {
 	wr.occupiedSlots--
+	blog.Debugf("worker: free slot for worker %s, %v, %v", wr.host.Server, wr.occupiedSlots, wr.totalSlots)
 	return nil
 }

From e25a54405baacf0119a34f8c69083f5789d89592 Mon Sep 17 00:00:00 2001
From: yanafu
Date: Mon, 18 Nov 2024 17:48:16 +0800
Subject: [PATCH 13/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?=
 =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../controller/pkg/manager/remote/mgr.go      | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index a441ba24..1a645a8d 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -908,7 +908,7 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
 }
 
 func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, host chan string) {
-	blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
+	blog.Infof("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
 	_, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
 		Pid:     1,
 		Req:     failFiles,
@@ -919,7 +919,7 @@ func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, hos
 	if err != nil {
 		blog.Errorf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s failed: %v", m.work.ID(), 1, h.Server, err)
 	} else {
-		blog.Debugf("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
+		blog.Infof("remote: success to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
 	}
 	host <- h.Server
 }
@@ -1628,16 +1628,17 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) []*types.FileCollecti
 func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, host chan string) {
 	blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
 
-	if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil {
+	err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections)
+	if err != nil {
 		blog.Errorf("remote: retry send tool chain for work(%s) from pid(%d) to server(%s), "+
 			"send tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
-		return
+
+	} else {
+		// enable worker
+		m.resource.EnableWorker(req.Server)
+		blog.Infof("remote: success to retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
 	}
-	// enable worker
-	m.resource.EnableWorker(req.Server)
 	host <- req.Server.Server
-	blog.Infof("remote: success to retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
-
 }

From 885fb000ce735f9222525b9563097e5cfecd14c7 Mon Sep 17 00:00:00 2001
From: yanafu
Date: Wed, 20 Nov 2024 15:54:27 +0800
Subject: [PATCH 14/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?=
 =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../controller/pkg/manager/remote/mgr.go      | 14 ++--
 .../controller/pkg/manager/remote/slots.go    | 67 +++++++++++++++----
 .../worker/pkg/client/bkcommondist_handler.go |  8 +--
 3 files changed, 68 insertions(+), 21 deletions(-)

diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index 1a645a8d..0eda7806 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -83,8 +83,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr {
 }
 
 const (
-	syncHostTimeTimes   = 3
-	toolChainRetryTimes = 10
+	syncHostTimeTimes = 3
 )
 
 // Mgr describe the remote manager
@@ -633,7 +633,7 @@ func (m *Mgr) retrySendToolChains(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
-			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
+			blog.Infof("remote: run toolchain check for work(%s) canceled by context", m.work.ID())
 			return
 		case <-ticker.C:
 			handler := m.remoteWorker.Handler(0, nil, nil, nil)
@@ -679,7 +678,7 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
-			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
+			blog.Infof("remote: run failfiles check for work(%s) canceled by context", m.work.ID())
 			return
 		case <-ticker.C:
 			hosts := m.work.Resource().GetHosts()
@@ -748,8 +747,9 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
 	defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLeaveTime)
 	m.work.Basic().UpdateJobStats(req.Stats)
 
+	hosts := m.work.Resource().GetHosts()
 	for _, c := range req.Req.Commands {
-		for _, s := range m.work.Resource().GetHosts() {
+		for _, s := range hosts {
 			m.failFileSendMutex.Lock()
 			f := m.failFileSendMap[s.Server]
 			m.failFileSendMutex.Unlock()
@@ -770,6 +770,10 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
 			}
 		}
 	}
+	if len(req.BanWorkerList) == len(hosts) {
+		return nil, errors.New("no available worker, all workers are banned")
+	}
+
 	blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %d, %v", m.work.ID(), req.Pid, len(req.BanWorkerList), req.BanWorkerList)
 	// if there is a large file over 100MB, use it as a condition when choosing the host
 	fpath, _ := getMaxSizeFile(req, m.largeFileSize)
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
index 127441a8..81975a54 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
@@ -506,9 +506,10 @@ func (wr *resource) addWorker(host *dcProtocol.Host) {
 	return
 }
 
-func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host) *worker {
+func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host) (*worker, bool) {
 	var w *worker
 	max := 0
+	hasAvailableWorker := false
 	for _, worker := range wr.worker {
 		if worker.disabled || worker.dead {
 			continue
 		}
@@ -523,6 +524,8 @@ func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host)
 		if matched {
 			continue
 		}
+
+		hasAvailableWorker = true
 		free := worker.totalSlots - worker.occupiedSlots
 		if free >= max {
 			max = free
@@ -534,14 +537,15 @@ func (wr *resource) getWorkerWithMostFreeSlots(banWorkerList []*dcProtocol.Host)
 	// 	w = wr.worker[0]
 	// }
 
-	return w
+	return w, hasAvailableWorker
 }
 
 // large files first
-func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtocol.Host) *worker {
+func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtocol.Host) (*worker, bool) {
 	var w *worker
 	max := 0
 	inlargequeue := false
+	hasAvailableWorker := false
 	for _, worker := range wr.worker {
 		if worker.disabled || worker.dead {
 			continue
 		}
@@ -557,6 +561,8 @@ func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtoco
 		if matched {
 			continue
 		}
+
+		hasAvailableWorker = true
 		free := worker.totalSlots - worker.occupiedSlots
 
 		// when resources are idle, large files take priority
@@ -582,33 +588,34 @@ func (wr *resource) getWorkerLargeFileFirst(f string, banWorkerList []*dcProtoco
 	if w == nil {
 		// w = wr.worker[0]
-		return w
+		return w, hasAvailableWorker
 	}
 
 	if f != "" && !w.hasFile(f) {
 		w.largefiles = append(w.largefiles, f)
 	}
 
-	return w
+	return w, hasAvailableWorker
 }
 
-func (wr *resource) occupyWorkerSlots(f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host {
+func (wr *resource) occupyWorkerSlots(f string, banWorkerList []*dcProtocol.Host) (*dcProtocol.Host, bool) {
 	wr.workerLock.Lock()
 	defer wr.workerLock.Unlock()
 
 	var w *worker
+	var hasWorkerAvailable bool
 	if f == "" {
-		w = wr.getWorkerWithMostFreeSlots(banWorkerList)
+		w, hasWorkerAvailable = wr.getWorkerWithMostFreeSlots(banWorkerList)
 	} else {
-		w = wr.getWorkerLargeFileFirst(f, banWorkerList)
+		w, hasWorkerAvailable = wr.getWorkerLargeFileFirst(f, banWorkerList)
 	}
 
 	if w == nil {
-		return nil
+		return nil, hasWorkerAvailable
 	}
 
 	_ = w.occupySlot()
-	return w.host
+	return w.host, hasWorkerAvailable
 }
 
 func (wr *resource) freeWorkerSlots(host *dcProtocol.Host) {
@@ -626,6 +633,9 @@ func (wr *resource) freeWorkerSlots(host *dcProtocol.Host) {
 }
 
 func (wr *resource) handleLock(ctx context.Context) {
+	ticker := time.NewTicker(time.Second * 20)
+
+	defer ticker.Stop()
 	wr.ctx = ctx
 
 	for {
@@ -638,6 +648,10 @@ func (wr *resource) handleLock(ctx context.Context) {
 			wr.getSlot(msg)
 		case <-wr.emptyChan:
 			wr.onSlotEmpty()
+		case <-ticker.C:
+			if wr.waitingList.Len() > 0 {
+				go wr.occupyWaitList()
+			}
 		}
 	}
 }
@@ -675,7 +689,7 @@ func (wr *resource) getSlot(msg lockWorkerMessage) {
 	if wr.occupiedSlots < wr.totalSlots || wr.totalSlots <= 0 {
 		set := wr.getUsageSet(usage)
 		if wr.isIdle(set) {
-			if h := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
+			if h, _ := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
 				set.occupied++
 				wr.occupiedSlots++
 				blog.Infof("remote slot: total slots:%d occupied slots:%d, remote slot available",
 					wr.totalSlots, wr.occupiedSlots)
@@ -710,12 +724,18 @@ func (wr *resource) putSlot(msg lockWorkerMessage) {
 			msg := e.Value.(*lockWorkerMessage)
 			set := wr.getUsageSet(msg.jobUsage)
 			if wr.isIdle(set) {
-				if h := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
+				if h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList); h != nil {
 					set.occupied++
 					wr.occupiedSlots++
 					msg.result <- h
 					wr.waitingList.Remove(e)
 					break
+				} else if !hasAvailableWorker {
+					msg.result <- nil
+					wr.waitingList.Remove(e)
+					blog.Debugf("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
+				} else {
+					blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
 				}
 			}
 		}
@@ -732,3 +752,26 @@ func (wr *resource) onSlotEmpty() {
 		wr.waitingList.Remove(e)
 	}
 }
+
+func (wr *resource) occupyWaitList() {
+	for e := wr.waitingList.Front(); e != nil; e = e.Next() {
+		msg := e.Value.(*lockWorkerMessage)
+		set := wr.getUsageSet(msg.jobUsage)
+		if wr.isIdle(set) {
+			h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
+			if h != nil {
+				set.occupied++
+				wr.occupiedSlots++
+				msg.result <- h
+				wr.waitingList.Remove(e)
+				blog.Debugf("remote slot: occupy waiting list")
+			} else if !hasAvailableWorker { // no slot available for ban worker list, turn it local
+				blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
+				msg.result <- nil
+				wr.waitingList.Remove(e)
+			} else {
+				blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
+			}
+		}
+	}
+}
diff --git a/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler.go b/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler.go
index 957df5ae..5c129625 100644
--- a/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler.go
+++ b/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler.go
@@ -424,9 +424,9 @@ func (r *CommonRemoteHandler) ExecuteSendFile(
 	// acquire the local resource lock
 	locallocked := false
 	if mgr != nil {
-		if mgr.LockSlots(dcSDK.JobUsageLocalExe, 1) {
+		if mgr.LockSlots(dcSDK.JobUsageDefault, 1) {
 			locallocked = true
-			blog.Debugf("remotehandle: succeed to get one local lock")
+			blog.Debugf("remotehandle: succeed to get one default lock")
 		}
 	}
 
@@ -466,8 +466,8 @@ func (r *CommonRemoteHandler) ExecuteSendFile(
 	t1 = t2
 
 	if locallocked {
-		mgr.UnlockSlots(dcSDK.JobUsageLocalExe, 1)
-		blog.Debugf("remotehandle: succeed to release one local lock")
+		mgr.UnlockSlots(dcSDK.JobUsageDefault, 1)
+		blog.Debugf("remotehandle: succeed to release one default lock")
 	}
 
 	if err != nil {

From d25d27e70c2c72fa7e28dfdb327d2fb95339861c Mon Sep 17 00:00:00 2001
From: yanafu
Date: Wed, 20 Nov 2024 16:39:51 +0800
Subject: [PATCH 15/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?=
 =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../controller/pkg/manager/remote/slots.go    | 42 +++++++++----------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
index 81975a54..b4ad8406 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
@@ -649,9 +649,7 @@ func (wr *resource) handleLock(ctx context.Context) {
 		case <-wr.emptyChan:
 			wr.onSlotEmpty()
 		case <-ticker.C:
-			if wr.waitingList.Len() > 0 {
-				go wr.occupyWaitList()
-			}
+			wr.occupyWaitList()
 		}
 	}
 }
@@ -733,7 +731,7 @@ func (wr *resource) putSlot(msg lockWorkerMessage) {
 				} else if !hasAvailableWorker {
 					msg.result <- nil
 					wr.waitingList.Remove(e)
-					blog.Debugf("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
+					blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
 				} else {
 					blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
 				}
@@ -754,23 +752,25 @@ func (wr *resource) onSlotEmpty() {
 }
 
 func (wr *resource) occupyWaitList() {
-	for e := wr.waitingList.Front(); e != nil; e = e.Next() {
-		msg := e.Value.(*lockWorkerMessage)
-		set := wr.getUsageSet(msg.jobUsage)
-		if wr.isIdle(set) {
-			h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
-			if h != nil {
-				set.occupied++
-				wr.occupiedSlots++
-				msg.result <- h
-				wr.waitingList.Remove(e)
-				blog.Debugf("remote slot: occupy waiting list")
-			} else if !hasAvailableWorker { // no slot available for ban worker list, turn it local
-				blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
-				msg.result <- nil
-				wr.waitingList.Remove(e)
-			} else {
-				blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
+	if wr.waitingList.Len() > 0 {
+		for e := wr.waitingList.Front(); e != nil; e = e.Next() {
+			msg := e.Value.(*lockWorkerMessage)
+			set := wr.getUsageSet(msg.jobUsage)
+			if wr.isIdle(set) {
+				h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
+				if h != nil {
+					set.occupied++
+					wr.occupiedSlots++
+					msg.result <- h
+					wr.waitingList.Remove(e)
+					blog.Debugf("remote slot: occupy waiting list")
+				} else if !hasAvailableWorker { // no slot available for ban worker list, turn it local
+					blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
+					msg.result <- nil
+					wr.waitingList.Remove(e)
+				} else {
+					blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
+				}
 			}
 		}
 	}
 }

From 7693da30c0705f6dd19245684c38fb16cd1b76fa Mon Sep 17 00:00:00 2001
From: yanafu
Date: Wed, 20 Nov 2024 17:38:55 +0800
Subject: [PATCH 16/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?=
 =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../worker/pkg/client/bkcommondist_handler_long_tcp.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler_long_tcp.go b/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler_long_tcp.go
index 36ccd415..d8828ab5 100644
--- a/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler_long_tcp.go
+++ b/src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler_long_tcp.go
@@ -212,9 +212,9 @@ func (r *CommonRemoteHandler) ExecuteSendFileLongTCP(
 	// acquire the local resource lock
 	locallocked := false
 	if mgr != nil {
-		if mgr.LockSlots(dcSDK.JobUsageLocalExe, 1) {
+		if mgr.LockSlots(dcSDK.JobUsageDefault, 1) {
 			locallocked = true
-			blog.Infof("remotehandle: succeed to get one local lock")
+			blog.Infof("remotehandle: succeed to get one default lock")
 		}
 	}
 
@@ -261,8 +261,8 @@ func (r *CommonRemoteHandler) ExecuteSendFileLongTCP(
 	t1 = t2
 
 	if locallocked {
-		mgr.UnlockSlots(dcSDK.JobUsageLocalExe, 1)
-		blog.Infof("remotehandle: succeed to release one local lock")
+		mgr.UnlockSlots(dcSDK.JobUsageDefault, 1)
+		blog.Infof("remotehandle: succeed to release one default lock")
 	}
 
 	if err != nil {

From 4642326a286236fa2676d9ae418565f9ed254231 Mon Sep 17 00:00:00 2001
From: yanafu
Date: Thu, 21 Nov 2024 16:51:36 +0800
Subject: [PATCH 17/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?=
 =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../controller/pkg/manager/remote/mgr.go      | 35 +++++++++++++++++--
 1 file changed, 33 insertions(+), 2 deletions(-)

diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index 0eda7806..3cea011c 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -184,7 +184,7 @@ func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileI
 
 	c, ok := fsm.cache[desc.FilePath]
 	if !ok || c == nil || len(*c) == 0 {
-		return nil, false, fmt.Errorf("file %s not found", desc.FilePath)
+		return nil, false, fmt.Errorf("file %s not found, file cache is nil", desc.FilePath)
 	}
 
 	for _, ci := range *c {
@@ -635,6 +635,9 @@ func (m *Mgr) retrySendToolChains(ctx context.Context) {
 			blog.Infof("remote: run toolchain check for work(%s) canceled by context", m.work.ID())
 			return
 		case <-ticker.C:
+			if m.failFileSendMap == nil {
+				continue
+			}
 			handler := m.remoteWorker.Handler(0, nil, nil, nil)
 			hosts := m.work.Resource().GetHosts()
 			count := 0
@@ -681,6 +684,9 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
 			blog.Infof("remote: run failfiles check for work(%s) canceled by context", m.work.ID())
 			return
 		case <-ticker.C:
+			if m.failFileSendMap == nil {
+				continue
+			}
 			hosts := m.work.Resource().GetHosts()
 			wg := make(chan string, len(hosts))
 			count := 0
@@ -1506,7 +1512,7 @@ func (m *Mgr) checkOrLockSendFailFile(server string, desc dcSDK.FileDesc, query
 
 	info, match, err := target.matchFail(desc, query)
 	if err != nil {
-		return types.FileSendUnknown, false, errors.New("file not found")
+		return types.FileSendUnknown, false, err
 	}
 	if info == nil {
 		return types.FileSendUnknown, false, errors.New("file is nil")
@@ -1774,8 +1780,33 @@ func (m *Mgr) ensureOneFileCollection(
 	Servers = append(Servers, host)
 
 	fileDetails := make([]*types.FilesDetails, 0, len(fc.Files))
+
+	var failsendMap *fileSendMap
+	if fc.Retry {
+		m.failFileSendMutex.Lock()
+		failsendMap = m.failFileSendMap[host.Server]
+		if failsendMap == nil {
+			m.failFileSendMutex.Unlock()
+			blog.Errorf("remote: send file for work(%s) with no send map", m.work.ID())
+			return errors.New("remote: send file with no send map")
+		}
+		m.failFileSendMutex.Unlock()
+	}
 	for _, f := range fc.Files {
 		f.NoDuplicated = true
+		if fc.Retry {
+			for _, c := range failsendMap.cache {
+				for _, d := range *c {
+					if d.Match(f) {
+						f.Retry = fc.Retry
+						break
+					}
+				}
+				if f.Retry {
+					break
+				}
+			}
+		}
 		fileDetails = append(fileDetails, &types.FilesDetails{
 			Servers: Servers,
 			File:    f,

From 7f70df037ab05d637348b23a9964b9e50b432088 Mon Sep 17 00:00:00 2001
From: yanafu
Date: Fri, 22 Nov 2024 09:41:39 +0800
Subject: [PATCH 18/18] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?=
 =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../booster/bk_dist/controller/pkg/manager/remote/mgr.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index 3cea011c..0bd5b044 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -635,7 +635,7 @@ func (m *Mgr) retrySendToolChains(ctx context.Context) {
 			blog.Infof("remote: run toolchain check for work(%s) canceled by context", m.work.ID())
 			return
 		case <-ticker.C:
-			if m.failFileSendMap == nil {
+			if m.failFileSendMap == nil || len(m.failFileSendMap) == 0 {
 				continue
 			}
 			handler := m.remoteWorker.Handler(0, nil, nil, nil)
@@ -684,7 +684,7 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
 			blog.Infof("remote: run failfiles check for work(%s) canceled by context", m.work.ID())
 			return
 		case <-ticker.C:
-			if m.failFileSendMap == nil {
+			if m.failFileSendMap == nil || len(m.failFileSendMap) == 0 {
 				continue
 			}
 			hosts := m.work.Resource().GetHosts()