From 9784de19260c1a62df5053676ca2034d701acad1 Mon Sep 17 00:00:00 2001 From: yanafu Date: Thu, 26 Sep 2024 15:38:27 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=8F=91=E9=80=81=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E5=90=8E=E9=87=8D=E8=AF=95=20#298?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 007a1387..de8046c8 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -1242,7 +1242,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote // TODO : update all file path for p2p fileCollections := m.getToolChainFromExecuteRequest(req) if fileCollections != nil && len(fileCollections) > 0 { - err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, false) + err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections) if err != nil { blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+ "ensure tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) @@ -1260,7 +1260,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote if fileCollections != nil && len(fileCollections) > 0 { blog.Infof("remote: found tool chain changed, send toolchain to server[%s] again", req.Server.Server) - err = m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, false) + err = m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections) if err != nil { blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+ "ensure tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) @@ -1326,7 +1326,7 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R continue } - if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, true); err != nil { + if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil { blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s), "+ "send tool chain files failed: %v", m.work.ID(), i, req.Pid, req.Server.Server, err) time.Sleep(m.toolChainRetryTick) @@ -1353,8 +1353,7 @@ func (m *Mgr) sendFileCollectionOnce( pid int, sandbox *dcSyscall.Sandbox, server *dcProtocol.Host, - filecollections []*types.FileCollectionInfo, - retry bool) error { + filecollections []*types.FileCollectionInfo) error { blog.Infof("remote: try to send %d file collection for work(%s) from pid(%d) dir(%s) to server", len(filecollections), m.work.ID(), pid, sandbox.Dir) @@ -1363,10 +1362,10 @@ func (m *Mgr) sendFileCollectionOnce( count := 0 for _, fc := range filecollections { count++ - go func(err chan<- error, host *dcProtocol.Host, filecollection *types.FileCollectionInfo, retry bool) { - err <- m.ensureOneFileCollection(handler, pid, host, filecollection, sandbox, retry) + go func(err chan<- error, host *dcProtocol.Host, filecollection *types.FileCollectionInfo) { + err <- m.ensureOneFileCollection(handler, pid, host, filecollection, sandbox) // err <- m.ensureOneFileCollectionByFiles(handler, pid, host, filecollection, sandbox) - }(wg, server, fc, retry) + }(wg, server, fc) } for i := 0; i < count; i++ { @@ -1396,12 +1395,11 @@ func (m *Mgr) ensureOneFileCollection( pid int, host *dcProtocol.Host, fc *types.FileCollectionInfo, - sandbox *dcSyscall.Sandbox, - retry bool) (err error) { + sandbox *dcSyscall.Sandbox) (err error) { blog.Infof("remote: try to ensure one file collection(%s) for work(%s) to server(%s)", fc.UniqID, m.work.ID(), host.Server) - status, ok := m.checkOrLockFileCollection(host.Server, fc, retry) + status, ok := m.checkOrLockFileCollection(host.Server, fc) // 已经有人发送了文件, 等待文件就绪 if ok { @@ -1508,7 +1506,7 @@ func (m *Mgr) ensureOneFileCollection( // checkOrLockFileCollection 检查目标file collection的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, // 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionInfo, retry bool) (types.FileSendStatus, bool) { +func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionInfo) (types.FileSendStatus, bool) { m.fileCollectionSendMutex.Lock() defer m.fileCollectionSendMutex.Unlock() @@ -1521,8 +1519,8 @@ func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionI for _, f := range *target { if f.UniqID == fc.UniqID { - // if retry, set status to sending if fc send failed - if retry && f.SendStatus == types.FileSendFailed { + // set status to sending if fc send failed + if f.SendStatus == types.FileSendFailed { f.SendStatus = types.FileSending return f.SendStatus, false }