From a03efad2e2ec84b731d14299a0a70d82e14ea368 Mon Sep 17 00:00:00 2001 From: yanafu Date: Fri, 6 Dec 2024 09:21:12 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix:=20=E6=96=87=E4=BB=B6=E5=8F=91=E9=80=81?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E5=90=8E=E4=BC=9A=E5=B0=9D=E8=AF=95=E5=85=B6?= =?UTF-8?q?=E4=BB=96worker=E6=88=90=E5=8A=9F=E9=87=8D=E8=AF=95=E4=BC=98?= =?UTF-8?q?=E5=8C=96=20#330?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../booster/bk_dist/common/sdk/worker.go | 2 + .../controller/pkg/manager/local/mgr.go | 11 ++++ .../controller/pkg/manager/remote/mgr.go | 65 ++++++++++++------- .../controller/pkg/manager/remote/utils.go | 3 +- 4 files changed, 56 insertions(+), 25 deletions(-) diff --git a/src/backend/booster/bk_dist/common/sdk/worker.go b/src/backend/booster/bk_dist/common/sdk/worker.go index 15ef9fb8..ff133af6 100644 --- a/src/backend/booster/bk_dist/common/sdk/worker.go +++ b/src/backend/booster/bk_dist/common/sdk/worker.go @@ -85,11 +85,13 @@ const ( type FileDesc struct { FilePath string `json:"file_path"` FileSize int64 `json:"file_size"` + InitFileSize int64 `json:"init_file_size"` Lastmodifytime int64 `json:"last_modify_time"` Md5 string `json:"md5"` Compresstype protocol.CompressType `json:"compress_type"` Buffer []byte `json:"buffer"` CompressedSize int64 `json:"compressed_size"` + InitCompressedSize int64 `json:"init_compressed_size"` Targetrelativepath string `json:"target_relative_path"` Filemode uint32 `json:"file_mode"` LinkTarget string `json:"link_target"` diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go index 401115e1..09db96f4 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go @@ -255,6 +255,17 @@ func (m *Mgr) ExecuteTask( m.work.ID(), req.Pid, i, err) break } + // 远程任务失败后,将文件大小和压缩大小都置为初始值,方便其他worker重试 + for i, c := range remoteReq.Req.Commands { + for j, f := range c.Inputfiles { + if f.Compresstype < 0 && f.InitCompressedSize > 0 { + remoteReq.Req.Commands[i].Inputfiles[j].CompressedSize = remoteReq.Req.Commands[i].Inputfiles[j].InitCompressedSize + } + if f.FileSize < 0 && f.InitFileSize > 0 { + remoteReq.Req.Commands[i].Inputfiles[j].FileSize = remoteReq.Req.Commands[i].Inputfiles[j].InitFileSize + } + } + } blog.Infof("local: retry remote-task from work(%s) for the(%d) time from pid(%d) "+ "with error(%v),ban (%d) worker:(%s)", m.work.ID(), diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 80a702f3..218368a6 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -13,6 +13,7 @@ import ( "context" "errors" "fmt" + "os" "path/filepath" "runtime" "runtime/debug" @@ -318,9 +319,6 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { } func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus) { - if status == types.FileSendSucceed && !desc.Retry { - return - } fsm.Lock() defer fsm.Unlock() @@ -356,9 +354,6 @@ func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileS } func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendStatus) { - if status != types.FileSendSucceed && desc.Retry { - return - } fsm.Lock() defer fsm.Unlock() @@ -429,6 +424,11 @@ func (fsm *fileSendMap) getFailFiles() []dcSDK.FileDesc { if ci.SendStatus != types.FileSendFailed { continue } + _, err := os.Stat(ci.FullPath) + if os.IsNotExist(err) { + blog.Warnf("remote: get fail file %s not exist", ci.FullPath) + continue + } failFiles = append(failFiles, dcSDK.FileDesc{ FilePath: ci.FullPath, Compresstype: dcProtocol.CompressLZ4, @@ -810,8 +810,8 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas m.work.ID(), req.Pid, len(req.BanWorkerList)) return nil, errors.New("no available worker") } - blog.Infof("remote: selected host(%s) with large file(%s)", - req.Server.Server, fpath) + blog.Infof("remote: selected host(%s) with large file(%s) for work(%s) from pid(%d)", + req.Server.Server, fpath, m.work.ID(), req.Pid) dcSDK.StatsTimeNow(&req.Stats.RemoteWorkLockTime) defer dcSDK.StatsTimeNow(&req.Stats.RemoteWorkUnlockTime) @@ -1595,25 +1595,42 @@ func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc, retry } } -func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) { - m.fileSendMutex.Lock() - target, ok := m.fileSendMap[server] - if !ok { - target = &fileSendMap{} - m.fileSendMap[server] = target +func (m *Mgr) needToUpdateFail(desc dcSDK.FileDesc, status types.FileSendStatus) bool { + if status == types.FileSendSucceed && !desc.Retry { + return false } - m.fileSendMutex.Unlock() - target.updateStatus(desc, status) + if strings.HasSuffix(desc.FilePath, ".ii") { + return false + } + if strings.HasSuffix(desc.FilePath, ".i") { + return false + } + return true +} - m.failFileSendMutex.Lock() - failTarget, ok := m.failFileSendMap[server] - if !ok { - failTarget = &fileSendMap{} - m.failFileSendMap[server] = failTarget +func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) { + if status == types.FileSendSucceed || !desc.Retry { + m.fileSendMutex.Lock() + target, ok := m.fileSendMap[server] + if !ok { + target = &fileSendMap{} + m.fileSendMap[server] = target + } + m.fileSendMutex.Unlock() + target.updateStatus(desc, status) } - m.failFileSendMutex.Unlock() - failTarget.updateFailStatus(desc, status) + if m.needToUpdateFail(desc, status) { + m.failFileSendMutex.Lock() + failTarget, ok := m.failFileSendMap[server] + if !ok { + failTarget = &fileSendMap{} + m.failFileSendMap[server] = failTarget + } + m.failFileSendMutex.Unlock() + + failTarget.updateFailStatus(desc, status) + } } func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) error { @@ -1666,7 +1683,7 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) []*types.FileCollecti target, ok := m.fileCollectionSendMap[server] if !ok { - blog.Infof("remote: no found host(%s) in file send cache", server) + blog.Debugf("remote: no found host(%s) in file send cache", server) return nil } fcs := make([]*types.FileCollectionInfo, 0) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/utils.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/utils.go index 39dba418..94a3d099 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/utils.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/utils.go @@ -70,7 +70,8 @@ func updateTaskRequestInputFilesReady(req *types.RemoteTaskExecuteRequest, baseD index++ continue } - + req.Req.Commands[i].Inputfiles[j].InitFileSize = req.Req.Commands[i].Inputfiles[j].FileSize + req.Req.Commands[i].Inputfiles[j].InitCompressedSize = req.Req.Commands[i].Inputfiles[j].CompressedSize req.Req.Commands[i].Inputfiles[j].FileSize = -1 req.Req.Commands[i].Inputfiles[j].CompressedSize = -1 req.Req.Commands[i].Inputfiles[j].Targetrelativepath = baseDir From a4abc10187901b1943354431a2bbeb962661d841 Mon Sep 17 00:00:00 2001 From: yanafu Date: Fri, 6 Dec 2024 09:52:13 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20=E6=96=87=E4=BB=B6=E5=8F=91=E9=80=81?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E5=90=8E=E4=BC=9A=E5=B0=9D=E8=AF=95=E5=85=B6?= =?UTF-8?q?=E4=BB=96worker=E6=88=90=E5=8A=9F=E9=87=8D=E8=AF=95=E4=BC=98?= =?UTF-8?q?=E5=8C=96=20#330?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go index 09db96f4..324aa00e 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go @@ -258,7 +258,7 @@ func (m *Mgr) ExecuteTask( // 远程任务失败后,将文件大小和压缩大小都置为初始值,方便其他worker重试 for i, c := range remoteReq.Req.Commands { for j, f := range c.Inputfiles { - if f.Compresstype < 0 && f.InitCompressedSize > 0 { + if f.CompressedSize < 0 && f.InitCompressedSize > 0 { remoteReq.Req.Commands[i].Inputfiles[j].CompressedSize = remoteReq.Req.Commands[i].Inputfiles[j].InitCompressedSize } if f.FileSize < 0 && f.InitFileSize > 0 { From 2f7447bfb484b5e788e482ac521f72e343298ea4 Mon Sep 17 00:00:00 2001 From: yanafu Date: Mon, 9 Dec 2024 17:15:32 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20=E6=96=87=E4=BB=B6=E5=8F=91=E9=80=81?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E5=90=8E=E4=BC=9A=E5=B0=9D=E8=AF=95=E5=85=B6?= =?UTF-8?q?=E4=BB=96worker=E6=88=90=E5=8A=9F=E9=87=8D=E8=AF=95=E4=BC=98?= =?UTF-8?q?=E5=8C=96=20#330?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../booster/bk_dist/controller/pkg/manager/local/mgr.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go index 324aa00e..0b6a2f1a 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go @@ -258,10 +258,10 @@ func (m *Mgr) ExecuteTask( // 远程任务失败后,将文件大小和压缩大小都置为初始值,方便其他worker重试 for i, c := range remoteReq.Req.Commands { for j, f := range c.Inputfiles { - if f.CompressedSize < 0 && f.InitCompressedSize > 0 { + if f.CompressedSize < 0 && f.InitCompressedSize >= 0 { remoteReq.Req.Commands[i].Inputfiles[j].CompressedSize = remoteReq.Req.Commands[i].Inputfiles[j].InitCompressedSize } - if f.FileSize < 0 && f.InitFileSize > 0 { + if f.FileSize < 0 && f.InitFileSize >= 0 { remoteReq.Req.Commands[i].Inputfiles[j].FileSize = remoteReq.Req.Commands[i].Inputfiles[j].InitFileSize } }