From b842e4de1b8395f309c401c870b0547b08481348 Mon Sep 17 00:00:00 2001 From: tbs60 Date: Mon, 7 Aug 2023 17:38:27 +0800 Subject: [PATCH] =?UTF-8?q?controller=E6=94=AF=E6=8C=81=E5=AF=B9container?= =?UTF-8?q?=20name=E7=9A=84=E6=A0=A1=E9=AA=8C=20#103?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 100 ++++++++++++++++++ .../controller/pkg/manager/remote/slots.go | 38 +++++++ .../controller/pkg/manager/remote/types.go | 22 ++++ 3 files changed, 160 insertions(+) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index f588c66f0..d584a29bb 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -13,7 +13,9 @@ import ( "context" "errors" "fmt" + "runtime" "runtime/debug" + "strings" "sync" "sync/atomic" "time" @@ -273,6 +275,7 @@ func (m *Mgr) Init() { } go m.workerCheck(ctx) + go m.checkHostName(ctx) if m.conf.SendCork { m.sendCorkChan = make(chan bool, 1000) @@ -439,6 +442,103 @@ func (m *Mgr) workerCheck(ctx context.Context) { } } +func (m *Mgr) checkHostName(ctx context.Context) { + checkTick := 10 * time.Second + blog.Infof("remote: run container host name check tick for work: %s", m.work.ID()) + ticker := time.NewTicker(checkTick) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + blog.Infof("remote: run container host name check for work(%s) canceled by context", m.work.ID()) + return + + case <-ticker.C: + for _, w := range m.resource.getWorkers() { + origin := m.resource.GetContainerHostName(w.host) + if origin == "" { + go m.initContainerHostName(w.host) + } else { + if !w.disabled && !w.dead { + go m.checkContainerHostName(w.host) + } + } + } + } + } +} + +func (m *Mgr) initContainerHostName(h *dcProtocol.Host) { + blog.Infof("remote: start to init (%s) host name for work(%s)", h.Server, m.work.ID()) + name, err := m.getContainerHostName(h) + if err != nil { + blog.Errorf("remote: init (%s) host name for work(%s) failed: %v", h.Server, m.work.ID(), err) + name = "" + } + m.resource.SetContainerHostName(h, name) + blog.Infof("remote: finish to init (%s) host name(%s) for work(%s)", h.Server, name, m.work.ID()) +} + +func (m *Mgr) checkContainerHostName(h *dcProtocol.Host) { + origin := m.resource.GetContainerHostName(h) + if origin == "" { + return + } + + blog.Infof("remote: ready to check (%s) host name with origin name (%s)", h.Server, origin) + n, err := m.getContainerHostName(h) + if err != nil { + blog.Warnf("remote: check (%s) host name for work(%s)", h.Server, m.work.ID()) + return + } + if n != origin { + blog.Errorf("remote: find worker(%s) host name change from (%s) to (%s), disable it", h.Server, origin, n) + m.resource.disableWorker(h) + } +} + +func (m *Mgr) getContainerHostName(h *dcProtocol.Host) (string, error) { + cmd := getCheckCmd(runtime.GOOS) + if cmd == nil { + return "", fmt.Errorf("unsupport os to check container host: %s", runtime.GOOS) + } + + handler := m.remoteWorker.Handler(0, nil, nil, nil) + result, err := handler.ExecuteTask(h, cmd) + if err != nil { + blog.Errorf("remote: get container host name from worker (%s) failed: %v", h.Server, err) + return "", err + } + if len(result.Results) < 1 { + blog.Errorf("remote: get container host name from worker (%s) failed with no result", h.Server) + return "", fmt.Errorf("get host name failed with no result") + } + + if runtime.GOOS == "windows" { + return string(result.Results[0].OutputMessage), nil + } + + envs := strings.Split(string(result.Results[0].OutputMessage), "\n") + for _, s := range envs { + if runtime.GOOS == "linux" { + if !strings.Contains(s, "HOSTNAME=") { + continue + } + } else if runtime.GOOS == "windows" { + if !strings.Contains(s, "COMPUTERNAME=") { + continue + } + } + tmp := strings.Split(s, "=") + if len(tmp) < 2 { + continue + } + return tmp[1], nil + } + return "", nil +} + // ExecuteTask run the task in remote worker and ensure the dependent files func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTaskExecuteResult, error) { if m.TotalSlots() <= 0 { diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go index 1e5fa85b1..19f7fef47 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go @@ -100,6 +100,43 @@ type resource struct { waitingList *list.List } +// GetContainerHostName get host name in container +func (wr *resource) GetContainerHostName(host *dcProtocol.Host) string { + if host == nil { + return "" + } + + wr.workerLock.RLock() + defer wr.workerLock.RUnlock() + + for _, w := range wr.worker { + if !w.host.Equal(host) { + continue + } + return w.containerHostName + } + return "" +} + +// SetContainerHostName get host name in container +func (wr *resource) SetContainerHostName(host *dcProtocol.Host, name string) { + if host == nil { + return + } + + wr.workerLock.Lock() + defer wr.workerLock.Unlock() + + for _, w := range wr.worker { + if !w.host.Equal(host) { + continue + } + w.containerHostName = name + break + } + return +} + // reset with []*dcProtocol.Host // add new hosts and disable released hosts func (wr *resource) Reset(hl []*dcProtocol.Host) ([]*dcProtocol.Host, error) { @@ -643,6 +680,7 @@ type worker struct { largefiletotalsize uint64 continuousNetErrors int dead bool + containerHostName string } func (wr *worker) occupySlot() error { diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/types.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/types.go index e70ba6e94..eac104dd6 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/types.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/types.go @@ -76,3 +76,25 @@ type messageItem struct { desc sdk.FileDesc msg []protocol.Message } + +func getCheckCmd(os string) *dcSDK.BKDistCommand { + cmd := &dcSDK.BKDistCommand{} + if os == "linux" { + cmd.Commands = append(cmd.Commands, dcSDK.BKCommand{ + WorkDir: "/root", + ExePath: "", + ExeName: "env", + Params: []string{}, + }) + } else if os == "windows" { + cmd.Commands = append(cmd.Commands, dcSDK.BKCommand{ + WorkDir: "C:\\", + ExePath: "", + ExeName: "echo", + Params: []string{"%COMPUTERNAME%"}, + }) + } else { + return nil + } + return cmd +}