diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 2d895733..b9045c05 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -20,7 +20,6 @@ import ( "sync/atomic" "time" - "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol" dcProtocol "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol" dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk" dcSyscall "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/syscall" @@ -71,6 +70,7 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr { conf: work.Config(), resourceCheckTick: 5 * time.Second, workerCheckTick: 5 * time.Second, + toolChainRetryTick: 10 * time.Second, sendCorkTick: 10 * time.Millisecond, corkSize: corkSize, corkMaxSize: corkMaxSize, @@ -81,7 +81,8 @@ func NewMgr(pCtx context.Context, work *types.Work) types.RemoteMgr { } const ( - syncHostTimeTimes = 3 + syncHostTimeTimes = 3 + toolChainRetryTimes = 10 ) // Mgr describe the remote manager @@ -110,11 +111,12 @@ type Mgr struct { conf *config.ServerConfig - resourceCheckTick time.Duration - workerCheckTick time.Duration - lastUsed uint64 // only accurate to second now - lastApplied uint64 // only accurate to second now - remotejobs int64 // save job number which using remote worker + resourceCheckTick time.Duration + workerCheckTick time.Duration + toolChainRetryTick time.Duration + lastUsed uint64 // only accurate to second now + lastApplied uint64 // only accurate to second now + remotejobs int64 // save job number which using remote worker sendCorkTick time.Duration sendCorkChan chan bool @@ -131,7 +133,7 @@ type fileSendMap struct { cache map[string]*[]*types.FileInfo } -func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc) (*types.FileInfo, bool) { +func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, retry bool) (*types.FileInfo, bool) { fsm.Lock() defer fsm.Unlock() @@ -159,6 +161,11 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc) (*types.FileInfo, boo for _, ci := range *c { if ci.Match(desc) { + //if worker is retrying and send failed before, try to set send status to retrying + if retry && ci.SendStatus == types.FileSendFailed { + ci.SendStatus = types.FileSendRetrying + return ci, false + } return ci, true } } @@ -167,7 +174,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc) (*types.FileInfo, boo return info, false } -func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { +func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc, retry bool) []matchResult { fsm.Lock() defer fsm.Unlock() @@ -202,9 +209,15 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { matched := false for _, ci := range *c { if ci.Match(*desc) { + fileMatched := true + //if worker is retrying and send failed before, try to set send status to retrying + if retry && ci.SendStatus == types.FileSendFailed { + fileMatched = false + ci.SendStatus = types.FileSendRetrying + } result = append(result, matchResult{ info: ci, - match: true, + match: fileMatched, }) matched = true break @@ -509,6 +522,7 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas m.work.ID(), req.Pid, req.Server.Server, err, req.Server.Server) m.resource.DisableWorker(req.Server) + 
m.retrySendToolChain(handler, req) return nil, err } @@ -627,7 +641,7 @@ func (m *Mgr) ensureFilesWithPriority( blog.Infof("remote: try to ensure priority(%d) files(%d) for work(%s) from pid(%d) dir(%s) to server", i, len(*f), m.work.ID(), pid, sandbox.Dir) - r, err := m.ensureFiles(handler, pid, sandbox, *f) + r, err := m.ensureFiles(handler, pid, sandbox, *f, false) if err != nil { return nil, err } @@ -646,7 +660,8 @@ func (m *Mgr) ensureFiles( handler dcSDK.RemoteWorkerHandler, pid int, sandbox *dcSyscall.Sandbox, - fileDetails []*types.FilesDetails) ([]string, error) { + fileDetails []*types.FilesDetails, + retry bool) ([]string, error) { settings := m.work.Basic().Settings() blog.Infof("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server", @@ -727,15 +742,15 @@ func (m *Mgr) ensureFiles( } count++ if !m.conf.SendCork { - go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender) { + go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender, retry bool) { t := time.Now().Local() - err <- m.ensureSingleFile(handler, host, req, sandbox) + err <- m.ensureSingleFile(handler, host, req, sandbox, retry) d := time.Now().Local().Sub(t) if d > 200*time.Millisecond { blog.Debugf("remote: single file cost time for work(%s) from pid(%d) to server(%s): %s, %s", m.work.ID(), pid, host.Server, d.String(), req.Files[0].FilePath) } - }(wg, s, sender) + }(wg, s, sender, retry) } else { // for send cork cf := &corkFile{ @@ -804,7 +819,7 @@ func (m *Mgr) ensureFiles( for _, v := range *fs { descs = append(descs, v.file) } - results := m.checkOrLockCorkFiles(server, descs) + results := m.checkOrLockCorkFiles(server, descs, retry) blog.Debugf("remote: got %d results for %d cork files count:%d for work(%s) from pid(%d) to server", len(results), len(descs), count, m.work.ID(), pid) needSendCorkFiles := make([]*corkFile, 0, totalFileNum) @@ -826,9 +841,9 @@ func (m *Mgr) ensureFiles( // 启动协程跟踪未发送完成的文件 c := (*fs)[i] - go func(err chan<- error, c *corkFile, r matchResult) { - err <- m.ensureSingleCorkFile(c, r) - }(wg, c, v) + go func(err chan<- error, c *corkFile, r matchResult, retry bool) { + err <- m.ensureSingleCorkFile(c, r, retry) + }(wg, c, v, retry) } // TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache @@ -915,28 +930,30 @@ func (m *Mgr) ensureSingleFile( handler dcSDK.RemoteWorkerHandler, host *dcProtocol.Host, req *dcSDK.BKDistFileSender, - sandbox *dcSyscall.Sandbox) (err error) { + sandbox *dcSyscall.Sandbox, + retry bool) (err error) { if len(req.Files) == 0 { return fmt.Errorf("empty files") } req.Files = req.Files[:1] desc := req.Files[0] - blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s)", - desc.FilePath, m.work.ID(), host.Server) + blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s) with retry %t", + desc.FilePath, m.work.ID(), host.Server, retry) - status, ok := m.checkOrLockSendFile(host.Server, desc) + status, ok := m.checkOrLockSendFile(host.Server, desc, retry) // 已经有人发送了文件, 等待文件就绪 if ok { blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), "+ - "some one is sending this file", desc.FilePath, m.work.ID(), host.Server) + "some one is sending this file with retry %t", desc.FilePath, m.work.ID(), host.Server, retry) tick := time.NewTicker(m.checkSendFileTick) defer tick.Stop() - for status == types.FileSending { + for status == types.FileSending || (retry && status == types.FileSendRetrying) { select { case <-tick.C: - status, _ = 
m.checkOrLockSendFile(host.Server, desc) + // 不是发送文件的goroutine,不需要发送failed文件 + status, _ = m.checkOrLockSendFile(host.Server, desc, false) } } @@ -949,6 +966,10 @@ func (m *Mgr) ensureSingleFile( blog.Debugf("remote: success to ensure single file(%s) for work(%s) to server(%s)", desc.FilePath, m.work.ID(), host.Server) return nil + case types.FileSendRetrying: + blog.Warnf("remote: single file(%s) for work(%s) to server(%s) is retrying now", + desc.FilePath, m.work.ID(), host.Server) + return types.ErrSendFileRetrying default: return fmt.Errorf("unknown file send status: %s", status.String()) } @@ -972,8 +993,8 @@ func (m *Mgr) ensureSingleFile( // } // } - blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file", - desc.FilePath, m.work.ID(), host.Server) + blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file with retry %t", + desc.FilePath, m.work.ID(), host.Server, retry) req.Messages = m.fileMessageBank.get(desc) // 同步发送文件 @@ -998,8 +1019,8 @@ func (m *Mgr) ensureSingleFile( } if err != nil { - blog.Errorf("remote: execute send file(%s) for work(%s) to server(%s) failed: %v", - desc.FilePath, m.work.ID(), host.Server, err) + blog.Errorf("remote: execute send file(%s) for work(%s) to server(%s) failed with retry %t: %v", + desc.FilePath, m.work.ID(), host.Server, retry, err) return err } @@ -1008,13 +1029,13 @@ func (m *Mgr) ensureSingleFile( desc.FilePath, m.work.ID(), host.Server, retCode) } - blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s)", - desc.FilePath, m.work.ID(), host.Server) + blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s) with retry %t", + desc.FilePath, m.work.ID(), host.Server, retry) return nil } // ensureSingleCorkFile 保证给到的第一个文件被正确分发到目标机器上, 若给到的文件多于一个, 多余的部分会被忽略 -func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { +func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult, retry bool) (err error) { status := r.info.SendStatus host := c.host desc := c.file @@ -1029,10 +1050,11 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { tick := time.NewTicker(m.checkSendFileTick) defer tick.Stop() - for status == types.FileSending { + for status == types.FileSending || (retry && status == types.FileSendRetrying) { select { case <-tick.C: - status, _ = m.checkOrLockSendFile(host.Server, *desc) + // 不是发送文件的goroutine,不能修改状态 + status, _ = m.checkOrLockSendFile(host.Server, *desc, false) } } @@ -1045,6 +1067,9 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { blog.Debugf("remote: end ensure single cork file(%s) for work(%s) to server(%s) succeed", desc.FilePath, m.work.ID(), host.Server) return nil + case types.FileSendRetrying: + blog.Warnf("remote: single cork file(%s) for work(%s) to server(%s) is retrying now", desc.FilePath, m.work.ID(), host.Server) + return types.ErrSendFileRetrying default: blog.Errorf("remote: end ensure single cork file(%s) for work(%s) to server(%s), "+ " with unknown status", desc.FilePath, m.work.ID(), host.Server) @@ -1143,7 +1168,7 @@ func (m *Mgr) checkBatchCache( } // checkOrLockFile 检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc) (types.FileSendStatus, bool) { +func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, retry bool) (types.FileSendStatus, bool) { t1 := time.Now().Local() 
m.fileSendMutex.Lock() @@ -1165,7 +1190,7 @@ func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc) (types.Fil } m.fileSendMutex.Unlock() - info, match := target.matchOrInsert(desc) + info, match := target.matchOrInsert(desc, retry) return info.SendStatus, match } @@ -1175,7 +1200,7 @@ type matchResult struct { } // checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult { +func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc, retry bool) []matchResult { m.fileSendMutex.Lock() target, ok := m.fileSendMap[server] if !ok { @@ -1184,7 +1209,7 @@ func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []mat } m.fileSendMutex.Unlock() - return target.matchOrInserts(descs) + return target.matchOrInserts(descs, retry) } func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) { @@ -1204,7 +1229,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote // TODO : update all file path for p2p fileCollections := m.getToolChainFromExecuteRequest(req) if fileCollections != nil && len(fileCollections) > 0 { - err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections) + err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, false) if err != nil { blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+ "ensure tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) @@ -1222,7 +1247,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote if fileCollections != nil && len(fileCollections) > 0 { blog.Infof("remote: found tool chain changed, send toolchain to server[%s] again", req.Server.Server) - err = m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections) + err = m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, false) if err != nil { blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+ "ensure tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) @@ -1243,12 +1268,80 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote return nil } +// getFailedFileCollectionByHost 返回发送失败的文件集合; 如果仍有文件集未到终结状态, 则第二个返回值为false, 否则为true +func (m *Mgr) getFailedFileCollectionByHost(server string) ([]*types.FileCollectionInfo, bool, error) { + m.fileCollectionSendMutex.RLock() + defer m.fileCollectionSendMutex.RUnlock() + + target, ok := m.fileCollectionSendMap[server] + if !ok { + return nil, true, fmt.Errorf("remote: host(%s) not found in file send cache", server) + } + fcs := make([]*types.FileCollectionInfo, 0) + for _, re := range *target { + // 如果有fc未到终结状态,则直接返回 + if !re.SendStatus.IsFinished() { + blog.Infof("remote: found file collection(%s) in file send cache, but not finished, status:%s", re.UniqID, re.SendStatus) + return nil, false, nil + } + if re.SendStatus == types.FileSendFailed { + fcs = append(fcs, re) + } + } + return fcs, true, nil +} + +func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.RemoteTaskExecuteRequest) { + if m.resource.CanWorkerRetry(req.Server) { + go func(handler dcSDK.RemoteWorkerHandler, req types.RemoteTaskExecuteRequest) { + for i := 0; i < toolChainRetryTimes; { + fileCollections, isNotTerminated, err :=
m.getFailedFileCollectionByHost(req.Server.Server) + if !isNotTerminated { + time.Sleep(m.toolChainRetryTick) + continue + } + i++ + blog.Infof("remote: retry to send tool chain for work(%s) for the %dth time from pid(%d) to server(%s)", + m.work.ID(), i, req.Pid, req.Server.Server) + if err != nil || len(fileCollections) == 0 { + if err != nil { + blog.Errorf("remote: get failed file collection by host(%s) failed: %v", req.Server.Server, err) + } else { + blog.Errorf("remote: retry to send tool chain for work(%s) from pid(%d) to server(%s) failed: no failed file collection found", m.work.ID(), req.Pid, req.Server.Server) + } + time.Sleep(m.toolChainRetryTick) + continue + } + + if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, true); err != nil { + blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth time from pid(%d) to server(%s), "+ + "send tool chain files failed: %v", m.work.ID(), i, req.Pid, req.Server.Server, err) + time.Sleep(m.toolChainRetryTick) + } else { + // enable worker + m.resource.EnableWorker(req.Server) + blog.Infof("remote: success to retry to send tool chain for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) + return + } + + } + m.resource.SetWorkerStatus(req.Server, RetryFailed) + blog.Errorf("remote: already retried to send tool chain for work(%s) for %d times from pid(%d) to server(%s) and failed, keep worker disabled", m.work.ID(), toolChainRetryTimes, req.Pid, req.Server.Server) + }(handler, *req) + } else { + blog.Infof("remote: worker(%s) is already retrying to send tool chain for work(%s) from pid(%d) to server(%s)", + req.Server.Server, m.work.ID(), req.Pid, req.Server.Server) + } + +} + func (m *Mgr) sendFileCollectionOnce( handler dcSDK.RemoteWorkerHandler, pid int, sandbox *dcSyscall.Sandbox, server *dcProtocol.Host, - filecollections []*types.FileCollectionInfo) error { + filecollections []*types.FileCollectionInfo, + retry bool) error { blog.Infof("remote: try to send %d file collection for work(%s) from pid(%d) dir(%s) to server", len(filecollections), m.work.ID(), pid, sandbox.Dir) @@ -1257,14 +1350,24 @@ func (m *Mgr) sendFileCollectionOnce( count := 0 for _, fc := range filecollections { count++ - go func(err chan<- error, host *dcProtocol.Host, filecollection *types.FileCollectionInfo) { - err <- m.ensureOneFileCollection(handler, pid, host, filecollection, sandbox) + go func(err chan<- error, host *dcProtocol.Host, filecollection *types.FileCollectionInfo, retry bool) { + err <- m.ensureOneFileCollection(handler, pid, host, filecollection, sandbox, retry) // err <- m.ensureOneFileCollectionByFiles(handler, pid, host, filecollection, sandbox) - }(wg, server, fc) + }(wg, server, fc, retry) } for i := 0; i < count; i++ { if err = <-wg; err != nil { + // 异常情况下启动一个协程将消息收完,避免发送协程阻塞 + i++ + if i < count { + go func(i, count int, c <-chan error) { + for ; i < count; i++ { + <-c + } + }(i, count, wg) + } + return err } } @@ -1280,11 +1383,12 @@ func (m *Mgr) ensureOneFileCollection( pid int, host *dcProtocol.Host, fc *types.FileCollectionInfo, - sandbox *dcSyscall.Sandbox) (err error) { + sandbox *dcSyscall.Sandbox, + retry bool) (err error) { blog.Infof("remote: try to ensure one file collection(%s) for work(%s) to server(%s)", fc.UniqID, m.work.ID(), host.Server) - status, ok := m.checkOrLockFileCollection(host.Server, fc) + status, ok := m.checkOrLockFileCollection(host.Server, fc, retry) // 已经有人发送了文件, 等待文件就绪 if ok { @@ -1328,8 +1432,8 @@ func (m *Mgr) ensureOneFileCollection( } blog.Infof("remote: try to
ensure one file collection(%s) timestamp(%d) filenum(%d) cache-hit(%d) "+ - "for work(%s) to server(%s), going to send this collection", - fc.UniqID, fc.Timestamp, len(needSentFiles), hit, m.work.ID(), host.Server) + "for work(%s) to server(%s), going to send this collection with retry:%t", + fc.UniqID, fc.Timestamp, len(needSentFiles), hit, m.work.ID(), host.Server, retry) // !! 这个地方不需要了,需要注释掉,影响性能 // req := &dcSDK.BKDistFileSender{Files: needSentFiles} @@ -1369,7 +1473,7 @@ func (m *Mgr) ensureOneFileCollection( File: f, }) } - _, err = m.ensureFiles(handler, pid, sandbox, fileDetails) + _, err = m.ensureFiles(handler, pid, sandbox, fileDetails, retry) defer func() { status := types.FileSendSucceed if err != nil { @@ -1379,24 +1483,27 @@ func (m *Mgr) ensureOneFileCollection( }() if err != nil { - blog.Errorf("remote: execute send file collection(%s) for work(%s) to server(%s) failed: %v", - fc.UniqID, m.work.ID(), host.Server, err) + blog.Errorf("remote: execute send file collection(%s) for work(%s) to server(%s) failed with retry %t: %v ", + fc.UniqID, m.work.ID(), host.Server, retry, err) return err } blog.Debugf("remote: success to execute send file collection(%s) files(%+v) timestamp(%d) filenum(%d) "+ - "for work(%s) to server(%s)", fc.UniqID, fc.Files, fc.Timestamp, len(fc.Files), m.work.ID(), host.Server) + "for work(%s) to server(%s) with retry %t", fc.UniqID, fc.Files, fc.Timestamp, len(fc.Files), m.work.ID(), host.Server, retry) return nil } // checkOrLockFileCollection 检查目标file collection的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, // 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionInfo) (types.FileSendStatus, bool) { +func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionInfo, retry bool) (types.FileSendStatus, bool) { m.fileCollectionSendMutex.Lock() defer m.fileCollectionSendMutex.Unlock() target, ok := m.fileCollectionSendMap[server] if !ok { + if retry { + blog.Warnf("remote: file collection(%s) not found in cache with retry %t", fc.UniqID, retry) + } filecollections := make([]*types.FileCollectionInfo, 0, 10) m.fileCollectionSendMap[server] = &filecollections target = m.fileCollectionSendMap[server] @@ -1404,6 +1511,11 @@ func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionI for _, f := range *target { if f.UniqID == fc.UniqID { + // if retry, set status to sending if fc send failed + if retry && f.SendStatus == types.FileSendFailed { + f.SendStatus = types.FileSending + return f.SendStatus, false + } return f.SendStatus, true } } @@ -1418,8 +1530,8 @@ func (m *Mgr) updateFileCollectionStatus(server string, fc *types.FileCollection m.fileCollectionSendMutex.Lock() defer m.fileCollectionSendMutex.Unlock() - blog.Infof("remote: ready add collection(%s) server(%s) timestamp(%d) status(%d) to cache", - fc.UniqID, server, fc.Timestamp, status) + blog.Infof("remote: ready add collection(%s) server(%s) timestamp(%d) status(%s) to cache", + fc.UniqID, server, fc.Timestamp, status.String()) target, ok := m.fileCollectionSendMap[server] if !ok { @@ -1758,7 +1870,7 @@ func (m *Mgr) updateToolChainPath(req *types.RemoteTaskExecuteRequest) error { c.ExeToolChainKey, remotepath, req.Req.Commands[i].Inputfiles) req.Req.Commands[i].Inputfiles = append(req.Req.Commands[i].Inputfiles, dcSDK.FileDesc{ FilePath: c.ExeName, - Compresstype: protocol.CompressLZ4, + Compresstype: dcProtocol.CompressLZ4, FileSize: -1, Lastmodifytime: 0, Md5: "", diff --git 
a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go index ba1171a8..7bcbb79d 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go @@ -257,6 +257,75 @@ func (wo *workerOffer) DisableWorker(host *dcProtocol.Host) { return } +func (wo *workerOffer) EnableWorker(host *dcProtocol.Host) { + if host == nil { + return + } + wo.workerLock.Lock() + defer wo.workerLock.Unlock() + + for _, w := range wo.worker { + if !host.Equal(w.host) { + continue + } + if !w.disabled { + blog.Infof("remote slot: host:%v enabled before, do nothing now", *host) + return // already enabled + } + + w.disabled = false + w.status = Init + wo.validWorkerNum += 1 + break + } + + blog.Infof("remote slot: total slot:%d after enable host:%v", wo.validWorkerNum, *host) +} + +func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool { + if host == nil { + return false + } + + wo.workerLock.Lock() + defer wo.workerLock.Unlock() + + for _, wk := range wo.worker { + if !wk.host.Equal(host) { + continue + } + + if wk.dead { + blog.Infof("remote slot: host:%v is already dead, do nothing now", host) + return false + } + + if wk.status == Retrying { + blog.Infof("remote slot: host:%v is retrying, do nothing now", host) + return false + } + blog.Infof("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying) + wk.status = Retrying + return true + } + + return false +} + +func (wo *workerOffer) SetWorkerStatus(host *dcProtocol.Host, s Status) { + wo.workerLock.Lock() + defer wo.workerLock.Unlock() + + for _, w := range wo.worker { + if !host.Equal(w.host) { + continue + } + + w.status = s + break + } +} + func (wo *workerOffer) WorkerDead(w *worker) { if w == nil || w.host == nil { return diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go index 392ee887..5b48381e 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go @@ -27,6 +27,9 @@ type RemoteSlotMgr interface { GetDeadWorkers() []*worker RecoverDeadWorker(w *worker) DisableWorker(host *dcProtocol.Host) + EnableWorker(host *dcProtocol.Host) + CanWorkerRetry(host *dcProtocol.Host) bool // check whether the worker can retry; if so, set its status to Retrying + SetWorkerStatus(host *dcProtocol.Host, status Status) Lock(usage dcSDK.JobUsage, f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host Unlock(usage dcSDK.JobUsage, host *dcProtocol.Host) TotalSlots() int @@ -267,6 +270,80 @@ func (wr *resource) DisableWorker(host *dcProtocol.Host) { return } +func (wr *resource) EnableWorker(host *dcProtocol.Host) { + if host == nil { + return + } + wr.workerLock.Lock() + defer wr.workerLock.Unlock() + + for _, w := range wr.worker { + if !host.Equal(w.host) { + continue + } + if !w.disabled { + blog.Infof("remote slot: host:%v enabled before, do nothing now", *host) + return // already enabled + } + + w.disabled = false + wr.totalSlots += w.totalSlots + w.status = RetrySucceed + break + } + + for _, v := range wr.usageMap { + v.limit = wr.totalSlots + blog.Infof("remote slot: usage map:%v after enable host:%v", *v, *host) + } + blog.Infof("remote slot: total slot:%d after enable host:%v", wr.totalSlots, *host) +} + +func (wr *resource)
CanWorkerRetry(host *dcProtocol.Host) bool { + if host == nil { + return false + } + + wr.workerLock.Lock() + defer wr.workerLock.Unlock() + for _, wk := range wr.worker { + if !wk.host.Equal(host) { + continue + } + + if wk.dead { + blog.Infof("remote slot: host:%v is already dead, do nothing now", host) + return false + } + if !wk.disabled { + return false + } + if wk.status == Retrying { + blog.Infof("remote slot: host:%v is retrying, do nothing now", host) + return false + } + blog.Infof("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying) + wk.status = Retrying + return true + } + + return false +} + +func (wr *resource) SetWorkerStatus(host *dcProtocol.Host, s Status) { + wr.workerLock.Lock() + defer wr.workerLock.Unlock() + + for _, w := range wr.worker { + if !w.host.Equal(host) { + continue + } + + w.status = s + break + } +} + func (wr *resource) WorkerDead(w *worker) { if w == nil || w.host == nil { return diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go index 901bd369..496b612c 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/worker.go @@ -28,6 +28,9 @@ const ( DetectFailed Refused InService + Retrying + RetrySucceed + RetryFailed Unknown = 99 ) @@ -39,6 +42,9 @@ var ( DetectFailed: "detectfailed", Refused: "refused", InService: "inservice", + Retrying: "retrying", + RetrySucceed: "retrysucceed", + RetryFailed: "retryfailed", Unknown: "unknown", } ) diff --git a/src/backend/booster/bk_dist/controller/pkg/types/error.go b/src/backend/booster/bk_dist/controller/pkg/types/error.go index 735e08b6..f0ea3f40 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/error.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/error.go @@ -32,6 +32,7 @@ var ( ErrFileNotFound = fmt.Errorf("not found file info") ErrWorkCannotBeUpdatedHeartbeat = fmt.Errorf("work can not be updated heartbeat") ErrSendFileFailed = fmt.Errorf("send file failed") + ErrSendFileRetrying = fmt.Errorf("send file retrying") ErrTaskCannotBeReleased = fmt.Errorf("task can not be released") ErrTaskAlreadyReleased = fmt.Errorf("task already released") ErrSlotsLockFailed = fmt.Errorf("slots lock failed`") diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go index 7cda9286..5c7cfad1 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go @@ -236,16 +236,18 @@ const ( FileSending FileSendSucceed FileSendFailed + FileSendRetrying FileSendUnknown = 99 ) var ( fileStatusMap = map[FileSendStatus]string{ - FileSendInit: "sendinit", - FileSending: "sending", - FileSendSucceed: "sendsucceed", - FileSendFailed: "sendfailed", - FileSendUnknown: "unknown", + FileSendInit: "sendinit", + FileSending: "sending", + FileSendSucceed: "sendsucceed", + FileSendFailed: "sendfailed", + FileSendRetrying: "sendretrying", + FileSendUnknown: "unknown", } ) @@ -258,6 +260,10 @@ func (f FileSendStatus) String() string { return "unknown" } +func (f FileSendStatus) IsFinished() bool { + return f == FileSendSucceed || f == FileSendFailed +} + // FileCollectionInfo save file collection send status type FileCollectionInfo struct { UniqID string `json:"uniq_id"`
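Reviewer note (illustrative, not part of the patch): the retry flow added above uses CanWorkerRetry as a one-shot gate — only the caller that flips a disabled worker into Retrying starts the background retry loop, the loop re-sends only the failed file collections every toolChainRetryTick, and it ends by either re-enabling the worker (EnableWorker) or marking it RetryFailed after toolChainRetryTimes attempts. The sketch below is a minimal, self-contained approximation of that control flow under assumed names; fakeWorker, retryToolChain, and the send callback are hypothetical stand-ins, not the controller's actual API.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Status mirrors the new worker states added in worker.go (Retrying,
// RetrySucceed, RetryFailed); the values here are local stand-ins.
type Status int

const (
	Init Status = iota
	Retrying
	RetrySucceed
	RetryFailed
)

// fakeWorker is a hypothetical stand-in for a disabled remote worker.
type fakeWorker struct {
	mu       sync.Mutex
	disabled bool
	status   Status
}

// canRetry acts like resource.CanWorkerRetry: only the first caller that
// sees a disabled, non-retrying worker wins the right to start the retry.
func (w *fakeWorker) canRetry() bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.disabled || w.status == Retrying {
		return false
	}
	w.status = Retrying
	return true
}

// retryToolChain loops like retrySendToolChain: retry up to `times`,
// re-enable the worker on success, mark it RetryFailed otherwise.
func (w *fakeWorker) retryToolChain(send func() error, times int, tick time.Duration) {
	for i := 0; i < times; i++ {
		if send() == nil {
			w.mu.Lock()
			w.disabled, w.status = false, RetrySucceed // EnableWorker equivalent
			w.mu.Unlock()
			return
		}
		time.Sleep(tick)
	}
	w.mu.Lock()
	w.status = RetryFailed // SetWorkerStatus(host, RetryFailed) equivalent
	w.mu.Unlock()
}

func main() {
	w := &fakeWorker{disabled: true}
	attempts := 0
	send := func() error { // fails twice, then succeeds
		attempts++
		if attempts < 3 {
			return errors.New("send toolchain failed")
		}
		return nil
	}
	if w.canRetry() {
		w.retryToolChain(send, 10, 10*time.Millisecond)
	}
	fmt.Printf("attempts=%d status=%v disabled=%v\n", attempts, w.status, w.disabled)
}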