diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
index a3a391a4..a1697d3f 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -635,15 +635,23 @@ func (m *Mgr) retrySendToolChains(ctx context.Context) {
 			var wg sync.WaitGroup
 			handler := m.remoteWorker.Handler(0, nil, nil, nil)
 			hosts := m.work.Resource().GetHosts()
-			wg.Add(len(hosts))
 			for _, h := range hosts {
 				blog.Debugf("remote: try to retry send tool chain for work(%s) to server %s", m.work.ID(), h.Server)
+				fileCollections, err := m.getFailedFileCollectionByHost(h.Server)
+				if err != nil {
+					blog.Infof("remote: retry send tool chain for work(%s) to server(%s) failed: %v", m.work.ID(), h.Server, err)
+					continue
+				}
+				if len(fileCollections) == 0 {
+					continue
+				}
+				wg.Add(1)
 				go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
 					Pid:     0,
 					Server:  h,
 					Sandbox: &dcSyscall.Sandbox{Dir: ""},
 					Stats:   &dcSDK.ControllerJobStats{},
-				}, &wg)
+				}, fileCollections, &wg)
 			}
 			wg.Wait() // wait for all goroutines to finish
 			time.Sleep(m.workerCheckTick)
@@ -656,13 +664,28 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
 		select {
 		case <-ctx.Done():
 			blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
+			return
 		default:
 			var wg sync.WaitGroup
 			hosts := m.work.Resource().GetHosts()
-			wg.Add(len(hosts))
+			//wg.Add(len(hosts))
 			for _, h := range hosts {
 				blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
-				go m.retrySendFiles(h, &wg)
+				m.failFileSendMutex.Lock()
+				sendMap := m.failFileSendMap[h.Server]
+				if sendMap == nil {
+					m.failFileSendMutex.Unlock()
+					blog.Infof("remote: send file for work(%s) with no send map", m.work.ID())
+					continue
+				}
+				m.failFileSendMutex.Unlock()
+
+				failFiles := sendMap.getFailFiles()
+				if len(failFiles) == 0 {
+					continue
+				}
+				wg.Add(1)
+				go m.retrySendFiles(h, failFiles, &wg)
 			}
 			wg.Wait() // wait for all goroutines to finish
 			time.Sleep(m.retryCheckTick)
@@ -707,11 +730,20 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
 				continue
 			}
 			if f.isFilesSendFailed(c.Inputfiles) {
-				req.BanWorkerList = append(req.BanWorkerList, s)
+				matched := false
+				for _, h := range req.BanWorkerList {
+					if h.Equal(s) {
+						matched = true
+						break
+					}
+				}
+				if !matched {
+					req.BanWorkerList = append(req.BanWorkerList, s)
+				}
 			}
 		}
 	}
-	blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %v", m.work.ID(), req.Pid, req.BanWorkerList)
+	blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %d, %v", m.work.ID(), req.Pid, len(req.BanWorkerList), req.BanWorkerList)
 	// if there is a large file over 100MB, use it as a selection criterion when choosing the host
 	fpath, _ := getMaxSizeFile(req, m.largeFileSize)
 	req.Server = m.lockSlots(dcSDK.JobUsageRemoteExe, fpath, req.BanWorkerList)
@@ -761,7 +793,16 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
 
 	remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req))
 	if err != nil {
-		req.BanWorkerList = append(req.BanWorkerList, req.Server)
+		matched := false
+		for _, h := range req.BanWorkerList {
+			if h.Equal(req.Server) {
+				matched = true
+				break
+			}
+		}
+		if !matched {
+			req.BanWorkerList = append(req.BanWorkerList, req.Server)
+		}
 		var banlistStr string
 		for _, s := range req.BanWorkerList {
 			banlistStr = banlistStr + s.Server + ","
@@ -839,22 +880,9 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
 	)
 }
 
-func (m *Mgr) retrySendFiles(h *dcProtocol.Host, wg *sync.WaitGroup) {
+func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, wg *sync.WaitGroup) {
 	defer wg.Done() // call Done when the function returns
-	m.failFileSendMutex.Lock()
-	sendMap := m.failFileSendMap[h.Server]
-	if sendMap == nil {
-		m.failFileSendMutex.Unlock()
-		blog.Infof("remote: send file for work(%s) with no send map", m.work.ID())
-		return
-	}
-	m.failFileSendMutex.Unlock()
-
-	failFiles := sendMap.getFailFiles()
-	if len(failFiles) == 0 {
-		return
-	}
 	blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v",
 		m.work.ID(), 1, h.Server, len(failFiles))
 	if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
 		Pid:     1,
@@ -1571,17 +1599,8 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect
 }
 
 // retry send failed tool chain
-func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, wg *sync.WaitGroup) {
+func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, wg *sync.WaitGroup) {
 	defer wg.Done() // call Done when the function returns
-	fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server)
-	if err != nil {
-		blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
-		return
-	}
-	if len(fileCollections) == 0 {
-		//blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: no filecollection found", m.work.ID(), req.Pid, req.Server.Server)
-		return
-	}
 	blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)