From 1e0708139daba7d2bd740674677d8f10d59e903a Mon Sep 17 00:00:00 2001 From: zhangliang Date: Thu, 22 Sep 2022 16:30:08 +0800 Subject: [PATCH] feat: check result --- examples/hello/main.go | 3 +-- pkg/agent/agent.go | 15 ++++++++++----- pkg/agent/config.go | 28 ++++++++++++++++++++++++++++ pkg/agent/data.go | 16 +++++++++++++++- pkg/agent/manager.go | 6 +++--- pkg/agent/option.go | 7 ------- pkg/agent/proxy.go | 9 +++++++++ pkg/agent/tick.go | 7 +++---- 8 files changed, 69 insertions(+), 22 deletions(-) create mode 100644 pkg/agent/config.go diff --git a/examples/hello/main.go b/examples/hello/main.go index 87b6082..7115b72 100644 --- a/examples/hello/main.go +++ b/examples/hello/main.go @@ -167,8 +167,7 @@ func (p *Player) ID() string { return p.id } -type Echo struct { -} +type Echo struct{} func (e *Echo) SendMsg(msg *agent.ErrMsg) error { fmt.Println(msg.Name, msg.Intro, msg.Detail, "echo alert...") diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 26c897d..27b4ac0 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -26,7 +26,7 @@ type Agent struct { ctx context.Context cancel context.CancelFunc engine *Engine - cfg *viper.Viper + variable *variable endpoint string view *ViewOpt alert Alert @@ -48,15 +48,16 @@ func NewAgent(engine *Engine, cfg *viper.Viper, opts ...Option) *Agent { } } } - id := cfg.GetString("ID") - endpoint := cfg.GetString(masterAddr) + variable := initVariable(cfg) + id := variable.GetString("ID") + endpoint := variable.GetString(masterAddr) ctx, cancel := context.WithCancel(context.Background()) at := &Agent{ id: id, ctx: ctx, cancel: cancel, engine: engine, - cfg: cfg, + variable: variable, endpoint: endpoint, } for _, opt := range opts { @@ -74,6 +75,10 @@ func (a *Agent) Start() { } } +func (a *Agent) CheckResult() error { + return a.stream.checkResult() +} + func (a *Agent) startDefaultAgent() { a.stream = newProxyFromAgent(a) newManagerFromAgent(a).startLocalService() @@ -82,7 +87,7 @@ func (a *Agent) startDefaultAgent() { func (a *Agent) startClusterAgent() { conn := a.dialMaster() defer conn.Close() - var ctx = a.newOutgoingContext() + ctx := a.newOutgoingContext() client, err := transfer.NewCourierClient(conn).DeliverMail(ctx) if err != nil { log.Panic("request grpc courier error", zap.Error(err)) diff --git a/pkg/agent/config.go b/pkg/agent/config.go new file mode 100644 index 0000000..7751cd9 --- /dev/null +++ b/pkg/agent/config.go @@ -0,0 +1,28 @@ +package agent + +import ( + "sync" + + "github.com/spf13/viper" +) + +type variable struct { + sync.Mutex + viper *viper.Viper +} + +func initVariable(viper *viper.Viper) *variable { + return &variable{viper: viper} +} + +func (v *variable) GetString(key string) string { + v.Lock() + defer v.Unlock() + return v.viper.GetString(key) +} + +func (v *variable) Set(key string, val any) { + v.Lock() + defer v.Unlock() + v.viper.Set(key, val) +} diff --git a/pkg/agent/data.go b/pkg/agent/data.go index 68470ca..d277f1c 100644 --- a/pkg/agent/data.go +++ b/pkg/agent/data.go @@ -44,7 +44,7 @@ func printQuantitySlice(name string, qs map[string]*transfer.Quantity, opts ...* header := []string{"Name", "Total", "Fail", " 0~50MS", "50~100MS", "100~200MS", "200~500MS", "500~1S", "1~2S", "2~5S", "5~10S"} table.SetHeader(header) names := make([]string, 0) - var qm = make(map[string]*transfer.Quantity) + qm := make(map[string]*transfer.Quantity) for _, q := range qs { if strings.Contains(q.Name, "polling_") { row := createQuantityRow(q) @@ -202,3 +202,17 @@ func echoLocalData(planName string, view *ViewOpt) { printErrorMessage(planName+":E", quantities.Event, view) quantities = newQuantities() } + +func hasLocalError() bool { + for _, quantity := range quantities.Handler { + if len(quantity.ErrorMap) > 0 { + return true + } + } + for _, quantity := range quantities.Event { + if len(quantity.ErrorMap) > 0 { + return true + } + } + return false +} diff --git a/pkg/agent/manager.go b/pkg/agent/manager.go index 4e9034c..892b8fb 100644 --- a/pkg/agent/manager.go +++ b/pkg/agent/manager.go @@ -83,7 +83,7 @@ func (m *manager) startAgentEngine(content []byte, circle bool) error { func (m *manager) startLocalService() { var circle bool - if m.cfg.GetString("mode") == "circle" { + if m.variable.GetString("mode") == "circle" { circle = true } m.stream.setPlanCount(len(m.engine.plans), circle) @@ -92,7 +92,7 @@ func (m *manager) startLocalService() { func (m *manager) setEnv(envs map[string]string) { for k, v := range envs { - m.cfg.Set(k, v) + m.variable.Set(k, v) if k == "logLevel" && v != "" { log.ResetLogLevel(v) } @@ -171,7 +171,7 @@ func (m *manager) startExecutor(executor *executor, market *Market) { func (m *manager) getParallel() int { var rs int - pe := m.cfg.GetString("parallel") + pe := m.variable.GetString("parallel") if pe != "" { rs, _ = strconv.Atoi(pe) return rs diff --git a/pkg/agent/option.go b/pkg/agent/option.go index 3d82b2d..b976576 100644 --- a/pkg/agent/option.go +++ b/pkg/agent/option.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/olekukonko/tablewriter" - "github.com/spf13/viper" ) type ViewOpt struct { @@ -63,12 +62,6 @@ func WithAlert(alert Alert) Option { } } -func WithConfig(cfg *viper.Viper) Option { - return func(agent *Agent) { - agent.cfg = cfg - } -} - func WithViewer(views ...*ViewOpt) Option { view := mergeViewOpt(views...) return func(agent *Agent) { diff --git a/pkg/agent/proxy.go b/pkg/agent/proxy.go index f86f285..68a3b26 100644 --- a/pkg/agent/proxy.go +++ b/pkg/agent/proxy.go @@ -15,6 +15,7 @@ type proxyStream struct { index int count int circle bool + hasErr bool id string client transfer.Courier_DeliverMailClient viewOpt *ViewOpt @@ -40,6 +41,7 @@ func (s *proxyStream) setPlanCount(count int, circle bool) { func (s *proxyStream) finishPlan(planName string) error { if s.client == nil { + s.hasErr = hasLocalError() echoLocalData(planName, s.viewOpt) } select { @@ -94,3 +96,10 @@ func (s *proxyStream) Recv() (*transfer.Mail, error) { func (s *proxyStream) formatPlanID(planName string) string { return fmt.Sprintf("%s-%d-%s", s.id, s.index, planName) } + +func (s *proxyStream) checkResult() error { + if s.hasErr { + return fmt.Errorf("Test Failed") + } + return nil +} diff --git a/pkg/agent/tick.go b/pkg/agent/tick.go index fe55c02..5e577e7 100644 --- a/pkg/agent/tick.go +++ b/pkg/agent/tick.go @@ -35,8 +35,7 @@ func (s *Signal) Slave() <-chan One { return s.market.getSlave() } -type ParamNotFound struct { -} +type ParamNotFound struct{} func (p ParamNotFound) Error() string { return "NotFoundParam" @@ -286,12 +285,12 @@ func (t *Tick) GetParamInt(key string) (int, error) { func (t *Tick) GetParamBool(key string) (bool, error) { val := t.Blackboard().GetMem(key) if val == nil { - return false, paramError + return false, paramError } str, ok := val.(string) if !ok { return false, paramError - } + } rs, err := strconv.ParseBool(str) if err != nil { return false, err