Skip to content

Commit

Permalink
Merge pull request #1 from LilithGames/jansen
Browse files Browse the repository at this point in the history
add agent option
  • Loading branch information
wujianhanshu authored Jul 1, 2021
2 parents 83e2005 + 3b541d1 commit 4b8c058
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 36 deletions.
12 changes: 7 additions & 5 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ package agent

import (
"context"
"os"

"github.com/LilithGames/agent-go/pkg/transfer"
"github.com/LilithGames/agent-go/tools/log"
"github.com/rs/xid"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"os"
)

const masterAddr = "MASTER_ADDR"

type Agent struct {
engine *Engine
endpoint string
opt *AgentOpt
}

func IsTestMode() bool {
Expand All @@ -26,12 +28,12 @@ func IsTestMode() bool {
return false
}

func NewAgent(engine *Engine) *Agent {
func NewAgent(engine *Engine, opts ...*AgentOpt) *Agent {
if len(engine.plans) == 0 {
log.Panic("absent plans")
}
endpoint := os.Getenv(masterAddr)
return &Agent{engine: engine, endpoint: endpoint}
return &Agent{engine: engine, endpoint: endpoint, opt: mergeAgentOpt(opts...)}
}

func (a *Agent) Start() {
Expand All @@ -43,7 +45,7 @@ func (a *Agent) Start() {
}

func (a *Agent) startDefaultAgent() {
c := newProxyStream(nil)
c := newProxyStream(nil, a.opt.getView())
newManager(a.engine, c).startReadyService()
<-c.ctx.Done()
}
Expand All @@ -56,7 +58,7 @@ func (a *Agent) startClusterAgent() {
if err != nil {
log.Panic("request grpc courier error", zap.Error(err))
}
var c = newProxyStream(client)
var c = newProxyStream(client, a.opt.getView())
newManager(a.engine, c).startService()
<-c.ctx.Done()
}
Expand Down
51 changes: 30 additions & 21 deletions pkg/agent/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package agent
import (
"bytes"
"fmt"
"github.com/LilithGames/agent-go/pkg/transfer"
"github.com/olekukonko/tablewriter"
"sort"
"strconv"
"strings"
"time"

"github.com/LilithGames/agent-go/pkg/transfer"
"github.com/olekukonko/tablewriter"
)

func newQuantity(name string) *transfer.Quantity {
Expand All @@ -19,13 +20,15 @@ func newQuantity(name string) *transfer.Quantity {
}

type quantities struct {
Value map[string]*transfer.Quantity `json:"value"`
Name string
Value map[string]*transfer.Quantity `json:"value"`
ErrResult map[string]int
}

func newQuantities() *quantities {
func newQuantities(name string) *quantities {
return &quantities{
Value: map[string]*transfer.Quantity{},
Name: name,
Value: map[string]*transfer.Quantity{},
ErrResult: map[string]int{},
}
}
Expand All @@ -37,22 +40,23 @@ func computeAverage(val1, num1, val2, num2 int64) int64 {
return part1 + part2
}

func printQuantities(quantities *quantities) {
quantitySlice := make([]*transfer.Quantity, 0)
for _, quantity := range quantities.Value {
quantitySlice = append(quantitySlice, quantity)
}
if len(quantitySlice) > 1 {
printQuantitySlice(quantitySlice)
}
if len(quantities.ErrResult) > 0 {
printErrorMessage(quantities.ErrResult)
}
func (q *quantities) print(opts ...*ViewOpt) {
q.printQuantitySlice(opts...)
q.printErrorMessage(opts...)
}

func printQuantitySlice(quantities []*transfer.Quantity) {
func (q *quantities) printQuantitySlice(opts ...*ViewOpt) {
quantities := make([]*transfer.Quantity, 0)
for _, quantity := range q.Value {
quantities = append(quantities, quantity)
}
if len(quantities) <= 1 {
return
}
opt := mergeViewOpt(opts...)
buf := bytes.NewBuffer(nil)
table := tablewriter.NewWriter(buf)
opt.apply(table)
header := []string{"Name", "Total", "Fail", "Min", "Average", "Max"}
table.SetHeader(header)
names := make([]string, 0)
Expand Down Expand Up @@ -80,7 +84,7 @@ func printQuantitySlice(quantities []*transfer.Quantity) {
table.Append(row)
}
table.Render()
title := "Plan Statistic"
title := fmt.Sprintf("Plan Statistic(%s): ", q.Name)
fmt.Printf("\r\n%s\r\n%s", title, buf.String())
}

Expand All @@ -100,19 +104,24 @@ func toTimeStr(val int64) string {
return dur.Round(time.Millisecond).String()
}

func printErrorMessage(errs map[string]int) {
func (q *quantities) printErrorMessage(opts ...*ViewOpt) {
if len(q.ErrResult) <= 0 {
return
}
opt := mergeViewOpt(opts...)
buf := bytes.NewBuffer(nil)
table := tablewriter.NewWriter(buf)
opt.apply(table)
header := []string{"Reason", "Count"}
table.SetHeader(header)
for err, count := range errs {
for err, count := range q.ErrResult {
row := make([]string, len(header))
row[0] = err
row[1] = strconv.Itoa(count)
table.Append(row)
}
table.Render()
title := "Task Error: "
title := fmt.Sprintf("Task Error(%s): ", q.Name)
fmt.Printf("\r\n%s\r\n%s", title, buf.String())
}

Expand Down
62 changes: 62 additions & 0 deletions pkg/agent/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package agent

import "github.com/olekukonko/tablewriter"

type ViewOpt struct {
tableApply []func(*tablewriter.Table)
}

func mergeViewOpt(opts ...*ViewOpt) *ViewOpt {
res := &ViewOpt{}
for _, opt := range opts {
if opt == nil {
continue
}
res.tableApply = append(res.tableApply, opt.tableApply...)
}
return res
}

func (o *ViewOpt) apply(t *tablewriter.Table) {
for _, f := range o.tableApply {
if f == nil {
continue
}
f(t)
}
}

func ViewColWidth(v int) *ViewOpt {
return &ViewOpt{
tableApply: []func(*tablewriter.Table){
func(t *tablewriter.Table) {
t.SetColWidth(v)
},
},
}
}

type AgentOpt struct {
view *ViewOpt
}

func AgentViewOpt(opts ...*ViewOpt) *AgentOpt {
return &AgentOpt{
view: mergeViewOpt(opts...),
}
}

func (o *AgentOpt) getView() *ViewOpt {
return o.view
}

func mergeAgentOpt(opts ...*AgentOpt) *AgentOpt {
res := &AgentOpt{}
for _, opt := range opts {
if opt == nil {
continue
}
res.view = mergeViewOpt(res.view, opt.view)
}
return res
}
22 changes: 12 additions & 10 deletions pkg/agent/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package agent
import (
"context"
"fmt"
"os"

"github.com/LilithGames/agent-go/pkg/transfer"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"os"
)

type proxyStream struct {
Expand All @@ -18,10 +19,11 @@ type proxyStream struct {
ctx context.Context
cancel context.CancelFunc
client transfer.Courier_DeliverMailClient
viewOpt *ViewOpt
}

func newProxyStream(client transfer.Courier_DeliverMailClient) *proxyStream {
proxy := &proxyStream{client: client, id: os.Getenv("ID")}
func newProxyStream(client transfer.Courier_DeliverMailClient, viewOpts ...*ViewOpt) *proxyStream {
proxy := &proxyStream{client: client, id: os.Getenv("ID"), viewOpt: mergeViewOpt(viewOpts...)}
proxy.ctx, proxy.cancel = context.WithCancel(context.Background())
return proxy
}
Expand All @@ -39,7 +41,7 @@ func (s *proxyStream) sendFinish(planName string) error {
s.index++
}()
if s.client == nil {
return s.echoLocalData()
return s.echoLocalData(planName)
}
planID := s.formatPlanID(planName)
mail := &transfer.Mail{Action: transfer.ACTION_FINISH_PLAN, Content: []byte(planID)}
Expand All @@ -58,7 +60,7 @@ func (s *proxyStream) sendFinish(planName string) error {

func (s *proxyStream) sendReport(planName string, report *transfer.Report) error {
if s.client == nil {
return s.pushLocalData(report)
return s.pushLocalData(planName, report)
}
report.PlanID = s.formatPlanID(planName)
content, err := proto.Marshal(report)
Expand All @@ -80,17 +82,17 @@ func (s *proxyStream) Recv() (*transfer.Mail, error) {
return nil, nil
}

func (s *proxyStream) pushLocalData(report *transfer.Report) error {
func (s *proxyStream) pushLocalData(planName string, report *transfer.Report) error {
if s.quantities == nil {
s.quantities = newQuantities()
s.quantities = newQuantities(planName)
}
putReportData(s.quantities, report.Outcomes)
return nil
}

func (s *proxyStream) echoLocalData() error {
printQuantities(s.quantities)
s.quantities = newQuantities()
func (s *proxyStream) echoLocalData(planName string) error {
s.quantities.print(s.viewOpt)
s.quantities = newQuantities(planName)
return nil
}

Expand Down

0 comments on commit 4b8c058

Please sign in to comment.