Skip to content

Commit

Permalink
Merge pull request #309 from tbs60/dev_tming
Browse files Browse the repository at this point in the history
Dev tming
  • Loading branch information
tming authored Oct 24, 2024
2 parents 4e26792 + f103a2e commit 32b710e
Show file tree
Hide file tree
Showing 15 changed files with 403 additions and 16 deletions.
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/booster/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ const (
FlagDynamicPort = "dynamic_port"
FlagWorkerOfferSlot = "worker_offer_slot"
FlagCleanTmpFilesDayAgo = "clean_tmp_files_day_ago"
FlagIgnoreHttpStatus = "ignore_http_status"

EnvBuildIDOld = "TURBO_PLAN_BUILD_ID"
EnvBuildID = "TBS_BUILD_ID"
Expand Down Expand Up @@ -472,6 +473,10 @@ var (
Name: "clean_tmp_files_day_ago",
Usage: "clean tmp files which modify time before this days, default is 1",
},
commandCli.BoolFlag{
Name: "ignore_http_status",
Usage: "tasks will be executed even local http connection disconnected when this flag set",
},
}
)

Expand Down
3 changes: 2 additions & 1 deletion src/backend/booster/bk_dist/booster/command/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,15 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) {
IdleKeepSecs: c.Int(FlagIdleKeepSecs),
CleanTmpFilesDayAgo: cleanTmpFilesDayAgo,
SearchToolchain: c.Bool(FlagSearchToolchain),
IgnoreHttpStatus: c.Bool(FlagIgnoreHttpStatus),
},

Transport: dcType.BoosterTransport{
ServerDomain: ServerDomain,
ServerHost: ServerHost,
Timeout: 5 * time.Second,
HeartBeatTick: 5 * time.Second,
InspectTaskTick: 100 * time.Millisecond,
InspectTaskTick: 1000 * time.Millisecond,
TaskPreparingTimeout: time.Duration(waitResourceSeconds) * time.Second,
PrintTaskInfoEveryTime: 5,
CommitSuicideCheckTick: 5 * time.Second,
Expand Down
4 changes: 4 additions & 0 deletions src/backend/booster/bk_dist/booster/pkg/booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ func (b *Booster) getWorkersEnv() map[string]string {
requiredEnv[env.KeyExecutorSearchToolchain] = envValueTrue
}

if b.config.Works.IgnoreHttpStatus {
requiredEnv[env.KeyExecutorIgnoreHttpStatus] = envValueTrue
}

resultEnv := make(map[string]string, 10)
for k, v := range requiredEnv {
resultEnv[env.GetEnvKey(k)] = v
Expand Down
1 change: 1 addition & 0 deletions src/backend/booster/bk_dist/common/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const (
KeyExecutorUELibLocalCPUWeight = "UE_LIB_LOCAL_CPU_WEIGHT"
KeyExecutorUELinkLocalCPUWeight = "UE_LINK_LOCAL_CPU_WEIGHT"
KeyExecutorUEShaderLocalCPUWeight = "UE_SHADER_LOCAL_CPU_WEIGHT"
KeyExecutorIgnoreHttpStatus = "IGNORE_HTTP_STATUS"

KeyUserDefinedLogLevel = "USER_DEFINED_LOG_LEVEL"
KeyUserDefinedExecutorLogLevel = "USER_DEFINED_EXECUTOR_LOG_LEVEL"
Expand Down
3 changes: 2 additions & 1 deletion src/backend/booster/bk_dist/common/types/booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ type BoosterWorks struct {
EnableLink bool
EnableLib bool

SearchToolchain bool
SearchToolchain bool
IgnoreHttpStatus bool
}

// BoosterTransport describe the transport data to controller
Expand Down
27 changes: 16 additions & 11 deletions src/backend/booster/bk_dist/common/websocket/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"context"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -69,10 +68,11 @@ type Session struct {

req *restful.Request

ip string // 远端的ip
port int32 // 远端的port
conn net.Conn
valid bool // 连接是否可用
ip string // 远端的ip
port int32 // 远端的port
conn net.Conn
connKey string
valid bool // 连接是否可用

// 收到数据后的回调函数
callback WebSocketFunc
Expand All @@ -98,7 +98,7 @@ type Session struct {
type WebSocketFunc func(r *restful.Request, id MessageID, data []byte, s *Session) error

// server端创建session,需要指定http处理函数
func NewServerSession(w http.ResponseWriter, r *restful.Request, callback WebSocketFunc) *Session {
func NewServerSession(w *restful.Response, r *restful.Request, callback WebSocketFunc) *Session {
conn, _, _, err := ws.UpgradeHTTP(r.Request, w)
if err != nil || conn == nil {
blog.Errorf("[session] UpgradeHTTP failed with error:%v", err)
Expand Down Expand Up @@ -127,6 +127,7 @@ func NewServerSession(w http.ResponseWriter, r *restful.Request, callback WebSoc
ip: ip,
port: int32(port),
conn: conn,
connKey: fmt.Sprintf("%s_%d", remoteaddr, time.Now().Nanosecond()),
sendNotifyChan: sendNotifyChan,
sendQueue: sendQueue,
waitMap: make(map[MessageID]*Message),
Expand All @@ -141,6 +142,10 @@ func NewServerSession(w http.ResponseWriter, r *restful.Request, callback WebSoc
return s
}

func (s *Session) GetConnKey() string {
return s.connKey
}

// client端创建session,需要指定目标server的ip和端口
func NewClientSession(ip string, port int32, url string, callback WebSocketFunc) *Session {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -454,7 +459,6 @@ func (s *Session) serverReceive(wg *sync.WaitGroup) {
}
}

//
func (s *Session) notifyAndWait(msg *Message) *MessageResult {
blog.Debugf("[session] notify send and wait for response now...")

Expand Down Expand Up @@ -567,10 +571,11 @@ func (s *Session) check(wg *sync.WaitGroup) {
wg.Done()
for {
select {
case <-s.ctx.Done():
blog.Infof("[session] session check canceled by context")
s.clean(ErrorContextCanceled)
return
// 在用户环境上发现s.ctx.Done()异常触发的,不清楚原因,先屏蔽
// case <-s.ctx.Done():
// blog.Infof("[session] session check canceled by context")
// s.clean(ErrorContextCanceled)
// return
case err := <-s.errorChan:
blog.Warnf("[session] session check found error:%v", err)
s.clean(err)
Expand Down
20 changes: 19 additions & 1 deletion src/backend/booster/bk_dist/controller/pkg/api/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ import (
"github.com/emicklei/go-restful"
)

var (
gHttpConnCache *types.HttpConnCache
)

func init() {
gHttpConnCache = types.NewHttpConnCache()
go gHttpConnCache.Check()
}

func available(_ *restful.Request, resp *restful.Response) {
api.ReturnRest(&api.RestResponse{Resp: resp, Data: &AvailableResp{Pid: os.Getpid()}})
}
Expand Down Expand Up @@ -477,6 +486,9 @@ func executeLocalTask(req *restful.Request, resp *restful.Response) {
return
}

// TODO : 需要关注http的连接状态
config.InitHttpConnStatus(req, gHttpConnCache, nil, 0)

result, err := defaultManager.ExecuteLocalTask(workID, config)
if err != nil {
// blog.Errorf("api: executeLocalTask execute local task failed, work: %s, err: %v", workID, err)
Expand Down Expand Up @@ -504,7 +516,10 @@ func executeLocalTask(req *restful.Request, resp *restful.Response) {
r.Write2Resp(&api.RestResponse{Resp: resp})
}

func callbackOfLocalExecute(req *restful.Request, id websocket.MessageID, data []byte, s *websocket.Session) error {
func callbackOfLocalExecute(req *restful.Request,
id websocket.MessageID,
data []byte,
s *websocket.Session) error {
workID := req.PathParameter(pathParamWorkID)
r := &LocalTaskExecuteResp{}

Expand All @@ -526,6 +541,9 @@ func callbackOfLocalExecute(req *restful.Request, id websocket.MessageID, data [
return ret.Err
}

// TODO : 需要关注http的连接状态
config.InitHttpConnStatus(req, gHttpConnCache, s, 20)

result, err := defaultManager.ExecuteLocalTask(workID, config)
if err != nil {
// blog.Errorf("api: executeLocalTask execute local task failed, work: %s, err: %v", workID, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (e *executor) tryExecuteLocalTask() *types.LocalTaskExecuteResult {
defer e.mgr.work.Basic().UpdateJobStats(e.stats)

dcSDK.StatsTimeNow(&e.stats.LocalWorkEnterTime)
gotLock := true
gotLock := false
defer func() {
if gotLock {
dcSDK.StatsTimeNow(&e.stats.LocalWorkLeaveTime)
Expand Down Expand Up @@ -392,6 +392,8 @@ func (e *executor) tryExecuteLocalTask() *types.LocalTaskExecuteResult {
gotLock = false
blog.Infof("executor: not got lock to execute local-task from pid(%d) with weight %d", e.req.Pid, locallockweight)
return nil
} else {
gotLock = true
}

return e.realExecuteLocalTask(locallockweight)
Expand Down
49 changes: 49 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,23 @@ func (m *Mgr) GetPumpCache() (*analyser.FileCache, *analyser.RootCache) {
return m.pumpFileCache, m.pumpRootCache
}

func checkHttpConn(req *types.LocalTaskExecuteRequest) (*types.LocalTaskExecuteResult, error) {
if !types.IsHttpConnStatusOk(req.HttpConnCache, req.HttpConnKey) {
blog.Errorf("local: httpconncache exit execute pid(%d) command:[%s] for http connection[%s] error",
req.Pid, strings.Join(req.Commands, " "), req.HttpConnKey)
return &types.LocalTaskExecuteResult{
Result: &dcSDK.LocalTaskResult{
ExitCode: -1,
Message: types.ErrLocalHttpConnDisconnected.Error(),
Stdout: nil,
Stderr: nil,
},
}, types.ErrLocalHttpConnDisconnected
}

return nil, nil
}

// ExecuteTask 若是task command本身运行失败, 不作为execute失败, 将结果放在result中返回即可
// 只有筹备执行的过程中失败, 才作为execute失败
func (m *Mgr) ExecuteTask(
Expand All @@ -123,6 +140,11 @@ func (m *Mgr) ExecuteTask(
defer e.executeFinalTask()
defer e.handleRecord()

ret, err := checkHttpConn(req)
if err != nil {
return ret, err
}

// 该work被置为degraded || 该executor被置为degraded, 则直接走本地执行
if m.work.Basic().Settings().Degraded || e.degrade() {
blog.Warnf("local: execute task for work(%s) from pid(%d) degrade to local with degraded",
Expand Down Expand Up @@ -150,6 +172,11 @@ func (m *Mgr) ExecuteTask(
// 优化没有远程资源转本地的逻辑; 如果没有远程资源,则先获取本地锁,然后转本地执行
// 如果没有本地锁,则先等待,后面有远程资源时,则直接远程,无需全部阻塞在本地执行
for {
ret, err := checkHttpConn(req)
if err != nil {
return ret, err
}

// 先检查是否有远程资源
if !m.work.Resource().HasAvailableWorkers() {
// check whether this task need remote worker,
Expand Down Expand Up @@ -183,6 +210,11 @@ func (m *Mgr) ExecuteTask(
m.work.Basic().Info().IncPrepared()
m.work.Remote().IncRemoteJobs()

ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

c, err := e.executePreTask()
if err != nil {
m.work.Basic().Info().DecPrepared()
Expand All @@ -199,9 +231,16 @@ func (m *Mgr) ExecuteTask(
Sandbox: e.sandbox,
IOTimeout: e.ioTimeout,
BanWorkerList: []*protocol.Host{},
HttpConnCache: req.HttpConnCache,
HttpConnKey: req.HttpConnKey,
}

for i := 0; i < m.getTryTimes(e); i++ {
ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

req.Stats.RemoteTryTimes = i + 1
r, err = m.work.Remote().ExecuteTask(remoteReq)
if err != nil {
Expand Down Expand Up @@ -231,6 +270,11 @@ func (m *Mgr) ExecuteTask(
m.work.Basic().Info().DecPrepared()
m.work.Remote().DecRemoteJobs()
if err != nil {
ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

if err == types.ErrSendFileFailed {
blog.Infof("local: retry remote-task failed from work(%s) for (%d) times from pid(%d)"+
" with send file error, retryOnRemoteFail now",
Expand All @@ -253,6 +297,11 @@ func (m *Mgr) ExecuteTask(
blog.Warnf("local: execute post-task for work(%s) from pid(%d) failed: %v", m.work.ID(), req.Pid, err)
req.Stats.RemoteErrorMessage = err.Error()

ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

lr, err := m.retryOnRemoteFail(req, globalWork, e)
if err == nil && lr != nil {
return lr, err
Expand Down
28 changes: 27 additions & 1 deletion src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,16 @@ func (m *Mgr) workerCheck(ctx context.Context) {
}
}

func checkHttpConn(req *types.RemoteTaskExecuteRequest) (*types.RemoteTaskExecuteResult, error) {
if !types.IsHttpConnStatusOk(req.HttpConnCache, req.HttpConnKey) {
blog.Errorf("remote: httpconncache exit execute pid(%d) for http connection[%s] error",
req.Pid, req.HttpConnKey)
return nil, types.ErrLocalHttpConnDisconnected
}

return nil, nil
}

// ExecuteTask run the task in remote worker and ensure the dependent files
func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTaskExecuteResult, error) {
if m.TotalSlots() <= 0 {
Expand Down Expand Up @@ -550,8 +560,13 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
m.DecRemoteJobs()
}()

ret, err := checkHttpConn(req)
if err != nil {
return ret, err
}

// 1. send toolchain if required 2. adjust exe remote path for req
err := m.sendToolchain(handler, req)
err = m.sendToolchain(handler, req)
if err != nil {
blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+
"ensure tool chain failed: %v, going to disable host(%s)",
Expand All @@ -565,6 +580,12 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) {
return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
}

ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req))
if err != nil {
req.BanWorkerList = append(req.BanWorkerList, req.Server)
Expand All @@ -588,6 +609,11 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
blog.Infof("remote: try to real execute remote task for work(%s) from pid(%d) with timeout(%d) after send files",
m.work.ID(), req.Pid, req.IOTimeout)

ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

var result *dcSDK.BKDistResult
if m.conf.LongTCP {
if !req.Req.CustomSave {
Expand Down
1 change: 1 addition & 0 deletions src/backend/booster/bk_dist/controller/pkg/types/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ var (
ErrFileLock = fmt.Errorf("lock file failed")
ErrWorkIDEmpty = fmt.Errorf("work id is empty")
ErrWorkNotRead = fmt.Errorf("work is not ready")
ErrLocalHttpConnDisconnected = fmt.Errorf("local http connection disconnected")
)
Loading

0 comments on commit 32b710e

Please sign in to comment.