feat: optimize worker selection and recovery mechanism #311
flyy1012 committed Nov 6, 2024
1 parent b9bd1d1 commit 28c8b16
Showing 1 changed file with 50 additions and 31 deletions.
81 changes: 50 additions & 31 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
@@ -635,15 +635,23 @@ func (m *Mgr) retrySendToolChains(ctx context.Context) {
var wg sync.WaitGroup
handler := m.remoteWorker.Handler(0, nil, nil, nil)
hosts := m.work.Resource().GetHosts()
wg.Add(len(hosts))
for _, h := range hosts {
blog.Debugf("remote: try to retry send tool chain for work(%s) to server %s", m.work.ID(), h.Server)
fileCollections, err := m.getFailedFileCollectionByHost(h.Server)
if err != nil {
blog.Infof("remote: retry send tool chain for work(%s) to server(%s) failed: %v", m.work.ID(), h.Server, err)
continue
}
if len(fileCollections) == 0 {
continue
}
wg.Add(1)
go m.retrySendToolChain(handler, &types.RemoteTaskExecuteRequest{
Pid: 0,
Server: h,
Sandbox: &dcSyscall.Sandbox{Dir: ""},
Stats: &dcSDK.ControllerJobStats{},
}, &wg)
}, fileCollections, &wg)
}
wg.Wait() // wait for all goroutines to finish
time.Sleep(m.workerCheckTick)
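
The hunk above is the core of the change to retrySendToolChains: the failed-collection lookup now happens before the goroutine is spawned, so wg.Add(1) fires exactly once per launched goroutine instead of once per host. A minimal, runnable sketch of that pattern (host, failedCollections, and the inline goroutine body are illustrative stand-ins, not the real Mgr API):

```go
package main

import (
	"fmt"
	"sync"
)

type host struct{ server string }

// retryAll spawns one goroutine per host that actually has failed
// collections; hosts with nothing to resend are skipped before
// wg.Add(1), so wg.Wait() never blocks on a no-op goroutine.
func retryAll(hosts []*host, failedCollections func(string) []string) {
	var wg sync.WaitGroup
	for _, h := range hosts {
		fcs := failedCollections(h.server)
		if len(fcs) == 0 {
			continue // nothing to retry for this host
		}
		wg.Add(1) // exactly one Add per goroutine spawned below
		go func(h *host, fcs []string) {
			defer wg.Done()
			fmt.Printf("resending %d collections to %s\n", len(fcs), h.server)
		}(h, fcs)
	}
	wg.Wait()
}

func main() {
	retryAll(
		[]*host{{server: "10.0.0.1"}, {server: "10.0.0.2"}},
		func(s string) []string {
			if s == "10.0.0.1" {
				return []string{"toolchain-a"}
			}
			return nil
		},
	)
}
```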
@@ -656,13 +664,28 @@ func (m *Mgr) retryFailFiles(ctx context.Context) {
select {
case <-ctx.Done():
blog.Infof("remote: run worker check for work(%s) canceled by context", m.work.ID())
return
default:
var wg sync.WaitGroup
hosts := m.work.Resource().GetHosts()
wg.Add(len(hosts))
//wg.Add(len(hosts))
for _, h := range hosts {
blog.Debugf("remote: try to retry send fail file for work(%s) to server %s", m.work.ID(), h.Server)
go m.retrySendFiles(h, &wg)
m.failFileSendMutex.Lock()
sendMap := m.failFileSendMap[h.Server]
if sendMap == nil {
m.failFileSendMutex.Unlock()
blog.Infof("remote: send file for work(%s) with no send map", m.work.ID())
continue
}
m.failFileSendMutex.Unlock()

failFiles := sendMap.getFailFiles()
if len(failFiles) == 0 {
continue
}
wg.Add(1)
go m.retrySendFiles(h, failFiles, &wg)
}
wg.Wait() // wait for all goroutines to finish
time.Sleep(m.retryCheckTick)
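
retryFailFiles gets the same treatment: the mutex now guards only the failFileSendMap lookup, and the retry goroutine is launched outside the critical section, and only when there are failed files. A hedged sketch of that lock scope, with the Mgr fields reduced to simplified stand-ins:

```go
package main

import (
	"fmt"
	"sync"
)

type sendState struct{ failFiles []string }

type mgr struct {
	mu      sync.Mutex
	sendMap map[string]*sendState
}

// failFilesFor holds the mutex only for the map lookup; the caller then
// decides outside the critical section whether spawning a retry
// goroutine is worthwhile at all.
func (m *mgr) failFilesFor(server string) []string {
	m.mu.Lock()
	st := m.sendMap[server]
	m.mu.Unlock()
	if st == nil {
		return nil // no send state recorded for this server
	}
	return st.failFiles
}

func main() {
	m := &mgr{sendMap: map[string]*sendState{
		"10.0.0.1:8080": {failFiles: []string{"a.o", "b.o"}},
	}}
	fmt.Println(m.failFilesFor("10.0.0.1:8080")) // [a.o b.o]
	fmt.Println(m.failFilesFor("10.0.0.9:8080")) // []
}
```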
@@ -707,11 +730,20 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
continue
}
if f.isFilesSendFailed(c.Inputfiles) {
req.BanWorkerList = append(req.BanWorkerList, s)
matched := false
for _, h := range req.BanWorkerList {
if h.Equal(s) {
matched = true
break
}
}
if !matched {
req.BanWorkerList = append(req.BanWorkerList, s)
}
}
}
}
blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %v", m.work.ID(), req.Pid, req.BanWorkerList)
blog.Debugf("remote: try to execute remote task for work(%s) from pid(%d) with ban worker list %d, %v", m.work.ID(), req.Pid, len(req.BanWorkerList), req.BanWorkerList)
// if there is a large file over 100MB, use it as a selection criterion when choosing the host
fpath, _ := getMaxSizeFile(req, m.largeFileSize)
req.Server = m.lockSlots(dcSDK.JobUsageRemoteExe, fpath, req.BanWorkerList)
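
The Equal scan added above keeps a worker from being appended to BanWorkerList more than once across retries. Since the identical loop reappears in the next hunk, it could be folded into a small helper; a hypothetical sketch (appendUniqueHost is not part of this commit):

```go
package main

import "fmt"

type Host struct{ Server string }

func (h *Host) Equal(o *Host) bool { return h.Server == o.Server }

// appendUniqueHost appends h only if no existing entry compares Equal,
// keeping the ban list free of duplicate workers.
func appendUniqueHost(list []*Host, h *Host) []*Host {
	for _, e := range list {
		if e.Equal(h) {
			return list // already banned; leave the list unchanged
		}
	}
	return append(list, h)
}

func main() {
	ban := []*Host{{Server: "10.0.0.1:8080"}}
	ban = appendUniqueHost(ban, &Host{Server: "10.0.0.1:8080"})
	fmt.Println(len(ban)) // 1: the duplicate was not appended
}
```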
@@ -761,7 +793,16 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas

remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req))
if err != nil {
req.BanWorkerList = append(req.BanWorkerList, req.Server)
matched := false
for _, h := range req.BanWorkerList {
if h.Equal(req.Server) {
matched = true
break
}
}
if !matched {
req.BanWorkerList = append(req.BanWorkerList, req.Server)
}
var banlistStr string
for _, s := range req.BanWorkerList {
banlistStr = banlistStr + s.Server + ","
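
As an aside, the ban-list string above is assembled by repeated concatenation in a loop; the same result, minus the trailing comma, falls out of strings.Join. A standalone sketch:

```go
package main

import (
	"fmt"
	"strings"
)

type host struct{ Server string }

func main() {
	ban := []*host{{Server: "10.0.0.1:8080"}, {Server: "10.0.0.2:8080"}}
	servers := make([]string, 0, len(ban))
	for _, h := range ban {
		servers = append(servers, h.Server)
	}
	// strings.Join builds the string in one allocation and leaves no
	// trailing comma behind.
	fmt.Println(strings.Join(servers, ",")) // 10.0.0.1:8080,10.0.0.2:8080
}
```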
@@ -839,22 +880,9 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error)
)
}

func (m *Mgr) retrySendFiles(h *dcProtocol.Host, wg *sync.WaitGroup) {
func (m *Mgr) retrySendFiles(h *dcProtocol.Host, failFiles []dcSDK.FileDesc, wg *sync.WaitGroup) {
defer wg.Done() // call Done when the function returns

m.failFileSendMutex.Lock()
sendMap := m.failFileSendMap[h.Server]
if sendMap == nil {
m.failFileSendMutex.Unlock()
blog.Infof("remote: send file for work(%s) with no send map", m.work.ID())
return
}
m.failFileSendMutex.Unlock()

failFiles := sendMap.getFailFiles()
if len(failFiles) == 0 {
return
}
blog.Debugf("remote: try to retry send fail file for work(%s) from pid(%d) to server %s with fail files %v", m.work.ID(), 1, h.Server, len(failFiles))
if _, err := m.SendFiles(&types.RemoteTaskSendFileRequest{
Pid: 1,
@@ -1571,17 +1599,8 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect
}

// retry send failed tool chain
func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, wg *sync.WaitGroup) {
func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest, fileCollections []*types.FileCollectionInfo, wg *sync.WaitGroup) {
defer wg.Done() // call Done when the function returns
fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server)
if err != nil {
blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
return
}
if len(fileCollections) == 0 {
//blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s) failed: no filecollection found", m.work.ID(), req.Pid, req.Server.Server)
return
}

blog.Infof("remote: retry send tool chain for work(%s) from pid(%d) to server(%s)",
m.work.ID(), req.Pid, req.Server.Server)
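
Both retry helpers now share the same contract: the caller performs the lookup (getFailedFileCollectionByHost or getFailFiles), skips hosts with nothing to resend, and the helper only consumes what it is handed. A compact sketch of that shape, with the payload reduced to plain strings:

```go
package main

import (
	"fmt"
	"sync"
)

// retrySend receives its work pre-fetched: the caller has already
// looked up the failed collections and skipped hosts with none, so
// this function never runs as a no-op goroutine.
func retrySend(server string, collections []string, wg *sync.WaitGroup) {
	defer wg.Done() // call Done when the function returns
	fmt.Printf("retrying %d collections on %s\n", len(collections), server)
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go retrySend("10.0.0.1:8080", []string{"toolchain-a"}, &wg)
	wg.Wait()
}
```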
