Skip to content

Commit

Permalink
feat: worker选取及恢复机制优化 #311
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Nov 26, 2024
1 parent 79728b9 commit 3e00c6d
Showing 1 changed file with 31 additions and 28 deletions.
59 changes: 31 additions & 28 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileI
}

// 仅匹配失败文件,不执行插入
/*func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult {
func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult {
fsm.Lock()
defer fsm.Unlock()

Expand Down Expand Up @@ -237,9 +237,9 @@ func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileI
}
}
return result
}*/
}

/*func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult {
fsm.Lock()
defer fsm.Unlock()

Expand Down Expand Up @@ -294,7 +294,7 @@ func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileI
}

return result
}*/
}

func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus) {
if status == types.FileSendSucceed && !desc.Retry {
Expand Down Expand Up @@ -1528,34 +1528,37 @@ type matchResult struct {
// checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false
func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult {
var result []matchResult
var retryDescs, newDescs []*dcSDK.FileDesc
for _, desc := range descs {
if !desc.Retry {
m.fileSendMutex.Lock()
target, ok := m.fileSendMap[server]
if !ok {
target = &fileSendMap{}
m.fileSendMap[server] = target
}
m.fileSendMutex.Unlock()

info, match := target.matchOrInsert(*desc)
result = append(result, matchResult{info: info, match: match})
if desc.Retry {
retryDescs = append(retryDescs, desc)
} else {
m.failFileSendMutex.Lock()
target, ok := m.failFileSendMap[server]
if !ok {
target = &fileSendMap{}
m.failFileSendMap[server] = target
}
m.failFileSendMutex.Unlock()
newDescs = append(newDescs, desc)
}
}
//批量检查重试文件
if len(retryDescs) > 0 {
m.fileSendMutex.Lock()
target, ok := m.failFileSendMap[server]
if !ok {
target = &fileSendMap{}
m.failFileSendMap[server] = target
}
m.fileSendMutex.Unlock()

info, match, err := target.matchFail(*desc, false)
if err != nil {
blog.Errorf("remote: checkOrLockCorkFiles for work(%s) to server(%s) match file %s failed: %v", m.work.ID(), server, desc.FilePath, err)
} else {
result = append(result, matchResult{info: info, match: match})
}
result = append(result, target.matchFails(descs)...)
}
//批量检查首次发送文件
if len(newDescs) > 0 {
m.fileSendMutex.Lock()
target, ok := m.fileSendMap[server]
if !ok {
target = &fileSendMap{}
m.fileSendMap[server] = target
}
m.fileSendMutex.Unlock()

result = append(result, target.matchOrInserts(descs)...)
}
return result
}
Expand Down

0 comments on commit 3e00c6d

Please sign in to comment.