Skip to content

Commit

Permalink
Merge pull request #331 from tbs60/dev_yanafu
Browse files Browse the repository at this point in the history
文件发送失败后会尝试其他worker成功重试优化
  • Loading branch information
tming authored Dec 10, 2024
2 parents fca5b44 + 2f7447b commit 1fbb796
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 25 deletions.
2 changes: 2 additions & 0 deletions src/backend/booster/bk_dist/common/sdk/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ const (
type FileDesc struct {
FilePath string `json:"file_path"`
FileSize int64 `json:"file_size"`
InitFileSize int64 `json:"init_file_size"`
Lastmodifytime int64 `json:"last_modify_time"`
Md5 string `json:"md5"`
Compresstype protocol.CompressType `json:"compress_type"`
Buffer []byte `json:"buffer"`
CompressedSize int64 `json:"compressed_size"`
InitCompressedSize int64 `json:"init_compressed_size"`
Targetrelativepath string `json:"target_relative_path"`
Filemode uint32 `json:"file_mode"`
LinkTarget string `json:"link_target"`
Expand Down
11 changes: 11 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,17 @@ func (m *Mgr) ExecuteTask(
m.work.ID(), req.Pid, i, err)
break
}
// After a remote task fails, reset the file size and compressed size to their initial values so that another worker can retry.
for i, c := range remoteReq.Req.Commands {
for j, f := range c.Inputfiles {
if f.CompressedSize < 0 && f.InitCompressedSize >= 0 {
remoteReq.Req.Commands[i].Inputfiles[j].CompressedSize = remoteReq.Req.Commands[i].Inputfiles[j].InitCompressedSize
}
if f.FileSize < 0 && f.InitFileSize >= 0 {
remoteReq.Req.Commands[i].Inputfiles[j].FileSize = remoteReq.Req.Commands[i].Inputfiles[j].InitFileSize
}
}
}
blog.Infof("local: retry remote-task from work(%s) for the(%d) time from pid(%d) "+
"with error(%v),ban (%d) worker:(%s)",
m.work.ID(),
Expand Down
65 changes: 41 additions & 24 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"runtime"
"runtime/debug"
Expand Down Expand Up @@ -318,9 +319,6 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
}

func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus) {
if status == types.FileSendSucceed && !desc.Retry {
return
}
fsm.Lock()
defer fsm.Unlock()

Expand Down Expand Up @@ -356,9 +354,6 @@ func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileS
}

func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) {
if status != types.FileSendSucceed && desc.Retry {
return
}
fsm.Lock()
defer fsm.Unlock()

Expand Down Expand Up @@ -429,6 +424,11 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc {
if ci.SendStatus != types.FileSendFailed {
continue
}
_, err := os.Stat(ci.FullPath)
if os.IsNotExist(err) {
blog.Warnf("remote: get fail file %s not exist", ci.FullPath)
continue
}
failFiles = append(failFiles, dcSDK.FileDesc{
FilePath: ci.FullPath,
Compresstype: dcProtocol.CompressLZ4,
Expand Down Expand Up @@ -810,8 +810,8 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
m.work.ID(), req.Pid, len(req.BanWorkerList))
return nil, errors.New("no available worker")
}
blog.Infof("remote: selected host(%s) with large file(%s)",
req.Server.Server, fpath)
blog.Infof("remote: selected host(%s) with large file(%s) for work(%s) from pid(%d)",
req.Server.Server, fpath, m.work.ID(), req.Pid)

dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLockTime)
defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkUnlockTime)
Expand Down Expand Up @@ -1595,25 +1595,42 @@ func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc, retry
}
}

func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) {
m.fileSendMutex.Lock()
target, ok := m.fileSendMap[server]
if !ok {
target = &fileSendMap{}
m.fileSendMap[server] = target
// needToUpdateFail reports whether the failed-file send cache should be
// updated for desc given the send status.
//
// It returns false when the send succeeded and desc is not a retry, and false
// when desc.FilePath ends in ".ii" or ".i"; in every other case it returns true.
//
// NOTE(review): this text looks like a rendered diff without +/- markers, so
// removed and added lines may be interleaved. Confirm against the repository
// before relying on the exact statement sequence below.
func (m *Mgr) needToUpdateFail(desc dcSDK.FileDesc, status types.FileSendStatus) bool {
// A successful send that was not a retry needs no failure bookkeeping.
if status == types.FileSendSucceed && !desc.Retry {
return false
}
// NOTE(review): the next two statements use `target`, which is not declared
// in this function, and unlock fileSendMutex without a matching Lock here.
// They look like lines removed from the previous updateSendFile body that the
// diff view carried into this span — TODO confirm and drop if so.
m.fileSendMutex.Unlock()
target.updateStatus(desc, status)
// Paths ending in ".ii" or ".i" are never tracked as failed files
// (presumably preprocessor output that is regenerated per task — confirm).
if strings.HasSuffix(desc.FilePath, ".ii") {
return false
}
if strings.HasSuffix(desc.FilePath, ".i") {
return false
}
return true
}

m.failFileSendMutex.Lock()
failTarget, ok := m.failFileSendMap[server]
if !ok {
failTarget = &fileSendMap{}
m.failFileSendMap[server] = failTarget
// updateSendFile records the send status of desc for the given server in the
// manager's per-server send caches.
//
// The regular cache (m.fileSendMap) is updated only when the send succeeded or
// desc is not a retry. The failure cache (m.failFileSendMap) is updated only
// when needToUpdateFail(desc, status) returns true. In both cases the
// per-server entry is looked up, and created if missing, while the matching
// mutex is held; the entry's own update method is called after the mutex is
// released.
//
// NOTE(review): this text looks like a rendered diff without +/- markers, so
// removed and added lines may be interleaved. Confirm against the repository
// before relying on the exact statement sequence below.
func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) {
// Skip the regular cache for a failed retry (status is not FileSendSucceed and desc.Retry is set).
if status == types.FileSendSucceed || !desc.Retry {
m.fileSendMutex.Lock()
target, ok := m.fileSendMap[server]
if !ok {
// First file seen for this server: create its cache entry.
target = &fileSendMap{}
m.fileSendMap[server] = target
}
m.fileSendMutex.Unlock()
target.updateStatus(desc, status)
}
// NOTE(review): the Unlock below has no matching Lock at this point, and the
// updateFailStatus call after it uses failTarget before it is declared inside
// the following if-block. Both look like lines removed from the previous
// version of this function that the diff view left in place — TODO confirm
// and drop if so.
m.failFileSendMutex.Unlock()

failTarget.updateFailStatus(desc, status)
if m.needToUpdateFail(desc, status) {
m.failFileSendMutex.Lock()
failTarget, ok := m.failFileSendMap[server]
if !ok {
// First failure record for this server: create its cache entry.
failTarget = &fileSendMap{}
m.failFileSendMap[server] = failTarget
}
m.failFileSendMutex.Unlock()

failTarget.updateFailStatus(desc, status)
}
}

func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) error {
Expand Down Expand Up @@ -1666,7 +1683,7 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) []*types.FileCollecti

target, ok := m.fileCollectionSendMap[server]
if !ok {
blog.Infof("remote: no found host(%s) in file send cache", server)
blog.Debugf("remote: no found host(%s) in file send cache", server)
return nil
}
fcs := make([]*types.FileCollectionInfo, 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func updateTaskRequestInputFilesReady(req *types.RemoteTaskExecuteRequest, baseD
index++
continue
}

req.Req.Commands[i].Inputfiles[j].InitFileSize = req.Req.Commands[i].Inputfiles[j].FileSize
req.Req.Commands[i].Inputfiles[j].InitCompressedSize = req.Req.Commands[i].Inputfiles[j].CompressedSize
req.Req.Commands[i].Inputfiles[j].FileSize = -1
req.Req.Commands[i].Inputfiles[j].CompressedSize = -1
req.Req.Commands[i].Inputfiles[j].Targetrelativepath = baseDir
Expand Down

0 comments on commit 1fbb796

Please sign in to comment.