From 3e00c6d614cbde8b455cd4ed9328ec610dd91e51 Mon Sep 17 00:00:00 2001 From: yanafu Date: Tue, 26 Nov 2024 15:09:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 59 ++++++++++--------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 615cbe5f..fe6b8943 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -200,7 +200,7 @@ func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileI } // 仅匹配失败文件,不执行插入 -/*func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult { +func (fsm *fileSendMap) matchFails(descs []*dcSDK.FileDesc) []matchResult { fsm.Lock() defer fsm.Unlock() @@ -237,9 +237,9 @@ func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileI } } return result -}*/ +} -/*func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { +func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { fsm.Lock() defer fsm.Unlock() @@ -294,7 +294,7 @@ func (fsm *fileSendMap) matchFail(desc dcSDK.FileDesc, query bool) (*types.FileI } return result -}*/ +} func (fsm *fileSendMap) updateFailStatus(desc dcSDK.FileDesc, status types.FileSendStatus) { if status == types.FileSendSucceed && !desc.Retry { @@ -1528,34 +1528,37 @@ type matchResult struct { // checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult { var result []matchResult + var retryDescs, newDescs []*dcSDK.FileDesc for _, desc := range descs { - if !desc.Retry { - m.fileSendMutex.Lock() - target, ok := m.fileSendMap[server] - if !ok { - target = &fileSendMap{} - m.fileSendMap[server] = target - } - m.fileSendMutex.Unlock() - - info, match := target.matchOrInsert(*desc) - result = append(result, matchResult{info: info, match: match}) + if desc.Retry { + retryDescs = append(retryDescs, desc) } else { - m.failFileSendMutex.Lock() - target, ok := m.failFileSendMap[server] - if !ok { - target = &fileSendMap{} - m.failFileSendMap[server] = target - } - m.failFileSendMutex.Unlock() + newDescs = append(newDescs, desc) + } + } + //批量检查重试文件 + if len(retryDescs) > 0 { + m.fileSendMutex.Lock() + target, ok := m.failFileSendMap[server] + if !ok { + target = &fileSendMap{} + m.failFileSendMap[server] = target + } + m.fileSendMutex.Unlock() - info, match, err := target.matchFail(*desc, false) - if err != nil { - blog.Errorf("remote: checkOrLockCorkFiles for work(%s) to server(%s) match file %s failed: %v", m.work.ID(), server, desc.FilePath, err) - } else { - result = append(result, matchResult{info: info, match: match}) - } + result = append(result, target.matchFails(descs)...) + } + //批量检查首次发送文件 + if len(newDescs) > 0 { + m.fileSendMutex.Lock() + target, ok := m.fileSendMap[server] + if !ok { + target = &fileSendMap{} + m.fileSendMap[server] = target } + m.fileSendMutex.Unlock() + + result = append(result, target.matchOrInserts(descs)...) } return result }