Skip to content

Commit

Permalink
fix: af参数发送失败问题修复 #267
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Jul 12, 2024
1 parent e369da3 commit e1add55
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 85 deletions.
74 changes: 43 additions & 31 deletions src/backend/booster/bk_dist/booster/pkg/booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const (
osWindows = "windows"

envValueTrue = "true"

additionToolName = "additionFile"
)

// ExtraItems describe the info from extra-project-data
Expand Down Expand Up @@ -659,7 +661,10 @@ func (b *Booster) sendAdditionFile() {
AllDistributed: true,
})
}

if len(fds) == 0 {
blog.Infof("booster: no addition files found")
return
}
if err := b.work.Job(&dcSDK.ControllerJobStats{
Pid: os.Getpid(),
WorkID: b.workID,
Expand Down Expand Up @@ -769,7 +774,7 @@ func (b *Booster) runCommands(ctx context.Context) (code int, err error) {
}()

// send addition files to workers before run commands.
b.sendAdditionFile()
//b.sendAdditionFile()

// before run the commands, ensure that environments for workers are set.
b.ensureWorkersEnv()
Expand Down Expand Up @@ -1240,40 +1245,47 @@ func (b *Booster) request(method, server, uri string, data []byte) ([]byte, bool
func (b *Booster) setToolChain() error {
blog.Debugf("booster: try to set tool chain")

if b.config.Works.ToolChainJSONFile == "" || b.config.Works.ToolChainJSONFile == "nothing" {
if len(b.config.Works.AdditionFiles) == 0 &&
(b.config.Works.ToolChainJSONFile == "" || b.config.Works.ToolChainJSONFile == "nothing") {
blog.Debugf("booster: tool chain not set, do nothing now")
return nil
}

tools, err := resolveToolChainJSON(b.config.Works.ToolChainJSONFile)
if err != nil {
blog.Warnf("booster: failed to resolve %s with error:%v", b.config.Works.ToolChainJSONFile, err)
return err
var tools *dcSDK.Toolchain
var err error
if b.config.Works.ToolChainJSONFile != "" && b.config.Works.ToolChainJSONFile != "nothing" {
tools, err = resolveToolChainJSON(b.config.Works.ToolChainJSONFile)
if err != nil {
blog.Warnf("booster: failed to resolve %s with error:%v", b.config.Works.ToolChainJSONFile, err)
return err
}
}
//add addition files to tool chain for all commands
if len(b.config.Works.AdditionFiles) != 0 {
onechain := dcSDK.OneToolChain{
ToolName: additionToolName,
ToolKey: dcSDK.GetAdditionFileKey(),
}

// for _, v := range tools.Toolchains {
// var data []byte
// _ = codec.EncJSON(&v, &data)
// commonconfig := dcSDK.CommonControllerConfig{
// Configkey: dcSDK.CommonConfigKeyToolChain,
// WorkerKey: dcSDK.WorkerKeyConfig{
// BatchMode: b.config.BatchMode,
// ProjectID: b.config.ProjectID,
// Scene: b.config.Type.String(),
// },
// Data: data,
// }

// err = b.controller.SetConfig(&commonconfig)
// if err != nil {
// blog.Warnf("booster: failed to set config [%+v] with error:%v", commonconfig, err)
// return err
// }
// }

// blog.Debugf("booster: success to set tool chain")
// return nil

for _, f := range b.config.Works.AdditionFiles {
absPath, _ := filepath.Abs(f)
if onechain.ToolLocalFullPath == "" {
onechain.ToolLocalFullPath = f
onechain.ToolRemoteRelativePath = filepath.Dir(absPath)
} else {
onechain.Files = append(onechain.Files, dcSDK.ToolFile{
LocalFullPath: f,
RemoteRelativePath: filepath.Dir(absPath),
})
}
}
if tools == nil {
tools = &dcSDK.Toolchain{
Toolchains: []dcSDK.OneToolChain{onechain},
}
} else {
tools.Toolchains = append(tools.Toolchains, onechain)
}
}
return b.setToolChainWithJSON(tools)
}

Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/common/sdk/toolchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"path/filepath"
"strings"
"time"

dcFile "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/file"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol"
Expand Down Expand Up @@ -251,3 +252,7 @@ func (t *Toolchain) ToFileDesc() ([]FileDesc, error) {

return toolfiles, nil
}

func GetAdditionFileKey() string {
return fmt.Sprintf("addition\\file|%d", time.Now().Unix())
}
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,15 @@ func (m *Mgr) SetToolChain(toolchain *types.ToolChain) error {
return nil
}

// IsToolChainExsited return if toolchain files exsited
func (m *Mgr) IsToolChainExsited(key string) bool {
m.toolchainLock.RLock()
defer m.toolchainLock.RUnlock()

_, ok := m.toolchainMap[key]
return ok
}

// GetToolChainFiles return the toolchain files
func (m *Mgr) GetToolChainFiles(key string) ([]dcSDK.FileDesc, int64, error) {
m.toolchainLock.RLock()
Expand Down
166 changes: 112 additions & 54 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -1604,38 +1604,52 @@ func (m *Mgr) syncHostTime(hostList []*dcProtocol.Host) []*dcProtocol.Host {
return hostList
}

func (m *Mgr) getToolFileInfoByKey(key string) *types.FileCollectionInfo {
if !m.work.Resource().SupportAbsPath() {
blog.Infof("remote: ready get relative toolchain files for %s", key)
toolchainfiles, timestamp, err := m.work.Basic().GetToolChainRelativeFiles(key)
if err == nil && len(toolchainfiles) > 0 {
blog.Infof("remote: got toolchain files for %s:%v", key, toolchainfiles)
return &types.FileCollectionInfo{
UniqID: key,
Files: toolchainfiles,
SendStatus: types.FileSending,
Timestamp: timestamp,
}
}
} else {
blog.Infof("remote: ready get normal toolchain files for %s", key)
toolchainfiles, timestamp, err := m.work.Basic().GetToolChainFiles(key)
if err == nil && len(toolchainfiles) > 0 {
blog.Infof("remote: got toolchain files for %s:%v", key, toolchainfiles)
return &types.FileCollectionInfo{
UniqID: key,
Files: toolchainfiles,
SendStatus: types.FileSending,
Timestamp: timestamp,
}
}
}
return nil
}

func (m *Mgr) getToolChainFromExecuteRequest(req *types.RemoteTaskExecuteRequest) []*types.FileCollectionInfo {
blog.Debugf("remote: get toolchain with req:[%+v]", *req)
fd := make([]*types.FileCollectionInfo, 0, 2)

for _, c := range req.Req.Commands {
blog.Debugf("remote: ready get toolchain with key:[%s]", c.ExeToolChainKey)
//add additional files to all workers
if additionfd := m.getToolFileInfoByKey(dcSDK.GetAdditionFileKey()); additionfd != nil {
fd = append(fd, additionfd)
}
if c.ExeToolChainKey != "" {
if !m.work.Resource().SupportAbsPath() {
blog.Infof("remote: ready get relative toolchain files")
toolchainfiles, timestamp, err := m.work.Basic().GetToolChainRelativeFiles(c.ExeToolChainKey)
if err == nil && len(toolchainfiles) > 0 {
fd = append(fd, &types.FileCollectionInfo{
UniqID: c.ExeToolChainKey,
Files: toolchainfiles,
SendStatus: types.FileSending,
Timestamp: timestamp,
})
}
} else {
blog.Infof("remote: ready get normal toolchain files")
toolchainfiles, timestamp, err := m.work.Basic().GetToolChainFiles(c.ExeToolChainKey)
if err == nil && len(toolchainfiles) > 0 {
blog.Infof("remote: got toolchain files:%v", toolchainfiles)
fd = append(fd, &types.FileCollectionInfo{
UniqID: c.ExeToolChainKey,
Files: toolchainfiles,
SendStatus: types.FileSending,
Timestamp: timestamp,
})
}
if toolfd := m.getToolFileInfoByKey(c.ExeToolChainKey); toolfd != nil {
fd = append(fd, toolfd)
}
}
}

return fd
}

Expand All @@ -1644,15 +1658,28 @@ func (m *Mgr) isToolChainChanged(req *types.RemoteTaskExecuteRequest, server str

for _, c := range req.Req.Commands {
blog.Debugf("remote: ready check toolchain changed with key:[%s]", c.ExeToolChainKey)
if c.ExeToolChainKey != "" {
timestamp, _ := m.work.Basic().GetToolChainTimestamp(c.ExeToolChainKey)
timestampcached, _ := m.getCachedToolChainTimestamp(server, c.ExeToolChainKey)
//check additional files toolchain
timestamp, err := m.work.Basic().GetToolChainTimestamp(dcSDK.GetAdditionFileKey())
if err == nil {
timestampcached, _ := m.getCachedToolChainTimestamp(server, dcSDK.GetAdditionFileKey())
if timestamp != timestampcached {
blog.Infof("remote: found collection(%s) server(%s) cached timestamp(%d) "+
"newly timestamp(%d) changed", c.ExeToolChainKey, server, timestampcached, timestamp)
"newly timestamp(%d) changed", dcSDK.GetAdditionFileKey(), server, timestampcached, timestamp)
return true, nil
}
}

if c.ExeToolChainKey != "" {
timestamp, err := m.work.Basic().GetToolChainTimestamp(c.ExeToolChainKey)
if err == nil {
timestampcached, _ := m.getCachedToolChainTimestamp(server, c.ExeToolChainKey)
if timestamp != timestampcached {
blog.Infof("remote: found collection(%s) server(%s) cached timestamp(%d) "+
"newly timestamp(%d) changed", c.ExeToolChainKey, server, timestampcached, timestamp)
return true, nil
}
}
}
}

return false, nil
Expand All @@ -1664,45 +1691,76 @@ func (m *Mgr) isToolChainFinished(req *types.RemoteTaskExecuteRequest, server st
allfinished := true
for _, c := range req.Req.Commands {
blog.Debugf("remote: ready check toolchain finished with key:[%s]", c.ExeToolChainKey)
//check additional files toolchain
if m.work.Basic().IsToolChainExsited(dcSDK.GetAdditionFileKey()) {
status, err := m.getCachedToolChainStatus(server, dcSDK.GetAdditionFileKey())
if err == nil {
if status != types.FileSendSucceed && status != types.FileSendFailed {
blog.Infof("remote: found collection(%s) server(%s) status(%d) not finished",
dcSDK.GetAdditionFileKey(), server, status)
allfinished = false
return allfinished, nil
}
}
}
if c.ExeToolChainKey != "" {
status, _ := m.getCachedToolChainStatus(server, c.ExeToolChainKey)
if status != types.FileSendSucceed && status != types.FileSendFailed {
blog.Infof("remote: found collection(%s) server(%s) status(%d) not finished",
c.ExeToolChainKey, server, status)
allfinished = false
return allfinished, nil
if m.work.Basic().IsToolChainExsited(c.ExeToolChainKey) {
status, err := m.getCachedToolChainStatus(server, c.ExeToolChainKey)
if err == nil {
if status != types.FileSendSucceed && status != types.FileSendFailed {
blog.Infof("remote: found collection(%s) server(%s) status(%d) not finished",
c.ExeToolChainKey, server, status)
allfinished = false
return allfinished, nil
}
}
}
}
}

return allfinished, nil
}

func (m *Mgr) getInputFiles(key string, filepath string) (dcSDK.FileDesc, error) {
remotepath := ""
var err error
if !m.work.Resource().SupportAbsPath() {
remotepath, err = m.work.Basic().GetToolChainRelativeRemotePath(key)
} else {
remotepath, err = m.work.Basic().GetToolChainRemotePath(key)
}
if err != nil {
return dcSDK.FileDesc{}, err
}
return dcSDK.FileDesc{
FilePath: filepath,
Compresstype: protocol.CompressLZ4,
FileSize: -1,
Lastmodifytime: 0,
Md5: "",
Targetrelativepath: remotepath,
}, nil
}

func (m *Mgr) updateToolChainPath(req *types.RemoteTaskExecuteRequest) error {
for i, c := range req.Req.Commands {
blog.Debugf("remote: before update toolchain with key:[%s],inputfiles:%+v",
dcSDK.GetAdditionFileKey(), req.Req.Commands[i].Inputfiles)
if inputfiles, err := m.getInputFiles(dcSDK.GetAdditionFileKey(), c.ExeName); err == nil {
req.Req.Commands[i].Inputfiles = append(req.Req.Commands[i].Inputfiles, inputfiles)
blog.Debugf("remote: after update toolchain with key:[%s],remotepath:[%s],inputfiles:%+v",
dcSDK.GetAdditionFileKey(), inputfiles.Targetrelativepath, req.Req.Commands[i].Inputfiles)
}

if c.ExeToolChainKey != "" {
remotepath := ""
var err error
if !m.work.Resource().SupportAbsPath() {
remotepath, err = m.work.Basic().GetToolChainRelativeRemotePath(c.ExeToolChainKey)
} else {
remotepath, err = m.work.Basic().GetToolChainRemotePath(c.ExeToolChainKey)
}
if err != nil {
return fmt.Errorf("not found remote path for toolchain %s", c.ExeToolChainKey)
blog.Debugf("remote: before update toolchain with key:[%s],inputfiles:%+v",
c.ExeToolChainKey, req.Req.Commands[i].Inputfiles)
if inputfiles, err := m.getInputFiles(c.ExeToolChainKey, c.ExeName); err == nil {
req.Req.Commands[i].Inputfiles = append(req.Req.Commands[i].Inputfiles, inputfiles)
blog.Debugf("remote: after update toolchain with key:[%s],remotepath:[%s],inputfiles:%+v",
c.ExeToolChainKey, inputfiles.Targetrelativepath, req.Req.Commands[i].Inputfiles)
}
blog.Debugf("remote: before update toolchain with key:[%s],remotepath:[%s],inputfiles:%+v",
c.ExeToolChainKey, remotepath, req.Req.Commands[i].Inputfiles)
req.Req.Commands[i].Inputfiles = append(req.Req.Commands[i].Inputfiles, dcSDK.FileDesc{
FilePath: c.ExeName,
Compresstype: protocol.CompressLZ4,
FileSize: -1,
Lastmodifytime: 0,
Md5: "",
Targetrelativepath: remotepath,
})
blog.Debugf("remote: after update toolchain with key:[%s],remotepath:[%s],inputfiles:%+v",
c.ExeToolChainKey, remotepath, req.Req.Commands[i].Inputfiles)

}
}
return nil
Expand Down
3 changes: 3 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ type BasicMgr interface {
// update toolchain
SetToolChain(toolchain *ToolChain) error

//is tool exsited
IsToolChainExsited(key string) bool

// get toolchain files by key
GetToolChainFiles(key string) ([]dcSDK.FileDesc, int64, error)

Expand Down

0 comments on commit e1add55

Please sign in to comment.