Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev tming #309

Merged
merged 6 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/booster/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ const (
FlagDynamicPort = "dynamic_port"
FlagWorkerOfferSlot = "worker_offer_slot"
FlagCleanTmpFilesDayAgo = "clean_tmp_files_day_ago"
FlagIgnoreHttpStatus = "ignore_http_status"

EnvBuildIDOld = "TURBO_PLAN_BUILD_ID"
EnvBuildID = "TBS_BUILD_ID"
Expand Down Expand Up @@ -472,6 +473,10 @@ var (
Name: "clean_tmp_files_day_ago",
Usage: "clean tmp files which modify time before this days, default is 1",
},
commandCli.BoolFlag{
Name: "ignore_http_status",
Usage: "tasks will be executed even local http connection disconnected when this flag set",
},
}
)

Expand Down
3 changes: 2 additions & 1 deletion src/backend/booster/bk_dist/booster/command/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,15 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) {
IdleKeepSecs: c.Int(FlagIdleKeepSecs),
CleanTmpFilesDayAgo: cleanTmpFilesDayAgo,
SearchToolchain: c.Bool(FlagSearchToolchain),
IgnoreHttpStatus: c.Bool(FlagIgnoreHttpStatus),
},

Transport: dcType.BoosterTransport{
ServerDomain: ServerDomain,
ServerHost: ServerHost,
Timeout: 5 * time.Second,
HeartBeatTick: 5 * time.Second,
InspectTaskTick: 100 * time.Millisecond,
InspectTaskTick: 1000 * time.Millisecond,
TaskPreparingTimeout: time.Duration(waitResourceSeconds) * time.Second,
PrintTaskInfoEveryTime: 5,
CommitSuicideCheckTick: 5 * time.Second,
Expand Down
4 changes: 4 additions & 0 deletions src/backend/booster/bk_dist/booster/pkg/booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ func (b *Booster) getWorkersEnv() map[string]string {
requiredEnv[env.KeyExecutorSearchToolchain] = envValueTrue
}

if b.config.Works.IgnoreHttpStatus {
requiredEnv[env.KeyExecutorIgnoreHttpStatus] = envValueTrue
}

resultEnv := make(map[string]string, 10)
for k, v := range requiredEnv {
resultEnv[env.GetEnvKey(k)] = v
Expand Down
1 change: 1 addition & 0 deletions src/backend/booster/bk_dist/common/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const (
KeyExecutorUELibLocalCPUWeight = "UE_LIB_LOCAL_CPU_WEIGHT"
KeyExecutorUELinkLocalCPUWeight = "UE_LINK_LOCAL_CPU_WEIGHT"
KeyExecutorUEShaderLocalCPUWeight = "UE_SHADER_LOCAL_CPU_WEIGHT"
KeyExecutorIgnoreHttpStatus = "IGNORE_HTTP_STATUS"

KeyUserDefinedLogLevel = "USER_DEFINED_LOG_LEVEL"
KeyUserDefinedExecutorLogLevel = "USER_DEFINED_EXECUTOR_LOG_LEVEL"
Expand Down
3 changes: 2 additions & 1 deletion src/backend/booster/bk_dist/common/types/booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ type BoosterWorks struct {
EnableLink bool
EnableLib bool

SearchToolchain bool
SearchToolchain bool
IgnoreHttpStatus bool
}

// BoosterTransport describe the transport data to controller
Expand Down
27 changes: 16 additions & 11 deletions src/backend/booster/bk_dist/common/websocket/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"context"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -69,10 +68,11 @@ type Session struct {

req *restful.Request

ip string // 远端的ip
port int32 // 远端的port
conn net.Conn
valid bool // 连接是否可用
ip string // 远端的ip
port int32 // 远端的port
conn net.Conn
connKey string
valid bool // 连接是否可用

// 收到数据后的回调函数
callback WebSocketFunc
Expand All @@ -98,7 +98,7 @@ type Session struct {
type WebSocketFunc func(r *restful.Request, id MessageID, data []byte, s *Session) error

// server端创建session,需要指定http处理函数
func NewServerSession(w http.ResponseWriter, r *restful.Request, callback WebSocketFunc) *Session {
func NewServerSession(w *restful.Response, r *restful.Request, callback WebSocketFunc) *Session {
conn, _, _, err := ws.UpgradeHTTP(r.Request, w)
if err != nil || conn == nil {
blog.Errorf("[session] UpgradeHTTP failed with error:%v", err)
Expand Down Expand Up @@ -127,6 +127,7 @@ func NewServerSession(w http.ResponseWriter, r *restful.Request, callback WebSoc
ip: ip,
port: int32(port),
conn: conn,
connKey: fmt.Sprintf("%s_%d", remoteaddr, time.Now().Nanosecond()),
sendNotifyChan: sendNotifyChan,
sendQueue: sendQueue,
waitMap: make(map[MessageID]*Message),
Expand All @@ -141,6 +142,10 @@ func NewServerSession(w http.ResponseWriter, r *restful.Request, callback WebSoc
return s
}

// GetConnKey returns the connKey string stored on this session.
func (s *Session) GetConnKey() string {
return s.connKey
}

// client端创建session,需要指定目标server的ip和端口
func NewClientSession(ip string, port int32, url string, callback WebSocketFunc) *Session {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -454,7 +459,6 @@ func (s *Session) serverReceive(wg *sync.WaitGroup) {
}
}

//
func (s *Session) notifyAndWait(msg *Message) *MessageResult {
blog.Debugf("[session] notify send and wait for response now...")

Expand Down Expand Up @@ -567,10 +571,11 @@ func (s *Session) check(wg *sync.WaitGroup) {
wg.Done()
for {
select {
case <-s.ctx.Done():
blog.Infof("[session] session check canceled by context")
s.clean(ErrorContextCanceled)
return
// 在用户环境上发现s.ctx.Done()异常触发的,不清楚原因,先屏蔽
// case <-s.ctx.Done():
// blog.Infof("[session] session check canceled by context")
// s.clean(ErrorContextCanceled)
// return
case err := <-s.errorChan:
blog.Warnf("[session] session check found error:%v", err)
s.clean(err)
Expand Down
20 changes: 19 additions & 1 deletion src/backend/booster/bk_dist/controller/pkg/api/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ import (
"github.com/emicklei/go-restful"
)

// gHttpConnCache is the package-wide HTTP connection cache; it is handed to
// config.InitHttpConnStatus by the local-task handlers in this file.
var gHttpConnCache *types.HttpConnCache

// init builds the shared connection cache and launches its Check method in a
// background goroutine that is never explicitly stopped from this file.
func init() {
	cache := types.NewHttpConnCache()
	gHttpConnCache = cache
	go cache.Check()
}

// available writes a REST response whose data is an AvailableResp carrying
// the process ID of the current process. The incoming request is unused.
func available(_ *restful.Request, resp *restful.Response) {
	payload := &AvailableResp{Pid: os.Getpid()}
	api.ReturnRest(&api.RestResponse{
		Resp: resp,
		Data: payload,
	})
}
Expand Down Expand Up @@ -477,6 +486,9 @@ func executeLocalTask(req *restful.Request, resp *restful.Response) {
return
}

// TODO : 需要关注http的连接状态
config.InitHttpConnStatus(req, gHttpConnCache, nil, 0)

result, err := defaultManager.ExecuteLocalTask(workID, config)
if err != nil {
// blog.Errorf("api: executeLocalTask execute local task failed, work: %s, err: %v", workID, err)
Expand Down Expand Up @@ -504,7 +516,10 @@ func executeLocalTask(req *restful.Request, resp *restful.Response) {
r.Write2Resp(&api.RestResponse{Resp: resp})
}

func callbackOfLocalExecute(req *restful.Request, id websocket.MessageID, data []byte, s *websocket.Session) error {
func callbackOfLocalExecute(req *restful.Request,
id websocket.MessageID,
data []byte,
s *websocket.Session) error {
workID := req.PathParameter(pathParamWorkID)
r := &LocalTaskExecuteResp{}

Expand All @@ -526,6 +541,9 @@ func callbackOfLocalExecute(req *restful.Request, id websocket.MessageID, data [
return ret.Err
}

// TODO : 需要关注http的连接状态
config.InitHttpConnStatus(req, gHttpConnCache, s, 20)

result, err := defaultManager.ExecuteLocalTask(workID, config)
if err != nil {
// blog.Errorf("api: executeLocalTask execute local task failed, work: %s, err: %v", workID, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (e *executor) tryExecuteLocalTask() *types.LocalTaskExecuteResult {
defer e.mgr.work.Basic().UpdateJobStats(e.stats)

dcSDK.StatsTimeNow(&e.stats.LocalWorkEnterTime)
gotLock := true
gotLock := false
defer func() {
if gotLock {
dcSDK.StatsTimeNow(&e.stats.LocalWorkLeaveTime)
Expand Down Expand Up @@ -392,6 +392,8 @@ func (e *executor) tryExecuteLocalTask() *types.LocalTaskExecuteResult {
gotLock = false
blog.Infof("executor: not got lock to execute local-task from pid(%d) with weight %d", e.req.Pid, locallockweight)
return nil
} else {
gotLock = true
}

return e.realExecuteLocalTask(locallockweight)
Expand Down
49 changes: 49 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,23 @@ func (m *Mgr) GetPumpCache() (*analyser.FileCache, *analyser.RootCache) {
return m.pumpFileCache, m.pumpRootCache
}

// checkHttpConn checks, via types.IsHttpConnStatusOk, the HTTP connection
// identified by req.HttpConnKey in req.HttpConnCache.
//
// It returns (nil, nil) when the status is ok. Otherwise it logs the pid and
// command line of the request and returns a result with ExitCode -1 and the
// ErrLocalHttpConnDisconnected message, together with
// types.ErrLocalHttpConnDisconnected as the error.
func checkHttpConn(req *types.LocalTaskExecuteRequest) (*types.LocalTaskExecuteResult, error) {
	if types.IsHttpConnStatusOk(req.HttpConnCache, req.HttpConnKey) {
		return nil, nil
	}

	cmdline := strings.Join(req.Commands, " ")
	blog.Errorf("local: httpconncache exit execute pid(%d) command:[%s] for http connection[%s] error",
		req.Pid, cmdline, req.HttpConnKey)

	// Stdout and Stderr are intentionally left at their nil zero value.
	failed := &dcSDK.LocalTaskResult{
		ExitCode: -1,
		Message:  types.ErrLocalHttpConnDisconnected.Error(),
	}

	return &types.LocalTaskExecuteResult{Result: failed}, types.ErrLocalHttpConnDisconnected
}

// ExecuteTask 若是task command本身运行失败, 不作为execute失败, 将结果放在result中返回即可
// 只有筹备执行的过程中失败, 才作为execute失败
func (m *Mgr) ExecuteTask(
Expand All @@ -123,6 +140,11 @@ func (m *Mgr) ExecuteTask(
defer e.executeFinalTask()
defer e.handleRecord()

ret, err := checkHttpConn(req)
if err != nil {
return ret, err
}

// 该work被置为degraded || 该executor被置为degraded, 则直接走本地执行
if m.work.Basic().Settings().Degraded || e.degrade() {
blog.Warnf("local: execute task for work(%s) from pid(%d) degrade to local with degraded",
Expand Down Expand Up @@ -150,6 +172,11 @@ func (m *Mgr) ExecuteTask(
// 优化没有远程资源转本地的逻辑; 如果没有远程资源,则先获取本地锁,然后转本地执行
// 如果没有本地锁,则先等待,后面有远程资源时,则直接远程,无需全部阻塞在本地执行
for {
ret, err := checkHttpConn(req)
if err != nil {
return ret, err
}

// 先检查是否有远程资源
if !m.work.Resource().HasAvailableWorkers() {
// check whether this task need remote worker,
Expand Down Expand Up @@ -183,6 +210,11 @@ func (m *Mgr) ExecuteTask(
m.work.Basic().Info().IncPrepared()
m.work.Remote().IncRemoteJobs()

ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

c, err := e.executePreTask()
if err != nil {
m.work.Basic().Info().DecPrepared()
Expand All @@ -199,9 +231,16 @@ func (m *Mgr) ExecuteTask(
Sandbox: e.sandbox,
IOTimeout: e.ioTimeout,
BanWorkerList: []*protocol.Host{},
HttpConnCache: req.HttpConnCache,
HttpConnKey: req.HttpConnKey,
}

for i := 0; i < m.getTryTimes(e); i++ {
ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

req.Stats.RemoteTryTimes = i + 1
r, err = m.work.Remote().ExecuteTask(remoteReq)
if err != nil {
Expand Down Expand Up @@ -231,6 +270,11 @@ func (m *Mgr) ExecuteTask(
m.work.Basic().Info().DecPrepared()
m.work.Remote().DecRemoteJobs()
if err != nil {
ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

if err == types.ErrSendFileFailed {
blog.Infof("local: retry remote-task failed from work(%s) for (%d) times from pid(%d)"+
" with send file error, retryOnRemoteFail now",
Expand All @@ -253,6 +297,11 @@ func (m *Mgr) ExecuteTask(
blog.Warnf("local: execute post-task for work(%s) from pid(%d) failed: %v", m.work.ID(), req.Pid, err)
req.Stats.RemoteErrorMessage = err.Error()

ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

lr, err := m.retryOnRemoteFail(req, globalWork, e)
if err == nil && lr != nil {
return lr, err
Expand Down
28 changes: 27 additions & 1 deletion src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,16 @@ func (m *Mgr) workerCheck(ctx context.Context) {
}
}

// checkHttpConn checks, via types.IsHttpConnStatusOk, the HTTP connection
// identified by req.HttpConnKey in req.HttpConnCache.
//
// The returned result is always nil. The error is nil when the status is ok;
// otherwise the failure is logged with the request pid and
// types.ErrLocalHttpConnDisconnected is returned.
func checkHttpConn(req *types.RemoteTaskExecuteRequest) (*types.RemoteTaskExecuteResult, error) {
	if types.IsHttpConnStatusOk(req.HttpConnCache, req.HttpConnKey) {
		return nil, nil
	}

	blog.Errorf("remote: httpconncache exit execute pid(%d) for http connection[%s] error",
		req.Pid, req.HttpConnKey)

	return nil, types.ErrLocalHttpConnDisconnected
}

// ExecuteTask run the task in remote worker and ensure the dependent files
func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTaskExecuteResult, error) {
if m.TotalSlots() <= 0 {
Expand Down Expand Up @@ -550,8 +560,13 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
m.DecRemoteJobs()
}()

ret, err := checkHttpConn(req)
if err != nil {
return ret, err
}

// 1. send toolchain if required 2. adjust exe remote path for req
err := m.sendToolchain(handler, req)
err = m.sendToolchain(handler, req)
if err != nil {
blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+
"ensure tool chain failed: %v, going to disable host(%s)",
Expand All @@ -565,6 +580,12 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) {
return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server)
}

ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req))
if err != nil {
req.BanWorkerList = append(req.BanWorkerList, req.Server)
Expand All @@ -588,6 +609,11 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas
blog.Infof("remote: try to real execute remote task for work(%s) from pid(%d) with timeout(%d) after send files",
m.work.ID(), req.Pid, req.IOTimeout)

ret, err = checkHttpConn(req)
if err != nil {
return ret, err
}

var result *dcSDK.BKDistResult
if m.conf.LongTCP {
if !req.Req.CustomSave {
Expand Down
1 change: 1 addition & 0 deletions src/backend/booster/bk_dist/controller/pkg/types/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ var (
ErrFileLock = fmt.Errorf("lock file failed")
ErrWorkIDEmpty = fmt.Errorf("work id is empty")
ErrWorkNotRead = fmt.Errorf("work is not ready")
ErrLocalHttpConnDisconnected = fmt.Errorf("local http connection disconnected")
)
Loading
Loading