Skip to content

Commit

Permalink
feat: 发送文件失败后重试 #298
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Sep 26, 2024
1 parent d32af23 commit 9784de1
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote
// TODO : update all file path for p2p
fileCollections := m.getToolChainFromExecuteRequest(req)
if fileCollections != nil && len(fileCollections) > 0 {
err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, false)
err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections)
if err != nil {
blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+
"ensure tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
Expand All @@ -1260,7 +1260,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote
if fileCollections != nil && len(fileCollections) > 0 {
blog.Infof("remote: found tool chain changed, send toolchain to server[%s] again",
req.Server.Server)
err = m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, false)
err = m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections)
if err != nil {
blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+
"ensure tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
Expand Down Expand Up @@ -1326,7 +1326,7 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R
continue
}

if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, true); err != nil {
if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil {
blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s), "+
"send tool chain files failed: %v", m.work.ID(), i, req.Pid, req.Server.Server, err)
time.Sleep(m.toolChainRetryTick)
Expand All @@ -1353,8 +1353,7 @@ func (m *Mgr) sendFileCollectionOnce(
pid int,
sandbox *dcSyscall.Sandbox,
server *dcProtocol.Host,
filecollections []*types.FileCollectionInfo,
retry bool) error {
filecollections []*types.FileCollectionInfo) error {
blog.Infof("remote: try to send %d file collection for work(%s) from pid(%d) dir(%s) to server",
len(filecollections), m.work.ID(), pid, sandbox.Dir)

Expand All @@ -1363,10 +1362,10 @@ func (m *Mgr) sendFileCollectionOnce(
count := 0
for _, fc := range filecollections {
count++
go func(err chan<- error, host *dcProtocol.Host, filecollection *types.FileCollectionInfo, retry bool) {
err <- m.ensureOneFileCollection(handler, pid, host, filecollection, sandbox, retry)
go func(err chan<- error, host *dcProtocol.Host, filecollection *types.FileCollectionInfo) {
err <- m.ensureOneFileCollection(handler, pid, host, filecollection, sandbox)
// err <- m.ensureOneFileCollectionByFiles(handler, pid, host, filecollection, sandbox)
}(wg, server, fc, retry)
}(wg, server, fc)
}

for i := 0; i < count; i++ {
Expand Down Expand Up @@ -1396,12 +1395,11 @@ func (m *Mgr) ensureOneFileCollection(
pid int,
host *dcProtocol.Host,
fc *types.FileCollectionInfo,
sandbox *dcSyscall.Sandbox,
retry bool) (err error) {
sandbox *dcSyscall.Sandbox) (err error) {
blog.Infof("remote: try to ensure one file collection(%s) for work(%s) to server(%s)",
fc.UniqID, m.work.ID(), host.Server)

status, ok := m.checkOrLockFileCollection(host.Server, fc, retry)
status, ok := m.checkOrLockFileCollection(host.Server, fc)

// 已经有人发送了文件, 等待文件就绪
if ok {
Expand Down Expand Up @@ -1508,7 +1506,7 @@ func (m *Mgr) ensureOneFileCollection(

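// checkOrLockFileCollection checks the sendStatus of the target file collection. If the collection has already been
// sent (or is being sent), it returns the current status and true. If it has not been sent yet, or if its previous
// send failed, it marks the collection as sending and returns false so that the caller performs the send.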
func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionInfo, retry bool) (types.FileSendStatus, bool) {
func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionInfo) (types.FileSendStatus, bool) {
m.fileCollectionSendMutex.Lock()
defer m.fileCollectionSendMutex.Unlock()

Expand All @@ -1521,8 +1519,8 @@ func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionI

for _, f := range *target {
if f.UniqID == fc.UniqID {
// if retry, set status to sending if fc send failed
if retry && f.SendStatus == types.FileSendFailed {
// set status to sending if fc send failed
if f.SendStatus == types.FileSendFailed {
f.SendStatus = types.FileSending
return f.SendStatus, false
}
Expand Down

0 comments on commit 9784de1

Please sign in to comment.