Skip to content

Commit

Permalink
controller支持对container name的校验 TencentBlueKing#103
Browse files Browse the repository at this point in the history
  • Loading branch information
tbs60 committed Aug 7, 2023
1 parent 0576b8c commit b842e4d
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 0 deletions.
100 changes: 100 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"context"
"errors"
"fmt"
"runtime"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -273,6 +275,7 @@ func (m *Mgr) Init() {
}

go m.workerCheck(ctx)
go m.checkHostName(ctx)

if m.conf.SendCork {
m.sendCorkChan = make(chan bool, 1000)
Expand Down Expand Up @@ -439,6 +442,103 @@ func (m *Mgr) workerCheck(ctx context.Context) {
}
}

func (m *Mgr) checkHostName(ctx context.Context) {
checkTick := 10 * time.Second
blog.Infof("remote: run container host name check tick for work: %s", m.work.ID())
ticker := time.NewTicker(checkTick)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
blog.Infof("remote: run container host name check for work(%s) canceled by context", m.work.ID())
return

case <-ticker.C:
for _, w := range m.resource.getWorkers() {
origin := m.resource.GetContainerHostName(w.host)
if origin == "" {
go m.initContainerHostName(w.host)
} else {
if !w.disabled && !w.dead {
go m.checkContainerHostName(w.host)
}
}
}
}
}
}

func (m *Mgr) initContainerHostName(h *dcProtocol.Host) {
blog.Infof("remote: start to init (%s) host name for work(%s)", h.Server, m.work.ID())
name, err := m.getContainerHostName(h)
if err != nil {
blog.Errorf("remote: init (%s) host name for work(%s) failed: %v", h.Server, m.work.ID(), err)
name = ""
}
m.resource.SetContainerHostName(h, name)
blog.Infof("remote: finish to init (%s) host name(%s) for work(%s)", h.Server, name, m.work.ID())
}

func (m *Mgr) checkContainerHostName(h *dcProtocol.Host) {
origin := m.resource.GetContainerHostName(h)
if origin == "" {
return
}

blog.Infof("remote: ready to check (%s) host name with origin name (%s)", h.Server, origin)
n, err := m.getContainerHostName(h)
if err != nil {
blog.Warnf("remote: check (%s) host name for work(%s)", h.Server, m.work.ID())
return
}
if n != origin {
blog.Errorf("remote: find worker(%s) host name change from (%s) to (%s), disable it", h.Server, origin, n)
m.resource.disableWorker(h)
}
}

func (m *Mgr) getContainerHostName(h *dcProtocol.Host) (string, error) {
cmd := getCheckCmd(runtime.GOOS)
if cmd == nil {
return "", fmt.Errorf("unsupport os to check container host: %s", runtime.GOOS)
}

handler := m.remoteWorker.Handler(0, nil, nil, nil)
result, err := handler.ExecuteTask(h, cmd)
if err != nil {
blog.Errorf("remote: get container host name from worker (%s) failed: %v", h.Server, err)
return "", err
}
if len(result.Results) < 1 {
blog.Errorf("remote: get container host name from worker (%s) failed with no result", h.Server)
return "", fmt.Errorf("get host name failed with no result")
}

if runtime.GOOS == "windows" {
return string(result.Results[0].OutputMessage), nil
}

envs := strings.Split(string(result.Results[0].OutputMessage), "\n")
for _, s := range envs {
if runtime.GOOS == "linux" {
if !strings.Contains(s, "HOSTNAME=") {
continue
}
} else if runtime.GOOS == "windows" {
if !strings.Contains(s, "COMPUTERNAME=") {
continue
}
}
tmp := strings.Split(s, "=")
if len(tmp) < 2 {
continue
}
return tmp[1], nil
}
return "", nil
}

// ExecuteTask run the task in remote worker and ensure the dependent files
func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTaskExecuteResult, error) {
if m.TotalSlots() <= 0 {
Expand Down
38 changes: 38 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,43 @@ type resource struct {
waitingList *list.List
}

// GetContainerHostName get host name in container
func (wr *resource) GetContainerHostName(host *dcProtocol.Host) string {
if host == nil {
return ""
}

wr.workerLock.RLock()
defer wr.workerLock.RUnlock()

for _, w := range wr.worker {
if !w.host.Equal(host) {
continue
}
return w.containerHostName
}
return ""
}

// SetContainerHostName get host name in container
func (wr *resource) SetContainerHostName(host *dcProtocol.Host, name string) {
if host == nil {
return
}

wr.workerLock.Lock()
defer wr.workerLock.Unlock()

for _, w := range wr.worker {
if !w.host.Equal(host) {
continue
}
w.containerHostName = name
break
}
return
}

// reset with []*dcProtocol.Host
// add new hosts and disable released hosts
func (wr *resource) Reset(hl []*dcProtocol.Host) ([]*dcProtocol.Host, error) {
Expand Down Expand Up @@ -643,6 +680,7 @@ type worker struct {
largefiletotalsize uint64
continuousNetErrors int
dead bool
containerHostName string
}

func (wr *worker) occupySlot() error {
Expand Down
22 changes: 22 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,25 @@ type messageItem struct {
desc sdk.FileDesc
msg []protocol.Message
}

func getCheckCmd(os string) *dcSDK.BKDistCommand {
cmd := &dcSDK.BKDistCommand{}
if os == "linux" {
cmd.Commands = append(cmd.Commands, dcSDK.BKCommand{
WorkDir: "/root",
ExePath: "",
ExeName: "env",
Params: []string{},
})
} else if os == "windows" {
cmd.Commands = append(cmd.Commands, dcSDK.BKCommand{
WorkDir: "C:\\",
ExePath: "",
ExeName: "echo",
Params: []string{"%COMPUTERNAME%"},
})
} else {
return nil
}
return cmd
}

0 comments on commit b842e4d

Please sign in to comment.