Skip to content

Commit

Permalink
feat: check result
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangliang committed Sep 22, 2022
1 parent c55f5b8 commit 1e07081
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 22 deletions.
3 changes: 1 addition & 2 deletions examples/hello/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ func (p *Player) ID() string {
return p.id
}

type Echo struct {
}
type Echo struct{}

func (e *Echo) SendMsg(msg *agent.ErrMsg) error {
fmt.Println(msg.Name, msg.Intro, msg.Detail, "echo alert...")
Expand Down
15 changes: 10 additions & 5 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Agent struct {
ctx context.Context
cancel context.CancelFunc
engine *Engine
cfg *viper.Viper
variable *variable
endpoint string
view *ViewOpt
alert Alert
Expand All @@ -48,15 +48,16 @@ func NewAgent(engine *Engine, cfg *viper.Viper, opts ...Option) *Agent {
}
}
}
id := cfg.GetString("ID")
endpoint := cfg.GetString(masterAddr)
variable := initVariable(cfg)
id := variable.GetString("ID")
endpoint := variable.GetString(masterAddr)
ctx, cancel := context.WithCancel(context.Background())
at := &Agent{
id: id,
ctx: ctx,
cancel: cancel,
engine: engine,
cfg: cfg,
variable: variable,
endpoint: endpoint,
}
for _, opt := range opts {
Expand All @@ -74,6 +75,10 @@ func (a *Agent) Start() {
}
}

func (a *Agent) CheckResult() error {
return a.stream.checkResult()
}

func (a *Agent) startDefaultAgent() {
a.stream = newProxyFromAgent(a)
newManagerFromAgent(a).startLocalService()
Expand All @@ -82,7 +87,7 @@ func (a *Agent) startDefaultAgent() {
func (a *Agent) startClusterAgent() {
conn := a.dialMaster()
defer conn.Close()
var ctx = a.newOutgoingContext()
ctx := a.newOutgoingContext()
client, err := transfer.NewCourierClient(conn).DeliverMail(ctx)
if err != nil {
log.Panic("request grpc courier error", zap.Error(err))
Expand Down
28 changes: 28 additions & 0 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package agent

import (
"sync"

"github.com/spf13/viper"
)

type variable struct {
sync.Mutex
viper *viper.Viper
}

func initVariable(viper *viper.Viper) *variable {
return &variable{viper: viper}
}

func (v *variable) GetString(key string) string {
v.Lock()
defer v.Unlock()
return v.viper.GetString(key)
}

func (v *variable) Set(key string, val any) {
v.Lock()
defer v.Unlock()
v.viper.Set(key, val)
}
16 changes: 15 additions & 1 deletion pkg/agent/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func printQuantitySlice(name string, qs map[string]*transfer.Quantity, opts ...*
header := []string{"Name", "Total", "Fail", " 0~50MS", "50~100MS", "100~200MS", "200~500MS", "500~1S", "1~2S", "2~5S", "5~10S"}
table.SetHeader(header)
names := make([]string, 0)
var qm = make(map[string]*transfer.Quantity)
qm := make(map[string]*transfer.Quantity)
for _, q := range qs {
if strings.Contains(q.Name, "polling_") {
row := createQuantityRow(q)
Expand Down Expand Up @@ -202,3 +202,17 @@ func echoLocalData(planName string, view *ViewOpt) {
printErrorMessage(planName+":E", quantities.Event, view)
quantities = newQuantities()
}

func hasLocalError() bool {
for _, quantity := range quantities.Handler {
if len(quantity.ErrorMap) > 0 {
return true
}
}
for _, quantity := range quantities.Event {
if len(quantity.ErrorMap) > 0 {
return true
}
}
return false
}
6 changes: 3 additions & 3 deletions pkg/agent/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *manager) startAgentEngine(content []byte, circle bool) error {

func (m *manager) startLocalService() {
var circle bool
if m.cfg.GetString("mode") == "circle" {
if m.variable.GetString("mode") == "circle" {
circle = true
}
m.stream.setPlanCount(len(m.engine.plans), circle)
Expand All @@ -92,7 +92,7 @@ func (m *manager) startLocalService() {

func (m *manager) setEnv(envs map[string]string) {
for k, v := range envs {
m.cfg.Set(k, v)
m.variable.Set(k, v)
if k == "logLevel" && v != "" {
log.ResetLogLevel(v)
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func (m *manager) startExecutor(executor *executor, market *Market) {

func (m *manager) getParallel() int {
var rs int
pe := m.cfg.GetString("parallel")
pe := m.variable.GetString("parallel")
if pe != "" {
rs, _ = strconv.Atoi(pe)
return rs
Expand Down
7 changes: 0 additions & 7 deletions pkg/agent/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

"github.com/olekukonko/tablewriter"
"github.com/spf13/viper"
)

type ViewOpt struct {
Expand Down Expand Up @@ -63,12 +62,6 @@ func WithAlert(alert Alert) Option {
}
}

func WithConfig(cfg *viper.Viper) Option {
return func(agent *Agent) {
agent.cfg = cfg
}
}

func WithViewer(views ...*ViewOpt) Option {
view := mergeViewOpt(views...)
return func(agent *Agent) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/agent/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type proxyStream struct {
index int
count int
circle bool
hasErr bool
id string
client transfer.Courier_DeliverMailClient
viewOpt *ViewOpt
Expand All @@ -40,6 +41,7 @@ func (s *proxyStream) setPlanCount(count int, circle bool) {

func (s *proxyStream) finishPlan(planName string) error {
if s.client == nil {
s.hasErr = hasLocalError()
echoLocalData(planName, s.viewOpt)
}
select {
Expand Down Expand Up @@ -94,3 +96,10 @@ func (s *proxyStream) Recv() (*transfer.Mail, error) {
func (s *proxyStream) formatPlanID(planName string) string {
return fmt.Sprintf("%s-%d-%s", s.id, s.index, planName)
}

func (s *proxyStream) checkResult() error {
if s.hasErr {
return fmt.Errorf("Test Failed")
}
return nil
}
7 changes: 3 additions & 4 deletions pkg/agent/tick.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func (s *Signal) Slave() <-chan One {
return s.market.getSlave()
}

type ParamNotFound struct {
}
type ParamNotFound struct{}

func (p ParamNotFound) Error() string {
return "NotFoundParam"
Expand Down Expand Up @@ -286,12 +285,12 @@ func (t *Tick) GetParamInt(key string) (int, error) {
func (t *Tick) GetParamBool(key string) (bool, error) {
val := t.Blackboard().GetMem(key)
if val == nil {
return false, paramError
return false, paramError
}
str, ok := val.(string)
if !ok {
return false, paramError
}
}
rs, err := strconv.ParseBool(str)
if err != nil {
return false, err
Expand Down

0 comments on commit 1e07081

Please sign in to comment.