Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

af参数发送失败问题修复 #268

Merged
merged 5 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 43 additions & 31 deletions src/backend/booster/bk_dist/booster/pkg/booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const (
osWindows = "windows"

envValueTrue = "true"

additionToolName = "additionFile"
)

// ExtraItems describe the info from extra-project-data
Expand Down Expand Up @@ -659,7 +661,10 @@ func (b *Booster) sendAdditionFile() {
AllDistributed: true,
})
}

if len(fds) == 0 {
blog.Infof("booster: no addition files found")
return
}
if err := b.work.Job(&dcSDK.ControllerJobStats{
Pid: os.Getpid(),
WorkID: b.workID,
Expand Down Expand Up @@ -769,7 +774,7 @@ func (b *Booster) runCommands(ctx context.Context) (code int, err error) {
}()

// send addition files to workers before run commands.
b.sendAdditionFile()
//b.sendAdditionFile()

// before run the commands, ensure that environments for workers are set.
b.ensureWorkersEnv()
Expand Down Expand Up @@ -1240,40 +1245,47 @@ func (b *Booster) request(method, server, uri string, data []byte) ([]byte, bool
func (b *Booster) setToolChain() error {
blog.Debugf("booster: try to set tool chain")

if b.config.Works.ToolChainJSONFile == "" || b.config.Works.ToolChainJSONFile == "nothing" {
if len(b.config.Works.AdditionFiles) == 0 &&
(b.config.Works.ToolChainJSONFile == "" || b.config.Works.ToolChainJSONFile == "nothing") {
blog.Debugf("booster: tool chain not set, do nothing now")
return nil
}

tools, err := resolveToolChainJSON(b.config.Works.ToolChainJSONFile)
if err != nil {
blog.Warnf("booster: failed to resolve %s with error:%v", b.config.Works.ToolChainJSONFile, err)
return err
var tools *dcSDK.Toolchain
var err error
if b.config.Works.ToolChainJSONFile != "" && b.config.Works.ToolChainJSONFile != "nothing" {
tools, err = resolveToolChainJSON(b.config.Works.ToolChainJSONFile)
if err != nil {
blog.Warnf("booster: failed to resolve %s with error:%v", b.config.Works.ToolChainJSONFile, err)
return err
}
}
//add addition files to tool chain for all commands
if len(b.config.Works.AdditionFiles) != 0 {
onechain := dcSDK.OneToolChain{
ToolName: additionToolName,
ToolKey: dcSDK.GetAdditionFileKey(),
}

// for _, v := range tools.Toolchains {
// var data []byte
// _ = codec.EncJSON(&v, &data)
// commonconfig := dcSDK.CommonControllerConfig{
// Configkey: dcSDK.CommonConfigKeyToolChain,
// WorkerKey: dcSDK.WorkerKeyConfig{
// BatchMode: b.config.BatchMode,
// ProjectID: b.config.ProjectID,
// Scene: b.config.Type.String(),
// },
// Data: data,
// }

// err = b.controller.SetConfig(&commonconfig)
// if err != nil {
// blog.Warnf("booster: failed to set config [%+v] with error:%v", commonconfig, err)
// return err
// }
// }

// blog.Debugf("booster: success to set tool chain")
// return nil

for _, f := range b.config.Works.AdditionFiles {
absPath, _ := filepath.Abs(f)
if onechain.ToolLocalFullPath == "" {
onechain.ToolLocalFullPath = f
onechain.ToolRemoteRelativePath = filepath.Dir(absPath)
} else {
onechain.Files = append(onechain.Files, dcSDK.ToolFile{
LocalFullPath: f,
RemoteRelativePath: filepath.Dir(absPath),
})
}
}
if tools == nil {
tools = &dcSDK.Toolchain{
Toolchains: []dcSDK.OneToolChain{onechain},
}
} else {
tools.Toolchains = append(tools.Toolchains, onechain)
}
}
return b.setToolChainWithJSON(tools)
}

Expand Down
4 changes: 4 additions & 0 deletions src/backend/booster/bk_dist/common/sdk/toolchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,7 @@ func (t *Toolchain) ToFileDesc() ([]FileDesc, error) {

return toolfiles, nil
}

func GetAdditionFileKey() string {
return "addition\\file|key"
}
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,15 @@ func (m *Mgr) SetToolChain(toolchain *types.ToolChain) error {
return nil
}

// IsToolChainExsited return if toolchain files exsited
func (m *Mgr) IsToolChainExsited(key string) bool {
m.toolchainLock.RLock()
defer m.toolchainLock.RUnlock()

_, ok := m.toolchainMap[key]
return ok
}

// GetToolChainFiles return the toolchain files
func (m *Mgr) GetToolChainFiles(key string) ([]dcSDK.FileDesc, int64, error) {
m.toolchainLock.RLock()
Expand Down
116 changes: 82 additions & 34 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,13 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas

remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req))
if err != nil {
req.BanWorkerList = append(req.BanWorkerList, req.Server)
var banlistStr string
for _, s := range req.BanWorkerList {
banlistStr = banlistStr + s.Server + ","
}
blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+
"ensure files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err)
"ensure files failed: %v, after add failed server, banworkerlist is %s", m.work.ID(), req.Pid, req.Server.Server, err, banlistStr)
return nil, err
}
if err = updateTaskRequestInputFilesReady(req, remoteDirs); err != nil {
Expand Down Expand Up @@ -1604,38 +1609,52 @@ func (m *Mgr) syncHostTime(hostList []*dcProtocol.Host) []*dcProtocol.Host {
return hostList
}

func (m *Mgr) getToolFileInfoByKey(key string) *types.FileCollectionInfo {
if !m.work.Resource().SupportAbsPath() {
blog.Infof("remote: ready get relative toolchain files for %s", key)
toolchainfiles, timestamp, err := m.work.Basic().GetToolChainRelativeFiles(key)
if err == nil && len(toolchainfiles) > 0 {
blog.Infof("remote: got toolchain files for %s:%v", key, toolchainfiles)
return &types.FileCollectionInfo{
UniqID: key,
Files: toolchainfiles,
SendStatus: types.FileSending,
Timestamp: timestamp,
}
}
} else {
blog.Infof("remote: ready get normal toolchain files for %s", key)
toolchainfiles, timestamp, err := m.work.Basic().GetToolChainFiles(key)
if err == nil && len(toolchainfiles) > 0 {
blog.Infof("remote: got toolchain files for %s:%v", key, toolchainfiles)
return &types.FileCollectionInfo{
UniqID: key,
Files: toolchainfiles,
SendStatus: types.FileSending,
Timestamp: timestamp,
}
}
}
return nil
}

func (m *Mgr) getToolChainFromExecuteRequest(req *types.RemoteTaskExecuteRequest) []*types.FileCollectionInfo {
blog.Debugf("remote: get toolchain with req:[%+v]", *req)
fd := make([]*types.FileCollectionInfo, 0, 2)

for _, c := range req.Req.Commands {
blog.Debugf("remote: ready get toolchain with key:[%s]", c.ExeToolChainKey)
//add additional files to all workers
if additionfd := m.getToolFileInfoByKey(dcSDK.GetAdditionFileKey()); additionfd != nil {
fd = append(fd, additionfd)
}
if c.ExeToolChainKey != "" {
if !m.work.Resource().SupportAbsPath() {
blog.Infof("remote: ready get relative toolchain files")
toolchainfiles, timestamp, err := m.work.Basic().GetToolChainRelativeFiles(c.ExeToolChainKey)
if err == nil && len(toolchainfiles) > 0 {
fd = append(fd, &types.FileCollectionInfo{
UniqID: c.ExeToolChainKey,
Files: toolchainfiles,
SendStatus: types.FileSending,
Timestamp: timestamp,
})
}
} else {
blog.Infof("remote: ready get normal toolchain files")
toolchainfiles, timestamp, err := m.work.Basic().GetToolChainFiles(c.ExeToolChainKey)
if err == nil && len(toolchainfiles) > 0 {
blog.Infof("remote: got toolchain files:%v", toolchainfiles)
fd = append(fd, &types.FileCollectionInfo{
UniqID: c.ExeToolChainKey,
Files: toolchainfiles,
SendStatus: types.FileSending,
Timestamp: timestamp,
})
}
if toolfd := m.getToolFileInfoByKey(c.ExeToolChainKey); toolfd != nil {
fd = append(fd, toolfd)
}
}
}

return fd
}

Expand All @@ -1644,15 +1663,28 @@ func (m *Mgr) isToolChainChanged(req *types.RemoteTaskExecuteRequest, server str

for _, c := range req.Req.Commands {
blog.Debugf("remote: ready check toolchain changed with key:[%s]", c.ExeToolChainKey)
if c.ExeToolChainKey != "" {
timestamp, _ := m.work.Basic().GetToolChainTimestamp(c.ExeToolChainKey)
timestampcached, _ := m.getCachedToolChainTimestamp(server, c.ExeToolChainKey)
//check additional files toolchain
timestamp, err := m.work.Basic().GetToolChainTimestamp(dcSDK.GetAdditionFileKey())
if err == nil {
timestampcached, _ := m.getCachedToolChainTimestamp(server, dcSDK.GetAdditionFileKey())
if timestamp != timestampcached {
blog.Infof("remote: found collection(%s) server(%s) cached timestamp(%d) "+
"newly timestamp(%d) changed", c.ExeToolChainKey, server, timestampcached, timestamp)
"newly timestamp(%d) changed", dcSDK.GetAdditionFileKey(), server, timestampcached, timestamp)
return true, nil
}
}

if c.ExeToolChainKey != "" {
timestamp, err := m.work.Basic().GetToolChainTimestamp(c.ExeToolChainKey)
if err == nil {
timestampcached, _ := m.getCachedToolChainTimestamp(server, c.ExeToolChainKey)
if timestamp != timestampcached {
blog.Infof("remote: found collection(%s) server(%s) cached timestamp(%d) "+
"newly timestamp(%d) changed", c.ExeToolChainKey, server, timestampcached, timestamp)
return true, nil
}
}
}
}

return false, nil
Expand All @@ -1664,13 +1696,29 @@ func (m *Mgr) isToolChainFinished(req *types.RemoteTaskExecuteRequest, server st
allfinished := true
for _, c := range req.Req.Commands {
blog.Debugf("remote: ready check toolchain finished with key:[%s]", c.ExeToolChainKey)
//check additional files toolchain
if m.work.Basic().IsToolChainExsited(dcSDK.GetAdditionFileKey()) {
status, err := m.getCachedToolChainStatus(server, dcSDK.GetAdditionFileKey())
if err == nil {
if status != types.FileSendSucceed && status != types.FileSendFailed {
blog.Infof("remote: found collection(%s) server(%s) status(%d) not finished",
dcSDK.GetAdditionFileKey(), server, status)
allfinished = false
return allfinished, nil
}
}
}
if c.ExeToolChainKey != "" {
status, _ := m.getCachedToolChainStatus(server, c.ExeToolChainKey)
if status != types.FileSendSucceed && status != types.FileSendFailed {
blog.Infof("remote: found collection(%s) server(%s) status(%d) not finished",
c.ExeToolChainKey, server, status)
allfinished = false
return allfinished, nil
if m.work.Basic().IsToolChainExsited(c.ExeToolChainKey) {
status, err := m.getCachedToolChainStatus(server, c.ExeToolChainKey)
if err == nil {
if status != types.FileSendSucceed && status != types.FileSendFailed {
blog.Infof("remote: found collection(%s) server(%s) status(%d) not finished",
c.ExeToolChainKey, server, status)
allfinished = false
return allfinished, nil
}
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ type BasicMgr interface {
// update toolchain
SetToolChain(toolchain *ToolChain) error

//is tool exsited
IsToolChainExsited(key string) bool

// get toolchain files by key
GetToolChainFiles(key string) ([]dcSDK.FileDesc, int64, error)

Expand Down
Loading