Skip to content

Commit

Permalink
feat: 增加toolchain发送失败后重试修复 #289
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Sep 5, 2024
1 parent 678ffb8 commit db2b701
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,32 +1268,40 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote
return nil
}

// getFailedFileCollectionByHost looks up the file collections cached for
// server in m.fileCollectionSendMap while holding the read side of
// m.fileCollectionSendMutex, and collects the entries whose SendStatus equals
// types.FileSendFailed.
//
// NOTE(review): this span appears to be a rendered diff whose +/- markers were
// lost, so the two-value (apparently pre-change) and three-value (apparently
// post-change) signature and return lines both appear below — confirm against
// the real file.
//
// In the three-value form the results are:
//   - host missing from the cache: nil, true, and a non-nil error;
//   - some collection not yet terminated: nil, false, nil (and an info log);
//   - every collection terminated: the failed collections (possibly empty),
//     true, nil.
//
// In the two-value form the first two cases return an error and the last
// returns the failed collections with a nil error.
func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, error) {
func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, bool, error) {
m.fileCollectionSendMutex.RLock()
defer m.fileCollectionSendMutex.RUnlock()

target, ok := m.fileCollectionSendMap[server]
if !ok {
return nil, fmt.Errorf("remote: no found host(%s) in file send cache", server)
return nil, true, fmt.Errorf("remote: no found host(%s) in file send cache", server)
}
fcs := make([]*types.FileCollectionInfo, 0)
for _, re := range *target {
// If any file collection has not reached a terminated status yet, return immediately.
if !re.SendStatus.IsTerminated() {
return nil, fmt.Errorf("remote: found file collection(%s) in file send cache, but not finished", re.UniqID)
blog.Infof("remote: found file collection(%s) in file send cache, but not finished, status:%s", re.UniqID, re.SendStatus)
return nil, false, nil
}
// Only collections that ended in the failed status are returned.
if re.SendStatus == types.FileSendFailed {
fcs = append(fcs, re)
}
}
return fcs, nil
return fcs, true, nil
}

func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) {
if m.resource.CanWorkerRetry(req.Server) {
go func(handler dcSDK.RemoteWorkerHandler, req types.RemoteTaskExecuteRequest) {
for i := 0; i < toolChainRetryTimes; i++ {
fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server)
for i := 0; i < toolChainRetryTimes; {
fileCollections, isNotTeminated, err := m.getFailedFileCollectionByHost(req.Server.Server)
if !isNotTeminated {
time.Sleep(m.toolChainRetryTick)
continue
}
i++
blog.Infof("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s)",
m.work.ID(), i, req.Pid, req.Server.Server)
if err != nil || len(fileCollections) == 0 {
if err != nil {
blog.Errorf("remote: get failed file collection by host(%s) failed: %v", req.Server.Server, err)
Expand All @@ -1304,18 +1312,17 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R
continue
}

blog.Infof("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s)",
m.work.ID(), i+1, req.Pid, req.Server.Server)
if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, true); err != nil {
blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s), "+
"send tool chain files failed: %v", m.work.ID(), i+1, req.Pid, req.Server.Server, err)
"send tool chain files failed: %v", m.work.ID(), i, req.Pid, req.Server.Server, err)
time.Sleep(m.toolChainRetryTick)
} else {
// enable worker
m.resource.EnableWorker(req.Server)
blog.Infof("remote: success to retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
return
}

}
m.resource.SetWorkerStatus(req.Server, RetryFailed)
blog.Errorf("remote: already retry to send tool chain for work(%s) for %d times from pid(%d) to server(%s) failed, keep worker disable", m.work.ID(), toolChainRetryTimes, req.Pid, req.Server.Server)
Expand Down

0 comments on commit db2b701

Please sign in to comment.