From 06a8f69c49fe01adef812e10fb901fbdeb43df01 Mon Sep 17 00:00:00 2001 From: yanafu Date: Mon, 2 Sep 2024 10:45:44 +0800 Subject: [PATCH 1/8] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0toolchain?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=A4=B1=E8=B4=A5=E5=90=8E=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20#289?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 196 +++++++++++++----- .../pkg/manager/remote/slotbyworkeroffer.go | 69 ++++++ .../controller/pkg/manager/remote/slots.go | 77 +++++++ .../controller/pkg/manager/remote/worker.go | 4 + 4 files changed, 293 insertions(+), 53 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 84e807fd7..edcc76c49 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -71,6 +71,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr { conf: work.Config(), resourceCheckTick: 5 * time.Second, workerCheckTick: 5 * time.Second, + toolChainRetryTick: 10 * time.Second, sendCorkTick: 10 * time.Millisecond, corkSize: corkSize, corkMaxSize: corkMaxSize, @@ -81,7 +82,8 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr { } const ( - syncHostTimeTimes = 3 + syncHostTimeTimes = 3 + toolChainRetryTimes = 10 ) // Mgr describe the remote manager @@ -110,11 +112,12 @@ type Mgr struct { conf *config.ServerConfig - resourceCheckTick time.Duration - workerCheckTick time.Duration - lastUsed uint64 // only accurate to second now - lastApplied uint64 // only accurate to second now - remotejobs int64 // save job number which using remote worker + resourceCheckTick time.Duration + workerCheckTick time.Duration + toolChainRetryTick time.Duration + lastUsed uint64 // only accurate to second now + lastApplied uint64 // only accurate to second now + remotejobs int64 // save job number which using remote worker sendCorkTick time.Duration sendCorkChan chan bool @@ -131,7 +134,7 @@ type fileSendMap struct { cache map[string]*[]*types.FileInfo } -func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc) (*types.FileInfo, bool) { +func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, retry bool) (*types.FileInfo, bool) { fsm.Lock() defer fsm.Unlock() @@ -159,6 +162,11 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc) (*types.FileInfo, boo for _, ci := range *c { if ci.Match(desc) { + //if worker is retrying and not send succeed, try to set send status to sending + if retry && ci.SendStatus != types.FileSendSucceed { + ci.SendStatus = types.FileSending + return ci, false + } return ci, true } } @@ -167,7 +175,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc) (*types.FileInfo, boo return info, false } -func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { +func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc, retry bool) []matchResult { fsm.Lock() defer fsm.Unlock() @@ -202,9 +210,15 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { matched := false for _, ci := range *c { if ci.Match(*desc) { + fileMatch := true + //if worker is retrying and not send succeed, try to set send status to sending + if retry && ci.SendStatus != types.FileSendSucceed { + fileMatch = false + ci.SendStatus = types.FileSending + } result = append(result, matchResult{ info: ci, - match: true, + match: fileMatch, }) matched = true break @@ -509,6 +523,7 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas m.work.ID(), req.Pid, req.Server.Server, err, req.Server.Server) m.resource.DisableWorker(req.Server) + m.retrySendToolChain(handler, req) return nil, err } @@ -627,7 +642,7 @@ func (m *Mgr) ensureFilesWithPriority( blog.Infof("remote: try to ensure priority(%d) files(%d) for work(%s) from pid(%d) dir(%s) to server", i, len(*f), m.work.ID(), pid, sandbox.Dir) - r, err := m.ensureFiles(handler, pid, sandbox, *f) + r, err := m.ensureFiles(handler, pid, sandbox, *f, false) if err != nil { return nil, err } @@ -646,7 +661,8 @@ func (m *Mgr) ensureFiles( handler dcSDK.RemoteWorkerHandler, pid int, sandbox *dcSyscall.Sandbox, - fileDetails []*types.FilesDetails) ([]string, error) { + fileDetails []*types.FilesDetails, + retry bool) ([]string, error) { settings := m.work.Basic().Settings() blog.Infof("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server", @@ -727,15 +743,15 @@ func (m *Mgr) ensureFiles( } count++ if !m.conf.SendCork { - go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender) { + go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender, retry bool) { t := time.Now().Local() - err <- m.ensureSingleFile(handler, host, req, sandbox) + err <- m.ensureSingleFile(handler, host, req, sandbox, retry) d := time.Now().Local().Sub(t) if d > 200*time.Millisecond { blog.Debugf("remote: single file cost time for work(%s) from pid(%d) to server(%s): %s, %s", m.work.ID(), pid, host.Server, d.String(), req.Files[0].FilePath) } - }(wg, s, sender) + }(wg, s, sender, retry) } else { // for send cork cf := &corkFile{ @@ -804,7 +820,7 @@ func (m *Mgr) ensureFiles( for _, v := range *fs { descs = append(descs, v.file) } - results := m.checkOrLockCorkFiles(server, descs) + results := m.checkOrLockCorkFiles(server, descs, retry) blog.Debugf("remote: got %d results for %d cork files count:%d for work(%s) from pid(%d) to server", len(results), len(descs), count, m.work.ID(), pid) needSendCorkFiles := make([]*corkFile, 0, totalFileNum) @@ -826,9 +842,9 @@ func (m *Mgr) ensureFiles( // 启动协程跟踪未发送完成的文件 c := (*fs)[i] - go func(err chan<- error, c *corkFile, r matchResult) { - err <- m.ensureSingleCorkFile(c, r) - }(wg, c, v) + go func(err chan<- error, c *corkFile, r matchResult, retry bool) { + err <- m.ensureSingleCorkFile(c, r, retry) + }(wg, c, v, retry) } // TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache @@ -915,28 +931,29 @@ func (m *Mgr) ensureSingleFile( handler dcSDK.RemoteWorkerHandler, host *dcProtocol.Host, req *dcSDK.BKDistFileSender, - sandbox *dcSyscall.Sandbox) (err error) { + sandbox *dcSyscall.Sandbox, + retry bool) (err error) { if len(req.Files) == 0 { return fmt.Errorf("empty files") } req.Files = req.Files[:1] desc := req.Files[0] - blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s)", - desc.FilePath, m.work.ID(), host.Server) + blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s) with retry %t", + desc.FilePath, m.work.ID(), host.Server, retry) - status, ok := m.checkOrLockSendFile(host.Server, desc) + status, ok := m.checkOrLockSendFile(host.Server, desc, retry) // 已经有人发送了文件, 等待文件就绪 if ok { blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), "+ - "some one is sending this file", desc.FilePath, m.work.ID(), host.Server) + "some one is sending this file with retry %t", desc.FilePath, m.work.ID(), host.Server, retry) tick := time.NewTicker(m.checkSendFileTick) defer tick.Stop() for status == types.FileSending { select { case <-tick.C: - status, _ = m.checkOrLockSendFile(host.Server, desc) + status, _ = m.checkOrLockSendFile(host.Server, desc, retry) } } @@ -972,8 +989,8 @@ func (m *Mgr) ensureSingleFile( // } // } - blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file", - desc.FilePath, m.work.ID(), host.Server) + blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file with retry %t", + desc.FilePath, m.work.ID(), host.Server, retry) req.Messages = m.fileMessageBank.get(desc) // 同步发送文件 @@ -998,8 +1015,8 @@ func (m *Mgr) ensureSingleFile( } if err != nil { - blog.Errorf("remote: execute send file(%s) for work(%s) to server(%s) failed: %v", - desc.FilePath, m.work.ID(), host.Server, err) + blog.Errorf("remote: execute send file(%s) for work(%s) to server(%s) failed with retry %t: %v", + desc.FilePath, m.work.ID(), host.Server, retry, err) return err } @@ -1008,13 +1025,13 @@ func (m *Mgr) ensureSingleFile( desc.FilePath, m.work.ID(), host.Server, retCode) } - blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s)", - desc.FilePath, m.work.ID(), host.Server) + blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s) with retry %t", + desc.FilePath, m.work.ID(), host.Server, retry) return nil } // ensureSingleCorkFile 保证给到的第一个文件被正确分发到目标机器上, 若给到的文件多于一个, 多余的部分会被忽略 -func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { +func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult, retry bool) (err error) { status := r.info.SendStatus host := c.host desc := c.file @@ -1032,7 +1049,7 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { for status == types.FileSending { select { case <-tick.C: - status, _ = m.checkOrLockSendFile(host.Server, *desc) + status, _ = m.checkOrLockSendFile(host.Server, *desc, retry) } } @@ -1143,7 +1160,7 @@ func (m *Mgr) checkBatchCache( } // checkOrLockFile 检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc) (types.FileSendStatus, bool) { +func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, retry bool) (types.FileSendStatus, bool) { t1 := time.Now().Local() m.fileSendMutex.Lock() @@ -1165,7 +1182,7 @@ func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc) (types.Fil } m.fileSendMutex.Unlock() - info, match := target.matchOrInsert(desc) + info, match := target.matchOrInsert(desc, retry) return info.SendStatus, match } @@ -1175,7 +1192,7 @@ type matchResult struct { } // checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult { +func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc, retry bool) []matchResult { m.fileSendMutex.Lock() target, ok := m.fileSendMap[server] if !ok { @@ -1184,7 +1201,7 @@ func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []mat } m.fileSendMutex.Unlock() - return target.matchOrInserts(descs) + return target.matchOrInserts(descs, retry) } func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) { @@ -1204,7 +1221,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote // TODO : update all file path for p2p fileCollections := m.getToolChainFromExecuteRequest(req) if fileCollections != nil && len(fileCollections) > 0 { - err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections) + err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, false) if err != nil { blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+ "ensure tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) @@ -1222,7 +1239,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote if fileCollections != nil && len(fileCollections) > 0 { blog.Infof("remote: found tool chain changed, send toolchain to server[%s] again", req.Server.Server) - err = m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections) + err = m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, false) if err != nil { blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+ "ensure tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) @@ -1243,12 +1260,66 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote return nil } +func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, error) { + m.fileCollectionSendMutex.RLock() + defer m.fileCollectionSendMutex.RUnlock() + + target, ok := m.fileCollectionSendMap[server] + if !ok { + return nil, fmt.Errorf("remote: no found host(%s) in file send cache", server) + } + fcs := make([]*types.FileCollectionInfo, 0) + for _, re := range *target { + if re.SendStatus != types.FileSendSucceed { + fcs = append(fcs, re) + } + } + return fcs, nil +} +func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) { + fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server) + if err != nil { + blog.Errorf("remote: get failed file collection by host(%s) failed: %v", req.Server.Server, err) + return + } + if len(fileCollections) == 0 { + blog.Errorf("remote: retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) + return + } + + if m.resource.CanWorkerRetry(req.Server) { + go func(handler dcSDK.RemoteWorkerHandler, req types.RemoteTaskExecuteRequest) { + for i := 0; i < toolChainRetryTimes; i++ { + blog.Infof("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s)", + m.work.ID(), i+1, req.Pid, req.Server.Server) + if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, true); err != nil { + blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s), "+ + "send tool chain files failed: %v", m.work.ID(), i+1, req.Pid, req.Server.Server, err) + time.Sleep(m.toolChainRetryTick) + } else { + // enable worker + m.resource.EnableWorker(req.Server) + blog.Infof("remote: success to retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) + return + } + } + m.resource.SetWorkerStatus(req.Server, Init) + blog.Errorf("remote: already retry to send tool chain for work(%s) for %d times from pid(%d) to server(%s) failed, keep worker disable", m.work.ID(), toolChainRetryTimes, req.Pid, req.Server.Server) + }(handler, *req) + } else { + blog.Infof("remote: worker(%s) is alreay retry to send tool chain for work(%s) from pid(%d) to server(%s)", + req.Server.Server, m.work.ID(), req.Pid, req.Server.Server) + } + +} + func (m *Mgr) sendFileCollectionOnce( handler dcSDK.RemoteWorkerHandler, pid int, sandbox *dcSyscall.Sandbox, server *dcProtocol.Host, - filecollections []*types.FileCollectionInfo) error { + filecollections []*types.FileCollectionInfo, + retry bool) error { blog.Infof("remote: try to send %d file collection for work(%s) from pid(%d) dir(%s) to server", len(filecollections), m.work.ID(), pid, sandbox.Dir) @@ -1257,14 +1328,24 @@ func (m *Mgr) sendFileCollectionOnce( count := 0 for _, fc := range filecollections { count++ - go func(err chan<- error, host *dcProtocol.Host, filecollection *types.FileCollectionInfo) { - err <- m.ensureOneFileCollection(handler, pid, host, filecollection, sandbox) + go func(err chan<- error, host *dcProtocol.Host, filecollection *types.FileCollectionInfo, retry bool) { + err <- m.ensureOneFileCollection(handler, pid, host, filecollection, sandbox, retry) // err <- m.ensureOneFileCollectionByFiles(handler, pid, host, filecollection, sandbox) - }(wg, server, fc) + }(wg, server, fc, retry) } for i := 0; i < count; i++ { if err = <-wg; err != nil { + // 异常情况下启动一个协程将消息收完,避免发送协程阻塞 + i++ + if i < count { + go func(i, count int, c <-chan error) { + for ; i < count; i++ { + <-c + } + }(i, count, wg) + } + return err } } @@ -1280,11 +1361,12 @@ func (m *Mgr) ensureOneFileCollection( pid int, host *dcProtocol.Host, fc *types.FileCollectionInfo, - sandbox *dcSyscall.Sandbox) (err error) { + sandbox *dcSyscall.Sandbox, + retry bool) (err error) { blog.Infof("remote: try to ensure one file collection(%s) for work(%s) to server(%s)", fc.UniqID, m.work.ID(), host.Server) - status, ok := m.checkOrLockFileCollection(host.Server, fc) + status, ok := m.checkOrLockFileCollection(host.Server, fc, retry) // 已经有人发送了文件, 等待文件就绪 if ok { @@ -1328,8 +1410,8 @@ func (m *Mgr) ensureOneFileCollection( } blog.Infof("remote: try to ensure one file collection(%s) timestamp(%d) filenum(%d) cache-hit(%d) "+ - "for work(%s) to server(%s), going to send this collection", - fc.UniqID, fc.Timestamp, len(needSentFiles), hit, m.work.ID(), host.Server) + "for work(%s) to server(%s), going to send this collection with retry:%t", + fc.UniqID, fc.Timestamp, len(needSentFiles), hit, m.work.ID(), host.Server, retry) // !! 这个地方不需要了,需要注释掉,影响性能 // req := &dcSDK.BKDistFileSender{Files: needSentFiles} @@ -1369,7 +1451,7 @@ func (m *Mgr) ensureOneFileCollection( File: f, }) } - _, err = m.ensureFiles(handler, pid, sandbox, fileDetails) + _, err = m.ensureFiles(handler, pid, sandbox, fileDetails, retry) defer func() { status := types.FileSendSucceed if err != nil { @@ -1379,24 +1461,27 @@ func (m *Mgr) ensureOneFileCollection( }() if err != nil { - blog.Errorf("remote: execute send file collection(%s) for work(%s) to server(%s) failed: %v", - fc.UniqID, m.work.ID(), host.Server, err) + blog.Errorf("remote: execute send file collection(%s) for work(%s) to server(%s) failed with retry %t: %v ", + fc.UniqID, m.work.ID(), host.Server, retry, err) return err } blog.Debugf("remote: success to execute send file collection(%s) files(%+v) timestamp(%d) filenum(%d) "+ - "for work(%s) to server(%s)", fc.UniqID, fc.Files, fc.Timestamp, len(fc.Files), m.work.ID(), host.Server) + "for work(%s) to server(%s) with retry %t", fc.UniqID, fc.Files, fc.Timestamp, len(fc.Files), m.work.ID(), host.Server, retry) return nil } // checkOrLockFileCollection 检查目标file collection的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, // 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionInfo) (types.FileSendStatus, bool) { +func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionInfo, retry bool) (types.FileSendStatus, bool) { m.fileCollectionSendMutex.Lock() defer m.fileCollectionSendMutex.Unlock() target, ok := m.fileCollectionSendMap[server] if !ok { + if retry { + blog.Warnf("remote: file collection(%s) not found in cache with retry %t", fc.UniqID, retry) + } filecollections := make([]*types.FileCollectionInfo, 0, 10) m.fileCollectionSendMap[server] = &filecollections target = m.fileCollectionSendMap[server] @@ -1404,6 +1489,11 @@ func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionI for _, f := range *target { if f.UniqID == fc.UniqID { + // if retry, set status to sending if fc not send succeed + if retry && f.SendStatus != types.FileSendSucceed { + f.SendStatus = types.FileSending + return f.SendStatus, false + } return f.SendStatus, true } } @@ -1418,8 +1508,8 @@ func (m *Mgr) updateFileCollectionStatus(server string, fc *types.FileCollection m.fileCollectionSendMutex.Lock() defer m.fileCollectionSendMutex.Unlock() - blog.Infof("remote: ready add collection(%s) server(%s) timestamp(%d) status(%d) to cache", - fc.UniqID, server, fc.Timestamp, status) + blog.Infof("remote: ready add collection(%s) server(%s) timestamp(%d) status(%s) to cache", + fc.UniqID, server, fc.Timestamp, status.String()) target, ok := m.fileCollectionSendMap[server] if !ok { diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go index ba1171a8b..7bcbb79dd 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go @@ -257,6 +257,75 @@ func (wo *workerOffer) DisableWorker(host *dcProtocol.Host) { return } +func (wo *workerOffer) EnableWorker(host *dcProtocol.Host) { + if host == nil { + return + } + wo.workerLock.Lock() + defer wo.workerLock.Unlock() + + for _, w := range wo.worker { + if !host.Equal(w.host) { + continue + } + if !w.disabled { + blog.Infof("remote slot: host:%v enabled before,do nothing now", *host) + return // already enabled + } + + w.disabled = false + w.status = Init + wo.validWorkerNum += 1 + break + } + + blog.Infof("remote slot: total slot:%d after enable host:%v", wo.validWorkerNum, *host) +} + +func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool { + if host == nil { + return false + } + + wo.workerLock.Lock() + defer wo.workerLock.Unlock() + + for _, wk := range wo.worker { + if !wk.host.Equal(host) { + continue + } + + if wk.dead { + blog.Infof("remote slot: host:%v is already dead,do nothing now", host) + return false + } + + if wk.status == Retrying { + blog.Infof("remote slot: host:%v is retrying,do nothing now", host) + return true + } + blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying) + wk.status = Retrying + return false + } + + return false +} + +func (wo *workerOffer) SetWorkerStatus(host *dcProtocol.Host, s Status) { + wo.workerLock.Lock() + defer wo.workerLock.Unlock() + + for _, w := range wo.worker { + if !host.Equal(w.host) { + continue + } + + w.status = s + break + } +} + func (wo *workerOffer) WorkerDead(w *worker) { if w == nil || w.host == nil { return diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go index 392ee887e..5b48381e2 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go @@ -27,6 +27,9 @@ type RemoteSlotMgr interface { GetDeadWorkers() []*worker RecoverDeadWorker(w *worker) DisableWorker(host *dcProtocol.Host) + EnableWorker(host *dcProtocol.Host) + CanWorkerRetry(host *dcProtocol.Host) bool // check if worker can retry, if can set worker status to retrying + SetWorkerStatus(host *dcProtocol.Host, status Status) Lock(usage dcSDK.JobUsage, f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host Unlock(usage dcSDK.JobUsage, host *dcProtocol.Host) TotalSlots() int @@ -267,6 +270,80 @@ func (wr *resource) DisableWorker(host *dcProtocol.Host) { return } +func (wr *resource) EnableWorker(host *dcProtocol.Host) { + if host == nil { + return + } + wr.workerLock.Lock() + defer wr.workerLock.Unlock() + + for _, w := range wr.worker { + if !host.Equal(w.host) { + continue + } + if !w.disabled { + blog.Infof("remote slot: host:%v enabled before, do nothing now", *host) + return // already enabled + } + + w.disabled = false + wr.totalSlots += w.totalSlots + w.status = RetrySucceed + break + } + + for _, v := range wr.usageMap { + v.limit = wr.totalSlots + blog.Infof("remote slot: usage map:%v after enable host:%v", *v, *host) + } + blog.Infof("remote slot: total slot:%d after enable host:%v", wr.totalSlots, *host) +} + +func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool { + if host == nil { + return false + } + + wr.workerLock.Lock() + defer wr.workerLock.Unlock() + for _, wk := range wr.worker { + if !wk.host.Equal(host) { + continue + } + + if wk.dead { + blog.Infof("remote slot: host:%v is already dead, do nothing now", host) + return false + } + if !wk.disabled { + return false + } + if wk.status == Retrying { + blog.Infof("remote slot: host:%v is retrying, do nothing now", host) + return false + } + blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying) + wk.status = Retrying + return true + } + + return false +} + +func (wr *resource) SetWorkerStatus(host *dcProtocol.Host, s Status) { + wr.workerLock.Lock() + defer wr.workerLock.Unlock() + + for _, w := range wr.worker { + if !w.host.Equal(host) { + continue + } + + w.status = s + break + } +} + func (wr *resource) WorkerDead(w *worker) { if w == nil || w.host == nil { return diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go index 901bd369b..3b36dd494 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go @@ -28,6 +28,8 @@ const ( DetectFailed Refused InService + Retrying + RetrySucceed Unknown = 99 ) @@ -39,6 +41,8 @@ var ( DetectFailed: "detectfailed", Refused: "refused", InService: "inservice", + Retrying: "retrying", + RetrySucceed: "retrysucceed", Unknown: "unknown", } ) From 1c4e1a6bc240e1238e591ffd8a95ca5e4b46d40b Mon Sep 17 00:00:00 2001 From: yanafu Date: Mon, 2 Sep 2024 11:18:08 +0800 Subject: [PATCH 2/8] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0toolchain?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=A4=B1=E8=B4=A5=E5=90=8E=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20#289?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../booster/bk_dist/controller/pkg/manager/remote/mgr.go | 2 +- .../booster/bk_dist/controller/pkg/manager/remote/worker.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index cb7afc41c..949d980a9 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -1303,7 +1303,7 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R return } } - m.resource.SetWorkerStatus(req.Server, Init) + m.resource.SetWorkerStatus(req.Server, RetryFailed) blog.Errorf("remote: already retry to send tool chain for work(%s) for %d times from pid(%d) to server(%s) failed, keep worker disable", m.work.ID(), toolChainRetryTimes, req.Pid, req.Server.Server) }(handler, *req) } else { diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go index 3b36dd494..496b612cb 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go @@ -30,6 +30,7 @@ const ( InService Retrying RetrySucceed + RetryFailed Unknown = 99 ) @@ -43,6 +44,7 @@ var ( InService: "inservice", Retrying: "retrying", RetrySucceed: "retrysucceed", + RetryFailed: "retryfailed", Unknown: "unknown", } ) From f67bc4145565ef9b6123fa6d8449fd27b99e13f3 Mon Sep 17 00:00:00 2001 From: yanafu Date: Mon, 2 Sep 2024 20:02:15 +0800 Subject: [PATCH 3/8] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0toolchain?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=A4=B1=E8=B4=A5=E5=90=8E=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20#289?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 949d980a9..8e955b5f2 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -20,7 +20,6 @@ import ( "sync/atomic" "time" - "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol" dcProtocol "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol" dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk" dcSyscall "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/syscall" @@ -163,7 +162,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, retry bool) (*types.F for _, ci := range *c { if ci.Match(desc) { //if worker is retrying and not send succeed, try to set send status to sending - if retry && ci.SendStatus != types.FileSendSucceed { + if retry && ci.SendStatus == types.FileSendFailed { ci.SendStatus = types.FileSending return ci, false } @@ -211,8 +210,8 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc, retry bool) []ma for _, ci := range *c { if ci.Match(*desc) { fileMatch := true - //if worker is retrying and not send succeed, try to set send status to sending - if retry && ci.SendStatus != types.FileSendSucceed { + //if worker is retrying and send failed, try to set send status to sending + if retry && ci.SendStatus == types.FileSendFailed { fileMatch = false ci.SendStatus = types.FileSending } @@ -842,9 +841,9 @@ func (m *Mgr) ensureFiles( // 启动协程跟踪未发送完成的文件 c := (*fs)[i] - go func(err chan<- error, c *corkFile, r matchResult, retry bool) { - err <- m.ensureSingleCorkFile(c, r, retry) - }(wg, c, v, retry) + go func(err chan<- error, c *corkFile, r matchResult) { + err <- m.ensureSingleCorkFile(c, r) + }(wg, c, v) } // TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache @@ -953,7 +952,8 @@ func (m *Mgr) ensureSingleFile( for status == types.FileSending { select { case <-tick.C: - status, _ = m.checkOrLockSendFile(host.Server, desc, retry) + // 不是发送文件的goroutine,不能修改状态 + status, _ = m.checkOrLockSendFile(host.Server, desc, false) } } @@ -1031,7 +1031,7 @@ func (m *Mgr) ensureSingleFile( } // ensureSingleCorkFile 保证给到的第一个文件被正确分发到目标机器上, 若给到的文件多于一个, 多余的部分会被忽略 -func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult, retry bool) (err error) { +func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { status := r.info.SendStatus host := c.host desc := c.file @@ -1049,7 +1049,8 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult, retry bool) (err for status == types.FileSending { select { case <-tick.C: - status, _ = m.checkOrLockSendFile(host.Server, *desc, retry) + // 不是发送文件的goroutine,不能修改状态 + status, _ = m.checkOrLockSendFile(host.Server, *desc, false) } } @@ -1277,19 +1278,20 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect return fcs, nil } func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) { - fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server) - if err != nil { - blog.Errorf("remote: get failed file collection by host(%s) failed: %v", req.Server.Server, err) - return - } - if len(fileCollections) == 0 { - blog.Errorf("remote: retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) - return - } - if m.resource.CanWorkerRetry(req.Server) { go func(handler dcSDK.RemoteWorkerHandler, req types.RemoteTaskExecuteRequest) { for i := 0; i < toolChainRetryTimes; i++ { + fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server) + if err != nil || len(fileCollections) == 0 { + if err != nil { + blog.Errorf("remote: get failed file collection by host(%s) failed: %v", req.Server.Server, err) + } else { + blog.Errorf("remote: retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) + } + time.Sleep(m.toolChainRetryTick) + continue + } + blog.Infof("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s)", m.work.ID(), i+1, req.Pid, req.Server.Server) if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, true); err != nil { @@ -1489,8 +1491,8 @@ func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionI for _, f := range *target { if f.UniqID == fc.UniqID { - // if retry, set status to sending if fc not send succeed - if retry && f.SendStatus != types.FileSendSucceed { + // if retry, set status to sending if fc send failed + if retry && f.SendStatus == types.FileSendFailed { f.SendStatus = types.FileSending return f.SendStatus, false } @@ -1848,7 +1850,7 @@ func (m *Mgr) updateToolChainPath(req *types.RemoteTaskExecuteRequest) error { c.ExeToolChainKey, remotepath, req.Req.Commands[i].Inputfiles) req.Req.Commands[i].Inputfiles = append(req.Req.Commands[i].Inputfiles, dcSDK.FileDesc{ FilePath: c.ExeName, - Compresstype: protocol.CompressLZ4, + Compresstype: dcProtocol.CompressLZ4, FileSize: -1, Lastmodifytime: 0, Md5: "", From 70877a454820b282b6eb18661b08818341743b87 Mon Sep 17 00:00:00 2001 From: yanafu Date: Thu, 5 Sep 2024 14:53:40 +0800 Subject: [PATCH 4/8] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0toolchain?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=A4=B1=E8=B4=A5=E5=90=8E=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20#289?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 25 ++++++++++++------- .../bk_dist/controller/pkg/types/manager.go | 12 +++++---- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 8e955b5f2..3bdc6adf5 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -161,9 +161,9 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, retry bool) (*types.F for _, ci := range *c { if ci.Match(desc) { - //if worker is retrying and not send succeed, try to set send status to sending + //if worker is retrying and send failed before, try to set send status to retrying if retry && ci.SendStatus == types.FileSendFailed { - ci.SendStatus = types.FileSending + ci.SendStatus = types.FileSendRetrying return ci, false } return ci, true @@ -209,15 +209,15 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc, retry bool) []ma matched := false for _, ci := range *c { if ci.Match(*desc) { - fileMatch := true - //if worker is retrying and send failed, try to set send status to sending + fileMatched := true + //if worker is retrying and send failed before, try to set send status to retrying if retry && ci.SendStatus == types.FileSendFailed { - fileMatch = false - ci.SendStatus = types.FileSending + fileMatched = false + ci.SendStatus = types.FileSendRetrying } result = append(result, matchResult{ info: ci, - match: fileMatch, + match: fileMatched, }) matched = true break @@ -952,7 +952,7 @@ func (m *Mgr) ensureSingleFile( for status == types.FileSending { select { case <-tick.C: - // 不是发送文件的goroutine,不能修改状态 + // 不是发送文件的goroutine,不需要发送failed文件 status, _ = m.checkOrLockSendFile(host.Server, desc, false) } } @@ -966,6 +966,10 @@ func (m *Mgr) ensureSingleFile( blog.Debugf("remote: success to ensure single file(%s) for work(%s) to server(%s)", desc.FilePath, m.work.ID(), host.Server) return nil + case types.FileSendRetrying: + blog.Infof("remote: single file(%s) for work(%s) to server(%s) is retrying now", + desc.FilePath, m.work.ID(), host.Server) + return nil default: return fmt.Errorf("unknown file send status: %s", status.String()) } @@ -1063,6 +1067,9 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { blog.Debugf("remote: end ensure single cork file(%s) for work(%s) to server(%s) succeed", desc.FilePath, m.work.ID(), host.Server) return nil + case types.FileSendRetrying: + blog.Infof("remote: single cork file(%s) for work(%s) to server(%s) is retrying now", desc.FilePath, m.work.ID(), host.Server) + return nil default: blog.Errorf("remote: end ensure single cork file(%s) for work(%s) to server(%s), "+ " with unknown status", desc.FilePath, m.work.ID(), host.Server) @@ -1271,7 +1278,7 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect } fcs := make([]*types.FileCollectionInfo, 0) for _, re := range *target { - if re.SendStatus != types.FileSendSucceed { + if re.SendStatus == types.FileSendFailed { fcs = append(fcs, re) } } diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go index 7cda92868..d7b9c2edf 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go @@ -236,16 +236,18 @@ const ( FileSending FileSendSucceed FileSendFailed + FileSendRetrying FileSendUnknown = 99 ) var ( fileStatusMap = map[FileSendStatus]string{ - FileSendInit: "sendinit", - FileSending: "sending", - FileSendSucceed: "sendsucceed", - FileSendFailed: "sendfailed", - FileSendUnknown: "unknown", + FileSendInit: "sendinit", + FileSending: "sending", + FileSendSucceed: "sendsucceed", + FileSendFailed: "sendfailed", + FileSendRetrying: "sendretrying", + FileSendUnknown: "unknown", } ) From 678ffb845c8818528cf6db0aa9965f0d6465797e Mon Sep 17 00:00:00 2001 From: yanafu Date: Thu, 5 Sep 2024 17:06:56 +0800 Subject: [PATCH 5/8] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0toolchain?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=A4=B1=E8=B4=A5=E5=90=8E=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20#289?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bk_dist/controller/pkg/manager/remote/mgr.go | 13 +++++++++---- .../booster/bk_dist/controller/pkg/types/error.go | 1 + .../booster/bk_dist/controller/pkg/types/manager.go | 4 ++++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 3bdc6adf5..178ebea0b 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -967,9 +967,9 @@ func (m *Mgr) ensureSingleFile( desc.FilePath, m.work.ID(), host.Server) return nil case types.FileSendRetrying: - blog.Infof("remote: single file(%s) for work(%s) to server(%s) is retrying now", + blog.Warnf("remote: single file(%s) for work(%s) to server(%s) is retrying now", desc.FilePath, m.work.ID(), host.Server) - return nil + return types.ErrSendFileRetrying default: return fmt.Errorf("unknown file send status: %s", status.String()) } @@ -1068,8 +1068,8 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { desc.FilePath, m.work.ID(), host.Server) return nil case types.FileSendRetrying: - blog.Infof("remote: single cork file(%s) for work(%s) to server(%s) is retrying now", desc.FilePath, m.work.ID(), host.Server) - return nil + blog.Warnf("remote: single cork file(%s) for work(%s) to server(%s) is retrying now", desc.FilePath, m.work.ID(), host.Server) + return types.ErrSendFileRetrying default: blog.Errorf("remote: end ensure single cork file(%s) for work(%s) to server(%s), "+ " with unknown status", desc.FilePath, m.work.ID(), host.Server) @@ -1278,12 +1278,17 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect } fcs := make([]*types.FileCollectionInfo, 0) for _, re := range *target { + //如果有fc未到终结状态,则直接返回 + if !re.SendStatus.IsTerminated() { + return nil, fmt.Errorf("remote: found file collection(%s) in file send cache, but not finished", re.UniqID) + } if re.SendStatus == types.FileSendFailed { fcs = append(fcs, re) } } return fcs, nil } + func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) { if m.resource.CanWorkerRetry(req.Server) { go func(handler dcSDK.RemoteWorkerHandler, req types.RemoteTaskExecuteRequest) { diff --git a/src/backend/booster/bk_dist/controller/pkg/types/error.go b/src/backend/booster/bk_dist/controller/pkg/types/error.go index 735e08b64..f0ea3f405 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/error.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/error.go @@ -32,6 +32,7 @@ var ( ErrFileNotFound = fmt.Errorf("not found file info") ErrWorkCannotBeUpdatedHeartbeat = fmt.Errorf("work can not be updated heartbeat") ErrSendFileFailed = fmt.Errorf("send file failed") + ErrSendFileRetrying = fmt.Errorf("send file retrying") ErrTaskCannotBeReleased = fmt.Errorf("task can not be released") ErrTaskAlreadyReleased = fmt.Errorf("task already released") ErrSlotsLockFailed = fmt.Errorf("slots lock failed`") diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go index d7b9c2edf..a9ab6acd8 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go @@ -260,6 +260,10 @@ func (f FileSendStatus) String() string { return "unknown" } +func (f FileSendStatus) IsTerminated() bool { + return f == FileSendSucceed || f == FileSendFailed +} + // FileCollectionInfo save file collection send status type FileCollectionInfo struct { UniqID string `json:"uniq_id"` From db2b70161b7f17da3f1e0564f84f914257338726 Mon Sep 17 00:00:00 2001 From: yanafu Date: Thu, 5 Sep 2024 19:37:31 +0800 Subject: [PATCH 6/8] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0toolchain?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=A4=B1=E8=B4=A5=E5=90=8E=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20#289?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 178ebea0b..72e7af5e8 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -1268,32 +1268,40 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote return nil } -func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, error) { +func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, bool, error) { m.fileCollectionSendMutex.RLock() defer m.fileCollectionSendMutex.RUnlock() target, ok := m.fileCollectionSendMap[server] if !ok { - return nil, fmt.Errorf("remote: no found host(%s) in file send cache", server) + return nil, true, fmt.Errorf("remote: no found host(%s) in file send cache", server) } fcs := make([]*types.FileCollectionInfo, 0) for _, re := range *target { //如果有fc未到终结状态,则直接返回 if !re.SendStatus.IsTerminated() { - return nil, fmt.Errorf("remote: found file collection(%s) in file send cache, but not finished", re.UniqID) + blog.Infof("remote: found file collection(%s) in file send cache, but not finished, status:%s", re.UniqID, re.SendStatus) + return nil, false, nil } if re.SendStatus == types.FileSendFailed { fcs = append(fcs, re) } } - return fcs, nil + return fcs, true, nil } func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) { if m.resource.CanWorkerRetry(req.Server) { go func(handler dcSDK.RemoteWorkerHandler, req types.RemoteTaskExecuteRequest) { - for i := 0; i < toolChainRetryTimes; i++ { - fileCollections, err := m.getFailedFileCollectionByHost(req.Server.Server) + for i := 0; i < toolChainRetryTimes; { + fileCollections, isNotTeminated, err := m.getFailedFileCollectionByHost(req.Server.Server) + if !isNotTeminated { + time.Sleep(m.toolChainRetryTick) + continue + } + i++ + blog.Infof("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s)", + m.work.ID(), i, req.Pid, req.Server.Server) if err != nil || len(fileCollections) == 0 { if err != nil { blog.Errorf("remote: get failed file collection by host(%s) failed: %v", req.Server.Server, err) @@ -1304,11 +1312,9 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R continue } - blog.Infof("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s)", - m.work.ID(), i+1, req.Pid, req.Server.Server) if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, true); err != nil { blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s), "+ - "send tool chain files failed: %v", m.work.ID(), i+1, req.Pid, req.Server.Server, err) + "send tool chain files failed: %v", m.work.ID(), i, req.Pid, req.Server.Server, err) time.Sleep(m.toolChainRetryTick) } else { // enable worker @@ -1316,6 +1322,7 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R blog.Infof("remote: success to retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) return } + } m.resource.SetWorkerStatus(req.Server, RetryFailed) blog.Errorf("remote: already retry to send tool chain for work(%s) for %d times from pid(%d) to server(%s) failed, keep worker disable", m.work.ID(), toolChainRetryTimes, req.Pid, req.Server.Server) From f1e89a5b12d28638ef82352da72aae6448eb3e5c Mon Sep 17 00:00:00 2001 From: yanafu Date: Fri, 6 Sep 2024 11:25:38 +0800 Subject: [PATCH 7/8] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0toolchain?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=A4=B1=E8=B4=A5=E5=90=8E=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20#289?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../booster/bk_dist/controller/pkg/manager/remote/mgr.go | 3 ++- src/backend/booster/bk_dist/controller/pkg/types/manager.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 72e7af5e8..f1fa0577f 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -1268,6 +1268,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote return nil } +// getFailedFileCollectionByHost 返回文件集合,如果文件集没有全部完成,返回false,否则返回true func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, bool, error) { m.fileCollectionSendMutex.RLock() defer m.fileCollectionSendMutex.RUnlock() @@ -1279,7 +1280,7 @@ func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollect fcs := make([]*types.FileCollectionInfo, 0) for _, re := range *target { //如果有fc未到终结状态,则直接返回 - if !re.SendStatus.IsTerminated() { + if !re.SendStatus.IsFinished() { blog.Infof("remote: found file collection(%s) in file send cache, but not finished, status:%s", re.UniqID, re.SendStatus) return nil, false, nil } diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go index a9ab6acd8..5c7cfad1d 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go @@ -260,7 +260,7 @@ func (f FileSendStatus) String() string { return "unknown" } -func (f FileSendStatus) IsTerminated() bool { +func (f FileSendStatus) IsFinished() bool { return f == FileSendSucceed || f == FileSendFailed } From 2992b14c330a816d41ad540a6ab008814c0b08f5 Mon Sep 17 00:00:00 2001 From: yanafu Date: Mon, 9 Sep 2024 17:25:09 +0800 Subject: [PATCH 8/8] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0toolchain?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=A4=B1=E8=B4=A5=E5=90=8E=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20#289?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bk_dist/controller/pkg/manager/remote/mgr.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index f1fa0577f..b9045c050 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -841,9 +841,9 @@ func (m *Mgr) ensureFiles( // 启动协程跟踪未发送完成的文件 c := (*fs)[i] - go func(err chan<- error, c *corkFile, r matchResult) { - err <- m.ensureSingleCorkFile(c, r) - }(wg, c, v) + go func(err chan<- error, c *corkFile, r matchResult, retry bool) { + err <- m.ensureSingleCorkFile(c, r, retry) + }(wg, c, v, retry) } // TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache @@ -949,7 +949,7 @@ func (m *Mgr) ensureSingleFile( tick := time.NewTicker(m.checkSendFileTick) defer tick.Stop() - for status == types.FileSending { + for status == types.FileSending || (retry && status == types.FileSendRetrying) { select { case <-tick.C: // 不是发送文件的goroutine,不需要发送failed文件 @@ -1035,7 +1035,7 @@ func (m *Mgr) ensureSingleFile( } // ensureSingleCorkFile 保证给到的第一个文件被正确分发到目标机器上, 若给到的文件多于一个, 多余的部分会被忽略 -func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { +func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult, retry bool) (err error) { status := r.info.SendStatus host := c.host desc := c.file @@ -1050,7 +1050,7 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { tick := time.NewTicker(m.checkSendFileTick) defer tick.Stop() - for status == types.FileSending { + for status == types.FileSending || (retry && status == types.FileSendRetrying) { select { case <-tick.C: // 不是发送文件的goroutine,不能修改状态