Skip to content

Commit

Permalink
Merge pull request TencentBlueKing#8064 from tbs60/dev_tming
Browse files Browse the repository at this point in the history
bug: bk-dist-controller有可能死锁, issue: TencentBlueKing#8063
  • Loading branch information
tming authored Dec 1, 2022
2 parents 1d953d7 + 5b03d34 commit b7d33f0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (m *Mgr) resourceCheck(ctx context.Context) {
// notify resource release
m.work.Resource().Release(nil)
// send and reset stat data
m.work.Resource().SendAndResetStats(false, 0)
m.work.Resource().SendAndResetStats(false, []int64{0})

// 重置最近一次使用时间
m.setLastUsed(0)
Expand Down
75 changes: 39 additions & 36 deletions src/backend/booster/bk_dist/controller/pkg/manager/resource/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,15 +515,19 @@ func (m *Mgr) SendStats(brief bool) error {
return nil
}

func (m *Mgr) SendAndResetStats(brief bool, t int64) error {
// send stats and reset after sent, if brief true, then will not send the job stats
// !! this will call m.work.Lock() , to avoid dead lock
func (m *Mgr) SendAndResetStats(brief bool, resapplytimes []int64) error {

data, _ := m.getSendStatsData(false, t)
go m.sendStatsData(data)
for _, t := range resapplytimes {
data, _ := m.getSendStatsData(brief, t)
go m.sendStatsData(data)

// reset stat
m.work.Lock()
m.work.Basic().ResetStat()
m.work.Unlock()
// reset stat
m.work.Lock()
m.work.Basic().ResetStat()
m.work.Unlock()
}

return nil
}
Expand Down Expand Up @@ -682,7 +686,10 @@ func (m *Mgr) inspectInfo(taskID string) {
s = ResourceApplyFailed
}

m.clearOldInvalidRes(&info)
resapplytimes, err := m.clearOldInvalidRes(&info)
if err == nil {
m.SendAndResetStats(false, resapplytimes)
}
m.addRes(&info, s)

m.updateApplyEndStatus(s == ResourceApplySucceed)
Expand All @@ -691,7 +698,10 @@ func (m *Mgr) inspectInfo(taskID string) {
return

case engine.TaskStatusFinish, engine.TaskStatusFailed:
m.clearOldInvalidRes(&info)
resapplytimes, err := m.clearOldInvalidRes(&info)
if err == nil {
m.SendAndResetStats(false, resapplytimes)
}
m.addRes(&info, ResourceApplyFailed)

m.updateApplyEndStatus(false)
Expand All @@ -703,14 +713,14 @@ func (m *Mgr) inspectInfo(taskID string) {
}

// clean old resources which host list is empty, and notify server to terminate these resources
func (m *Mgr) clearOldInvalidRes(info *v2.RespTaskInfo) error {
func (m *Mgr) clearOldInvalidRes(info *v2.RespTaskInfo) ([]int64, error) {
blog.Infof("resource: ready check and clean old invalid resource")

m.reslock.Lock()
defer m.reslock.Unlock()

if len(m.resources) == 0 {
return nil
return nil, nil
}

needrelease := false
Expand All @@ -725,9 +735,10 @@ func (m *Mgr) clearOldInvalidRes(info *v2.RespTaskInfo) error {
}

if !needrelease {
return nil
return nil, nil
}

resapplytimes := []int64{}
newres := []*Res{}
for _, r := range m.resources {
// do nothing with current task info, it mabye need by others
Expand All @@ -738,45 +749,25 @@ func (m *Mgr) clearOldInvalidRes(info *v2.RespTaskInfo) error {

if len(r.taskInfo.HostList) == 0 {
m.releaseOne(nil, r)
// 确保 m.reslock.Lock() 和 m.work.Lock() 不要相互包含,避免死锁
// TODO : send detail stat data and reset stat data

// // send stat
// data, _ := m.getSendStatsData(false, r.applyTime.UnixNano())
// go m.sendStatsData(data)

// // reset stat
// m.work.Lock()
// m.work.Basic().ResetStat()
// m.work.Unlock()

m.SendAndResetStats(false, r.applyTime.UnixNano())
// m.SendAndResetStats(false, r.applyTime.UnixNano())
resapplytimes = append(resapplytimes, r.applyTime.UnixNano())

} else {
newres = append(newres, r)
}
}
m.resources = newres

return nil
return resapplytimes, nil
}

func (m *Mgr) addRes(info *v2.RespTaskInfo, status Status) error {
if info == nil {
return nil
}

changed := false
m.reslock.Lock()
defer func() {
m.reslock.Unlock()
if changed {
m.onResChanged()
}
}()

// send stat data with the newly taskid(resource id)
m.newlyTaskID = info.TaskID

// TODO : reset stat data to new status
// 更新work info状态
m.work.Lock()
Expand All @@ -800,6 +791,18 @@ func (m *Mgr) addRes(info *v2.RespTaskInfo, status Status) error {
}
m.work.Unlock()

changed := false
m.reslock.Lock()
defer func() {
m.reslock.Unlock()
if changed {
m.onResChanged()
}
}()

// send stat data with the newly taskid(resource id)
m.newlyTaskID = info.TaskID

for _, r := range m.resources {
if r.taskid == info.TaskID {
// if resource in release status, do not change it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ type ResourceMgr interface {
SendStats(brief bool) error

// send stats and reset after sent, if brief true, then will not send the job stats
SendAndResetStats(brief bool, t int64) error
// !! this will call m.work.Lock() , to avoid dead lock
SendAndResetStats(brief bool, resapplytimes []int64) error

// get resource status
GetStatus() *v2.RespTaskInfo
Expand Down

0 comments on commit b7d33f0

Please sign in to comment.