Skip to content

Commit

Permalink
Merge pull request #263 from tbs60/dev_tming
Browse files Browse the repository at this point in the history
fix: aoivd dead lock when send files to worker,issue: #262
  • Loading branch information
tming authored Jul 9, 2024
2 parents 5fd1c38 + 016f5d5 commit dd5716a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (r *CommonRemoteHandler) ExecuteSendFile(
defer func() {
r.updateJobStatsFunc()
}()
blog.Debugf("start send files to server %s", server)
blog.Infof("start send files to server %s", server)
r.recordStats.RemoteWorker = server.Server

if len(req.Files) == 0 {
Expand Down Expand Up @@ -499,6 +499,12 @@ func (r *CommonRemoteHandler) ExecuteSendFile(
client := NewTCPClient(r.ioTimeout)
if err := client.Connect(getRealServer(server.Server)); err != nil {
blog.Warnf("error: %v", err)

if memorylocked {
r.slot.Unlock(locksize)
blog.Debugf("remotehandle: succeed to release one memory lock")
}

return nil, err
}
d := time.Now().Sub(t)
Expand All @@ -516,14 +522,13 @@ func (r *CommonRemoteHandler) ExecuteSendFile(
blog.Debugf("success connect to server %s", server)

err = SendMessages(client, messages)
if memorylocked {
r.slot.Unlock(locksize)
blog.Debugf("remotehandle: succeed to release one memory lock")
}

if err != nil {
blog.Warnf("error: %v", err)

if memorylocked {
r.slot.Unlock(locksize)
blog.Debugf("remotehandle: succeed to release one memory lock")
}

return nil, err
}

Expand All @@ -536,11 +541,6 @@ func (r *CommonRemoteHandler) ExecuteSendFile(

debug.FreeOSMemory() // free memory anyway

if memorylocked {
r.slot.Unlock(locksize)
blog.Debugf("remotehandle: succeed to release one memory lock")
}

blog.Debugf("success sent to server %s", server)
// receive result
data, err := receiveSendFileRsp(client)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (r *CommonRemoteHandler) ExecuteSendFileLongTCP(
defer func() {
r.updateJobStatsFunc()
}()
blog.Debugf("start send files to server %s", server)
blog.Infof("start send files to server %s", server)
r.recordStats.RemoteWorker = server.Server

if len(req.Files) == 0 {
Expand Down
31 changes: 16 additions & 15 deletions src/backend/booster/bk_dist/worker/pkg/client/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,22 +163,23 @@ func (lr *slot) handleLock(ctx context.Context) {
}

func (lr *slot) hasEnoughtSlots(pairChan chanPair) bool {
// 这儿的内存判断,可能导致一直拿不到锁,先屏蔽掉
// 如果已经锁定的内存超过了largeLocked,再次申请内存时,需要关注当前系统内存情况
if lr.occupiedSlots > largeLocked {
v, err := mem.VirtualMemory()
if err == nil {
if v.Available < leastFree ||
v.Available < uint64(pairChan.weight) ||
v.UsedPercent > maxMemPercent {
blog.Infof("send slot: request size:%d,locked size:%d,Available:%d,UsedPercent:%f",
pairChan.weight,
lr.occupiedSlots,
v.Available,
v.UsedPercent)
return false
}
}
}
// if lr.occupiedSlots > largeLocked {
// v, err := mem.VirtualMemory()
// if err == nil {
// if v.Available < leastFree ||
// v.Available < uint64(pairChan.weight) ||
// v.UsedPercent > maxMemPercent {
// blog.Infof("send slot: request size:%d,locked size:%d,Available:%d,UsedPercent:%f",
// pairChan.weight,
// lr.occupiedSlots,
// v.Available,
// v.UsedPercent)
// return false
// }
// }
// }

if lr.occupiedSlots+pairChan.weight < lr.totalSlots {
return true
Expand Down

0 comments on commit dd5716a

Please sign in to comment.