Skip to content

Commit

Permalink
feat: worker选取及恢复机制优化 #311
Browse files – browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Nov 4, 2024
1 parent 8bd2aef commit 94b4c23
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ const (
corkMaxSize = 1024 * 1024 * 10
// corkMaxSize = 1024 * 1024 * 1024
largeFileSize = 1024 * 1024 * 100 // 100MB

//fileMaxFailCount = 5
)

// NewMgr get a new Remote Mgr
Expand Down Expand Up @@ -1070,8 +1068,7 @@ func (m *Mgr) ensureFiles(
if v.info.SendStatus == types.FileSendSucceed {
wg <- nil
continue
} else if v.info.SendStatus == types.FileSendFailed ||
v.info.SendStatus == types.FileSendUnknown {
} else if v.info.SendStatus == types.FileSendFailed {
wg <- types.ErrSendFileFailed
continue
}
Expand Down Expand Up @@ -1436,7 +1433,7 @@ type matchResult struct {

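// checkOrLockCorkFiles checks the sendStatus of the target files in batch: if a file has already been sent, it returns the current status and true; if it has not been sent yet, it marks the file as sending and returns false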
func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult {
if len(descs) == 0 || !descs[0].Retry { // 普通的文件
if len(descs) == 0 || !descs[0].Retry { // 第一次发送的文件
m.fileSendMutex.Lock()
target, ok := m.fileSendMap[server]
if !ok {
Expand All @@ -1446,7 +1443,7 @@ func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []mat
m.fileSendMutex.Unlock()

return target.matchOrInserts(descs)
} else { // 失败的文件
} else { // 失败重试的文件
m.fileSendMutex.Lock()
target, ok := m.failFileSendMap[server]
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,36 +282,6 @@ func (wo *workerOffer) EnableWorker(host *dcProtocol.Host) {
blog.Infof("remote slot: total slot:%d after enable host:%v", wo.validWorkerNum, *host)
}

/*func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool {
if host == nil {
return false
}
wo.workerLock.Lock()
defer wo.workerLock.Unlock()
for _, wk := range wo.worker {
if !wk.host.Equal(host) {
continue
}
if wk.dead {
blog.Infof("remote slot: host:%v is already dead,do nothing now", host)
return false
}
if wk.status == Retrying {
blog.Infof("remote slot: host:%v is retrying,do nothing now", host)
return true
}
blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying)
wk.status = Retrying
return false
}
return false
}*/

func (wo *workerOffer) SetWorkerStatus(host *dcProtocol.Host, s Status) {
wo.workerLock.Lock()
defer wo.workerLock.Unlock()
Expand Down
32 changes: 0 additions & 32 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type RemoteSlotMgr interface {
RecoverDeadWorker(w *worker)
DisableWorker(host *dcProtocol.Host)
EnableWorker(host *dcProtocol.Host)
//CanWorkerRetry(host *dcProtocol.Host) bool // check if worker can retry, if can set worker status to retrying
SetWorkerStatus(host *dcProtocol.Host, status Status)
Lock(usage dcSDK.JobUsage, f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host
Unlock(usage dcSDK.JobUsage, host *dcProtocol.Host)
Expand Down Expand Up @@ -299,37 +298,6 @@ func (wr *resource) EnableWorker(host *dcProtocol.Host) {
blog.Infof("remote slot: total slot:%d after enable host:%v", wr.totalSlots, *host)
}

/*func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool {
if host == nil {
return false
}
wr.workerLock.Lock()
defer wr.workerLock.Unlock()
for _, wk := range wr.worker {
if !wk.host.Equal(host) {
continue
}
if wk.dead {
blog.Infof("remote slot: host:%v is already dead, do nothing now", host)
return false
}
if !wk.disabled {
return false
}
if wk.status == Retrying {
blog.Infof("remote slot: host:%v is retrying, do nothing now", host)
return false
}
blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying)
wk.status = Retrying
return true
}
return false
}*/

func (wr *resource) SetWorkerStatus(host *dcProtocol.Host, s Status) {
wr.workerLock.Lock()
defer wr.workerLock.Unlock()
Expand Down

0 comments on commit 94b4c23

Please sign in to comment.